/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutor;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.CheckpointStartRequest;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@ThreadSafe
public class ChannelStateWriterImpl
implements ChannelStateWriter {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
    private static final int DEFAULT_MAX_CHECKPOINTS = 1000;
    private final JobVertexID jobVertexID;
    private final int subtaskIndex;
    private final String taskName;
    private final ChannelStateWriteRequestExecutor executor;
    private final ConcurrentMap<Long, ChannelStateWriter.ChannelStateWriteResult> results;
    private final int maxCheckpoints;
    private final AtomicBoolean wasClosed = new AtomicBoolean(false);

    public ChannelStateWriterImpl(JobVertexID jobVertexID, String taskName, int subtaskIndex, CheckpointStorage checkpointStorage, ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory, int maxSubtasksPerChannelStateFile) {
        this(jobVertexID, taskName, subtaskIndex, checkpointStorage, 1000, channelStateExecutorFactory, maxSubtasksPerChannelStateFile);
    }

    ChannelStateWriterImpl(JobVertexID jobVertexID, String taskName, int subtaskIndex, CheckpointStorage checkpointStorage, int maxCheckpoints, ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory, int maxSubtasksPerChannelStateFile) {
        this(jobVertexID, taskName, subtaskIndex, new ConcurrentHashMap<Long, ChannelStateWriter.ChannelStateWriteResult>(maxCheckpoints), channelStateExecutorFactory.getOrCreateExecutor(jobVertexID, subtaskIndex, checkpointStorage, maxSubtasksPerChannelStateFile), maxCheckpoints);
    }

    ChannelStateWriterImpl(JobVertexID jobVertexID, String taskName, int subtaskIndex, ConcurrentMap<Long, ChannelStateWriter.ChannelStateWriteResult> results, ChannelStateWriteRequestExecutor executor, int maxCheckpoints) {
        this.jobVertexID = jobVertexID;
        this.taskName = taskName;
        this.subtaskIndex = subtaskIndex;
        this.results = results;
        this.maxCheckpoints = maxCheckpoints;
        this.executor = executor;
    }

    @Override
    public void start(long checkpointId, CheckpointOptions checkpointOptions) {
        LOG.debug("{} starting checkpoint {} ({})", new Object[]{this.taskName, checkpointId, checkpointOptions});
        ChannelStateWriter.ChannelStateWriteResult result = new ChannelStateWriter.ChannelStateWriteResult();
        ChannelStateWriter.ChannelStateWriteResult put = this.results.computeIfAbsent(checkpointId, id -> {
            Preconditions.checkState((this.results.size() < this.maxCheckpoints ? 1 : 0) != 0, (Object)String.format("%s can't start %d, results.size() > maxCheckpoints: %d > %d", this.taskName, checkpointId, this.results.size(), this.maxCheckpoints));
            this.enqueue(new CheckpointStartRequest(this.jobVertexID, this.subtaskIndex, checkpointId, result, checkpointOptions.getTargetLocation()), false);
            return result;
        });
        Preconditions.checkArgument((put == result ? 1 : 0) != 0, (Object)(this.taskName + " result future already present for checkpoint " + checkpointId));
    }

    @Override
    public void addInputData(long checkpointId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> iterator) {
        LOG.trace("{} adding input data, checkpoint {}, channel: {}, startSeqNum: {}", new Object[]{this.taskName, checkpointId, info, startSeqNum});
        this.enqueue(ChannelStateWriteRequest.write(this.jobVertexID, this.subtaskIndex, checkpointId, info, iterator), false);
    }

    @Override
    public void addOutputData(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer ... data) {
        LOG.trace("{} adding output data, checkpoint {}, channel: {}, startSeqNum: {}, num buffers: {}", new Object[]{this.taskName, checkpointId, info, startSeqNum, data == null ? 0 : data.length});
        this.enqueue(ChannelStateWriteRequest.write(this.jobVertexID, this.subtaskIndex, checkpointId, info, data), false);
    }

    @Override
    public void addOutputDataFuture(long checkpointId, ResultSubpartitionInfo info, int startSeqNum, CompletableFuture<List<Buffer>> dataFuture) throws IllegalArgumentException {
        LOG.trace("{} adding output data future, checkpoint {}, channel: {}, startSeqNum: {}", new Object[]{this.taskName, checkpointId, info, startSeqNum});
        this.enqueue(ChannelStateWriteRequest.write(this.jobVertexID, this.subtaskIndex, checkpointId, info, dataFuture), false);
    }

    @Override
    public void finishInput(long checkpointId) {
        LOG.debug("{} finishing input data, checkpoint {}", (Object)this.taskName, (Object)checkpointId);
        this.enqueue(ChannelStateWriteRequest.completeInput(this.jobVertexID, this.subtaskIndex, checkpointId), false);
    }

    @Override
    public void finishOutput(long checkpointId) {
        LOG.debug("{} finishing output data, checkpoint {}", (Object)this.taskName, (Object)checkpointId);
        this.enqueue(ChannelStateWriteRequest.completeOutput(this.jobVertexID, this.subtaskIndex, checkpointId), false);
    }

    @Override
    public void abort(long checkpointId, Throwable cause, boolean cleanup) {
        LOG.debug("{} aborting, checkpoint {}", (Object)this.taskName, (Object)checkpointId);
        this.enqueue(ChannelStateWriteRequest.abort(this.jobVertexID, this.subtaskIndex, checkpointId, cause), true);
        this.enqueue(ChannelStateWriteRequest.abort(this.jobVertexID, this.subtaskIndex, checkpointId, cause), false);
        if (cleanup) {
            this.results.remove(checkpointId);
        }
    }

    @Override
    public ChannelStateWriter.ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
        LOG.debug("{} requested write result, checkpoint {}", (Object)this.taskName, (Object)checkpointId);
        ChannelStateWriter.ChannelStateWriteResult result = (ChannelStateWriter.ChannelStateWriteResult)this.results.remove(checkpointId);
        Preconditions.checkArgument((result != null ? 1 : 0) != 0, (Object)(this.taskName + " channel state write result not found for checkpoint " + checkpointId));
        return result;
    }

    @VisibleForTesting
    public ChannelStateWriter.ChannelStateWriteResult getWriteResult(long checkpointId) {
        return (ChannelStateWriter.ChannelStateWriteResult)this.results.get(checkpointId);
    }

    @Override
    public void close() throws IOException {
        if (this.wasClosed.compareAndSet(false, true)) {
            LOG.debug("close, dropping checkpoints {}", this.results.keySet());
            this.results.clear();
            this.enqueue(ChannelStateWriteRequest.releaseSubtask(this.jobVertexID, this.subtaskIndex), false);
            this.executor.releaseSubtask(this.jobVertexID, this.subtaskIndex);
        }
    }

    private void enqueue(ChannelStateWriteRequest request, boolean atTheFront) {
        try {
            if (atTheFront) {
                this.executor.submitPriority(request);
            } else {
                this.executor.submit(request);
            }
        }
        catch (Exception e) {
            RuntimeException wrapped = new RuntimeException("unable to send request to worker", e);
            try {
                request.cancel(e);
            }
            catch (Exception cancelException) {
                wrapped.addSuppressed(cancelException);
            }
            throw wrapped;
        }
    }
}

