package org.apache.seatunnel.engine.server.task;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.handler.DataTypeChangeEventDispatcher;
import org.apache.seatunnel.api.table.schema.handler.DataTypeChangeEventHandler;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.class */
public class SeaTunnelSourceCollector<T> implements Collector<T> {
    private static final Logger log = LoggerFactory.getLogger(SeaTunnelSourceCollector.class);
    private final Object checkpointLock;
    private final List<OneInputFlowLifeCycle<Record<?>>> outputs;
    private final MetricsContext metricsContext;
    private final TaskMetricsCalcContext taskMetricsCalcContext;
    private volatile boolean emptyThisPollNext;
    private SeaTunnelDataType rowType;
    private FlowControlGate flowControlGate;
    private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new AtomicBoolean(false);
    private final AtomicBoolean schemaChangeAfterCheckpointSignal = new AtomicBoolean(false);
    private final DataTypeChangeEventHandler dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
    private Map<String, SeaTunnelRowType> rowTypeMap = new HashMap();

    public SeaTunnelSourceCollector(Object obj, List<OneInputFlowLifeCycle<Record<?>>> list, MetricsContext metricsContext, FlowControlStrategy flowControlStrategy, SeaTunnelDataType seaTunnelDataType, List<TablePath> list2) {
        this.checkpointLock = obj;
        this.outputs = list;
        this.rowType = seaTunnelDataType;
        this.metricsContext = metricsContext;
        if (seaTunnelDataType instanceof MultipleRowType) {
            ((MultipleRowType) seaTunnelDataType).iterator().forEachRemaining(entry -> {
            });
        }
        this.taskMetricsCalcContext = new TaskMetricsCalcContext(metricsContext, PluginType.SOURCE, CollectionUtils.isNotEmpty(list2), list2);
        this.flowControlGate = FlowControlGate.create(flowControlStrategy);
    }

    public void collect(T t) {
        try {
            if (t instanceof SeaTunnelRow) {
                String tableId = ((SeaTunnelRow) t).getTableId();
                if (this.rowType instanceof SeaTunnelRowType) {
                    ((SeaTunnelRow) t).getBytesSize(this.rowType);
                } else {
                    if (!(this.rowType instanceof MultipleRowType)) {
                        throw new SeaTunnelEngineException("Unsupported row type: " + this.rowType.getClass().getName());
                    }
                    ((SeaTunnelRow) t).getBytesSize(this.rowTypeMap.get(tableId));
                }
                this.flowControlGate.audit((SeaTunnelRow) t);
                this.taskMetricsCalcContext.updateMetrics(t, tableId);
            }
            sendRecordToNext(new Record<>(t));
            this.emptyThisPollNext = false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void collect(SchemaChangeEvent schemaChangeEvent) {
        try {
            if (this.rowType instanceof SeaTunnelRowType) {
                this.rowType = this.dataTypeChangeEventHandler.reset(this.rowType).apply(schemaChangeEvent);
            } else {
                if (!(this.rowType instanceof MultipleRowType)) {
                    throw new SeaTunnelEngineException("Unsupported row type: " + this.rowType.getClass().getName());
                }
                String tablePath = schemaChangeEvent.tablePath().toString();
                this.rowTypeMap.put(tablePath, this.dataTypeChangeEventHandler.reset(this.rowTypeMap.get(tablePath)).apply(schemaChangeEvent));
            }
            sendRecordToNext(new Record<>(schemaChangeEvent));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void markSchemaChangeBeforeCheckpoint() {
        if (this.schemaChangeAfterCheckpointSignal.get()) {
            throw new IllegalStateException("schema-change-after checkpoint already marked.");
        }
        if (!this.schemaChangeBeforeCheckpointSignal.compareAndSet(false, true)) {
            throw new IllegalStateException("schema-change-before checkpoint already marked.");
        }
        log.info("mark schema-change-before checkpoint signal.");
    }

    public void markSchemaChangeAfterCheckpoint() {
        if (this.schemaChangeBeforeCheckpointSignal.get()) {
            throw new IllegalStateException("schema-change-before checkpoint already marked.");
        }
        if (!this.schemaChangeAfterCheckpointSignal.compareAndSet(false, true)) {
            throw new IllegalStateException("schema-change-after checkpoint already marked.");
        }
        log.info("mark schema-change-after checkpoint signal.");
    }

    public boolean captureSchemaChangeBeforeCheckpointSignal() {
        if (!this.schemaChangeBeforeCheckpointSignal.get()) {
            return false;
        }
        log.info("capture schema-change-before checkpoint signal.");
        return this.schemaChangeBeforeCheckpointSignal.getAndSet(false);
    }

    public boolean captureSchemaChangeAfterCheckpointSignal() {
        if (!this.schemaChangeAfterCheckpointSignal.get()) {
            return false;
        }
        log.info("capture schema-change-after checkpoint signal.");
        return this.schemaChangeAfterCheckpointSignal.getAndSet(false);
    }

    public Object getCheckpointLock() {
        return this.checkpointLock;
    }

    public boolean isEmptyThisPollNext() {
        return this.emptyThisPollNext;
    }

    public void resetEmptyThisPollNext() {
        this.emptyThisPollNext = true;
    }

    public void sendRecordToNext(Record<?> record) throws IOException {
        synchronized (this.checkpointLock) {
            Iterator<OneInputFlowLifeCycle<Record<?>>> it = this.outputs.iterator();
            while (it.hasNext()) {
                it.next().received(record);
            }
        }
    }
}
