package org.apache.seatunnel.engine.core.dag.actions;

import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastInstance;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.class */
/**
 * A {@link ShuffleStrategy} that wires every input partition to every target partition through
 * a dedicated Hazelcast {@link IQueue} and routes each record to a randomly chosen target
 * partition.
 *
 * <p>For a pipeline with {@code getInputPartitions()} readers and {@code targetPartitions}
 * writers, one queue exists per (input index, target index) pair. Queue names are produced by
 * {@link #generateQueueName(int, int, int)}.
 *
 * <p>NOTE(review): the queue-name cache is a plain {@link HashMap}; nothing in this class makes
 * {@link #createShuffleKey(Record, int, int)} safe for concurrent use on a shared instance -
 * confirm that callers confine an instance to a single thread.
 */
public class ShufflePartitionStrategy extends ShuffleStrategy {
    private static final Logger log = LoggerFactory.getLogger(ShufflePartitionStrategy.class);

    /**
     * Cache of queue names keyed by input index; each value holds one queue name per target
     * partition. Filled lazily by {@link #createShuffleKey(Record, int, int)}.
     *
     * <p>NOTE(review): the key does not include the pipeline id, so the cached names embed the
     * pipeline id of the first call for a given input index - presumably an instance serves a
     * single pipeline; verify against callers.
     */
    private final Map<Integer, String[]> inputQueueMapping = new HashMap<>();

    /** Number of target (writer side) partitions records are distributed over. */
    private int targetPartitions;

    /**
     * Builder for {@link ShufflePartitionStrategy} that adds {@code targetPartitions} on top of
     * the properties handled by {@link ShuffleStrategy.ShuffleStrategyBuilder}.
     *
     * @param <C> concrete strategy type produced by {@link #build()}
     * @param <B> concrete builder type returned by the fluent setters
     */
    public static abstract class ShufflePartitionStrategyBuilder<C extends ShufflePartitionStrategy, B extends ShufflePartitionStrategyBuilder<C, B>> extends ShuffleStrategy.ShuffleStrategyBuilder<C, B> {
        private int targetPartitions;

        /**
         * Returns this builder typed as {@code B} so fluent calls keep the concrete type.
         *
         * <p>NOTE(review): the decompiler reported that this method was originally
         * {@code protected}; it is left {@code public} because the superclass declaration is
         * not visible from this file.
         */
        @Override
        public abstract B self();

        /** Creates the strategy instance from the values collected so far. */
        @Override
        public abstract C build();

        /**
         * Sets the number of target partitions.
         *
         * @param targetPartitions number of target partitions
         * @return this builder
         */
        public B targetPartitions(int targetPartitions) {
            this.targetPartitions = targetPartitions;
            return self();
        }

        @Override
        public String toString() {
            return "ShufflePartitionStrategy.ShufflePartitionStrategyBuilder(super=" + super.toString() + ", targetPartitions=" + this.targetPartitions + ")";
        }
    }

    /** Default concrete builder handed out by {@link ShufflePartitionStrategy#builder()}. */
    private static final class ShufflePartitionStrategyBuilderImpl extends ShufflePartitionStrategyBuilder<ShufflePartitionStrategy, ShufflePartitionStrategyBuilderImpl> {
        private ShufflePartitionStrategyBuilderImpl() {
        }

        @Override
        public ShufflePartitionStrategyBuilderImpl self() {
            return this;
        }

        @Override
        public ShufflePartitionStrategy build() {
            return new ShufflePartitionStrategy(this);
        }
    }

    /**
     * Creates a strategy with {@code targetPartitions} left at {@code 0}; call
     * {@link #setTargetPartitions(int)} before routing records.
     */
    public ShufflePartitionStrategy() {
    }

    /**
     * Creates a strategy from a builder, copying its {@code targetPartitions}.
     *
     * @param builder builder holding the configured values
     */
    protected ShufflePartitionStrategy(ShufflePartitionStrategyBuilder<?, ?> builder) {
        super(builder);
        this.targetPartitions = builder.targetPartitions;
    }

    /** Returns a new builder for {@link ShufflePartitionStrategy}. */
    public static ShufflePartitionStrategyBuilder<?, ?> builder() {
        return new ShufflePartitionStrategyBuilderImpl();
    }

