package com.kdgcsoft.plugin.collect.tdengine.sync;

import cn.hutool.core.collection.CollUtil;
import com.kdgcsoft.plugin.api.IOperatePlugin;
import com.kdgcsoft.plugin.api.PluginContext;
import com.kdgcsoft.plugin.api.PluginType;
import com.kdgcsoft.plugin.api.message.MessageBoxWrapper;
import com.kdgcsoft.plugin.api.param.PluginParam;
import com.kdgcsoft.plugin.common.resource.db.AbstractDBResourcePlugin;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.pf4j.Extension;
import org.pf4j.ExtensionPoint;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;

/* loaded from: input_file:com/kdgcsoft/plugin/collect/tdengine/sync/TDEngineSyncPlugin.class */
/**
 * PF4J plugin wrapper whose nested extension copies a configured set of tables from a source
 * database resource to a destination database resource.
 */
public class TDEngineSyncPlugin extends Plugin {

    /**
     * Operate-plugin extension that synchronizes every entry of
     * {@link TDEngineSyncPluginParam#getImportTables()} (source table name mapped to destination
     * table name) on a bounded thread pool and reports the outcome through the message box.
     */
    @Extension
    public static class TDEngineSyncIOperatePlugin extends MessageBoxWrapper implements IOperatePlugin, ExtensionPoint {

        /** Upper bound on the number of tables synchronized concurrently. */
        private static final int MAX_SYNC_THREADS = 10;

        /** Name given to every worker thread of the synchronization pool. */
        private static final String SYNC_THREAD_NAME = "TDEngineSyncThread";

        /**
         * Synchronizes all configured tables and blocks until every table has been processed.
         *
         * <p>Each table is handled by its own task. A failing table is logged and counted as
         * failed; it does not abort the remaining tables. After all tasks have finished (or the
         * waiting thread has been interrupted) a summary log event and a progress event with the
         * elapsed time and row counters are written.
         *
         * @param pluginContext context used to resolve the resource plugins and the
         *                      flow/task/job codes attached to connect/disconnect notifications
         * @param pluginParam   must be a {@link TDEngineSyncPluginParam}
         * @return always {@code null}
         * @throws ClassCastException if {@code pluginParam} is not a {@link TDEngineSyncPluginParam}
         */
        public Object operate(PluginContext pluginContext, PluginParam pluginParam) {
            TDEngineSyncPluginParam syncParam = (TDEngineSyncPluginParam) pluginParam;
            AbstractDBResourcePlugin srcPlugin = pluginContext.resourcePlugin(syncParam.getSrcResourceCode());
            AbstractDBResourcePlugin destPlugin = pluginContext.resourcePlugin(syncParam.getDestResourceCode());
            Map<String, String> importTables = syncParam.getImportTables();
            String querySql = new QueryCondition(syncParam.getTimestampColumn(), syncParam.getOperator(), syncParam.getLeftOperand(), syncParam.getRightOperand()).generateQuerySQL();

            AtomicInteger succeededTables = new AtomicInteger(0);
            AtomicInteger failedTables = new AtomicInteger(0);
            AtomicLong syncedRows = new AtomicLong(0L);
            AtomicLong totalRows = new AtomicLong(0L);
            long startMillis = System.currentTimeMillis();

            if (CollUtil.isNotEmpty(importTables)) {
                CountDownLatch remainingTables = new CountDownLatch(importTables.size());
                ExecutorService pool = Executors.newFixedThreadPool(
                        Math.min(importTables.size(), MAX_SYNC_THREADS),
                        runnable -> new Thread(runnable, SYNC_THREAD_NAME));
                try {
                    for (Map.Entry<String, String> entry : importTables.entrySet()) {
                        String srcTable = entry.getKey();
                        String destTable = entry.getValue();
                        CompletableFuture.runAsync(() -> {
                            try {
                                // NOTE(review): if the second openConnection() or the constructor throws,
                                // the connection that was already opened is not closed here — confirm
                                // whether the resource plugin cleans it up.
                                TDEngineTableSynchronizer synchronizer = new TDEngineTableSynchronizer(srcPlugin.openConnection(), destPlugin.openConnection(), srcPlugin.getDb(), srcTable, destPlugin.getDb(), destTable, querySql);
                                long synced;
                                try {
                                    this.mb.sendResourceConnectInfo(pluginContext.flowCode(), pluginContext.taskCode(), pluginContext.jobCode(), syncParam.getSrcResourceCode());
                                    this.mb.sendResourceConnectInfo(pluginContext.flowCode(), pluginContext.taskCode(), pluginContext.jobCode(), syncParam.getDestResourceCode());
                                    synced = synchronizer.sync();
                                    syncedRows.addAndGet(synced);
                                    totalRows.addAndGet(synchronizer.getTotalSize());
                                } catch (Throwable failure) {
                                    // Release the synchronizer on the failure path as well; the
                                    // original failure stays the primary error.
                                    closeAfterFailure(synchronizer, failure);
                                    throw failure;
                                }
                                // A close() failure on the success path still marks the table as failed.
                                synchronizer.close();
                                succeededTables.incrementAndGet();
                                this.mb.writeLog(srcTable + " --> " + destTable + " 同步完成，共同步" + synced + "条数据。");
                            } catch (Exception e) {
                                failedTables.incrementAndGet();
                                this.mb.writeErrorLog(srcTable + " --> " + destTable + " 同步失败，原因是：" + e.getMessage());
                                this.mb.writeExceptionLog(e);
                            } finally {
                                // Exactly one count-down per table, performed first so that a failing
                                // notification can never leave the waiting thread blocked.
                                remainingTables.countDown();
                                sendDisconnectInfo(pluginContext, syncParam);
                            }
                        }, pool);
                    }
                    remainingTables.await();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                    this.mb.writeExceptionLog(e);
                } finally {
                    // Covers normal completion, interruption and a failed task submission alike.
                    pool.shutdown();
                }
            }

            long endMillis = System.currentTimeMillis();
            this.mb.writeLogEvent("同步完成，同步成功" + succeededTables.get() + "张表，同步失败" + failedTables.get() + "张表。");
            this.mb.writeProgressEvent(endMillis - startMillis, syncedRows.get(), totalRows.get());
            return null;
        }

