package org.apache.seatunnel.engine.server.checkpoint;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.FactoryUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeAfterCheckpointOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeBeforeCheckpointOperation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.class */
/**
 * Job-level entry point for checkpointing: owns one {@link CheckpointCoordinator} per pipeline
 * (keyed by pipeline id) and routes task reports, acknowledgements and schema-change triggers to
 * the coordinator of the pipeline they belong to.
 *
 * <p>All coordinators share the single {@link CheckpointStorage} instance created in the
 * constructor.
 */
public class CheckpointManager {
    private static final Logger log = LoggerFactory.getLogger(CheckpointManager.class);
    private final Long jobId;
    private final NodeEngine nodeEngine;
    /** Coordinators keyed by pipeline id; populated once in the constructor. */
    private final Map<Integer, CheckpointCoordinator> coordinatorMap;
    private final CheckpointStorage checkpointStorage;
    private final JobMaster jobMaster;
    private final ExecutorService executorService;

    /**
     * Creates the checkpoint storage and one coordinator per checkpoint plan.
     *
     * <p>The storage factory is discovered through the context class loader of the calling thread,
     * using the storage name from {@code checkpointConfig}. Coordinators are built on a parallel
     * stream; any exception raised while building one is rethrown unchanged via
     * {@link ExceptionUtil#sneakyThrow}.
     *
     * @param j the job id
     * @param z when {@code true}, the latest stored checkpoint of each pipeline is looked up and,
     *     if one exists, the pipeline's checkpoint id counter is advanced to that id plus one
     * @param nodeEngine node engine handed to each checkpoint id counter and used to send
     *     operations to member nodes
     * @param jobMaster job master used for error reporting and task group address lookup
     * @param map checkpoint plans; one coordinator is created per value
     * @param checkpointConfig checkpoint configuration, including the storage settings
     * @param executorService executor handed to every coordinator
     * @param iMap Hazelcast map handed to every coordinator
     * @throws CheckpointStorageException declared for failures while creating the storage
     */
    public CheckpointManager(long j, boolean z, NodeEngine nodeEngine, JobMaster jobMaster, Map<Integer, CheckpointPlan> map, CheckpointConfig checkpointConfig, ExecutorService executorService, IMap<Object, Object> iMap) throws CheckpointStorageException {
        this.executorService = executorService;
        this.jobId = j;
        this.nodeEngine = nodeEngine;
        this.jobMaster = jobMaster;
        this.checkpointStorage = ((CheckpointStorageFactory) FactoryUtil.discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        CheckpointStorageFactory.class,
                        checkpointConfig.getStorage().getStorage()))
                .create(checkpointConfig.getStorage().getStoragePluginConfig());
        this.coordinatorMap = map.values().parallelStream()
                .map(plan -> {
                    IMapCheckpointIDCounter idCounter = new IMapCheckpointIDCounter(j, plan.getPipelineId(), nodeEngine);
                    try {
                        idCounter.start();
                        PipelineState restoredState = null;
                        if (z) {
                            restoredState = this.checkpointStorage.getLatestCheckpointByJobIdAndPipelineId(
                                    String.valueOf(j), String.valueOf(plan.getPipelineId()));
                            if (restoredState != null) {
                                // Continue numbering right after the checkpoint we restore from.
                                long restoredCheckpointId = restoredState.getCheckpointId();
                                idCounter.setCount(restoredCheckpointId + 1);
                                log.info("pipeline({}) start with savePoint on checkPointId({})", plan.getPipelineId(), restoredCheckpointId);
                            }
                        }
                        return new CheckpointCoordinator(this, this.checkpointStorage, checkpointConfig, j, plan, idCounter, restoredState, executorService, iMap, z);
                    } catch (Exception e) {
                        ExceptionUtil.sneakyThrow(e);
                        // Only here to satisfy the compiler: sneakyThrow always throws.
                        throw new RuntimeException("Never throw here.");
                    }
                })
                .collect(Collectors.toMap(CheckpointCoordinator::getPipelineId, Function.identity()));
    }

    /**
     * Starts a savepoint on every coordinator.
     *
     * @return one future per coordinator, in no guaranteed order relative to pipeline ids
     */
    @SuppressWarnings("unchecked") // generic array creation is impossible; the elements are correctly typed
    public PassiveCompletableFuture<CompletedCheckpoint>[] triggerSavePoints() {
        return this.coordinatorMap.values().parallelStream()
                .map(CheckpointCoordinator::startSavepoint)
                .toArray(PassiveCompletableFuture[]::new);
    }

    /**
     * Handles the report that a pipeline is running by restoring its coordinator.
     *
     * @param i the pipeline id
     * @param z flag forwarded unchanged to {@code CheckpointCoordinator.restoreCoordinator}
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public void reportedPipelineRunning(int i, boolean z) {
        // Capturing and formatting a stack trace is expensive; only do it when it will be logged.
        if (log.isInfoEnabled()) {
            log.info("reported pipeline running stack: {}", Arrays.toString(Thread.currentThread().getStackTrace()));
        }
        getCheckpointCoordinator(i).restoreCoordinator(z);
    }

    /**
     * Forwards a checkpoint error to the job master.
     *
     * @param i forwarded unchanged to {@code JobMaster.handleCheckpointError}
     *     (presumably the pipeline id; confirm against the job master)
     * @param z forwarded unchanged to {@code JobMaster.handleCheckpointError}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void handleCheckpointError(int i, boolean z) {
        this.jobMaster.handleCheckpointError(i, z);
    }

    /** Resolves the coordinator of the pipeline the given task location belongs to. */
    private CheckpointCoordinator getCheckpointCoordinator(TaskLocation taskLocation) {
        return getCheckpointCoordinator(taskLocation.getPipelineId());
    }

    /**
     * Passes an error message reported by a task to the coordinator of the task's pipeline.
     *
     * @param taskLocation location of the reporting task
     * @param str the error message
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public void reportCheckpointErrorFromTask(TaskLocation taskLocation, String str) {
        getCheckpointCoordinator(taskLocation).reportCheckpointErrorFromTask(str);
    }

    /**
     * Looks up the coordinator of a pipeline.
     *
     * @param i the pipeline id
     * @return the coordinator, never {@code null}
     * @throws RuntimeException if no coordinator is registered for the pipeline id
     */
    @VisibleForTesting
    public CheckpointCoordinator getCheckpointCoordinator(int i) {
        CheckpointCoordinator coordinator = this.coordinatorMap.get(i);
        if (coordinator == null) {
            throw new RuntimeException(String.format("The checkpoint coordinator(%s) don't exist", i));
        }
        return coordinator;
    }

    /**
     * Passes a task status report to the coordinator of the task's pipeline.
     *
     * @param taskReportStatusOperation the status report
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public void reportedTask(TaskReportStatusOperation taskReportStatusOperation) {
        log.debug("reported task({}) status {}", taskReportStatusOperation.getLocation().getTaskID(), taskReportStatusOperation.getStatus());
        getCheckpointCoordinator(taskReportStatusOperation.getLocation()).reportedTask(taskReportStatusOperation);
    }

    /**
     * Tells the coordinator of the task's pipeline that the task is ready to close.
     *
     * @param taskLocation location of the task
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public void readyToClose(TaskLocation taskLocation) {
        getCheckpointCoordinator(taskLocation).readyToClose(taskLocation);
    }

    /**
     * Shuts down the checkpoint id counter of a pipeline with the given pipeline status.
     *
     * @param i the pipeline id
     * @param pipelineStatus status handed to the counter's {@code shutdown}
     * @return the future returned by the counter's {@code shutdown}
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public CompletableFuture<Void> listenPipeline(int i, PipelineStatus pipelineStatus) {
        return getCheckpointCoordinator(i).getCheckpointIdCounter().shutdown(pipelineStatus);
    }

    /**
     * Cleans up stored checkpoints when the job ends.
     *
     * <p>The job's checkpoints are deleted only when the status is {@code FINISHED} or
     * {@code CANCELED} and {@link #isSavePointEnd()} is {@code false}.
     *
     * @param jobStatus the final job status
     * @return an already completed future
     */
    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
        boolean jobEnded = jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED;
        if (jobEnded && !isSavePointEnd()) {
            this.checkpointStorage.deleteCheckpoint(String.valueOf(this.jobId));
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * @param i the pipeline id
     * @return the coordinator's {@code isNoErrorCompleted()} result
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public boolean isCompletedPipeline(int i) {
        return getCheckpointCoordinator(i).isNoErrorCompleted();
    }

    /**
     * Passes a task acknowledgement to the coordinator of the task's pipeline, unless that
     * coordinator is already completed, in which case the acknowledgement is only logged.
     *
     * @param taskAcknowledgeOperation the acknowledgement
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public void acknowledgeTask(TaskAcknowledgeOperation taskAcknowledgeOperation) {
        TaskLocation location = taskAcknowledgeOperation.getTaskLocation();
        log.debug("checkpoint manager received ack {}", location);
        CheckpointCoordinator coordinator = getCheckpointCoordinator(location);
        if (!logIfCompleted(coordinator, location)) {
            coordinator.acknowledgeTask(taskAcknowledgeOperation);
        }
    }

    /**
     * Schedules the schema-change-before checkpoint on the coordinator of the task's pipeline,
     * unless that coordinator is already completed.
     *
     * @param triggerSchemaChangeBeforeCheckpointOperation the triggering operation
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public void triggerSchemaChangeBeforeCheckpoint(TriggerSchemaChangeBeforeCheckpointOperation triggerSchemaChangeBeforeCheckpointOperation) {
        TaskLocation location = triggerSchemaChangeBeforeCheckpointOperation.getTaskLocation();
        log.debug("checkpoint manager received schema-change-before checkpoint operation {}", location);
        CheckpointCoordinator coordinator = getCheckpointCoordinator(location);
        if (!logIfCompleted(coordinator, location)) {
            coordinator.scheduleSchemaChangeBeforeCheckpoint();
        }
    }

    /**
     * Schedules the schema-change-after checkpoint on the coordinator of the task's pipeline,
     * unless that coordinator is already completed.
     *
     * @param triggerSchemaChangeAfterCheckpointOperation the triggering operation
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public void triggerSchemaChangeAfterCheckpoint(TriggerSchemaChangeAfterCheckpointOperation triggerSchemaChangeAfterCheckpointOperation) {
        TaskLocation location = triggerSchemaChangeAfterCheckpointOperation.getTaskLocation();
        log.debug("checkpoint manager received schema-change-after checkpoint operation {}", location);
        CheckpointCoordinator coordinator = getCheckpointCoordinator(location);
        if (!logIfCompleted(coordinator, location)) {
            coordinator.scheduleSchemaChangeAfterCheckpoint();
        }
    }

    /** Logs and returns {@code true} when the coordinator is completed, so the caller can skip it. */
    private boolean logIfCompleted(CheckpointCoordinator coordinator, TaskLocation location) {
        if (coordinator.isCompleted()) {
            log.info("The checkpoint coordinator({}) is completed", location.getPipelineId());
            return true;
        }
        return false;
    }

    /**
     * @return {@code true} only when there is at least one coordinator and every coordinator
     *     reports {@code isEndOfSavePoint()}; {@code false} when there are no coordinators
     */
    public boolean isSavePointEnd() {
        // reduce (rather than allMatch) keeps the "no coordinators -> false" result.
        return this.coordinatorMap.values().stream()
                .map(CheckpointCoordinator::isEndOfSavePoint)
                .reduce(Boolean::logicalAnd)
                .orElse(false);
    }

    /**
     * Sends a task operation to the member node that the job master reports for the operation's
     * task group.
     *
     * @param taskOperation the operation to send
     * @return the invocation future of the sent operation
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public InvocationFuture<?> sendOperationToMemberNode(TaskOperation taskOperation) {
        // Guarded so the extra address lookup happens only when the line is actually logged.
        if (log.isDebugEnabled()) {
            log.debug("Sead Operation : {} to {} for task group:{}",
                    taskOperation.getClass().getSimpleName(),
                    this.jobMaster.queryTaskGroupAddress(taskOperation.getTaskLocation().getTaskGroupLocation()),
                    taskOperation.getTaskLocation().getTaskGroupLocation());
        }
        return NodeEngineUtil.sendOperationToMemberNode(this.nodeEngine, taskOperation, this.jobMaster.queryTaskGroupAddress(taskOperation.getTaskLocation().getTaskGroupLocation()));
    }

    /**
     * @param i the pipeline id
     * @return the future returned by the coordinator's {@code cancelCheckpoint()}
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public PassiveCompletableFuture<CheckpointCoordinatorState> cancelCheckpoint(int i) {
        return getCheckpointCoordinator(i).cancelCheckpoint();
    }

    /**
     * @param i the pipeline id
     * @return the future returned by the coordinator's {@code waitCheckpointCoordinatorComplete()}
     * @throws RuntimeException if no coordinator exists for the pipeline
     */
    public PassiveCompletableFuture<CheckpointCoordinatorState> waitCheckpointCoordinatorComplete(int i) {
        return getCheckpointCoordinator(i).waitCheckpointCoordinatorComplete();
    }
}