    /**
     * Obtains the output queues of one reader: one queue per target partition, in target-index
     * order. Every queue is cleared before it is returned.
     *
     * @param hazelcastInstance Hazelcast instance the queues are obtained from
     * @param pipelineId id of the pipeline, used in the queue names
     * @param inputIndex index of the reader, must be in {@code [0, getInputPartitions())}
     * @return insertion-ordered map from queue name to queue
     * @throws IllegalArgumentException if {@code inputIndex} is out of range
     */
    @Override
    public Map<String, IQueue<Record<?>>> createShuffles(HazelcastInstance hazelcastInstance, int pipelineId, int inputIndex) {
        Preconditions.checkArgument(
                inputIndex >= 0 && inputIndex < getInputPartitions(),
                "inputIndex %s is out of range [0, %s)",
                inputIndex,
                getInputPartitions());
        Map<String, IQueue<Record<?>>> shuffles = new LinkedHashMap<>();
        for (int targetIndex = 0; targetIndex < this.targetPartitions; targetIndex++) {
            String queueName = generateQueueName(pipelineId, inputIndex, targetIndex);
            IQueue<Record<?>> queue = getIQueue(hazelcastInstance, queueName);
            // Drop whatever is already in the queue; presumably leftovers of an earlier run of
            // the same job - TODO confirm.
            queue.clear();
            shuffles.put(queueName, queue);
        }
        log.info("pipeline[{}] / reader[{}] assigned shuffle queue list: {}", pipelineId, inputIndex, shuffles.keySet());
        return shuffles;
    }

    /**
     * Picks the queue a record should be written to: a uniformly random choice among the
     * queues of the given reader. The record content is not inspected.
     *
     * @param record record to route (unused)
     * @param pipelineId id of the pipeline, used in the queue names
     * @param inputIndex index of the reader producing the record
     * @return name of the chosen queue
     * @throws IllegalArgumentException if {@code targetPartitions} is not positive
     */
    @Override
    public String createShuffleKey(Record<?> record, int pipelineId, int inputIndex) {
        // Fail fast with a readable message instead of the opaque "bound must be positive" /
        // NegativeArraySizeException the code below would otherwise raise.
        Preconditions.checkArgument(
                this.targetPartitions > 0,
                "targetPartitions must be positive, but was %s",
                this.targetPartitions);
        String[] queueNames = this.inputQueueMapping.computeIfAbsent(inputIndex, index -> {
            String[] names = new String[this.targetPartitions];
            for (int targetIndex = 0; targetIndex < names.length; targetIndex++) {
                names[targetIndex] = generateQueueName(pipelineId, index, targetIndex);
            }
            return names;
        });
        // Bound the index by the cached array so a cache entry can never be over-indexed.
        return queueNames[ThreadLocalRandom.current().nextInt(queueNames.length)];
    }

    /**
     * Obtains the input queues of one writer: one queue per input partition, in input-index
     * order.
     *
     * @param hazelcastInstance Hazelcast instance the queues are obtained from
     * @param pipelineId id of the pipeline, used in the queue names
     * @param targetIndex index of the writer, must be in {@code [0, targetPartitions)}
     * @return array whose element {@code k} is the queue fed by reader {@code k}
     * @throws IllegalArgumentException if {@code targetIndex} is out of range
     */
    @Override
    public IQueue<Record<?>>[] getShuffles(HazelcastInstance hazelcastInstance, int pipelineId, int targetIndex) {
        Preconditions.checkArgument(
                targetIndex >= 0 && targetIndex < this.targetPartitions,
                "targetIndex %s is out of range [0, %s)",
                targetIndex,
                this.targetPartitions);
        int inputPartitions = getInputPartitions();
        // Java cannot create a generic array directly; the array only ever holds the
        // IQueue<Record<?>> instances assigned in the loop below.
        @SuppressWarnings("unchecked")
        IQueue<Record<?>>[] queues = new IQueue[inputPartitions];
        for (int inputIndex = 0; inputIndex < inputPartitions; inputIndex++) {
            queues[inputIndex] = getIQueue(hazelcastInstance, generateQueueName(pipelineId, inputIndex, targetIndex));
        }
        log.info(
                "pipeline[{}] / writer[{}] assigned shuffle queue list: {}",
                pipelineId,
                targetIndex,
                Stream.of(queues).map(IQueue::getName).collect(Collectors.toList()));
        return queues;
    }

    /**
     * Builds the queue name for one (pipeline, input index, target index) combination of the
     * current job.
     */
    private String generateQueueName(int pipelineId, int inputIndex, int targetIndex) {
        return String.format("ShufflePartition-Queue_%s_%s_%s_%s", getJobId(), pipelineId, inputIndex, targetIndex);
    }

    /** Returns the live (mutable) queue-name cache keyed by input index. */
    public Map<Integer, String[]> getInputQueueMapping() {
        return this.inputQueueMapping;
    }

    /** Returns the number of target partitions. */
    public int getTargetPartitions() {
        return this.targetPartitions;
    }

    /**
     * Changes the number of target partitions and drops the cached queue names, which were
     * sized for the previous value.
     *
     * @param targetPartitions new number of target partitions
     */
    public void setTargetPartitions(int targetPartitions) {
        if (targetPartitions != this.targetPartitions) {
            this.inputQueueMapping.clear();
        }
        this.targetPartitions = targetPartitions;
    }

    @Override
    public String toString() {
        return "ShufflePartitionStrategy(inputQueueMapping=" + getInputQueueMapping() + ", targetPartitions=" + getTargetPartitions() + ")";
    }
}
