package org.apache.seatunnel.engine.server;

import com.hazelcast.cluster.Address;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
import org.apache.seatunnel.api.event.EventProcessor;
import org.apache.seatunnel.api.tracing.MDCExecutorService;
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.event.JobEventHttpReportHandler;
import org.apache.seatunnel.engine.server.event.JobEventProcessor;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.PendingSourceState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
import org.apache.seatunnel.engine.server.telemetry.metrics.entity.JobCounter;
import org.apache.seatunnel.engine.server.telemetry.metrics.entity.ThreadPoolStatus;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.engine.server.utils.PeekBlockingQueue;
import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import scala.Tuple2;

/* loaded from: input_file:org/apache/seatunnel/engine/server/CoordinatorService.class */
public class CoordinatorService {
    // Hazelcast node engine this coordinator runs on; used for the logger, the cluster service and the distributed maps.
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    // Created lazily by getResourceManager(); volatile because that method reads it outside the lock first.
    private volatile ResourceManager resourceManager;
    // The fields down to metricsImap are assigned in initCoordinatorService(), i.e. they stay null until then.
    private JobHistoryService jobHistoryService;
    private IMap<Long, JobInfo> runningJobInfoIMap;
    private IMap<Object, Object> runningJobStateIMap;
    private IMap<Object, Long[]> runningJobStateTimestampsIMap;
    private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
    private IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap;
    // Not final: checkNewActiveMaster() replaces it when the previous pool has been shut down.
    private ExecutorService executorService;
    private final SeaTunnelServer seaTunnelServer;
    // Single-thread scheduler that runs checkNewActiveMaster() every 100 ms (scheduled in the constructor).
    private final ScheduledExecutorService masterActiveListener;
    private final EngineConfig engineConfig;
    // Only created in initCoordinatorService() when the connector jar storage config is enabled.
    private ConnectorPackageService connectorPackageService;
    private EventProcessor eventProcessor;
    // Tracks the asynchronous restore started by initCoordinatorService(); waitForJobComplete() joins on it.
    private PassiveCompletableFuture restoreAllJobFromMasterNodeSwitchFuture;
    // True when scheduleStrategy equals ScheduleStrategy.WAIT.
    private final boolean isWaitStrategy;
    private final ScheduleStrategy scheduleStrategy;
    // Job masters that have been taken out of the pending queue, keyed by job id.
    private final Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap();
    // Job masters still waiting for resources, keyed by job id, together with how they got there (SUBMIT or RESTORE).
    private final Map<Long, Tuple2<PendingSourceState, JobMaster>> pendingJobMasterMap = new ConcurrentHashMap();
    // Flipped by checkNewActiveMaster(); exposed through isCoordinatorActive().
    private volatile boolean isActive = false;
    // Queue consumed by the pending-job scheduling loop (see pendingJobSchedule()).
    private PeekBlockingQueue<JobMaster> pendingJob = new PeekBlockingQueue<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.seatunnel.engine.server.CoordinatorService$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/seatunnel/engine/server/CoordinatorService$1.class */
    /**
     * Lookup table from {@link JobStatus} ordinal to a 1-based case index (CREATED=1 ... FINISHED=8).
     *
     * <p>The {@code synthetic} markers and the {@code $SwitchMap$} name suggest this is the helper a
     * compiler emits for a {@code switch} over {@code JobStatus}; nothing in the visible part of this
     * file reads it. Constants that are not listed below (for example PENDING) keep the array default
     * of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$seatunnel$engine$core$job$JobStatus = new int[JobStatus.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) only skips its own entry instead of aborting class initialisation.
            try {
                $SwitchMap$org$apache$seatunnel$engine$core$job$JobStatus[JobStatus.CREATED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$seatunnel$engine$core$job$JobStatus[JobStatus.SCHEDULED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$seatunnel$engine$core$job$JobStatus[JobStatus.RUNNING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$seatunnel$engine$core$job$JobStatus[JobStatus.FAILING.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$seatunnel$engine$core$job$JobStatus[JobStatus.FAILED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$seatunnel$engine$core$job$JobStatus[JobStatus.CANCELING.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$seatunnel$engine$core$job$JobStatus[JobStatus.CANCELED.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$seatunnel$engine$core$job$JobStatus[JobStatus.FINISHED.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
        }
    }

    /**
     * Creates the coordinator, schedules the periodic master check and starts the pending-job
     * scheduling loop.
     *
     * <p>All instance state, including the final {@code scheduleStrategy} / {@code isWaitStrategy}
     * fields, is assigned before any background task is started, so the listener thread and the
     * scheduling loop never observe a partially constructed object.
     *
     * @param nodeEngineImpl Hazelcast node engine; must not be null
     * @param seaTunnelServer owning server, asked whether this node is the master; must not be null
     * @param engineConfig engine configuration (thread pool sizes, schedule strategy, ...)
     * @throws NullPointerException if {@code nodeEngineImpl} or {@code seaTunnelServer} is null
     */
    public CoordinatorService(@NonNull NodeEngineImpl nodeEngineImpl, @NonNull SeaTunnelServer seaTunnelServer, EngineConfig engineConfig) {
        if (nodeEngineImpl == null) {
            throw new NullPointerException("nodeEngine is marked non-null but is null");
        }
        if (seaTunnelServer == null) {
            throw new NullPointerException("seaTunnelServer is marked non-null but is null");
        }
        // 1. Plain state first: nothing below this section may run before it is complete.
        this.nodeEngine = nodeEngineImpl;
        this.engineConfig = engineConfig;
        this.seaTunnelServer = seaTunnelServer;
        this.logger = nodeEngineImpl.getLogger(getClass());
        this.scheduleStrategy = engineConfig.getScheduleStrategy();
        this.isWaitStrategy = this.scheduleStrategy.equals(ScheduleStrategy.WAIT);
        // SynchronousQueue: tasks are handed straight to a thread; rejections are counted by the handler.
        this.executorService = new ThreadPoolExecutor(
                engineConfig.getCoordinatorServiceConfig().getCoreThreadNum(),
                engineConfig.getCoordinatorServiceConfig().getMaxThreadNum(),
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadFactoryBuilder().setNameFormat("seatunnel-coordinator-service-%d").build(),
                new ThreadPoolStatus.RejectionCountingHandler());
        this.masterActiveListener = Executors.newSingleThreadScheduledExecutor();

        // 2. Only now hand `this` to other threads.
        this.masterActiveListener.scheduleAtFixedRate(this::checkNewActiveMaster, 0L, 100L, TimeUnit.MILLISECONDS);
        this.logger.info("Start pending job schedule thread");
        startPendingJobScheduleThread();
    }

