/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.checkpoint;

import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.FactoryUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinatorState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
import org.apache.seatunnel.engine.server.checkpoint.IMapCheckpointIDCounter;
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeAfterCheckpointOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeBeforeCheckpointOperation;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds one {@link CheckpointCoordinator} per pipeline of a job and routes checkpoint-related
 * calls (task reports, acknowledgements, savepoint triggers, cancellation, ...) to the
 * coordinator that owns the pipeline in question.
 *
 * <p>The coordinator map is built once in the constructor and never modified afterwards.
 */
public class CheckpointManager {
    private static final Logger log = LoggerFactory.getLogger(CheckpointManager.class);
    private final Long jobId;
    private final NodeEngine nodeEngine;
    /** Coordinators keyed by pipeline id; populated in the constructor only. */
    private final Map<Integer, CheckpointCoordinator> coordinatorMap;
    private final CheckpointStorage checkpointStorage;
    private final JobMaster jobMaster;
    private final ExecutorService executorService;

    /**
     * Creates the checkpoint storage from the configured storage factory and builds one
     * coordinator per entry of {@code checkpointPlanMap}.
     *
     * @param jobId id of the job this manager belongs to
     * @param isStartWithSavePoint when {@code true}, the latest stored checkpoint of each pipeline
     *     is looked up and, if present, handed to the coordinator and used to advance the
     *     checkpoint id counter past it
     * @param nodeEngine Hazelcast node engine used for the id counters and for sending operations
     * @param jobMaster job master used for error reporting and task group address lookups
     * @param checkpointPlanMap checkpoint plans; one coordinator is created per value
     * @param checkpointConfig checkpoint configuration, including the storage settings
     * @param executorService executor handed to every coordinator
     * @param runningJobStateIMap distributed map handed to every coordinator
     * @throws CheckpointStorageException declared for failures while creating the checkpoint
     *     storage; exceptions raised while building a coordinator are rethrown unchanged via
     *     {@link ExceptionUtil#sneakyThrow}
     */
    public CheckpointManager(long jobId, boolean isStartWithSavePoint, NodeEngine nodeEngine, JobMaster jobMaster, Map<Integer, CheckpointPlan> checkpointPlanMap, CheckpointConfig checkpointConfig, ExecutorService executorService, IMap<Object, Object> runningJobStateIMap) throws CheckpointStorageException {
        this.executorService = executorService;
        this.jobId = jobId;
        this.nodeEngine = nodeEngine;
        this.jobMaster = jobMaster;
        this.checkpointStorage = ((CheckpointStorageFactory)FactoryUtil.discoverFactory((ClassLoader)Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, (String)checkpointConfig.getStorage().getStorage())).create(checkpointConfig.getStorage().getStoragePluginConfig());
        this.coordinatorMap = MDCTracer.tracing(checkpointPlanMap.values().parallelStream()).map(plan -> {
            IMapCheckpointIDCounter idCounter = new IMapCheckpointIDCounter(jobId, plan.getPipelineId(), nodeEngine);
            try {
                idCounter.start();
                PipelineState pipelineState = null;
                if (isStartWithSavePoint) {
                    pipelineState = this.checkpointStorage.getLatestCheckpointByJobIdAndPipelineId(String.valueOf(jobId), String.valueOf(plan.getPipelineId()));
                    if (pipelineState != null) {
                        // Continue numbering right after the checkpoint we restore from.
                        long checkpointId = pipelineState.getCheckpointId();
                        idCounter.setCount(checkpointId + 1L);
                        log.info("pipeline({}) start with savePoint on checkPointId({})", plan.getPipelineId(), checkpointId);
                    }
                }
                return new CheckpointCoordinator(this, this.checkpointStorage, checkpointConfig, jobId, (CheckpointPlan)plan, idCounter, pipelineState, executorService, runningJobStateIMap, isStartWithSavePoint);
            }
            catch (Exception e) {
                ExceptionUtil.sneakyThrow((Throwable)e);
                // Only here to satisfy the compiler: sneakyThrow always throws.
                throw new RuntimeException("Never throw here.");
            }
        }).collect(Collectors.toMap(CheckpointCoordinator::getPipelineId, Function.identity()));
    }

    /**
     * Starts a savepoint on every coordinator.
     *
     * @return one future per coordinator, as returned by
     *     {@link CheckpointCoordinator#startSavepoint()}
     */
    public PassiveCompletableFuture<CompletedCheckpoint>[] triggerSavePoints() {
        return (PassiveCompletableFuture[])MDCTracer.tracing(this.coordinatorMap.values().parallelStream()).map(CheckpointCoordinator::startSavepoint).toArray(PassiveCompletableFuture[]::new);
    }

    /**
     * Tells the coordinator of {@code pipelineId} to restore itself.
     *
     * @param pipelineId id of the pipeline that reported running
     * @param alreadyStarted forwarded unchanged to
     *     {@link CheckpointCoordinator#restoreCoordinator(boolean)}
     * @throws RuntimeException if no coordinator exists for {@code pipelineId}
     */
    public void reportedPipelineRunning(int pipelineId, boolean alreadyStarted) {
        // Capturing and formatting a stack trace is expensive; only do it when it will be logged.
        if (log.isDebugEnabled()) {
            log.debug("reported pipeline running stack: {}", Arrays.toString(Thread.currentThread().getStackTrace()));
        }
        this.getCheckpointCoordinator(pipelineId).restoreCoordinator(alreadyStarted);
    }

