package org.apache.seatunnel.engine.server.task.flow;

import com.hazelcast.cluster.Address;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.common.metrics.Counter;
import org.apache.seatunnel.api.common.metrics.Meter;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportResourceShare;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.AbstractTask;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
import org.apache.seatunnel.engine.server.task.operation.checkpoint.BarrierFlowOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.class */
/**
 * Flow life cycle that feeds the records of one task into a {@link SinkWriter}.
 *
 * <p>Incoming records are dispatched by the type of their payload:
 * <ul>
 *   <li>{@link Barrier}: optionally snapshots the writer state and prepares a commit, forwards the
 *       barrier to the committer task when one is configured, then acknowledges the barrier;</li>
 *   <li>{@link SchemaChangeEvent}: applied to the writer unless a prepare-close barrier was seen;</li>
 *   <li>anything else: written to the writer (unless a prepare-close barrier was seen) and counted
 *       in the sink metrics.</li>
 * </ul>
 *
 * <p>The writer itself is created in {@link #restoreState(List)}; it is {@code null} until that
 * method has run.
 *
 * @param <T> type of the rows accepted by the writer
 * @param <CommitInfoT> type of the commit information produced by {@code prepareCommit()}
 * @param <AggregatedCommitInfoT> aggregated commit information type of the sink action
 * @param <StateT> type of the writer state captured on snapshot
 */
