package org.apache.seatunnel.engine.server.task;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
import org.apache.seatunnel.engine.core.dag.actions.UnknownActionException;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeAfterCheckpointOperation;
import org.apache.seatunnel.engine.server.checkpoint.operation.TriggerSchemaChangeBeforeCheckpointOperation;
import org.apache.seatunnel.engine.server.dag.physical.config.IntermediateQueueConfig;
import org.apache.seatunnel.engine.server.dag.physical.config.SinkConfig;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
import org.apache.seatunnel.engine.server.dag.physical.flow.IntermediateExecutionFlow;
import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
import org.apache.seatunnel.engine.server.dag.physical.flow.UnknownFlowException;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.task.flow.AbstractFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.ActionFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.ShuffleSinkFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.group.AbstractTaskGroupWithIntermediateQueue;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/task/SeaTunnelTask.class */
/**
 * Base task that turns an execution {@link Flow} tree into a set of {@link FlowLifeCycle}
 * instances and drives them through the {@link SeaTunnelTaskState} state machine.
 *
 * <p>Subclasses supply the source side ({@link #createSourceFlowLifeCycle}) and the per-iteration
 * work performed while the task is running ({@link #collect()}).
 */
public abstract class SeaTunnelTask extends AbstractTask {
    private static final Logger log = LoggerFactory.getLogger(SeaTunnelTask.class);
    private static final long serialVersionUID = 2604309561613784425L;

    /** Current position in the task state machine; advanced by {@link #stateProcess()}. */
    protected volatile SeaTunnelTaskState currState;

    /** Root of the flow tree this task executes. */
    private final Flow executionFlow;

    /** Life cycle built from the root of {@link #executionFlow}; assigned in {@link #init()}. */
    protected FlowLifeCycle startFlowLifeCycle;

    /** Every life cycle created for the flow tree; children are added before their parent. */
    protected List<FlowLifeCycle> allCycles;

    /** Downstream life cycles of the source, shuffle or intermediate-queue flow of this task. */
    protected List<OneInputFlowLifeCycle<Record<?>>> outputs;

    /** One future per created life cycle; their joint completion sets {@code closeCalled}. */
    protected List<CompletableFuture<Void>> flowFutures;

    /** Snapshot states gathered per barrier id until the barrier is acknowledged. */
    protected final Map<Long, List<ActionSubtaskState>> checkpointStates;

    /** Number of life cycles that have acknowledged each barrier id so far. */
    private final Map<Long, Integer> cycleAcks;

    protected int indexID;
    private TaskGroup taskBelongGroup;
    private SeaTunnelMetricsContext metricsContext;

    /**
     * Creates the task in the {@link SeaTunnelTaskState#CREATED} state.
     *
     * @param j value forwarded unchanged to the {@link AbstractTask} constructor
     * @param taskLocation location forwarded unchanged to the {@link AbstractTask} constructor
     * @param i subtask index stored in {@link #indexID}
     * @param flow root of the flow tree executed by this task
     */
    public SeaTunnelTask(long j, TaskLocation taskLocation, int i, Flow flow) {
        super(j, taskLocation);
        this.checkpointStates = new ConcurrentHashMap<>();
        this.cycleAcks = new ConcurrentHashMap<>();
        this.indexID = i;
        this.executionFlow = flow;
        this.currState = SeaTunnelTaskState.CREATED;
    }

    /**
     * Builds the life cycles for the whole flow tree, initializes each of them and moves the task
     * to {@link SeaTunnelTaskState#INIT}.
     *
     * @throws Exception if the parent initialization, the flow conversion or a life cycle
     *     {@code init()} fails
     */
    @Override // org.apache.seatunnel.engine.server.task.AbstractTask, org.apache.seatunnel.engine.server.execution.Task
    public void init() throws Exception {
        super.init();
        this.metricsContext = getExecutionContext().getOrCreateMetricsContext(this.taskLocation);
        this.currState = SeaTunnelTaskState.INIT;
        this.flowFutures = new ArrayList<>();
        this.allCycles = new ArrayList<>();
        this.startFlowLifeCycle = convertFlowToActionLifeCycle(this.executionFlow);
        for (FlowLifeCycle cycle : this.allCycles) {
            cycle.init();
        }
        // PREPARE_CLOSE in stateProcess() waits on closeCalled, which is only raised here once
        // the combined future of all flow futures completes.
        CompletableFuture.allOf(this.flowFutures.toArray(new CompletableFuture[0]))
                .whenComplete((ignored, error) -> this.closeCalled = true);
    }

