package org.apache.seatunnel.engine.server.task.group.queue;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.task.record.Barrier;

/* loaded from: input_file:org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.class */
/**
 * Intermediate queue implementation backed by a {@link BlockingQueue} of {@link Record}s.
 *
 * <p>{@link #received(Record)} puts records into the queue and {@link #collect(Collector)} polls
 * them back out and hands them to a {@link Collector}. Both directions route every record through
 * {@link #handleRecord(Record, ConsumerWithException)}, which applies the barrier / prepare-close
 * handling before forwarding.
 */
public class IntermediateBlockingQueue extends AbstractIntermediateQueue<BlockingQueue<Record<?>>> {

    /** Timeout, in milliseconds, of a single poll on the queue inside {@link #collect}. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;

    /**
     * Creates the queue wrapper.
     *
     * @param blockingQueue the queue that is passed to the superclass constructor
     */
    public IntermediateBlockingQueue(BlockingQueue<Record<?>> blockingQueue) {
        super(blockingQueue);
    }

    /**
     * Handles an incoming record and, unless it is dropped by {@link #handleRecord}, puts it into
     * the intermediate queue via {@link BlockingQueue#put(Object)}.
     *
     * @param record the record to enqueue
     * @throws RuntimeException wrapping any exception raised while handling or enqueuing the record
     */
    @Override
    public void received(Record<?> record) {
        try {
            BlockingQueue<Record<?>> intermediateQueue = getIntermediateQueue();
            // A bound method reference null-checks its receiver when it is evaluated, so no
            // separate null check on the queue is required here.
            handleRecord(record, intermediateQueue::put);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Polls records from the intermediate queue and forwards them to {@code collector} until a
     * poll returns {@code null}, i.e. no record became available within the poll timeout.
     *
     * @param collector receiver of the polled records
     * @throws Exception if polling the queue or handling a record fails
     */
    @Override
    public void collect(Collector<Record<?>> collector) throws Exception {
        while (true) {
            Record<?> polled = getIntermediateQueue().poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            if (polled == null) {
                return;
            }
            handleRecord(polled, collector::collect);
        }
    }

    /**
     * Clears the intermediate queue.
     *
     * @throws IOException declared by the overridden method; not thrown by this implementation
     */
    @Override
    public void close() throws IOException {
        getIntermediateQueue().clear();
    }

    /**
     * Applies barrier and prepare-close handling to {@code record} and forwards it to
     * {@code consumerWithException} when appropriate.
     *
     * <ul>
     *   <li>If the record data is a {@link Barrier}, it is cast to {@link CheckpointBarrier},
     *       acknowledged on the running task, the prepare-close flag of the flow life cycle is set
     *       when the barrier reports {@code prepareClose()}, and the record is forwarded.
     *   <li>Otherwise the record is forwarded only while the prepare-close flag is not set; once
     *       it is set, non-barrier records are silently dropped.
     * </ul>
     *
     * @param record the record to handle
     * @param consumerWithException the sink that receives forwarded records
     * @throws Exception if acknowledging the barrier or the consumer fails
     */
    private void handleRecord(Record<?> record, ConsumerWithException<Record<?>> consumerWithException) throws Exception {
        if (record.getData() instanceof Barrier) {
            // NOTE(review): the check is against Barrier but the cast is to CheckpointBarrier;
            // this assumes every Barrier seen here is a CheckpointBarrier - confirm.
            CheckpointBarrier checkpointBarrier = (CheckpointBarrier) record.getData();
            getRunningTask().ack(checkpointBarrier);
            if (checkpointBarrier.prepareClose()) {
                getIntermediateQueueFlowLifeCycle().setPrepareClose(true);
            }
            consumerWithException.accept(record);
            return;
        }
        if (getIntermediateQueueFlowLifeCycle().getPrepareClose()) {
            return;
        }
        consumerWithException.accept(record);
    }
}
