package org.apache.seatunnel.engine.server.dag.physical;

import com.google.common.collect.Lists;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionEdge;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
import org.apache.seatunnel.engine.server.dag.physical.config.IntermediateQueueConfig;
import org.apache.seatunnel.engine.server.dag.physical.config.SinkConfig;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
import org.apache.seatunnel.engine.server.dag.physical.flow.IntermediateExecutionFlow;
import org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
import org.apache.seatunnel.engine.server.dag.physical.flow.UnknownFlowException;
import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
import org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask;
import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
import org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask;
import org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateBlockingQueue;
import org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateDisruptor;

/* loaded from: input_file:org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.class */
public class PhysicalPlanGenerator {
    private final List<Pipeline> pipelines;
    private final JobImmutableInformation jobImmutableInformation;
    private final long initializationTimestamp;
    private final ExecutorService executorService;
    private final NodeEngine nodeEngine;
    private final FlakeIdGenerator flakeIdGenerator;
    private final Set<TaskLocation> pipelineTasks;
    private final Set<TaskLocation> startingTasks;
    private final Map<TaskLocation, Set<Tuple2<ActionStateKey, Integer>>> subtaskActions;
    private final IMap<Object, Object> runningJobStateIMap;
    private final IMap<Object, Object> runningJobStateTimestampsIMap;
    private final QueueType queueType;
    private final IdGenerator idGenerator = new IdGenerator();
    private final Map<SourceAction<?, ?, ?>, TaskLocation> enumeratorTaskIDMap = new HashMap();
    private final Map<SinkAction<?, ?, ?, ?>, TaskLocation> committerTaskIDMap = new HashMap();

    public PhysicalPlanGenerator(@NonNull ExecutionPlan executionPlan, @NonNull NodeEngine nodeEngine, @NonNull JobImmutableInformation jobImmutableInformation, long j, @NonNull ExecutorService executorService, @NonNull FlakeIdGenerator flakeIdGenerator, @NonNull IMap iMap, @NonNull IMap iMap2, @NonNull QueueType queueType) {
        if (executionPlan == null) {
            throw new NullPointerException("executionPlan is marked non-null but is null");
        }
        if (nodeEngine == null) {
            throw new NullPointerException("nodeEngine is marked non-null but is null");
        }
        if (jobImmutableInformation == null) {
            throw new NullPointerException("jobImmutableInformation is marked non-null but is null");
        }
        if (executorService == null) {
            throw new NullPointerException("executorService is marked non-null but is null");
        }
        if (flakeIdGenerator == null) {
            throw new NullPointerException("flakeIdGenerator is marked non-null but is null");
        }
        if (iMap == null) {
            throw new NullPointerException("runningJobStateIMap is marked non-null but is null");
        }
        if (iMap2 == null) {
            throw new NullPointerException("runningJobStateTimestampsIMap is marked non-null but is null");
        }
        if (queueType == null) {
            throw new NullPointerException("queueType is marked non-null but is null");
        }
        this.pipelines = executionPlan.getPipelines();
        this.nodeEngine = nodeEngine;
        this.jobImmutableInformation = jobImmutableInformation;
        this.initializationTimestamp = j;
        this.executorService = executorService;
        this.flakeIdGenerator = flakeIdGenerator;
        this.pipelineTasks = new HashSet();
        this.startingTasks = new HashSet();
        this.subtaskActions = new HashMap();
        this.runningJobStateIMap = iMap;
        this.runningJobStateTimestampsIMap = iMap2;
        this.queueType = queueType;
    }

