/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector;
import org.apache.flink.streaming.runtime.tasks.ChainingOutput;
import org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector;
import org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput;
import org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
implements StreamStatusMaintainer,
BoundedMultiInput {
    private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
    private final RecordWriterOutput<?>[] streamOutputs;
    private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput;
    @Nullable
    private final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper;
    @Nullable
    private final StreamOperatorWrapper<?, ?> firstOperatorWrapper;
    @Nullable
    private final StreamOperatorWrapper<?, ?> tailOperatorWrapper;
    private final Map<StreamConfig.SourceInputConfig, ChainedSource> chainedSources;
    private final int numOperators;
    private final OperatorEventDispatcherImpl operatorEventDispatcher;
    private boolean ignoreEndOfInput;
    private StreamStatus streamStatus = StreamStatus.ACTIVE;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds the operator chain for {@code containingTask}.
     *
     * <p>Steps, in order: create one {@link RecordWriterOutput} per out-edge; create the main
     * operator's output collector (which, via {@code createOperatorChain}, also creates every
     * chained operator); create the main operator if an operator factory is configured; create
     * any chained sources; link all operator wrappers. If any step throws, every stream output
     * created so far is closed before the exception propagates.
     *
     * @param containingTask the task owning this chain; supplies the configuration, user-code
     *     class loader, environment and mailbox executor factory
     * @param recordWriterDelegate provides the record writer for each out-edge, looked up by the
     *     edge's position in the ordered out-edge list
     */
    public OperatorChain(StreamTask<OUT, OP> containingTask, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
        this.operatorEventDispatcher = new OperatorEventDispatcherImpl(containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(), containingTask.getEnvironment().getOperatorCoordinatorEventGateway());
        ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
        StreamConfig configuration = containingTask.getConfiguration();
        // NOTE(review): the static type was erased to Object by the decompiler; presumably this is
        // the stream operator factory type expected by StreamOperatorFactoryUtil - confirm.
        Object operatorFactory = configuration.getStreamOperatorFactory(userCodeClassloader);
        Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
        HashMap streamOutputMap = new HashMap(outEdgesInOrder.size());
        this.streamOutputs = new RecordWriterOutput[outEdgesInOrder.size()];
        // Tracks whether construction completed, so the finally block knows when to clean up.
        boolean success = false;
        try {
            this.createChainOutputs(outEdgesInOrder, recordWriterDelegate, chainedConfigs, containingTask, streamOutputMap);
            // Filled as a side effect of operator creation: each created operator appends its wrapper.
            ArrayList allOpWrappers = new ArrayList(chainedConfigs.size());
            this.mainOperatorOutput = this.createOutputCollector(containingTask, configuration, chainedConfigs, userCodeClassloader, streamOutputMap, allOpWrappers, containingTask.getMailboxExecutorFactory());
            if (operatorFactory != null) {
                Tuple2 mainOperatorAndTimeService = StreamOperatorFactoryUtil.createOperator(operatorFactory, containingTask, configuration, this.mainOperatorOutput, this.operatorEventDispatcher);
                StreamOperator mainOperator = (StreamOperator)mainOperatorAndTimeService.f0;
                mainOperator.getMetricGroup().gauge("currentOutputWatermark", this.mainOperatorOutput.getWatermarkGauge());
                this.mainOperatorWrapper = this.createOperatorWrapper(mainOperator, containingTask, configuration, (Optional)mainOperatorAndTimeService.f1, true);
                // The main operator is appended after all chained operators created above.
                allOpWrappers.add(this.mainOperatorWrapper);
                // The first wrapper that was created is treated as the tail of the chain.
                this.tailOperatorWrapper = (StreamOperatorWrapper)allOpWrappers.get(0);
            } else {
                // Without a main operator there must be no chained operators either.
                Preconditions.checkState((allOpWrappers.size() == 0 ? 1 : 0) != 0);
                this.mainOperatorWrapper = null;
                this.tailOperatorWrapper = null;
            }
            this.chainedSources = this.createChainedSources(containingTask, configuration.getInputs(userCodeClassloader), chainedConfigs, userCodeClassloader, allOpWrappers);
            this.numOperators = allOpWrappers.size();
            this.firstOperatorWrapper = this.linkOperatorWrappers(allOpWrappers);
            success = true;
        }
        finally {
            // On failure, close whatever stream outputs were already created; slots that were
            // never filled are still null and are skipped.
            if (!success) {
                for (RecordWriterOutput<?> output : this.streamOutputs) {
                    if (output == null) continue;
                    output.close();
                }
            }
        }
    }

    @VisibleForTesting
    OperatorChain(List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, RecordWriterOutput<?>[] streamOutputs, WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput, StreamOperatorWrapper<OUT, OP> mainOperatorWrapper) {
        this.streamOutputs = (RecordWriterOutput[])Preconditions.checkNotNull(streamOutputs);
        this.mainOperatorOutput = (WatermarkGaugeExposingOutput)Preconditions.checkNotNull(mainOperatorOutput);
        this.operatorEventDispatcher = null;
        Preconditions.checkState((allOperatorWrappers != null && allOperatorWrappers.size() > 0 ? 1 : 0) != 0);
        this.mainOperatorWrapper = (StreamOperatorWrapper)Preconditions.checkNotNull(mainOperatorWrapper);
        this.tailOperatorWrapper = allOperatorWrappers.get(0);
        this.numOperators = allOperatorWrappers.size();
        this.chainedSources = Collections.emptyMap();
        this.firstOperatorWrapper = this.linkOperatorWrappers(allOperatorWrappers);
    }

    /**
     * Creates one {@link RecordWriterOutput} per out-edge, in edge order.
     *
     * <p>Each output is stored in {@code streamOutputs} at the edge's position and registered in
     * {@code streamOutputMap} under its edge. The record writer is fetched by the same position,
     * and the upstream config is looked up by the edge's source id.
     */
    private void createChainOutputs(List<StreamEdge> outEdgesInOrder, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate, Map<Integer, StreamConfig> chainedConfigs, StreamTask<OUT, OP> containingTask, Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
        int edgeIndex = 0;
        for (StreamEdge edge : outEdgesInOrder) {
            RecordWriterOutput<OUT> output = this.createStreamOutput(recordWriterDelegate.getRecordWriter(edgeIndex), edge, chainedConfigs.get(edge.getSourceId()), containingTask.getEnvironment());
            this.streamOutputs[edgeIndex] = output;
            streamOutputMap.put(edge, output);
            ++edgeIndex;
        }
    }

    /**
     * Creates a {@link ChainedSource} for every configured input that is a
     * {@link StreamConfig.SourceInputConfig}.
     *
     * <p>Returns an empty map when no configured input is a source input. Otherwise the main
     * operator must be a {@link MultipleInputStreamOperator}. Each source operator is created
     * through {@code createOperator} (so its wrapper is appended to {@code allOpWrappers}) and
     * is given an input gate index that continues after the highest index of the task's existing
     * input gates.
     *
     * @return map from source input config to the chained source created for it
     */
    private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(StreamTask<OUT, OP> containingTask, StreamConfig.InputConfig[] configuredInputs, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, List<StreamOperatorWrapper<?, ?>> allOpWrappers) {
        boolean hasSourceInput = Arrays.stream(configuredInputs).anyMatch(input -> input instanceof StreamConfig.SourceInputConfig);
        if (!hasSourceInput) {
            return Collections.emptyMap();
        }
        Preconditions.checkState((boolean)(this.mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator), (Object)"Creating chained input is only supported with MultipleInputStreamOperator and MultipleInputStreamTask");
        Map<StreamConfig.SourceInputConfig, ChainedSource> result = new HashMap<StreamConfig.SourceInputConfig, ChainedSource>();
        MultipleInputStreamOperator multipleInputOperator = (MultipleInputStreamOperator)this.mainOperatorWrapper.getStreamOperator();
        List<Input> operatorInputs = multipleInputOperator.getInputs();

        // Source gates are numbered right after the highest existing gate index (0 if there are none).
        int highestGateIndex = Arrays.stream(containingTask.getEnvironment().getAllInputGates()).mapToInt(IndexedInputGate::getInputGateIndex).max().orElse(-1);
        int nextSourceGateIndex = highestGateIndex + 1;

        for (int inputId = 0; inputId < configuredInputs.length; ++inputId) {
            StreamConfig.InputConfig configuredInput = configuredInputs[inputId];
            if (configuredInput instanceof StreamConfig.SourceInputConfig) {
                StreamConfig.SourceInputConfig sourceInput = (StreamConfig.SourceInputConfig)configuredInput;
                int sourceEdgeId = sourceInput.getInputEdge().getSourceId();
                StreamConfig sourceInputConfig = chainedConfigs.get(sourceEdgeId);
                OutputTag outputTag = sourceInput.getInputEdge().getOutputTag();
                WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput = this.createChainedSourceOutput(containingTask, operatorInputs.get(inputId), (OperatorMetricGroup)multipleInputOperator.getMetricGroup(), outputTag);
                SourceOperator sourceOperator = (SourceOperator)this.createOperator(containingTask, sourceInputConfig, userCodeClassloader, chainedSourceOutput, allOpWrappers, true);
                StreamTaskSourceInput sourceTaskInput = new StreamTaskSourceInput(sourceOperator, nextSourceGateIndex, inputId);
                ++nextSourceGateIndex;
                result.put(sourceInput, new ChainedSource(chainedSourceOutput, sourceTaskInput));
            }
        }
        return result;
    }

    /**
     * Creates the output that connects a chained source to the given operator input.
     *
     * @throws UnsupportedOperationException if object reuse is not enabled in the task's
     *     execution config
     */
    private WatermarkGaugeExposingOutput<StreamRecord> createChainedSourceOutput(StreamTask<?, OP> containingTask, Input input, OperatorMetricGroup metricGroup, OutputTag outputTag) {
        boolean objectReuse = containingTask.getExecutionConfig().isObjectReuseEnabled();
        if (objectReuse) {
            return new ChainingOutput(input, metricGroup, this, outputTag, null);
        }
        throw new UnsupportedOperationException("Currently chained sources are supported only with objectReuse enabled");
    }

    /** Returns the stream status last set on this chain (initially {@code StreamStatus.ACTIVE}). */
    @Override
    public StreamStatus getStreamStatus() {
        return streamStatus;
    }

    /**
     * Returns the operator event dispatcher of this chain. It is {@code null} when the chain was
     * built through the test-only constructor.
     */
    public OperatorEventDispatcher getOperatorEventDispatcher() {
        return operatorEventDispatcher;
    }

    /**
     * Hands the serialized event for the given operator to the chain's event dispatcher.
     *
     * @param operator id of the operator the event is addressed to
     * @param event the serialized operator event
     * @throws FlinkException propagated from the dispatcher
     */
    public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
        operatorEventDispatcher.dispatchEventToHandlers(operator, event);
    }

    /**
     * Switches the chain to {@code status} and emits the new status on every stream output.
     * Does nothing when {@code status} equals the current status.
     */
    @Override
    public void toggleStreamStatus(StreamStatus status) {
        if (status.equals(this.streamStatus)) {
            return;
        }
        this.streamStatus = status;
        // Forward the status change to all outgoing connections.
        for (int i = 0; i < this.streamOutputs.length; ++i) {
            this.streamOutputs[i].emitStreamStatus(status);
        }
    }

    /**
     * Broadcasts {@code event} on all stream outputs as a non-priority event.
     *
     * @throws IOException propagated from the outputs
     */
    public void broadcastEvent(AbstractEvent event) throws IOException {
        final boolean isPriorityEvent = false;
        broadcastEvent(event, isPriorityEvent);
    }

    /**
     * Broadcasts {@code event} on every stream output, in array order.
     *
     * @param event the event to broadcast
     * @param isPriorityEvent passed through unchanged to each output
     * @throws IOException propagated from the first output that fails; later outputs are not
     *     attempted
     */
    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
        for (int i = 0; i < this.streamOutputs.length; ++i) {
            this.streamOutputs[i].broadcastEvent(event, isPriorityEvent);
        }
    }

    /**
     * Calls {@code prepareSnapshotPreBarrier(checkpointId)} on every operator whose wrapper is
     * not closed, in the order given by {@link #getAllOperators()}.
     *
     * @param checkpointId id of the checkpoint being prepared
     * @throws Exception propagated from an operator
     */
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        for (StreamOperatorWrapper<?, ?> wrapper : this.getAllOperators()) {
            if (!wrapper.isClosed()) {
                wrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
            }
        }
    }

    /**
     * Signals the end of input {@code inputId} to the main operator wrapper. Nothing happens if
     * the chain has no main operator or if end-of-input is being ignored.
     *
     * @throws Exception propagated from the main operator wrapper
     */
    @Override
    public void endInput(int inputId) throws Exception {
        if (this.mainOperatorWrapper == null || this.ignoreEndOfInput) {
            return;
        }
        this.mainOperatorWrapper.endOperatorInput(inputId);
    }

    /**
     * Initializes the state of each operator and then opens it, one operator at a time, iterating
     * the chain in reverse (see {@link #getAllOperators(boolean)}).
     *
     * @param streamTaskStateInitializer passed to each operator's {@code initializeState}
     * @throws Exception propagated from the first operator that fails; later operators are not
     *     touched
     */
    protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : this.getAllOperators(true)) {
            // Typed as StreamOperator (not Object) so that initializeState/open resolve.
            StreamOperator<?> operator = operatorWrapper.getStreamOperator();
            operator.initializeState(streamTaskStateInitializer);
            operator.open();
        }
    }

    /**
     * Closes the chain by calling {@code close} on the first operator wrapper, passing along the
     * current ignore-end-of-input flag. Does nothing if there is no first wrapper.
     *
     * @param actionExecutor executor handed to the wrapper's {@code close}
     * @throws Exception propagated from the wrapper
     */
    protected void closeOperators(StreamTaskActionExecutor actionExecutor) throws Exception {
        if (this.firstOperatorWrapper == null) {
            return;
        }
        this.firstOperatorWrapper.close(actionExecutor, this.ignoreEndOfInput);
    }

    /** Returns the chain's stream outputs. This is the internal array itself, not a copy. */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        return streamOutputs;
    }

    /** Returns all operator wrappers in non-reversed order; same as {@code getAllOperators(false)}. */
    public Iterable<StreamOperatorWrapper<?, ?>> getAllOperators() {
        final boolean reverse = false;
        return getAllOperators(reverse);
    }

    /**
     * Returns an iterable over the operator wrappers of this chain.
     *
     * @param reverse if {@code true}, iteration starts at the tail wrapper with the iterator's
     *     reverse flag set; otherwise it starts at the main operator wrapper
     */
    public Iterable<StreamOperatorWrapper<?, ?>> getAllOperators(boolean reverse) {
        if (reverse) {
            return new StreamOperatorWrapper.ReadIterator(this.tailOperatorWrapper, true);
        }
        return new StreamOperatorWrapper.ReadIterator(this.mainOperatorWrapper, false);
    }

    /** Returns the number of operator wrappers that were linked into this chain at construction. */
    public int getNumberOfOperators() {
        return numOperators;
    }

    /** Returns the output that the main operator emits its records to. */
    public WatermarkGaugeExposingOutput<StreamRecord<OUT>> getMainOperatorOutput() {
        return mainOperatorOutput;
    }

    /**
     * Returns the output of the chained source registered for {@code sourceInput}.
     *
     * <p>Fails the {@code checkArgument} precondition if no chained source is registered for
     * that input.
     */
    public Output<StreamRecord<?>> getChainedSourceOutput(StreamConfig.SourceInputConfig sourceInput) {
        boolean known = this.chainedSources.containsKey(sourceInput);
        Preconditions.checkArgument((boolean)known, (String)"Chained source with sourcedId = [%s] was not found", (Object[])new Object[]{sourceInput});
        ChainedSource chainedSource = this.chainedSources.get(sourceInput);
        return chainedSource.getSourceOutput();
    }

    /**
     * Returns the task input of the chained source registered for {@code sourceInput}.
     *
     * <p>Fails the {@code checkArgument} precondition if no chained source is registered for
     * that input.
     */
    public StreamTaskSourceInput<?> getSourceTaskInput(StreamConfig.SourceInputConfig sourceInput) {
        boolean known = this.chainedSources.containsKey(sourceInput);
        Preconditions.checkArgument((boolean)known, (String)"Chained source with sourcedId = [%s] was not found", (Object[])new Object[]{sourceInput});
        ChainedSource chainedSource = this.chainedSources.get(sourceInput);
        return chainedSource.getSourceTaskInput();
    }

    /**
     * Returns the task inputs of all chained sources as a newly created list, in the iteration
     * order of the chained-source map's values.
     */
    public List<StreamTaskSourceInput<?>> getSourceTaskInputs() {
        List<StreamTaskSourceInput<?>> inputs = new ArrayList<StreamTaskSourceInput<?>>();
        for (ChainedSource chainedSource : this.chainedSources.values()) {
            inputs.add(chainedSource.getSourceTaskInput());
        }
        return inputs;
    }

    /**
     * Flushes every stream output, in array order.
     *
     * @throws IOException propagated from the first output that fails to flush
     */
    public void flushOutputs() throws IOException {
        RecordWriterOutput<?>[] outputs = this.getStreamOutputs();
        for (int i = 0; i < outputs.length; ++i) {
            outputs[i].flush();
        }
    }

    /** Closes every stream output, in array order. */
    public void releaseOutputs() {
        for (int i = 0; i < this.streamOutputs.length; ++i) {
            this.streamOutputs[i].close();
        }
    }

    /** Returns the main operator, or {@code null} if the chain has no main operator wrapper. */
    @Nullable
    public OP getMainOperator() {
        if (this.mainOperatorWrapper == null) {
            return null;
        }
        return (OP)this.mainOperatorWrapper.getStreamOperator();
    }

    /**
     * Creates the output an operator writes to, covering all of its outgoing edges.
     *
     * <p>Non-chained edges are served by the already-created {@link RecordWriterOutput} found in
     * {@code streamOutputs}. Chained edges are served by recursively building the downstream
     * operator via {@code createOperatorChain}, which appends the created operators' wrappers to
     * {@code allOperatorWrappers}. If exactly one output results it is returned directly;
     * otherwise the outputs are wrapped in a broadcasting collector.
     *
     * <p>The collector variant follows the object-reuse setting the same way
     * {@code wrapOperatorIntoOutput} does: with object reuse enabled the non-copying
     * {@link BroadcastingOutputCollector} is used, otherwise the
     * {@link CopyingBroadcastingOutputCollector}.
     *
     * @param containingTask task owning the chain; supplies the execution config
     * @param operatorConfig config of the operator whose output is being created
     * @param chainedConfigs configs of all chained operators, keyed by node id
     * @param userCodeClassloader class loader used to read edges from the config
     * @param streamOutputs network outputs keyed by their out-edge
     * @param allOperatorWrappers accumulator for wrappers of operators created along the way
     * @param mailboxExecutorFactory passed through to chained operator creation
     * @return the single output, or a broadcasting collector over all outputs
     */
    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(StreamTask<?, ?> containingTask, StreamConfig operatorConfig, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, Map<StreamEdge, RecordWriterOutput<?>> streamOutputs, List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, MailboxExecutorFactory mailboxExecutorFactory) {
        ArrayList<Tuple2> allOutputs = new ArrayList<Tuple2>(4);
        // Network outputs: one existing record-writer output per non-chained edge.
        for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
            RecordWriterOutput<?> output = streamOutputs.get(outputEdge);
            allOutputs.add(new Tuple2(output, (Object)outputEdge));
        }
        // Chained outputs: build the downstream operator (and everything behind it) first.
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
            WatermarkGaugeExposingOutput output = this.createOperatorChain(containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperatorWrappers, outputEdge.getOutputTag(), mailboxExecutorFactory);
            allOutputs.add(new Tuple2(output, (Object)outputEdge));
        }
        // A single output needs no broadcasting wrapper.
        if (allOutputs.size() == 1) {
            return (WatermarkGaugeExposingOutput)((Tuple2)allOutputs.get((int)0)).f0;
        }
        Output[] asArray = new Output[allOutputs.size()];
        for (int i = 0; i < allOutputs.size(); ++i) {
            asArray[i] = (Output)((Tuple2)allOutputs.get((int)i)).f0;
        }
        // Fix: the branches were swapped. Object reuse means records are NOT copied, so it must
        // select the plain collector; the copying collector is for the reuse-disabled case.
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            return new BroadcastingOutputCollector(asArray, this);
        }
        return new CopyingBroadcastingOutputCollector(asArray, this);
    }

    /**
     * Creates one chained operator together with everything downstream of it.
     *
     * <p>The operator's own output collector is created first (which may recurse into further
     * chained operators), then the operator itself as a non-head operator, and finally the
     * operator is wrapped into an output that its upstream operator can write to.
     *
     * @return the output through which records enter the created operator
     */
    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(StreamTask<OUT, ?> containingTask, StreamConfig operatorConfig, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, Map<StreamEdge, RecordWriterOutput<?>> streamOutputs, List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, OutputTag<IN> outputTag, MailboxExecutorFactory mailboxExecutorFactory) {
        // Downstream side: where this operator will emit to.
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> downstreamOutput = this.createOutputCollector(containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperatorWrappers, mailboxExecutorFactory);

        // The operator itself; chained operators are created with isHead = false.
        OneInputStreamOperator operator = (OneInputStreamOperator)this.createOperator(containingTask, operatorConfig, userCodeClassloader, downstreamOutput, allOperatorWrappers, false);

        // Upstream side: expose the operator as an output for its predecessor.
        return this.wrapOperatorIntoOutput(operator, containingTask, operatorConfig, userCodeClassloader, outputTag);
    }

    /**
     * Creates an operator from the factory stored in {@code operatorConfig}, appends a wrapper
     * for it to {@code allOperatorWrappers}, and registers a {@code currentOutputWatermark} gauge
     * on the operator's metric group that reads the watermark gauge of {@code output}.
     *
     * @param isHead passed to the created operator wrapper
     * @return the created operator
     */
    private <OUT, OP extends StreamOperator<OUT>> OP createOperator(StreamTask<OUT, ?> containingTask, StreamConfig operatorConfig, ClassLoader userCodeClassloader, WatermarkGaugeExposingOutput<StreamRecord<OUT>> output, List<StreamOperatorWrapper<?, ?>> allOperatorWrappers, boolean isHead) {
        Tuple2 operatorAndTimeService = StreamOperatorFactoryUtil.createOperator(operatorConfig.getStreamOperatorFactory(userCodeClassloader), containingTask, operatorConfig, output, this.operatorEventDispatcher);
        StreamOperator operator = (StreamOperator)operatorAndTimeService.f0;

        // Wrap first, so the wrapper is recorded before the gauge is registered.
        allOperatorWrappers.add(this.createOperatorWrapper(operator, containingTask, operatorConfig, (Optional)operatorAndTimeService.f1, isHead));

        operator.getMetricGroup().gauge("currentOutputWatermark", () -> output.getWatermarkGauge().getValue());
        return (OP)operator;
    }

    /**
     * Wraps {@code operator} into an output that an upstream operator can emit to.
     *
     * <p>With object reuse enabled a {@link ChainingOutput} is used; otherwise a
     * {@link CopyingChainingOutput} built with the operator's first-input serializer. A
     * {@code currentInputWatermark} gauge reading the created output's watermark gauge is
     * registered on the operator's metric group.
     *
     * @return the created output
     */
    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> wrapOperatorIntoOutput(OneInputStreamOperator<IN, OUT> operator, StreamTask<OUT, ?> containingTask, StreamConfig operatorConfig, ClassLoader userCodeClassloader, OutputTag<IN> outputTag) {
        ChainingOutput<IN> operatorInput;
        if (!containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            // No object reuse: the copying variant needs the input serializer.
            TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
            operatorInput = new CopyingChainingOutput<IN>(operator, inSerializer, outputTag, this);
        } else {
            operatorInput = new ChainingOutput<IN>(operator, this, outputTag);
        }
        operator.getMetricGroup().gauge("currentInputWatermark", () -> operatorInput.getWatermarkGauge().getValue());
        return operatorInput;
    }

    /**
     * Creates the {@link RecordWriterOutput} for one out-edge.
     *
     * <p>If the edge carries an output tag, the side-output serializer for that tag is taken
     * from {@code upStreamConfig}; otherwise the regular output serializer is used. Both are
     * resolved with the task environment's user-code class loader.
     */
    private RecordWriterOutput<OUT> createStreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter, StreamEdge edge, StreamConfig upStreamConfig, Environment taskEnvironment) {
        // The tag is null when the edge is not a side output.
        OutputTag sideOutputTag = edge.getOutputTag();
        TypeSerializer outSerializer;
        if (edge.getOutputTag() == null) {
            // Main output.
            outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserCodeClassLoader().asClassLoader());
        } else {
            // Side output.
            outSerializer = upStreamConfig.getTypeSerializerSideOut(edge.getOutputTag(), taskEnvironment.getUserCodeClassLoader().asClassLoader());
        }
        return new RecordWriterOutput<OUT>(recordWriter, outSerializer, sideOutputTag, this, edge.supportsUnalignedCheckpoints());
    }

    /**
     * Links the given wrappers into a doubly linked chain, walking the list in order.
     *
     * <p>For each wrapper, its {@code next} is set to the wrapper that precedes it in the list
     * ({@code null} for the first one), and that preceding wrapper's {@code previous} is set to
     * it.
     *
     * @return the last wrapper of the list, or {@code null} if the list is empty
     */
    private StreamOperatorWrapper<?, ?> linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> allOperatorWrappers) {
        StreamOperatorWrapper<?, ?> lastLinked = null;
        for (StreamOperatorWrapper<?, ?> wrapper : allOperatorWrappers) {
            if (lastLinked != null) {
                lastLinked.setPrevious(wrapper);
            }
            wrapper.setNext(lastLinked);
            lastLinked = wrapper;
        }
        return lastLinked;
    }

    /**
     * Creates the wrapper for {@code operator}, giving it a mailbox executor obtained from the
     * task's mailbox executor factory for the operator's chain index.
     *
     * @param processingTimeService optional processing time service handed to the wrapper
     * @param isHead passed to the wrapper unchanged
     */
    private <T, P extends StreamOperator<T>> StreamOperatorWrapper<T, P> createOperatorWrapper(P operator, StreamTask<?, ?> containingTask, StreamConfig operatorConfig, Optional<ProcessingTimeService> processingTimeService, boolean isHead) {
        return new StreamOperatorWrapper(
                operator,
                processingTimeService,
                containingTask.getMailboxExecutorFactory().createExecutor(operatorConfig.getChainIndex()),
                isHead);
    }

    /** Returns the operator of the tail wrapper, or {@code null} if the chain has no tail wrapper. */
    @Nullable
    StreamOperator<?> getTailOperator() {
        if (this.tailOperatorWrapper == null) {
            return null;
        }
        return (StreamOperator<?>)this.tailOperatorWrapper.getStreamOperator();
    }

    /**
     * Sets whether end-of-input is ignored. When {@code true}, {@code endInput} becomes a no-op,
     * and the flag is also passed to the first wrapper's {@code close} in {@code closeOperators}.
     */
    public void setIgnoreEndOfInput(boolean ignoreEndOfInput) {
        this.ignoreEndOfInput = ignoreEndOfInput;
    }

    /**
     * Immutable pair of a chained source's output and the task input that reads from that
     * source.
     */
    public static class ChainedSource {
        private final WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput;
        private final StreamTaskSourceInput<?> sourceTaskInput;

        /**
         * @param chainedSourceOutput output the chained source emits to
         * @param sourceTaskInput task input belonging to the chained source
         */
        public ChainedSource(WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput, StreamTaskSourceInput<?> sourceTaskInput) {
            this.chainedSourceOutput = chainedSourceOutput;
            this.sourceTaskInput = sourceTaskInput;
        }

        /** Returns the output the chained source emits to. */
        public Output<StreamRecord<?>> getSourceOutput() {
            return chainedSourceOutput;
        }

        /** Returns the task input belonging to the chained source. */
        public StreamTaskSourceInput<?> getSourceTaskInput() {
            return sourceTaskInput;
        }
    }
}

