package org.apache.seatunnel.engine.server.checkpoint;

import com.google.common.annotations.VisibleForTesting;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import org.apache.seatunnel.engine.serializer.api.Serializer;
import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer;
import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointEndOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskStartOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.class */
public class CheckpointCoordinator {
    /** Logger shared by all coordinator instances. */
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
    /** Id of the job this coordinator works for. */
    private final long jobId;
    /** Id of the pipeline, read from the checkpoint plan in the constructor. */
    private final int pipelineId;
    /** Used to send operations to member nodes and to report coordinator failures. */
    private final CheckpointManager checkpointManager;
    /** Backend that completed checkpoints are stored to and deleted from. */
    private final CheckpointStorage checkpointStorage;
    /** Source of new checkpoint ids (see createPendingCheckpoint). */
    private final CheckpointIDCounter checkpointIdCounter;
    /** Serializes / deserializes {@link CompletedCheckpoint}; a ProtoStuffSerializer is created in the constructor. */
    private final transient Serializer serializer;
    /** Task vertex id -> number of subtask locations in the plan that share that vertex id. */
    private final Map<Long, Integer> pipelineTasks;
    /** Task id -> the state most recently reported through reportedTask. */
    private final Map<Long, SeaTunnelTaskState> pipelineTaskStatus;
    /** The checkpoint plan of the pipeline (subtasks, starting subtasks, actions). */
    private final CheckpointPlan plan;
    /** Task locations that have been passed to readyToClose since the last cleanup. */
    private final Set<TaskLocation> readyToCloseStartingTask;
    /** Checkpoint id -> checkpoint that has been triggered but not yet completed or aborted. */
    private final ConcurrentHashMap<Long, PendingCheckpoint> pendingCheckpoints;
    /** Ids (as strings) of checkpoints stored by this coordinator, oldest first; drives deletion of old checkpoints. */
    private final ArrayDeque<String> completedCheckpointIds;
    /** The checkpoint restored in the constructor or completed most recently; null if there is none. */
    private volatile CompletedCheckpoint latestCompletedCheckpoint;
    /** Checkpoint settings (interval, timeouts, storage retention). */
    private final CheckpointConfig coordinatorConfig;
    /** Runs delayed triggers and checkpoint timeouts; shut down and replaced in cleanPendingCheckpoint. */
    private transient ScheduledExecutorService scheduler;
    /** Set to true by cleanPendingCheckpoint and back to false by restoreCoordinator. */
    private volatile boolean shutdown;
    /** Executor for the asynchronous stages started by this coordinator. */
    private final ExecutorService executorService;
    /** Completed with the terminal coordinator state; replaced with a fresh future in restoreCoordinator. */
    private CompletableFuture<CheckpointCoordinatorState> checkpointCoordinatorFuture;
    /** Distributed map that holds the coordinator status under {@link #checkpointStateImapKey}. */
    private final IMap<Object, Object> runningJobStateIMap;
    /** Key into {@link #runningJobStateIMap}: "checkpoint_state_" + jobId + "_" + pipelineId. */
    private final String checkpointStateImapKey;
    /** Epoch-millis timestamp handed to the most recent triggerPendingCheckpoint call. */
    private final AtomicLong latestTriggerTimestamp = new AtomicLong(0);
    /** Incremented after a trigger in tryTriggerPendingCheckpoint, decremented on completion, reset to 0 on cleanup. */
    private final AtomicInteger pendingCounter = new AtomicInteger(0);
    /** While true, general checkpoints are skipped by tryTriggerPendingCheckpoint; reset to false on cleanup. */
    private final AtomicBoolean schemaChanging = new AtomicBoolean(false);
    /** Monitor held while triggering checkpoints and while cleaning pending ones. */
    private final Object lock = new Object();
    /** True once every subtask of the plan has reported READY_START (or after a restore of an already started job). */
    private volatile boolean isAllTaskReady = false;
    /** Holds the first recorded error message; only the first compareAndSet wins. Replaced in restoreCoordinator. */
    private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();

    /**
     * Creates the coordinator for one pipeline of a job.
     *
     * <p>When {@code pipelineState} is given, the latest completed checkpoint is deserialized from
     * it and flagged as restored. The coordinator status kept in {@code iMap} is then initialised:
     * it is forced to RUNNING when {@code z} is true; otherwise an existing end state completes
     * the coordinator future immediately, an existing non-end state is reset to RUNNING, and a
     * missing status is left untouched.
     *
     * @param checkpointManager manager used to reach member nodes and to report errors
     * @param checkpointStorage storage backend for completed checkpoints
     * @param checkpointConfig checkpoint settings
     * @param j id of the job
     * @param checkpointPlan plan of the pipeline this coordinator is responsible for
     * @param checkpointIDCounter source of checkpoint ids
     * @param pipelineState serialized checkpoint to restore from, or {@code null}
     * @param executorService executor for asynchronous work
     * @param iMap distributed map holding the coordinator status
     * @param z when true the status is unconditionally set to RUNNING
     */
    public CheckpointCoordinator(CheckpointManager checkpointManager, CheckpointStorage checkpointStorage, CheckpointConfig checkpointConfig, long j, CheckpointPlan checkpointPlan, CheckpointIDCounter checkpointIDCounter, PipelineState pipelineState, ExecutorService executorService, IMap<Object, Object> iMap, boolean z) {
        this.latestCompletedCheckpoint = null;
        this.executorService = executorService;
        this.checkpointManager = checkpointManager;
        this.checkpointStorage = checkpointStorage;
        this.jobId = j;
        this.pipelineId = checkpointPlan.getPipelineId();
        this.checkpointStateImapKey = "checkpoint_state_" + j + "_" + this.pipelineId;
        this.runningJobStateIMap = iMap;
        this.plan = checkpointPlan;
        this.coordinatorConfig = checkpointConfig;
        this.pendingCheckpoints = new ConcurrentHashMap<>();
        int retained = this.coordinatorConfig.getStorage().getMaxRetainedCheckpoints();
        this.completedCheckpointIds = new ArrayDeque<>(retained + 1);

        // Name the scheduler threads after the pipeline/job and drop cancelled tasks eagerly.
        ScheduledThreadPoolExecutor timerPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2, task -> {
            Thread worker = new Thread(task);
            worker.setName(String.format("checkpoint-coordinator-%s/%s", this.pipelineId, j));
            return worker;
        });
        timerPool.setRemoveOnCancelPolicy(true);
        this.scheduler = timerPool;

