package com.kdgcsoft.plugin.collect.kafka.reader;

import com.alibaba.fastjson2.JSONObject;
import com.kdgcsoft.plugin.api.DataNumEstimateType;
import com.kdgcsoft.plugin.api.PluginContext;
import com.kdgcsoft.plugin.api.PluginType;
import com.kdgcsoft.plugin.api.data.IDataReadPlugin;
import com.kdgcsoft.plugin.api.message.MessageBoxWrapper;
import com.kdgcsoft.plugin.api.param.PluginParam;
import com.kdgcsoft.plugin.api.record.Item;
import com.kdgcsoft.plugin.api.record.ItemType;
import com.kdgcsoft.plugin.api.record.Record;
import com.kdgcsoft.plugin.kafka.common.KafkaConnector;
import com.kdgcsoft.plugin.kafka.common.KafkaResourcePluginParam;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.pf4j.Extension;
import org.pf4j.ExtensionPoint;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;

/* loaded from: input_file:com/kdgcsoft/plugin/collect/kafka/reader/KafkaReaderPlugin.class */
/**
 * PF4J plugin entry point that contributes a Kafka-backed data reader extension.
 */
/* loaded from: input_file:com/kdgcsoft/plugin/collect/kafka/reader/KafkaReaderPlugin.class */
public class KafkaReaderPlugin extends Plugin {

    /**
     * Reader extension that subscribes to one Kafka topic and exposes every message as a
     * {@link Record}.
     *
     * <p>A single background thread polls the consumer and pushes message values into a bounded
     * queue; {@link #next()} drains that queue on the caller's thread and parses each value as a
     * JSON object. {@code KafkaConsumer} is not safe for concurrent use, so every consumer call
     * (poll, commit, close) is serialized through {@link #readWriteLock}.
     *
     * <p>NOTE(review): {@code commitSync()} commits the consumer's current position, which may be
     * ahead of the messages actually handed out by {@link #next()} (some may still sit in the
     * queue). Confirm that this delivery guarantee is acceptable.
     */
    @Extension
    /* loaded from: input_file:com/kdgcsoft/plugin/collect/kafka/reader/KafkaReaderPlugin$KafkaIDataReadPlugin.class */
    public static class KafkaIDataReadPlugin extends MessageBoxWrapper implements IDataReadPlugin, ExtensionPoint {
        /** Maximum number of polled-but-not-yet-read messages buffered in memory. */
        private static final int CAPACITY = 65536;
        /** Number of records returned by {@link #next()} between two offset commits. */
        private static final int MIN_BATCH_SIZE = 500;
        private KafkaConsumer<String, String> consumer;
        private BlockingQueue<String> queue;
        private AtomicInteger counter;
        /** Set once polling is over (idle timeout, polling failure or {@link #end()}). */
        private volatile boolean stopped;
        private ExecutorService es;
        private KafkaReaderPluginParam pluginParam;
        /** Serializes consumer access: the poll thread takes the read lock, commit/close the write lock. */
        private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        /** True when the configured listen wait time is {@code <= 0}, i.e. listen until {@link #end()}. */
        private boolean continuousListen;
        private PluginContext context;

