package org.apache.seatunnel.engine.server.dag.physical;

import com.hazelcast.cluster.Address;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.exception.TaskGroupDeployException;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskDeployState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
import org.apache.seatunnel.engine.server.task.operation.CheckTaskGroupIsExecutingOperation;
import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.class */
public class PhysicalVertex {
    /** Class-wide logger. */
    private static final Logger log = LoggerFactory.getLogger(PhysicalVertex.class);
    /** Location of the wrapped task group; used as the key into both state maps. */
    private final TaskGroupLocation taskGroupLocation;
    /** Human-readable name built in the constructor and used in log messages. */
    private final String taskFullName;
    /** The task group this vertex deploys and tracks. */
    private final TaskGroupDefaultImpl taskGroup;
    /** Supplies a fresh id for every {@link TaskGroupImmutableInformation} that is built. */
    private final FlakeIdGenerator flakeIdGenerator;
    /** Plugin jar URLs handed to the worker as part of the deployment information. */
    private final List<Set<URL>> pluginJarsUrls;
    /** Connector jar identifiers handed to the worker as part of the deployment information. */
    private final List<Set<ConnectorJarIdentifier>> connectorJarIdentifiers;
    /** Shared map holding the {@link ExecutionState} of each task group, keyed by its location. */
    private final IMap<Object, Object> runningJobStateIMap;
    /** Completed once the vertex reaches CANCELED, FAILED or FINISHED; recreated by initStateFuture(). */
    private CompletableFuture<TaskExecutionState> taskFuture;
    /** Shared map holding, per task group, an array of timestamps indexed by ExecutionState ordinal. */
    private final IMap<Object, Long[]> runningJobStateTimestampsIMap;
    /** Hazelcast node engine used for cluster lookups, serialization and remote operations. */
    private final NodeEngine nodeEngine;
    /** Set through setJobMaster(); used to look up the slot profile owned by this task group. */
    private JobMaster jobMaster;
    /** Locally cached copy of the state stored in runningJobStateIMap. */
    private volatile ExecutionState currExecutionState;
    /** While false, stateProcess() only logs a warning and does nothing else. */
    public volatile boolean isRunning = false;
    /** First error message recorded for this vertex (only ever set when still null). */
    private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();

