package azkaban.execapp;

import azkaban.execapp.event.BlockingStatus;
import azkaban.execapp.event.Event;
import azkaban.execapp.event.EventHandler;
import azkaban.execapp.event.FlowWatcher;
import azkaban.executor.ExecutableFlowBase;
import azkaban.executor.ExecutableNode;
import azkaban.executor.ExecutorLoader;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.AbstractProcessJob;
import azkaban.jobExecutor.Job;
import azkaban.jobtype.JobTypeManager;
import azkaban.jobtype.JobTypeManagerException;
import azkaban.utils.Props;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.log4j.Appender;
import org.apache.log4j.EnhancedPatternLayout;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.RollingFileAppender;

/* loaded from: input_file:azkaban/execapp/JobRunner.class */
public class JobRunner extends EventHandler implements Runnable {
    // Used to persist node state and to upload log/attachment files.
    private ExecutorLoader loader;
    // Input properties of the node; extended in prepareJob() before the job is built.
    private Props props;
    // The executable node this runner drives.
    private ExecutableNode node;
    // Directory where the log and attachment files are created.
    private File workingDir;
    // Appender attached to the per-job logger; null until createLogger() succeeds.
    private Appender jobAppender;
    // Per-job log file; null until createLogger() has run.
    private File logFile;
    // Absolute path of the attachment file; null until createAttachmentFile() has run.
    private String attachmentFileName;
    // The built job; null until prepareJob() succeeds.
    private Job job;
    private int executionId;
    private String jobId;
    // Class-wide lock guarding logger creation across all runners.
    private static final Object logCreatorLock = new Object();
    private final JobTypeManager jobtypeManager;
    // Passed to RollingFileAppender.setMaxFileSize(String).
    private String jobLogChunkSize;
    // Passed to RollingFileAppender.setMaxBackupIndex(int).
    private int jobLogBackupIndex;
    private final Layout DEFAULT_LAYOUT = new EnhancedPatternLayout("%d{dd-MM-yyyy HH:mm:ss z} %c{1} %p - %m\n");
    // Per-job logger; null until createLogger() has run.
    private Logger logger = null;
    private Layout loggerLayout = this.DEFAULT_LAYOUT;
    // Logger of the enclosing flow; used when the per-job logger is unavailable.
    private Logger flowLogger = null;
    // Guards the kill()/prepareJob() hand-off.
    private Object syncObject = new Object();
    // Stored by setPipeline(); not read anywhere else in this class.
    private Integer pipelineLevel = null;
    private FlowWatcher watcher = null;
    // Nested ids of jobs this runner waits on before starting.
    private Set<String> pipelineJobs = new HashSet<String>();
    // When non-null, the only values accepted for the "user.to.proxy" property.
    private Set<String> proxyUsers = null;
    private long delayStartMs = 0;
    // Written by kill() and polled by the thread executing run(); volatile so
    // the unsynchronized isKilled() reads observe the update.
    private volatile boolean killed = false;
    // Written by the run() thread in blockOnPipeLine() and read by kill();
    // volatile for the same cross-thread visibility reason.
    private volatile BlockingStatus currentBlockStatus = null;

    /**
     * Creates a runner for one executable node.
     *
     * @param executableNode the node to run; its input props, id and the
     *                       execution id of its parent flow are captured here
     * @param file           working directory for log and attachment files
     * @param executorLoader loader used to persist state and upload files
     * @param jobTypeManager factory used later to build the actual job
     */
    public JobRunner(ExecutableNode executableNode, File file, ExecutorLoader executorLoader, JobTypeManager jobTypeManager) {
        // Collaborators handed in by the caller.
        this.workingDir = file;
        this.loader = executorLoader;
        this.jobtypeManager = jobTypeManager;

        // State derived from the node itself.
        this.node = executableNode;
        this.props = executableNode.getInputProps();
        this.executionId = executableNode.getParentFlow().getExecutionId();
        this.jobId = executableNode.getId();
    }

    /**
     * Sets the proxy users this job may run as. When non-null,
     * {@code prepareJob()} rejects a "user.to.proxy" value not in this set.
     *
     * @param set allowed proxy user names, or null to disable the check
     */
    public void setValidatedProxyUsers(Set<String> set) {
        proxyUsers = set;
    }