        /** Sends the disconnect notification for the source and then the destination resource. */
        private void sendDisconnectInfo(PluginContext pluginContext, TDEngineSyncPluginParam syncParam) {
            this.mb.sendResourceDisconnectInfo(pluginContext.flowCode(), pluginContext.taskCode(), pluginContext.jobCode(), syncParam.getSrcResourceCode());
            this.mb.sendResourceDisconnectInfo(pluginContext.flowCode(), pluginContext.taskCode(), pluginContext.jobCode(), syncParam.getDestResourceCode());
        }

        /** Closes the synchronizer after a failure, attaching any close error to that failure as suppressed. */
        private static void closeAfterFailure(TDEngineTableSynchronizer synchronizer, Throwable failure) {
            try {
                synchronizer.close();
            } catch (Exception closeFailure) {
                failure.addSuppressed(closeFailure);
            }
        }

        /** @return {@link PluginType#SYNC} */
        public PluginType type() {
            return PluginType.SYNC;
        }

        /** @return the parameter class accepted by {@link #operate}, {@link TDEngineSyncPluginParam} */
        public Class<? extends PluginParam> pluginParamClass() {
            return TDEngineSyncPluginParam.class;
        }

        /** @return the configuration component identifier, {@code "TDEngineSyncConfigForm"} */
        public String configComponent() {
            return "TDEngineSyncConfigForm";
        }
    }

    /**
     * Creates the plugin.
     *
     * @param pluginWrapper wrapper passed through to the {@link Plugin} superclass constructor
     */
    public TDEngineSyncPlugin(PluginWrapper pluginWrapper) {
        super(pluginWrapper);
    }
}