        /**
         * Connects the consumer, subscribes to the configured topic and starts the polling thread.
         *
         * @param pluginContext context used to resolve the Kafka resource and the flow/task/job codes
         * @param pluginParam   must be a {@link KafkaReaderPluginParam}
         * @throws Exception if the consumer cannot be created or subscribed
         */
        public void init(PluginContext pluginContext, PluginParam pluginParam) throws Exception {
            this.pluginParam = (KafkaReaderPluginParam) pluginParam;
            this.context = pluginContext;
            KafkaResourcePluginParam resourcePluginParam = pluginContext.resourcePluginParam(KafkaResourcePluginParam.class, this.pluginParam.getResourceCode());
            long connectStart = System.currentTimeMillis();
            this.consumer = KafkaConnector.getConsumer(resourcePluginParam);
            long connectEnd = System.currentTimeMillis();
            this.mb.sendResourceConnectInfo(pluginContext.flowCode(), pluginContext.taskCode(), pluginContext.jobCode(), this.pluginParam.getResourceCode());
            this.mb.writeLog("连接Kafka Consumer耗时: {}ms", new Object[]{Long.valueOf(connectEnd - connectStart)});
            this.consumer.subscribe(Collections.singletonList(this.pluginParam.getTopic()));
            this.queue = new ArrayBlockingQueue<>(CAPACITY);
            this.counter = new AtomicInteger(0);
            this.es = Executors.newSingleThreadExecutor();
            this.mb.writeLog("监听等待时间：{}", new Object[]{Integer.valueOf(this.pluginParam.getListenWaitTime())});
            this.continuousListen = this.pluginParam.getListenWaitTime() <= 0;
            final long idleTimeoutMillis = this.continuousListen ? Long.MAX_VALUE : TimeUnit.MINUTES.toMillis(this.pluginParam.getListenWaitTime());
            this.es.execute(() -> pollLoop(idleTimeoutMillis));
        }

        /**
         * Body of the polling thread: polls until stopped, until no message arrived for
         * {@code idleTimeoutMillis} (non-continuous mode only) or until polling fails.
         *
         * <p>{@link #stopped} is always set on exit so that readers never wait for messages that
         * can no longer arrive.
         */
        private void pollLoop(long idleTimeoutMillis) {
            long lastReceivedAt = System.currentTimeMillis();
            try {
                while (!this.stopped) {
                    ConsumerRecords<String, String> records = pollOnce();
                    if (this.stopped && null == records) {
                        break;
                    }
                    if (null != records && !records.isEmpty()) {
                        lastReceivedAt = System.currentTimeMillis();
                        if (!enqueue(records)) {
                            break;
                        }
                    } else if (!this.continuousListen && System.currentTimeMillis() - lastReceivedAt >= idleTimeoutMillis) {
                        break;
                    }
                }
            } catch (Exception e) {
                // A failed poll ends the reader instead of silently killing the thread.
                this.mb.writeExceptionLog(e);
            } finally {
                this.stopped = true;
            }
        }

        /**
         * Performs one poll under the read lock.
         *
         * @return the polled records, or {@code null} if the reader was stopped while this thread
         *         was waiting for the lock (the consumer may already be closed by then)
         */
        private ConsumerRecords<String, String> pollOnce() {
            Lock readLock = this.readWriteLock.readLock();
            readLock.lock();
            try {
                if (this.stopped) {
                    return null;
                }
                return this.consumer.poll(Duration.ofSeconds(5L));
            } finally {
                readLock.unlock();
            }
        }

        /**
         * Pushes every record value into the queue, blocking while the queue is full.
         *
         * @return {@code false} if the thread was interrupted and polling must stop
         */
        private boolean enqueue(ConsumerRecords<String, String> records) {
            for (ConsumerRecord<String, String> consumerRecord : records) {
                try {
                    String value = consumerRecord.value();
                    this.queue.put(value);
                } catch (InterruptedException e) {
                    // Interrupted by end(): restore the flag and stop polling.
                    Thread.currentThread().interrupt();
                    return false;
                } catch (Exception e) {
                    // e.g. a null value, which the queue rejects; log it and keep going.
                    this.mb.writeExceptionLog(e);
                }
            }
            return true;
        }

        /**
         * @return {@code false} only once polling has stopped and the queue is drained
         */
        public boolean hasNext() throws Exception {
            return !(this.stopped && this.queue.isEmpty());
        }