    /**
     * Configures logging for this runner.
     *
     * @param logger the flow-level logger (stored as {@code flowLogger}, not
     *               as the per-job logger)
     * @param str    maximum size of one job log file, in the string form
     *               accepted by {@code RollingFileAppender.setMaxFileSize}
     * @param i      maximum number of rolled-over backup log files
     */
    public void setLogSettings(Logger logger, String str, int i) {
        this.jobLogBackupIndex = i;
        this.jobLogChunkSize = str;
        this.flowLogger = logger;
    }

    /**
     * @return the job's properties (the same instance that
     *         {@code prepareJob()} mutates, not a copy)
     */
    public Props getProps() {
        return props;
    }

    /**
     * Installs the watcher and computes which jobs this runner must wait on.
     *
     * <ul>
     *   <li>Level 1: waits on this node's own nested id.</li>
     *   <li>Level 2: additionally waits on this node's successors. If the
     *       node is an end node of its parent flow, the successors of the
     *       parent flow are used instead. Successors that are flows
     *       contribute their (recursively resolved) start nodes.</li>
     *   <li>Any other level: only the watcher and level are recorded.</li>
     * </ul>
     *
     * @param flowWatcher watcher queried in {@code blockOnPipeLine()}
     * @param i           pipeline level
     */
    public void setPipeline(FlowWatcher flowWatcher, int i) {
        this.watcher = flowWatcher;
        this.pipelineLevel = Integer.valueOf(i);
        if (i != 1 && i != 2) {
            return;
        }

        // Both supported levels wait on this node's own nested id.
        this.pipelineJobs.add(this.node.getNestedId());
        if (i == 1) {
            return;
        }

        ExecutableFlowBase enclosingFlow = this.node.getParentFlow();
        boolean isEndNode = enclosingFlow.getEndNodes().contains(this.node.getId());
        if (!isEndNode) {
            // Ordinary node: its direct successors live in the same flow.
            watchSuccessors(enclosingFlow, this.node.getOutNodes());
        } else if (!enclosingFlow.getOutNodes().isEmpty()) {
            // End node of an embedded flow: what follows is whatever follows
            // the embedded flow inside its own parent.
            watchSuccessors(enclosingFlow.getParentFlow(), enclosingFlow.getOutNodes());
        }
    }

    /**
     * Adds each successor to {@code pipelineJobs}: plain jobs by nested id,
     * flows by their resolved start nodes.
     *
     * @param owner        flow in which the successor ids are resolved
     * @param successorIds ids of the successor nodes
     */
    private void watchSuccessors(ExecutableFlowBase owner, Iterable<String> successorIds) {
        for (String successorId : successorIds) {
            ExecutableNode successor = owner.getExecutableNode(successorId);
            if (successor instanceof ExecutableFlowBase) {
                findAllStartingNodes((ExecutableFlowBase) successor, this.pipelineJobs);
            } else {
                this.pipelineJobs.add(successor.getNestedId());
            }
        }
    }

    /**
     * Collects the nested ids of all non-flow start nodes reachable from the
     * given flow, descending into start nodes that are themselves flows.
     *
     * @param executableFlowBase flow whose start nodes are examined
     * @param set                receives the nested ids found
     */
    private void findAllStartingNodes(ExecutableFlowBase executableFlowBase, Set<String> set) {
        for (String startId : executableFlowBase.getStartNodes()) {
            ExecutableNode startNode = executableFlowBase.getExecutableNode(startId);
            if (!(startNode instanceof ExecutableFlowBase)) {
                set.add(startNode.getNestedId());
                continue;
            }
            findAllStartingNodes((ExecutableFlowBase) startNode, set);
        }
    }

    /**
     * @return the live set of nested job ids this runner waits on (the
     *         internal set itself, not a copy)
     */
    public Set<String> getPipelineWatchedJobs() {
        return pipelineJobs;
    }

    /**
     * Sets how long {@code run()} waits before starting the job.
     *
     * @param j delay in milliseconds; values {@code <= 0} disable the delay
     */
    public void setDelayStart(long j) {
        delayStartMs = j;
    }

    /** @return the configured start delay in milliseconds */
    public long getDelayStart() {
        return delayStartMs;
    }

