package com.kdgcsoft.plugin.collect.rabbitmq.reader;

import com.alibaba.fastjson2.JSONObject;
import com.kdgcsoft.plugin.api.DataNumEstimateType;
import com.kdgcsoft.plugin.api.PluginContext;
import com.kdgcsoft.plugin.api.PluginType;
import com.kdgcsoft.plugin.api.data.IDataReadPlugin;
import com.kdgcsoft.plugin.api.message.MessageBoxWrapper;
import com.kdgcsoft.plugin.api.param.PluginParam;
import com.kdgcsoft.plugin.api.record.Item;
import com.kdgcsoft.plugin.api.record.ItemType;
import com.kdgcsoft.plugin.api.record.Record;
import com.kdgcsoft.plugin.rabbit.common.RabbitConnector;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.pf4j.Extension;
import org.pf4j.ExtensionPoint;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;

/* loaded from: input_file:com/kdgcsoft/plugin/collect/rabbitmq/reader/RabbitMQReaderPlugin.class */
public class RabbitMQReaderPlugin extends Plugin {

    @Extension
    /* loaded from: input_file:com/kdgcsoft/plugin/collect/rabbitmq/reader/RabbitMQReaderPlugin$RabbitMQIDataReadPlugin.class */
    public static class RabbitMQIDataReadPlugin extends MessageBoxWrapper implements IDataReadPlugin, ExtensionPoint {
        private static final int CAPACITY = 65536;
        private RabbitMQReaderPluginParam pluginParam;
        private Connection connection;
        private Channel channel;
        private BlockingQueue<MessageTuple> queue;
        private AtomicLong lastUpdate;
        private volatile boolean stopped = false;
        private boolean continuousListen;
        private long maxWaitTime;
        private PluginContext context;

        public void init(PluginContext pluginContext, PluginParam pluginParam) throws Exception {
            this.pluginParam = (RabbitMQReaderPluginParam) pluginParam;
            this.context = pluginContext;
            this.queue = new ArrayBlockingQueue(CAPACITY);
            long currentTimeMillis = System.currentTimeMillis();
            this.connection = RabbitConnector.connectionFactory(pluginContext, this.pluginParam.getResourceCode()).newConnection();
            long currentTimeMillis2 = System.currentTimeMillis();
            this.mb.sendResourceConnectInfo(pluginContext.flowCode(), pluginContext.taskCode(), pluginContext.jobCode(), this.pluginParam.getResourceCode());
            this.mb.writeLog("连接RabbitMQ耗时: {}ms", new Object[]{Long.valueOf(currentTimeMillis2 - currentTimeMillis)});
            this.channel = this.connection.createChannel();
            this.lastUpdate = new AtomicLong(System.currentTimeMillis());
            this.mb.writeLog("监听等待时间：{}", new Object[]{Integer.valueOf(this.pluginParam.getListenWaitTime())});
            this.continuousListen = this.pluginParam.getListenWaitTime() <= 0;
            this.maxWaitTime = this.continuousListen ? Long.MAX_VALUE : TimeUnit.MINUTES.toMillis(this.pluginParam.getListenWaitTime());
        }

        public void begin() throws Exception {
            this.lastUpdate.set(System.currentTimeMillis());
            this.channel.basicConsume(this.pluginParam.getQueue(), false, new DefaultConsumer(this.channel) { // from class: com.kdgcsoft.plugin.collect.rabbitmq.reader.RabbitMQReaderPlugin.RabbitMQIDataReadPlugin.1
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    if (RabbitMQIDataReadPlugin.this.stopped) {
                        return;
                    }
                    try {
                        RabbitMQIDataReadPlugin.this.lastUpdate.set(System.currentTimeMillis());
                        RabbitMQIDataReadPlugin.this.queue.put(new MessageTuple(deliveryTag, new String(bArr)));
                    } catch (Exception e) {
                        RabbitMQIDataReadPlugin.this.mb.writeExceptionLog(e);
                    }
                }
            });
            if (this.continuousListen) {
                return;
            }
            Thread thread = new Thread(() -> {
                do {
                    try {
                        TimeUnit.MINUTES.sleep(1L);
                    } catch (InterruptedException e) {
                    }
                } while (System.currentTimeMillis() - this.lastUpdate.get() <= this.maxWaitTime);
                this.stopped = true;
            });
            thread.setDaemon(true);
            thread.start();
        }

        public boolean hasNext() throws Exception {
            return (this.stopped && this.queue.size() == 0) ? false : true;
        }

        public Record next() throws Exception {
            MessageTuple take = this.continuousListen ? this.queue.take() : this.queue.poll(1L, TimeUnit.MINUTES);
            if (null == take) {
                return null;
            }
            String msg = take.getMsg();
            this.channel.basicAck(take.getDeliveryTag(), true);
            try {
                Set<Map.Entry> entrySet = JSONObject.parseObject(msg).entrySet();
                Record record = new Record();
                for (Map.Entry entry : entrySet) {
                    record.add(new Item((String) entry.getKey(), ItemType.STRING, entry.getValue().toString()));
                }
                return record;
            } catch (Exception e) {
                throw new IllegalArgumentException("不可识别的数据：" + msg);
            }
        }

        public long estimateDataNum() {
            return 0L;
        }

        public DataNumEstimateType estimateType() {
            return DataNumEstimateType.CANNOT_ESTIMATE;
        }

        public void end() throws Exception {
            this.stopped = true;
            if (null != this.channel) {
                this.channel.close();
                this.channel = null;
            }
            if (null != this.connection) {
                this.connection.close();
                this.connection = null;
                this.mb.sendResourceDisconnectInfo(this.context.flowCode(), this.context.taskCode(), this.context.jobCode(), this.pluginParam.getResourceCode());
            }
            if (null != this.queue) {
                this.queue.clear();
                this.queue = null;
            }
            this.mb.writeLog("RabbitMQ读插件停止读取Queue[{}]消息.", new Object[]{this.pluginParam.getQueue()});
        }

        public PluginType type() {
            return PluginType.READER;
        }

        public Class<? extends PluginParam> pluginParamClass() {
            return RabbitMQReaderPluginParam.class;
        }

        public String configComponent() {
            return "RabbitMQReaderConfigForm";
        }
    }

    public RabbitMQReaderPlugin(PluginWrapper pluginWrapper) {
        super(pluginWrapper);
    }
}
