/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
extends StreamTask<OUT, OP> {
    private final LegacySourceFunctionThread sourceThread;
    private final Object lock;
    private volatile boolean externallyInducedCheckpoints;
    private volatile boolean wasStoppedExternally = false;

    public SourceStreamTask(Environment env) throws Exception {
        this(env, new Object());
    }

    private SourceStreamTask(Environment env, Object lock) throws Exception {
        super(env, null, (Thread.UncaughtExceptionHandler)FatalExitExceptionHandler.INSTANCE, StreamTaskActionExecutor.synchronizedExecutor(lock));
        this.lock = Preconditions.checkNotNull((Object)lock);
        this.sourceThread = new LegacySourceFunctionThread();
    }

    @Override
    protected void init() {
        SourceFunction source = (SourceFunction)((StreamSource)this.mainOperator).getUserFunction();
        if (source instanceof ExternallyInducedSource) {
            this.externallyInducedCheckpoints = true;
            ExternallyInducedSource.CheckpointTrigger triggerHook = new ExternallyInducedSource.CheckpointTrigger(){

                @Override
                public void triggerCheckpoint(long checkpointId) throws FlinkException {
                    CheckpointOptions checkpointOptions = CheckpointOptions.forConfig((CheckpointType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(), (boolean)SourceStreamTask.this.configuration.isExactlyOnceCheckpointMode(), (boolean)SourceStreamTask.this.configuration.isUnalignedCheckpointsEnabled(), (long)SourceStreamTask.this.configuration.getAlignmentTimeout());
                    long timestamp = System.currentTimeMillis();
                    CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp, timestamp);
                    try {
                        SourceStreamTask.super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions).get();
                    }
                    catch (RuntimeException e) {
                        throw e;
                    }
                    catch (Exception e) {
                        throw new FlinkException(e.getMessage(), (Throwable)e);
                    }
                }
            };
            ((ExternallyInducedSource)source).setCheckpointTrigger(triggerHook);
        }
        this.getEnvironment().getMetricGroup().getIOMetricGroup().gauge("checkpointStartDelayNanos", this::getAsyncCheckpointStartDelayNanos);
    }

    @Override
    protected void advanceToEndOfEventTime() throws Exception {
        ((StreamSource)this.mainOperator).advanceToEndOfEventTime();
    }

    @Override
    protected void cleanup() {
    }

    @Override
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        controller.suspendDefaultAction();
        this.sourceThread.setTaskDescription(this.getName());
        this.sourceThread.start();
        this.sourceThread.getCompletionFuture().whenComplete((ignore, sourceThreadThrowable) -> {
            if (this.isCanceled() && ExceptionUtils.findThrowable((Throwable)sourceThreadThrowable, InterruptedException.class).isPresent()) {
                this.mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));
            } else if (!this.wasStoppedExternally && sourceThreadThrowable != null) {
                this.mailboxProcessor.reportThrowable((Throwable)sourceThreadThrowable);
            } else {
                this.mailboxProcessor.allActionsCompleted();
            }
        });
    }

    @Override
    protected void cleanUpInvoke() throws Exception {
        if (this.isFailing()) {
            this.interruptSourceThread(true);
        }
        super.cleanUpInvoke();
    }

    @Override
    protected void cancelTask() {
        this.cancelTask(true);
    }

    @Override
    protected void finishTask() {
        this.wasStoppedExternally = true;
        this.cancelTask(false);
    }

    private void cancelTask(boolean interrupt) {
        try {
            if (this.mainOperator != null) {
                ((StreamSource)this.mainOperator).cancel();
            }
        }
        finally {
            this.interruptSourceThread(interrupt);
        }
    }

    private void interruptSourceThread(boolean interrupt) {
        if (this.sourceThread.isAlive()) {
            if (interrupt) {
                this.sourceThread.interrupt();
            }
        } else if (!this.sourceThread.getCompletionFuture().isDone()) {
            this.sourceThread.getCompletionFuture().complete(null);
        }
    }

    @Override
    protected CompletableFuture<Void> getCompletionFuture() {
        return this.sourceThread.getCompletionFuture();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        if (!this.externallyInducedCheckpoints) {
            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
        }
        Object object = this.lock;
        synchronized (object) {
            return CompletableFuture.completedFuture(this.isRunning());
        }
    }

    @Override
    protected void declineCheckpoint(long checkpointId) {
        if (!this.externallyInducedCheckpoints) {
            super.declineCheckpoint(checkpointId);
        }
    }

    private class LegacySourceFunctionThread
    extends Thread {
        private final CompletableFuture<Void> completionFuture = new CompletableFuture();

        LegacySourceFunctionThread() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                ((StreamSource)SourceStreamTask.this.mainOperator).run(SourceStreamTask.this.lock, SourceStreamTask.this.getStreamStatusMaintainer(), SourceStreamTask.this.operatorChain);
                if (!SourceStreamTask.this.wasStoppedExternally && !SourceStreamTask.this.isCanceled()) {
                    Object object = SourceStreamTask.this.lock;
                    synchronized (object) {
                        SourceStreamTask.this.operatorChain.setIgnoreEndOfInput(false);
                    }
                }
                this.completionFuture.complete(null);
            }
            catch (Throwable t) {
                this.completionFuture.completeExceptionally(t);
            }
        }

        public void setTaskDescription(String taskDescription) {
            this.setName("Legacy Source Thread - " + taskDescription);
        }

        CompletableFuture<Void> getCompletionFuture() {
            return SourceStreamTask.this.isFailing() && !this.isAlive() ? CompletableFuture.completedFuture(null) : this.completionFuture;
        }
    }
}

