package org.apache.seatunnel.engine.server.task.flow;

import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastInstance;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.class */
/**
 * Sink side of a shuffle: buffers incoming records per shuffle key and forwards them, in
 * batches, to the Hazelcast queues created by the configured {@link ShuffleStrategy}.
 *
 * <p>Data records are accumulated in an in-memory buffer and flushed when the buffer reaches
 * the configured batch size or when the flush interval has elapsed. Barriers force a flush,
 * are acknowledged on the running task and are then copied to every shuffle queue.
 */
public class ShuffleSinkFlowLifeCycle extends AbstractFlowLifeCycle implements OneInputFlowLifeCycle<Record<?>> {
    private static final Logger log = LoggerFactory.getLogger(ShuffleSinkFlowLifeCycle.class);
    private final int pipelineId;
    private final int taskIndex;
    private final ShuffleAction shuffleAction;
    /** Target queues, as returned by {@link ShuffleStrategy#createShuffles}. */
    private final Map<String, IQueue<Record<?>>> shuffles;
    /** Number of buffered records that triggers a flush. */
    private final int shuffleBatchSize;
    /** Maximum age of the buffer before a flush; compared against a millisecond clock difference. */
    private final long shuffleBatchFlushInterval;
    /** Pending records, grouped by the key produced by {@link ShuffleStrategy#createShuffleKey}. */
    private final Map<String, Queue<Record<?>>> shuffleBuffer;
    private final ShuffleStrategy shuffleStrategy;
    /** Total number of records currently held across all buffered batches. */
    private int shuffleBufferSize;
    /** Wall-clock time (ms) of the last flush; starts at 0 until the first flush happens. */
    private long lastModify;

    /**
     * Creates the life cycle and asks the shuffle strategy to create the target queues.
     *
     * @param seaTunnelTask     the task this flow runs in; also supplies the pipeline id
     * @param taskIndex         index of this task, passed to the shuffle strategy
     * @param shuffleAction     the shuffle action whose config provides strategy, batch size and flush interval
     * @param hazelcastInstance Hazelcast instance used by the strategy to create the queues
     * @param completableFuture completion future handed to the parent life cycle
     */
    public ShuffleSinkFlowLifeCycle(SeaTunnelTask seaTunnelTask, int taskIndex, ShuffleAction shuffleAction, HazelcastInstance hazelcastInstance, CompletableFuture<Void> completableFuture) {
        super(seaTunnelTask, completableFuture);
        this.pipelineId = seaTunnelTask.getTaskLocation().getTaskGroupLocation().getPipelineId();
        this.taskIndex = taskIndex;
        this.shuffleAction = shuffleAction;
        this.shuffleStrategy = shuffleAction.getConfig().getShuffleStrategy();
        this.shuffles = this.shuffleStrategy.createShuffles(hazelcastInstance, this.pipelineId, taskIndex);
        this.shuffleBatchSize = shuffleAction.getConfig().getBatchSize();
        this.shuffleBatchFlushInterval = shuffleAction.getConfig().getBatchFlushInterval();
        this.shuffleBuffer = new HashMap<>();
    }

    /**
     * Handles one incoming record.
     *
     * <p>Non-barrier records (including {@link SchemaChangeEvent}s) are buffered for shuffling
     * unless a prepare-close barrier has already been seen, in which case they are dropped.
     * Barriers flush the buffer, optionally register an (empty) state for this action, are
     * acknowledged, and are then put on every shuffle queue.
     *
     * @param record the record to process
     * @throws IOException declared by the interface; not thrown directly by this implementation
     * @throws RuntimeException wrapping an {@link InterruptedException} if a blocking put is interrupted
     */
    @Override
    public void received(Record<?> record) throws IOException {
        if (!(record.getData() instanceof Barrier)) {
            // Schema change events and ordinary rows take the same path: once closing has
            // been requested nothing more is shuffled.
            if (this.prepareClose) {
                return;
            }
            shuffleItem(record);
            return;
        }

        long startTime = System.currentTimeMillis();

        // Everything buffered before the barrier must reach the queues ahead of it.
        shuffleFlush();

        Barrier barrier = (Barrier) record.getData();
        if (barrier.prepareClose()) {
            this.prepareClose = true;
        }
        if (barrier.snapshot()) {
            this.runningTask.addState(barrier, ActionStateKey.of(this.shuffleAction), Collections.emptyList());
        }
        this.runningTask.ack(barrier);

        // The barrier is replicated to every shuffle queue.
        for (IQueue<Record<?>> shuffleQueue : this.shuffles.values()) {
            putBlocking(shuffleQueue, record);
        }
        log.debug("trigger barrier [{}] finished, cost: {}ms. taskLocation: [{}]", barrier.getId(), System.currentTimeMillis() - startTime, this.runningTask.getTaskLocation());
    }

    /**
     * Closes the parent life cycle and then destroys every shuffle queue.
     *
     * @throws IOException if the parent {@code close()} fails
     */
    @Override
    public void close() throws IOException {
        super.close();
        for (Map.Entry<String, IQueue<Record<?>>> shuffle : this.shuffles.entrySet()) {
            log.info("destroy shuffle queue: {}", shuffle.getKey());
            shuffle.getValue().destroy();
        }
    }

    /**
     * Adds a record to the batch for its shuffle key and flushes when the buffer is full or
     * has been idle longer than the configured flush interval.
     *
     * @param record the record to buffer
     */
    private synchronized void shuffleItem(Record<?> record) {
        String shuffleKey = this.shuffleStrategy.createShuffleKey(record, this.pipelineId, this.taskIndex);
        this.shuffleBuffer.computeIfAbsent(shuffleKey, key -> new LinkedList<>()).add(record);
        this.shuffleBufferSize++;

        boolean batchFull = this.shuffleBufferSize >= this.shuffleBatchSize;
        boolean intervalElapsed = this.shuffleBufferSize > 1
                && System.currentTimeMillis() - this.lastModify > this.shuffleBatchFlushInterval;
        if (batchFull || intervalElapsed) {
            shuffleFlush();
        }
    }

    /**
     * Moves every buffered batch to its target queue, then resets the buffer counters.
     *
     * <p>For each batch a bulk {@code addAll} is attempted when the target queue reports
     * remaining capacity. If the queue has no capacity, or {@code addAll} reports that nothing
     * was added, the batch is drained one record at a time with a blocking {@code put}.
     *
     * @throws RuntimeException wrapping an {@link InterruptedException} if a blocking put is interrupted
     */
    private synchronized void shuffleFlush() {
        for (Map.Entry<String, Queue<Record<?>>> shuffleBatch : this.shuffleBuffer.entrySet()) {
            IQueue<Record<?>> shuffleQueue = this.shuffles.get(shuffleBatch.getKey());
            Queue<Record<?>> pending = shuffleBatch.getValue();
            if (shuffleQueue.remainingCapacity() <= 0 || !shuffleQueue.addAll(pending)) {
                // Bulk insert was not possible: fall back to blocking, record-by-record puts.
                Record<?> shuffleItem;
                while ((shuffleItem = pending.poll()) != null) {
                    putBlocking(shuffleQueue, shuffleItem);
                }
            }
            // After a successful addAll the local batch still holds the records; drop them.
            pending.clear();
        }
        this.shuffleBufferSize = 0;
        this.lastModify = System.currentTimeMillis();
    }

    /**
     * Puts a record on a queue, blocking until space is available, and converts an
     * interruption into an unchecked exception that carries the original as its cause.
     */
    private static void putBlocking(IQueue<Record<?>> shuffleQueue, Record<?> record) {
        try {
            shuffleQueue.put(record);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
