package com.kdgcsoft.plugin.collect.redis.writer;

import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson2.JSONObject;
import com.kdgcsoft.plugin.api.DataWriteResult;
import com.kdgcsoft.plugin.api.PluginContext;
import com.kdgcsoft.plugin.api.PluginType;
import com.kdgcsoft.plugin.api.data.AbstractDataWritePlugin;
import com.kdgcsoft.plugin.api.param.PluginParam;
import com.kdgcsoft.plugin.api.record.Item;
import com.kdgcsoft.plugin.api.record.Record;
import com.kdgcsoft.plugin.redis.common.RedisConnector;
import com.kdgcsoft.plugin.redis.common.RedisConnectorFactory;
import com.kdgcsoft.plugin.redis.common.RedisKeyType;
import com.kdgcsoft.plugin.redis.common.RedisResourcePluginParam;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.pf4j.Extension;
import org.pf4j.ExtensionPoint;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;

/* loaded from: input_file:com/kdgcsoft/plugin/collect/redis/writer/RedisWriterPlugin.class */
/**
 * PF4J plugin entry point that contributes the Redis data-writer extension
 * {@link RedisIDataWritePlugin}.
 */
public class RedisWriterPlugin extends Plugin {

    /**
     * Data-write extension that serializes each incoming {@link Record} to a JSON string,
     * buffers it, and pushes the buffer to the configured key through a {@link RedisConnector}
     * once {@link #BATCH_SIZE} records have accumulated (and once more from {@link #end()}).
     *
     * <p>The buffer is guarded by {@link #lock}; {@link #flush()} additionally holds the
     * instance monitor. Lock order is always monitor first, then {@code lock}.
     */
    @Extension
    public static class RedisIDataWritePlugin extends AbstractDataWritePlugin implements ExtensionPoint {
        /** Number of buffered records that triggers an automatic flush. */
        private static final int BATCH_SIZE = 500;
        /** Connector obtained in {@link #init}; {@code null} before init and after {@link #end()}. */
        private RedisConnector connector;
        /** Target key taken from the plugin parameters. */
        private String key;
        /** Pending JSON payloads; {@code null} before init and after {@link #end()}. */
        private List<String> batch;
        /** Trimmed names of the columns to keep; only populated when {@link #chooseFields} is true. */
        private Set<String> fields;
        /** Whether column filtering by {@link #fields} is active. */
        private boolean chooseFields = false;
        /** Type of the target key as resolved in {@link #init}. */
        private RedisKeyType keyType;
        /** Guards every access to {@link #batch}. Created eagerly so flush()/end() are safe even if init() failed. */
        private final Lock lock = new ReentrantLock();
        /** Context received in {@link #init}; used for connect/disconnect notifications. */
        private PluginContext context;
        /** Typed view of the parameters received in {@link #init}. */
        RedisWriterPluginParam pluginParam;

        /**
         * Resolves the resource parameters, obtains the connector, validates the type of the
         * target key and prepares the buffer and the optional field filter.
         *
         * @param pluginContext context used to look up the resource parameters and task codes
         * @param pluginParam   writer parameters; must be a {@link RedisWriterPluginParam}
         * @throws UnsupportedOperationException if the connector does not support the key's type
         * @throws Exception                     propagated from the connector or the context
         */
        public void init(PluginContext pluginContext, PluginParam pluginParam) throws Exception {
            this.pluginParam = (RedisWriterPluginParam) pluginParam;
            this.context = pluginContext;
            RedisResourcePluginParam resourcePluginParam =
                    pluginContext.resourcePluginParam(RedisResourcePluginParam.class, this.pluginParam.getResourceCode());

            long connectStart = System.currentTimeMillis();
            this.connector = RedisConnectorFactory.get(resourcePluginParam);
            long connectEnd = System.currentTimeMillis();
            this.mb.sendResourceConnectInfo(pluginContext.flowCode(), pluginContext.taskCode(), pluginContext.jobCode(), this.pluginParam.getResourceCode());
            this.mb.writeLog("连接redis耗时: {}ms", new Object[]{Long.valueOf(connectEnd - connectStart)});

            this.key = this.pluginParam.getKey();
            this.keyType = RedisKeyType.of(this.connector.type(this.key), this.key);
            if (!this.connector.supportType(this.keyType)) {
                throw new UnsupportedOperationException("不支持的KEY的类型:" + this.keyType.getText());
            }

            this.batch = new ArrayList<>();
            if (CollUtil.isNotEmpty(this.pluginParam.getFields())) {
                Set<String> selected = new HashSet<>();
                for (String field : this.pluginParam.getFields()) {
                    selected.add(field.trim());
                }
                this.fields = selected;
                this.chooseFields = true;
            }
        }

