/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.plugin.task.chunjun;

import java.io.File;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.chunjun.ChunJunParameters;
import org.apache.dolphinscheduler.spi.enums.Flag;

public class ChunJunTask
extends AbstractTask {
    private static final String CHUNJUN_PATH = "${CHUNJUN_HOME}/bin/start-chunjun";
    private static final String CHUNJUN_DIST_DIR = "${CHUNJUN_HOME}/chunjun-dist";
    private ChunJunParameters chunJunParameters;
    private ShellCommandExecutor shellCommandExecutor;
    private TaskExecutionContext taskExecutionContext;

    public ChunJunTask(TaskExecutionContext taskExecutionContext) {
        super(taskExecutionContext);
        this.taskExecutionContext = taskExecutionContext;
        this.shellCommandExecutor = new ShellCommandExecutor(arg_0 -> ((ChunJunTask)this).logHandle(arg_0), taskExecutionContext, this.logger);
    }

    public void init() {
        this.logger.info("chunjun task params {}", (Object)this.taskExecutionContext.getTaskParams());
        this.chunJunParameters = (ChunJunParameters)((Object)JSONUtils.parseObject((String)this.taskExecutionContext.getTaskParams(), ChunJunParameters.class));
        if (!this.chunJunParameters.checkParameters()) {
            throw new RuntimeException("chunjun task params is not valid");
        }
    }

    public void handle(TaskCallBack taskCallBack) throws TaskException {
        try {
            Map paramsMap = this.taskExecutionContext.getPrepareParamsMap();
            String jsonFilePath = this.buildChunJunJsonFile(paramsMap);
            String shellCommandFilePath = this.buildShellCommandFile(jsonFilePath, paramsMap);
            TaskResponse commandExecuteResult = this.shellCommandExecutor.run(shellCommandFilePath);
            this.setExitStatusCode(commandExecuteResult.getExitStatusCode());
            this.setAppIds(String.join((CharSequence)",", Collections.emptySet()));
            this.setProcessId(commandExecuteResult.getProcessId());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.logger.error("The current ChunJun Task has been interrupted", (Throwable)e);
            this.setExitStatusCode(-1);
            throw new TaskException("The current ChunJun Task has been interrupted", (Throwable)e);
        }
        catch (Exception e) {
            this.logger.error("chunjun task failed.", (Throwable)e);
            this.setExitStatusCode(-1);
            throw new TaskException("Execute chunjun task failed", (Throwable)e);
        }
    }

    private String buildChunJunJsonFile(Map<String, Property> paramsMap) throws Exception {
        String fileName = String.format("%s/%s_job.json", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskAppId());
        String json = null;
        Path path = new File(fileName).toPath();
        if (Files.exists(path, new LinkOption[0])) {
            return fileName;
        }
        if (this.chunJunParameters.getCustomConfig() == Flag.YES.ordinal()) {
            json = this.chunJunParameters.getJson().replaceAll("\\r\\n", "\n");
        }
        json = ParameterUtils.convertParameterPlaceholders(json, (Map)ParamUtils.convert(paramsMap));
        this.logger.debug("chunjun job json : {}", (Object)json);
        FileUtils.writeStringToFile((File)new File(fileName), (String)json, (Charset)StandardCharsets.UTF_8);
        return fileName;
    }

    private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap) throws Exception {
        String fileName = String.format("%s/%s_node.%s", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskAppId(), SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
        Path path = new File(fileName).toPath();
        if (Files.exists(path, new LinkOption[0])) {
            return fileName;
        }
        ArrayList<String> args = new ArrayList<String>();
        args.add(CHUNJUN_PATH);
        args.add("-mode");
        args.add(this.getExecMode(this.chunJunParameters));
        args.add("-jobType sync");
        args.add("-job");
        args.add(jobConfigFilePath);
        args.add("-chunjunDistDir");
        args.add(CHUNJUN_DIST_DIR);
        if (!"local".equalsIgnoreCase(this.getExecMode(this.chunJunParameters))) {
            args.add("-flinkConfDir");
            args.add("${FLINK_HOME}/conf");
            args.add("-flinkLibDir");
            args.add("${FLINK_HOME}/lib");
            args.add("-hadoopConfDir");
            args.add("${HADOOP_HOME}/etc/hadoop");
        }
        if (this.chunJunParameters.getOthers() != null) {
            args.add(this.chunJunParameters.getOthers());
        }
        String command = String.join((CharSequence)" ", args);
        String chunjunCommand = ParameterUtils.convertParameterPlaceholders((String)command, (Map)ParamUtils.convert(paramsMap));
        this.logger.info("raw script : {}", (Object)chunjunCommand);
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-xr-x");
        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
        if (SystemUtils.IS_OS_WINDOWS) {
            Files.createFile(path, new FileAttribute[0]);
        } else {
            Files.createFile(path, attr);
        }
        Files.write(path, chunjunCommand.getBytes(), StandardOpenOption.APPEND);
        return fileName;
    }

    public String getExecMode(ChunJunParameters chunJunParameters) {
        if (chunJunParameters.getDeployMode() == null) {
            return "local";
        }
        return chunJunParameters.getDeployMode();
    }

    public AbstractParameters getParameters() {
        return this.chunJunParameters;
    }

    public void cancel() throws TaskException {
        try {
            this.shellCommandExecutor.cancelApplication();
        }
        catch (Exception e) {
            throw new TaskException("cancel application error", (Throwable)e);
        }
    }
}

