package org.apache.seatunnel.engine.server.task.group;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskGroupType;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
import org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue;

/* loaded from: input_file:org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.class */
/**
 * Task group whose intermediate queues are bounded {@link ArrayBlockingQueue}s.
 *
 * <p>One queue of capacity {@link #QUEUE_SIZE} is created lazily per id and kept in a
 * {@link ConcurrentHashMap}; callers receive it wrapped in an {@link IntermediateBlockingQueue}.
 * {@link #init()} must run before {@link #getQueueCache(long)}, because the backing map does
 * not exist until then.
 */
public class TaskGroupWithIntermediateBlockingQueue extends AbstractTaskGroupWithIntermediateQueue {
    /** Capacity of every blocking queue created by this task group. */
    public static final int QUEUE_SIZE = 2048;

    /** Queues keyed by id; {@code null} until {@link #init()} has been called. */
    private Map<Long, BlockingQueue<Record<?>>> blockingQueueCache;

    /**
     * Creates the task group by forwarding all arguments to the superclass constructor.
     *
     * @param taskGroupLocation location handed to the superclass
     * @param str string argument handed to the superclass
     * @param collection tasks handed to the superclass
     */
    public TaskGroupWithIntermediateBlockingQueue(TaskGroupLocation taskGroupLocation, String str, Collection<Task> collection) {
        super(taskGroupLocation, str, collection);
    }

    /**
     * Installs a fresh, empty queue cache and registers this group on every task returned by
     * {@code getTasks()} that is a {@link SeaTunnelTask}. Tasks of other types are skipped.
     */
    @Override
    public void init() {
        // Replacing the map (rather than clearing it) mirrors the original behaviour on re-init.
        this.blockingQueueCache = new ConcurrentHashMap<>();
        getTasks().stream()
                .filter(SeaTunnelTask.class::isInstance)
                .map(SeaTunnelTask.class::cast)
                .forEach(seaTunnelTask -> seaTunnelTask.setTaskGroup(this));
    }

    /**
     * Returns a wrapper around the blocking queue registered under the given id, creating the
     * queue first if none exists yet.
     *
     * <p>Each call returns a new {@link IntermediateBlockingQueue} wrapper, but calls with the
     * same id share the same underlying {@link BlockingQueue}.
     *
     * @param j id of the queue to look up
     * @return a new wrapper around the queue stored for {@code j}
     * @throws NullPointerException if {@link #init()} has not been called yet
     */
    @Override
    public AbstractIntermediateQueue<?> getQueueCache(long j) {
        // computeIfAbsent already returns the current (existing or newly created) value,
        // so no second lookup is required.
        BlockingQueue<Record<?>> queue =
                this.blockingQueueCache.computeIfAbsent(j, id -> new ArrayBlockingQueue<>(QUEUE_SIZE));
        return new IntermediateBlockingQueue(queue);
    }

    /**
     * Identifies this group as the blocking-queue flavour of intermediate task group.
     *
     * @return {@link TaskGroupType#INTERMEDIATE_BLOCKING_QUEUE}
     */
    @Override
    public TaskGroupType getTaskGroupType() {
        return TaskGroupType.INTERMEDIATE_BLOCKING_QUEUE;
    }
}