    /**
     * Performs a single step of the task state machine based on {@link #currState}.
     *
     * <p>States that are waiting on an external condition (restore completion, start signal,
     * close signal) sleep for 100 ms and leave the state unchanged when the condition is not met.
     *
     * @throws IllegalArgumentException if {@link #currState} is not handled by this method
     * @throws Exception if opening, collecting or closing fails, or the sleep is interrupted
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void stateProcess() throws Exception {
        switch (this.currState) {
            case INIT:
                this.currState = SeaTunnelTaskState.WAITING_RESTORE;
                reportTaskStatus(SeaTunnelTaskState.WAITING_RESTORE);
                break;
            case WAITING_RESTORE:
                if (this.restoreComplete.isDone()) {
                    for (FlowLifeCycle cycle : this.allCycles) {
                        cycle.open();
                    }
                    this.currState = SeaTunnelTaskState.READY_START;
                    reportTaskStatus(SeaTunnelTaskState.READY_START);
                } else {
                    Thread.sleep(100L);
                }
                break;
            case READY_START:
                if (this.startCalled) {
                    this.currState = SeaTunnelTaskState.STARTING;
                } else {
                    Thread.sleep(100L);
                }
                break;
            case STARTING:
                this.currState = SeaTunnelTaskState.RUNNING;
                break;
            case RUNNING:
                collect();
                if (this.prepareCloseStatus) {
                    this.currState = SeaTunnelTaskState.PREPARE_CLOSE;
                }
                break;
            case PREPARE_CLOSE:
                if (this.closeCalled) {
                    this.currState = SeaTunnelTaskState.CLOSED;
                } else {
                    Thread.sleep(100L);
                }
                break;
            case CLOSED:
                close();
                this.progress.done();
                break;
            case CANCELLING:
                close();
                this.currState = SeaTunnelTaskState.CANCELED;
                this.progress.done();
                break;
            default:
                throw new IllegalArgumentException("Unknown Enumerator State: " + this.currState);
        }
    }

    /**
     * Records the task group this task belongs to. The group is consulted when an intermediate
     * queue flow is converted, so it has to be set before {@link #init()} for such flows.
     *
     * @param taskGroup the owning task group
     */
    public void setTaskGroup(TaskGroup taskGroup) {
        this.taskBelongGroup = taskGroup;
    }

