/*
 * Decompiled with CFR 0.152.
 */
package com.github.kfcfans.powerjob.worker.core.tracker.task;

import akka.actor.ActorSelection;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.core.ha.ProcessorTrackerStatusHolder;
import com.github.kfcfans.powerjob.worker.core.tracker.task.CommonTaskTracker;
import com.github.kfcfans.powerjob.worker.core.tracker.task.FrequentTaskTracker;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq;
import com.google.common.base.Stopwatch;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

public abstract class TaskTracker {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(TaskTracker.class);
    /** Wall-clock time (epoch millis) captured when this tracker object is instantiated. */
    protected long createTime = System.currentTimeMillis();
    /** Id of the job instance this tracker manages; copied from the schedule request in the constructor. */
    protected long instanceId;
    /** Instance settings; populated in the constructor by copying properties from the schedule request. */
    protected InstanceInfo instanceInfo;
    /** Status holder for the ProcessorTrackers, built in the constructor from the request's worker address list. */
    protected ProcessorTrackerStatusHolder ptStatusHolder;
    /** Task storage; the constructor assigns {@code TaskPersistenceService.INSTANCE}. */
    protected TaskPersistenceService taskPersistenceService;
    /**
     * Scheduler that {@link #destroy()} shuts down. It is not assigned anywhere in this class;
     * presumably subclasses create it (e.g. in {@code initTaskTracker}) - TODO confirm.
     */
    protected ScheduledExecutorService scheduledPool;
    /** Set to {@code true} by {@link #destroy()}; the public entry points return early once it is set. */
    protected AtomicBoolean finished;
    /** taskId -> report time of the last accepted status report; a Guava cache capped at 1024 entries. */
    private Cache<String, Long> taskId2LastReportTime;
    /** Segment lock used by {@code updateTaskStatus}, keyed by {@code taskId.hashCode()}. */
    private SegmentLock segmentLock;
    /** Same value (4) as the segment count the constructor passes to {@code SegmentLock}. */
    private static final int UPDATE_CONCURRENCY = 4;

