/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.core.IndeterminateOperationStateException;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.MasterContext;
import com.hazelcast.jet.impl.SnapshotValidationRecord;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.ExecutionNotFoundException;
import com.hazelcast.jet.impl.execution.SnapshotFlags;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.operation.SnapshotPhase1Operation;
import com.hazelcast.jet.impl.operation.SnapshotPhase2Operation;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.logging.Level;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

class MasterSnapshotContext {
    /** Master context of the job; provides the lock, job status, execution id and the services this class calls. */
    final MasterContext mc;
    /** Logger that receives every diagnostic message emitted by this class. */
    private final ILogger logger;
    /**
     * Set to {@code true} when {@link #tryBeginSnapshot()} dequeues a request and back to
     * {@code false} when phase 2 of that snapshot completes or when
     * {@link #onExecutionStarted()} runs. While it is {@code true},
     * {@link #tryBeginSnapshot()} refuses to start another snapshot.
     */
    private boolean snapshotInProgress;
    /**
     * Starts out already completed. {@link #onExecutionStarted()} replaces it with a fresh,
     * incomplete future, and phase-2 completion of a terminal snapshot request completes it.
     * {@link #tryBeginSnapshot()} does not start a snapshot while this future is done.
     */
    @Nonnull
    private volatile CompletableFuture<Void> terminalSnapshotFuture = CompletableFuture.completedFuture(null);
    /**
     * Pending snapshot requests in FIFO order: filled by
     * {@link #enqueueSnapshot(String, boolean, CompletableFuture)}, consumed one at a time by
     * {@link #tryBeginSnapshot()} and drained by {@link #onExecutionTerminated()}.
     */
    private final Queue<SnapshotRequest> snapshotQueue = new LinkedList<SnapshotRequest>();

    /**
     * Creates a snapshot context bound to the given master context.
     *
     * @param masterContext the master context stored in {@link #mc}
     * @param logger        the logger used for this class's diagnostics
     */
    MasterSnapshotContext(MasterContext masterContext, ILogger logger) {
        this.logger = logger;
        mc = masterContext;
    }

    /**
     * Appends a snapshot request to the queue. The snapshot itself is not started here;
     * that happens in {@link #tryBeginSnapshot()}.
     *
     * @param snapshotName name of the exported snapshot, or {@code null} for a non-exported one
     * @param isTerminal   whether the request is a terminal snapshot
     * @param future       future completed when the request finishes or is abandoned; may be
     *                     {@code null} if nobody is waiting for the result
     */
    // NOTE(review): the only caller visible in this file invokes this while holding mc's lock;
    // presumably every caller must do the same - confirm.
    void enqueueSnapshot(String snapshotName, boolean isTerminal, CompletableFuture<Void> future) {
        SnapshotRequest request = new SnapshotRequest(snapshotName, isTerminal, future);
        snapshotQueue.add(request);
    }

    /**
     * Queues a snapshot request that has no export name, is not terminal and has no
     * completion future attached.
     */
    private void enqueueRegularSnapshot() {
        final String noExportName = null;
        final CompletableFuture<Void> noFuture = null;
        enqueueSnapshot(noExportName, false, noFuture);
    }

    /**
     * Entry point for a scheduled snapshot. Under the master context lock it verifies that
     * the job is {@code RUNNING} and that {@code executionId} is still the current execution;
     * if so it queues a regular snapshot request and, after releasing the lock, asks
     * {@link #tryBeginSnapshot()} to start it. Otherwise it logs the reason and does nothing.
     *
     * @param executionId the execution the snapshot was scheduled for
     */
    void startScheduledSnapshot(long executionId) {
        mc.lock();
        try {
            boolean jobRunning = mc.jobStatus() == JobStatus.RUNNING;
            if (!jobRunning) {
                logger.fine("Not beginning snapshot, " + mc.jobIdString() + " is not RUNNING, but " + mc.jobStatus());
                return;
            }
            boolean sameExecution = mc.executionId() == executionId;
            if (!sameExecution) {
                // The request was scheduled for an execution that is no longer the current one.
                logger.fine("Not beginning snapshot since unexpected execution ID received for " + mc.jobIdString() + ". Received execution ID: " + com.hazelcast.jet.Util.idToString(executionId));
                return;
            }
            enqueueRegularSnapshot();
        } finally {
            mc.unlock();
        }
        // Reached only when a request was queued; both rejection paths return above.
        tryBeginSnapshot();
    }