    public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        HashMap hashMap = new HashMap();
        int size = this.pipelines.size();
        return Tuple2.tuple2(new PhysicalPlan((List) this.pipelines.stream().map(pipeline -> {
            this.pipelineTasks.clear();
            this.startingTasks.clear();
            this.subtaskActions.clear();
            int intValue = pipeline.getId().intValue();
            List<ExecutionEdge> edges = pipeline.getEdges();
            List<SourceAction<?, ?, ?>> findSourceAction = findSourceAction(edges);
            List<PhysicalVertex> enumeratorTask = getEnumeratorTask(findSourceAction, intValue, size);
            enumeratorTask.addAll(getCommitterTask(edges, intValue, size));
            List<PhysicalVertex> sourceTask = getSourceTask(edges, findSourceAction, intValue, size);
            sourceTask.addAll(getShuffleTask(edges, intValue, size));
            copyOnWriteArrayList.add(new PassiveCompletableFuture(new CompletableFuture()));
            hashMap.put(Integer.valueOf(intValue), CheckpointPlan.builder().pipelineId(intValue).pipelineSubtasks(this.pipelineTasks).startingSubtasks(this.startingTasks).pipelineActions(pipeline.getActions()).subtaskActions(this.subtaskActions).build());
            return new SubPlan(intValue, size, this.initializationTimestamp, sourceTask, enumeratorTask, this.jobImmutableInformation, this.executorService, this.runningJobStateIMap, this.runningJobStateTimestampsIMap);
        }).collect(Collectors.toList()), this.executorService, this.jobImmutableInformation, this.initializationTimestamp, this.runningJobStateIMap, this.runningJobStateTimestampsIMap), hashMap);
    }

    private List<SourceAction<?, ?, ?>> findSourceAction(List<ExecutionEdge> list) {
        return (List) list.stream().filter(executionEdge -> {
            return executionEdge.getLeftVertex().getAction() instanceof SourceAction;
        }).map(executionEdge2 -> {
            return executionEdge2.getLeftVertex().getAction();
        }).distinct().collect(Collectors.toList());
    }

    private List<PhysicalVertex> getCommitterTask(List<ExecutionEdge> list, int i, int i2) {
        AtomicInteger atomicInteger = new AtomicInteger(-1);
        List list2 = (List) list.stream().filter(executionEdge -> {
            return executionEdge.getRightVertex().getAction() instanceof SinkAction;
        }).collect(Collectors.toList());
        return (List) list2.stream().map(executionEdge2 -> {
            return executionEdge2.getRightVertex().getAction();
        }).map(sinkAction -> {
            try {
                Optional createAggregatedCommitter = sinkAction.getSink().createAggregatedCommitter();
                if (!createAggregatedCommitter.isPresent()) {
                    return null;
                }
                long nextId = this.idGenerator.getNextId();
                long nextId2 = this.idGenerator.getNextId();
                TaskGroupLocation taskGroupLocation = new TaskGroupLocation(this.jobImmutableInformation.getJobId(), i, nextId);
                TaskLocation taskLocation = new TaskLocation(taskGroupLocation, nextId2, 0);
                SinkAggregatedCommitterTask sinkAggregatedCommitterTask = new SinkAggregatedCommitterTask(this.jobImmutableInformation.getJobId(), taskLocation, sinkAction, (SinkAggregatedCommitter) createAggregatedCommitter.get());
                this.committerTaskIDMap.put(sinkAction, taskLocation);
                this.pipelineTasks.add(taskLocation);
                this.subtaskActions.put(taskLocation, Collections.singleton(Tuple2.tuple2(ActionStateKey.of(sinkAction), -1)));
                return new PhysicalVertex(atomicInteger.incrementAndGet(), this.executorService, list2.size(), new TaskGroupDefaultImpl(taskGroupLocation, sinkAction.getName() + "-AggregatedCommitterTask", Lists.newArrayList(new Task[]{sinkAggregatedCommitterTask})), this.flakeIdGenerator, i, i2, sinkAction.getJarUrls(), sinkAction.getConnectorJarIdentifiers(), this.jobImmutableInformation, this.initializationTimestamp, this.nodeEngine, this.runningJobStateIMap, this.runningJobStateTimestampsIMap);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList());
    }

    private List<PhysicalVertex> getShuffleTask(List<ExecutionEdge> list, int i, int i2) {
        return (List) ((Set) list.stream().filter(executionEdge -> {
            return executionEdge.getLeftVertex().getAction() instanceof ShuffleAction;
        }).map(executionEdge2 -> {
            return executionEdge2.getLeftVertex().getAction();
        }).collect(Collectors.toSet())).stream().map(shuffleAction -> {
            return new PhysicalExecutionFlow(shuffleAction, getNextWrapper(list, shuffleAction));
        }).flatMap(physicalExecutionFlow -> {
            ArrayList arrayList = new ArrayList();
            ShuffleAction action = physicalExecutionFlow.getAction();
            ShuffleConfig config = action.getConfig();
            ShuffleMultipleRowStrategy shuffleStrategy = config.getShuffleStrategy();
            if (shuffleStrategy instanceof ShuffleMultipleRowStrategy) {
                ShuffleMultipleRowStrategy shuffleMultipleRowStrategy = shuffleStrategy;
                Iterator<Flow> it = physicalExecutionFlow.getNext().iterator();
                while (it.hasNext()) {
                    PhysicalExecutionFlow physicalExecutionFlow = (PhysicalExecutionFlow) it.next();
                    SinkAction action2 = physicalExecutionFlow.getAction();
                    String multipleRowTableId = action2.getConfig().getMultipleRowTableId();
                    long nextId = this.idGenerator.getNextId();
                    long nextId2 = this.idGenerator.getNextId();
                    ShuffleAction shuffleAction2 = new ShuffleAction(this.idGenerator.getNextId(), String.format("%s -> %s -> %s", action.getName(), multipleRowTableId, action2.getName()), config.toBuilder().shuffleStrategy(shuffleMultipleRowStrategy.toBuilder().targetTableId(multipleRowTableId).build()).build());
                    shuffleAction2.setParallelism(1);
                    PhysicalExecutionFlow physicalExecutionFlow2 = new PhysicalExecutionFlow(shuffleAction2, Collections.singletonList(physicalExecutionFlow));
                    setFlowConfig(physicalExecutionFlow2, 0);
                    TaskGroupLocation taskGroupLocation = new TaskGroupLocation(this.jobImmutableInformation.getJobId(), i, mixIDPrefixAndIndex(nextId2, 0));
                    SeaTunnelTask transformSeaTunnelTask = new TransformSeaTunnelTask(this.jobImmutableInformation.getJobId(), new TaskLocation(taskGroupLocation, nextId, 0), 0, physicalExecutionFlow2);
                    fillCheckpointPlan(transformSeaTunnelTask);
                    arrayList.add(new PhysicalVertex(0, this.executorService, physicalExecutionFlow2.getAction().getParallelism(), new TaskGroupDefaultImpl(taskGroupLocation, physicalExecutionFlow2.getAction().getName() + "-ShuffleTask", Collections.singletonList(transformSeaTunnelTask)), this.flakeIdGenerator, i, i2, transformSeaTunnelTask.getJarsUrl(), transformSeaTunnelTask.getConnectorPluginJars(), this.jobImmutableInformation, this.initializationTimestamp, this.nodeEngine, this.runningJobStateIMap, this.runningJobStateTimestampsIMap));
                }
            } else {
                long nextId3 = this.idGenerator.getNextId();
                long nextId4 = this.idGenerator.getNextId();
                for (int i3 = 0; i3 < physicalExecutionFlow.getAction().getParallelism(); i3++) {
                    TaskGroupLocation taskGroupLocation2 = new TaskGroupLocation(this.jobImmutableInformation.getJobId(), i, mixIDPrefixAndIndex(nextId4, i3));
                    TaskLocation taskLocation = new TaskLocation(taskGroupLocation2, nextId3, i3);
                    setFlowConfig(physicalExecutionFlow, i3);
                    SeaTunnelTask transformSeaTunnelTask2 = new TransformSeaTunnelTask(this.jobImmutableInformation.getJobId(), taskLocation, i3, physicalExecutionFlow);
                    fillCheckpointPlan(transformSeaTunnelTask2);
                    arrayList.add(new PhysicalVertex(i3, this.executorService, physicalExecutionFlow.getAction().getParallelism(), new TaskGroupDefaultImpl(taskGroupLocation2, physicalExecutionFlow.getAction().getName() + "-ShuffleTask", Lists.newArrayList(new Task[]{transformSeaTunnelTask2})), this.flakeIdGenerator, i, i2, transformSeaTunnelTask2.getJarsUrl(), transformSeaTunnelTask2.getConnectorPluginJars(), this.jobImmutableInformation, this.initializationTimestamp, this.nodeEngine, this.runningJobStateIMap, this.runningJobStateTimestampsIMap));
                }
            }
            return arrayList.stream();
        }).collect(Collectors.toList());
    }

    private List<PhysicalVertex> getEnumeratorTask(List<SourceAction<?, ?, ?>> list, int i, int i2) {
        AtomicInteger atomicInteger = new AtomicInteger(-1);
        return (List) list.stream().map(sourceAction -> {
            long nextId = this.idGenerator.getNextId();
            long nextId2 = this.idGenerator.getNextId();
            TaskGroupLocation taskGroupLocation = new TaskGroupLocation(this.jobImmutableInformation.getJobId(), i, nextId);
            TaskLocation taskLocation = new TaskLocation(taskGroupLocation, nextId2, 0);
            SourceSplitEnumeratorTask sourceSplitEnumeratorTask = new SourceSplitEnumeratorTask(this.jobImmutableInformation.getJobId(), taskLocation, sourceAction);
            this.pipelineTasks.add(taskLocation);
            this.startingTasks.add(taskLocation);
            this.subtaskActions.put(taskLocation, Collections.singleton(Tuple2.tuple2(ActionStateKey.of(sourceAction), -1)));
            this.enumeratorTaskIDMap.put(sourceAction, taskLocation);
            return new PhysicalVertex(atomicInteger.incrementAndGet(), this.executorService, list.size(), new TaskGroupDefaultImpl(taskGroupLocation, sourceAction.getName() + "-SplitEnumerator", Lists.newArrayList(new Task[]{sourceSplitEnumeratorTask})), this.flakeIdGenerator, i, i2, sourceSplitEnumeratorTask.getJarsUrl(), sourceSplitEnumeratorTask.getConnectorPluginJars(), this.jobImmutableInformation, this.initializationTimestamp, this.nodeEngine, this.runningJobStateIMap, this.runningJobStateTimestampsIMap);
        }).collect(Collectors.toList());
    }

    private List<PhysicalVertex> getSourceTask(List<ExecutionEdge> list, List<SourceAction<?, ?, ?>> list2, int i, int i2) {
        return (List) list2.stream().map(sourceAction -> {
            return new PhysicalExecutionFlow(sourceAction, getNextWrapper(list, sourceAction));
        }).flatMap(physicalExecutionFlow -> {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList(Collections.singletonList(physicalExecutionFlow));
            if (sourceWithSink(physicalExecutionFlow)) {
                arrayList2.addAll(splitSinkFromFlow(physicalExecutionFlow));
            }
            long nextId = this.idGenerator.getNextId();
            HashMap hashMap = new HashMap();
            for (int i3 = 0; i3 < physicalExecutionFlow.getAction().getParallelism(); i3++) {
                int i4 = i3;
                TaskGroupLocation taskGroupLocation = new TaskGroupLocation(this.jobImmutableInformation.getJobId(), i, mixIDPrefixAndIndex(nextId, i3));
                List list3 = (List) arrayList2.stream().map(flow -> {
                    setFlowConfig(flow, i4);
                    TaskLocation taskLocation = new TaskLocation(taskGroupLocation, ((Long) hashMap.computeIfAbsent(Long.valueOf(flow.getFlowID()), l -> {
                        return Long.valueOf(this.idGenerator.getNextId());
                    })).longValue(), i4);
                    return flow instanceof PhysicalExecutionFlow ? new SourceSeaTunnelTask(this.jobImmutableInformation.getJobId(), taskLocation, i4, (PhysicalExecutionFlow) flow, this.jobImmutableInformation.getJobConfig().getEnvOptions()) : new TransformSeaTunnelTask(this.jobImmutableInformation.getJobId(), taskLocation, i4, flow);
                }).peek(this::fillCheckpointPlan).collect(Collectors.toList());
                Set set = (Set) list3.stream().flatMap(seaTunnelTask -> {
                    return seaTunnelTask.getJarsUrl().stream();
                }).collect(Collectors.toSet());
                Set set2 = (Set) list3.stream().flatMap(seaTunnelTask2 -> {
                    return seaTunnelTask2.getConnectorPluginJars().stream();
                }).collect(Collectors.toSet());
                Stream stream = list3.stream();
                Class<TransformSeaTunnelTask> cls = TransformSeaTunnelTask.class;
                TransformSeaTunnelTask.class.getClass();
                if (stream.anyMatch((v1) -> {
                    return r1.isInstance(v1);
                })) {
                    arrayList.add(new PhysicalVertex(i3, this.executorService, physicalExecutionFlow.getAction().getParallelism(), this.queueType.equals(QueueType.BLOCKINGQUEUE) ? new TaskGroupWithIntermediateBlockingQueue(taskGroupLocation, physicalExecutionFlow.getAction().getName() + "-SourceTask", (Collection) list3.stream().map(seaTunnelTask3 -> {
                        return seaTunnelTask3;
                    }).collect(Collectors.toList())) : new TaskGroupWithIntermediateDisruptor(taskGroupLocation, physicalExecutionFlow.getAction().getName() + "-SourceTask", (Collection) list3.stream().map(seaTunnelTask4 -> {
                        return seaTunnelTask4;
                    }).collect(Collectors.toList())), this.flakeIdGenerator, i, i2, set, set2, this.jobImmutableInformation, this.initializationTimestamp, this.nodeEngine, this.runningJobStateIMap, this.runningJobStateTimestampsIMap));
                } else {
                    arrayList.add(new PhysicalVertex(i3, this.executorService, physicalExecutionFlow.getAction().getParallelism(), new TaskGroupDefaultImpl(taskGroupLocation, physicalExecutionFlow.getAction().getName() + "-SourceTask", (Collection) list3.stream().map(seaTunnelTask5 -> {
                        return seaTunnelTask5;
                    }).collect(Collectors.toList())), this.flakeIdGenerator, i, i2, set, set2, this.jobImmutableInformation, this.initializationTimestamp, this.nodeEngine, this.runningJobStateIMap, this.runningJobStateTimestampsIMap));
                }
            }
            return arrayList.stream();
        }).collect(Collectors.toList());
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void fillCheckpointPlan(SeaTunnelTask seaTunnelTask) {
        this.pipelineTasks.add(seaTunnelTask.getTaskLocation());
        this.subtaskActions.put(seaTunnelTask.getTaskLocation(), seaTunnelTask.getActionStateKeys().stream().map(actionStateKey -> {
            return Tuple2.tuple2(actionStateKey, Integer.valueOf(seaTunnelTask.getTaskLocation().getTaskIndex()));
        }).collect(Collectors.toSet()));
    }

    private void setFlowConfig(Flow flow, int i) {
        if (flow instanceof PhysicalExecutionFlow) {
            PhysicalExecutionFlow physicalExecutionFlow = (PhysicalExecutionFlow) flow;
            if (physicalExecutionFlow.getAction() instanceof SourceAction) {
                SourceConfig sourceConfig = new SourceConfig();
                sourceConfig.setEnumeratorTask(this.enumeratorTaskIDMap.get(physicalExecutionFlow.getAction()));
                physicalExecutionFlow.setConfig(sourceConfig);
            } else if (physicalExecutionFlow.getAction() instanceof SinkAction) {
                SinkConfig sinkConfig = new SinkConfig();
                if (this.committerTaskIDMap.containsKey(physicalExecutionFlow.getAction())) {
                    sinkConfig.setContainCommitter(true);
                    sinkConfig.setCommitterTask(this.committerTaskIDMap.get(physicalExecutionFlow.getAction()));
                }
                physicalExecutionFlow.setConfig(sinkConfig);
            }
        } else {
            if (!(flow instanceof IntermediateExecutionFlow)) {
                throw new UnknownFlowException(flow);
            }
            ((IntermediateExecutionFlow) flow).setConfig(new IntermediateQueueConfig(((IntermediateExecutionFlow) flow).getQueue().getId()));
        }
        if (flow.getNext().isEmpty()) {
            return;
        }
        flow.getNext().forEach(flow2 -> {
            setFlowConfig(flow2, i);
        });
    }

    private static List<Flow> splitSinkFromFlow(Flow flow) {
        List list = (List) flow.getNext().stream().filter(flow2 -> {
            return flow2 instanceof PhysicalExecutionFlow;
        }).map(flow3 -> {
            return (PhysicalExecutionFlow) flow3;
        }).filter(physicalExecutionFlow -> {
            return physicalExecutionFlow.getAction() instanceof SinkAction;
        }).collect(Collectors.toList());
        ArrayList arrayList = new ArrayList();
        flow.getNext().removeAll(list);
        list.forEach(physicalExecutionFlow2 -> {
            IntermediateQueue intermediateQueue = new IntermediateQueue(physicalExecutionFlow2.getAction().getId(), physicalExecutionFlow2.getAction().getName() + "-Queue", physicalExecutionFlow2.getAction().getParallelism());
            flow.getNext().add(new IntermediateExecutionFlow(intermediateQueue));
            IntermediateExecutionFlow intermediateExecutionFlow = new IntermediateExecutionFlow(intermediateQueue);
            intermediateExecutionFlow.getNext().add(physicalExecutionFlow2);
            arrayList.add(intermediateExecutionFlow);
        });
        if (flow.getNext().size() > list.size()) {
            arrayList.addAll((Collection) flow.getNext().stream().flatMap(flow4 -> {
                return splitSinkFromFlow(flow4).stream();
            }).collect(Collectors.toList()));
        }
        return arrayList;
    }

    private static boolean sourceWithSink(PhysicalExecutionFlow<?, ?> physicalExecutionFlow) {
        return (physicalExecutionFlow.getAction() instanceof SinkAction) || ((List) physicalExecutionFlow.getNext().stream().map(flow -> {
            return (PhysicalExecutionFlow) flow;
        }).map(PhysicalPlanGenerator::sourceWithSink).collect(Collectors.toList())).contains(true);
    }

    private long mixIDPrefixAndIndex(long j, int i) {
        return (j * 10000) + i;
    }

    private List<Flow> getNextWrapper(List<ExecutionEdge> list, Action action) {
        List list2 = (List) list.stream().filter(executionEdge -> {
            return executionEdge.getLeftVertex().getAction().equals(action);
        }).map(executionEdge2 -> {
            return executionEdge2.getRightVertex().getAction();
        }).collect(Collectors.toList());
        List<Flow> list3 = (List) list2.stream().filter(action2 -> {
            return (action2 instanceof ShuffleAction) || (action2 instanceof SinkAction);
        }).map(PhysicalExecutionFlow::new).collect(Collectors.toList());
        list3.addAll((Collection) list2.stream().filter(action3 -> {
            return ((action3 instanceof ShuffleAction) || (action3 instanceof SinkAction)) ? false : true;
        }).map(action4 -> {
            return new PhysicalExecutionFlow(action4, getNextWrapper(list, action4));
        }).collect(Collectors.toList()));
        return list3;
    }
}