    /**
     * Recursively converts {@code flow} and its successors into life cycles.
     *
     * <p>Successors are converted first; the life cycle for {@code flow} is then created with a
     * fresh future (added to {@link #flowFutures}) and appended to {@link #allCycles}.
     *
     * @param flow the flow node to convert, never {@code null}
     * @return the life cycle created for {@code flow}
     * @throws NullPointerException if {@code flow} is {@code null}
     * @throws UnknownActionException if a physical flow carries an unsupported action type
     * @throws UnknownFlowException if {@code flow} is neither physical nor intermediate
     * @throws Exception if creating a life cycle fails
     */
    // Raw constructor calls are kept for the flow life cycles whose generic bounds are not
    // visible from this class; the runtime types are guarded by the instanceof checks.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow) throws Exception {
        if (flow == null) {
            throw new NullPointerException("flow is marked non-null but is null");
        }
        List<OneInputFlowLifeCycle<Record<?>>> downstream = new ArrayList<>();
        for (Flow next : flow.getNext()) {
            downstream.add((OneInputFlowLifeCycle<Record<?>>) convertFlowToActionLifeCycle(next));
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.flowFutures.add(completableFuture);

        FlowLifeCycle lifeCycle;
        if (flow instanceof PhysicalExecutionFlow) {
            PhysicalExecutionFlow<?, ?> physicalFlow = (PhysicalExecutionFlow<?, ?>) flow;
            Action action = physicalFlow.getAction();
            if (action instanceof SourceAction) {
                lifeCycle =
                        createSourceFlowLifeCycle(
                                (SourceAction<?, ?, ?>) action,
                                (SourceConfig) physicalFlow.getConfig(),
                                completableFuture,
                                mo71getMetricsContext());
                this.outputs = downstream;
            } else if (action instanceof SinkAction) {
                SinkConfig sinkConfig = (SinkConfig) physicalFlow.getConfig();
                lifeCycle =
                        new SinkFlowLifeCycle(
                                (SinkAction) action,
                                this.taskLocation,
                                this.indexID,
                                this,
                                sinkConfig.getCommitterTask(),
                                sinkConfig.isContainCommitter(),
                                completableFuture,
                                mo71getMetricsContext());
            } else if (action instanceof TransformChainAction) {
                lifeCycle =
                        new TransformFlowLifeCycle(
                                (TransformChainAction) action,
                                this,
                                new SeaTunnelTransformCollector(downstream),
                                completableFuture);
            } else if (action instanceof ShuffleAction) {
                ShuffleAction shuffleAction = (ShuffleAction) action;
                HazelcastInstance hazelcastInstance = getExecutionContext().getInstance();
                // A shuffle flow without successors is the writing end; otherwise it is the
                // reading end that feeds the downstream life cycles.
                if (flow.getNext().isEmpty()) {
                    lifeCycle =
                            new ShuffleSinkFlowLifeCycle(
                                    this,
                                    this.indexID,
                                    shuffleAction,
                                    hazelcastInstance,
                                    completableFuture);
                } else {
                    lifeCycle =
                            new ShuffleSourceFlowLifeCycle(
                                    this,
                                    this.indexID,
                                    shuffleAction,
                                    hazelcastInstance,
                                    completableFuture);
                }
                this.outputs = downstream;
            } else {
                throw new UnknownActionException(action);
            }
        } else if (flow instanceof IntermediateExecutionFlow) {
            IntermediateQueueConfig queueConfig =
                    (IntermediateQueueConfig) ((IntermediateExecutionFlow<?>) flow).getConfig();
            lifeCycle =
                    new IntermediateQueueFlowLifeCycle(
                            this,
                            completableFuture,
                            ((AbstractTaskGroupWithIntermediateQueue) this.taskBelongGroup)
                                    .getQueueCache(queueConfig.getQueueID()));
            this.outputs = downstream;
        } else {
            throw new UnknownFlowException(flow);
        }
        this.allCycles.add(lifeCycle);
        return lifeCycle;
    }

    /**
     * Creates the life cycle for a source action; called while converting the flow tree.
     *
     * @param sourceAction the source action of the physical flow
     * @param sourceConfig the configuration attached to that flow
     * @param completableFuture the future created for this life cycle
     * @param metricsContext the metrics context of this task
     * @return the source life cycle to register for this task
     */
    protected abstract SourceFlowLifeCycle<?, ?> createSourceFlowLifeCycle(SourceAction<?, ?, ?> sourceAction, SourceConfig sourceConfig, CompletableFuture<Void> completableFuture, MetricsContext metricsContext);

    /**
     * Performs one unit of work while the task is in {@link SeaTunnelTaskState#RUNNING}.
     *
     * @throws Exception if the work fails
     */
    protected abstract void collect() throws Exception;

    /**
     * Collects the jar URLs of every action found in the flow tree.
     *
     * @return the union of {@code getJarUrls()} over all physical flows
     */
    @Override // org.apache.seatunnel.engine.server.task.AbstractTask
    public Set<URL> getJarsUrl() {
        return getFlowInfo((action, urls) -> urls.addAll(action.getJarUrls()));
    }

    /**
     * Collects the connector jar identifiers of every action found in the flow tree.
     *
     * @return the union of {@code getConnectorJarIdentifiers()} over all physical flows
     */
    @Override // org.apache.seatunnel.engine.server.task.AbstractTask
    public Set<ConnectorJarIdentifier> getConnectorPluginJars() {
        return getFlowInfo((action, jars) -> jars.addAll(action.getConnectorJarIdentifiers()));
    }

    /**
     * Builds the state key of every action found in the flow tree.
     *
     * @return one {@link ActionStateKey} per distinct action key
     */
    public Set<ActionStateKey> getActionStateKeys() {
        return getFlowInfo((action, keys) -> keys.add(ActionStateKey.of(action)));
    }

