/*
 * Decompiled with CFR 0.152.
 */
package com.kdgcsoft.plugin.collect.kafka.writer;

import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.kdgcsoft.plugin.api.DataWriteResult;
import com.kdgcsoft.plugin.api.PluginContext;
import com.kdgcsoft.plugin.api.PluginType;
import com.kdgcsoft.plugin.api.data.AbstractDataWritePlugin;
import com.kdgcsoft.plugin.api.param.PluginParam;
import com.kdgcsoft.plugin.api.record.Item;
import com.kdgcsoft.plugin.api.record.Record;
import com.kdgcsoft.plugin.collect.kafka.writer.KafkaWriterPluginParam;
import com.kdgcsoft.plugin.common.model.WriteColumn;
import com.kdgcsoft.plugin.kafka.common.KafkaConnector;
import com.kdgcsoft.plugin.kafka.common.KafkaResourcePluginParam;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.pf4j.Extension;
import org.pf4j.ExtensionPoint;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;

/**
 * PF4J plugin wrapper for the Kafka writer.
 *
 * <p>The class itself only forwards the {@link PluginWrapper} to {@link Plugin}; the write
 * logic lives in the nested {@link KafkaIDataWritePlugin} extension.
 */
public class KafkaWriterPlugin
extends Plugin {
    public KafkaWriterPlugin(PluginWrapper wrapper) {
        super(wrapper);
    }

    /**
     * Data-write extension that serializes each {@link Record} to a JSON object and sends it
     * as the value of a Kafka message on the configured topic.
     *
     * <p>When the plugin parameter lists columns, only items whose name is in that list are
     * written; otherwise every item of the record is written.
     */
    @Extension
    public static class KafkaIDataWritePlugin
    extends AbstractDataWritePlugin
    implements ExtensionPoint {
        /** Log template for a record that could not be written (placeholders: data, reason). */
        private static final String WRITE_ERROR_TEMPLATE = "\u6570\u636e\u9879[{}]\u5199\u5165\u5230Kafka\u9519\u8bef\uff0c\u539f\u56e0\u662f:{}";

        private KafkaWriterPluginParam pluginParam;
        private KafkaProducer<String, String> producer;
        /** Set by {@link #end()}; records arriving afterwards are reported as omitted. */
        private volatile boolean stopped;
        /** Names of the columns to write; only consulted when {@link #writeAll} is false. */
        private Set<String> columns;
        private boolean writeAll = true;
        private PluginContext context;

        /**
         * Stores the parameters, obtains a producer for the referenced Kafka resource and
         * prepares the column filter.
         *
         * @param context plugin context used to look up the resource parameters and the
         *                flow/task/job codes reported through the inherited {@code mb}
         * @param param   plugin parameters; must be a {@link KafkaWriterPluginParam}
         * @throws Exception if resolving the resource or creating the producer fails
         */
        public void init(PluginContext context, PluginParam param) throws Exception {
            this.pluginParam = (KafkaWriterPluginParam)param;
            this.context = context;
            KafkaResourcePluginParam resourcePluginParam = (KafkaResourcePluginParam)context.resourcePluginParam(KafkaResourcePluginParam.class, this.pluginParam.getResourceCode());

            long connectBegin = System.currentTimeMillis();
            this.producer = KafkaConnector.getProducer(resourcePluginParam);
            long connectEnd = System.currentTimeMillis();

            this.mb.sendResourceConnectInfo(context.flowCode(), context.taskCode(), context.jobCode(), this.pluginParam.getResourceCode());
            this.mb.writeLog("\u8fde\u63a5Kafka Producer\u8017\u65f6: {}ms", new Object[]{connectEnd - connectBegin});

            // Set both filter fields explicitly so a repeated init() never keeps a stale filter.
            if (CollUtil.isNotEmpty(this.pluginParam.getColumns())) {
                this.writeAll = false;
                this.columns = this.pluginParam.getColumns().stream().map(WriteColumn::getColumnName).collect(Collectors.toSet());
            } else {
                this.writeAll = true;
                this.columns = null;
            }
        }

        /** No preparation is needed before writing starts. */
        public void begin() throws Exception {
        }

        /**
         * Marks the writer as stopped and closes the producer, if one is open.
         *
         * <p>The producer reference is cleared before closing and the disconnect notification
         * is sent in a {@code finally} block, so a failing {@code close()} neither leaves a
         * half-closed producer in the field nor skips the notification.
         *
         * @throws Exception if closing the producer or sending the notification fails
         */
        public void end() throws Exception {
            this.stopped = true;
            KafkaProducer<String, String> closing = this.producer;
            if (closing == null) {
                return;
            }
            this.producer = null;
            try {
                closing.close();
            } finally {
                this.mb.sendResourceDisconnectInfo(this.context.flowCode(), this.context.taskCode(), this.context.jobCode(), this.pluginParam.getResourceCode());
            }
        }

        /**
         * Sends one record to Kafka as a JSON string and reports the outcome through the
         * inherited {@code callBack}.
         *
         * <p>Outcomes: {@link DataWriteResult#OMIT} once the writer has been stopped,
         * {@link DataWriteResult#INSERT} when the message was handed to the producer, and
         * {@link DataWriteResult#ERROR} when building or handing over the message threw.
         * Because {@code send} is asynchronous, a delivery failure that happens after the
         * hand-over cannot change the already reported result; it is written to the error log
         * by the send callback instead.
         *
         * @param record the record whose items are written
         */
        public void write(Record record) {
            if (this.stopped) {
                this.callBack.writeResult(record, DataWriteResult.OMIT);
                return;
            }
            try {
                JSONObject json = new JSONObject();
                for (Item item : record.getColumns()) {
                    if (this.writeAll || this.columns.contains(item.getName())) {
                        json.put(item.getName(), item.getValue());
                    }
                }
                String payload = json.toString();
                ProducerRecord<String, String> message = new ProducerRecord<>(this.pluginParam.getTopic(), payload);
                // Kafka runs this callback on the producer's I/O thread once the request completes.
                // NOTE(review): assumes mb.writeErrorLog may be called from that thread - confirm.
                this.producer.send(message, (metadata, failure) -> {
                    if (failure != null) {
                        this.mb.writeErrorLog(WRITE_ERROR_TEMPLATE, new Object[]{payload, failure.getMessage()});
                    }
                });
                this.callBack.writeResult(record, DataWriteResult.INSERT);
            }
            catch (Exception e) {
                this.mb.writeErrorLog(WRITE_ERROR_TEMPLATE, new Object[]{JSON.toJSONString(record), e.getMessage()});
                this.callBack.writeResult(record, DataWriteResult.ERROR);
            }
        }

        /** @return {@link PluginType#WRITER} */
        public PluginType type() {
            return PluginType.WRITER;
        }

        /** @return the parameter class this plugin expects in {@link #init} */
        public Class<? extends PluginParam> pluginParamClass() {
            return KafkaWriterPluginParam.class;
        }

        /** @return the name of the configuration component, {@code "KafkaWriterConfigForm"} */
        public String configComponent() {
            return "KafkaWriterConfigForm";
        }
    }
}