    /**
     * Forwards a checkpoint error of the given pipeline to the job master.
     *
     * @param pipelineId id of the failing pipeline
     * @param neverRestore forwarded unchanged to the job master
     */
    protected void handleCheckpointError(int pipelineId, boolean neverRestore) {
        this.jobMaster.handleCheckpointError(pipelineId, neverRestore);
    }

    /** Resolves the coordinator owning the pipeline that {@code taskLocation} belongs to. */
    private CheckpointCoordinator getCheckpointCoordinator(TaskLocation taskLocation) {
        return this.getCheckpointCoordinator(taskLocation.getPipelineId());
    }

    /**
     * Passes an error message reported by a task to the coordinator of the task's pipeline.
     *
     * @param taskLocation location of the reporting task
     * @param errorMsg error message to forward
     * @throws RuntimeException if no coordinator exists for the task's pipeline
     */
    public void reportCheckpointErrorFromTask(TaskLocation taskLocation, String errorMsg) {
        this.getCheckpointCoordinator(taskLocation).reportCheckpointErrorFromTask(errorMsg);
    }

    /**
     * Looks up the coordinator of a pipeline.
     *
     * @param pipelineId id of the pipeline
     * @return the coordinator registered for {@code pipelineId}, never {@code null}
     * @throws RuntimeException if no coordinator exists for {@code pipelineId}
     */
    @VisibleForTesting
    public CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
        CheckpointCoordinator coordinator = this.coordinatorMap.get(pipelineId);
        if (coordinator == null) {
            throw new RuntimeException(String.format("The checkpoint coordinator(%s) don't exist", pipelineId));
        }
        return coordinator;
    }

    /**
     * Forwards a task status report to the coordinator of the task's pipeline.
     *
     * @param reportStatusOperation operation carrying the task location and its status
     * @throws RuntimeException if no coordinator exists for the task's pipeline
     */
    public void reportedTask(TaskReportStatusOperation reportStatusOperation) {
        TaskLocation location = reportStatusOperation.getLocation();
        log.debug("reported task({}) status {}", location.getTaskID(), reportStatusOperation.getStatus());
        this.getCheckpointCoordinator(location).reportedTask(reportStatusOperation);
    }

    /**
     * Forwards a ready-to-close notification to the coordinator of the task's pipeline.
     *
     * @param taskLocation location of the task that is ready to close
     * @throws RuntimeException if no coordinator exists for the task's pipeline
     */
    public void readyToClose(TaskLocation taskLocation) {
        this.getCheckpointCoordinator(taskLocation).readyToClose(taskLocation);
    }

    /**
     * Forwards a ready-to-close notification for an idle task to the coordinator of the task's
     * pipeline.
     *
     * @param taskLocation location of the idle task
     * @throws RuntimeException if no coordinator exists for the task's pipeline
     */
    public void readyToCloseIdleTask(TaskLocation taskLocation) {
        this.getCheckpointCoordinator(taskLocation).readyToCloseIdleTask(taskLocation);
    }

    /**
     * Shuts down the checkpoint id counter of the given pipeline with the given status.
     *
     * @param pipelineId id of the pipeline
     * @param pipelineStatus status passed to the id counter's {@code shutdown}
     * @return the future returned by the id counter's {@code shutdown}
     * @throws RuntimeException if no coordinator exists for {@code pipelineId}
     */
    public CompletableFuture<Void> listenPipeline(int pipelineId, PipelineStatus pipelineStatus) {
        return this.getCheckpointCoordinator(pipelineId).getCheckpointIdCounter().shutdown(pipelineStatus);
    }

    /**
     * Deletes all stored checkpoints of this job when the job is {@link JobStatus#FINISHED} or
     * {@link JobStatus#CANCELED} and it did not end with a savepoint (see
     * {@link #isSavePointEnd()}). For any other status nothing happens and
     * {@link #isSavePointEnd()} is not evaluated.
     *
     * @param jobStatus the status the job ended with
     */
    public void clearCheckpointIfNeed(JobStatus jobStatus) {
        boolean jobEnded = jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED;
        if (jobEnded && !this.isSavePointEnd()) {
            this.checkpointStorage.deleteCheckpoint(this.jobId + "");
        }
    }

    /**
     * @param pipelineId id of the pipeline
     * @return the value of {@link CheckpointCoordinator#isNoErrorCompleted()} for the pipeline
     * @throws RuntimeException if no coordinator exists for {@code pipelineId}
     */
    public boolean isCompletedPipeline(int pipelineId) {
        return this.getCheckpointCoordinator(pipelineId).isNoErrorCompleted();
    }

    /**
     * Forwards a task acknowledgement to the coordinator of the task's pipeline, unless that
     * coordinator is already completed, in which case the acknowledgement is dropped.
     *
     * @param ackOperation acknowledgement sent by a task
     * @throws RuntimeException if no coordinator exists for the task's pipeline
     */
    public void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
        TaskLocation taskLocation = ackOperation.getTaskLocation();
        log.debug("checkpoint manager received ack {}", taskLocation);
        CheckpointCoordinator coordinator = this.getCheckpointCoordinator(taskLocation);
        if (this.isCoordinatorCompleted(coordinator, taskLocation)) {
            return;
        }
        coordinator.acknowledgeTask(ackOperation);
    }

    /**
     * Schedules the schema-change-before checkpoint on the coordinator of the task's pipeline,
     * unless that coordinator is already completed.
     *
     * @param operation operation sent by a task
     * @throws RuntimeException if no coordinator exists for the task's pipeline
     */
    public void triggerSchemaChangeBeforeCheckpoint(TriggerSchemaChangeBeforeCheckpointOperation operation) {
        TaskLocation taskLocation = operation.getTaskLocation();
        log.debug("checkpoint manager received schema-change-before checkpoint operation {}", taskLocation);
        CheckpointCoordinator coordinator = this.getCheckpointCoordinator(taskLocation);
        if (this.isCoordinatorCompleted(coordinator, taskLocation)) {
            return;
        }
        coordinator.scheduleSchemaChangeBeforeCheckpoint();
    }

    /**
     * Schedules the schema-change-after checkpoint on the coordinator of the task's pipeline,
     * unless that coordinator is already completed.
     *
     * @param operation operation sent by a task
     * @throws RuntimeException if no coordinator exists for the task's pipeline
     */
    public void triggerSchemaChangeAfterCheckpoint(TriggerSchemaChangeAfterCheckpointOperation operation) {
        TaskLocation taskLocation = operation.getTaskLocation();
        log.debug("checkpoint manager received schema-change-after checkpoint operation {}", taskLocation);
        CheckpointCoordinator coordinator = this.getCheckpointCoordinator(taskLocation);
        if (this.isCoordinatorCompleted(coordinator, taskLocation)) {
            return;
        }
        coordinator.scheduleSchemaChangeAfterCheckpoint();
    }

    /** Returns {@code true}, after logging it, when {@code coordinator} is already completed. */
    private boolean isCoordinatorCompleted(CheckpointCoordinator coordinator, TaskLocation taskLocation) {
        if (!coordinator.isCompleted()) {
            return false;
        }
        log.info("The checkpoint coordinator({}) is completed", taskLocation.getPipelineId());
        return true;
    }

    /**
     * @return {@code true} only if there is at least one coordinator and every coordinator
     *     reports {@link CheckpointCoordinator#isEndOfSavePoint()}; {@code false} when there are
     *     no coordinators at all
     */
    public boolean isSavePointEnd() {
        // allMatch is vacuously true on an empty stream, so the empty case is handled explicitly.
        return !this.coordinatorMap.isEmpty()
                && this.coordinatorMap.values().stream().allMatch(CheckpointCoordinator::isEndOfSavePoint);
    }

    /**
     * @param pipelineLocation location of the pipeline
     * @return the value of {@link CheckpointCoordinator#isEndOfSavePoint()} for the pipeline
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public boolean isPipelineSavePointEnd(PipelineLocation pipelineLocation) {
        // Use the shared lookup so an unknown pipeline fails with a descriptive error, not an NPE.
        return this.getCheckpointCoordinator(pipelineLocation.getPipelineId()).isEndOfSavePoint();
    }

    /**
     * Sends {@code operation} to the member whose address the job master returns for the
     * operation's task group.
     *
     * @param operation operation to send
     * @return the invocation future returned by {@link NodeEngineUtil#sendOperationToMemberNode}
     */
    protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
        // The log line needs its own address lookup, so skip it entirely unless debug is enabled.
        if (log.isDebugEnabled()) {
            log.debug("Sead Operation : {} to {} for task group:{}", operation.getClass().getSimpleName(), this.jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation()), operation.getTaskLocation().getTaskGroupLocation());
        }
        return NodeEngineUtil.sendOperationToMemberNode(this.nodeEngine, operation, this.jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation()));
    }

    /**
     * Cancels checkpointing on the coordinator of the given pipeline.
     *
     * @param pipelineId id of the pipeline
     * @return the future returned by {@link CheckpointCoordinator#cancelCheckpoint()}
     * @throws RuntimeException if no coordinator exists for {@code pipelineId}
     */
    public PassiveCompletableFuture<CheckpointCoordinatorState> cancelCheckpoint(int pipelineId) {
        return this.getCheckpointCoordinator(pipelineId).cancelCheckpoint();
    }

    /**
     * @param pipelineId id of the pipeline
     * @return the future returned by
     *     {@link CheckpointCoordinator#waitCheckpointCoordinatorComplete()}
     * @throws RuntimeException if no coordinator exists for {@code pipelineId}
     */
    public PassiveCompletableFuture<CheckpointCoordinatorState> waitCheckpointCoordinatorComplete(int pipelineId) {
        return this.getCheckpointCoordinator(pipelineId).waitCheckpointCoordinatorComplete();
    }
}

