package com.kdgcsoft.plugin.collect.redis.reader;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONReader;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import com.kdgcsoft.plugin.api.DataNumEstimateType;
import com.kdgcsoft.plugin.api.PluginContext;
import com.kdgcsoft.plugin.api.PluginType;
import com.kdgcsoft.plugin.api.data.IDataReadPlugin;
import com.kdgcsoft.plugin.api.message.MessageBoxWrapper;
import com.kdgcsoft.plugin.api.param.PluginParam;
import com.kdgcsoft.plugin.api.record.Item;
import com.kdgcsoft.plugin.api.record.ItemType;
import com.kdgcsoft.plugin.api.record.Record;
import com.kdgcsoft.plugin.redis.common.RedisConnector;
import com.kdgcsoft.plugin.redis.common.RedisConnectorFactory;
import com.kdgcsoft.plugin.redis.common.RedisKeyType;
import com.kdgcsoft.plugin.redis.common.RedisResourcePluginParam;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.pf4j.Extension;
import org.pf4j.ExtensionPoint;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;

/* loaded from: input_file:com/kdgcsoft/plugin/collect/redis/reader/RedisReaderPlugin.class */
/**
 * pf4j plugin entry point that exposes {@link RedisIDataReadPlugin} as a data-read extension.
 */
public class RedisReaderPlugin extends Plugin {

    /**
     * Reads the values stored under a single Redis key in batches and converts every value into
     * one or more {@link Record}s.
     *
     * <p>Conversion rules for a value (see {@link #parseStrRecord(String)}):
     * <ul>
     *   <li>if a JSON path is configured, it is applied first and its result is re-serialised;</li>
     *   <li>text starting with {@code [} is parsed as a JSON array, one record per element;</li>
     *   <li>text starting with <code>{</code> is parsed as one JSON object, giving one record;</li>
     *   <li>anything else, and any value that fails to parse, becomes a single record with one
     *       {@code "data"} item holding the text.</li>
     * </ul>
     *
     * <p>Records produced from one value are buffered in {@code oneRowRecords} and handed out one
     * at a time by {@link #next()}.
     */
    @Extension
    public static class RedisIDataReadPlugin extends MessageBoxWrapper implements IDataReadPlugin, ExtensionPoint {
        /** Width of the index window requested from the connector per batch. */
        private static final long STEP = 500;
        private RedisConnector connector;
        private String key;
        /** Number of values under the key, as reported by the connector during {@link #init}. */
        private long length;
        /** Start index of the next batch to request. */
        private AtomicLong start;
        /** Number of values consumed so far; reading is finished once it reaches {@code length}. */
        private AtomicLong stop;
        /** Size of the batch currently held in {@code currentBatch}. */
        private AtomicInteger currentLength;
        /** Index of the next unread value inside {@code currentBatch}. */
        private AtomicInteger currentPosition;
        private List<String> currentBatch;
        private RedisKeyType keyType;
        private RedisReaderPluginParam pluginParam;
        /** Records parsed from already-consumed values that have not been returned yet. */
        private List<Record> oneRowRecords;
        private PluginContext context;

