package org.apache.seatunnel.engine.server.task.group.queue;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import java.io.IOException;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEvent;
import org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEventHandler;
import org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEventProducer;

/* loaded from: input_file:org/apache/seatunnel/engine/server/task/group/queue/IntermediateDisruptor.class */
/**
 * Intermediate queue implementation backed by an LMAX {@link Disruptor} of {@link RecordEvent}s.
 *
 * <p>Records handed to {@link #received(Record)} are published to the disruptor's ring buffer
 * through {@link RecordEventProducer}. The first call to {@link #collect(Collector)} registers a
 * {@link RecordEventHandler} on the disruptor and starts it; later calls only pause briefly.
 */
public class IntermediateDisruptor extends AbstractIntermediateQueue<Disruptor<RecordEvent>> {
    /** How long {@link #collect(Collector)} sleeps, in milliseconds, once the disruptor is started. */
    private static final long STARTED_COLLECT_PAUSE_MILLIS = 100L;

    /** Set to {@code true} after {@link #collect(Collector)} has registered the handler and started the disruptor. */
    private volatile boolean isExecuted;

    /**
     * Creates a queue that wraps the given disruptor.
     *
     * @param disruptor the disruptor used as the underlying intermediate queue
     */
    public IntermediateDisruptor(Disruptor<RecordEvent> disruptor) {
        super(disruptor);
    }

    /**
     * Publishes a record to the disruptor's ring buffer via
     * {@link RecordEventProducer#onData}.
     *
     * @param record the record to publish
     */
    @Override
    public void received(Record<?> record) {
        RecordEventProducer.onData(
                record,
                getIntermediateQueue().getRingBuffer(),
                getIntermediateQueueFlowLifeCycle());
    }

    /**
     * On the first call, registers a {@link RecordEventHandler} that receives the given collector
     * and starts the disruptor. On every later call, sleeps for
     * {@value #STARTED_COLLECT_PAUSE_MILLIS} ms and returns without touching the disruptor.
     *
     * <p>NOTE(review): the sleep presumably keeps a polling caller from busy-spinning while the
     * handler does the actual work; confirm against the caller.
     *
     * @param collector the collector passed to the registered {@link RecordEventHandler}
     * @throws Exception if the sleep is interrupted, or if handler registration or start-up fails
     */
    @Override
    public void collect(Collector<Record<?>> collector) throws Exception {
        if (this.isExecuted) {
            Thread.sleep(STARTED_COLLECT_PAUSE_MILLIS);
            return;
        }
        // Pass the handler through the varargs parameter so generic type checking applies,
        // instead of wrapping it in a raw-typed EventHandler[] array.
        getIntermediateQueue()
                .handleEventsWith(
                        new RecordEventHandler(
                                getRunningTask(), collector, getIntermediateQueueFlowLifeCycle()));
        getIntermediateQueue().start();
        // Flag is set only after a successful start.
        this.isExecuted = true;
    }

    /**
     * Shuts down the underlying disruptor.
     *
     * @throws IOException declared by the overridden method; not thrown by this implementation's
     *     own code
     */
    @Override
    public void close() throws IOException {
        getIntermediateQueue().shutdown();
    }
}