        this.serializer = new ProtoStuffSerializer();
        this.pipelineTasks = getPipelineTasks(checkpointPlan.getPipelineSubtasks());
        this.pipelineTaskStatus = new ConcurrentHashMap<>();
        this.checkpointIdCounter = checkpointIDCounter;
        this.readyToCloseStartingTask = new CopyOnWriteArraySet<>();
        LOG.info("Create CheckpointCoordinator for job({}@{}) with plan({})", this.pipelineId, j, checkpointPlan);

        if (pipelineState != null) {
            CompletedCheckpoint restored = (CompletedCheckpoint) this.serializer.deserialize(pipelineState.getStates(), CompletedCheckpoint.class);
            this.latestCompletedCheckpoint = restored;
            this.latestCompletedCheckpoint.setRestored(true);
            LOG.info("Restore job({}@{}) with checkpoint({}), data: {}", this.pipelineId, j, this.latestCompletedCheckpoint.getCheckpointId(), this.latestCompletedCheckpoint);
        }

        this.checkpointCoordinatorFuture = new CompletableFuture<>();
        // The stored status is read before any update so the decision below sees the old value.
        CheckpointCoordinatorStatus storedStatus = (CheckpointCoordinatorStatus) iMap.get(this.checkpointStateImapKey);
        if (z) {
            updateStatus(CheckpointCoordinatorStatus.RUNNING);
        } else if (storedStatus != null) {
            if (storedStatus.isEndState()) {
                this.checkpointCoordinatorFuture.complete(new CheckpointCoordinatorState(storedStatus, null));
            } else {
                updateStatus(CheckpointCoordinatorStatus.RUNNING);
            }
        }
    }

    /**
     * @return the id of the pipeline this coordinator is responsible for
     */
    public int getPipelineId() {
        return pipelineId;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records the status a task reported and reacts to it asynchronously on the executor service:
     * WAITING_RESTORE sends the task its restore state, READY_START checks whether the whole
     * pipeline is ready. Any failure of that reaction fails the coordinator.
     *
     * @param taskReportStatusOperation the status report received from a task
     */
    public void reportedTask(TaskReportStatusOperation taskReportStatusOperation) {
        this.pipelineTaskStatus.put(taskReportStatusOperation.getLocation().getTaskID(), taskReportStatusOperation.getStatus());
        Runnable reaction = () -> {
            switch (taskReportStatusOperation.getStatus()) {
                case WAITING_RESTORE:
                    restoreTaskState(taskReportStatusOperation.getLocation());
                    break;
                case READY_START:
                    allTaskReady();
                    break;
                default:
                    // other states need no reaction
                    break;
            }
        };
        CompletableFuture.runAsync(reaction, this.executorService).exceptionally(failure -> {
            handleCoordinatorError("task running failed", failure, CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
            return null;
        });
    }

    /**
     * Logs the given message together with the throwable, then fails the coordinator with the
     * given close reason.
     *
     * @param str message written to the error log
     * @param th the cause
     * @param checkpointCloseReason reason recorded for the failure
     */
    @VisibleForTesting
    public void handleCoordinatorError(String str, Throwable th, CheckpointCloseReason checkpointCloseReason) {
        LOG.error(str, th);
        this.handleCoordinatorError(checkpointCloseReason, th);
    }

    /**
     * Fails the coordinator: remembers the first error message, and - unless the coordinator
     * future is already done - aborts pending checkpoints, stores the FAILED status, completes
     * the future with it and informs the checkpoint manager.
     *
     * @param checkpointCloseReason reason of the failure
     * @param th the cause, may be {@code null}
     */
    private void handleCoordinatorError(CheckpointCloseReason checkpointCloseReason, Throwable th) {
        String message = ExceptionUtils.getMessage(new CheckpointException(checkpointCloseReason, th));
        // Only the first error is kept; later ones do not overwrite it.
        this.errorByPhysicalVertex.compareAndSet(null, message);
        if (!this.checkpointCoordinatorFuture.isDone()) {
            cleanPendingCheckpoint(checkpointCloseReason);
            updateStatus(CheckpointCoordinatorStatus.FAILED);
            CheckpointCoordinatorState failedState = new CheckpointCoordinatorState(CheckpointCoordinatorStatus.FAILED, this.errorByPhysicalVertex.get());
            this.checkpointCoordinatorFuture.complete(failedState);
            this.checkpointManager.handleCheckpointError(this.pipelineId, false);
        }
    }

    /**
     * Collects the states of the latest completed checkpoint that belong to the given task and
     * sends them to the task with a {@link NotifyTaskRestoreOperation}, blocking until the
     * operation returns. Without a completed checkpoint an empty list is sent.
     *
     * @param taskLocation the task waiting for its restore state
     */
    private void restoreTaskState(TaskLocation taskLocation) {
        ArrayList restoredStates = new ArrayList();
        if (this.latestCompletedCheckpoint != null) {
            // Number of subtask locations sharing this task's vertex id; used as the index stride.
            Integer stride = this.pipelineTasks.get(taskLocation.getTaskVertexId());
            this.plan.getSubtaskActions().get(taskLocation).forEach(action -> {
                ActionState saved = this.latestCompletedCheckpoint.getTaskStates().get(action.f0());
                if (saved == null) {
                    LOG.info("Not found task({}) state for key({})", taskLocation, action.f0());
                } else if (CheckpointPlan.COORDINATOR_INDEX.equals(action.f1())) {
                    restoredStates.add(saved.getCoordinatorState());
                } else {
                    // Walk the saved subtask states starting at this subtask's index.
                    int first = (Integer) action.f1();
                    for (int idx = first; idx < saved.getParallelism(); idx += stride) {
                        restoredStates.add(saved.getSubtaskStates().get(idx));
                    }
                }
            });
        }
        NotifyTaskRestoreOperation restoreOperation = new NotifyTaskRestoreOperation(taskLocation, restoredStates);
        this.checkpointManager.sendOperationToMemberNode(restoreOperation).join();
    }

    /**
     * Starts the pipeline once every subtask of the plan has reported READY_START: notifies all
     * tasks to start, re-announces the latest completed checkpoint (if any) and, when
     * checkpointing is enabled, schedules the first periodic trigger. Returns without doing
     * anything while at least one subtask is missing or not ready.
     */
    private void allTaskReady() {
        int expected = this.plan.getPipelineSubtasks().size();
        if (this.pipelineTaskStatus.size() != expected) {
            return;
        }
        for (SeaTunnelTaskState reported : this.pipelineTaskStatus.values()) {
            if (reported != SeaTunnelTaskState.READY_START) {
                return;
            }
        }
        this.isAllTaskReady = true;
        CompletableFuture.allOf(notifyTaskStart()).join();
        notifyCompleted(this.latestCompletedCheckpoint);
        if (this.coordinatorConfig.isCheckpointEnable()) {
            LOG.info("checkpoint is enabled, start schedule trigger pending checkpoint.");
            scheduleTriggerPendingCheckpoint(this.coordinatorConfig.getCheckpointInterval());
        } else {
            LOG.info("checkpoint is disabled, because in batch mode and 'checkpoint.interval' of env is missing.");
        }
    }

    /**
     * Tells all tasks that the given checkpoint completed and, where applicable, that it ended,
     * waiting for each round of notifications. A failure fails the coordinator instead of being
     * thrown.
     *
     * @param completedCheckpoint the checkpoint to announce; {@code null} is ignored
     */
    private void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
        if (completedCheckpoint == null) {
            return;
        }
        try {
            LOG.info("start notify checkpoint completed, job id: {}, pipeline id: {}, checkpoint id:{}", completedCheckpoint.getJobId(), completedCheckpoint.getPipelineId(), completedCheckpoint.getCheckpointId());
            CompletableFuture.allOf(notifyCheckpointCompleted(completedCheckpoint)).join();
            CompletableFuture.allOf(notifyCheckpointEnd(completedCheckpoint)).join();
        } catch (Throwable failure) {
            handleCoordinatorError("notify checkpoint completed failed", failure, CheckpointCloseReason.CHECKPOINT_NOTIFY_COMPLETE_FAILED);
        }
    }

    /**
     * Sends a {@link NotifyTaskStartOperation} to every subtask of the pipeline.
     *
     * @return one invocation future per subtask, in the iteration order of the plan's subtasks
     */
    public InvocationFuture<?>[] notifyTaskStart() {
        return this.plan.getPipelineSubtasks().stream()
                .map(NotifyTaskStartOperation::new)
                .map(this.checkpointManager::sendOperationToMemberNode)
                .toArray(InvocationFuture[]::new);
    }

    /**
     * Fails the coordinator because a task reported a checkpoint error.
     *
     * @param str the error message reported by the task
     */
    public void reportCheckpointErrorFromTask(String str) {
        SeaTunnelException reported = new SeaTunnelException(str);
        handleCoordinatorError("report error from task", reported, CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
    }

    /**
     * Schedules a trigger attempt of type {@code CHECKPOINT_TYPE}.
     *
     * @param j delay in milliseconds before the attempt
     */
    private void scheduleTriggerPendingCheckpoint(long j) {
        this.scheduleTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE, j);
    }

    /**
     * Schedules a trigger attempt for the given checkpoint type on the coordinator's scheduler.
     *
     * @param checkpointType type of checkpoint to try to trigger
     * @param j delay in milliseconds before the attempt
     */
    private void scheduleTriggerPendingCheckpoint(CheckpointType checkpointType, long j) {
        Runnable attempt = () -> tryTriggerPendingCheckpoint(checkpointType);
        this.scheduler.schedule(attempt, j, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Marks a starting task as ready to close. Once as many tasks are marked as the plan has
     * starting subtasks, a checkpoint of type {@code COMPLETED_POINT_TYPE} is attempted.
     *
     * @param taskLocation the starting task that is ready to close
     */
    public void readyToClose(TaskLocation taskLocation) {
        this.readyToCloseStartingTask.add(taskLocation);
        boolean everyStartingTaskReady = this.readyToCloseStartingTask.size() == this.plan.getStartingSubtasks().size();
        if (!everyStartingTaskReady) {
            return;
        }
        tryTriggerPendingCheckpoint(CheckpointType.COMPLETED_POINT_TYPE);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resets the coordinator for another run: clears the recorded error, installs a fresh
     * coordinator future, stores the RUNNING status and drops all pending checkpoints. When the
     * tasks are already running, the latest completed checkpoint is re-announced and a checkpoint
     * is attempted right away; otherwise the coordinator waits for task reports again.
     *
     * @param z true if the pipeline's tasks have already been started
     */
    public void restoreCoordinator(boolean z) {
        LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + z);
        this.errorByPhysicalVertex = new AtomicReference<>();
        this.checkpointCoordinatorFuture = new CompletableFuture<>();
        updateStatus(CheckpointCoordinatorStatus.RUNNING);
        cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET);
        // cleanPendingCheckpoint flags the coordinator as shut down; undo that for the new run.
        this.shutdown = false;
        if (z) {
            this.isAllTaskReady = true;
            notifyCompleted(this.latestCompletedCheckpoint);
            tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE);
        } else {
            this.isAllTaskReady = false;
        }
    }

    /**
     * Attempts to trigger a checkpoint of the given type.
     *
     * <p>Nothing happens when the current thread is interrupted, or - for types that are neither
     * final nor schema-change checkpoints - when the checkpoint interval has not elapsed since the
     * last trigger or not all tasks are ready. Under the coordinator lock the attempt is also
     * dropped when the coordinator is completed or shut down, or when a schema change is in
     * progress and the type is a general checkpoint. If another checkpoint is still pending, the
     * attempt is rescheduled in 500 ms. Otherwise the checkpoint is created and started, and for
     * non-final, non-schema-change types the next attempt is scheduled one interval later.
     *
     * @param checkpointType type of checkpoint to trigger
     */
    protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
        if (Thread.currentThread().isInterrupted()) {
            LOG.warn("currentThread already be interrupted, skip trigger checkpoint");
            return;
        }
        final long now = Instant.now().toEpochMilli();
        final boolean periodic = checkpointType.notFinalCheckpoint() && checkpointType.notSchemaChangeCheckpoint();
        if (periodic) {
            boolean intervalNotElapsed = now - this.latestTriggerTimestamp.get() < this.coordinatorConfig.getCheckpointInterval();
            if (intervalNotElapsed || !this.isAllTaskReady) {
                return;
            }
        }
        synchronized (this.lock) {
            if (isCompleted() || isShutdown()) {
                LOG.warn(String.format(
                        "can't trigger checkpoint with type: %s, because checkpoint coordinator already have last completed checkpoint: (%s) or shutdown (%b).",
                        checkpointType,
                        this.latestCompletedCheckpoint != null ? this.latestCompletedCheckpoint.getCheckpointType() : "null",
                        this.shutdown));
            } else if (this.schemaChanging.get() && checkpointType.isGeneralCheckpoint()) {
                LOG.info("skip trigger generic-checkpoint because schema change in progress");
            } else if (this.pendingCounter.get() > 0) {
                // One checkpoint at a time: look again shortly.
                scheduleTriggerPendingCheckpoint(checkpointType, 500L);
                LOG.debug("skip trigger checkpoint because there is already a pending checkpoint.");
            } else {
                startTriggerPendingCheckpoint(createPendingCheckpoint(now, checkpointType));
                this.pendingCounter.incrementAndGet();
                if (periodic) {
                    scheduleTriggerPendingCheckpoint(this.coordinatorConfig.getCheckpointInterval());
                }
            }
        }
    }

    /**
     * @return true after pending checkpoints were cleaned and before the coordinator was restored
     */
    public boolean isShutdown() {
        return shutdown;
    }

    /**
     * Counts the given task locations per task vertex id.
     *
     * @param set the subtask locations of a pipeline
     * @return task vertex id mapped to the number of locations carrying that id
     */
    public static Map<Long, Integer> getPipelineTasks(Set<TaskLocation> set) {
        return set.stream()
                .collect(Collectors.groupingBy(
                        TaskLocation::getTaskVertexId,
                        Collectors.collectingAndThen(Collectors.toList(), List::size)));
    }

    /**
     * Triggers a savepoint and returns the future of its completion.
     *
     * <p>If not all tasks are ready, a future failed with
     * {@code TASK_NOT_ALL_READY_WHEN_SAVEPOINT} is returned. Otherwise the method holds the
     * coordinator lock, polls every 500 ms until no checkpoint is pending, then creates and starts
     * a checkpoint of type {@code SAVEPOINT_TYPE} and blocks until the pending checkpoint object
     * exists. If the calling thread is interrupted while polling, the interrupt flag is restored
     * and a future failed with a {@link CheckpointException} wrapping the interruption is
     * returned.
     *
     * @return future completed with the savepoint's {@link CompletedCheckpoint}
     */
    public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
        LOG.info(String.format("Start save point for Job (%s)", this.jobId));
        if (!this.isAllTaskReady) {
            CompletableFuture<CompletedCheckpoint> notReady = new CompletableFuture<>();
            notReady.completeExceptionally(new CheckpointException(CheckpointCloseReason.TASK_NOT_ALL_READY_WHEN_SAVEPOINT));
            return new PassiveCompletableFuture<>(notReady);
        }
        CompletableFuture<PendingCheckpoint> savepoint;
        synchronized (this.lock) {
            try {
                // NOTE(review): this polls while holding the lock, as the original did.
                while (this.pendingCounter.get() > 0) {
                    Thread.sleep(500L);
                }
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller's thread and report it via the future.
                Thread.currentThread().interrupt();
                CompletableFuture<CompletedCheckpoint> interrupted = new CompletableFuture<>();
                interrupted.completeExceptionally(new CheckpointException(CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR, e));
                return new PassiveCompletableFuture<>(interrupted);
            }
            savepoint = createPendingCheckpoint(Instant.now().toEpochMilli(), CheckpointType.SAVEPOINT_TYPE);
            startTriggerPendingCheckpoint(savepoint);
        }
        PendingCheckpoint pending = savepoint.join();
        LOG.info(String.format("The save point checkpointId is %s", pending.getCheckpointId()));
        return pending.getCompletableFuture();
    }

    /**
     * Starts a pending checkpoint once it has been created.
     *
     * <p>When the given future yields the pending checkpoint, this method
     * <ol>
     *   <li>registers a completion callback (run on the executor service) that fails the
     *       coordinator on error, ignores a {@code null} result and otherwise calls
     *       {@link #completePendingCheckpoint};</li>
     *   <li>sends the checkpoint barrier to the starting subtasks and waits for the send stage;</li>
     *   <li>if checkpointing is enabled, schedules a timeout that fails the coordinator with
     *       {@code CHECKPOINT_EXPIRED} when the checkpoint is still pending and not fully
     *       acknowledged. Schema-change-after checkpoints use their own timeout setting.</li>
     * </ol>
     * A failure while sending the barrier is logged and no timeout is scheduled. An interruption
     * restores the thread's interrupt flag and is rethrown as a {@link RuntimeException}.
     *
     * @param completableFuture future of the pending checkpoint to start
     */
    private void startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint> completableFuture) {
        completableFuture.thenAccept(pendingCheckpoint -> {
            LOG.info("wait checkpoint completed: " + pendingCheckpoint.getCheckpointId());
            pendingCheckpoint.getCompletableFuture().whenCompleteAsync((completedCheckpoint, triggerError) -> {
                if (triggerError != null) {
                    handleCoordinatorError("trigger checkpoint failed", triggerError, CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
                    return;
                }
                if (completedCheckpoint == null) {
                    LOG.info("skip this checkpoint cause by completedCheckpoint is null");
                    return;
                }
                try {
                    completePendingCheckpoint(completedCheckpoint);
                } catch (Throwable completeError) {
                    handleCoordinatorError("complete checkpoint failed", completeError, CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
                }
            }, this.executorService);
            LOG.debug("trigger checkpoint barrier {}", pendingCheckpoint.getInfo());
            try {
                CompletableFuture<InvocationFuture<?>[]> barrierSent = CompletableFuture
                        .supplyAsync(() -> new CheckpointBarrier(pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getCheckpointTimestamp(), pendingCheckpoint.getCheckpointType()), this.executorService)
                        .thenApplyAsync(this::triggerCheckpoint, this.executorService);
                CompletableFuture.allOf(barrierSent).get();
                if (this.coordinatorConfig.isCheckpointEnable()) {
                    LOG.debug("Start a scheduled task to prevent checkpoint timeouts for barrier " + pendingCheckpoint.getInfo());
                    long checkpointTimeout = this.coordinatorConfig.getCheckpointTimeout();
                    if (pendingCheckpoint.getCheckpointType().isSchemaChangeAfterCheckpoint()) {
                        checkpointTimeout = this.coordinatorConfig.getSchemaChangeCheckpointTimeout();
                    }
                    pendingCheckpoint.setCheckpointTimeOutFuture(this.scheduler.schedule(() -> {
                        // Already completed/aborted, or every task acknowledged: nothing expired.
                        if (this.pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) == null || pendingCheckpoint.isFullyAcknowledged()) {
                            return;
                        }
                        LOG.info("timeout checkpoint: " + pendingCheckpoint.getInfo());
                        handleCoordinatorError(CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
                    }, checkpointTimeout, TimeUnit.MILLISECONDS));
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owner of this thread can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (Exception e) {
                LOG.error(ExceptionUtils.getMessage(e));
            }
        });
    }

    /**
     * Creates a pending checkpoint under the coordinator lock. The checkpoint id is obtained
     * asynchronously: from the id counter when {@code checkpointType.notCompletedCheckpoint()}
     * holds, otherwise the fixed {@code Barrier.PREPARE_CLOSE_BARRIER_ID} is used. A failure to
     * obtain an id fails the coordinator and completes the id stage exceptionally.
     *
     * @param j trigger timestamp in epoch milliseconds
     * @param checkpointType type of the checkpoint to create
     * @return future of the registered pending checkpoint
     */
    CompletableFuture<PendingCheckpoint> createPendingCheckpoint(long j, CheckpointType checkpointType) {
        synchronized (this.lock) {
            CompletableFuture<Long> idFuture;
            if (checkpointType.notCompletedCheckpoint()) {
                idFuture = CompletableFuture.supplyAsync(() -> {
                    try {
                        return this.checkpointIdCounter.getAndIncrement();
                    } catch (Throwable idError) {
                        handleCoordinatorError("get checkpoint id failed", idError, CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
                        throw new CompletionException(idError);
                    }
                }, this.executorService);
            } else {
                idFuture = CompletableFuture.supplyAsync(() -> Barrier.PREPARE_CLOSE_BARRIER_ID, this.executorService);
            }
            return triggerPendingCheckpoint(j, idFuture, checkpointType);
        }
    }

    /**
     * Records the trigger timestamp and, once the checkpoint id is available, builds the
     * {@link PendingCheckpoint} and registers it in {@link #pendingCheckpoints}. Both stages run
     * on the executor service.
     *
     * @param j trigger timestamp in epoch milliseconds
     * @param completableFuture future supplying the checkpoint id
     * @param checkpointType type of the checkpoint
     * @return future of the registered pending checkpoint
     * @throws RuntimeException if the calling thread does not hold the coordinator lock
     */
    CompletableFuture<PendingCheckpoint> triggerPendingCheckpoint(long j, CompletableFuture<Long> completableFuture, CheckpointType checkpointType) {
        if (!Thread.holdsLock(this.lock)) {
            throw new RuntimeException(String.format("Unsafe invoke, the current thread[%s] has not acquired the lock[%s].", Thread.currentThread().getName(), this.lock.toString()));
        }
        this.latestTriggerTimestamp.set(j);
        return completableFuture
                .thenApplyAsync(checkpointId -> new PendingCheckpoint(this.jobId, this.plan.getPipelineId(), checkpointId, j, checkpointType, getNotYetAcknowledgedTasks(), getTaskStatistics(), getActionStates()), this.executorService)
                .thenApplyAsync(pendingCheckpoint -> {
                    this.pendingCheckpoints.put(pendingCheckpoint.getCheckpointId(), pendingCheckpoint);
                    return pendingCheckpoint;
                }, this.executorService);
    }

    /**
     * @return a new {@link CopyOnWriteArraySet} holding the task id of every subtask in the plan
     */
    private Set<Long> getNotYetAcknowledgedTasks() {
        return this.plan.getPipelineSubtasks().stream()
                .map(TaskLocation::getTaskID)
                .collect(Collectors.toCollection(CopyOnWriteArraySet::new));
    }

    /**
     * @return a fresh {@link ActionState} for every action of the plan, keyed by the action's
     *     state key and created with the integer the plan stores for that action
     */
    private Map<ActionStateKey, ActionState> getActionStates() {
        return this.plan.getPipelineActions().entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        action -> new ActionState(action.getKey(), action.getValue())));
    }

    /**
     * @return a fresh {@link TaskStatistics} per task vertex id, created with the number of
     *     subtask locations counted for that vertex
     */
    private Map<Long, TaskStatistics> getTaskStatistics() {
        return this.pipelineTasks.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        vertex -> new TaskStatistics(vertex.getKey(), vertex.getValue())));
    }

    /**
     * Sends the given barrier to every starting subtask of the pipeline.
     *
     * @param checkpointBarrier the barrier to inject
     * @return one invocation future per starting subtask
     */
    public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier checkpointBarrier) {
        return this.plan.getStartingSubtasks().stream()
                .map(taskLocation -> new CheckpointBarrierTriggerOperation(checkpointBarrier, taskLocation))
                .map(this.checkpointManager::sendOperationToMemberNode)
                .toArray(InvocationFuture[]::new);
    }

    /**
     * Stops checkpointing and discards all in-flight state: flags the coordinator as shut down
     * and not ready, then - under the coordinator lock - aborts and forgets every pending
     * checkpoint, clears the reported task states and ready-to-close marks, resets the pending
     * counter and the schema-change flag, and replaces the scheduler with a fresh one after
     * interrupting the old one.
     *
     * <p>The replacement scheduler is configured like the one built in the constructor
     * (remove-on-cancel enabled), so cancelled timeout tasks do not linger in its queue.
     *
     * @param checkpointCloseReason reason handed to the aborted checkpoints and logged
     */
    protected void cleanPendingCheckpoint(CheckpointCloseReason checkpointCloseReason) {
        this.shutdown = true;
        this.isAllTaskReady = false;
        synchronized (this.lock) {
            LOG.info("start clean pending checkpoint cause {}", checkpointCloseReason.message());
            if (!this.pendingCheckpoints.isEmpty()) {
                this.pendingCheckpoints.values().forEach(pendingCheckpoint -> pendingCheckpoint.abortCheckpoint(checkpointCloseReason, null));
                this.pendingCheckpoints.clear();
            }
            this.pipelineTaskStatus.clear();
            this.readyToCloseStartingTask.clear();
            this.pendingCounter.set(0);
            this.schemaChanging.set(false);
            this.scheduler.shutdownNow();
            ScheduledThreadPoolExecutor replacement = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setName(String.format("checkpoint-coordinator-%s/%s", this.pipelineId, this.jobId));
                return thread;
            });
            // Match the constructor: drop cancelled tasks immediately instead of at their due time.
            replacement.setRemoveOnCancelPolicy(true);
            this.scheduler = replacement;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Forwards a task's acknowledgement to the pending checkpoint identified by the barrier id.
     * The acknowledgement is ignored (and logged) when no such checkpoint is pending. For a
     * savepoint the subtask is recorded as {@code SAVEPOINT_PREPARE_CLOSE}, otherwise as
     * {@code RUNNING}.
     *
     * @param taskAcknowledgeOperation the acknowledgement received from a task
     */
    public void acknowledgeTask(TaskAcknowledgeOperation taskAcknowledgeOperation) {
        final long barrierId = taskAcknowledgeOperation.getBarrier().getId();
        final PendingCheckpoint pending = this.pendingCheckpoints.get(barrierId);
        if (pending == null) {
            LOG.info("skip already ack checkpoint " + barrierId);
            return;
        }
        final TaskLocation acker = taskAcknowledgeOperation.getTaskLocation();
        LOG.debug("task[{}]({}/{}) ack. {}", acker.getTaskID(), acker.getPipelineId(), acker.getJobId(), taskAcknowledgeOperation.getBarrier().toString());
        SubtaskStatus subtaskStatus;
        if (pending.getCheckpointType().isSavepoint()) {
            subtaskStatus = SubtaskStatus.SAVEPOINT_PREPARE_CLOSE;
        } else {
            subtaskStatus = SubtaskStatus.RUNNING;
        }
        pending.acknowledgeTask(acker, taskAcknowledgeOperation.getStates(), subtaskStatus);
    }

    /**
     * Finishes a checkpoint whose tasks have all acknowledged: persists it, prunes old stored
     * checkpoints, announces the completion to the tasks and releases the pending slot. If the
     * checkpoint makes the coordinator "completed" (see {@link #isCompleted()}), the coordinator
     * is cleaned up and its future completed with SUSPEND (savepoint) or FINISHED.
     *
     * <p>The method synchronizes on the coordinator instance. A failure while storing or deleting
     * checkpoints is logged and rethrown via {@code ExceptionUtil.sneakyThrow}.
     *
     * @param completedCheckpoint the checkpoint to finish
     */
    public synchronized void completePendingCheckpoint(CompletedCheckpoint completedCheckpoint) {
        LOG.debug("pending checkpoint({}/{}@{}) completed! cost: {}, trigger: {}, completed: {}", new Object[]{Long.valueOf(completedCheckpoint.getCheckpointId()), Integer.valueOf(completedCheckpoint.getPipelineId()), Long.valueOf(completedCheckpoint.getJobId()), Long.valueOf(completedCheckpoint.getCompletedTimestamp() - completedCheckpoint.getCheckpointTimestamp()), Long.valueOf(completedCheckpoint.getCheckpointTimestamp()), Long.valueOf(completedCheckpoint.getCompletedTimestamp())});
        long checkpointId = completedCheckpoint.getCheckpointId();
        // The id is queued before the store call, so it is tracked even if storing fails below.
        this.completedCheckpointIds.addLast(String.valueOf(completedCheckpoint.getCheckpointId()));
        try {
            this.checkpointStorage.storeCheckPoint(PipelineState.builder().checkpointId(checkpointId).jobId(String.valueOf(this.jobId)).pipelineId(this.pipelineId).states(this.serializer.serialize(completedCheckpoint)).build());
            // Prune in batches: once the queue holds a multiple (at least 2x) of the retention
            // limit, the oldest "limit" ids are removed from the queue and deleted from storage.
            // NOTE(review): a retention limit of 0 would divide by zero here - confirm the config
            // guarantees a positive value.
            if (this.completedCheckpointIds.size() % this.coordinatorConfig.getStorage().getMaxRetainedCheckpoints() == 0 && this.completedCheckpointIds.size() / this.coordinatorConfig.getStorage().getMaxRetainedCheckpoints() > 1) {
                ArrayList arrayList = new ArrayList();
                for (int i = 0; i < this.coordinatorConfig.getStorage().getMaxRetainedCheckpoints(); i++) {
                    arrayList.add(this.completedCheckpointIds.removeFirst());
                }
                this.checkpointStorage.deleteCheckpoint(String.valueOf(completedCheckpoint.getJobId()), String.valueOf(completedCheckpoint.getPipelineId()), arrayList);
            }
        } catch (Throwable th) {
            LOG.error("store checkpoint states failed.", th);
            ExceptionUtil.sneakyThrow(th);
        }
        LOG.info("pending checkpoint({}/{}@{}) notify finished!", new Object[]{Long.valueOf(completedCheckpoint.getCheckpointId()), Integer.valueOf(completedCheckpoint.getPipelineId()), Long.valueOf(completedCheckpoint.getJobId())});
        this.latestCompletedCheckpoint = completedCheckpoint;
        notifyCompleted(completedCheckpoint);
        // NOTE(review): remove(...) returns null when the entry is already gone - e.g. when
        // notifyCompleted failed, which cleans the pending checkpoints via handleCoordinatorError -
        // and the chained call then throws a NullPointerException before the counter is touched.
        // Confirm whether callers rely on that.
        this.pendingCheckpoints.remove(Long.valueOf(checkpointId)).abortCheckpointTimeoutFutureWhenIsCompleted();
        this.pendingCounter.decrementAndGet();
        if (isCompleted()) {
            // A final, non-restored checkpoint ends the coordinator.
            cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
            if (this.latestCompletedCheckpoint.getCheckpointType().isSavepoint()) {
                updateStatus(CheckpointCoordinatorStatus.SUSPEND);
                this.checkpointCoordinatorFuture.complete(new CheckpointCoordinatorState(CheckpointCoordinatorStatus.SUSPEND, null));
            } else {
                updateStatus(CheckpointCoordinatorStatus.FINISHED);
                this.checkpointCoordinatorFuture.complete(new CheckpointCoordinatorState(CheckpointCoordinatorStatus.FINISHED, null));
            }
        }
    }

    /**
     * Announces a completed checkpoint to every subtask of the pipeline by sending a
     * {@link CheckpointFinishedOperation} flagged as successful. For a schema-change-after
     * checkpoint, {@code completeSchemaChangeAfterCheckpoint} is called first.
     *
     * @param completedCheckpoint the checkpoint that completed
     * @return one invocation future per subtask
     */
    public InvocationFuture<?>[] notifyCheckpointCompleted(CompletedCheckpoint completedCheckpoint) {
        if (completedCheckpoint.getCheckpointType().isSchemaChangeAfterCheckpoint()) {
            completeSchemaChangeAfterCheckpoint(completedCheckpoint);
        }
        return this.plan.getPipelineSubtasks().stream()
                .map(taskLocation -> new CheckpointFinishedOperation(taskLocation, completedCheckpoint.getCheckpointId(), true))
                .map(this.checkpointManager::sendOperationToMemberNode)
                .toArray(InvocationFuture[]::new);
    }

    /**
     * Sends a {@link CheckpointEndOperation} flagged as successful to every subtask, but only for
     * schema-change checkpoints; for every other type nothing is sent.
     *
     * @param completedCheckpoint the checkpoint that ended
     * @return one invocation future per subtask, or an empty array for non-schema-change types
     */
    public InvocationFuture<?>[] notifyCheckpointEnd(CompletedCheckpoint completedCheckpoint) {
        if (!completedCheckpoint.getCheckpointType().isSchemaChangeCheckpoint()) {
            return new InvocationFuture[0];
        }
        return this.plan.getPipelineSubtasks().stream()
                .map(taskLocation -> new CheckpointEndOperation(taskLocation, completedCheckpoint.getCheckpointId(), true))
                .map(this.checkpointManager::sendOperationToMemberNode)
                .toArray(InvocationFuture[]::new);
    }

    /**
     * @return true if there is a latest completed checkpoint, its type is a final checkpoint and
     *     it was not restored from storage
     */
    public boolean isCompleted() {
        if (this.latestCompletedCheckpoint == null) {
            return false;
        }
        if (!this.latestCompletedCheckpoint.getCheckpointType().isFinalCheckpoint()) {
            return false;
        }
        return !this.latestCompletedCheckpoint.isRestored();
    }

    /**
     * Tells whether the coordinator ended normally: the latest completed checkpoint is a final,
     * non-restored checkpoint and the status stored in the distributed map is FINISHED or
     * SUSPEND. A missing status (nothing stored under the key) counts as "not ended normally"
     * instead of raising a {@link NullPointerException}.
     *
     * @return true if the coordinator completed without error
     */
    public boolean isNoErrorCompleted() {
        if (this.latestCompletedCheckpoint == null) {
            return false;
        }
        CheckpointCoordinatorStatus storedStatus = (CheckpointCoordinatorStatus) this.runningJobStateIMap.get(this.checkpointStateImapKey);
        // Compare from the constant side so a null stored status is simply "no match".
        boolean endedNormally = CheckpointCoordinatorStatus.FINISHED.equals(storedStatus) || CheckpointCoordinatorStatus.SUSPEND.equals(storedStatus);
        return this.latestCompletedCheckpoint.getCheckpointType().isFinalCheckpoint() && endedNormally && !this.latestCompletedCheckpoint.isRestored();
    }

    /**
     * Tells whether the latest completed checkpoint is a savepoint.
     *
     * @return {@code true} when a latest completed checkpoint exists and its type reports
     *     {@code isSavepoint()}; {@code false} otherwise
     */
    public boolean isEndOfSavePoint() {
        return this.latestCompletedCheckpoint != null
                && this.latestCompletedCheckpoint.getCheckpointType().isSavepoint();
    }

    /**
     * Exposes the coordinator's completion future to callers.
     *
     * @return a new {@link PassiveCompletableFuture} wrapping the coordinator future
     */
    public PassiveCompletableFuture<CheckpointCoordinatorState> waitCheckpointCoordinatorComplete() {
        return new PassiveCompletableFuture<>(checkpointCoordinatorFuture);
    }

    /**
     * Cancels the coordinator unless its future is already done.
     *
     * <p>When the future is still pending, pending checkpoints are cleaned with reason
     * {@code PIPELINE_END}, the stored status is switched to {@code CANCELED}, and the
     * coordinator future is completed with a {@code CANCELED} state.
     *
     * @return a new {@link PassiveCompletableFuture} wrapping the coordinator future
     */
    public PassiveCompletableFuture<CheckpointCoordinatorState> cancelCheckpoint() {
        if (!this.checkpointCoordinatorFuture.isDone()) {
            cleanPendingCheckpoint(CheckpointCloseReason.PIPELINE_END);
            updateStatus(CheckpointCoordinatorStatus.CANCELED);
            this.checkpointCoordinatorFuture.complete(
                    new CheckpointCoordinatorState(CheckpointCoordinatorStatus.CANCELED, null));
        }
        return new PassiveCompletableFuture<>(this.checkpointCoordinatorFuture);
    }

    /**
     * Stores the given coordinator status in the running-job state map under this coordinator's
     * state key, retrying on exceptions that {@code ExceptionUtil} classifies as retryable.
     *
     * <p>This is best effort: when the write ultimately fails, a warning (including the cause) is
     * logged and the method returns normally.
     *
     * @param checkpointCoordinatorStatus the status to store; must not be {@code null}
     * @throws NullPointerException if {@code checkpointCoordinatorStatus} is {@code null}
     */
    private synchronized void updateStatus(@NonNull CheckpointCoordinatorStatus checkpointCoordinatorStatus) {
        if (checkpointCoordinatorStatus == null) {
            throw new NullPointerException("targetStatus is marked non-null but is null");
        }
        try {
            // NOTE(review): RetryMaterial arguments are presumably (retry count, rethrow flag,
            // retry condition, sleep millis) -- confirm against RetryUtils.RetryMaterial.
            RetryUtils.retryWithException(() -> {
                LOG.info(String.format("Turn %s state from %s to %s", this.checkpointStateImapKey, this.runningJobStateIMap.get(this.checkpointStateImapKey), checkpointCoordinatorStatus));
                this.runningJobStateIMap.set(this.checkpointStateImapKey, checkpointCoordinatorStatus);
                return null;
            }, new RetryUtils.RetryMaterial(30, true, exc -> ExceptionUtil.isOperationNeedRetryException(exc), 2000L));
        } catch (Exception e) {
            // Deliberately not rethrown, but keep the cause so the failure can be diagnosed.
            LOG.warn(String.format("Set %s state %s to IMap failed, skip do it", this.checkpointStateImapKey, checkpointCoordinatorStatus), e);
        }
    }

    /**
     * Marks a schema change as in progress and schedules a schema-change-before checkpoint with
     * zero delay.
     *
     * <p>If the schema-changing flag is already set, only a warning is logged and nothing is
     * scheduled.
     *
     * <p>Decompiler note (JADX): access modifiers changed from: protected.
     */
    public void scheduleSchemaChangeBeforeCheckpoint() {
        boolean flagAcquired = this.schemaChanging.compareAndSet(false, true);
        if (flagAcquired) {
            LOG.info("stop trigger general-checkpoint({}@{}) because schema change in progress.", this.pipelineId, this.jobId);
            LOG.info("schedule schema-change-before checkpoint({}@{}).", this.pipelineId, this.jobId);
            scheduleTriggerPendingCheckpoint(CheckpointType.SCHEMA_CHANGE_BEFORE_POINT_TYPE, 0L);
        } else {
            LOG.warn("schema-change-before checkpoint({}@{}) is already scheduled.", this.pipelineId, this.jobId);
        }
    }

    /**
     * Schedules a schema-change-after checkpoint with zero delay, provided the schema-changing
     * flag is currently set.
     *
     * <p>When the flag is not set, only a warning is logged and nothing is scheduled. The flag
     * itself is not modified here.
     *
     * <p>Decompiler note (JADX): access modifiers changed from: protected.
     */
    public void scheduleSchemaChangeAfterCheckpoint() {
        if (this.schemaChanging.get()) {
            LOG.info("schedule schema-change-after checkpoint({}@{}).", this.pipelineId, this.jobId);
            scheduleTriggerPendingCheckpoint(CheckpointType.SCHEMA_CHANGE_AFTER_POINT_TYPE, 0L);
            return;
        }
        LOG.warn("schema-change-after checkpoint({}@{}) is already scheduled.", this.pipelineId, this.jobId);
    }

    /**
     * Clears the schema-changing flag after a schema-change-after checkpoint has completed and
     * resumes scheduling of pending checkpoints using the configured checkpoint interval.
     *
     * @param completedCheckpoint the schema-change-after checkpoint that completed
     * @throws IllegalStateException if the schema-changing flag was not set when called
     */
    protected void completeSchemaChangeAfterCheckpoint(CompletedCheckpoint completedCheckpoint) {
        boolean flagCleared = this.schemaChanging.compareAndSet(true, false);
        if (!flagCleared) {
            String message = String.format(
                    "schema-change-after checkpoint(%s/%s@%s) is already completed.",
                    completedCheckpoint.getCheckpointId(), this.pipelineId, this.jobId);
            throw new IllegalStateException(message);
        }
        LOG.info("completed schema-change-after checkpoint({}/{}@{}).",
                completedCheckpoint.getCheckpointId(), this.pipelineId, this.jobId);
        LOG.info("recover trigger general-checkpoint({}/{}@{}).",
                completedCheckpoint.getCheckpointId(), this.pipelineId, this.jobId);
        scheduleTriggerPendingCheckpoint(this.coordinatorConfig.getCheckpointInterval());
    }

    /**
     * Returns the checkpoint id counter held by this coordinator.
     *
     * @return the {@link CheckpointIDCounter} instance stored in this coordinator
     */
    public CheckpointIDCounter getCheckpointIdCounter() {
        return checkpointIdCounter;
    }
}