public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCommitInfoT, StateT> extends ActionFlowLifeCycle implements OneInputFlowLifeCycle<Record<?>>, InternalCheckpointListener {
    private static final Logger log = LoggerFactory.getLogger(SinkFlowLifeCycle.class);
    private final SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT> sinkAction;
    /** Created by {@link #restoreState(List)}; {@code null} before that. */
    private SinkWriter<T, CommitInfoT, StateT> writer;
    private transient Optional<Serializer<CommitInfoT>> commitInfoSerializer;
    private transient Optional<Serializer<StateT>> writerStateSerializer;
    private final int indexID;
    private final TaskLocation taskLocation;
    /** Resolved in {@link #open()}, and only when {@link #containAggCommitter} is set. */
    private Address committerTaskAddress;
    private final TaskLocation committerTaskLocation;
    private Optional<SinkCommitter<CommitInfoT>> committer;
    /** Result of the most recent {@code writer.prepareCommit()}; empty after {@link #init()}. */
    private Optional<CommitInfoT> lastCommitInfo;
    private final MetricsContext metricsContext;
    private final Counter sinkWriteCount;
    private final Meter sinkWriteQPS;
    private final Counter sinkWriteBytes;
    private final Meter sinkWriteBytesPerSeconds;
    private final boolean containAggCommitter;
    /** Set in {@link #restoreState(List)} only when the writer is a {@link SupportResourceShare}. */
    private MultiTableResourceManager resourceManager;

    /**
     * Creates the life cycle and registers the sink metrics on the given metrics context.
     *
     * @param sinkAction the sink action whose sink provides writer, committer and serializers
     * @param taskLocation location of the task running this flow
     * @param i index passed to the writer context when the writer is created
     * @param seaTunnelTask the task that runs this flow (handed to the superclass)
     * @param taskLocation2 location of the committer task
     * @param z whether a committer task exists that barriers and commit info must be sent to
     * @param completableFuture completion future handed to the superclass
     * @param metricsContext context used to create the sink counters and meters
     */
    public SinkFlowLifeCycle(SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT> sinkAction, TaskLocation taskLocation, int i, SeaTunnelTask seaTunnelTask, TaskLocation taskLocation2, boolean z, CompletableFuture<Void> completableFuture, MetricsContext metricsContext) {
        super(sinkAction, seaTunnelTask, completableFuture);
        this.sinkAction = sinkAction;
        this.indexID = i;
        this.taskLocation = taskLocation;
        this.committerTaskLocation = taskLocation2;
        this.containAggCommitter = z;
        this.metricsContext = metricsContext;
        this.sinkWriteCount = metricsContext.counter("SinkWriteCount");
        this.sinkWriteQPS = metricsContext.meter("SinkWriteQPS");
        this.sinkWriteBytes = metricsContext.counter("SinkWriteBytes");
        this.sinkWriteBytesPerSeconds = metricsContext.meter("SinkWriteBytesPerSeconds");
    }

    /**
     * Fetches the serializers and the committer from the sink and resets the last commit info.
     *
     * @throws Exception if the sink fails to provide any of them
     */
    @Override // org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle
    public void init() throws Exception {
        this.commitInfoSerializer = this.sinkAction.getSink().getCommitInfoSerializer();
        this.writerStateSerializer = this.sinkAction.getSink().getWriterStateSerializer();
        this.committer = this.sinkAction.getSink().createCommitter();
        this.lastCommitInfo = Optional.empty();
    }

    /**
     * Opens the flow; when a committer task is configured, resolves its address and registers
     * this sink with it.
     *
     * @throws Exception if the superclass fails to open or the address lookup fails
     */
    @Override // org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle
    public void open() throws Exception {
        super.open();
        if (this.containAggCommitter) {
            this.committerTaskAddress = getCommitterTaskAddress();
        }
        registerCommitter();
    }

    /** Asks the master for the address of the task group that hosts the committer task. */
    private Address getCommitterTaskAddress() throws ExecutionException, InterruptedException {
        return (Address) this.runningTask.getExecutionContext().sendToMaster(new GetTaskGroupAddressOperation(this.committerTaskLocation)).get();
    }

    /**
     * Closes the superclass, the writer and the shared resource manager.
     *
     * <p>Each step runs even if the previous one threw, so a failing writer no longer leaks the
     * resource manager. A writer that was never created is skipped instead of raising a
     * {@link NullPointerException}. Failures of the resource manager are logged, not rethrown.
     *
     * @throws IOException if the superclass or the writer fails to close
     */
    @Override // org.apache.seatunnel.engine.server.task.flow.AbstractFlowLifeCycle, org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            try {
                if (this.writer != null) {
                    this.writer.close();
                }
            } finally {
                closeResourceManagerQuietly();
            }
        }
    }

    /** Closes the resource manager if one was created; any failure is logged and swallowed. */
    private void closeResourceManagerQuietly() {
        try {
            if (this.resourceManager != null) {
                this.resourceManager.close();
            }
        } catch (Throwable th) {
            log.error("close resourceManager error", th);
        }
    }

    /** Registers this sink task with the committer task and waits for the call to finish. */
    private void registerCommitter() {
        if (this.containAggCommitter) {
            this.runningTask.getExecutionContext().sendToMember(new SinkRegisterOperation(this.taskLocation, this.committerTaskLocation), this.committerTaskAddress).join();
        }
    }

    /**
     * Handles one incoming record, dispatching on the type of its payload.
     *
     * @param record the record to process
     * @throws RuntimeException wrapping any exception raised while processing the record
     */
    @Override // org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle
    public void received(Record<?> record) {
        try {
            Object data = record.getData();
            if (data instanceof Barrier) {
                processBarrier((Barrier) data);
            } else if (data instanceof SchemaChangeEvent) {
                processSchemaChange((SchemaChangeEvent) data);
            } else {
                processData(data);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Handles a barrier: snapshot or forward it, then acknowledge it on the running task. */
    private void processBarrier(Barrier barrier) throws Exception {
        long startTime = System.currentTimeMillis();
        if (barrier.prepareClose()) {
            this.prepareClose = true;
        }
        if (barrier.snapshot()) {
            snapshotAndPrepareCommit(barrier);
        } else if (this.containAggCommitter) {
            this.runningTask.getExecutionContext().sendToMember(new BarrierFlowOperation(barrier, this.committerTaskLocation), this.committerTaskAddress).join();
        }
        this.runningTask.ack(barrier);
        log.debug("trigger barrier [{}] finished, cost {}ms. taskLocation [{}]", barrier.getId(), System.currentTimeMillis() - startTime, this.taskLocation);
    }

    /**
     * Prepares a commit, stores the writer state for the barrier and, when a committer task is
     * configured, sends it the serialized commit info. On any failure the writer's prepare is
     * aborted and the original exception is rethrown.
     */
    private void snapshotAndPrepareCommit(Barrier barrier) throws Exception {
        try {
            this.lastCommitInfo = this.writer.prepareCommit();
            List<StateT> writerStates = this.writer.snapshotState(barrier.getId());
            if (this.writerStateSerializer.isPresent()) {
                this.runningTask.addState(barrier, ActionStateKey.of(this.sinkAction), AbstractTask.serializeStates(this.writerStateSerializer.get(), writerStates));
            } else {
                this.runningTask.addState(barrier, ActionStateKey.of(this.sinkAction), Collections.emptyList());
            }
            if (this.containAggCommitter) {
                // The commit info may legitimately be absent; the serializer then receives null.
                CommitInfoT commitInfo = this.lastCommitInfo.orElse(null);
                byte[] serializedCommitInfo = this.commitInfoSerializer.isPresent() ? this.commitInfoSerializer.get().serialize(commitInfo) : null;
                this.runningTask.getExecutionContext().sendToMember(new SinkPrepareCommitOperation(barrier, this.committerTaskLocation, serializedCommitInfo), this.committerTaskAddress).join();
            }
        } catch (Exception e) {
            try {
                this.writer.abortPrepare();
            } catch (Exception abortFailure) {
                // Keep the root cause visible; the abort failure travels along as suppressed.
                e.addSuppressed(abortFailure);
            }
            throw e;
        }
    }

    /** Applies a schema change to the writer unless a prepare-close barrier has been received. */
    private void processSchemaChange(SchemaChangeEvent event) throws Exception {
        if (!this.prepareClose) {
            this.writer.applySchemaChange(event);
        }
    }

    /**
     * Writes one data element to the writer and updates the sink metrics. Elements arriving after
     * a prepare-close barrier are dropped. Byte metrics are only updated for {@link SeaTunnelRow}.
     */
    private void processData(Object data) throws Exception {
        if (this.prepareClose) {
            return;
        }
        // The flow is typed Record<?>; every non-barrier, non-schema payload is treated as a T row.
        @SuppressWarnings("unchecked")
        T row = (T) data;
        this.writer.write(row);
        this.sinkWriteCount.inc();
        this.sinkWriteQPS.markEvent();
        if (data instanceof SeaTunnelRow) {
            long bytesSize = ((SeaTunnelRow) data).getBytesSize();
            this.sinkWriteBytes.inc(bytesSize);
            this.sinkWriteBytesPerSeconds.markEvent(bytesSize);
        }
    }

    /**
     * Commits the last prepared commit info through the sink committer, if both are present.
     *
     * @param j id of the completed checkpoint (not used by this implementation)
     * @throws Exception if the committer fails
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        if (this.committer.isPresent() && this.lastCommitInfo.isPresent()) {
            this.committer.get().commit(Collections.singletonList(this.lastCommitInfo.get()));
        }
    }

    /**
     * Aborts the last prepared commit info through the sink committer, if both are present.
     *
     * @param j id of the aborted checkpoint (not used by this implementation)
     * @throws Exception if the committer fails
     */
    public void notifyCheckpointAborted(long j) throws Exception {
        if (this.committer.isPresent() && this.lastCommitInfo.isPresent()) {
            this.committer.get().abort(Collections.singletonList(this.lastCommitInfo.get()));
        }
    }

    /**
     * Creates the writer, restoring it from the given states when any can be deserialized.
     *
     * <p>Without a writer-state serializer, or when no non-null state is found, a fresh writer is
     * created. If the writer is a {@link SupportResourceShare}, a resource manager is initialised
     * and handed back to the writer.
     *
     * @param list the subtask states recorded for this action
     * @throws Exception if deserialization or writer creation fails
     */
    @Override // org.apache.seatunnel.engine.server.checkpoint.Stateful
    public void restoreState(List<ActionSubtaskState> list) throws Exception {
        List<StateT> restoredStates = Collections.emptyList();
        if (this.writerStateSerializer.isPresent()) {
            Serializer<StateT> stateSerializer = this.writerStateSerializer.get();
            restoredStates = list.stream()
                    .map(ActionSubtaskState::getState)
                    .flatMap(serializedStates -> serializedStates.stream())
                    .filter(Objects::nonNull)
                    .map(bytes -> ExceptionUtil.sneaky(() -> stateSerializer.deserialize(bytes)))
                    .collect(Collectors.toList());
        }
        if (restoredStates.isEmpty()) {
            this.writer = this.sinkAction.getSink().createWriter(new SinkWriterContext(this.indexID, this.metricsContext));
        } else {
            this.writer = this.sinkAction.getSink().restoreWriter(new SinkWriterContext(this.indexID, this.metricsContext), restoredStates);
        }
        if (this.writer instanceof SupportResourceShare) {
            // The resource-sharing methods live on SupportResourceShare, not on SinkWriter.
            SupportResourceShare resourceShare = (SupportResourceShare) this.writer;
            this.resourceManager = resourceShare.initMultiTableResourceManager(1, 1);
            resourceShare.setMultiTableResourceManager(this.resourceManager, 0);
        }
    }
}