    /**
     * Submits a task to the coordinator thread that starts the next queued snapshot, if any.
     * <p>
     * Under the master context lock the task gives up (logging why) when the job is not
     * {@code RUNNING}, a snapshot is already in progress, or the terminal snapshot future is
     * done; it also returns silently when the queue is empty. Otherwise it marks a snapshot as
     * in progress and registers it on the job execution record. After releasing the lock it
     * persists the record, clears the target map and invokes {@link SnapshotPhase1Operation}
     * on the participants; the responses are handed to {@code onSnapshotPhase1Complete}.
     * <p>
     * If persisting the record or clearing the map throws, the failure is logged and the
     * request's future is completed with that exception.
     */
    // NOTE(review): on that failure path snapshotInProgress stays true; within this class it is
    // only reset by phase-2 completion or onExecutionStarted() - confirm this is intended.
    void tryBeginSnapshot() {
        mc.coordinationService().submitToCoordinatorThread(() -> {
            final SnapshotRequest request;
            final long startedExecutionId;
            mc.lock();
            try {
                if (mc.jobStatus() != JobStatus.RUNNING) {
                    logger.fine("Not beginning snapshot, " + mc.jobIdString() + " is not RUNNING, but " + mc.jobStatus());
                    return;
                }
                if (snapshotInProgress) {
                    logger.fine("Not beginning snapshot since one is already in progress " + mc.jobIdString());
                    return;
                }
                if (terminalSnapshotFuture.isDone()) {
                    logger.fine("Not beginning snapshot since terminal snapshot is already completed " + mc.jobIdString());
                    return;
                }
                request = snapshotQueue.poll();
                if (request == null) {
                    // Nothing is queued.
                    return;
                }
                snapshotInProgress = true;
                mc.jobExecutionRecord().startNewSnapshot(request.snapshotName);
                startedExecutionId = mc.executionId();
            } finally {
                mc.unlock();
            }

            final long snapshotId = mc.jobExecutionRecord().ongoingSnapshotId();
            final int flags = request.snapshotFlags();
            final String targetMap = request.mapName();
            try {
                mc.writeJobExecutionRecordSafe(false);
                mc.nodeEngine().getHazelcastInstance().getMap(targetMap).clear();
            } catch (Exception e) {
                String jobAndExecution = Util.jobNameAndExecutionId(mc.jobName(), startedExecutionId);
                logger.warning(String.format("Failed to start snapshot %d for %s", snapshotId, jobAndExecution), e);
                request.completeFuture(e);
                return;
            }

            LoggingUtil.logFine(logger, "Starting snapshot %d for %s, flags: %s, writing to: %s", snapshotId, Util.jobNameAndExecutionId(mc.jobName(), startedExecutionId), SnapshotFlags.toString(flags), request.snapshotName);
            Function<ExecutionPlan, Operation> phase1Factory =
                    plan -> new SnapshotPhase1Operation(mc.jobId(), startedExecutionId, snapshotId, targetMap, flags);
            mc.invokeOnParticipants(
                    phase1Factory,
                    phase1Responses -> onSnapshotPhase1Complete((Collection<Map.Entry<MemberInfo, Object>>)phase1Responses, startedExecutionId, snapshotId, request),
                    null,
                    true);
        });
    }

