package org.apache.dolphinscheduler.server.worker.registry;

import java.time.Duration;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

@ConditionalOnProperty(prefix = "worker.registry-disconnect-strategy", name = {"strategy"}, havingValue = "waiting")
@Service
/* loaded from: input_file:org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.class */
public class WorkerWaitingStrategy implements WorkerConnectStrategy {
    private final Logger logger = LoggerFactory.getLogger(WorkerWaitingStrategy.class);

    @Autowired
    private WorkerConfig workerConfig;

    @Autowired
    private RegistryClient registryClient;

    @Autowired
    private WorkerRpcServer workerRpcServer;

    @Autowired
    private WorkerRpcClient workerRpcClient;

    @Autowired
    private MessageRetryRunner messageRetryRunner;

    @Autowired
    private WorkerManagerThread workerManagerThread;

    public void disconnect() {
        try {
            ServerLifeCycleManager.toWaiting();
            clearWorkerResource();
            Duration maxWaitingTime = this.workerConfig.getRegistryDisconnectStrategy().getMaxWaitingTime();
            try {
                this.logger.info("Worker disconnect from registry will try to reconnect in {} s", Long.valueOf(maxWaitingTime.getSeconds()));
                this.registryClient.connectUntilTimeout(maxWaitingTime);
            } catch (RegistryException e) {
                throw new ServerLifeCycleException(String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), e);
            }
        } catch (Exception e2) {
            this.logger.error("Disconnect from registry and get an unknown exception, will stop the server", e2);
            this.registryClient.getStoppable().stop("Disconnect from registry and get an unknown exception, will stop the server");
        } catch (RegistryException e3) {
            this.logger.error("Disconnect from registry and waiting to reconnect failed, will stop the server", e3);
            this.registryClient.getStoppable().stop("Disconnect from registry and waiting to reconnect failed, will stop the server");
        } catch (ServerLifeCycleException e4) {
            String format = String.format("Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server", ServerLifeCycleManager.getServerStatus());
            this.logger.error(format, e4);
            this.registryClient.getStoppable().stop(format);
        }
    }

    public void reconnect() {
        if (ServerLifeCycleManager.isRunning()) {
            this.logger.info("no need to reconnect, as the current server status is running");
            return;
        }
        try {
            ServerLifeCycleManager.recoverFromWaiting();
            reStartWorkerResource();
            this.logger.info("Recover from waiting success, the current server status is {}", ServerLifeCycleManager.getServerStatus());
        } catch (Exception e) {
            String format = String.format("Recover from waiting failed, the current server status is %s, will stop the server", ServerLifeCycleManager.getServerStatus());
            this.logger.error(format, e);
            this.registryClient.getStoppable().stop(format);
        }
    }

    public StrategyType getStrategyType() {
        return StrategyType.WAITING;
    }

    private void clearWorkerResource() {
        this.workerRpcServer.close();
        this.logger.warn("Worker server close the RPC server due to lost connection from registry");
        this.workerRpcClient.close();
        this.logger.warn("Worker server close the RPC client due to lost connection from registry");
        this.workerManagerThread.clearTask();
        this.logger.warn("Worker server clear the tasks due to lost connection from registry");
        this.messageRetryRunner.clearMessage();
        this.logger.warn("Worker server clear the retry message due to lost connection from registry");
    }

    private void reStartWorkerResource() {
        this.workerRpcServer.start();
        this.logger.warn("Worker server restart PRC server due to reconnect to registry");
        this.workerRpcClient.start();
        this.logger.warn("Worker server restart PRC client due to reconnect to registry");
    }
}
