package org.apache.seatunnel.e2e.common.container.seatunnel;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.service.AutoService;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;

/**
 * {@link TestContainer} implementation that starts a SeaTunnel server process inside a Docker
 * container and runs jobs against it through the helpers inherited from
 * {@link AbstractTestContainer}.
 *
 * <p>After a job finishes, {@link #executeJob(String)} compares the JVM threads present in the
 * container before and after the run and fails if unexpected threads are still alive.
 */
@AutoService(TestContainer.class)
public class SeaTunnelContainer extends AbstractTestContainer {
    private static final Logger log = LoggerFactory.getLogger(SeaTunnelContainer.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String JDK_DOCKER_IMAGE = "openjdk:8";
    private static final String CLIENT_SHELL = "seatunnel.sh";
    private static final String SERVER_SHELL = "seatunnel-cluster.sh";

    /** REST endpoint queried for the running threads; reachable through the 5801:5801 port binding. */
    private static final String RUNNING_THREADS_URL =
            "http://localhost:5801/hazelcast/rest/maps/running-threads";

    /** Class whose live-instance count is inspected by {@link #classLoaderObjectCheck(Integer)}. */
    private static final String CHILD_FIRST_CLASS_LOADER =
            "org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader";

    /**
     * Name pattern of default executor threads that are ignored by the leak check.
     * NOTE(review): only single-digit pool/thread numbers match (e.g. "pool-10-thread-1" does
     * not) - confirm whether that is intended before widening it.
     */
    private static final Pattern POOL_THREAD_PATTERN = Pattern.compile("pool-[0-9]-thread-[0-9]");

    protected GenericContainer<?> server;

    /** Number of jobs currently being executed or restored through this container. */
    private final AtomicInteger runningCount = new AtomicInteger();

    /**
     * Creates, configures and starts the server container, then runs the extra start-up commands.
     *
     * @throws Exception if the container cannot be prepared or started
     */
    @Override
    public void startUp() throws Exception {
        this.server =
                new GenericContainer<>(getDockerImage())
                        .withNetwork(NETWORK)
                        .withEnv("TZ", "UTC")
                        .withCommand(
                                ContainerUtil.adaptPathForWin(
                                        Paths.get(AbstractTestContainer.SEATUNNEL_HOME, "bin", SERVER_SHELL)
                                                .toString()))
                        .withNetworkAliases("server")
                        .withExposedPorts()
                        .withLogConsumer(
                                new Slf4jLogConsumer(
                                        DockerLoggerFactory.getLogger(
                                                "seatunnel-engine:" + JDK_DOCKER_IMAGE)))
                        .waitingFor(Wait.forListeningPort());
        copySeaTunnelStarterToContainer(this.server);
        // Fixed host binding: getThreadClassLoader() talks to the server via localhost:5801.
        this.server.setPortBindings(Collections.singletonList("5801:5801"));
        this.server.withCopyFileToContainer(
                MountableFile.forHostPath(
                        ContainerUtil.PROJECT_ROOT_PATH
                                + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
                Paths.get(AbstractTestContainer.SEATUNNEL_HOME, "config").toString());
        this.server.withCopyFileToContainer(
                MountableFile.forHostPath(
                        ContainerUtil.PROJECT_ROOT_PATH
                                + "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
                Paths.get(AbstractTestContainer.SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar")
                        .toString());
        this.server.start();
        executeExtraCommands(this.server);
    }

    /**
     * Closes the server container if it was created.
     *
     * @throws Exception if closing the container fails
     */
    @Override
    public void tearDown() throws Exception {
        if (this.server != null) {
            this.server.close();
        }
    }

    /** @return the Docker image the server container is created from */
    @Override
    protected String getDockerImage() {
        return JDK_DOCKER_IMAGE;
    }

    /** @return the name of the starter module */
    @Override
    protected String getStartModuleName() {
        return "seatunnel-starter";
    }

    /** @return the name of the client start shell script */
    @Override
    protected String getStartShellName() {
        return CLIENT_SHELL;
    }

    /** @return the module path of the connectors */
    @Override
    protected String getConnectorModulePath() {
        return "seatunnel-connectors-v2";
    }

    /** @return the connector type identifier */
    @Override
    protected String getConnectorType() {
        return "seatunnel";
    }

    /** @return the prefix of connector names */
    @Override
    protected String getConnectorNamePrefix() {
        return "connector-";
    }

    /** @return the extra start shell commands; always an empty list here */
    @Override
    protected List<String> getExtraStartShellCommands() {
        return Collections.emptyList();
    }

    /** @return {@link TestContainerId#SEATUNNEL} */
    @Override
    public TestContainerId identifier() {
        return TestContainerId.SEATUNNEL;
    }

    /** @return the command-line flag used for savepoints */
    @Override
    protected String getSavePointCommand() {
        return "-s";
    }

    /** @return the command-line flag used for restoring a job */
    @Override
    protected String getRestoreCommand() {
        return "-r";
    }

    /**
     * Applies the given factory to the server container.
     *
     * @param containerExtendedFactory callback that receives the server container
     * @throws IOException if the callback fails with an I/O error
     * @throws InterruptedException if the callback is interrupted
     */
    @Override
    public void executeExtraCommands(ContainerExtendedFactory containerExtendedFactory)
            throws IOException, InterruptedException {
        containerExtendedFactory.extend(this.server);
    }

    /**
     * Runs a job in the server container and, once no other job started through this instance
     * is still running, verifies that the job left no unexpected threads behind.
     *
     * @param str job argument forwarded unchanged to the inherited
     *     {@code executeJob(server, str)} helper
     * @return the execution result of the job command
     * @throws IOException if executing the job or inspecting the container fails
     * @throws InterruptedException if the execution is interrupted
     */
    @Override
    public Container.ExecResult executeJob(String str) throws IOException, InterruptedException {
        log.info("test in container: {}", identifier());
        List<String> threadsBefore = ContainerUtil.getJVMThreadNames(this.server);
        this.runningCount.incrementAndGet();
        Container.ExecResult result;
        int stillRunning;
        try {
            result = executeJob(this.server, str);
        } finally {
            // Always release the slot; otherwise a failed job would leave the counter above
            // zero and disable the thread check for every later job.
            stillRunning = this.runningCount.decrementAndGet();
        }
        // Only check threads once every job has finished, and only wait when the first
        // snapshot already shows leftovers.
        if (stillRunning <= 0
                && !removeSystemThread(threadsBefore, ContainerUtil.getJVMThreadNames(this.server))
                        .isEmpty()) {
            Awaitility.await()
                    .atMost(10L, TimeUnit.SECONDS)
                    .untilAsserted(
                            () -> {
                                List<String> leaked =
                                        removeSystemThread(
                                                threadsBefore,
                                                ContainerUtil.getJVMThreadNames(this.server));
                                String details =
                                        ContainerUtil.getJVMThreads(this.server).stream()
                                                .filter(thread -> leaked.contains(thread.getV1()))
                                                .map(thread -> thread.getV2())
                                                .map(info -> info + "\n")
                                                .collect(Collectors.joining());
                                Assertions.assertTrue(
                                        leaked.isEmpty(),
                                        "There are still threads running in the container: \n"
                                                + details);
                            });
        }
        return result;
    }

    /**
     * Computes the threads considered leaked after a job run.
     *
     * <p>The result consists of (a) threads reported by the server whose class loader is neither
     * an {@code AppClassLoader} nor {@code "null"}, and (b) threads in {@code afterThreads} that
     * are neither well-known system threads nor present in {@code beforeThreads}; known issues
     * are removed from the combined list. {@code afterThreads} is modified in place.
     *
     * @param beforeThreads thread names captured before the job ran
     * @param afterThreads thread names captured after the job ran; filtered in place
     * @return the names of the threads regarded as leaked
     * @throws IOException if the running-threads endpoint cannot be queried
     */
    private List<String> removeSystemThread(List<String> beforeThreads, List<String> afterThreads)
            throws IOException {
        afterThreads.removeIf(SeaTunnelContainer::isSystemThread);
        afterThreads.removeIf(beforeThreads::contains);
        List<String> leaked =
                getThreadClassLoader().entrySet().stream()
                        .filter(entry -> !entry.getKey().contains("process reaper"))
                        .filter(
                                entry -> {
                                    String classLoader = entry.getValue();
                                    return !classLoader.contains("AppClassLoader")
                                            && !classLoader.equals("null");
                                })
                        .map(Map.Entry::getKey)
                        .collect(Collectors.toList());
        leaked.addAll(afterThreads);
        leaked.removeIf(this::isIssueWeAlreadyKnow);
        return leaked;
    }

    /**
     * Tells whether a thread name belongs to the set of threads that are always ignored by the
     * leak check.
     *
     * @param name the thread name
     * @return {@code true} if the thread is to be ignored
     */
    private static boolean isSystemThread(String name) {
        return name.startsWith("hz.main")
                || name.startsWith("seatunnel-coordinator-service")
                || name.startsWith("GC task thread")
                || name.contains("CompilerThread")
                || name.contains("NioNetworking-closeListenerExecutor")
                || name.contains("ForkJoinPool.commonPool")
                || name.contains("DestroyJavaVM")
                || name.contains("main-query-state-checker")
                || name.contains("Keep-Alive-SocketCleaner")
                || name.contains("process reaper")
                || name.startsWith("Timer-")
                || name.contains("InterruptTimer")
                || name.contains("Java2D Disposer")
                || name.contains(
                        "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")
                || name.startsWith("Log4j2-TF-")
                || POOL_THREAD_PATTERN.matcher(name).matches();
    }

    /**
     * Waits (up to 20 seconds) until the number of live {@code SeaTunnelChildFirstClassLoader}
     * objects in the server JVM is at most {@code num}; returns immediately if that already
     * holds.
     *
     * @param num the maximum number of live class loader objects tolerated
     * @throws IOException if the live objects cannot be read from the container
     * @throws InterruptedException if reading the live objects is interrupted
     */
    private void classLoaderObjectCheck(Integer num) throws IOException, InterruptedException {
        Map<String, Integer> liveObjects = ContainerUtil.getJVMLiveObject(this.server);
        if (!liveObjects.containsKey(CHILD_FIRST_CLASS_LOADER)
                || liveObjects.get(CHILD_FIRST_CLASS_LOADER) <= num) {
            return;
        }
        Awaitility.await()
                .atMost(20L, TimeUnit.SECONDS)
                .untilAsserted(
                        () -> {
                            Map<String, Integer> current =
                                    ContainerUtil.getJVMLiveObject(this.server);
                            if (current.containsKey(CHILD_FIRST_CLASS_LOADER)) {
                                Assertions.assertTrue(
                                        current.get(CHILD_FIRST_CLASS_LOADER) <= num,
                                        "There are still SeaTunnelChildFirstClassLoader objects in the seatunnel server");
                            }
                        });
    }

    /**
     * Fetches the running threads from the server's REST endpoint and maps each thread name to
     * the class loader reported for it. When several entries share a thread name, their class
     * loaders are joined with {@code " && "}.
     *
     * @return thread name to class loader description
     * @throws IOException if the request fails or the response cannot be parsed
     */
    private Map<String, String> getThreadClassLoader() throws IOException {
        HttpGet request = new HttpGet(RUNNING_THREADS_URL);
        try (CloseableHttpClient client = HttpClients.createDefault()) {
            String body = EntityUtils.toString(client.execute(request).getEntity());
            List<Map<String, String>> threads =
                    OBJECT_MAPPER.readValue(
                            body, new TypeReference<List<Map<String, String>>>() {});
            return threads.stream()
                    .collect(
                            Collectors.toMap(
                                    thread -> thread.get("threadName"),
                                    thread -> thread.get("classLoader"),
                                    (first, second) -> first + " && " + second));
        }
    }

    /**
     * Tells whether a leftover thread is on the list of names this check deliberately ignores.
     *
     * @param str the thread name
     * @return {@code true} if the thread name matches one of the ignored prefixes or fragments
     */
    private boolean isIssueWeAlreadyKnow(String str) {
        return str.startsWith("ClickHouseClientWorker")
                || str.startsWith("Okio Watchdog")
                || str.startsWith("OkHttp TaskRunner")
                || str.startsWith("SessionExecutor")
                || str.contains(
                        "oracle.jdbc.driver.BlockSource.ThreadedCachingBlockSource.BlockReleaser")
                || str.startsWith("AsyncAppender-Dispatcher-Thread")
                || str.startsWith("BufferPoolPruner")
                || str.startsWith("MaintenanceTimer")
                || str.startsWith("cluster-");
    }

    /**
     * Triggers a savepoint through the inherited {@code savepointJob(server, str)} helper.
     *
     * @param str argument forwarded unchanged to the inherited helper
     * @return the execution result of the savepoint command
     * @throws IOException if executing the command fails
     * @throws InterruptedException if the execution is interrupted
     */
    @Override
    public Container.ExecResult savepointJob(String str) throws IOException, InterruptedException {
        return savepointJob(this.server, str);
    }

    /**
     * Restores a job through the inherited {@code restoreJob(server, str, str2)} helper while
     * counting it as a running job.
     *
     * @param str first argument forwarded unchanged to the inherited helper
     * @param str2 second argument forwarded unchanged to the inherited helper
     * @return the execution result of the restore command
     * @throws IOException if executing the command fails
     * @throws InterruptedException if the execution is interrupted
     */
    @Override
    public Container.ExecResult restoreJob(String str, String str2)
            throws IOException, InterruptedException {
        this.runningCount.incrementAndGet();
        try {
            return restoreJob(this.server, str, str2);
        } finally {
            // Release the slot even when the restore fails so later thread checks still run.
            this.runningCount.decrementAndGet();
        }
    }

    /** @return the logs collected from the server container */
    @Override
    public String getServerLogs() {
        return this.server.getLogs();
    }
}