    /**
     * Creates the vertex for one task group and seeds its entries in the two shared state maps.
     *
     * <p>If the timestamps map has no entry for the task group, a new array is stored with the
     * INITIALIZING slot set to {@code j}. If the state map has no entry, the CREATED timestamp is
     * recorded and the state is set to CREATED. The timestamps are always written before the
     * state. An already stored timestamp array is reused rather than replaced, so a previously
     * recorded INITIALIZING timestamp survives when only the state entry is missing.
     *
     * @param i index of this task group among its parallel instances; rendered as {@code i + 1}
     *     in the task name
     * @param i2 second number of the task "(n/m)" pair in the task name (presumably the
     *     parallelism - TODO confirm)
     * @param taskGroupDefaultImpl task group wrapped by this vertex
     * @param flakeIdGenerator generator used for deployment information ids
     * @param i3 first number of the pipeline "(n/m)" pair in the task name
     * @param i4 second number of the pipeline "(n/m)" pair in the task name
     * @param list plugin jar URLs forwarded on deployment
     * @param list2 connector jar identifiers forwarded on deployment
     * @param jobImmutableInformation source of the job name and job id used in the task name
     * @param j timestamp stored in the INITIALIZING slot when no timestamps exist yet
     * @param nodeEngine Hazelcast node engine
     * @param iMap map of task group location to {@link ExecutionState}
     * @param iMap2 map of task group location to the per-state timestamp array
     * @throws NullPointerException if any argument marked {@code @NonNull} is null
     */
    public PhysicalVertex(int i, int i2, @NonNull TaskGroupDefaultImpl taskGroupDefaultImpl, @NonNull FlakeIdGenerator flakeIdGenerator, int i3, int i4, List<Set<URL>> list, List<Set<ConnectorJarIdentifier>> list2, @NonNull JobImmutableInformation jobImmutableInformation, long j, @NonNull NodeEngine nodeEngine, @NonNull IMap iMap, @NonNull IMap iMap2) {
        if (taskGroupDefaultImpl == null) {
            throw new NullPointerException("taskGroup is marked non-null but is null");
        }
        if (flakeIdGenerator == null) {
            throw new NullPointerException("flakeIdGenerator is marked non-null but is null");
        }
        if (jobImmutableInformation == null) {
            throw new NullPointerException("jobImmutableInformation is marked non-null but is null");
        }
        if (nodeEngine == null) {
            throw new NullPointerException("nodeEngine is marked non-null but is null");
        }
        if (iMap == null) {
            throw new NullPointerException("runningJobStateIMap is marked non-null but is null");
        }
        if (iMap2 == null) {
            throw new NullPointerException("runningJobStateTimestampsIMap is marked non-null but is null");
        }
        this.taskGroupLocation = taskGroupDefaultImpl.getTaskGroupLocation();
        this.taskGroup = taskGroupDefaultImpl;
        this.flakeIdGenerator = flakeIdGenerator;
        this.pluginJarsUrls = list;
        this.connectorJarIdentifiers = list2;

        // Start from the stored timestamps so that a later write cannot wipe entries that
        // were recorded earlier (e.g. INITIALIZING).
        Long[] stateTimestamps = (Long[]) iMap2.get(this.taskGroupLocation);
        if (stateTimestamps == null) {
            stateTimestamps = new Long[ExecutionState.values().length];
            stateTimestamps[ExecutionState.INITIALIZING.ordinal()] = Long.valueOf(j);
            iMap2.put(this.taskGroupLocation, stateTimestamps);
        }
        if (iMap.get(this.taskGroupLocation) == null) {
            // The timestamp is written before the state itself.
            stateTimestamps[ExecutionState.CREATED.ordinal()] = Long.valueOf(System.currentTimeMillis());
            iMap2.put(this.taskGroupLocation, stateTimestamps);
            iMap.put(this.taskGroupLocation, ExecutionState.CREATED);
        }
        this.currExecutionState = (ExecutionState) iMap.get(this.taskGroupLocation);
        this.nodeEngine = nodeEngine;

        String baseName = String.format(
                "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]",
                jobImmutableInformation.getJobConfig().getName(),
                Long.valueOf(jobImmutableInformation.getJobId()),
                Integer.valueOf(i3),
                Integer.valueOf(i4),
                taskGroupDefaultImpl.getTaskGroupName(),
                Integer.valueOf(i + 1),
                Integer.valueOf(i2));
        // The location is only appended to the name when verbose logging is enabled.
        boolean verbose = log.isDebugEnabled() || log.isTraceEnabled();
        this.taskFullName = verbose
                ? baseName + String.format(", taskGroupLocation: [%s]", this.taskGroupLocation)
                : baseName;

        this.taskFuture = new CompletableFuture<>();
        this.runningJobStateIMap = iMap;
        this.runningJobStateTimestampsIMap = iMap2;
    }

    /**
     * Creates a fresh completion future and reloads the cached state from the state map.
     *
     * <p>If the reloaded state is RUNNING or DEPLOYING but the task group is not actually
     * executing on its worker, the vertex is moved to FAILING.
     *
     * @return a passive view of the new completion future
     */
    public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
        this.taskFuture = new CompletableFuture<>();
        ExecutionState restoredState = (ExecutionState) this.runningJobStateIMap.get(this.taskGroupLocation);
        this.currExecutionState = restoredState;
        if (restoredState != null) {
            log.info(String.format("The task %s is in state %s when init state future", this.taskFullName, restoredState));
        }