    /**
     * Submits the endless pending-job scheduling loop to the coordinator executor.
     *
     * <p>Each iteration runs {@link #pendingJobSchedule()} and then always calls
     * {@code pendingJob.release()}. The loop ends only by throwing: an interruption is wrapped in a
     * {@link RuntimeException}, any other failure is rethrown unchanged. Because the Future returned
     * by {@code submit} is not kept, the reason is logged here; otherwise the loop would stop without
     * any trace.
     */
    private void startPendingJobScheduleThread() {
        this.executorService.submit(() -> {
            Thread.currentThread().setName("pending-job-schedule-runner");
            while (true) {
                try {
                    pendingJobSchedule();
                } catch (InterruptedException e) {
                    this.logger.info("Pending job schedule thread was interrupted, stop scheduling");
                    throw new RuntimeException(e);
                } catch (Throwable t) {
                    this.logger.severe(String.format("Pending job schedule thread stopped by unexpected error: %s", ExceptionUtils.getMessage(t)));
                    throw t;
                } finally {
                    this.pendingJob.release();
                }
            }
        });
    }

    /**
     * Runs one scheduling round for the job at the head of the pending queue.
     *
     * <ul>
     *   <li>Enough resources: the job is taken off the queue, moved from the pending map to the
     *       running map and executed on the coordinator executor.</li>
     *   <li>Not enough resources, WAIT strategy: the job stays queued and this thread sleeps 3 s.</li>
     *   <li>Not enough resources, other strategy: the job is taken off the queue and failed.</li>
     * </ul>
     *
     * @throws InterruptedException if the thread is interrupted while blocked on the queue or while
     *     sleeping. The sleep in the WAIT branch used to swallow the interruption, which let the
     *     scheduling loop keep running after the executor had been shut down.
     */
    private void pendingJobSchedule() throws InterruptedException {
        JobMaster head = this.pendingJob.peekBlocking();
        if (Objects.isNull(head)) {
            this.logger.warning("The peek job master is null");
            Thread.sleep(3000L);
            return;
        }
        this.logger.fine(String.format("Start pending job schedule, pendingJob Size : %s", this.pendingJob.size()));
        Long jobId = head.getJobId();
        this.logger.fine(String.format("Start calculating whether pending task resources are enough: %s", jobId));

        if (!head.preApplyResources()) {
            this.logger.info(String.format("Current strategy is %s, and resources is not enough, skipping this schedule, JobID: %s", this.scheduleStrategy, jobId));
            if (this.isWaitStrategy) {
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    this.logger.severe(ExceptionUtils.getMessage(e));
                    // Propagate so the caller can stop the scheduling loop.
                    throw e;
                }
            } else {
                queueRemove(head);
                completeFailJob(head);
            }
            return;
        }

