/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiFunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SubtaskCheckpointCoordinatorImpl
implements SubtaskCheckpointCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
    private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;
    private final CachingCheckpointStorageWorkerView checkpointStorage;
    private final String taskName;
    private final ExecutorService asyncOperationsThreadPool;
    private final Environment env;
    private final AsyncExceptionHandler asyncExceptionHandler;
    private final ChannelStateWriter channelStateWriter;
    private final StreamTaskActionExecutor actionExecutor;
    private final BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot;
    private final Set<Long> abortedCheckpointIds;
    private long lastCheckpointId;
    private final Object lock;
    @GuardedBy(value="lock")
    private final Map<Long, AsyncCheckpointRunnable> checkpoints;
    @GuardedBy(value="lock")
    private boolean closed;

    SubtaskCheckpointCoordinatorImpl(CheckpointStorageWorkerView checkpointStorage, String taskName, StreamTaskActionExecutor actionExecutor, CloseableRegistry closeableRegistry, ExecutorService asyncOperationsThreadPool, Environment env, AsyncExceptionHandler asyncExceptionHandler, boolean unalignedCheckpointEnabled, BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot) throws IOException {
        this(checkpointStorage, taskName, actionExecutor, closeableRegistry, asyncOperationsThreadPool, env, asyncExceptionHandler, unalignedCheckpointEnabled, prepareInputSnapshot, 128);
    }

    SubtaskCheckpointCoordinatorImpl(CheckpointStorageWorkerView checkpointStorage, String taskName, StreamTaskActionExecutor actionExecutor, CloseableRegistry closeableRegistry, ExecutorService asyncOperationsThreadPool, Environment env, AsyncExceptionHandler asyncExceptionHandler, boolean unalignedCheckpointEnabled, BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot, int maxRecordAbortedCheckpoints) throws IOException {
        this(checkpointStorage, taskName, actionExecutor, closeableRegistry, asyncOperationsThreadPool, env, asyncExceptionHandler, prepareInputSnapshot, maxRecordAbortedCheckpoints, unalignedCheckpointEnabled ? SubtaskCheckpointCoordinatorImpl.openChannelStateWriter(taskName, checkpointStorage, env) : ChannelStateWriter.NO_OP);
    }

    @VisibleForTesting
    SubtaskCheckpointCoordinatorImpl(CheckpointStorageWorkerView checkpointStorage, String taskName, StreamTaskActionExecutor actionExecutor, CloseableRegistry closeableRegistry, ExecutorService asyncOperationsThreadPool, Environment env, AsyncExceptionHandler asyncExceptionHandler, BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException> prepareInputSnapshot, int maxRecordAbortedCheckpoints, ChannelStateWriter channelStateWriter) throws IOException {
        this.checkpointStorage = new CachingCheckpointStorageWorkerView((CheckpointStorageWorkerView)Preconditions.checkNotNull((Object)checkpointStorage));
        this.taskName = (String)Preconditions.checkNotNull((Object)taskName);
        this.checkpoints = new HashMap<Long, AsyncCheckpointRunnable>();
        this.lock = new Object();
        this.asyncOperationsThreadPool = (ExecutorService)Preconditions.checkNotNull((Object)asyncOperationsThreadPool);
        this.env = (Environment)Preconditions.checkNotNull((Object)env);
        this.asyncExceptionHandler = (AsyncExceptionHandler)Preconditions.checkNotNull((Object)asyncExceptionHandler);
        this.actionExecutor = (StreamTaskActionExecutor)Preconditions.checkNotNull((Object)actionExecutor);
        this.channelStateWriter = (ChannelStateWriter)Preconditions.checkNotNull((Object)channelStateWriter);
        this.prepareInputSnapshot = prepareInputSnapshot;
        this.abortedCheckpointIds = this.createAbortedCheckpointSetWithLimitSize(maxRecordAbortedCheckpoints);
        this.lastCheckpointId = -1L;
        closeableRegistry.registerCloseable((Closeable)this);
        this.closed = false;
    }

    private static ChannelStateWriter openChannelStateWriter(String taskName, CheckpointStorageWorkerView checkpointStorage, Environment env) {
        ChannelStateWriterImpl writer = new ChannelStateWriterImpl(taskName, env.getTaskInfo().getIndexOfThisSubtask(), checkpointStorage);
        writer.open();
        return writer;
    }

    @Override
    public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause, OperatorChain<?, ?> operatorChain) throws IOException {
        long next;
        LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", (Object)checkpointId, (Object)this.taskName);
        this.lastCheckpointId = Math.max(this.lastCheckpointId, checkpointId);
        Iterator<Long> iterator = this.abortedCheckpointIds.iterator();
        while (iterator.hasNext() && (next = iterator.next().longValue()) < this.lastCheckpointId) {
            iterator.remove();
        }
        this.checkpointStorage.clearCacheFor(checkpointId);
        this.channelStateWriter.abort(checkpointId, (Throwable)cause, true);
        this.env.declineCheckpoint(checkpointId, cause);
        this.actionExecutor.runThrowing(() -> operatorChain.broadcastEvent((AbstractEvent)new CancelCheckpointMarker(checkpointId)));
    }

    @Override
    public CheckpointStorageWorkerView getCheckpointStorage() {
        return this.checkpointStorage;
    }

    @Override
    public ChannelStateWriter getChannelStateWriter() {
        return this.channelStateWriter;
    }

    @Override
    public void checkpointState(CheckpointMetaData metadata, CheckpointOptions options, CheckpointMetricsBuilder metrics, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning) throws Exception {
        Preconditions.checkNotNull((Object)options);
        Preconditions.checkNotNull((Object)metrics);
        if (this.lastCheckpointId >= metadata.getCheckpointId()) {
            LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", (Object)this.lastCheckpointId, (Object)metadata.getCheckpointId());
            this.channelStateWriter.abort(metadata.getCheckpointId(), (Throwable)new CancellationException(), true);
            this.checkAndClearAbortedStatus(metadata.getCheckpointId());
            return;
        }
        this.lastCheckpointId = metadata.getCheckpointId();
        if (this.checkAndClearAbortedStatus(metadata.getCheckpointId())) {
            operatorChain.broadcastEvent((AbstractEvent)new CancelCheckpointMarker(metadata.getCheckpointId()));
            LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", (Object)metadata.getCheckpointId());
            return;
        }
        if (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {
            options = options.withUnalignedSupported();
            this.initInputsCheckpoint(metadata.getCheckpointId(), options);
        }
        operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
        operatorChain.broadcastEvent((AbstractEvent)new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options), options.isUnalignedCheckpoint());
        if (options.isUnalignedCheckpoint()) {
            this.channelStateWriter.finishOutput(metadata.getCheckpointId());
        }
        HashMap<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<OperatorID, OperatorSnapshotFutures>(operatorChain.getNumberOfOperators());
        try {
            if (this.takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
                this.finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
            } else {
                this.cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
            }
        }
        catch (Exception ex) {
            this.cleanup(snapshotFutures, metadata, metrics, ex);
            throw ex;
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning) throws Exception {
        if (isRunning.get().booleanValue()) {
            LOG.debug("Notification of completed checkpoint {} for task {}", (Object)checkpointId, (Object)this.taskName);
            for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
                operatorWrapper.notifyCheckpointComplete(checkpointId);
            }
        } else {
            LOG.debug("Ignoring notification of complete checkpoint {} for not-running task {}", (Object)checkpointId, (Object)this.taskName);
        }
        this.env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning) throws Exception {
        Exception previousException = null;
        if (isRunning.get().booleanValue()) {
            LOG.debug("Notification of aborted checkpoint {} for task {}", (Object)checkpointId, (Object)this.taskName);
            boolean canceled = this.cancelAsyncCheckpointRunnable(checkpointId);
            if (!canceled && checkpointId > this.lastCheckpointId) {
                this.abortedCheckpointIds.add(checkpointId);
            }
            this.channelStateWriter.abort(checkpointId, (Throwable)new CancellationException("checkpoint aborted via notification"), false);
            for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
                try {
                    operatorWrapper.getStreamOperator().notifyCheckpointAborted(checkpointId);
                }
                catch (Exception e) {
                    previousException = e;
                }
            }
        } else {
            LOG.debug("Ignoring notification of aborted checkpoint {} for not-running task {}", (Object)checkpointId, (Object)this.taskName);
        }
        this.env.getTaskStateManager().notifyCheckpointAborted(checkpointId);
        ExceptionUtils.tryRethrowException(previousException);
    }

    @Override
    public void initInputsCheckpoint(long id, CheckpointOptions checkpointOptions) throws CheckpointException {
        if (checkpointOptions.isUnalignedCheckpoint()) {
            this.channelStateWriter.start(id, checkpointOptions);
            this.prepareInflightDataSnapshot(id);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        ArrayList<AsyncCheckpointRunnable> asyncCheckpointRunnables = null;
        Object object = this.lock;
        synchronized (object) {
            if (!this.closed) {
                this.closed = true;
                asyncCheckpointRunnables = new ArrayList<AsyncCheckpointRunnable>(this.checkpoints.values());
                this.checkpoints.clear();
            }
        }
        IOUtils.closeAllQuietly(asyncCheckpointRunnables);
        this.channelStateWriter.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    int getAsyncCheckpointRunnableSize() {
        Object object = this.lock;
        synchronized (object) {
            return this.checkpoints.size();
        }
    }

    @VisibleForTesting
    int getAbortedCheckpointSize() {
        return this.abortedCheckpointIds.size();
    }

    private boolean checkAndClearAbortedStatus(long checkpointId) {
        return this.abortedCheckpointIds.remove(checkpointId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void registerAsyncCheckpointRunnable(long checkpointId, AsyncCheckpointRunnable asyncCheckpointRunnable) throws IOException {
        Object object = this.lock;
        synchronized (object) {
            if (this.closed) {
                LOG.debug("Cannot register Closeable, this subtaskCheckpointCoordinator is already closed. Closing argument.");
                IOUtils.closeQuietly((AutoCloseable)asyncCheckpointRunnable);
                Preconditions.checkState((!this.checkpoints.containsKey(checkpointId) ? 1 : 0) != 0, (String)"SubtaskCheckpointCoordinator was closed without releasing asyncCheckpointRunnable for checkpoint %s", (Object[])new Object[]{checkpointId});
            } else {
                if (this.checkpoints.containsKey(checkpointId)) {
                    IOUtils.closeQuietly((AutoCloseable)asyncCheckpointRunnable);
                    throw new IOException(String.format("Cannot register Closeable, async checkpoint %d runnable has been register. Closing argument.", checkpointId));
                }
                this.checkpoints.put(checkpointId, asyncCheckpointRunnable);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean unregisterAsyncCheckpointRunnable(long checkpointId) {
        Object object = this.lock;
        synchronized (object) {
            return this.checkpoints.remove(checkpointId) != null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean cancelAsyncCheckpointRunnable(long checkpointId) {
        AsyncCheckpointRunnable asyncCheckpointRunnable;
        Object object = this.lock;
        synchronized (object) {
            asyncCheckpointRunnable = this.checkpoints.remove(checkpointId);
        }
        IOUtils.closeQuietly((AutoCloseable)asyncCheckpointRunnable);
        return asyncCheckpointRunnable != null;
    }

    private void cleanup(Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData metadata, CheckpointMetricsBuilder metrics, Exception ex) {
        this.channelStateWriter.abort(metadata.getCheckpointId(), (Throwable)ex, true);
        for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
            if (operatorSnapshotResult == null) continue;
            try {
                operatorSnapshotResult.cancel();
            }
            catch (Exception e) {
                LOG.warn("Could not properly cancel an operator snapshot result.", (Throwable)e);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.taskName, metadata.getCheckpointId(), metrics.getAlignmentDurationNanosOrDefault() / 1000000L, metrics.getSyncDurationMillis()});
        }
    }

    private void prepareInflightDataSnapshot(long checkpointId) throws CheckpointException {
        ((CompletableFuture)this.prepareInputSnapshot.apply((Object)this.channelStateWriter, (Object)checkpointId)).whenComplete((unused, ex) -> {
            if (ex != null) {
                this.channelStateWriter.abort(checkpointId, ex, false);
            } else {
                this.channelStateWriter.finishInput(checkpointId);
            }
        });
    }

    private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetricsBuilder metrics, Supplier<Boolean> isRunning) {
        this.asyncOperationsThreadPool.execute(new AsyncCheckpointRunnable(snapshotFutures, metadata, metrics, System.nanoTime(), this.taskName, this.registerConsumer(), this.unregisterConsumer(), this.env, this.asyncExceptionHandler, isRunning));
    }

    private Consumer<AsyncCheckpointRunnable> registerConsumer() {
        return asyncCheckpointRunnable -> {
            try {
                this.registerAsyncCheckpointRunnable(asyncCheckpointRunnable.getCheckpointId(), (AsyncCheckpointRunnable)asyncCheckpointRunnable);
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    private Consumer<AsyncCheckpointRunnable> unregisterConsumer() {
        return asyncCheckpointRunnable -> this.unregisterAsyncCheckpointRunnable(asyncCheckpointRunnable.getCheckpointId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean takeSnapshotSync(Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetricsBuilder checkpointMetrics, CheckpointOptions checkpointOptions, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning) throws Exception {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
            if (!operatorWrapper.isClosed()) continue;
            this.env.declineCheckpoint(checkpointMetaData.getCheckpointId(), new CheckpointException("Task Name" + this.taskName, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING));
            return false;
        }
        long checkpointId = checkpointMetaData.getCheckpointId();
        long started = System.nanoTime();
        ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult = checkpointOptions.isUnalignedCheckpoint() ? this.channelStateWriter.getAndRemoveWriteResult(checkpointId) : ChannelStateWriter.ChannelStateWriteResult.EMPTY;
        CheckpointStreamFactory storage = this.checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation());
        try {
            for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
                if (operatorWrapper.isClosed()) continue;
                operatorSnapshotsInProgress.put(operatorWrapper.getStreamOperator().getOperatorID(), this.buildOperatorSnapshotFutures(checkpointMetaData, checkpointOptions, operatorChain, (StreamOperator<?>)operatorWrapper.getStreamOperator(), isRunning, channelStateWriteResult, storage));
            }
        }
        finally {
            this.checkpointStorage.clearCacheFor(checkpointId);
        }
        LOG.debug("{} - finished synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms, is unaligned checkpoint : {}", new Object[]{this.taskName, checkpointId, checkpointMetrics.getAlignmentDurationNanosOrDefault() / 1000000L, checkpointMetrics.getSyncDurationMillis(), checkpointOptions.isUnalignedCheckpoint()});
        checkpointMetrics.setSyncDurationMillis((System.nanoTime() - started) / 1000000L);
        checkpointMetrics.setUnalignedCheckpoint(checkpointOptions.isUnalignedCheckpoint());
        return true;
    }

    private OperatorSnapshotFutures buildOperatorSnapshotFutures(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, OperatorChain<?, ?> operatorChain, StreamOperator<?> op, Supplier<Boolean> isRunning, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, CheckpointStreamFactory storage) throws Exception {
        OperatorSnapshotFutures snapshotInProgress = SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(op, checkpointMetaData, checkpointOptions, storage, isRunning);
        if (op == operatorChain.getMainOperator()) {
            snapshotInProgress.setInputChannelStateFuture((Future<SnapshotResult<StateObjectCollection<InputChannelStateHandle>>>)((Object)((CompletableFuture)channelStateWriteResult.getInputChannelStateHandles().thenApply(StateObjectCollection::new)).thenApply(SnapshotResult::of)));
        }
        if (op == operatorChain.getTailOperator()) {
            snapshotInProgress.setResultSubpartitionStateFuture((Future<SnapshotResult<StateObjectCollection<ResultSubpartitionStateHandle>>>)((Object)((CompletableFuture)channelStateWriteResult.getResultSubpartitionStateHandles().thenApply(StateObjectCollection::new)).thenApply(SnapshotResult::of)));
        }
        return snapshotInProgress;
    }

    private Set<Long> createAbortedCheckpointSetWithLimitSize(final int maxRecordAbortedCheckpoints) {
        return Collections.newSetFromMap(new LinkedHashMap<Long, Boolean>(){
            private static final long serialVersionUID = 1L;

            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
                return this.size() > maxRecordAbortedCheckpoints;
            }
        });
    }

    private static OperatorSnapshotFutures checkpointStreamOperator(StreamOperator<?> op, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation, Supplier<Boolean> isRunning) throws Exception {
        try {
            return op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions, storageLocation);
        }
        catch (Exception ex) {
            if (isRunning.get().booleanValue()) {
                LOG.info(ex.getMessage(), (Throwable)ex);
            }
            throw ex;
        }
    }

    private static class CachingCheckpointStorageWorkerView
    implements CheckpointStorageWorkerView {
        private final Map<Long, CheckpointStreamFactory> cache = new ConcurrentHashMap<Long, CheckpointStreamFactory>();
        private final CheckpointStorageWorkerView delegate;

        private CachingCheckpointStorageWorkerView(CheckpointStorageWorkerView delegate) {
            this.delegate = delegate;
        }

        void clearCacheFor(long checkpointId) {
            this.cache.remove(checkpointId);
        }

        public CheckpointStreamFactory resolveCheckpointStorageLocation(long checkpointId, CheckpointStorageLocationReference reference) {
            return this.cache.computeIfAbsent(checkpointId, id -> {
                try {
                    return this.delegate.resolveCheckpointStorageLocation(checkpointId, reference);
                }
                catch (IOException e) {
                    throw new FlinkRuntimeException((Throwable)e);
                }
            });
        }

        public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
            return this.delegate.createTaskOwnedStateStream();
        }
    }
}

