/*
 * Decompiled with CFR 0.152.
 */
package com.github.kfcfans.powerjob.worker.core.executor;

import akka.actor.ActorSelection;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.TaskResult;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.google.common.base.Stopwatch;
import java.util.List;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;

public class ProcessorRunnable
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ProcessorRunnable.class);
    private final InstanceInfo instanceInfo;
    private final ActorSelection taskTrackerActor;
    private final TaskDO task;
    private final BasicProcessor processor;
    private final OmsLogger omsLogger;
    private final ClassLoader classLoader;
    private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;

    public void innerRun() throws InterruptedException {
        ProcessResult processResult;
        String taskId = this.task.getTaskId();
        Long instanceId = this.task.getInstanceId();
        log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", new Object[]{instanceId, taskId, this.task.getTaskName()});
        TaskContext taskContext = new TaskContext();
        BeanUtils.copyProperties((Object)this.task, (Object)taskContext);
        taskContext.setJobId(this.instanceInfo.getJobId());
        taskContext.setMaxRetryTimes(this.instanceInfo.getTaskRetryNum());
        taskContext.setCurrentRetryTimes(this.task.getFailedCnt());
        taskContext.setJobParams(this.instanceInfo.getJobParams());
        taskContext.setInstanceParams(this.instanceInfo.getInstanceParams());
        taskContext.setOmsLogger(this.omsLogger);
        if (this.task.getTaskContent() != null && this.task.getTaskContent().length > 0) {
            taskContext.setSubTask(SerializerUtils.deSerialized(this.task.getTaskContent()));
        }
        taskContext.setUserContext(OhMyWorker.getConfig().getUserContext());
        ThreadLocalStore.setTask(this.task);
        this.reportStatus(TaskStatus.WORKER_PROCESSING, null, null);
        ExecuteType executeType = ExecuteType.valueOf((String)this.instanceInfo.getExecuteType());
        if ("OMS_ROOT_TASK".equals(this.task.getTaskName()) && executeType == ExecuteType.BROADCAST) {
            ProcessResult processResult2;
            if (this.processor instanceof BroadcastProcessor) {
                BroadcastProcessor broadcastProcessor = (BroadcastProcessor)this.processor;
                try {
                    processResult2 = broadcastProcessor.preProcess(taskContext);
                }
                catch (Throwable e) {
                    log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", (Object)instanceId, (Object)e);
                    processResult2 = new ProcessResult(false, e.toString());
                }
            } else {
                processResult2 = new ProcessResult(true, "NO_PREPOST_TASK");
            }
            this.reportStatus(processResult2.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, this.suit(processResult2.getMsg()), ProcessorReportTaskStatusReq.BROADCAST);
            return;
        }
        if ("OMS_LAST_TASK".equals(this.task.getTaskName())) {
            ProcessResult processResult3;
            Stopwatch stopwatch = Stopwatch.createStarted();
            log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", (Object)instanceId, (Object)taskId);
            List<TaskResult> taskResults = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, this.task.getSubInstanceId());
            try {
                switch (executeType) {
                    case BROADCAST: {
                        if (this.processor instanceof BroadcastProcessor) {
                            BroadcastProcessor broadcastProcessor = (BroadcastProcessor)this.processor;
                            processResult3 = broadcastProcessor.postProcess(taskContext, taskResults);
                            break;
                        }
                        processResult3 = BroadcastProcessor.defaultResult(taskResults);
                        break;
                    }
                    case MAP_REDUCE: {
                        if (this.processor instanceof MapReduceProcessor) {
                            MapReduceProcessor mapReduceProcessor = (MapReduceProcessor)this.processor;
                            processResult3 = mapReduceProcessor.reduce(taskContext, taskResults);
                            break;
                        }
                        processResult3 = new ProcessResult(false, "not implement the MapReduceProcessor");
                        break;
                    }
                    default: {
                        processResult3 = new ProcessResult(false, "IMPOSSIBLE OR BUG");
                        break;
                    }
                }
            }
            catch (Throwable e) {
                processResult3 = new ProcessResult(false, e.toString());
                log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", new Object[]{instanceId, taskId, e});
            }
            TaskStatus status = processResult3.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
            this.reportStatus(status, this.suit(processResult3.getMsg()), null);
            log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", (Object)instanceId, (Object)stopwatch);
            return;
        }
        try {
            processResult = this.processor.process(taskContext);
        }
        catch (Throwable e) {
            log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", new Object[]{instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e});
            processResult = new ProcessResult(false, e.toString());
        }
        this.reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, this.suit(processResult.getMsg()), null);
    }

    private void reportStatus(TaskStatus status, String result, Integer cmd) {
        ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq();
        req.setInstanceId(this.task.getInstanceId());
        req.setSubInstanceId(this.task.getSubInstanceId());
        req.setTaskId(this.task.getTaskId());
        req.setStatus(status.getValue());
        req.setResult(result);
        req.setReportTime(System.currentTimeMillis());
        req.setCmd(cmd);
        if (TaskStatus.finishedStatus.contains(status.getValue())) {
            boolean success = AkkaUtils.reliableTransmit(this.taskTrackerActor, req);
            if (!success) {
                this.statusReportRetryQueue.add(req);
                log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed.", new Object[]{this.task.getInstanceId(), this.task.getTaskId(), status, result});
            }
        } else {
            this.taskTrackerActor.tell((Object)req, null);
        }
    }

    @Override
    public void run() {
        Thread.currentThread().setContextClassLoader(this.classLoader);
        try {
            this.innerRun();
        }
        catch (InterruptedException interruptedException) {
        }
        catch (Throwable e) {
            this.reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null);
            log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", (Object)this.task.getInstanceId(), (Object)e);
        }
        finally {
            ThreadLocalStore.clear();
        }
    }

    private String suit(String result) {
        if (StringUtils.isEmpty((Object)result)) {
            return "";
        }
        int maxLength = OhMyWorker.getConfig().getMaxResultLength();
        if (result.length() <= maxLength) {
            return result;
        }
        log.warn("[ProcessorRunnable-{}] task(taskId={})'s result is too large({}>{}), a part will be discarded.", new Object[]{this.task.getInstanceId(), this.task.getTaskId(), result.length(), maxLength});
        return result.substring(0, maxLength).concat("...");
    }

    public ProcessorRunnable(InstanceInfo instanceInfo, ActorSelection taskTrackerActor, TaskDO task, BasicProcessor processor, OmsLogger omsLogger, ClassLoader classLoader, Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue) {
        this.instanceInfo = instanceInfo;
        this.taskTrackerActor = taskTrackerActor;
        this.task = task;
        this.processor = processor;
        this.omsLogger = omsLogger;
        this.classLoader = classLoader;
        this.statusReportRetryQueue = statusReportRetryQueue;
    }
}