        /**
         * Connects to Redis, resolves the type and length of the configured key and resets all
         * read cursors.
         *
         * @param pluginContext execution context, also used to look up the Redis resource parameters
         * @param pluginParam   must be a {@link RedisReaderPluginParam}
         * @throws UnsupportedOperationException if the connector does not support the key's type
         * @throws Exception                     if connecting or querying the key fails
         */
        public void init(PluginContext pluginContext, PluginParam pluginParam) throws Exception {
            this.pluginParam = (RedisReaderPluginParam) pluginParam;
            this.context = pluginContext;
            RedisResourcePluginParam resourcePluginParam = pluginContext.resourcePluginParam(RedisResourcePluginParam.class, this.pluginParam.getResourceCode());
            this.key = this.pluginParam.getKey();
            long connectStartMillis = System.currentTimeMillis();
            this.connector = RedisConnectorFactory.get(resourcePluginParam);
            long connectEndMillis = System.currentTimeMillis();
            this.mb.sendResourceConnectInfo(pluginContext.flowCode(), pluginContext.taskCode(), pluginContext.jobCode(), this.pluginParam.getResourceCode());
            this.mb.writeLog("连接redis耗时: {}ms", new Object[]{Long.valueOf(connectEndMillis - connectStartMillis)});
            this.keyType = RedisKeyType.of(this.connector.type(this.key), this.key);
            if (!this.connector.supportType(this.keyType)) {
                throw new UnsupportedOperationException("不支持的KEY的类型:" + this.keyType.getText());
            }
            this.start = new AtomicLong(0L);
            this.stop = new AtomicLong(0L);
            this.length = this.connector.length(this.keyType, this.key);
            this.currentPosition = new AtomicInteger(0);
            this.currentLength = new AtomicInteger(0);
            // Drop any batch left over from a previous run so begin() fetches a fresh one.
            this.currentBatch = null;
            this.oneRowRecords = new ArrayList<>();
        }

        /**
         * Pre-fetches the first batch when the key is not empty.
         *
         * <p>Does nothing if a batch has already been fetched (for example because
         * {@link #hasNext()} ran first), so that no batch is skipped.
         */
        public void begin() throws Exception {
            if (this.length > 0 && this.currentBatch == null) {
                startNewBatch();
            }
        }

        /**
         * Reports whether {@link #next()} can return another record.
         *
         * <p>Buffered records are always reported, even when the connection is broken, because
         * they are already in memory. When the buffer is empty, unread values are parsed until
         * one of them yields a record; values that yield none (such as an empty JSON array) are
         * skipped. No further values are read once the connector reports a broken connection.
         *
         * @return {@code true} if at least one record is available
         */
        public boolean hasNext() throws Exception {
            while (this.oneRowRecords.isEmpty()
                    && this.connector != null
                    && !this.connector.isConnectionBroken()
                    && hasUnreadValues()) {
                readNextValue();
            }
            return !this.oneRowRecords.isEmpty();
        }

        /** Returns {@code true} while fewer values have been consumed than the key held at init time. */
        private boolean hasUnreadValues() {
            return this.length > 0 && this.stop.get() < this.length;
        }

        /**
         * Consumes one value from the current batch (fetching a new batch first when the current
         * one is used up) and appends the records parsed from it to the buffer.
         */
        private void readNextValue() {
            synchronized (this) {
                if (this.currentPosition.get() == this.currentLength.get()) {
                    startNewBatch();
                }
            }
            int position = this.currentPosition.get();
            if (position >= this.currentBatch.size()) {
                // The connector returned nothing although the recorded length says more values
                // exist. Mark the read as finished instead of failing with an index error.
                this.mb.writeErrorLog("KEY[" + this.key + "]已无更多数据可读(数据可能在读取过程中被删除), 提前结束读取");
                this.stop.set(this.length);
                return;
            }
            parseStrRecord(this.currentBatch.get(position));
            this.currentPosition.incrementAndGet();
            this.stop.incrementAndGet();
        }

        /**
         * Converts one raw value into records and appends them to the buffer.
         *
         * <p>The records are collected in a local list first, so a failure half-way through a
         * JSON array does not leave partial results in the buffer next to the raw fallback record.
         *
         * @param str raw value read from Redis
         */
        private void parseStrRecord(String str) {
            List<Record> parsed = new ArrayList<>();
            try {
                String text = str;
                if (StrUtil.isNotBlank(this.pluginParam.getJsonPath())) {
                    text = JSON.toJSONString(JsonPath.read(str, this.pluginParam.getJsonPath(), new Predicate[0]));
                }
                if (text.startsWith("[")) {
                    JSONArray elements = JSONArray.parseArray(text, new JSONReader.Feature[0]);
                    for (int i = 0; i < elements.size(); i++) {
                        parsed.add(buildRecord(elements.getJSONObject(i)));
                    }
                } else if (text.startsWith("{")) {
                    parsed.add(buildRecord(JSONObject.parseObject(text)));
                } else {
                    Record plain = new Record();
                    plain.add(new Item("data", ItemType.STRING, text));
                    parsed.add(plain);
                }
            } catch (Exception e) {
                // Best effort: log the offending value and pass it on unparsed.
                this.mb.writeErrorLog(str);
                this.mb.writeExceptionLog(e);
                parsed.clear();
                Record raw = new Record();
                raw.add(new Item("data", ItemType.STRING, str));
                parsed.add(raw);
            }
            this.oneRowRecords.addAll(parsed);
        }

