/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroup;
import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroupComputeUtil;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ForwardGroupComputeUtilTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    ForwardGroupComputeUtilTest() {
    }

    @Test
    void testIsolatedVertices() throws Exception {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        Set<ForwardGroup> groups = ForwardGroupComputeUtilTest.computeForwardGroups(v1, v2, v3);
        ForwardGroupComputeUtilTest.checkGroupSize(groups, 0, new Integer[0]);
    }

    @Test
    void testVariousResultPartitionTypesBetweenVertices() throws Exception {
        this.testThreeVerticesConnectSequentially(false, true, 1, 2);
        this.testThreeVerticesConnectSequentially(false, false, 0, new Integer[0]);
        this.testThreeVerticesConnectSequentially(true, true, 1, 3);
    }

    private void testThreeVerticesConnectSequentially(boolean isForward1, boolean isForward2, int numOfGroups, Integer ... groupSizes) throws Exception {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        if (isForward1) {
            ((JobEdge)((IntermediateDataSet)v1.getProducedDataSets().get(0)).getConsumers().get(0)).setForward(true);
        }
        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        if (isForward2) {
            ((JobEdge)((IntermediateDataSet)v2.getProducedDataSets().get(0)).getConsumers().get(0)).setForward(true);
        }
        Set<ForwardGroup> groups = ForwardGroupComputeUtilTest.computeForwardGroups(v1, v2, v3);
        ForwardGroupComputeUtilTest.checkGroupSize(groups, numOfGroups, groupSizes);
    }

    @Test
    void testTwoInputsMergesIntoOne() throws Exception {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        JobVertex v4 = new JobVertex("v4");
        v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        ((JobEdge)((IntermediateDataSet)v1.getProducedDataSets().get(0)).getConsumers().get(0)).setForward(true);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        ((JobEdge)((IntermediateDataSet)v2.getProducedDataSets().get(0)).getConsumers().get(0)).setForward(true);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        Set<ForwardGroup> groups = ForwardGroupComputeUtilTest.computeForwardGroups(v1, v2, v3, v4);
        ForwardGroupComputeUtilTest.checkGroupSize(groups, 1, 3);
    }

    @Test
    void testOneInputSplitsIntoTwo() throws Exception {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        JobVertex v4 = new JobVertex("v4");
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        ((JobEdge)((IntermediateDataSet)v2.getProducedDataSets().get(0)).getConsumers().get(0)).setForward(true);
        ((JobEdge)((IntermediateDataSet)v2.getProducedDataSets().get(1)).getConsumers().get(0)).setForward(true);
        Set<ForwardGroup> groups = ForwardGroupComputeUtilTest.computeForwardGroups(v1, v2, v3, v4);
        ForwardGroupComputeUtilTest.checkGroupSize(groups, 1, 3);
    }

    private static Set<ForwardGroup> computeForwardGroups(JobVertex ... vertices) throws Exception {
        Arrays.asList(vertices).forEach(vertex -> vertex.setInvokableClass(NoOpInvokable.class));
        DefaultExecutionGraph executionGraph = ForwardGroupComputeUtilTest.createDynamicGraph(vertices);
        return new HashSet<ForwardGroup>(ForwardGroupComputeUtil.computeForwardGroups(Arrays.asList(vertices), arg_0 -> ((ExecutionGraph)executionGraph).getJobVertex(arg_0)).values());
    }

    private static void checkGroupSize(Set<ForwardGroup> groups, int numOfGroups, Integer ... sizes) {
        Assertions.assertThat((int)groups.size()).isEqualTo(numOfGroups);
        Assertions.assertThat(groups.stream().map(ForwardGroup::size).collect(Collectors.toList())).contains((Object[])sizes);
    }

    private static DefaultExecutionGraph createDynamicGraph(JobVertex ... vertices) throws Exception {
        TestingDefaultExecutionGraphBuilder builder = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(new JobGraph(new JobID(), "TestJob", vertices)).setVertexParallelismStore(AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(Arrays.asList(vertices), (int)10));
        return builder.buildDynamicGraph((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
    }
}