    protected TaskTracker(ServerScheduleJobReq req) {
        this.instanceId = req.getInstanceId();
        this.instanceInfo = new InstanceInfo();
        BeanUtils.copyProperties((Object)req, (Object)this.instanceInfo);
        if (this.instanceInfo.getInstanceTimeoutMS() <= 0L) {
            this.instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE);
        }
        this.instanceInfo.setTimeExpressionType(TimeExpressionType.valueOf((String)req.getTimeExpressionType()).getV());
        this.instanceInfo.setThreadConcurrency(Math.max(1, this.instanceInfo.getThreadConcurrency()));
        this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress());
        this.taskPersistenceService = TaskPersistenceService.INSTANCE;
        this.finished = new AtomicBoolean(false);
        this.taskId2LastReportTime = CacheBuilder.newBuilder().maximumSize(1024L).build();
        this.segmentLock = new SegmentLock(4);
        this.initTaskTracker(req);
        log.info("[TaskTracker-{}] create TaskTracker successfully.", (Object)this.instanceId);
    }

    /**
     * Factory for task trackers.
     *
     * <p>{@code FIX_RATE} and {@code FIX_DELAY} requests get a {@link FrequentTaskTracker}; every
     * other time expression type gets a {@link CommonTaskTracker}. If construction throws, a
     * {@code FAILED} instance status report carrying the exception text is sent to the server
     * actor and {@code null} is returned.
     *
     * @param req schedule request received from the server
     * @return the new tracker, or {@code null} when creation failed
     */
    public static TaskTracker create(ServerScheduleJobReq req) {
        try {
            TimeExpressionType type = TimeExpressionType.valueOf(req.getTimeExpressionType());
            boolean frequent = type == TimeExpressionType.FIX_RATE || type == TimeExpressionType.FIX_DELAY;
            if (frequent) {
                return new FrequentTaskTracker(req);
            }
            return new CommonTaskTracker(req);
        } catch (Exception e) {
            log.warn("[TaskTracker-{}] create TaskTracker from request({}) failed.", req.getInstanceId(), req, e);

            // Report the failure to the server so the instance is not left without a final status.
            TaskTrackerReportInstanceStatusReq failureReport = new TaskTrackerReportInstanceStatusReq();
            BeanUtils.copyProperties(req, failureReport);
            failureReport.setInstanceStatus(InstanceStatus.FAILED.getV());
            failureReport.setResult(String.format("init TaskTracker failed, reason: %s", e.toString()));
            failureReport.setReportTime(System.currentTimeMillis());
            failureReport.setStartTime(System.currentTimeMillis());
            failureReport.setSourceAddress(OhMyWorker.getWorkerAddress());

            String serverActorPath = AkkaUtils.getAkkaServerPath("server_actor");
            OhMyWorker.actorSystem.actorSelection(serverActorPath).tell(failureReport, null);
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Applies a task status report to the persisted task, serialised per task through the segment lock.
     *
     * <p>Processing steps:
     * <ol>
     *   <li>Ignore the report when the tracker is already finished.</li>
     *   <li>Drop the report when it is older than the last accepted report for the task (looked up
     *       in the in-memory cache, falling back to the persisted task).</li>
     *   <li>For {@code WORKER_PROCESS_FAILED} with retries configured and not yet exhausted, reset the
     *       task to {@code WAITING_DISPATCH} with an incremented failure count and return.</li>
     *   <li>Otherwise persist the new status and result.</li>
     * </ol>
     *
     * <p>The lock is acquired <em>before</em> entering the guarded section, so it is released only
     * when it is actually held. If the thread is interrupted while waiting for the lock, the
     * interrupt flag is restored and the report is dropped.
     *
     * @param taskId     id of the reported task; must not be {@code null}
     * @param newStatus  numeric value of the reported {@link TaskStatus}
     * @param reportTime timestamp of the report, used to discard out-of-order reports
     * @param result     optional result text; {@code null} is stored as an empty string
     */
    public void updateTaskStatus(String taskId, int newStatus, long reportTime, @Nullable String result) {
        if (this.finished.get()) {
            return;
        }
        TaskStatus nTaskStatus = TaskStatus.of(newStatus);
        int lockId = taskId.hashCode();

        try {
            this.segmentLock.lockInterruptible(lockId);
        } catch (InterruptedException e) {
            // The lock was never obtained, so it must not be unlocked; keep the interrupt visible to callers.
            Thread.currentThread().interrupt();
            log.warn("[TaskTracker-{}] interrupted while waiting for the lock of task(taskId={}), this status report is dropped.", this.instanceId, taskId);
            return;
        }

        try {
            // Resolve the time of the last accepted report: cache first, then the persisted task.
            Long lastReportTime = this.taskId2LastReportTime.getIfPresent(taskId);
            if (lastReportTime == null) {
                Optional<TaskDO> storedTask = this.taskPersistenceService.getTask(this.instanceId, taskId);
                if (storedTask.isPresent()) {
                    lastReportTime = storedTask.get().getLastReportTime();
                } else {
                    log.error("[TaskTracker-{}] can't find task by pkey(instanceId={}&taskId={}).", new Object[]{this.instanceId, this.instanceId, taskId});
                }
                if (lastReportTime == null) {
                    lastReportTime = -1L;
                }
            }

            // Out-of-order report: a newer one has already been applied.
            if (lastReportTime > reportTime) {
                log.warn("[TaskTracker-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.", new Object[]{this.instanceId, lastReportTime, reportTime, taskId, newStatus});
                return;
            }
            this.taskId2LastReportTime.put(taskId, reportTime);

            // Failed task with retries left: put it back into the dispatch queue instead of failing it.
            int configTaskRetryNum = this.instanceInfo.getTaskRetryNum();
            if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) {
                Optional<TaskDO> taskOpt = this.taskPersistenceService.getTask(this.instanceId, taskId);
                if (taskOpt.isPresent()) {
                    int failedCnt = taskOpt.get().getFailedCnt().intValue();
                    if (failedCnt < configTaskRetryNum) {
                        TaskDO updateEntity = new TaskDO();
                        updateEntity.setFailedCnt(failedCnt + 1);
                        String taskName = taskOpt.get().getTaskName();
                        ExecuteType executeType = ExecuteType.valueOf(this.instanceInfo.getExecuteType());
                        // Root/last tasks and broadcast tasks keep their address; others are reset to "N/A".
                        if (!taskName.equals("OMS_ROOT_TASK") && !taskName.equals("OMS_LAST_TASK") && executeType != ExecuteType.BROADCAST) {
                            updateEntity.setAddress("N/A");
                        }
                        updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                        updateEntity.setLastReportTime(reportTime);
                        boolean retryTask = this.taskPersistenceService.updateTask(this.instanceId, taskId, updateEntity);
                        if (retryTask) {
                            log.info("[TaskTracker-{}] task(taskId={}) process failed, TaskTracker will have a retry.", this.instanceId, taskId);
                            return;
                        }
                        // Scheduling the retry failed: fall through and persist the reported status.
                    }
                }
            }

            String safeResult = result == null ? "" : result;
            boolean updateResult = this.taskPersistenceService.updateTaskStatus(this.instanceId, taskId, newStatus, reportTime, safeResult);
            if (!updateResult) {
                log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", this.instanceId, taskId);
            }
        } catch (Exception e) {
            log.warn("[TaskTracker-{}] update task status failed.", this.instanceId, e);
        } finally {
            this.segmentLock.unlock(lockId);
        }
    }

    /**
     * Persists a batch of newly created tasks for this instance.
     *
     * <p>Each task is stamped with this tracker's instance id, status {@code WAITING_DISPATCH},
     * a zero failure count, the current time as created/modified time, and a last report time
     * of {@code -1} before the batch is saved.
     *
     * @param newTaskList tasks to store; may be {@code null} or empty
     * @return {@code true} when there is nothing to do (tracker finished or no tasks),
     *         otherwise the result of the batch save
     */
    public boolean submitTask(List<TaskDO> newTaskList) {
        if (this.finished.get() || CollectionUtils.isEmpty(newTaskList)) {
            return true;
        }
        for (TaskDO newTask : newTaskList) {
            newTask.setInstanceId(this.instanceId);
            newTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
            newTask.setFailedCnt(0);
            newTask.setLastModifiedTime(System.currentTimeMillis());
            newTask.setCreatedTime(System.currentTimeMillis());
            newTask.setLastReportTime(-1L);
        }
        log.debug("[TaskTracker-{}] receive new tasks: {}", this.instanceId, newTaskList);
        return this.taskPersistenceService.batchSave(newTaskList);
    }

    /**
     * Records a ProcessorTracker heartbeat.
     *
     * <p>When the heartbeat type is {@code 1}, the reporting ProcessorTracker is marked as not
     * dispatched, and every unfinished task still assigned to its address is reported as
     * {@code WORKER_PROCESS_FAILED} through {@link #updateTaskStatus}.
     *
     * @param heartbeatReq heartbeat sent by a ProcessorTracker
     */
    public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
        log.debug("[TaskTracker-{}] receive heartbeat: {}", this.instanceId, heartbeatReq);
        this.ptStatusHolder.updateStatus(heartbeatReq);
        if (heartbeatReq.getType() != 1) {
            return;
        }

        String idlePtAddress = heartbeatReq.getAddress();
        this.ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false);

        List<TaskDO> leftovers = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(this.instanceId, idlePtAddress);
        if (CollectionUtils.isEmpty(leftovers)) {
            return;
        }
        log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", this.instanceId, idlePtAddress, leftovers);
        for (TaskDO leftover : leftovers) {
            this.updateTaskStatus(leftover.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result");
        }
    }

    /**
     * Fans a broadcast job out after its pre-process step has completed.
     *
     * <p>On success, one {@code OMS_BROADCAST_TASK} is created per ProcessorTracker known to the
     * status holder, with task ids {@code preTaskId.0}, {@code preTaskId.1}, ... and the tasks are
     * submitted through {@link #submitTask(List)}. On failure nothing is submitted and the
     * pre-process result is only logged.
     *
     * @param preExecuteSuccess whether the pre-process step succeeded
     * @param subInstanceId     sub-instance id stamped on every generated task
     * @param preTaskId         id of the pre-process task, used as prefix for the generated task ids
     * @param result            pre-process result, logged when the pre-process failed
     */
    public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) {
        if (this.finished.get()) {
            return;
        }
        log.info("[TaskTracker-{}] finished broadcast's preProcess.", this.instanceId);
        if (!preExecuteSuccess) {
            log.debug("[TaskTracker-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", this.instanceId, result);
            return;
        }

        List<String> allWorkerAddress = this.ptStatusHolder.getAllProcessorTrackers();
        // Typed list (was a raw LinkedList) so the hand-off to submitTask is checked by the compiler.
        List<TaskDO> subTaskList = Lists.newLinkedList();
        for (int i = 0; i < allWorkerAddress.size(); ++i) {
            TaskDO subTask = new TaskDO();
            subTask.setSubInstanceId(subInstanceId);
            subTask.setTaskName("OMS_BROADCAST_TASK");
            subTask.setTaskId(preTaskId + "." + i);
            subTaskList.add(subTask);
        }
        this.submitTask(subTaskList);
    }

    /**
     * Tears this tracker down.
     *
     * <p>In order: marks the tracker finished, asks the scheduler to shut down, sends a stop
     * request for this instance to every known ProcessorTracker, deletes the instance's tasks from
     * storage, removes the tracker from {@link TaskTrackerPool}, and finally forces the scheduler
     * down with {@code shutdownNow()} if it has not terminated yet.
     */
    public void destroy() {
        this.finished.set(true);
        Stopwatch timer = Stopwatch.createStarted();
        this.scheduledPool.shutdown();

        Long id = this.instanceInfo.getInstanceId();

        // Tell every ProcessorTracker to stop working on this instance.
        TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
        stopRequest.setInstanceId(id);
        for (String ptIP : this.ptStatusHolder.getAllProcessorTrackers()) {
            String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, "processor_tracker");
            OhMyWorker.actorSystem.actorSelection(ptPath).tell(stopRequest, null);
        }

        // Drop the persisted tasks of this instance.
        if (this.taskPersistenceService.deleteAllTasks(id)) {
            log.debug("[TaskTracker-{}] delete all tasks from database successfully.", id);
        } else {
            log.error("[TaskTracker-{}] delete tasks from database failed.", id);
        }

        TaskTrackerPool.remove(id);
        log.info("[TaskTracker-{}] TaskTracker has left the world(using {}), bye~", id, timer.stop());

        // The graceful shutdown above may still be in progress; force it.
        if (!this.scheduledPool.isTerminated()) {
            CommonUtils.executeIgnoreException(() -> this.scheduledPool.shutdownNow());
        }
    }

    /**
     * Dispatches one task to the given ProcessorTracker.
     *
     * <p>The task is first persisted as {@code DISPATCH_SUCCESS_WORKER_UNCHECK} with the target
     * address; if that update fails the dispatch is abandoned. Otherwise the ProcessorTracker is
     * flagged as dispatched, the cached last report time of the task is reset to {@code -1}, and a
     * start-task request is sent to the ProcessorTracker actor.
     *
     * @param task                    task to dispatch
     * @param processorTrackerAddress address of the target ProcessorTracker
     */
    protected void dispatchTask(TaskDO task, String processorTrackerAddress) {
        String taskId = task.getTaskId();

        // Persist the dispatch before any message is sent.
        TaskDO dispatchUpdate = new TaskDO();
        dispatchUpdate.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
        dispatchUpdate.setAddress(processorTrackerAddress);
        if (!this.taskPersistenceService.updateTask(this.instanceId, taskId, dispatchUpdate)) {
            log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", this.instanceId, task.getTaskId(), task.getTaskName());
            return;
        }

        this.ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
        this.taskId2LastReportTime.put(task.getTaskId(), -1L);

        TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(this.instanceInfo, task);
        String ptActorPath = AkkaUtils.getAkkaWorkerPath(processorTrackerAddress, "processor_tracker");
        OhMyWorker.actorSystem.actorSelection(ptActorPath).tell(startTaskReq, null);
        log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", this.instanceId, task.getTaskId(), task.getTaskName());
    }

    /**
     * Collects per-status task counts for one sub-instance.
     *
     * <p>The counts come from the persistence service's status statistics for this tracker's
     * instance id and the given sub-instance id; a status missing from that map counts as 0.
     *
     * @param subInstanceId sub-instance whose tasks are counted
     * @return a freshly populated statistics holder
     */
    protected InstanceStatisticsHolder getInstanceStatisticsHolder(long subInstanceId) {
        Map<TaskStatus, Long> countByStatus = this.taskPersistenceService.getTaskStatusStatistics(this.instanceId, subInstanceId);
        InstanceStatisticsHolder stats = new InstanceStatisticsHolder();
        stats.setWaitingDispatchNum(countByStatus.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L));
        stats.setWorkerUnreceivedNum(countByStatus.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L));
        stats.setReceivedNum(countByStatus.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L));
        stats.setRunningNum(countByStatus.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L));
        stats.setFailedNum(countByStatus.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L));
        stats.setSucceedNum(countByStatus.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L));
        return stats;
    }

    /**
     * Subclass-specific initialisation hook.
     *
     * <p>Called from the {@code TaskTracker} constructor after the shared fields have been
     * assigned, so implementations run before the subclass's own field initialisers and
     * constructor body have executed.
     *
     * @param var1 the schedule request the tracker is being created from
     */
    protected abstract void initTaskTracker(ServerScheduleJobReq var1);

    /**
     * Returns the current running status of the instance as an {@link InstanceDetail}.
     * The content of the detail is defined by the implementing subclass.
     *
     * @return the instance detail built by the subclass
     */
    public abstract InstanceDetail fetchRunningStatus();

    /**
     * Mutable value object holding one task counter per tracked task status.
     *
     * <p>Two holders are equal when all six counters match; {@link #hashCode()} and
     * {@link #toString()} are derived from the same six counters.
     */
    protected static class InstanceStatisticsHolder {
        /** Tasks waiting to be dispatched. */
        protected long waitingDispatchNum;
        /** Tasks dispatched but not yet acknowledged by a worker. */
        protected long workerUnreceivedNum;
        /** Tasks acknowledged by a worker. */
        protected long receivedNum;
        /** Tasks currently being processed. */
        protected long runningNum;
        /** Tasks whose processing failed. */
        protected long failedNum;
        /** Tasks whose processing succeeded. */
        protected long succeedNum;

        /**
         * @return the sum of all six counters
         */
        public long getTotalTaskNum() {
            long notStarted = this.waitingDispatchNum + this.workerUnreceivedNum;
            long inProgress = this.receivedNum + this.runningNum;
            long completed = this.failedNum + this.succeedNum;
            return notStarted + inProgress + completed;
        }

        // ---- accessors ----

        public long getWaitingDispatchNum() {
            return this.waitingDispatchNum;
        }

        public void setWaitingDispatchNum(long waitingDispatchNum) {
            this.waitingDispatchNum = waitingDispatchNum;
        }

        public long getWorkerUnreceivedNum() {
            return this.workerUnreceivedNum;
        }

        public void setWorkerUnreceivedNum(long workerUnreceivedNum) {
            this.workerUnreceivedNum = workerUnreceivedNum;
        }

        public long getReceivedNum() {
            return this.receivedNum;
        }

        public void setReceivedNum(long receivedNum) {
            this.receivedNum = receivedNum;
        }

        public long getRunningNum() {
            return this.runningNum;
        }

        public void setRunningNum(long runningNum) {
            this.runningNum = runningNum;
        }

        public long getFailedNum() {
            return this.failedNum;
        }

        public void setFailedNum(long failedNum) {
            this.failedNum = failedNum;
        }

        public long getSucceedNum() {
            return this.succeedNum;
        }

        public void setSucceedNum(long succeedNum) {
            this.succeedNum = succeedNum;
        }

        // ---- value semantics ----

        /**
         * Compares all six counters; the other object must also accept this one via
         * {@link #canEqual(Object)}.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof InstanceStatisticsHolder)) {
                return false;
            }
            InstanceStatisticsHolder that = (InstanceStatisticsHolder) o;
            return that.canEqual(this)
                    && this.getWaitingDispatchNum() == that.getWaitingDispatchNum()
                    && this.getWorkerUnreceivedNum() == that.getWorkerUnreceivedNum()
                    && this.getReceivedNum() == that.getReceivedNum()
                    && this.getRunningNum() == that.getRunningNum()
                    && this.getFailedNum() == that.getFailedNum()
                    && this.getSucceedNum() == that.getSucceedNum();
        }

        /**
         * Hook letting subclasses restrict which objects they may be equal to.
         */
        protected boolean canEqual(Object other) {
            return other instanceof InstanceStatisticsHolder;
        }

        /**
         * Folds the six counters, in declaration order, into a hash with multiplier 59.
         */
        @Override
        public int hashCode() {
            long[] counters = {
                this.getWaitingDispatchNum(),
                this.getWorkerUnreceivedNum(),
                this.getReceivedNum(),
                this.getRunningNum(),
                this.getFailedNum(),
                this.getSucceedNum()
            };
            int hash = 1;
            for (long counter : counters) {
                // Fold the high and low 32 bits of each counter together.
                hash = hash * 59 + (int) (counter >>> 32 ^ counter);
            }
            return hash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TaskTracker.InstanceStatisticsHolder(");
            text.append("waitingDispatchNum=").append(this.getWaitingDispatchNum());
            text.append(", workerUnreceivedNum=").append(this.getWorkerUnreceivedNum());
            text.append(", receivedNum=").append(this.getReceivedNum());
            text.append(", runningNum=").append(this.getRunningNum());
            text.append(", failedNum=").append(this.getFailedNum());
            text.append(", succeedNum=").append(this.getSucceedNum());
            return text.append(")").toString();
        }
    }

    /**
     * Periodic job that moves {@code WAITING_DISPATCH} tasks to the available ProcessorTrackers.
     *
     * <p>Each run dispatches at most {@code availableProcessorTrackers * threadConcurrency * 2}
     * tasks, fetched from storage in batches of up to {@link #DB_QUERY_LIMIT}. A task that already
     * carries a usable address is sent back to that address; otherwise the targets are picked
     * round-robin from the available ProcessorTrackers.
     */
    protected class Dispatcher implements Runnable {
        /** Upper bound on the number of tasks fetched from storage per query. */
        private static final int DB_QUERY_LIMIT = 100;

        protected Dispatcher() {
        }

        @Override
        public void run() {
            if (TaskTracker.this.finished.get()) {
                return;
            }
            Stopwatch stopwatch = Stopwatch.createStarted();
            Long instanceId = TaskTracker.this.instanceInfo.getInstanceId();
            List<String> availablePtIps = TaskTracker.this.ptStatusHolder.getAvailableProcessorTrackers();
            if (availablePtIps.isEmpty()) {
                log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId);
                return;
            }

            // Widen before multiplying so the per-round budget cannot overflow int arithmetic.
            long maxDispatchNum = (long) availablePtIps.size() * TaskTracker.this.instanceInfo.getThreadConcurrency() * 2;
            long currentDispatchNum = 0L;
            int nextPtIndex = 0;

            while (currentDispatchNum < maxDispatchNum) {
                // Never fetch more than what is left of this round's budget.
                int dbQueryLimit = (int) Math.min((long) DB_QUERY_LIMIT, maxDispatchNum - currentDispatchNum);
                List<TaskDO> needDispatchTasks = TaskTracker.this.taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit);
                if (CollectionUtils.isEmpty(needDispatchTasks)) {
                    break;
                }

                for (TaskDO task : needDispatchTasks) {
                    String ptAddress = task.getAddress();
                    if (StringUtils.isEmpty(ptAddress) || "N/A".equals(ptAddress)) {
                        // No usable address on the task: take the next ProcessorTracker in round-robin order.
                        ptAddress = availablePtIps.get(nextPtIndex);
                        nextPtIndex = (nextPtIndex + 1) % availablePtIps.size();
                    }
                    TaskTracker.this.dispatchTask(task, ptAddress);
                }

                // Count the batch before deciding to stop, so the log below includes the last batch.
                currentDispatchNum += needDispatchTasks.size();
                if (needDispatchTasks.size() < dbQueryLimit) {
                    // Short batch: storage has no more waiting tasks for now.
                    break;
                }
            }
            log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", new Object[]{instanceId, currentDispatchNum, stopwatch.stop()});
        }
    }
}

