package org.apache.dolphinscheduler.plugin.task.chunjun;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import lombok.Generated;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.class */
public class ChunJunTask extends AbstractTask {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ChunJunTask.class);
    private static final String CHUNJUN_PATH = "${CHUNJUN_HOME}/bin/start-chunjun";
    private static final String CHUNJUN_DIST_DIR = "${CHUNJUN_HOME}/chunjun-dist";
    private ChunJunParameters chunJunParameters;
    private ShellCommandExecutor shellCommandExecutor;
    private TaskExecutionContext taskExecutionContext;

    public ChunJunTask(TaskExecutionContext taskExecutionContext) {
        super(taskExecutionContext);
        this.taskExecutionContext = taskExecutionContext;
        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
    }

    public void init() {
        this.chunJunParameters = (ChunJunParameters) JSONUtils.parseObject(this.taskExecutionContext.getTaskParams(), ChunJunParameters.class);
        log.info("Initialize chunjun task params {}", JSONUtils.toPrettyJsonString(this.taskExecutionContext.getTaskParams()));
        if (!this.chunJunParameters.checkParameters()) {
            throw new RuntimeException("chunjun task params is not valid");
        }
    }

    public void handle(TaskCallBack taskCallBack) throws TaskException {
        try {
            Map<String, Property> prepareParamsMap = this.taskExecutionContext.getPrepareParamsMap();
            TaskResponse run = this.shellCommandExecutor.run(ShellInterceptorBuilderFactory.newBuilder().properties(ParameterUtils.convert(prepareParamsMap)).appendScript(buildCommand(buildChunJunJsonFile(prepareParamsMap))), taskCallBack);
            setExitStatusCode(run.getExitStatusCode());
            setAppIds(String.join(",", Collections.emptySet()));
            setProcessId(run.getProcessId());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("The current ChunJun Task has been interrupted", e);
            setExitStatusCode(-1);
            throw new TaskException("The current ChunJun Task has been interrupted", e);
        } catch (Exception e2) {
            log.error("chunjun task failed.", e2);
            setExitStatusCode(-1);
            throw new TaskException("Execute chunjun task failed", e2);
        }
    }

    private String buildChunJunJsonFile(Map<String, Property> map) throws Exception {
        String format = String.format("%s/%s_job.json", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskAppId());
        String str = null;
        if (Files.exists(new File(format).toPath(), new LinkOption[0])) {
            return format;
        }
        if (this.chunJunParameters.getCustomConfig() == Flag.YES.ordinal()) {
            str = this.chunJunParameters.getJson().replaceAll("\\r\\n", "\n");
        }
        String convertParameterPlaceholders = ParameterUtils.convertParameterPlaceholders(str, ParameterUtils.convert(map));
        log.debug("chunjun job json : {}", convertParameterPlaceholders);
        FileUtils.writeStringToFile(new File(format), convertParameterPlaceholders, StandardCharsets.UTF_8);
        return format;
    }

    private String buildCommand(String str) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(CHUNJUN_PATH);
        arrayList.add("-mode");
        arrayList.add(getExecMode(this.chunJunParameters));
        arrayList.add("-jobType sync");
        arrayList.add("-job");
        arrayList.add(str);
        arrayList.add("-chunjunDistDir");
        arrayList.add(CHUNJUN_DIST_DIR);
        if (!"local".equalsIgnoreCase(getExecMode(this.chunJunParameters))) {
            arrayList.add("-flinkConfDir");
            arrayList.add(ChunJunConstants.FLINK_CONF_DIR);
            arrayList.add("-flinkLibDir");
            arrayList.add(ChunJunConstants.FLINK_LIB_DIR);
            arrayList.add("-hadoopConfDir");
            arrayList.add(ChunJunConstants.HADOOP_CONF_DIR);
        }
        if (this.chunJunParameters.getOthers() != null) {
            arrayList.add(this.chunJunParameters.getOthers());
        }
        return String.join(" ", arrayList);
    }

    public String getExecMode(ChunJunParameters chunJunParameters) {
        return chunJunParameters.getDeployMode() == null ? "local" : chunJunParameters.getDeployMode();
    }

    public AbstractParameters getParameters() {
        return this.chunJunParameters;
    }

    public void cancel() throws TaskException {
        try {
            this.shellCommandExecutor.cancelApplication();
        } catch (Exception e) {
            throw new TaskException("cancel application error", e);
        }
    }
}