        /**
         * Serializes the record's columns (restricted to the selected fields, if any) to JSON,
         * appends it to the buffer, flushes when the buffer is full, and reports the record as
         * {@link DataWriteResult#INSERT} to the callback.
         *
         * @param record record to buffer
         */
        public void write(Record record) {
            JSONObject json = new JSONObject();
            for (Item item : record.getColumns()) {
                if (!this.chooseFields || this.fields.contains(item.getName())) {
                    json.put(item.getName(), item.getValue());
                }
            }
            String payload = json.toString();

            boolean batchFull;
            this.lock.lock();
            try {
                this.batch.add(payload);
                // Decide under the lock, and with >=, so a concurrent add cannot make the
                // buffer step over the threshold unnoticed.
                batchFull = this.batch.size() >= BATCH_SIZE;
            } finally {
                // Single release point: releasing twice would throw IllegalMonitorStateException
                // and hide whatever exception was actually in flight.
                this.lock.unlock();
            }

            // Flush after releasing the lock so the order "monitor, then lock" used by flush()
            // is never inverted.
            if (batchFull) {
                flush();
            }
            this.callBack.writeResult(record, DataWriteResult.INSERT);
        }

        /**
         * Pushes all buffered payloads to the target key and empties the buffer. Does nothing
         * when there is no buffer or it is empty. If the connector reports a broken connection
         * the buffered payloads are discarded (and the discard is logged) rather than pushed.
         */
        public synchronized void flush() {
            this.lock.lock();
            try {
                List<String> pending = this.batch;
                if (null == pending || pending.isEmpty()) {
                    return;
                }
                if (!this.connector.isConnectionBroken()) {
                    this.connector.push(this.keyType, this.key, pending);
                } else {
                    // Existing behavior: records are dropped on a broken connection. Make the
                    // loss visible instead of clearing the buffer silently.
                    this.mb.writeLog("redis连接已断开, 丢弃{}条待写入数据", new Object[]{Integer.valueOf(pending.size())});
                }
                pending.clear();
            } finally {
                this.lock.unlock();
            }
        }

        /** No preparation is required before writing starts. */
        public void begin() throws Exception {
        }

        /**
         * Flushes the remaining payloads, then releases the buffer and closes the connector.
         * The cleanup runs even when the final flush fails, so the connection is not leaked.
         *
         * @throws Exception propagated from the final flush or from closing the connector
         */
        public void end() throws Exception {
            try {
                flush();
            } finally {
                this.batch = null;
                if (null != this.connector) {
                    this.connector.close();
                    this.connector = null;
                    this.mb.sendResourceDisconnectInfo(this.context.flowCode(), this.context.taskCode(), this.context.jobCode(), this.pluginParam.getResourceCode());
                }
            }
        }

        /** @return {@link PluginType#WRITER} */
        public PluginType type() {
            return PluginType.WRITER;
        }

        /** @return the parameter class this writer expects, {@link RedisWriterPluginParam} */
        public Class<? extends PluginParam> pluginParamClass() {
            return RedisWriterPluginParam.class;
        }

        /** @return the identifier of the configuration component, {@code "RedisWriterConfigForm"} */
        public String configComponent() {
            return "RedisWriterConfigForm";
        }
    }

    /**
     * Creates the plugin.
     *
     * @param pluginWrapper wrapper passed through to the {@link Plugin} superclass
     */
    public RedisWriterPlugin(PluginWrapper pluginWrapper) {
        super(pluginWrapper);
    }
}
