package org.apache.seatunnel.engine.server.rest.service;

import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.json.JsonArray;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.internal.json.JsonValue;
import com.hazelcast.internal.util.JsonUtil;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.ExecutionAddress;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.dag.DAGUtils;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
import org.apache.seatunnel.engine.server.operation.SavePointJobOperation;
import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
import org.apache.seatunnel.engine.server.rest.RestConstant;
import org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.engine.server.utils.RestUtil;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/rest/service/BaseService.class */
public abstract class BaseService {
    private static final Logger log = LoggerFactory.getLogger(BaseService.class);
    protected final NodeEngineImpl nodeEngine;

    public BaseService(NodeEngineImpl nodeEngineImpl) {
        this.nodeEngine = nodeEngineImpl;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SeaTunnelServer getSeaTunnelServer(boolean z) {
        SeaTunnelServer seaTunnelServer = (SeaTunnelServer) this.nodeEngine.getNode().getNodeExtension().createExtensionServices().get(SeaTunnelServer.SERVICE_NAME);
        if (!z || seaTunnelServer.isMasterNode()) {
            return seaTunnelServer;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JsonObject convertToJson(JobInfo jobInfo, long j) {
        String jsonString;
        JobStatus jobStatus;
        JsonObject jsonObject = new JsonObject();
        JobImmutableInformation jobImmutableInformation = (JobImmutableInformation) this.nodeEngine.getSerializationService().toObject(this.nodeEngine.getSerializationService().toObject(jobInfo.getJobImmutableInformation()));
        SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
        ClassLoaderService classLoaderService = seaTunnelServer == null ? getSeaTunnelServer(false).getClassLoaderService() : seaTunnelServer.getClassLoaderService();
        LogicalDag logicalDag = (LogicalDag) CustomClassLoadedObject.deserializeWithCustomClassLoader(this.nodeEngine.getSerializationService(), classLoaderService.getClassLoader(j, jobImmutableInformation.getPluginJarsUrls()), jobImmutableInformation.getLogicalDag());
        classLoaderService.releaseClassLoader(j, jobImmutableInformation.getPluginJarsUrls());
        if (seaTunnelServer == null) {
            jsonString = (String) NodeEngineUtil.sendOperationToMasterNode(this.nodeEngine, new GetJobMetricsOperation(j)).join();
            jobStatus = JobStatus.values()[((Integer) NodeEngineUtil.sendOperationToMasterNode(this.nodeEngine, new GetJobStatusOperation(j)).join()).intValue()];
        } else {
            jsonString = seaTunnelServer.getCoordinatorService().getJobMetrics(j).toJsonString();
            jobStatus = seaTunnelServer.getCoordinatorService().getJobStatus(j);
        }
        jsonObject.add(RestConstant.JOB_ID, String.valueOf(j)).add(RestConstant.JOB_NAME, logicalDag.getJobConfig().getName()).add(RestConstant.JOB_STATUS, jobStatus.toString()).add(RestConstant.ENV_OPTIONS, JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions())).add(RestConstant.CREATE_TIME, DateTimeUtils.toString(jobImmutableInformation.getCreateTime(), DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)).add(RestConstant.JOB_DAG, DAGUtils.getJobDAGInfo(logicalDag, jobImmutableInformation, getSeaTunnelServer(false).getSeaTunnelConfig().getEngineConfig(), true, new ExecutionAddress(this.nodeEngine.getMasterAddress().getHost(), this.nodeEngine.getMasterAddress().getPort()), new HashSet()).toJsonObject()).add(RestConstant.PLUGIN_JARS_URLS, (JsonValue) jobImmutableInformation.getPluginJarsUrls().stream().map(url -> {
            JsonObject jsonObject2 = new JsonObject();
            jsonObject2.add(RestConstant.JAR_PATH, url.toString());
            return jsonObject2;
        }).collect(JsonArray::new, (v0, v1) -> {
            v0.add(v1);
        }, (v0, v1) -> {
            v0.add(v1);
        })).add(RestConstant.IS_START_WITH_SAVE_POINT, jobImmutableInformation.isStartWithSavePoint()).add(RestConstant.METRICS, metricsToJsonObject(getJobMetrics(jsonString)));
        return jsonObject;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JsonObject getJobInfoJson(JobHistoryService.JobState jobState, String str, JobDAGInfo jobDAGInfo) {
        return new JsonObject().add(RestConstant.JOB_ID, String.valueOf(jobState.getJobId())).add(RestConstant.JOB_NAME, jobState.getJobName()).add(RestConstant.JOB_STATUS, jobState.getJobStatus().toString()).add(RestConstant.ERROR_MSG, jobState.getErrorMessage()).add(RestConstant.CREATE_TIME, DateTimeUtils.toString(jobState.getSubmitTime(), DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)).add(RestConstant.FINISH_TIME, DateTimeUtils.toString(jobState.getFinishTime().longValue(), DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)).add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject()).add(RestConstant.PLUGIN_JARS_URLS, new JsonArray()).add(RestConstant.METRICS, metricsToJsonObject(getJobMetrics(str)));
    }

    private Map<String, Object> getJobMetrics(String str) {
        Map<String, Object> hashMap = new HashMap<>();
        String[] strArr = {"SourceReceivedCount", "SinkWriteCount", "SourceReceivedBytes", "SinkWriteBytes"};
        String[] strArr2 = {"SourceReceivedQPS", "SinkWriteQPS", "SourceReceivedBytesPerSeconds", "SinkWriteBytesPerSeconds"};
        String[] strArr3 = {RestConstant.TABLE_SOURCE_RECEIVED_COUNT, RestConstant.TABLE_SINK_WRITE_COUNT, RestConstant.TABLE_SOURCE_RECEIVED_BYTES, RestConstant.TABLE_SINK_WRITE_BYTES};
        String[] strArr4 = {RestConstant.TABLE_SOURCE_RECEIVED_QPS, RestConstant.TABLE_SINK_WRITE_QPS, RestConstant.TABLE_SOURCE_RECEIVED_BYTES_PER_SECONDS, RestConstant.TABLE_SINK_WRITE_BYTES_PER_SECONDS};
        Long[] lArr = (Long[]) Stream.generate(() -> {
            return 0L;
        }).limit(strArr.length).toArray(i -> {
            return new Long[i];
        });
        Double[] dArr = (Double[]) Stream.generate(() -> {
            return Double.valueOf(0.0d);
        }).limit(strArr2.length).toArray(i2 -> {
            return new Double[i2];
        });
        Map[] mapArr = {new HashMap(), new HashMap(), new HashMap(), new HashMap(), new HashMap(), new HashMap(), new HashMap(), new HashMap()};
        try {
            JsonNode readTree = new ObjectMapper().readTree(str);
            readTree.fieldNames().forEachRemaining(str2 -> {
                if (str2.contains("#")) {
                    processMetric(str2, TablePath.of(str2.split("#")[1]).getFullName(), readTree.get(str2), mapArr);
                }
            });
            aggregateMetrics(readTree, lArr, dArr, (String[]) ArrayUtils.addAll(strArr, strArr2));
            populateMetricsMap(hashMap, mapArr, (String[]) ArrayUtils.addAll(strArr3, strArr4), strArr.length);
            populateMetricsMap(hashMap, Stream.concat(Arrays.stream(lArr), Arrays.stream(dArr)).toArray(i3 -> {
                return new Number[i3];
            }), (String[]) ArrayUtils.addAll(strArr, strArr2), lArr.length);
            return hashMap;
        } catch (JsonProcessingException e) {
            return hashMap;
        }
    }

    private void processMetric(String str, String str2, JsonNode jsonNode, Map<String, JsonNode>[] mapArr) {
        if (jsonNode == null) {
            return;
        }
        if (str.startsWith("SourceReceivedCount#")) {
            mapArr[0].put(str2, jsonNode);
            return;
        }
        if (str.startsWith("SinkWriteCount#")) {
            mapArr[1].put(str2, jsonNode);
            return;
        }
        if (str.startsWith("SourceReceivedBytes#")) {
            mapArr[2].put(str2, jsonNode);
            return;
        }
        if (str.startsWith("SinkWriteBytes#")) {
            mapArr[3].put(str2, jsonNode);
            return;
        }
        if (str.startsWith("SourceReceivedQPS#")) {
            mapArr[4].put(str2, jsonNode);
            return;
        }
        if (str.startsWith("SinkWriteQPS#")) {
            mapArr[5].put(str2, jsonNode);
        } else if (str.startsWith("SourceReceivedBytesPerSeconds#")) {
            mapArr[6].put(str2, jsonNode);
        } else if (str.startsWith("SinkWriteBytesPerSeconds#")) {
            mapArr[7].put(str2, jsonNode);
        }
    }

    private void aggregateMetrics(JsonNode jsonNode, Long[] lArr, Double[] dArr, String[] strArr) {
        for (int i = 0; i < strArr.length; i++) {
            JsonNode jsonNode2 = jsonNode.get(strArr[i]);
            if (jsonNode2 != null && jsonNode2.isArray()) {
                Iterator it = jsonNode2.iterator();
                while (it.hasNext()) {
                    JsonNode jsonNode3 = (JsonNode) it.next();
                    if (i < lArr.length) {
                        int i2 = i;
                        lArr[i2] = Long.valueOf(lArr[i2].longValue() + jsonNode3.path("value").asLong());
                    } else {
                        int length = i - lArr.length;
                        dArr[length] = Double.valueOf(dArr[length].doubleValue() + jsonNode3.path("value").asDouble());
                    }
                }
            }
        }
    }

    private void populateMetricsMap(Map<String, Object> map, Object[] objArr, String[] strArr, int i) {
        int i2 = 0;
        while (i2 < objArr.length) {
            if (objArr[i2] != null) {
                if (objArr[i2] instanceof Map) {
                    map.put(strArr[i2], aggregateMap((Map) objArr[i2], i2 >= i));
                } else {
                    map.put(strArr[i2], objArr[i2]);
                }
            }
            i2++;
        }
    }

    private Map<String, Object> aggregateMap(Map<String, JsonNode> map, boolean z) {
        return z ? (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Double.valueOf(StreamSupport.stream(((JsonNode) entry.getValue()).spliterator(), false).mapToDouble(jsonNode -> {
                return jsonNode.path("value").asDouble();
            }).sum());
        })) : (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry2 -> {
            return Long.valueOf(StreamSupport.stream(((JsonNode) entry2.getValue()).spliterator(), false).mapToLong(jsonNode -> {
                return jsonNode.path("value").asLong();
            }).sum());
        }));
    }

    private JsonObject metricsToJsonObject(Map<String, Object> map) {
        JsonObject jsonObject = new JsonObject();
        map.forEach((str, obj) -> {
            if (obj instanceof Map) {
                jsonObject.add(str, metricsToJsonObject((Map) obj));
            } else {
                jsonObject.add(str, obj.toString());
            }
        });
        return jsonObject;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JsonNode requestHandle(byte[] bArr) {
        if (bArr.length == 0) {
            throw new IllegalArgumentException("Request body is empty.");
        }
        try {
            return RestUtil.convertByteToJsonNode(bArr);
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid JSON format in request body.");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleStopJob(Map<String, Object> map, SeaTunnelServer seaTunnelServer, Node node) {
        boolean z = false;
        if (map.get(RestConstant.JOB_ID) == null) {
            throw new IllegalArgumentException("jobId cannot be empty.");
        }
        long parseLong = Long.parseLong(map.get(RestConstant.JOB_ID).toString());
        if (map.get(RestConstant.IS_STOP_WITH_SAVE_POINT) != null) {
            z = Boolean.parseBoolean(map.get(RestConstant.IS_STOP_WITH_SAVE_POINT).toString());
        }
        if (!seaTunnelServer.isMasterNode()) {
            if (z) {
                NodeEngineUtil.sendOperationToMasterNode(node.nodeEngine, new SavePointJobOperation(parseLong)).join();
                return;
            } else {
                NodeEngineUtil.sendOperationToMasterNode(node.nodeEngine, new CancelJobOperation(parseLong)).join();
                return;
            }
        }
        CoordinatorService coordinatorService = seaTunnelServer.getCoordinatorService();
        if (z) {
            coordinatorService.savePoint(parseLong);
        } else {
            coordinatorService.cancelJob(parseLong);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String mapToUrlParams(Map<String, String> map) {
        return (String) map.entrySet().stream().map(entry -> {
            return ((String) entry.getKey()) + "=" + ((String) entry.getValue());
        }).collect(Collectors.joining("&", "?", ""));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JsonObject submitJobInternal(Config config, Map<String, String> map, SeaTunnelServer seaTunnelServer, Node node) {
        String str = (String) ReadonlyConfig.fromConfig(config.getConfig("env")).get(EnvCommonOptions.JOB_NAME);
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(StringUtils.isEmpty(map.get(RestConstant.JOB_NAME)) ? str : map.get(RestConstant.JOB_NAME));
        boolean parseBoolean = Boolean.parseBoolean(map.get(RestConstant.IS_START_WITH_SAVE_POINT));
        String str2 = map.get(RestConstant.JOB_ID);
        JobImmutableInformation build = new RestJobExecutionEnvironment(seaTunnelServer, jobConfig, config, node, parseBoolean, StringUtils.isNotBlank(str2) ? Long.valueOf(Long.parseLong(str2)) : null).build();
        long jobId = build.getJobId();
        if (seaTunnelServer.isMasterNode()) {
            submitJob(node, seaTunnelServer, build, jobConfig);
        } else {
            NodeEngineUtil.sendOperationToMasterNode(node.nodeEngine, new SubmitJobOperation(jobId, node.nodeEngine.toData(build), build.isStartWithSavePoint())).join();
        }
        return new JsonObject().add(RestConstant.JOB_ID, String.valueOf(jobId)).add(RestConstant.JOB_NAME, jobConfig.getName());
    }

    private void submitJob(Node node, SeaTunnelServer seaTunnelServer, JobImmutableInformation jobImmutableInformation, JobConfig jobConfig) {
        seaTunnelServer.getCoordinatorService().submitJob(Long.parseLong(jobConfig.getJobContext().getJobId()), node.nodeEngine.getSerializationService().toData(jobImmutableInformation), jobImmutableInformation.isStartWithSavePoint()).join();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JsonArray getSystemMonitoringInformationJsonValues() {
        return (JsonArray) this.nodeEngine.getHazelcastInstance().getCluster().getMembers().stream().map(member -> {
            String str = null;
            try {
                str = (String) NodeEngineUtil.sendOperationToMemberNode(this.nodeEngine, new GetClusterHealthMetricsOperation(), member.getAddress()).get();
            } catch (InterruptedException | ExecutionException e) {
                log.error("Failed to get cluster health metrics", e);
            }
            String[] split = str.split(", ");
            JsonObject jsonObject = new JsonObject();
            Arrays.stream(split).forEach(str2 -> {
                String[] split2 = str2.split("=");
                jsonObject.add(split2[0], split2[1]);
            });
            return jsonObject;
        }).collect(JsonArray::new, (v0, v1) -> {
            v0.add(v1);
        }, (v0, v1) -> {
            v0.add(v1);
        });
    }
}
