package org.apache.seatunnel.engine.server.task;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.NonNull;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.record.Barrier;

/* loaded from: input_file:org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.class */
/**
 * A {@link SeaTunnelTask} whose starting flow must be a {@link SourceFlowLifeCycle}.
 *
 * <p>During {@link #init()} the task builds a {@link SeaTunnelSourceCollector} and hands it to
 * the starting lifecycle; afterwards {@link #collect()}, {@link #receivedSourceSplit(List)} and
 * {@link #triggerBarrier(Barrier)} simply forward to that lifecycle.
 *
 * @param <T> element type handled by the collector
 * @param <SplitT> split type of the source
 */
public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends SeaTunnelTask {
    private static final ILogger LOGGER = Logger.getLogger(SourceSeaTunnelTask.class);

    /** Collector created in {@link #init()} and installed on the starting lifecycle. */
    private transient SeaTunnelSourceCollector<T> collector;

    /** Lock object created in {@link #init()} and passed to the collector. */
    private transient Object checkpointLock;

    /** Split serializer obtained from the source of {@link #sourceFlow} in {@link #init()}. */
    private transient Serializer<SplitT> splitSerializer;

    /** Options map given at construction; fed to {@link FlowControlStrategy#fromMap}. */
    private final Map<String, Object> envOption;

    /** The execution flow this task was built from (also passed to the superclass). */
    private final PhysicalExecutionFlow<SourceAction, SourceConfig> sourceFlow;

    /**
     * Creates the task.
     *
     * @param j forwarded unchanged as the first superclass constructor argument
     * @param taskLocation forwarded unchanged to the superclass constructor
     * @param i forwarded unchanged as the third superclass constructor argument
     * @param physicalExecutionFlow flow forwarded to the superclass and kept as the source flow
     * @param map options later used to build the flow-control strategy
     */
    public SourceSeaTunnelTask(
            long j,
            TaskLocation taskLocation,
            int i,
            PhysicalExecutionFlow<SourceAction, SourceConfig> physicalExecutionFlow,
            Map<String, Object> map) {
        super(j, taskLocation, i, physicalExecutionFlow);
        this.envOption = map;
        this.sourceFlow = physicalExecutionFlow;
    }

    /**
     * Runs the superclass initialisation, prepares the lock and split serializer, verifies that
     * the starting lifecycle is a {@link SourceFlowLifeCycle}, and installs a freshly built
     * collector on it.
     *
     * @throws TaskRuntimeException if the starting lifecycle is not a {@link SourceFlowLifeCycle}
     * @throws Exception propagated from the superclass initialisation
     */
    @Override
    public void init() throws Exception {
        super.init();

        checkpointLock = new Object();
        splitSerializer = sourceFlow.getAction().getSource().getSplitSerializer();
        LOGGER.info("starting seatunnel source task, index " + indexID);

        boolean startsWithSource = startFlowLifeCycle instanceof SourceFlowLifeCycle;
        if (!startsWithSource) {
            String actualType = startFlowLifeCycle.getClass().getName();
            throw new TaskRuntimeException(
                    "SourceSeaTunnelTask only support SourceFlowLifeCycle, but get " + actualType);
        }

        SeaTunnelDataType rowType = determineProducedType();
        collector =
                new SeaTunnelSourceCollector<>(
                        checkpointLock,
                        outputs,
                        mo44getMetricsContext(),
                        FlowControlStrategy.fromMap(envOption),
                        rowType);
        startAsSourceLifeCycle().setCollector(collector);
    }

    /**
     * Builds the {@link SourceFlowLifeCycle} for the given action, bound to this task's index
     * and location and to the enumerator task taken from {@code sourceConfig}.
     */
    @Override
    protected SourceFlowLifeCycle<?, ?> createSourceFlowLifeCycle(
            SourceAction<?, ?, ?> sourceAction,
            SourceConfig sourceConfig,
            CompletableFuture<Void> completableFuture,
            MetricsContext metricsContext) {
        return new SourceFlowLifeCycle<>(
                sourceAction,
                indexID,
                sourceConfig.getEnumeratorTask(),
                this,
                taskLocation,
                completableFuture,
                metricsContext);
    }

    /** Delegates to the starting source lifecycle's {@code collect()}. */
    @Override
    protected void collect() throws Exception {
        startAsSourceLifeCycle().collect();
    }

    /**
     * Runs {@code stateProcess()} and then reports the current progress.
     *
     * @return the state produced by {@code progress.toState()}
     */
    @Override
    @NonNull
    public ProgressState call() throws Exception {
        stateProcess();
        return progress.toState();
    }

    /**
     * Passes newly received splits on to the starting source lifecycle.
     *
     * @param list the splits to forward
     */
    public void receivedSourceSplit(List<SplitT> list) {
        startAsSourceLifeCycle().receivedSplits(list);
    }

    /**
     * Forwards the barrier to the starting source lifecycle.
     *
     * @param barrier the barrier to forward
     */
    @Override
    public void triggerBarrier(Barrier barrier) throws Exception {
        startAsSourceLifeCycle().triggerBarrier(barrier);
    }

    /**
     * @return the split serializer captured in {@link #init()}; {@code null} until then
     */
    public Serializer<SplitT> getSplitSerializer() {
        return splitSerializer;
    }

    /**
     * Views the starting lifecycle as a (raw) {@link SourceFlowLifeCycle}; the cast fails with a
     * {@link ClassCastException} if the starting lifecycle has a different type.
     */
    private SourceFlowLifeCycle startAsSourceLifeCycle() {
        return (SourceFlowLifeCycle) startFlowLifeCycle;
    }

    /**
     * Derives the produced data type from the source's catalog tables, falling back to the
     * source's {@code getProducedType()} when that path throws
     * {@link UnsupportedOperationException}.
     */
    private SeaTunnelDataType determineProducedType() {
        try {
            return CatalogTableUtil.convertToDataType(
                    sourceFlow.getAction().getSource().getProducedCatalogTables());
        } catch (UnsupportedOperationException unsupported) {
            return sourceFlow.getAction().getSource().getProducedType();
        }
    }
}