    /** @return the executable node this runner was created for */
    public ExecutableNode getNode() {
        return node;
    }

    /**
     * @return the path of the job log file, or null if no log file has been
     *         assigned yet
     */
    public String getLogFilePath() {
        return this.logFile != null ? this.logFile.getPath() : null;
    }

    /**
     * Creates the per-job logger and attaches a rolling file appender that
     * writes into the working directory.
     *
     * <p>If the log file cannot be opened the failure is reported on the flow
     * logger; the job logger then exists but has no appender of its own.
     */
    private void createLogger() {
        synchronized (logCreatorLock) {
            // The timestamp prefix makes the logger name unique per creation.
            String loggerName = System.currentTimeMillis() + "." + this.executionId + "." + this.jobId;
            this.logger = Logger.getLogger(loggerName);

            this.logFile = new File(this.workingDir, createLogFileName(this.node));
            String logPath = this.logFile.getAbsolutePath();

            this.jobAppender = null;
            try {
                RollingFileAppender fileAppender = new RollingFileAppender(this.loggerLayout, logPath, true);
                fileAppender.setMaxBackupIndex(this.jobLogBackupIndex);
                fileAppender.setMaxFileSize(this.jobLogChunkSize);

                this.jobAppender = fileAppender;
                this.logger.addAppender(fileAppender);
                // Keep job output out of ancestor loggers' appenders.
                this.logger.setAdditivity(false);
            } catch (IOException e) {
                this.flowLogger.error("Could not open log file in " + this.workingDir + " for job " + this.jobId, e);
            }
        }
    }

    /**
     * Computes the absolute path of this job's attachment file inside the
     * working directory. Only the path is recorded; no file is created here.
     */
    private void createAttachmentFile() {
        File attachment = new File(this.workingDir, createAttachmentFileName(this.node));
        this.attachmentFileName = attachment.getAbsolutePath();
    }

    /** Detaches and closes the job's file appender, if one was created. */
    private void closeLogger() {
        if (this.jobAppender == null) {
            return;
        }
        this.logger.removeAppender(this.jobAppender);
        this.jobAppender.close();
    }

    /**
     * Stamps the node with the current time and persists it through the
     * loader. Persistence failures are logged on the flow logger and
     * otherwise ignored.
     */
    private void writeStatus() {
        try {
            long now = System.currentTimeMillis();
            this.node.setUpdateTime(now);
            this.loader.updateExecutableNode(this.node);
        } catch (ExecutorManagerException e) {
            this.flowLogger.error("Could not update job properties in db for " + this.jobId, e);
        }
    }

    /**
     * Short-circuits nodes that must not actually run: already finished,
     * disabled (marked SKIPPED) or killed (marked KILLED). For those, start
     * and end time are set to the same instant and JOB_STARTED / JOB_FINISHED
     * events are fired.
     *
     * @return true if the node was handled here and {@code run()} should stop
     */
    private boolean handleNonReadyStatus() {
        Status current = this.node.getStatus();
        long now = System.currentTimeMillis();

        if (!Status.isStatusFinished(current)) {
            if (current == Status.DISABLED) {
                changeStatus(Status.SKIPPED, now);
            } else if (isKilled()) {
                changeStatus(Status.KILLED, now);
            } else {
                // Node is ready to run normally.
                return false;
            }
        }

        this.node.setStartTime(now);
        fireEvent(Event.create(this, Event.Type.JOB_STARTED, null, false));
        this.node.setEndTime(now);
        fireEvent(Event.create(this, Event.Type.JOB_FINISHED));
        return true;
    }

