package com.kdgcsoft.dtp.plugin.extend.read.rabbitmq;

import com.kdgcsoft.dtp.plugin.extend.read.common.LoadConfig;
import com.kdgcsoft.dtp.plugin.extend.read.common.LoadUtils;
import com.kdgcsoft.dtp.plugin.extend.read.rabbitmq.properties.RabbitmqProperties;
import com.kdgcsoft.dtp.plugin.writer.databaseWriter.config.ConfigureHandler;
import com.kdgcsoft.dtp.plugin.writer.databaseWriter.service.impl.StringArrayDatabaseUpdate;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kdgcsoft/dtp/plugin/extend/read/rabbitmq/RabbitmqReadUtil.class */
public class RabbitmqReadUtil {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    public LoadCallBack load(RabbitmqProperties rabbitmqProperties, DataSource dataSource, String str, LoadConfig loadConfig) throws InterruptedException, IOException {
        AtomicReference atomicReference = new AtomicReference();
        RabbitmqStreamReader rabbitmqStreamReader = new RabbitmqStreamReader(rabbitmqProperties);
        StringArrayDatabaseUpdate stringArrayDatabaseUpdate = new StringArrayDatabaseUpdate(dataSource, rabbitmqStreamReader);
        rabbitmqStreamReader.start();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        LoadCallBack loadCallBack = new LoadCallBack(rabbitmqStreamReader, newSingleThreadExecutor);
        loadCallBack.setStatus(LoadCallBack.NOT_START);
        ConfigureHandler configureHandler = new ConfigureHandler();
        configureHandler.configureDatabaseLoadExceptionHandler((exc, j) -> {
            exc.printStackTrace();
            loadCallBack.setStatus(LoadCallBack.ERROR);
        });
        stringArrayDatabaseUpdate.setConfigureHandler(configureHandler);
        newSingleThreadExecutor.execute(() -> {
            while (true) {
                if (rabbitmqStreamReader.hasMessage()) {
                    loadCallBack.setStatus(LoadCallBack.IS_RUNNING);
                    this.logger.info("接收到消息队列中的数据信息，开始进行数据载入操作...");
                    LoadUtils.load(atomicReference, stringArrayDatabaseUpdate, dataSource, str, loadConfig);
                    try {
                        TimeUnit.SECONDS.sleep(10L);
                    } catch (InterruptedException e) {
                        loadCallBack.setStatus(LoadCallBack.ERROR);
                        throw new RuntimeException(e);
                    }
                } else if (LoadCallBack.IS_RUNNING == loadCallBack.getStatus() && !rabbitmqStreamReader.hasMessage()) {
                    try {
                        TimeUnit.SECONDS.sleep(5L);
                        if (!rabbitmqStreamReader.hasMessage()) {
                            loadCallBack.setStatus(LoadCallBack.SUCCESS);
                            return;
                        }
                    } catch (InterruptedException e2) {
                        loadCallBack.setStatus(LoadCallBack.ERROR);
                        throw new RuntimeException(e2);
                    }
                } else if (LoadCallBack.ERROR == loadCallBack.getStatus()) {
                    return;
                }
            }
        });
        newSingleThreadExecutor.shutdown();
        return loadCallBack;
    }

    public static void main(String[] strArr) throws InterruptedException, IOException {
    }
}