        this.logger.info(String.format("Resources enough, start running: %s", jobId));
        queueRemove(head);
        PendingSourceState pendingSourceState = (PendingSourceState) this.pendingJobMasterMap.get(jobId)._1;
        MDCTracer.tracing(jobId, this.executorService).submit(() -> {
            try {
                String jobFullName = head.getPhysicalPlan().getJobFullName();
                JobStatus jobStatus = (JobStatus) this.runningJobStateIMap.get(jobId);
                if (pendingSourceState == PendingSourceState.RESTORE) {
                    head.getPhysicalPlan().getPipelineList().forEach(pipeline -> pipeline.restorePipelineState());
                }
                this.logger.info(String.format("The %s %s is in %s state, restore pipeline and take over this job running", pendingSourceState, jobFullName, jobStatus));
                this.pendingJobMasterMap.remove(jobId);
                this.runningJobMasterMap.put(jobId, head);
                head.run();
            } finally {
                // Runs on success and on failure alike; the entry is dropped only when the
                // completion future is in the expected state for this source kind.
                if (jobMasterCompletedSuccessfully(head, pendingSourceState)) {
                    this.runningJobMasterMap.remove(jobId);
                }
            }
        });
    }

    /**
     * Takes the head element off the pending queue and logs an error when it is not the expected
     * job master. The element is removed either way.
     *
     * @param jobMaster the job master the caller previously peeked
     * @throws InterruptedException if interrupted while taking from the queue
     */
    private void queueRemove(JobMaster jobMaster) throws InterruptedException {
        JobMaster taken = this.pendingJob.take();
        if (taken == jobMaster) {
            return;
        }
        this.logger.severe("The job master is not equal to the peek job master");
    }

    /**
     * Marks a job as FAILED because no resources were available: updates the job state and
     * completes the job-end future with a result carrying a {@link NoEnoughResourceException}
     * message.
     *
     * @param jobMaster the job master to fail
     */
    private void completeFailJob(JobMaster jobMaster) {
        String failureReason = ExceptionUtils.getMessage(new NoEnoughResourceException());
        JobResult failedResult = new JobResult(JobStatus.FAILED, failureReason);
        jobMaster.getPhysicalPlan().updateJobState(JobStatus.FAILED);
        jobMaster.getPhysicalPlan().completeJobEndFuture(failedResult);
        String notice = String.format("The job %s is not running because the resources is not enough insufficient", jobMaster.getJobId());
        this.logger.info(notice);
    }

    /**
     * Decides whether a job master may be dropped from the running map after its run ended.
     *
     * @param jobMaster the job master whose completion future is inspected
     * @param pendingSourceState how the job entered the pending queue
     * @return for RESTORE: true unless the completion future completed exceptionally; for SUBMIT:
     *     true unless the completion future was cancelled; false for any other source state
     */
    private boolean jobMasterCompletedSuccessfully(JobMaster jobMaster, PendingSourceState pendingSourceState) {
        if (pendingSourceState == PendingSourceState.RESTORE) {
            return !jobMaster.getJobMasterCompleteFuture().isCompletedExceptionally();
        }
        if (pendingSourceState == PendingSourceState.SUBMIT) {
            return !jobMaster.getJobMasterCompleteFuture().isCancelled();
        }
        return false;
    }

    /**
     * Builds the job event processor from the handlers found via the context class loader and,
     * when an HTTP report endpoint is configured, an additional HTTP report handler backed by the
     * "zeta-job-event" ring buffer.
     *
     * @param str HTTP report endpoint; {@code null} disables the HTTP handler
     * @param map headers passed to the HTTP report handler
     * @param nodeEngineImpl node engine providing the Hazelcast instance
     * @return a processor wrapping all loaded handlers
     */
    private JobEventProcessor createJobEventProcessor(String str, Map<String, String> map, NodeEngineImpl nodeEngineImpl) {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        List handlers = EventProcessor.loadEventHandlers(contextLoader);
        boolean httpReportConfigured = str != null;
        if (httpReportConfigured) {
            String ringBufferName = "zeta-job-event";
            nodeEngineImpl.getHazelcastInstance().getConfig().addRingBufferConfig(
                    new Config().getRingbufferConfig(ringBufferName)
                            .setCapacity(2000)
                            .setBackupCount(0)
                            .setAsyncBackupCount(1)
                            .setTimeToLiveSeconds(0));
            handlers.add(new JobEventHttpReportHandler(str, map, nodeEngineImpl.getHazelcastInstance().getRingbuffer(ringBufferName)));
        }
        this.logger.info("Loaded event handlers: " + handlers);
        return new JobEventProcessor(handlers);
    }

    /**
     * @return the job history service, or {@code null} while {@code initCoordinatorService()} has
     *     not assigned it yet
     */
    public JobHistoryService getJobHistoryService() {
        return jobHistoryService;
    }

    /**
     * Looks a job master up by id, preferring the pending map over the running map.
     *
     * @param l job id
     * @return the pending job master if one is registered, otherwise the running one, otherwise
     *     {@code null}
     */
    public JobMaster getJobMaster(Long l) {
        Tuple2<PendingSourceState, JobMaster> pendingEntry = this.pendingJobMasterMap.get(l);
        JobMaster running = this.runningJobMasterMap.get(l);
        if (pendingEntry != null) {
            JobMaster pending = pendingEntry._2();
            if (pending != null) {
                return pending;
            }
        }
        return running;
    }

    /**
     * @return the event processor created by {@code initCoordinatorService()}, or {@code null}
     *     before that has run
     */
    public EventProcessor getEventProcessor() {
        return eventProcessor;
    }

    /**
     * Binds the distributed maps, creates the history service, the event processor and (when
     * enabled) the connector package service, then starts the asynchronous restore of jobs that are
     * recorded as running.
     */
    private void initCoordinatorService() {
        this.runningJobInfoIMap = hazelcastMap("engine_runningJobInfo");
        this.runningJobStateIMap = hazelcastMap("engine_runningJobState");
        this.runningJobStateTimestampsIMap = hazelcastMap("engine_stateTimestamps");
        this.ownedSlotProfilesIMap = hazelcastMap("engine_ownedSlotProfilesIMap");
        this.metricsImap = hazelcastMap("engine_runningJobMetrics");

        this.jobHistoryService = new JobHistoryService(
                this.nodeEngine,
                this.runningJobStateIMap,
                this.logger,
                this.pendingJobMasterMap,
                this.runningJobMasterMap,
                hazelcastMap("engine_finishedJobState"),
                hazelcastMap("engine_finishedJobMetrics"),
                hazelcastMap("engine_finishedJobVertexInfo"),
                this.engineConfig.getHistoryJobExpireMinutes());

        this.eventProcessor = createJobEventProcessor(
                this.engineConfig.getEventReportHttpApi(),
                this.engineConfig.getEventReportHttpHeaders(),
                this.nodeEngine);

        boolean connectorJarStorageEnabled = this.engineConfig.getConnectorJarStorageConfig().getEnable().booleanValue();
        if (connectorJarStorageEnabled) {
            this.connectorPackageService = new ConnectorPackageService(this.seaTunnelServer);
        }

        this.restoreAllJobFromMasterNodeSwitchFuture = new PassiveCompletableFuture(
                CompletableFuture.runAsync(() -> restoreAllRunningJobFromMasterNodeSwitch(), this.executorService));
    }

    /** Returns the Hazelcast distributed map registered under {@code name}. */
    private <K, V> IMap<K, V> hazelcastMap(String name) {
        return this.nodeEngine.getHazelcastInstance().getMap(name);
    }

    /**
     * Restores every job that is recorded in {@code runningJobInfoIMap} but has no job master in
     * {@code runningJobMasterMap}.
     *
     * <p>Returns immediately when there is nothing to restore. Otherwise it polls once per second
     * until the resource manager reports at least one worker, then restores all jobs in parallel on
     * the coordinator executor and waits for all of them. A failure of a single job is logged and
     * does not stop the others.
     *
     * @throws SeaTunnelEngineException if the thread is interrupted while waiting (the interrupt
     *     flag is restored first) or if submitting or joining the restore tasks fails
     */
    private void restoreAllRunningJobFromMasterNodeSwitch() {
        List<Map.Entry<Long, JobInfo>> needRestore = this.runningJobInfoIMap.entrySet().stream()
                .filter(entry -> !this.runningJobMasterMap.containsKey(entry.getKey()))
                .collect(Collectors.toList());
        if (needRestore.isEmpty()) {
            return;
        }

        while (getResourceManager().workerCount(Collections.emptyMap()) == 0) {
            try {
                this.logger.info("Waiting for worker registered");
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.logger.severe(ExceptionUtils.getMessage(e));
                throw new SeaTunnelEngineException("wait worker register error", e);
            }
        }

        try {
            CompletableFuture.allOf(needRestore.stream()
                    .map(entry -> CompletableFuture.runAsync(() -> {
                        Long jobId = entry.getKey();
                        this.logger.info(String.format("begin restore job (%s) from master active switch", jobId));
                        try {
                            // Re-check: the job may have been taken over since the snapshot above.
                            if (!this.runningJobMasterMap.containsKey(jobId)) {
                                restoreJobFromMasterActiveSwitch(jobId, entry.getValue());
                            }
                        } catch (Exception restoreError) {
                            this.logger.severe(restoreError);
                        }
                        this.logger.info(String.format("restore job (%s) from master active switch finished", jobId));
                    }, MDCTracer.tracing(entry.getKey(), this.executorService)))
                    .toArray(CompletableFuture[]::new)).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            this.logger.severe(ExceptionUtils.getMessage(e));
            throw new SeaTunnelEngineException(e);
        }
    }

    /**
     * Rebuilds the job master of one recorded job and puts it into the pending queue with source
     * state RESTORE. When no running state is stored for the job, its job-info record is removed
     * and nothing else happens.
     *
     * @param l job id; must not be null
     * @param jobInfo stored job information; must not be null
     * @throws NullPointerException if an argument is null
     * @throws SeaTunnelEngineException if initialising or enqueueing the job master fails
     */
    private void restoreJobFromMasterActiveSwitch(@NonNull Long l, @NonNull JobInfo jobInfo) {
        Objects.requireNonNull(l, "jobId is marked non-null but is null");
        Objects.requireNonNull(jobInfo, "jobInfo is marked non-null but is null");

        boolean hasRunningState = this.runningJobStateIMap.get(l) != null;
        if (!hasRunningState) {
            this.runningJobInfoIMap.remove(l);
            return;
        }

        JobMaster restored = new JobMaster(
                l,
                jobInfo.getJobImmutableInformation(),
                this.nodeEngine,
                MDCTracer.tracing(l, this.executorService),
                getResourceManager(),
                getJobHistoryService(),
                this.runningJobStateIMap,
                this.runningJobStateTimestampsIMap,
                this.ownedSlotProfilesIMap,
                this.runningJobInfoIMap,
                this.metricsImap,
                this.engineConfig,
                this.seaTunnelServer);
        try {
            long initializationTimestamp = ((JobInfo) this.runningJobInfoIMap.get(l)).getInitializationTimestamp().longValue();
            restored.init(initializationTimestamp, true);
            this.pendingJobMasterMap.put(l, new Tuple2<>(PendingSourceState.RESTORE, restored));
            this.pendingJob.put(restored);
            restored.getPhysicalPlan().updateJobState(JobStatus.PENDING);
            this.logger.info(String.format("The restore job enter pending queue, JobId: %s", l));
        } catch (Exception initError) {
            throw new SeaTunnelEngineException(String.format("Job id %s init failed", l), initError);
        }
    }

    /**
     * Periodic check that keeps the coordinator in step with the node's master role: initialises
     * the coordinator when this node becomes master and clears it when the node stops being master.
     *
     * @throws SeaTunnelEngineException on any failure, after {@code isActive} has been reset to false
     */
    private void checkNewActiveMaster() {
        try {
            boolean master = this.seaTunnelServer.isMasterNode();
            if (master && !this.isActive) {
                this.logger.info("This node become a new active master node, begin init coordinator service");
                // The pool is shut down by clearCoordinatorService(); a fresh one is needed to start again.
                if (this.executorService.isShutdown()) {
                    this.executorService = Executors.newCachedThreadPool(
                            new ThreadFactoryBuilder().setNameFormat("seatunnel-coordinator-service-%d").build());
                }
                initCoordinatorService();
                this.isActive = true;
                return;
            }
            if (!master && this.isActive) {
                this.isActive = false;
                this.logger.info("This node become leave active master node, begin clear coordinator service");
                clearCoordinatorService();
            }
        } catch (Exception failure) {
            this.isActive = false;
            this.logger.severe(ExceptionUtils.getMessage(failure));
            throw new SeaTunnelEngineException("check new active master error, stop loop", failure);
        }
    }

    /**
     * Stops everything the coordinator owns: interrupts running job masters (and pending ones under
     * the WAIT strategy), shuts the executor down, waits up to 20 seconds for it to terminate, then
     * closes the resource manager and the event processor.
     *
     * @throws SeaTunnelEngineException if the wait is interrupted (the thread's interrupt flag is
     *     restored before throwing, and the resource manager / event processor are not closed in
     *     that case) or if closing the event processor fails
     */
    public synchronized void clearCoordinatorService() {
        this.runningJobMasterMap.values().forEach(jobMaster -> jobMaster.interrupt());
        if (this.isWaitStrategy) {
            this.pendingJobMasterMap.values().stream()
                    .filter(Objects::nonNull)
                    .map(pendingEntry -> pendingEntry._2())
                    .forEach(jobMaster -> jobMaster.interrupt());
            this.pendingJobMasterMap.clear();
        }
        this.executorService.shutdownNow();
        this.runningJobMasterMap.clear();

        try {
            this.executorService.awaitTermination(20L, TimeUnit.SECONDS);
            if (this.resourceManager != null) {
                this.resourceManager.close();
            }
        } catch (InterruptedException e) {
            // Keep the interruption visible to whoever called us.
            Thread.currentThread().interrupt();
            throw new SeaTunnelEngineException("wait clean executor service error", e);
        }

        try {
            if (this.eventProcessor != null) {
                this.eventProcessor.close();
            }
        } catch (Exception e) {
            throw new SeaTunnelEngineException("close event processor error", e);
        }
    }

    /**
     * Returns the resource manager, creating and initialising it on first use. Creation happens
     * under this object's monitor and the field is only published after {@code init()} returned.
     *
     * @return the initialised resource manager
     */
    public ResourceManager getResourceManager() {
        ResourceManager current = this.resourceManager;
        if (current != null) {
            return current;
        }
        synchronized (this) {
            current = this.resourceManager;
            if (current == null) {
                current = new ResourceManagerFactory(this.nodeEngine, this.engineConfig).getResourceManager();
                current.init();
                this.resourceManager = current;
            }
            return current;
        }
    }

    /**
     * Submits a job: creates its job master, initialises it asynchronously and, on success, puts it
     * into the pending queue.
     *
     * <p>The returned future completes normally once initialisation has finished and completes
     * exceptionally with a {@link JobException} when the job id was already submitted (and this is
     * not a savepoint start) or when registration / initialisation fails. On failure every trace of
     * the job is removed again (job info, running map and pending map), so a later submit of the
     * same id is not mistaken for a job that is still running.
     *
     * @param j job id
     * @param data serialised immutable job information
     * @param z true when the job starts from a savepoint, which skips the duplicate-id check
     * @return future signalling the end of the submit phase (not the end of the job)
     */
    public PassiveCompletableFuture<Void> submitJob(long j, Data data, boolean z) {
        CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();
        Long jobId = Long.valueOf(j);
        if (getJobMaster(jobId) != null) {
            this.logger.warning(String.format("The job %s is currently running; no need to submit again.", jobId));
            jobSubmitFuture.complete(null);
            return new PassiveCompletableFuture<>(jobSubmitFuture);
        }
        MDCExecutorService tracing = MDCTracer.tracing(jobId, this.executorService);
        JobMaster jobMaster = new JobMaster(jobId, data, this.nodeEngine, tracing, getResourceManager(), getJobHistoryService(), this.runningJobStateIMap, this.runningJobStateTimestampsIMap, this.ownedSlotProfilesIMap, this.runningJobInfoIMap, this.metricsImap, this.engineConfig, this.seaTunnelServer);
        tracing.submit(() -> {
            try {
                if (!z && getJobHistoryService().getJobMetrics(jobId) != JobMetrics.empty()) {
                    throw new JobException(String.format("The job id %s has already been submitted and is not starting with a savepoint.", jobId));
                }
                // Registration and init share the try block: a rejected job must not be registered,
                // and an init failure must complete the future instead of escaping the task.
                this.pendingJobMasterMap.put(jobId, new Tuple2<>(PendingSourceState.SUBMIT, jobMaster));
                this.runningJobInfoIMap.put(jobId, new JobInfo(Long.valueOf(System.currentTimeMillis()), data));
                jobMaster.init(((JobInfo) this.runningJobInfoIMap.get(jobId)).getInitializationTimestamp().longValue(), false);
                jobSubmitFuture.complete(null);
            } catch (Throwable th) {
                String message = ExceptionUtils.getMessage(th);
                this.logger.severe(String.format("submit job %s error %s ", jobId, message));
                jobSubmitFuture.completeExceptionally(new JobException(message));
            }
            if (jobSubmitFuture.isCompletedExceptionally()) {
                this.runningJobInfoIMap.remove(jobId);
                this.runningJobMasterMap.remove(jobId);
                // Remove only our own entry; never touch one registered by somebody else.
                this.pendingJobMasterMap.computeIfPresent(jobId, (id, entry) -> entry._2() == jobMaster ? null : entry);
            } else {
                this.pendingJob.put(jobMaster);
                jobMaster.getPhysicalPlan().updateJobState(JobStatus.PENDING);
                this.logger.info(String.format("The submit job enter the pending queue , jobId: %s , jobName: %s", jobId, jobMaster.getJobImmutableInformation().getJobName()));
            }
        });
        return new PassiveCompletableFuture<>(jobSubmitFuture);
    }

    /**
     * Triggers a savepoint for a running job and waits for the job to finish afterwards.
     *
     * <p>The job master is looked up exactly once. The previous check-then-get on the running map
     * could hit a {@code null} inside the async task when the job finished in between.
     *
     * @param j job id
     * @return a future that completes normally once the savepoint succeeded and the wait for the job
     *     ended (a failure of that wait is only logged), and completes exceptionally with
     *     {@link SavePointFailedException} when the job is not running or the savepoint failed
     */
    public PassiveCompletableFuture<Void> savePoint(long j) {
        JobMaster jobMaster = this.runningJobMasterMap.get(Long.valueOf(j));
        if (jobMaster == null) {
            SavePointFailedException notRunning = new SavePointFailedException("The job with id '" + j + "' not running, save point failed");
            this.logger.warning(notRunning);
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(notRunning);
            return new PassiveCompletableFuture<>(failed);
        }
        return new PassiveCompletableFuture<>(CompletableFuture.<Void>supplyAsync(() -> {
            if (!((Boolean) jobMaster.savePoint().join()).booleanValue()) {
                throw new SavePointFailedException("The job with id '" + j + "' save point failed");
            }
            try {
                waitForJobComplete(j).get();
            } catch (Throwable th) {
                this.logger.warning(String.format("The job with id '%s' waiting state complete failed", Long.valueOf(j)));
            }
            return null;
        }, this.executorService));
    }

    /**
     * Returns a future for the final result of a job. Blocks first until the restore started by
     * {@code initCoordinatorService()} has finished.
     *
     * @param j job id
     * @return the job master's completion future when the job is pending or running; otherwise an
     *     already completed future built from the job history (status UNKNOWABLE when the history
     *     has no entry)
     * @throws SeaTunnelEngineException if reading the job history fails
     */
    public PassiveCompletableFuture<JobResult> waitForJobComplete(long j) {
        this.restoreAllJobFromMasterNodeSwitchFuture.join();
        Long jobId = Long.valueOf(j);
        JobMaster active = getJobMaster(jobId);
        if (active != null) {
            return new PassiveCompletableFuture<>(active.getJobMasterCompleteFuture());
        }
        try {
            JobHistoryService.JobState historyState = (JobHistoryService.JobState) CompletableFuture
                    .supplyAsync(() -> this.jobHistoryService.getJobDetailState(jobId), this.executorService)
                    .get();
            JobResult result;
            if (historyState != null) {
                result = new JobResult(historyState.getJobStatus(), historyState.getErrorMessage());
            } else {
                result = new JobResult(JobStatus.UNKNOWABLE, (String) null);
            }
            CompletableFuture finished = new CompletableFuture();
            finished.complete(result);
            return new PassiveCompletableFuture<>(finished);
        } catch (Exception historyError) {
            throw new SeaTunnelEngineException("get job state error", historyError);
        }
    }

    /**
     * Cancels a pending or running job asynchronously on the coordinator executor.
     *
     * @param j job id
     * @return a future tracking the cancel call, or an already completed future when no job master
     *     is registered for the id
     */
    public PassiveCompletableFuture<Void> cancelJob(long j) {
        JobMaster target = getJobMaster(Long.valueOf(j));
        if (target == null) {
            CompletableFuture nothingToCancel = new CompletableFuture();
            nothingToCancel.complete((Object) null);
            return new PassiveCompletableFuture<>(nothingToCancel);
        }
        return new PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
            target.cancelJob();
            return null;
        }, this.executorService));
    }

    /**
     * Resolves the current status of a job.
     *
     * @param j job id
     * @return PENDING while the job is in the pending map; the live status of a running job master;
     *     otherwise the status stored in the job history. UNKNOWABLE is returned whenever no source
     *     knows the job, including the case of a running job master without a status whose
     *     finished-state entry is missing (this used to raise a NullPointerException).
     */
    public JobStatus getJobStatus(long j) {
        Long jobId = Long.valueOf(j);
        if (this.pendingJobMasterMap.containsKey(jobId)) {
            return JobStatus.PENDING;
        }
        JobMaster jobMaster = this.runningJobMasterMap.get(jobId);
        if (jobMaster == null) {
            JobHistoryService.JobState historyState = this.jobHistoryService.getJobDetailState(jobId);
            return historyState == null ? JobStatus.UNKNOWABLE : historyState.getJobStatus();
        }
        JobStatus liveStatus = jobMaster.getJobStatus();
        if (liveStatus != null) {
            return liveStatus;
        }
        JobHistoryService.JobState finishedState = (JobHistoryService.JobState) this.jobHistoryService.getFinishedJobStateImap().get(jobId);
        return finishedState == null ? JobStatus.UNKNOWABLE : finishedState.getJobStatus();
    }

    /**
     * Collects the metrics of one job.
     *
     * @param j job id
     * @return empty metrics for a pending job; history metrics when the job is not running; for a
     *     running job its current metrics, merged into the history metrics when the history has any
     */
    public JobMetrics getJobMetrics(long j) {
        Long jobId = Long.valueOf(j);
        if (this.pendingJobMasterMap.containsKey(jobId)) {
            return JobMetrics.empty();
        }
        JobMaster running = this.runningJobMasterMap.get(jobId);
        if (running == null) {
            return this.jobHistoryService.getJobMetrics(jobId);
        }
        JobMetrics liveMetrics = JobMetricsUtil.toJobMetrics(running.getCurrJobMetrics());
        JobMetrics archivedMetrics = this.jobHistoryService.getJobMetrics(jobId);
        if (archivedMetrics == JobMetrics.empty()) {
            return liveMetrics;
        }
        return archivedMetrics.merge(liveMetrics);
    }

    /**
     * Collects the metrics of all running jobs from the workers that own slots for them.
     *
     * <p>Steps: determine the worker addresses from the owned slot profiles of running jobs, ask
     * every worker that is still a cluster member for its raw metrics, convert them per job, and
     * merge each job's metrics into its history metrics when the history has any.
     *
     * @return metrics per job id
     * @throws SeaTunnelException if fetching metrics from a worker fails for a reason other than the
     *     Hazelcast instance being inactive (that case is only logged)
     */
    public Map<Long, JobMetrics> getRunningJobMetrics() {
        Set<Long> runningJobIds = this.runningJobMasterMap.keySet();

        Set<Address> workers = new HashSet<>();
        this.ownedSlotProfilesIMap.forEach((pipelineLocation, slotProfiles) -> {
            if (runningJobIds.contains(Long.valueOf(pipelineLocation.getJobId()))) {
                slotProfiles.values().forEach(slotProfile -> workers.add(slotProfile.getWorker()));
            }
        });

        List<RawJobMetrics> rawMetrics = new ArrayList<>();
        workers.forEach(worker -> {
            try {
                // Skip workers that have left the cluster in the meantime.
                if (this.nodeEngine.getClusterService().getMember(worker) != null) {
                    rawMetrics.add((RawJobMetrics) NodeEngineUtil.sendOperationToMemberNode(this.nodeEngine, new GetMetricsOperation(runningJobIds), worker).get());
                }
            } catch (HazelcastInstanceNotActiveException e) {
                this.logger.warning(String.format("get metrics with exception: %s.", ExceptionUtils.getMessage(e)));
            } catch (Exception e) {
                throw new SeaTunnelException(e.getMessage());
            }
        });

        Map<Long, JobMetrics> jobMetricsMap = JobMetricsUtil.toJobMetricsMap(rawMetrics);
        jobMetricsMap.replaceAll((jobId, liveMetrics) -> {
            JobMetrics archivedMetrics = this.jobHistoryService.getJobMetrics(jobId);
            return archivedMetrics != JobMetrics.empty() ? archivedMetrics.merge(liveMetrics) : liveMetrics;
        });
        return jobMetricsMap;
    }

    /**
     * Returns the DAG information of a job, preferring the job history over the running job master.
     *
     * @param j job id
     * @return the DAG information, or {@code null} when neither the history nor the running map
     *     knows the job (this used to raise a NullPointerException)
     */
    public JobDAGInfo getJobInfo(long j) {
        Long jobId = Long.valueOf(j);
        JobDAGInfo fromHistory = this.jobHistoryService.getJobDAGInfo(jobId);
        if (fromHistory != null) {
            return fromHistory;
        }
        JobMaster running = this.runningJobMasterMap.get(jobId);
        return running == null ? null : running.getJobDAGInfo();
    }

    /**
     * Forwards a task execution state reported by a worker to the job master of the owning job.
     *
     * @param taskExecutionState reported state, including the task group location
     * @throws JobNotFoundException if the job is not in the running map
     */
    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        TaskGroupLocation location = taskExecutionState.getTaskGroupLocation();
        this.logger.info(String.format("Received task end from execution %s, state %s", location, taskExecutionState.getExecutionState()));
        Long jobId = Long.valueOf(location.getJobId());
        JobMaster owner = this.runningJobMasterMap.get(jobId);
        if (owner != null) {
            owner.updateTaskExecutionState(taskExecutionState);
            return;
        }
        throw new JobNotFoundException(String.format("Job %s not running", jobId));
    }

    /**
     * Stops the periodic master check and then clears the coordinator service.
     */
    public void shutdown() {
        ScheduledExecutorService listener = this.masterActiveListener;
        if (listener != null) {
            listener.shutdownNow();
        }
        clearCoordinatorService();
    }

    /**
     * Reports the current value of the {@code isActive} flag of this service.
     *
     * @return {@code true} when the flag is set, {@code false} otherwise
     */
    public boolean isCoordinatorActive() {
        return isActive;
    }

    /**
     * Walks every pipeline of every running job and fails the tasks that were placed on the
     * member named by the event.
     *
     * <p>Both the coordinator vertices and the physical vertices of each pipeline are checked.
     *
     * @param membershipServiceEvent the membership event; the address of its member is the
     *     address whose tasks are failed
     */
    public void failedTaskOnMemberRemoved(MembershipServiceEvent membershipServiceEvent) {
        final Address lostAddress = membershipServiceEvent.getMember().getAddress();
        // The job id key is not needed here, so only the job masters are visited.
        runningJobMasterMap
                .values()
                .forEach(
                        master ->
                                master.getPhysicalPlan()
                                        .getPipelineList()
                                        .forEach(
                                                pipeline -> {
                                                    makeTasksFailed(pipeline.getCoordinatorVertexList(), lostAddress);
                                                    makeTasksFailed(pipeline.getPhysicalVertexList(), lostAddress);
                                                }));
    }

    /**
     * Marks as FAILED every vertex of the list that is currently placed on the given address and
     * is in state DEPLOYING, RUNNING or CANCELING.
     *
     * @param list the vertices to inspect; must not be {@code null}
     * @param address the address to match against each vertex's current execution address; must
     *     not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    private void makeTasksFailed(@NonNull List<PhysicalVertex> list, @NonNull Address address) {
        Objects.requireNonNull(list, "physicalVertexList is marked non-null but is null");
        Objects.requireNonNull(address, "lostAddress is marked non-null but is null");
        list.forEach(vertex -> failVertexIfPlacedOn(vertex, address));
    }

    /** Fails a single vertex when it sits on {@code address} in one of the three handled states. */
    private void failVertexIfPlacedOn(PhysicalVertex vertex, Address address) {
        // Both values are read up front, before any filtering.
        final Address placedOn = vertex.getCurrentExecutionAddress();
        final ExecutionState state = vertex.getExecutionState();

        final boolean onGivenAddress = placedOn != null && placedOn.equals(address);
        if (!onGivenAddress) {
            return;
        }

        final boolean handledState =
                state.equals(ExecutionState.DEPLOYING)
                        || state.equals(ExecutionState.RUNNING)
                        || state.equals(ExecutionState.CANCELING);
        if (!handledState) {
            return;
        }

        final TaskGroupLocation location = vertex.getTaskGroupLocation();
        final Throwable cause =
                new JobException(String.format("The taskGroup(%s) deployed node(%s) offline", location, address));
        vertex.updateStateByExecutionService(new TaskExecutionState(location, ExecutionState.FAILED, cause));
    }

    /**
     * Reacts to a member leaving the cluster.
     *
     * <p>The resource manager is notified only while {@link #isCoordinatorActive()} is true;
     * failing the tasks placed on the removed member happens in either case.
     *
     * @param membershipServiceEvent the membership event describing the removed member
     */
    public void memberRemoved(MembershipServiceEvent membershipServiceEvent) {
        final boolean coordinatorActive = isCoordinatorActive();
        if (coordinatorActive) {
            getResourceManager().memberRemoved(membershipServiceEvent);
        }
        failedTaskOnMemberRemoved(membershipServiceEvent);
    }

    /**
     * Logs, at info level, a table built from the current thread pool status: active count, core
     * pool size, maximum pool size, pool size, completed task count and task count.
     */
    public void printExecutionInfo() {
        final ThreadPoolStatus status = getThreadPoolStatusMetrics();
        // First cell is the table title; the rest are label/value pairs.
        final Object[] cells = {
            "CoordinatorService Thread Pool Status",
            "activeCount",
            Integer.valueOf(status.getActiveCount()),
            "corePoolSize",
            Integer.valueOf(status.getCorePoolSize()),
            "maximumPoolSize",
            Integer.valueOf(status.getMaximumPoolSize()),
            "poolSize",
            Integer.valueOf(status.getPoolSize()),
            "completedTaskCount",
            Long.valueOf(status.getCompletedTaskCount()),
            "taskCount",
            Long.valueOf(status.getTaskCount())
        };
        logger.info(StringFormatUtils.formatTable(cells));
    }

    /**
     * Logs, at info level, a table built from the current job counters: created, scheduled,
     * running, failing, failed, cancelling, canceled and finished job counts.
     */
    public void printJobDetailInfo() {
        final JobCounter counter = getJobCountMetrics();
        // First cell is the table title; the rest are label/value pairs.
        final Object[] cells = {
            "Job info detail",
            "createdJobCount",
            Long.valueOf(counter.getCreatedJobCount()),
            "scheduledJobCount",
            Long.valueOf(counter.getScheduledJobCount()),
            "runningJobCount",
            Long.valueOf(counter.getRunningJobCount()),
            "failingJobCount",
            Long.valueOf(counter.getFailingJobCount()),
            "failedJobCount",
            Long.valueOf(counter.getFailedJobCount()),
            "cancellingJobCount",
            Long.valueOf(counter.getCancellingJobCount()),
            "canceledJobCount",
            Long.valueOf(counter.getCanceledJobCount()),
            "finishedJobCount",
            Long.valueOf(counter.getFinishedJobCount())
        };
        logger.info(StringFormatUtils.formatTable(cells));
    }

    /**
     * Tallies the job status records held by the job history service into a {@link JobCounter}.
     *
     * <p>Each record's status is translated through the switch map into a slot number; slots 1
     * to 8 each have their own tally and any other value is ignored. When no job history service
     * is set, all eight tallies are zero.
     *
     * @return a counter built from the eight tallies, passed in slot order
     */
    public JobCounter getJobCountMetrics() {
        // One tally per switch-map slot; tallies[i] belongs to slot i + 1.
        final AtomicLong[] tallies = new AtomicLong[8];
        for (int i = 0; i < tallies.length; i++) {
            tallies[i] = new AtomicLong();
        }

        if (jobHistoryService != null) {
            jobHistoryService
                    .getJobStatusData()
                    .forEach(
                            statusData -> {
                                // NOTE(review): this lookup table is a decompiler artifact; which
                                // JobStatus each slot stands for is defined in AnonymousClass1 and
                                // is not visible here - confirm before switching on the enum.
                                final int slot =
                                        AnonymousClass1.$SwitchMap$org$apache$seatunnel$engine$core$job$JobStatus[
                                                statusData.getJobStatus().ordinal()];
                                if (slot >= 1 && slot <= tallies.length) {
                                    tallies[slot - 1].incrementAndGet();
                                }
                            });
        }

        return new JobCounter(
                tallies[0].get(),
                tallies[1].get(),
                tallies[2].get(),
                tallies[3].get(),
                tallies[4].get(),
                tallies[5].get(),
                tallies[6].get(),
                tallies[7].get());
    }

    /**
     * Captures the current figures of the service's executor in a {@link ThreadPoolStatus}.
     *
     * <p>The executor is cast to {@link ThreadPoolExecutor} and its rejected-execution handler to
     * {@code ThreadPoolStatus.RejectionCountingHandler}; either cast fails with a
     * {@link ClassCastException} when the runtime type differs.
     *
     * @return a status holding active count, core pool size, maximum pool size, pool size,
     *     completed task count, task count, queue size and rejection count
     */
    public ThreadPoolStatus getThreadPoolStatusMetrics() {
        final ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;

        // Values are read in the same order in which they are passed to the constructor.
        final int activeCount = pool.getActiveCount();
        final int corePoolSize = pool.getCorePoolSize();
        final int maximumPoolSize = pool.getMaximumPoolSize();
        final int poolSize = pool.getPoolSize();
        final long completedTaskCount = pool.getCompletedTaskCount();
        final long taskCount = pool.getTaskCount();
        final int queueSize = pool.getQueue().size();
        final ThreadPoolStatus.RejectionCountingHandler rejectionHandler =
                (ThreadPoolStatus.RejectionCountingHandler) pool.getRejectedExecutionHandler();

        return new ThreadPoolStatus(
                activeCount,
                corePoolSize,
                maximumPoolSize,
                poolSize,
                completedTaskCount,
                taskCount,
                queueSize,
                rejectionHandler.getRejectionCount());
    }

    /**
     * Returns the connector package service held by this service.
     *
     * @return the connector package service
     * @throws SeaTunnelEngineException if no connector package service is set
     */
    public ConnectorPackageService getConnectorPackageService() {
        if (connectorPackageService != null) {
            return connectorPackageService;
        }
        throw new SeaTunnelEngineException("The user is not configured to enable connector package service, can not get connector package service service from master node.");
    }
}