    /**
     * Walks the flow tree level by level and lets {@code biConsumer} contribute to the result set
     * for the action of every {@link PhysicalExecutionFlow} encountered.
     *
     * @param biConsumer receives each action together with the mutable result set
     * @param <T> element type of the collected set
     * @return the set filled by {@code biConsumer}
     */
    private <T> Set<T> getFlowInfo(BiConsumer<Action, Set<T>> biConsumer) {
        Set<T> collected = new HashSet<>();
        List<Flow> currentLevel = new ArrayList<>();
        currentLevel.add(this.executionFlow);
        while (!currentLevel.isEmpty()) {
            List<Flow> nextLevel = new ArrayList<>();
            for (Flow flow : currentLevel) {
                if (flow instanceof PhysicalExecutionFlow) {
                    biConsumer.accept(((PhysicalExecutionFlow<?, ?>) flow).getAction(), collected);
                }
                nextLevel.addAll(flow.getNext());
            }
            currentLevel = nextLevel;
        }
        return collected;
    }

    /**
     * Closes the parent task and then every life cycle. An {@link IOException} raised by an
     * individual life cycle is logged and does not stop the remaining ones from being closed.
     *
     * @throws IOException if the parent {@code close()} fails
     */
    @Override // org.apache.seatunnel.engine.server.task.AbstractTask, org.apache.seatunnel.engine.server.execution.Task
    public void close() throws IOException {
        super.close();
        MDCTracer.tracing(this.allCycles.parallelStream())
                .forEach(
                        cycle -> {
                            try {
                                cycle.close();
                            } catch (IOException e) {
                                log.error("Close FlowLifeCycle error.", e);
                            }
                        });
    }

    /**
     * Counts one acknowledgement of {@code barrier}. When the count reaches the number of life
     * cycles, the counter is dropped, the prepare-close flag is raised if the barrier requests it
     * for this task, and for snapshot barriers the collected states are sent to the master.
     *
     * @param barrier the barrier being acknowledged by a life cycle
     */
    public void ack(Barrier barrier) {
        log.debug("seatunnel task ack barrier[{}]", this.taskLocation);
        final long barrierId = barrier.getId();
        int ackCount = this.cycleAcks.merge(barrierId, 1, Integer::sum);
        if (ackCount != this.allCycles.size()) {
            return;
        }
        this.cycleAcks.remove(barrierId);
        if (barrier.prepareClose(this.taskLocation)) {
            this.prepareCloseStatus = true;
            this.prepareCloseBarrierId.set(barrierId);
        }
        if (barrier.snapshot()) {
            // Blocks until the master has processed the acknowledgement.
            getExecutionContext()
                    .sendToMaster(
                            new TaskAcknowledgeOperation(
                                    this.taskLocation,
                                    (CheckpointBarrier) barrier,
                                    this.checkpointStates.remove(barrierId)))
                    .join();
        }
    }

    /**
     * Sends a {@link TriggerSchemaChangeBeforeCheckpointOperation} for this task to the master.
     *
     * @return the future of the invocation
     */
    public InvocationFuture<Object> triggerSchemaChangeBeforeCheckpoint() {
        log.info("trigger schema-change-before checkpoint. jobID[{}], taskLocation[{}]", this.jobID, this.taskLocation);
        return getExecutionContext().sendToMaster(new TriggerSchemaChangeBeforeCheckpointOperation(this.taskLocation));
    }

    /**
     * Sends a {@link TriggerSchemaChangeAfterCheckpointOperation} for this task to the master.
     *
     * @return the future of the invocation
     */
    public InvocationFuture<Object> triggerSchemaChangeAfterCheckpoint() {
        log.info("trigger schema-change-after checkpoint. jobID[{}], taskLocation[{}]", this.jobID, this.taskLocation);
        return getExecutionContext().sendToMaster(new TriggerSchemaChangeAfterCheckpointOperation(this.taskLocation));
    }

    /**
     * Stores the state produced by one action for the given barrier until it is acknowledged.
     *
     * @param barrier the barrier the state belongs to
     * @param actionStateKey key of the action that produced the state
     * @param list the serialized state
     */
    public void addState(Barrier barrier, ActionStateKey actionStateKey, List<byte[]> list) {
        this.checkpointStates
                .computeIfAbsent(barrier.getId(), id -> new ArrayList<>())
                .add(new ActionSubtaskState(actionStateKey, this.indexID, list));
    }