        boolean expectedOnWorker =
                ExecutionState.RUNNING.equals(restoredState)
                        || ExecutionState.DEPLOYING.equals(restoredState);
        // The remote check is only made for states that imply the worker should hold the task.
        if (expectedOnWorker && !checkTaskGroupIsExecuting(this.taskGroupLocation)) {
            updateTaskState(ExecutionState.FAILING);
        }
        return new PassiveCompletableFuture<>(this.taskFuture);
    }

    /**
     * Marks this vertex as running and then processes whatever state it is currently in.
     */
    public void restoreExecutionState() {
        startPhysicalVertex();
        stateProcess();
    }

    /**
     * Asks the worker that owns the task group's slot whether the task group is executing.
     *
     * <p>Returns {@code false} when no slot profile is recorded for the task group, when the
     * owning worker is no longer a cluster member, or when the remote check fails.
     *
     * <p>NOTE(review): an {@link InterruptedException} from the remote call is logged and
     * swallowed without restoring the interrupt flag - confirm this is intended.
     *
     * @param taskGroupLocation task group to check
     * @return {@code true} only if the worker reports the task group as executing
     */
    private boolean checkTaskGroupIsExecuting(TaskGroupLocation taskGroupLocation) {
        IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfiles =
                this.nodeEngine.getHazelcastInstance().getMap("engine_ownedSlotProfilesIMap");
        SlotProfile slot = getOwnedSlotProfilesByTaskGroup(taskGroupLocation, ownedSlotProfiles);
        if (slot == null) {
            return false;
        }

        Address worker = slot.getWorker();
        boolean workerIsMember = this.nodeEngine.getClusterService().getMembers().stream()
                .map(member -> member.getAddress())
                .anyMatch(address -> worker.equals(address));
        if (!workerIsMember) {
            log.warn("The node:{} running the taskGroup {} no longer exists, return false.", worker.toString(), taskGroupLocation);
            return false;
        }

        boolean executing;
        try {
            Object answer = this.nodeEngine.getOperationService()
                    .createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, new CheckTaskGroupIsExecutingOperation(taskGroupLocation), worker)
                    .invoke()
                    .get();
            executing = ((Boolean) answer).booleanValue();
        } catch (InterruptedException | ExecutionException e) {
            log.warn("Execution of CheckTaskGroupIsExecutingOperation {} failed, checkTaskGroupIsExecuting return false. ", taskGroupLocation, e);
            executing = false;
        }
        return executing;
    }

    /**
     * Looks up the slot profile recorded for a task group.
     *
     * <p>Missing data is handled with explicit null checks instead of catching
     * {@link NullPointerException}, so an unrelated NPE raised inside the map lookup is no
     * longer silently turned into "no slot".
     *
     * @param taskGroupLocation task group whose slot is wanted; may be null
     * @param iMap map of pipeline location to the slot profiles owned by its task groups; may
     *     be null
     * @return the recorded slot profile, or {@code null} if any part of the lookup is absent
     */
    private SlotProfile getOwnedSlotProfilesByTaskGroup(TaskGroupLocation taskGroupLocation, IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> iMap) {
        if (taskGroupLocation == null || iMap == null) {
            return null;
        }
        PipelineLocation pipelineLocation = taskGroupLocation.getPipelineLocation();
        if (pipelineLocation == null) {
            // A null key previously surfaced as a swallowed NPE; keep returning null for it.
            return null;
        }
        Map<TaskGroupLocation, SlotProfile> ownedByPipeline = iMap.get(pipelineLocation);
        return ownedByPipeline == null ? null : ownedByPipeline.get(taskGroupLocation);
    }

    /**
     * Deploys the task group through the task execution service of the slot on this node.
     *
     * @param slotProfile slot that will run the task group
     * @return the deployment result reported by the task execution service
     * @throws NullPointerException if {@code slotProfile} is null
     */
    private TaskDeployState deployOnLocal(@NonNull SlotProfile slotProfile) throws Exception {
        if (slotProfile == null) {
            throw new NullPointerException("slotProfile is marked non-null but is null");
        }
        return deployInternal(deploymentInfo -> {
            SeaTunnelServer localServer = (SeaTunnelServer) this.nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
            return localServer.getSlotService()
                    .getSlotContext(slotProfile)
                    .getTaskExecutionService()
                    .deployTask(deploymentInfo);
        });
    }

    /**
     * Deploys the task group by sending a {@link DeployTaskOperation} to the slot's worker.
     *
     * <p>If sending fails while the vertex is already in an end state, the error is only
     * logged and the deployment is reported as successful. Otherwise a failed result carrying
     * the exception is returned.
     *
     * @param slotProfile slot that will run the task group
     * @return the deployment result
     * @throws NullPointerException if {@code slotProfile} is null
     */
    private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) {
        if (slotProfile == null) {
            throw new NullPointerException("slotProfile is marked non-null but is null");
        }
        return deployInternal(deploymentInfo -> {
            try {
                DeployTaskOperation operation =
                        new DeployTaskOperation(slotProfile, this.nodeEngine.getSerializationService().toData(deploymentInfo));
                return (TaskDeployState) NodeEngineUtil
                        .sendOperationToMemberNode(this.nodeEngine, operation, slotProfile.getWorker())
                        .get();
            } catch (Exception e) {
                if (getExecutionState().isEndState()) {
                    // The vertex has already ended, so the failed deployment no longer matters.
                    log.warn(ExceptionUtils.getMessage(e));
                    log.warn(String.format("%s deploy error, but the state is already in end state %s, skip this error", getTaskFullName(), this.currExecutionState));
                    return TaskDeployState.success();
                }
                return TaskDeployState.failed(e);
            }
        });
    }

    /**
     * Requests a transition of this vertex to the DEPLOYING state.
     */
    public void makeTaskGroupDeploy() {
        updateTaskState(ExecutionState.DEPLOYING);
    }

    /**
     * Deploys the task group to the worker of the given slot, locally or remotely.
     *
     * <p>Anything thrown while deploying is converted into a failed {@link TaskDeployState}.
     *
     * @param slotProfile slot that will run the task group
     * @return the deployment result
     * @throws NullPointerException if {@code slotProfile} is null
     */
    public TaskDeployState deploy(@NonNull SlotProfile slotProfile) {
        if (slotProfile == null) {
            throw new NullPointerException("slotProfile is marked non-null but is null");
        }
        try {
            Address target = slotProfile.getWorker();
            if (target.equals(this.nodeEngine.getThisAddress())) {
                return deployOnLocal(slotProfile);
            }
            return deployOnRemote(slotProfile);
        } catch (Throwable th) {
            return TaskDeployState.failed(th);
        }
    }

    /**
     * Builds the deployment information, hands it to the given deployer and, if the deployer
     * reports success, moves the vertex to RUNNING.
     *
     * <p>A failed deployment leaves the state untouched so the vertex is never reported as
     * RUNNING for a task group that was not deployed; the caller decides how to fail it.
     *
     * @param function deployer that receives the deployment information and returns the result
     * @return the result returned by {@code function}
     */
    private TaskDeployState deployInternal(Function<TaskGroupImmutableInformation, TaskDeployState> function) {
        TaskDeployState deployState = function.apply(getTaskGroupImmutableInformation());
        if (deployState != null && deployState.isSuccess()) {
            updateTaskState(ExecutionState.RUNNING);
        }
        return deployState;
    }

    /**
     * Builds the deployment information for the task group: job id, a newly generated id, the
     * group's type, location and name, its serialized tasks and the jar lists.
     *
     * @return a new {@link TaskGroupImmutableInformation} for this vertex
     */
    @VisibleForTesting
    public TaskGroupImmutableInformation getTaskGroupImmutableInformation() {
        TaskGroupLocation location = this.taskGroup.getTaskGroupLocation();
        return new TaskGroupImmutableInformation(
                location.getJobId(),
                this.flakeIdGenerator.newId(),
                this.taskGroup.getTaskGroupType(),
                location,
                this.taskGroup.getTaskGroupName(),
                (List) this.taskGroup.getTasks().stream()
                        .map(task -> this.nodeEngine.getSerializationService().toData(task))
                        .collect(Collectors.toList()),
                this.pluginJarsUrls,
                this.connectorJarIdentifiers);
    }

    /**
     * Returns the task group wrapped by this vertex.
     *
     * @return the task group
     */
    @VisibleForTesting
    public TaskGroup getTaskGroup() {
        return this.taskGroup;
    }

    /**
     * Moves this vertex to {@code executionState} and then processes the new state.
     *
     * <p>The current state is read from the shared state map. Nothing happens if it already
     * equals the target or if it is an end state. Otherwise the timestamp and the state are
     * written to the shared maps through the retry helper, the cached state is updated and
     * {@link #stateProcess()} is invoked.
     *
     * <p>Any exception raised inside the method is logged; unless the target was FAILING, the
     * vertex is then sent to FAILING via {@link #makeTaskGroupFailing(Throwable)}.
     *
     * @param executionState target state
     * @throws NullPointerException if {@code executionState} is null
     */
    public synchronized void updateTaskState(@NonNull ExecutionState executionState) {
        if (executionState == null) {
            throw new NullPointerException("targetState is marked non-null but is null");
        }
        try {
            // The shared map, not the cached field, is the source of truth for the current state.
            ExecutionState executionState2 = (ExecutionState) this.runningJobStateIMap.get(this.taskGroupLocation);
            log.debug(String.format("Try to update the task %s state from %s to %s", this.taskFullName, executionState2, executionState));
            // A missing map entry makes executionState2 null; the resulting NPE is handled by the catch below.
            if (executionState2.equals(executionState)) {
                log.info("{} current state equals target state: {}, skip", this.taskFullName, executionState);
                return;
            }
            if (executionState2.isEndState()) {
                log.error("Task is trying to leave terminal state " + executionState2);
                return;
            }
            // Timestamp first, then the state itself.
            // NOTE(review): RetryMaterial arguments look like (max attempts, flag, retry predicate, wait in ms) - confirm.
            RetryUtils.retryWithException(() -> {
                updateStateTimestamps(executionState);
                this.runningJobStateIMap.set(this.taskGroupLocation, executionState);
                return null;
            }, new RetryUtils.RetryMaterial(30, true, (v0) -> {
                return ExceptionUtil.isOperationNeedRetryException(v0);
            }, 2000L));
            this.currExecutionState = executionState;
            log.info(String.format("%s turned from state %s to %s.", this.taskFullName, executionState2, executionState));
            // May call back into updateTaskState for the follow-up transition.
            stateProcess();
        } catch (Exception e) {
            log.error(ExceptionUtils.getMessage(e));
            // Do not try to fail again if failing itself is what went wrong.
            if (executionState.equals(ExecutionState.FAILING)) {
                return;
            }
            makeTaskGroupFailing(e);
        }
    }

    /**
     * Requests cancellation by moving the vertex to CANCELING, unless it has already ended.
     */
    public synchronized void cancel() {
        if (!getExecutionState().isEndState()) {
            updateTaskState(ExecutionState.CANCELING);
        }
    }

    /**
     * Tells the worker running the task group to cancel it.
     *
     * <p>If the task group is not executing anywhere, the vertex goes straight to CANCELED.
     * Otherwise a {@link CancelTaskOperation} is sent to the current worker and retried every
     * two seconds until it succeeds, the completion future is done, or the worker is no longer
     * a cluster member.
     *
     * <p>NOTE(review): an interrupt during the retry sleep is rethrown as a RuntimeException
     * without restoring the interrupt flag - confirm this is intended.
     */
    private void noticeTaskExecutionServiceCancel() {
        if (!checkTaskGroupIsExecuting(this.taskGroupLocation)) {
            updateTaskState(ExecutionState.CANCELED);
            return;
        }
        // Attempt counter, used only in the retry log message.
        int i = 0;
        while (!this.taskFuture.isDone()) {
            ClusterService clusterService = this.nodeEngine.getClusterService();
            // Resolved on every attempt rather than once up front.
            Address currentExecutionAddress = getCurrentExecutionAddress();
            if (clusterService.getMember(currentExecutionAddress) == null) {
                // The worker is gone; there is nobody left to notify.
                return;
            }
            try {
                i++;
                log.info(String.format("Send cancel %s operator to member %s", this.taskFullName, currentExecutionAddress));
                // Blocks until the worker has answered the cancel request.
                this.nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, new CancelTaskOperation(this.taskGroupLocation), currentExecutionAddress).invoke().get();
                return;
            } catch (Exception e) {
                log.warn(String.format("%s cancel failed with Exception: %s, retry %s", getTaskFullName(), ExceptionUtils.getMessage(e), Integer.valueOf(i)));
                try {
                    // Back off before the next attempt.
                    Thread.sleep(2000L);
                } catch (InterruptedException e2) {
                    throw new RuntimeException(e2);
                }
            }
        }
    }

    /**
     * Records the current time as the moment the given state was entered.
     *
     * <p>The timestamp array is read from the shared map, updated at the state's ordinal and
     * written back.
     *
     * @param executionState state whose timestamp is recorded
     * @throws NullPointerException if {@code executionState} is null
     */
    private void updateStateTimestamps(@NonNull ExecutionState executionState) {
        if (executionState == null) {
            throw new NullPointerException("targetState is marked non-null but is null");
        }
        Long[] timestamps = (Long[]) this.runningJobStateTimestampsIMap.get(this.taskGroupLocation);
        int slot = executionState.ordinal();
        timestamps[slot] = System.currentTimeMillis();
        this.runningJobStateTimestampsIMap.set(this.taskGroupLocation, timestamps);
    }

    /**
     * Returns the locally cached execution state (not re-read from the shared map).
     *
     * @return the cached state
     */
    public ExecutionState getExecutionState() {
        return this.currExecutionState;
    }

    /**
     * Resets an ended vertex back to CREATED so it can be scheduled again.
     *
     * <p>The CREATED timestamp and state are written to the shared maps through the retry
     * helper and the recorded error is cleared. If the write ultimately fails, it is logged and
     * skipped; the cached state is set to CREATED either way.
     *
     * <p>The error holder is cleared in place rather than replaced with a new instance, so a
     * concurrent {@code compareAndSet} always acts on the one shared reference.
     *
     * @throws IllegalStateException if the vertex is not in an end state
     */
    private void resetExecutionState() {
        synchronized (this) {
            ExecutionState executionState = getExecutionState();
            if (!executionState.isEndState()) {
                String format = String.format("%s reset state failed, only end state can be reset, current is %s", getTaskFullName(), executionState);
                log.error(format);
                throw new IllegalStateException(format);
            }
            try {
                RetryUtils.retryWithException(() -> {
                    // Timestamp first, then the state itself.
                    updateStateTimestamps(ExecutionState.CREATED);
                    this.runningJobStateIMap.set(this.taskGroupLocation, ExecutionState.CREATED);
                    this.errorByPhysicalVertex.set(null);
                    return null;
                }, new RetryUtils.RetryMaterial(30, true, (v0) -> {
                    return ExceptionUtil.isOperationNeedRetryException(v0);
                }, 2000L));
            } catch (Exception e) {
                // Best effort: a failed write is logged and the local reset still goes ahead.
                log.warn(ExceptionUtils.getMessage(e));
                log.warn(String.format("Set %s state %s to Imap failed, skip.", getTaskFullName(), ExecutionState.CREATED));
            }
            this.currExecutionState = ExecutionState.CREATED;
            log.info(String.format("%s turn to state %s.", this.taskFullName, ExecutionState.CREATED));
        }
    }

    /**
     * Resets the execution state of this vertex; see {@code resetExecutionState()}.
     */
    public void reset() {
        resetExecutionState();
    }

    /**
     * Returns the human-readable name of this vertex that was built in the constructor.
     *
     * @return the task's full name
     */
    public String getTaskFullName() {
        return this.taskFullName;
    }

    /**
     * Applies an end state reported by the task execution service.
     *
     * <p>The reported error message is kept only if no error has been recorded yet; then the
     * vertex is moved to the reported state.
     *
     * @param taskExecutionState state report; its state must be an end state
     * @throws SeaTunnelEngineException if the reported state is not an end state
     */
    public void updateStateByExecutionService(TaskExecutionState taskExecutionState) {
        ExecutionState reportedState = taskExecutionState.getExecutionState();
        if (!reportedState.isEndState()) {
            String message = String.format("The state must be end state from ExecutionService, can not be %s", reportedState);
            throw new SeaTunnelEngineException(message);
        }
        this.errorByPhysicalVertex.compareAndSet(null, taskExecutionState.getThrowableMsg());
        updateTaskState(reportedState);
    }

    /**
     * Returns the address of the worker whose slot is owned by this task group.
     *
     * @return the worker address, or {@code null} if the job master has no slot profile for
     *     this task group
     */
    public Address getCurrentExecutionAddress() {
        SlotProfile ownedSlot = this.jobMaster.getOwnedSlotProfiles(this.taskGroupLocation);
        return ownedSlot == null ? null : ownedSlot.getWorker();
    }

    /**
     * Returns the location of the task group wrapped by this vertex.
     *
     * @return the task group location
     */
    public TaskGroupLocation getTaskGroupLocation() {
        return this.taskGroupLocation;
    }

    /**
     * Sets the job master used to look up the slot profile owned by this task group.
     *
     * @param jobMaster the job master
     */
    public void setJobMaster(JobMaster jobMaster) {
        this.jobMaster = jobMaster;
    }

    /**
     * Enables state processing for this vertex by setting {@code isRunning} to true.
     */
    public void startPhysicalVertex() {
        this.isRunning = true;
        log.info(String.format("%s state process is start", this.taskFullName));
    }

    /**
     * Disables state processing for this vertex by setting {@code isRunning} to false.
     */
    public void stopPhysicalVertex() {
        this.isRunning = false;
        log.info(String.format("%s state process is stopped", this.taskFullName));
    }

    /**
     * Performs the action that belongs to the current execution state.
     *
     * <ul>
     *   <li>INITIALIZING, CREATED, RUNNING: nothing to do.
     *   <li>DEPLOYING: deploy to the owned slot, then move to RUNNING or start failing.
     *   <li>FAILING: move to FAILED.
     *   <li>CANCELING: notify the worker to cancel the task group.
     *   <li>CANCELED, FAILED, FINISHED: stop processing and complete the future.
     * </ul>
     *
     * <p>Does nothing except log a warning while the vertex is not started.
     *
     * @throws IllegalArgumentException if the state is not one of the states above
     */
    public synchronized void stateProcess() {
        if (!this.isRunning) {
            log.warn(String.format("%s state process is not start", this.taskFullName));
            return;
        }
        ExecutionState current = getExecutionState();
        switch (current) {
            case INITIALIZING:
            case CREATED:
            case RUNNING:
                break;
            case DEPLOYING:
                TaskDeployState deployResult = deploy(this.jobMaster.getOwnedSlotProfiles(this.taskGroupLocation));
                if (!deployResult.isSuccess()) {
                    makeTaskGroupFailing(new TaskGroupDeployException(deployResult.getThrowableMsg()));
                } else {
                    updateTaskState(ExecutionState.RUNNING);
                }
                break;
            case FAILING:
                updateTaskState(ExecutionState.FAILED);
                break;
            case CANCELING:
                noticeTaskExecutionServiceCancel();
                break;
            case CANCELED:
            case FAILED:
            case FINISHED:
                completeWithEndState(current);
                break;
            default:
                throw new IllegalArgumentException("Unknown TaskGroup State: " + getExecutionState());
        }
    }

    /** Stops state processing and completes the future with the given end state. */
    private void completeWithEndState(ExecutionState endState) {
        stopPhysicalVertex();
        if (endState == ExecutionState.FAILED) {
            log.error(String.format("%s end with state %s and Exception: %s", this.taskFullName, ExecutionState.FAILED, this.errorByPhysicalVertex.get()));
        }
        this.taskFuture.complete(new TaskExecutionState(this.taskGroupLocation, endState, this.errorByPhysicalVertex.get()));
    }

    /**
     * Records the error message (only if none has been recorded yet) and moves the vertex to
     * FAILING.
     *
     * @param th cause of the failure
     */
    public void makeTaskGroupFailing(Throwable th) {
        this.errorByPhysicalVertex.compareAndSet(null, ExceptionUtils.getMessage(th));
        updateTaskState(ExecutionState.FAILING);
    }
}