    /**
     * Blocks until every watched pipeline job that the watcher reports as
     * unfinished has finished, or until this runner is killed.
     *
     * @return true if the runner was killed before or while waiting (the job
     *         must not start); false once all watched jobs are done
     */
    private boolean blockOnPipeLine() {
        if (isKilled()) {
            return true;
        }
        if (!this.pipelineJobs.isEmpty()) {
            StringBuilder waitingOn = new StringBuilder();
            ArrayList<BlockingStatus> blockers = new ArrayList<BlockingStatus>();
            for (String watchedId : this.pipelineJobs) {
                Status watchedStatus = this.watcher.peekStatus(watchedId);
                // A null status is treated as "nothing to wait for".
                if (watchedStatus != null && !Status.isStatusFinished(watchedStatus)) {
                    blockers.add(this.watcher.getBlockingStatus(watchedId));
                    waitingOn.append(watchedId).append(',');
                }
            }
            if (!blockers.isEmpty()) {
                this.logger.info("Pipeline job " + this.jobId + " waiting on " + waitingOn + " in execution " + this.watcher.getExecId());
                for (BlockingStatus blocker : blockers) {
                    this.logger.info("Waiting on pipelined job " + blocker.getJobId());
                    // Publish the blocker first so kill() can unblock it.
                    this.currentBlockStatus = blocker;
                    // kill() may have run after the previous isKilled() check
                    // but before the assignment above; it would then have had
                    // no blocker (or the previous one) to unblock. Re-check
                    // here so that kill is not missed.
                    if (!isKilled()) {
                        blocker.blockOnFinishedStatus();
                    }
                    if (isKilled()) {
                        this.logger.info("Job was killed while waiting on pipeline. Quiting.");
                        return true;
                    }
                    this.logger.info("Pipelined job " + blocker.getJobId() + " finished.");
                }
            }
        }
        this.currentBlockStatus = null;
        return false;
    }