        /**
         * Builds a record with one STRING item per entry of the JSON object.
         *
         * <p>Values are rendered with {@code toString()}; a JSON {@code null} is passed on as a
         * {@code null} item value instead of raising a {@link NullPointerException}.
         *
         * @param jSONObject source object; a {@code null} argument raises a
         *                   {@link NullPointerException}, which the caller treats as a parse failure
         * @return the populated record
         */
        private Record buildRecord(JSONObject jSONObject) {
            Record record = new Record();
            for (Map.Entry<String, Object> entry : jSONObject.entrySet()) {
                Object value = entry.getValue();
                String text = value == null ? null : value.toString();
                record.add(new Item(entry.getKey(), ItemType.STRING, text));
            }
            return record;
        }

        /**
         * Returns the next record, reading and parsing further values when the buffer is empty.
         *
         * <p>Unlike {@link #hasNext()}, this method does not check the connection state before
         * consuming values that are still pending.
         *
         * @return the next record
         * @throws java.util.NoSuchElementException if every value has been consumed and no record
         *                                          is buffered
         */
        public Record next() throws Exception {
            while (this.oneRowRecords.isEmpty() && hasUnreadValues()) {
                readNextValue();
            }
            if (this.oneRowRecords.isEmpty()) {
                throw new java.util.NoSuchElementException("no more records for key: " + this.key);
            }
            return this.oneRowRecords.remove(0);
        }

        /** Returns the number of values under the key as determined during {@link #init}. */
        public long estimateDataNum() {
            return this.length;
        }

        public DataNumEstimateType estimateType() {
            return DataNumEstimateType.STATIC_NUM;
        }

        /**
         * Requests the next index window from the connector and resets the in-batch cursor.
         *
         * <p>NOTE(review): the window is requested as {@code [start, min(start + STEP, length)]}
         * and the next window starts at that same end index. This is only gap- and
         * duplicate-free if {@code RedisConnector.range} treats the end index as exclusive;
         * confirm against the connector implementation.
         */
        private void startNewBatch() {
            long from = this.start.get();
            long to = Math.min(from + STEP, this.length);
            List<String> batch = this.connector.range(this.keyType, this.key, from, to);
            // Treat a missing result like an empty batch so callers only need one check.
            this.currentBatch = batch == null ? new ArrayList<>() : batch;
            this.start.set(to);
            this.currentPosition.set(0);
            this.currentLength.set(this.currentBatch.size());
        }

        /**
         * Closes the Redis connection, if one is open, and reports the disconnect.
         *
         * <p>The connector reference is cleared and the disconnect message is sent even when
         * {@code close()} throws, so a failed close is not retried on a half-closed connector.
         */
        public void end() throws Exception {
            if (null == this.connector) {
                return;
            }
            try {
                this.connector.close();
            } finally {
                this.connector = null;
                this.mb.sendResourceDisconnectInfo(this.context.flowCode(), this.context.taskCode(), this.context.jobCode(), this.pluginParam.getResourceCode());
            }
        }

        public PluginType type() {
            return PluginType.READER;
        }

        public Class<? extends PluginParam> pluginParamClass() {
            return RedisReaderPluginParam.class;
        }

        /** Returns the name of the configuration form component for this reader. */
        public String configComponent() {
            return "RedisReaderConfigForm";
        }
    }

    public RedisReaderPlugin(PluginWrapper pluginWrapper) {
        super(pluginWrapper);
    }
}
