package org.apache.seatunnel.engine.server.rest;

import com.hazelcast.cluster.Cluster;
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpGetCommand;
import com.hazelcast.internal.ascii.rest.HttpStatusCode;
import com.hazelcast.internal.json.Json;
import com.hazelcast.internal.json.JsonArray;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.internal.json.JsonValue;
import com.hazelcast.internal.util.JsonUtil;
import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

/* loaded from: input_file:org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.class */
/**
 * REST GET command processor that serves the SeaTunnel job and cluster endpoints and delegates
 * every other URI to the wrapped {@link Log4j2HttpGetCommandProcessor}.
 *
 * <p>Handled URI prefixes (see {@link RestConstant}): running jobs, finished jobs, a single
 * running job by id, system monitoring information and running threads.
 */
public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCommand> {
    private static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
    private static final String SINK_WRITE_COUNT = "SinkWriteCount";

    /** Prefix after which the first '/' separates the endpoint name from its argument. */
    private static final String MAPS_URI_PREFIX = "/hazelcast/rest/maps/";

    private static final String RUNNING_JOB_INFO_MAP = "engine_runningJobInfo";
    private static final String FINISHED_JOB_STATE_MAP = "engine_finishedJobState";
    private static final String FINISHED_JOB_VERTEX_INFO_MAP = "engine_finishedJobVertexInfo";

    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Shared reader for metrics JSON; only {@code readTree} is used and it is never reconfigured. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final Log4j2HttpGetCommandProcessor original;

    /**
     * Creates a processor that falls back to a new {@link Log4j2HttpGetCommandProcessor}.
     *
     * @param textCommandService the text command service used to reach the node and send responses
     */
    public RestHttpGetCommandProcessor(TextCommandService textCommandService) {
        this(textCommandService, new Log4j2HttpGetCommandProcessor(textCommandService));
    }

    /**
     * Creates a processor with an explicit fallback processor.
     *
     * @param textCommandService the text command service used to reach the node and send responses
     * @param log4j2HttpGetCommandProcessor processor that handles every URI not served here
     */
    public RestHttpGetCommandProcessor(TextCommandService textCommandService, Log4j2HttpGetCommandProcessor log4j2HttpGetCommandProcessor) {
        super(textCommandService, textCommandService.getNode().getLogger(Log4j2HttpGetCommandProcessor.class));
        this.original = log4j2HttpGetCommandProcessor;
    }

    /**
     * Dispatches the request by URI prefix, then sends the prepared response.
     *
     * <p>An {@link IndexOutOfBoundsException} raised while handling is answered with a 400; any
     * other throwable is logged and answered with a 500 carrying the exception response.
     *
     * @param httpGetCommand the GET command to handle
     */
    public void handle(HttpGetCommand httpGetCommand) {
        String uri = httpGetCommand.getURI();
        try {
            // NOTE(review): the plural "running jobs" prefix is tested before the singular one;
            // presumably the two constants share a prefix, so keep this order.
            if (uri.startsWith(RestConstant.RUNNING_JOBS_URL)) {
                handleRunningJobsInfo(httpGetCommand);
            } else if (uri.startsWith(RestConstant.FINISHED_JOBS_INFO)) {
                handleFinishedJobsInfo(httpGetCommand, uri);
            } else if (uri.startsWith(RestConstant.RUNNING_JOB_URL)) {
                handleJobInfoById(httpGetCommand, uri);
            } else if (uri.startsWith(RestConstant.SYSTEM_MONITORING_INFORMATION)) {
                getSystemMonitoringInformation(httpGetCommand);
            } else if (uri.startsWith(RestConstant.RUNNING_THREADS)) {
                getRunningThread(httpGetCommand);
            } else {
                this.original.handle(httpGetCommand);
            }
        } catch (IndexOutOfBoundsException e) {
            httpGetCommand.send400();
        } catch (Throwable th) {
            this.logger.warning("An error occurred while handling request " + httpGetCommand, th);
            prepareResponse(HttpStatusCode.SC_500, httpGetCommand, exceptionResponse(th));
        }
        this.textCommandService.sendResponse(httpGetCommand);
    }

    /**
     * Rejected commands are processed exactly like accepted ones.
     *
     * @param httpGetCommand the rejected GET command
     */
    public void handleRejection(HttpGetCommand httpGetCommand) {
        handle(httpGetCommand);
    }

    /**
     * Responds with one JSON object of health metrics per cluster member.
     *
     * <p>A member whose metrics cannot be fetched contributes an empty object instead of failing
     * the whole request.
     */
    private void getSystemMonitoringInformation(HttpGetCommand httpGetCommand) {
        Cluster cluster = this.textCommandService.getNode().hazelcastInstance.getCluster();
        // Kept local: the engine is request-scoped and must not be shared through a mutable field.
        NodeEngine engine = this.textCommandService.getNode().hazelcastInstance.node.nodeEngine;

        JsonArray membersMetrics = cluster.getMembers().stream().map(member -> {
            String metrics = null;
            try {
                metrics = (String) NodeEngineUtil.sendOperationToMemberNode(engine, new GetClusterHealthMetricsOperation(), member.getAddress()).get();
            } catch (InterruptedException e) {
                // Restore the flag so callers up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                this.logger.severe("get system monitoring information fail", e);
            } catch (ExecutionException e) {
                this.logger.severe("get system monitoring information fail", e);
            }
            return parseHealthMetrics(metrics);
        }).collect(JsonArray::new, JsonArray::add, JsonArray::add);

        prepareResponse(httpGetCommand, membersMetrics);
    }

    /**
     * Parses a {@code "key=value, key=value"} string into a JSON object.
     *
     * @param metrics the raw metrics string, or {@code null} when it could not be fetched
     * @return the parsed pairs; empty when {@code metrics} is {@code null}
     */
    private static JsonObject parseHealthMetrics(String metrics) {
        JsonObject parsed = new JsonObject();
        if (metrics == null) {
            return parsed;
        }
        for (String pair : metrics.split(", ")) {
            // Limit 2 keeps '=' inside values and yields an empty value for "key=".
            String[] keyValue = pair.split("=", 2);
            if (keyValue.length == 2) {
                parsed.add(keyValue[0], keyValue[1]);
            }
        }
        return parsed;
    }

    /** Responds with a JSON array describing every entry of the running-job map. */
    private void handleRunningJobsInfo(HttpGetCommand httpGetCommand) {
        IMap<Long, JobInfo> runningJobs = getDistributedMap(RUNNING_JOB_INFO_MAP);
        JsonArray jobs = new JsonArray();
        for (Map.Entry<Long, JobInfo> entry : runningJobs.entrySet()) {
            jobs.add(convertToJson(entry.getValue(), entry.getKey()));
        }
        prepareResponse(httpGetCommand, jobs);
    }

    /**
     * Responds with finished jobs sorted by finish time, optionally filtered by job status.
     *
     * @param httpGetCommand the command to answer
     * @param str the request URI; the segment after the endpoint name is the status filter,
     *     matched case-insensitively against the status name (no segment means no filter)
     */
    private void handleFinishedJobsInfo(HttpGetCommand httpGetCommand, String str) {
        String path = StringUtil.stripTrailingSlash(str);
        int separator = path.indexOf('/', MAPS_URI_PREFIX.length());
        String statusFilter = separator == -1 ? "" : path.substring(separator + 1);

        IMap<Long, JobHistoryService.JobState> finishedJobStates = getDistributedMap(FINISHED_JOB_STATE_MAP);
        IMap<Long, JobDAGInfo> finishedJobDags = getDistributedMap(FINISHED_JOB_VERTEX_INFO_MAP);

        JsonArray jobs = finishedJobStates.values().stream()
                // equalsIgnoreCase avoids the locale-sensitive toUpperCase() conversion.
                .filter(state -> statusFilter.isEmpty() || state.getJobStatus().name().equalsIgnoreCase(statusFilter))
                .sorted(Comparator.comparing(JobHistoryService.JobState::getFinishTime))
                .map(state -> {
                    Long jobId = state.getJobId();
                    String metricsJson = fetchJobMetricsJson(getSeaTunnelServer(), jobId);
                    JobDAGInfo dagInfo = finishedJobDags.get(jobId);
                    // A job without stored DAG info is reported with an empty DAG rather than
                    // failing the whole listing.
                    JsonObject dagJson = dagInfo == null
                            ? new JsonObject()
                            : Json.parse(JsonUtils.toJsonString(dagInfo)).asObject();
                    return convertToJson(state, metricsJson, dagJson, jobId);
                })
                .collect(JsonArray::new, JsonArray::add, JsonArray::add);

        prepareResponse(httpGetCommand, jobs);
    }

    /**
     * Responds with the description of one running job.
     *
     * <p>An absent id or an unknown job yields an empty JSON object; an id that is not a valid
     * long is answered with a 400.
     *
     * @param httpGetCommand the command to answer
     * @param str the request URI whose segment after the endpoint name is the job id
     */
    private void handleJobInfoById(HttpGetCommand httpGetCommand, String str) {
        String path = StringUtil.stripTrailingSlash(str);
        int separator = path.indexOf('/', MAPS_URI_PREFIX.length());
        String jobIdText = separator == -1 ? "" : path.substring(separator + 1);
        if (jobIdText.isEmpty()) {
            prepareResponse(httpGetCommand, new JsonObject());
            return;
        }

        long jobId;
        try {
            jobId = Long.parseLong(jobIdText);
        } catch (NumberFormatException e) {
            // A malformed id is a client error, not a server failure.
            httpGetCommand.send400();
            return;
        }

        IMap<Long, JobInfo> runningJobs = getDistributedMap(RUNNING_JOB_INFO_MAP);
        JobInfo jobInfo = runningJobs.get(jobId);
        if (jobInfo == null) {
            prepareResponse(httpGetCommand, new JsonObject());
        } else {
            prepareResponse(httpGetCommand, convertToJson(jobInfo, jobId));
        }
    }

    /** Responds with the name and context class loader of every live thread, sorted by name. */
    private void getRunningThread(HttpGetCommand httpGetCommand) {
        JsonArray threads = Thread.getAllStackTraces().keySet().stream()
                .sorted(Comparator.comparing(Thread::getName))
                .map(thread -> new JsonObject()
                        .add("threadName", thread.getName())
                        .add("classLoader", String.valueOf(thread.getContextClassLoader())))
                .collect(JsonArray::new, JsonArray::add, JsonArray::add);
        prepareResponse(httpGetCommand, threads);
    }

    /**
     * Sums the source-received and sink-write counters found in a metrics JSON document.
     *
     * @param str the metrics JSON; may be {@code null}
     * @return a map with both totals, or an empty map when the JSON is absent, malformed, or
     *     lacks either counter array
     */
    private Map<String, Long> getJobMetrics(String str) {
        Map<String, Long> totals = new HashMap<>();
        if (str == null) {
            return totals;
        }
        try {
            JsonNode root = OBJECT_MAPPER.readTree(str);
            if (root == null) {
                return totals;
            }
            JsonNode received = root.get(SOURCE_RECEIVED_COUNT);
            JsonNode written = root.get(SINK_WRITE_COUNT);
            if (received == null || written == null) {
                return totals;
            }
            // Each array is summed on its own, so unequal lengths no longer lose or skip values.
            totals.put(SOURCE_RECEIVED_COUNT, sumValues(received));
            totals.put(SINK_WRITE_COUNT, sumValues(written));
        } catch (JsonProcessingException e) {
            this.logger.fine("Unable to parse job metrics, reporting none", e);
        }
        return totals;
    }

    /** Adds up the {@code "value"} field of every element, skipping elements without one. */
    private static long sumValues(JsonNode measurements) {
        long total = 0;
        for (JsonNode measurement : measurements) {
            JsonNode value = measurement.get("value");
            if (value != null) {
                total += value.asLong();
            }
        }
        return total;
    }

    /**
     * Looks up the SeaTunnel server service on this node.
     *
     * @return the server when this node is the master, otherwise {@code null}
     */
    private SeaTunnelServer getSeaTunnelServer() {
        SeaTunnelServer seaTunnelServer = (SeaTunnelServer) this.textCommandService.getNode().getNodeExtension().createExtensionServices().get(SeaTunnelServer.SERVICE_NAME);
        return seaTunnelServer.isMasterNode() ? seaTunnelServer : null;
    }

    /** Returns the named distributed map from this node's Hazelcast instance. */
    private <K, V> IMap<K, V> getDistributedMap(String name) {
        return this.textCommandService.getNode().getNodeEngine().getHazelcastInstance().getMap(name);
    }

    /**
     * Fetches a job's metrics as a JSON string.
     *
     * @param masterServer the local server when this node is the master, otherwise {@code null},
     *     in which case the master node is asked through an operation
     * @param jobId the job whose metrics are requested
     */
    private String fetchJobMetricsJson(SeaTunnelServer masterServer, long jobId) {
        if (masterServer == null) {
            return (String) NodeEngineUtil.sendOperationToMasterNode(getNode().nodeEngine, new GetJobMetricsOperation(jobId)).join();
        }
        return masterServer.getCoordinatorService().getJobMetrics(jobId).toJsonString();
    }

    /** Formats epoch milliseconds as {@code yyyy-MM-dd HH:mm:ss}. */
    private static String formatTimestamp(long epochMillis) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        return new SimpleDateFormat(DATE_TIME_PATTERN).format(new Date(epochMillis));
    }

    /**
     * Builds the JSON description of a running job.
     *
     * @param jobInfo the stored job info holding the serialized immutable job information
     * @param j the job id
     * @return id, name, status, env options, create time, DAG, plugin jars, save-point flag and
     *     metrics of the job
     */
    private JsonObject convertToJson(JobInfo jobInfo, long j) {
        NodeEngine engine = this.textCommandService.getNode().getNodeEngine();
        // The nested toObject call mirrors the original double conversion.
        JobImmutableInformation jobImmutableInformation = (JobImmutableInformation) engine.getSerializationService().toObject(engine.getSerializationService().toObject(jobInfo.getJobImmutableInformation()));
        // NOTE(review): a new class loader is created per call and never closed here — confirm
        // whether it should be released once the DAG has been rendered.
        LogicalDag logicalDag = (LogicalDag) CustomClassLoadedObject.deserializeWithCustomClassLoader(engine.getSerializationService(), new SeaTunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls()), jobImmutableInformation.getLogicalDag());

        SeaTunnelServer masterServer = getSeaTunnelServer();
        String metricsJson = fetchJobMetricsJson(masterServer, j);
        JobStatus jobStatus;
        if (masterServer == null) {
            // The operation returns an Integer that is used as an index into JobStatus.values().
            jobStatus = JobStatus.values()[((Integer) NodeEngineUtil.sendOperationToMasterNode(getNode().nodeEngine, new GetJobStatusOperation(j)).join()).intValue()];
        } else {
            jobStatus = masterServer.getCoordinatorService().getJobStatus(j);
        }

        JsonArray pluginJars = jobImmutableInformation.getPluginJarsUrls().stream()
                .map(url -> new JsonObject().add(RestConstant.JAR_PATH, url.toString()))
                .collect(JsonArray::new, JsonArray::add, JsonArray::add);

        JsonObject jobJson = new JsonObject();
        jobJson.add(RestConstant.JOB_ID, String.valueOf(j))
                .add(RestConstant.JOB_NAME, logicalDag.getJobConfig().getName())
                .add(RestConstant.JOB_STATUS, jobStatus.toString())
                .add(RestConstant.ENV_OPTIONS, JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
                .add(RestConstant.CREATE_TIME, formatTimestamp(jobImmutableInformation.getCreateTime()))
                .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
                .add(RestConstant.PLUGIN_JARS_URLS, pluginJars)
                .add(RestConstant.IS_START_WITH_SAVE_POINT, jobImmutableInformation.isStartWithSavePoint())
                .add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(metricsJson)));
        return jobJson;
    }

    /**
     * Builds the JSON description of a finished job.
     *
     * @param jobState the stored final state of the job
     * @param str the job metrics as a JSON string
     * @param jsonObject the job DAG as JSON
     * @param j the job id
     * @return id, name, status, error message, create time, finish time, DAG and metrics
     */
    private JsonObject convertToJson(JobHistoryService.JobState jobState, String str, JsonObject jsonObject, long j) {
        JsonObject jobJson = new JsonObject();
        jobJson.add(RestConstant.JOB_ID, String.valueOf(j))
                .add(RestConstant.JOB_NAME, jobState.getJobName())
                .add(RestConstant.JOB_STATUS, jobState.getJobStatus().toString())
                .add(RestConstant.ERROR_MSG, jobState.getErrorMessage())
                .add(RestConstant.CREATE_TIME, formatTimestamp(jobState.getSubmitTime()))
                .add(RestConstant.FINISH_TIME, formatTimestamp(jobState.getFinishTime()))
                .add(RestConstant.JOB_DAG, jsonObject)
                .add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(str)));
        return jobJson;
    }
}