    /**
     * Forwards a checkpoint-complete notification to every listening life cycle, then calls
     * {@code tryClose} with the same id.
     *
     * @param j the checkpoint id
     * @throws Exception if a listener or {@code tryClose} fails
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        notifyAllAction(listener -> listener.notifyCheckpointComplete(j));
        tryClose(j);
    }

    /**
     * Forwards a checkpoint-aborted notification to every listening life cycle, then calls
     * {@code tryClose} with the same id.
     *
     * @param j the checkpoint id
     * @throws Exception if a listener or {@code tryClose} fails
     */
    public void notifyCheckpointAborted(long j) throws Exception {
        notifyAllAction(listener -> listener.notifyCheckpointAborted(j));
        tryClose(j);
    }

    /**
     * Forwards a checkpoint-end notification to every listening life cycle, then calls
     * {@code tryClose} with the same id.
     *
     * @param j the checkpoint id
     * @throws Exception if a listener or {@code tryClose} fails
     */
    public void notifyCheckpointEnd(long j) throws Exception {
        notifyAllAction(listener -> listener.notifyCheckpointEnd(j));
        tryClose(j);
    }

    /**
     * Applies {@code consumerWithException} to every life cycle that implements
     * {@link InternalCheckpointListener}, delegating exception handling to
     * {@link ExceptionUtil#sneaky}.
     *
     * @param consumerWithException the notification to deliver
     */
    public void notifyAllAction(ConsumerWithException<InternalCheckpointListener> consumerWithException) {
        this.allCycles.stream()
                .filter(cycle -> cycle instanceof InternalCheckpointListener)
                .map(cycle -> (InternalCheckpointListener) cycle)
                .forEach(listener -> ExceptionUtil.sneaky(consumerWithException, listener));
    }

    /**
     * Distributes restored states to the action life cycles, grouped by action state key, and
     * then completes {@code restoreComplete}. Life cycles without a matching entry receive an
     * empty list.
     *
     * <p>A {@code null} list returns early without completing {@code restoreComplete}.
     * NOTE(review): presumably intentional; confirm against the callers.
     *
     * @param list the states to restore, may be {@code null}
     * @throws Exception if a life cycle fails to restore its state
     */
    @Override // org.apache.seatunnel.engine.server.execution.Task, org.apache.seatunnel.engine.server.checkpoint.Stateful
    public void restoreState(List<ActionSubtaskState> list) throws Exception {
        log.debug("restoreState for SeaTunnelTask[{}]", list);
        if (null == list) {
            log.debug("restoreState is null, do nothing!");
            return;
        }
        Map<ActionStateKey, List<ActionSubtaskState>> statesByKey =
                list.stream()
                        .collect(
                                Collectors.groupingBy(
                                        ActionSubtaskState::getStateKey, Collectors.toList()));
        this.allCycles.stream()
                .filter(cycle -> cycle instanceof ActionFlowLifeCycle)
                .map(cycle -> (ActionFlowLifeCycle) cycle)
                .forEach(
                        actionCycle -> {
                            try {
                                actionCycle.restoreState(
                                        statesByKey.getOrDefault(
                                                ActionStateKey.of(actionCycle.getAction()),
                                                Collections.emptyList()));
                            } catch (Exception e) {
                                ExceptionUtil.sneakyThrow(e);
                            }
                        });
        this.restoreComplete.complete(null);
        log.debug("restoreState for SeaTunnelTask finished, actionStateList: {}", list);
    }

    /**
     * Returns the metrics context obtained in {@link #init()}.
     *
     * @return the metrics context, or {@code null} before {@link #init()} has run
     */
    @Override // org.apache.seatunnel.engine.server.execution.Task
    /* renamed from: getMetricsContext, reason: merged with bridge method [inline-methods] */
    public SeaTunnelMetricsContext mo71getMetricsContext() {
        return this.metricsContext;
    }

    /**
     * Delegates dynamic metric collection to the metrics context, tagging the descriptor copy
     * with this task's simple class name. Does nothing while no metrics context is set.
     *
     * @param metricDescriptor descriptor to copy and tag
     * @param metricsCollectionContext target of the collected metrics
     */
    @Override // org.apache.seatunnel.engine.server.execution.Task
    public void provideDynamicMetrics(MetricDescriptor metricDescriptor, MetricsCollectionContext metricsCollectionContext) {
        if (null != this.metricsContext) {
            this.metricsContext.provideDynamicMetrics(metricDescriptor.copy().withTag("taskName", getClass().getSimpleName()), metricsCollectionContext);
        }
    }
}