        /**
         * Returns the next message as a record whose items are the top-level JSON properties, each
         * typed {@link ItemType#STRING}. Offsets are committed every {@link #MIN_BATCH_SIZE} records.
         *
         * @return the next record, or {@code null} if none became available (one-minute timeout in
         *         non-continuous mode, or polling stopped with an empty queue in continuous mode)
         * @throws IllegalArgumentException if the message is not a JSON object
         * @throws Exception if waiting is interrupted or the offset commit fails
         */
        public Record next() throws Exception {
            String message = takeMessage();
            if (null == message) {
                return null;
            }
            Record record = toRecord(message);
            if (this.counter.incrementAndGet() >= MIN_BATCH_SIZE) {
                commitOffsets();
            }
            return record;
        }

        /**
         * Waits for the next queued message. In continuous mode the wait is sliced so that a
         * stopped reader with an empty queue returns {@code null} instead of blocking forever.
         */
        private String takeMessage() throws InterruptedException {
            if (!this.continuousListen) {
                return this.queue.poll(1L, TimeUnit.MINUTES);
            }
            while (true) {
                String message = this.queue.poll(1L, TimeUnit.SECONDS);
                if (null != message || (this.stopped && this.queue.isEmpty())) {
                    return message;
                }
            }
        }

        /** Parses one message as a JSON object and converts its entries into record items. */
        private Record toRecord(String message) {
            try {
                JSONObject json = JSONObject.parseObject(message);
                Record record = new Record();
                for (Map.Entry<String, Object> entry : json.entrySet()) {
                    Object value = entry.getValue();
                    // A JSON null becomes a null item value instead of rejecting the whole message.
                    // NOTE(review): assumes Item accepts a null value - confirm.
                    record.add(new Item(entry.getKey(), ItemType.STRING, null == value ? null : value.toString()));
                }
                return record;
            } catch (Exception e) {
                this.mb.writeExceptionLog(e);
                throw new IllegalArgumentException("不可识别的数据：" + message, e);
            }
        }

        /** Commits the consumer offsets under the write lock and resets the batch counter on success. */
        private void commitOffsets() {
            Lock writeLock = this.readWriteLock.writeLock();
            writeLock.lock();
            try {
                // The consumer is null once end() has closed it; nothing left to commit then.
                if (null != this.consumer) {
                    this.consumer.commitSync();
                }
            } finally {
                writeLock.unlock();
            }
            this.counter.set(0);
        }

        public long estimateDataNum() {
            return 0L;
        }

        public DataNumEstimateType estimateType() {
            return DataNumEstimateType.CANNOT_ESTIMATE;
        }

        public void begin() throws Exception {
        }

        /**
         * Stops polling, closes the consumer and releases the polling thread.
         *
         * <p>The consumer is closed under the write lock, so an in-flight poll (at most five
         * seconds) completes first. The executor is then shut down with interruption so a thread
         * blocked on a full queue is released as well.
         */
        public void end() throws Exception {
            this.stopped = true;
            try {
                closeConsumer();
            } finally {
                ExecutorService executor = this.es;
                this.es = null;
                if (null != executor) {
                    executor.shutdownNow();
                }
            }
            this.mb.writeLog("Kafka读插件停止读取Topic[{}]消息.", new Object[]{this.pluginParam.getTopic()});
        }

        /** Closes the consumer, if any, while no poll or commit is running, then reports the resource state. */
        private void closeConsumer() {
            Lock writeLock = this.readWriteLock.writeLock();
            writeLock.lock();
            try {
                if (null == this.consumer) {
                    return;
                }
                try {
                    this.consumer.close();
                } finally {
                    this.consumer = null;
                }
            } finally {
                writeLock.unlock();
            }
            this.mb.sendResourceConnectInfo(this.context.flowCode(), this.context.taskCode(), this.context.jobCode(), this.pluginParam.getResourceCode());
        }

        public PluginType type() {
            return PluginType.READER;
        }

        public Class<? extends PluginParam> pluginParamClass() {
            return KafkaReaderPluginParam.class;
        }

        public String configComponent() {
            return "KafkaReaderConfigForm";
        }
    }

    public KafkaReaderPlugin(PluginWrapper pluginWrapper) {
        super(pluginWrapper);
    }
}