    /**
     * Waits for the configured start delay, if any. The wait ends early when
     * {@code kill()} notifies this object or the thread is interrupted.
     *
     * @return true if the runner has been killed (the job must not start);
     *         false otherwise
     */
    private boolean delayExecution() {
        if (isKilled()) {
            return true;
        }
        long startedAt = System.currentTimeMillis();
        if (this.delayStartMs <= 0) {
            return false;
        }
        this.logger.info("Delaying start of execution for " + this.delayStartMs + " milliseconds.");
        long deadline = startedAt + this.delayStartMs;
        synchronized (this) {
            try {
                // Object.wait may return spuriously, so wait in a loop until
                // the deadline passes or a kill is observed. Checking
                // isKilled() under this monitor before the first wait also
                // covers a kill() whose notify() fired before we got here.
                long remaining = this.delayStartMs;
                while (remaining > 0 && !isKilled()) {
                    wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
                if (!isKilled()) {
                    this.logger.info("Execution has been delayed for " + this.delayStartMs + " ms. Continuing with execution.");
                }
            } catch (InterruptedException e) {
                // An interrupt deliberately just ends the delay; the
                // interrupt flag is not restored so the following pipeline
                // wait and job run behave as they did before.
                this.logger.error("Job " + this.jobId + " was to be delayed for " + this.delayStartMs + ". Interrupted after " + (System.currentTimeMillis() - startedAt));
            }
        }
        if (!isKilled()) {
            return false;
        }
        this.logger.info("Job was killed while in delay. Quiting.");
        return true;
    }

    /**
     * Closes the job logger and uploads the job log together with any
     * rolled-over siblings (files in the same directory whose name starts
     * with the log file's name), sorted in reverse natural order.
     *
     * <p>Failures are reported on the flow logger; nothing is thrown.
     */
    private void finalizeLogFile() {
        closeLogger();
        if (this.logFile == null) {
            this.flowLogger.info("Log file for job " + this.jobId + " is null");
            return;
        }
        final String logFilePrefix = this.logFile.getName();
        File logDir = this.logFile.getParentFile();
        // File.listFiles returns null when the directory is missing or an I/O
        // error occurs; sorting that would throw a NullPointerException.
        File[] logFiles = logDir == null ? null : logDir.listFiles(new FilenameFilter() {
            @Override // java.io.FilenameFilter
            public boolean accept(File file, String str) {
                return str.startsWith(logFilePrefix);
            }
        });
        if (logFiles == null) {
            this.flowLogger.error("Could not list log files for job " + this.node.getNestedId() + " in " + logDir);
            return;
        }
        try {
            Arrays.sort(logFiles, Collections.reverseOrder());
            this.loader.uploadLogFile(this.executionId, this.node.getNestedId(), this.node.getAttempt(), logFiles);
        } catch (ExecutorManagerException e) {
            this.flowLogger.error("Error writing out logs for job " + this.node.getNestedId(), e);
        }
    }

    /**
     * Uploads the job's attachment file if the job wrote one. A missing path
     * or missing file is only noted on the flow logger; upload failures are
     * logged there as errors.
     */
    private void finalizeAttachmentFile() {
        if (this.attachmentFileName == null) {
            this.flowLogger.info("Attachment file for job " + this.jobId + " is null");
            return;
        }
        try {
            File attachment = new File(this.attachmentFileName);
            if (!attachment.exists()) {
                this.flowLogger.info("No attachment file for job " + this.jobId + " written.");
                return;
            }
            this.loader.uploadAttachmentFile(this.node, attachment);
        } catch (ExecutorManagerException e) {
            this.flowLogger.error("Error writing out attachment for job " + this.node.getNestedId(), e);
        }
    }

    /**
     * Runs the job end to end:
     * <ol>
     *   <li>returns immediately if the node is finished, disabled or killed;</li>
     *   <li>sets up the attachment path and the job logger;</li>
     *   <li>honours the start delay and the pipeline wait;</li>
     *   <li>unless killed in the meantime, fires JOB_STARTED, uploads the
     *       node, prepares and runs the job;</li>
     *   <li>records the end time, fires JOB_FINISHED, uploads log and
     *       attachment files and persists the final status.</li>
     * </ol>
     */
    @Override // java.lang.Runnable
    public void run() {
        Thread.currentThread().setName("JobRunner-" + this.jobId + "-" + this.executionId);
        if (handleNonReadyStatus()) {
            return;
        }
        createAttachmentFile();
        createLogger();

        // Both waits are always invoked (no short-circuit); each returns true
        // when the runner was killed and the job must not be started.
        boolean quickFinish = delayExecution();
        quickFinish |= blockOnPipeLine();

        this.node.setStartTime(System.currentTimeMillis());
        if (!quickFinish && !isKilled()) {
            fireEvent(Event.create(this, Event.Type.JOB_STARTED, null, false));
            try {
                this.loader.uploadExecutableNode(this.node, this.props);
            } catch (ExecutorManagerException e) {
                // Best effort: the job still runs, but keep the cause in the log.
                this.logger.error("Error writing initial node properties", e);
            }
            if (prepareJob()) {
                writeStatus();
                fireEvent(Event.create(this, Event.Type.JOB_STATUS_CHANGED), false);
                runJob();
            } else {
                changeStatus(Status.FAILED);
                logError("Job run failed preparing the job.");
            }
        }
        this.node.setEndTime(System.currentTimeMillis());
        // A kill overrides whatever status the steps above produced.
        if (isKilled()) {
            changeStatus(Status.KILLED);
        }
        logInfo("Finishing job " + this.jobId + " at " + this.node.getEndTime() + " with status " + this.node.getStatus());
        fireEvent(Event.create(this, Event.Type.JOB_FINISHED), false);
        finalizeLogFile();
        finalizeAttachmentFile();
        writeStatus();
    }

    /**
     * Populates the job properties, marks the node RUNNING, validates the
     * proxy user and builds the job through the job type manager.
     *
     * <p>Runs under {@code syncObject}, the same lock {@code kill()} takes.
     *
     * @return true if {@code job} was built and may be run; false if the
     *         properties are missing, the node is FAILED or killed, the proxy
     *         user is not permitted, or the job could not be built
     */
    private boolean prepareJob() throws RuntimeException {
        if (this.props == null) {
            logError("Failing job. The job properties don't exist");
            return false;
        }
        if (isKilled()) {
            // Report the real reason rather than blaming missing properties.
            logError("Failing job. The job was killed before it could be prepared");
            return false;
        }
        synchronized (this.syncObject) {
            if (this.node.getStatus() == Status.FAILED || isKilled()) {
                return false;
            }
            if (this.node.getAttempt() > 0) {
                logInfo("Starting job " + this.jobId + " attempt " + this.node.getAttempt() + " at " + this.node.getStartTime());
            } else {
                logInfo("Starting job " + this.jobId + " at " + this.node.getStartTime());
            }
            // Only nodes inside an embedded flow get a nested flow path.
            if (this.node.getExecutableFlow() != this.node.getParentFlow()) {
                this.props.put(CommonJobProperties.NESTED_FLOW_PATH, this.node.getPrintableId(":"));
            }
            this.props.put(CommonJobProperties.JOB_ATTEMPT, Integer.valueOf(this.node.getAttempt()));
            this.props.put(CommonJobProperties.JOB_METADATA_FILE, createMetaDataFileName(this.node));
            this.props.put(CommonJobProperties.JOB_ATTACHMENT_FILE, this.attachmentFileName);
            changeStatus(Status.RUNNING);
            // Respect a working directory supplied in the job's own props.
            if (!this.props.containsKey(AbstractProcessJob.WORKING_DIR)) {
                this.props.put(AbstractProcessJob.WORKING_DIR, this.workingDir.getAbsolutePath());
            }
            if (this.props.containsKey("user.to.proxy")) {
                String proxyUser = this.props.getString("user.to.proxy");
                if (this.proxyUsers != null && !this.proxyUsers.contains(proxyUser)) {
                    this.logger.error("User " + proxyUser + " has no permission to execute this job " + this.jobId + "!");
                    return false;
                }
            }
            try {
                this.job = this.jobtypeManager.buildJobExecutor(this.jobId, this.props, this.logger);
                return true;
            } catch (JobTypeManagerException e) {
                // Keep the cause: without it the job log gives no hint why
                // the job type could not be built.
                this.logger.error("Failed to build job type", e);
                return false;
            }
        }
    }

    /**
     * Runs the prepared job and derives the node's final status.
     *
     * <p>An exception from the job marks the node FAILED, or FAILED_SUCCEEDED
     * when the "job.succeed.on.failure" property is true. If no finished
     * status has been set by the time the job returns, the node is marked
     * SUCCEEDED. Properties generated by the job are copied onto the node
     * either way.
     */
    private void runJob() {
        try {
            this.job.run();
        } catch (Exception e) {
            boolean succeedOnFailure = this.props.getBoolean("job.succeed.on.failure", false);
            String summary;
            if (succeedOnFailure) {
                changeStatus(Status.FAILED_SUCCEEDED);
                summary = "Job run failed, but will treat it like success.";
            } else {
                changeStatus(Status.FAILED);
                summary = "Job run failed!";
            }
            // Send the stack trace to the job log instead of the process's
            // stderr, where it cannot be tied back to this job.
            if (this.logger != null) {
                this.logger.error(summary, e);
            }
            logError(e.getMessage() + e.getCause());
        }
        if (this.job != null) {
            this.node.setOutputProps(this.job.getJobGeneratedProperties());
        }
        // Keep a status set above (failure) or by kill(); otherwise success.
        if (Status.isStatusFinished(this.node.getStatus())) {
            return;
        }
        changeStatus(Status.SUCCEEDED);
    }

    /**
     * Sets the node's status, using the current time as its update time.
     *
     * @param status the new status
     */
    private void changeStatus(Status status) {
        long now = System.currentTimeMillis();
        changeStatus(status, now);
    }

    /**
     * Sets the node's status and update time.
     *
     * @param status the new status
     * @param j      update time in epoch milliseconds
     */
    private void changeStatus(Status status, long j) {
        ExecutableNode target = this.node;
        target.setStatus(status);
        target.setUpdateTime(j);
    }

    /**
     * Fires the event to all listeners after refreshing the node's update
     * time.
     *
     * @param event the event to dispatch
     */
    private void fireEvent(Event event) {
        final boolean refreshUpdateTime = true;
        fireEvent(event, refreshUpdateTime);
    }

    /**
     * Fires the event to all listeners.
     *
     * @param event the event to dispatch
     * @param z     when true, the node's update time is set to the current
     *              time before listeners are notified
     */
    private void fireEvent(Event event, boolean z) {
        if (z) {
            long now = System.currentTimeMillis();
            this.node.setUpdateTime(now);
        }
        fireEventListeners(event);
    }

    /**
     * Requests that this job stop. Does nothing if the node already has a
     * finished status.
     *
     * <p>Sets the killed flag, unblocks any pipeline wait in progress, and
     * then either cancels the built job and marks the node KILLED, or, if no
     * job has been built yet, notifies a thread waiting on this runner.
     */
    public void kill() {
        synchronized (this.syncObject) {
            if (Status.isStatusFinished(this.node.getStatus())) {
                return;
            }
            logError("Kill has been called.");
            this.killed = true;

            // Release a runner that is parked waiting on a pipelined job.
            BlockingStatus blocker = this.currentBlockStatus;
            if (blocker != null) {
                blocker.unblock();
            }

            if (this.job != null) {
                try {
                    this.job.cancel();
                } catch (Exception e) {
                    logError(e.getMessage());
                    logError("Failed trying to cancel job. Maybe it hasn't started running yet or just finished.");
                }
                changeStatus(Status.KILLED);
                return;
            }

            // No job built yet: wake a thread waiting on this runner's
            // monitor (the start-delay wait) so it can observe the flag.
            logError("Job hasn't started yet.");
            synchronized (this) {
                notify();
            }
        }
    }

    /** @return true once {@code kill()} has set the killed flag */
    public boolean isKilled() {
        return killed;
    }

    /** @return the current status of the underlying node */
    public Status getStatus() {
        ExecutableNode target = this.node;
        return target.getStatus();
    }

    /**
     * Logs at error level on the job logger; a no-op while the job logger
     * has not been created.
     *
     * @param str the message
     */
    private void logError(String str) {
        if (this.logger == null) {
            return;
        }
        this.logger.error(str);
    }

    /**
     * Logs at info level on the job logger; a no-op while the job logger
     * has not been created.
     *
     * @param str the message
     */
    private void logInfo(String str) {
        if (this.logger == null) {
            return;
        }
        this.logger.info(str);
    }

    /** @return the job log file, or null if none has been assigned yet */
    public File getLogFile() {
        return logFile;
    }

    /**
     * Builds the log file name for a node and attempt.
     *
     * @param executableNode the node
     * @param i              attempt number; omitted from the name when {@code <= 0}
     * @return {@code _job.<execId>[.<attempt>].<id>.log}
     */
    public static String createLogFileName(ExecutableNode executableNode, int i) {
        return createJobFileName(executableNode, i, ".log");
    }

    /** Builds the log file name for the node's current attempt. */
    public static String createLogFileName(ExecutableNode executableNode) {
        return createLogFileName(executableNode, executableNode.getAttempt());
    }

    /**
     * Builds the metadata file name for a node and attempt.
     *
     * @param executableNode the node
     * @param i              attempt number; omitted from the name when {@code <= 0}
     * @return {@code _job.<execId>[.<attempt>].<id>.meta}
     */
    public static String createMetaDataFileName(ExecutableNode executableNode, int i) {
        return createJobFileName(executableNode, i, ".meta");
    }

    /** Builds the metadata file name for the node's current attempt. */
    public static String createMetaDataFileName(ExecutableNode executableNode) {
        return createMetaDataFileName(executableNode, executableNode.getAttempt());
    }

    /** Builds the attachment file name for the node's current attempt. */
    public static String createAttachmentFileName(ExecutableNode executableNode) {
        return createAttachmentFileName(executableNode, executableNode.getAttempt());
    }

    /**
     * Builds the attachment file name for a node and attempt.
     *
     * @param executableNode the node
     * @param i              attempt number; omitted from the name when {@code <= 0}
     * @return {@code _job.<execId>[.<attempt>].<id>.attach}
     */
    public static String createAttachmentFileName(ExecutableNode executableNode, int i) {
        return createJobFileName(executableNode, i, ".attach");
    }

    /**
     * Shared naming scheme for per-job files:
     * {@code _job.<execId>[.<attempt>].<id><extension>}. The id is the node's
     * plain id, or its printable id joined with "._." when the node's parent
     * flow is not the top-level executable flow.
     *
     * @param executableNode the node the file belongs to
     * @param attempt        attempt number; omitted from the name when {@code <= 0}
     * @param extension      file suffix including the leading dot
     */
    private static String createJobFileName(ExecutableNode executableNode, int attempt, String extension) {
        int executionId = executableNode.getExecutableFlow().getExecutionId();
        String id = executableNode.getId();
        if (executableNode.getExecutableFlow() != executableNode.getParentFlow()) {
            id = executableNode.getPrintableId("._.");
        }
        StringBuilder name = new StringBuilder("_job.").append(executionId).append('.');
        if (attempt > 0) {
            name.append(attempt).append('.');
        }
        return name.append(id).append(extension).toString();
    }
}
