package com.kdgcsoft.plugin.collect.rabbitmq.writer;

import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson2.JSONObject;
import com.kdgcsoft.plugin.api.DataWriteResult;
import com.kdgcsoft.plugin.api.PluginContext;
import com.kdgcsoft.plugin.api.PluginType;
import com.kdgcsoft.plugin.api.data.AbstractDataWritePlugin;
import com.kdgcsoft.plugin.api.param.PluginParam;
import com.kdgcsoft.plugin.api.record.Item;
import com.kdgcsoft.plugin.api.record.Record;
import com.kdgcsoft.plugin.rabbit.common.RabbitConnector;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.pf4j.Extension;
import org.pf4j.ExtensionPoint;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;

/* loaded from: input_file:com/kdgcsoft/plugin/collect/rabbitmq/writer/RabbitMQWriterPlugin.class */
public class RabbitMQWriterPlugin extends Plugin {

    @Extension
    /* loaded from: input_file:com/kdgcsoft/plugin/collect/rabbitmq/writer/RabbitMQWriterPlugin$RabbitMQIDataWritePlugin.class */
    public static class RabbitMQIDataWritePlugin extends AbstractDataWritePlugin implements ExtensionPoint {
        private static final String DEFAULT_EXCHANGE = "";
        private Connection connection;
        private Channel channel;
        private RabbitMQWriterPluginParam pluginParam;
        private Set<String> columns;
        private PluginContext context;
        private volatile boolean stopped = false;
        private boolean writeAll = true;

        public void init(PluginContext pluginContext, PluginParam pluginParam) throws Exception {
            this.pluginParam = (RabbitMQWriterPluginParam) pluginParam;
            this.context = pluginContext;
            long currentTimeMillis = System.currentTimeMillis();
            this.connection = RabbitConnector.connectionFactory(pluginContext, this.pluginParam.getResourceCode()).newConnection();
            long currentTimeMillis2 = System.currentTimeMillis();
            this.mb.sendResourceConnectInfo(pluginContext.flowCode(), pluginContext.taskCode(), pluginContext.jobCode(), this.pluginParam.getResourceCode());
            this.mb.writeLog("连接RabbitMQ耗时: {}ms", new Object[]{Long.valueOf(currentTimeMillis2 - currentTimeMillis)});
            this.channel = this.connection.createChannel();
            if (CollUtil.isNotEmpty(this.pluginParam.getColumns())) {
                this.writeAll = false;
                this.columns = (Set) this.pluginParam.getColumns().stream().map((v0) -> {
                    return v0.getColumnName();
                }).collect(Collectors.toSet());
            }
            if (this.pluginParam.isCreateOnNotExists()) {
                this.channel.queueDeclare(this.pluginParam.getQueue(), true, false, false, (Map) null);
            }
        }

        public void begin() throws Exception {
        }

        public void end() throws Exception {
            this.stopped = true;
            if (null != this.channel) {
                this.channel.close();
                this.channel = null;
            }
            if (null != this.connection) {
                this.connection.close();
                this.mb.sendResourceDisconnectInfo(this.context.flowCode(), this.context.taskCode(), this.context.jobCode(), this.pluginParam.getResourceCode());
                this.connection = null;
            }
        }

        public void write(Record record) {
            if (this.stopped) {
                this.callBack.writeResult(record, DataWriteResult.OMIT);
                return;
            }
            JSONObject jSONObject = new JSONObject();
            for (Item item : record.getColumns()) {
                if (this.writeAll || this.columns.contains(item.getName())) {
                    jSONObject.put(item.getName(), item.getValue());
                }
            }
            try {
                this.channel.basicPublish(DEFAULT_EXCHANGE, this.pluginParam.getQueue(), true, (AMQP.BasicProperties) null, jSONObject.toString().getBytes(StandardCharsets.UTF_8));
                this.callBack.writeResult(record, DataWriteResult.INSERT);
            } catch (IOException e) {
                this.mb.writeExceptionLog(e);
                this.callBack.writeResult(record, DataWriteResult.ERROR);
            }
        }

        public PluginType type() {
            return PluginType.WRITER;
        }

        public Class<? extends PluginParam> pluginParamClass() {
            return RabbitMQWriterPluginParam.class;
        }

        public String configComponent() {
            return "RabbitMQWriterConfigForm";
        }
    }

    public RabbitMQWriterPlugin(PluginWrapper pluginWrapper) {
        super(pluginWrapper);
    }
}