    /**
     * Handles the phase-1 responses of all participants on the coordinator thread.
     * <p>
     * Each response value is either a {@link SnapshotPhase1Operation.SnapshotPhase1Result} or a
     * {@link Throwable}. Results are merged; a throwable is converted to a failed result and
     * merged as well, except for {@link ExecutionNotFoundException}. For that exception the
     * member's start-operation response future is collected instead, and processing continues
     * in {@code onSnapshotPhase1CompleteWithStartResponses} once all collected futures are done.
     *
     * @param responses         per-member phase-1 responses
     * @param executionId       execution the snapshot was started for
     * @param snapshotId        id of the snapshot being taken
     * @param requestedSnapshot the request that triggered this snapshot
     */
    private void onSnapshotPhase1Complete(Collection<Map.Entry<MemberInfo, Object>> responses, long executionId, long snapshotId, SnapshotRequest requestedSnapshot) {
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            SnapshotPhase1Operation.SnapshotPhase1Result mergedResult = new SnapshotPhase1Operation.SnapshotPhase1Result();
            // Element type must match the List<CompletableFuture<Object>> parameter this list is passed to below.
            List<CompletableFuture<Object>> missingResponses = new ArrayList<>();
            for (Map.Entry<MemberInfo, Object> entry : responses) {
                Object response = entry.getValue();
                if (response instanceof Throwable) {
                    if (response instanceof ExecutionNotFoundException) {
                        // The member no longer knows the execution; how it ended is decided by its
                        // start-operation response, which is awaited below.
                        // NOTE(review): assumes every participant has an entry in
                        // startOperationResponses(); a null element would make allOf(...) throw - confirm.
                        missingResponses.add(this.mc.startOperationResponses().get(entry.getKey().getAddress()));
                        continue;
                    }
                    response = new SnapshotPhase1Operation.SnapshotPhase1Result(0L, 0L, 0L, (Throwable) response);
                }
                mergedResult.merge((SnapshotPhase1Operation.SnapshotPhase1Result) response);
            }
            if (!missingResponses.isEmpty()) {
                LoggingUtil.logFine(this.logger, "%s will wait for %d responses to StartExecutionOperation in onSnapshotPhase1Complete()", this.mc.jobIdString(), missingResponses.size());
            }
            // allOf() over an empty array is already complete, so the common case continues immediately.
            CompletableFuture.allOf(missingResponses.toArray(new CompletableFuture<?>[0]))
                    .whenComplete(ExceptionUtil.withTryCatch(this.logger, (r, t) -> this.onSnapshotPhase1CompleteWithStartResponses(responses, executionId, snapshotId, requestedSnapshot, mergedResult, missingResponses)));
        });
    }

    /**
     * Finishes phase 1 once every awaited start-operation response is done; runs on the
     * coordinator thread.
     * <p>
     * Under the master context lock it: ignores the call if {@code executionId} is no longer
     * current; merges failures of the awaited responses into {@code mergedResult}; writes the
     * {@link SnapshotValidationRecord} into the snapshot map; records the outcome on the job
     * execution record and persists it; clears the snapshot map if phase 1 failed. After
     * releasing the lock it either invokes {@link SnapshotPhase2Operation} on the participants
     * or, when persisting the record threw, requests a forceful restart and fails the
     * request's future.
     *
     * @param responses         per-member phase-1 responses (used here only for logging)
     * @param executionId       execution the snapshot was started for
     * @param snapshotId        id of the snapshot being taken
     * @param requestedSnapshot the request that triggered this snapshot
     * @param mergedResult      phase-1 results merged so far; further errors are merged into it
     * @param missingResponses  start-operation response futures of members that answered
     *                          phase 1 with {@link ExecutionNotFoundException}
     */
    private void onSnapshotPhase1CompleteWithStartResponses(Collection<Map.Entry<MemberInfo, Object>> responses, long executionId, long snapshotId, SnapshotRequest requestedSnapshot, SnapshotPhase1Operation.SnapshotPhase1Result mergedResult, List<CompletableFuture<Object>> missingResponses) {
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            JobExecutionRecord.SnapshotStats stats;
            boolean isSuccess;
            // Set when persisting the job execution record throws; phase 2 is then skipped.
            boolean skipPhase2 = false;
            this.mc.lock();
            try {
                if (!missingResponses.isEmpty()) {
                    LoggingUtil.logFine(this.logger, "%s all awaited responses to StartExecutionOperation received or were already received", this.mc.jobIdString());
                }
                if (executionId != this.mc.executionId()) {
                    LoggingUtil.logFine(this.logger, "%s: ignoring responses for snapshot %s phase 1: the responses are from a different execution: %s. Responses: %s", this.mc.jobIdString(), snapshotId, com.hazelcast.jet.Util.idToString(executionId), responses);
                    return;
                }
                // A start operation that completed exceptionally fails the snapshot; a normal completion is ignored.
                for (CompletableFuture<Object> response : missingResponses) {
                    assert (response.isDone()) : "response not done";
                    try {
                        response.get();
                    }
                    catch (ExecutionException e) {
                        mergedResult.merge(new SnapshotPhase1Operation.SnapshotPhase1Result(0L, 0L, 0L, e.getCause()));
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                // Resolve the map name before ongoingSnapshotDone() is called below.
                String mapName = requestedSnapshot.mapName();
                IMap<SnapshotValidationRecord.SnapshotValidationKey, SnapshotValidationRecord> snapshotMap = JobRepository.safeImap(this.mc.nodeEngine().getHazelcastInstance().getMap(mapName));
                try {
                    SnapshotValidationRecord validationRecord = new SnapshotValidationRecord(snapshotId, mergedResult.getNumChunks(), mergedResult.getNumBytes(), this.mc.jobExecutionRecord().ongoingSnapshotStartTime(), this.mc.jobId(), this.mc.jobName(), this.mc.jobRecord().getDagJson());
                    SnapshotValidationRecord oldValue = snapshotMap.put(SnapshotValidationRecord.KEY, validationRecord);
                    if (requestedSnapshot.isExport()) {
                        assert (requestedSnapshot.snapshotName != null);
                        this.mc.jobRepository().cacheValidationRecord(requestedSnapshot.snapshotName, validationRecord);
                    }
                    if (oldValue != null) {
                        this.logger.severe("SnapshotValidationRecord overwritten after writing to '" + mapName + "' for " + this.mc.jobIdString() + ": snapshot data might be corrupted");
                    }
                }
                catch (Exception e) {
                    // A failure to write the validation record counts as a phase-1 failure.
                    mergedResult.merge(new SnapshotPhase1Operation.SnapshotPhase1Result(0L, 0L, 0L, e));
                }
                isSuccess = mergedResult.getError() == null;
                stats = this.mc.jobExecutionRecord().ongoingSnapshotDone(mergedResult.getNumBytes(), mergedResult.getNumKeys(), mergedResult.getNumChunks(), mergedResult.getError(), requestedSnapshot.isTerminal);
                if (isSuccess && !requestedSnapshot.isExportOnly()) {
                    // Any failure of this write makes the job restart forcefully instead of running phase 2.
                    try {
                        this.mc.writeJobExecutionRecordSafe(false);
                    }
                    catch (IndeterminateOperationStateException indeterminate) {
                        skipPhase2 = true;
                        this.logger.warning(this.mc.jobIdString() + " snapshot " + snapshotId + " update of JobExecutionRecord was indeterminate. Will restart the job forcefully.", indeterminate);
                    }
                    catch (Exception otherError) {
                        skipPhase2 = true;
                        this.logger.warning(this.mc.jobIdString() + " snapshot " + snapshotId + " update of JobExecutionRecord failed. Will restart the job forcefully.", otherError);
                    }
                } else {
                    this.mc.writeJobExecutionRecord(false);
                }
                if (this.logger.isFineEnabled()) {
                    this.logger.fine(String.format("Snapshot %d phase 1 for %s completed with status %s in %dms, %,d bytes, %,d keys in %,d chunks, stored in '%s'%s", snapshotId, this.mc.jobIdString(), (skipPhase2 ? "INDETERMINATE/" : "") + (isSuccess ? "SUCCESS" : "FAILURE"), stats.duration(), stats.numBytes(), stats.numKeys(), stats.numChunks(), mapName, (skipPhase2 ? ", skipping " : ", proceeding to ") + "phase 2"));
                }
                if (!isSuccess) {
                    this.logger.warning(this.mc.jobIdString() + " snapshot " + snapshotId + " phase 1 failed on some member(s), one of the failures: " + mergedResult.getError());
                    // Best effort: a failure to clear the map is only logged.
                    try {
                        snapshotMap.clear();
                    }
                    catch (Exception e) {
                        this.logger.warning(this.mc.jobIdString() + ": failed to clear snapshot map '" + mapName + "' after a failure", e);
                    }
                }
                if (isSuccess && !skipPhase2 && !requestedSnapshot.isExport()) {
                    // NOTE(review): presumably ongoingSnapshotDone() switched the data map index, so this
                    // clears the map holding the previous snapshot - confirm against JobExecutionRecord.
                    this.mc.jobRepository().clearSnapshotData(this.mc.jobId(), this.mc.jobExecutionRecord().ongoingDataMapIndex());
                }
            }
            finally {
                this.mc.unlock();
            }
            if (skipPhase2) {
                TerminationMode newMode = TerminationMode.RESTART_FORCEFUL;
                this.logger().fine(this.mc.jobIdString() + ": Terminating job without performing snapshot phase 2 with mode " + newMode);
                this.mc.jobContext().handleTermination(newMode);
                requestedSnapshot.completeFuture(new JetException("Snapshot in unknown state"));
            } else {
                // Phase 2 runs for failed snapshots too; the last constructor argument is true only for
                // a successful snapshot that is not export-only.
                Function<ExecutionPlan, Operation> factory = plan -> new SnapshotPhase2Operation(this.mc.jobId(), executionId, snapshotId, isSuccess && !requestedSnapshot.isExportOnly());
                this.mc.invokeOnParticipants(factory, responses2 -> this.onSnapshotPhase2Complete(mergedResult.getError(), (Collection<Map.Entry<MemberInfo, Object>>)responses2, executionId, snapshotId, requestedSnapshot, stats.startTime()), null, true);
            }
        });
    }

    /**
     * Handles the phase-2 responses on the coordinator thread and finishes the snapshot.
     * <p>
     * Responses from an execution that is no longer current are ignored. Otherwise member
     * failures are logged, the request's future is completed (exceptionally when phase 1
     * failed) and, under the master context lock, the in-progress flag is cleared. A terminal
     * request completes the terminal snapshot future and, if phase 1 failed, cancels the
     * execution invocations; a non-terminal, non-export request schedules the next snapshot.
     * Finally the next queued snapshot is attempted.
     *
     * @param phase1Error       error message of phase 1, or {@code null} if it succeeded
     * @param responses         per-member phase-2 responses
     * @param executionId       execution the snapshot was started for
     * @param snapshotId        id of the snapshot that was taken
     * @param requestedSnapshot the request that triggered this snapshot
     * @param startTime         snapshot start time in epoch milliseconds, used for the duration log
     */
    private void onSnapshotPhase2Complete(String phase1Error, Collection<Map.Entry<MemberInfo, Object>> responses, long executionId, long snapshotId, SnapshotRequest requestedSnapshot, long startTime) {
        mc.coordinationService().submitToCoordinatorThread(() -> {
            if (executionId != mc.executionId()) {
                LoggingUtil.logFine(logger, "%s: ignoring responses for snapshot %s phase 2: the responses are from a different execution: %s. Responses: %s", mc.jobIdString(), snapshotId, com.hazelcast.jet.Util.idToString(executionId), responses);
                return;
            }

            for (Map.Entry<MemberInfo, Object> memberResponse : responses) {
                Object value = memberResponse.getValue();
                if (value instanceof Throwable) {
                    // A member that no longer knows the execution is logged at FINE, anything else at WARNING.
                    Level level = value instanceof ExecutionNotFoundException ? Level.FINE : Level.WARNING;
                    logger.log(level, SnapshotPhase2Operation.class.getSimpleName() + " for snapshot " + snapshotId + " in " + mc.jobIdString() + " failed on member: " + memberResponse, (Throwable) value);
                }
            }

            JetException failure = phase1Error == null ? null : new JetException(phase1Error);
            requestedSnapshot.completeFuture(failure);

            mc.lock();
            try {
                // Re-check under the lock: a new execution may have started in the meantime.
                if (executionId != mc.executionId()) {
                    logger.fine("Not completing terminalSnapshotFuture on " + mc.jobIdString() + ", new execution already started, snapshot was for executionId=" + com.hazelcast.jet.Util.idToString(executionId));
                    return;
                }
                assert snapshotInProgress : "snapshot not in progress";
                snapshotInProgress = false;
                if (requestedSnapshot.isTerminal) {
                    boolean completedNow = terminalSnapshotFuture.complete(null);
                    assert completedNow : "terminalSnapshotFuture was already completed";
                    if (phase1Error != null) {
                        mc.jobContext().cancelExecutionInvocations(mc.jobId(), mc.executionId(), null, null);
                    }
                } else if (!requestedSnapshot.isExport()) {
                    mc.coordinationService().scheduleSnapshot(mc, executionId);
                }
            } finally {
                mc.unlock();
            }

            if (logger.isFineEnabled()) {
                String status = phase1Error == null ? "success" : "failure: " + phase1Error;
                logger.fine("Snapshot " + snapshotId + " for " + mc.jobIdString() + " completed in " + (System.currentTimeMillis() - startTime) + "ms, status=" + status);
            }
            tryBeginSnapshot();
        });
    }

    /**
     * Returns the current terminal snapshot future.
     *
     * @return the future held in the {@code terminalSnapshotFuture} field at the time of the call
     */
    CompletableFuture<Void> terminalSnapshotFuture() {
        final CompletableFuture<Void> current = terminalSnapshotFuture;
        return current;
    }

    /**
     * Resets the snapshot state for a new execution: clears the in-progress flag and installs
     * a fresh, incomplete terminal snapshot future. The request queue is expected to be empty
     * at this point (checked by an assertion only).
     */
    void onExecutionStarted() {
        this.snapshotInProgress = false;
        assert (this.snapshotQueue.isEmpty()) : "snapshotQueue not empty";
        // Diamond instead of the raw type avoids an unchecked conversion to CompletableFuture<Void>.
        this.terminalSnapshotFuture = new CompletableFuture<>();
    }

    /**
     * Fails every snapshot request still waiting in the queue with a {@link JetException}
     * and then empties the queue.
     */
    void onExecutionTerminated() {
        snapshotQueue.forEach(pending ->
                pending.completeFuture(new JetException("Execution completed before snapshot executed")));
        snapshotQueue.clear();
    }

    /**
     * Returns the logger this context writes to.
     *
     * @return the logger passed to the constructor
     */
    public ILogger logger() {
        return logger;
    }

    /**
     * A single queued snapshot request. It is an inner (non-static) class because
     * {@link #mapName()} reads the enclosing context's {@code mc}.
     */
    private class SnapshotRequest {
        /** Name of the exported snapshot, or {@code null} when the snapshot is not an export. */
        @Nullable
        final String snapshotName;
        /** Whether this request is a terminal snapshot. */
        final boolean isTerminal;
        /** Future to complete with the outcome, or {@code null} if nobody waits for it. */
        @Nullable
        final CompletableFuture<Void> future;

        /**
         * @param snapshotName export name, or {@code null} for a non-exported snapshot
         * @param isTerminal   whether the snapshot is terminal
         * @param future       future to complete with the outcome, or {@code null}
         */
        SnapshotRequest(@Nullable String snapshotName, boolean isTerminal, @Nullable CompletableFuture<Void> future) {
            this.snapshotName = snapshotName;
            this.isTerminal = isTerminal;
            this.future = future;
        }

        /** Returns {@code true} if the request carries an export name. */
        public boolean isExport() {
            return this.snapshotName != null;
        }

        /** Returns {@code true} if the request is an export that is not terminal. */
        public boolean isExportOnly() {
            return this.isExport() && !this.isTerminal;
        }

        /** Returns the flags built by {@link SnapshotFlags#create} from {@code isTerminal} and {@link #isExport()}. */
        public int snapshotFlags() {
            return SnapshotFlags.create(this.isTerminal, this.isExport());
        }

        /**
         * Returns the name of the map the snapshot is written to: the exported-snapshot map for
         * an export, otherwise the job's snapshot data map for the record's current
         * {@code ongoingDataMapIndex()}. The latter is read at call time, so the result can
         * change when the record's index changes.
         */
        public String mapName() {
            return this.isExport() ? JobRepository.exportedSnapshotMapName(this.snapshotName) : JobRepository.snapshotDataMapName(MasterSnapshotContext.this.mc.jobId(), MasterSnapshotContext.this.mc.jobExecutionRecord().ongoingDataMapIndex());
        }

        /**
         * Completes the attached future, if there is one.
         *
         * @param error {@code null} to complete normally, otherwise the failure to complete with
         */
        public void completeFuture(@Nullable Throwable error) {
            if (this.future != null) {
                if (error == null) {
                    this.future.complete(null);
                } else {
                    this.future.completeExceptionally(error);
                }
            }
        }
    }
}

