package org.jetlinks.rule.engine.defaults;

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.codec.digest.DigestUtils;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.task.ExecutableTaskExecutor;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/jetlinks/rule/engine/defaults/DefaultTask.class */
/**
 * {@link Task} implementation that pairs a {@link TaskExecutor} with the
 * {@link AbstractExecutionContext} it operates on.
 *
 * <p>Lifecycle operations are exposed as lazy {@link Mono}s that delegate to the
 * executor (and, where relevant, to the context). Whenever the executor reports a
 * state transition, the task records the time and publishes the transition on the
 * context's event bus.
 */
public class DefaultTask implements Task {
    private static final Logger log = LoggerFactory.getLogger(DefaultTask.class);

    private final String workerId;
    private final String schedulerId;
    private final AbstractExecutionContext context;
    private final TaskExecutor executor;

    // volatile: written from the executor's state callback and read by arbitrary callers;
    // a plain long has no cross-thread visibility guarantee and may tear (JLS 17.7).
    private volatile long lastStateTime;

    // volatile: written on Schedulers.boundedElastic() by start() and read by arbitrary callers.
    private volatile long startTime;

    private final String id;

    /**
     * Creates the task and registers a state-change listener on the executor.
     *
     * <p>The task id is the MD5 hex digest of
     * {@code workerId + ":" + instanceId + ":" + nodeId}, where the instance id and the
     * node id are read from {@code context} at construction time.
     *
     * @param schedulerId value returned by {@link #getSchedulerId()}
     * @param workerId    value returned by {@link #getWorkerId()}; also part of the task id
     * @param context     execution context providing the job, instance id and event bus
     * @param executor    executor that all lifecycle operations are delegated to
     */
    public DefaultTask(String schedulerId, String workerId, AbstractExecutionContext context, TaskExecutor executor) {
        this.schedulerId = schedulerId;
        this.workerId = workerId;
        this.context = context;
        this.executor = executor;
        this.id = DigestUtils.md5Hex(workerId + ":" + context.getInstanceId() + ":" + context.getJob().getNodeId());

        executor.onStateChanged((from, to) -> {
            lastStateTime = System.currentTimeMillis();

            Map<String, Object> payload = new HashMap<>();
            payload.put("from", from.name());
            payload.put("to", to.name());
            payload.put("taskId", getId());
            payload.put("instanceId", context.getInstanceId());
            payload.put("nodeId", context.getJob().getNodeId());
            payload.put("timestamp", System.currentTimeMillis());

            // Fire-and-forget: the transition is published both on the node's state topic
            // (raw map) and on its per-state event topic (wrapped as rule data).
            Flux.merge(
                    context.getEventBus()
                           .publish(RuleConstants.Topics.state(context.getInstanceId(), context.getJob().getNodeId()),
                                    payload),
                    context.getEventBus()
                           .publish(RuleConstants.Topics.event(context.getInstanceId(), context.getJob().getNodeId(), to.name()),
                                    context.newRuleData(payload)))
                // Handle the error in subscribe() itself: a no-arg subscribe() has no error
                // handler, so a failure would additionally be reported by Reactor as an
                // unhandled (dropped) error after having been logged here.
                .subscribe(ignored -> {
                }, err -> log.error(err.getMessage(), err));
        });
    }

    /**
     * @return the task id computed in the constructor
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public String getId() {
        return id;
    }

    /**
     * @return the name reported by the underlying executor
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public String getName() {
        return executor.getName();
    }

    /**
     * @return the job currently held by the execution context
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public ScheduleJob getJob() {
        return context.getJob();
    }

    /**
     * Replaces the context's job and validates the executor against it.
     *
     * <p>If {@code executor.validate()} throws, the previous job is put back into the
     * context and the failure is propagated through the returned {@link Mono}.
     *
     * @param scheduleJob the new job
     * @return a Mono that performs the replacement when subscribed
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public Mono<Void> setJob(ScheduleJob scheduleJob) {
        return Mono.fromRunnable(() -> {
            ScheduleJob previous = context.getJob();
            context.setJob(scheduleJob);
            try {
                executor.validate();
            } catch (Throwable e) {
                // roll back so the context never keeps a job the executor rejected
                context.setJob(previous);
                throw e;
            }
        });
    }

    /**
     * Reloads the context and then the executor, wrapped in a trace span and subscribed
     * on {@link Schedulers#boundedElastic()}.
     *
     * @return a Mono that performs the reload when subscribed
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public Mono<Void> reload() {
        log.debug("reload task[{}]:[{}]", getId(), getJob());
        return Mono
            .<Void>fromRunnable(() -> {
                context.reload();
                executor.reload();
            })
            .as(MonoTracer.create(
                RuleConstants.Trace.reloadNodeSpanName(getJob().getInstanceId(), getJob().getNodeId()),
                builder -> {
                    builder.setAttribute(RuleConstants.Trace.executor, getJob().getExecutor());
                }))
            .subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * Starts the executor, wrapped in a trace span and subscribed on
     * {@link Schedulers#boundedElastic()}. The start time is recorded only after
     * {@code executor.start()} has returned normally.
     *
     * @return a Mono that starts the executor when subscribed
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public Mono<Void> start() {
        log.debug("start task[{}]:[{}]", getId(), getJob());
        return Mono
            .<Void>fromRunnable(executor::start)
            .doOnSuccess(ignored -> {
                startTime = System.currentTimeMillis();
            })
            .as(MonoTracer.create(
                RuleConstants.Trace.startNodeSpanName(getJob().getInstanceId(), getJob().getNodeId()),
                builder -> {
                    builder.setAttribute(RuleConstants.Trace.executor, getJob().getExecutor());
                }))
            .subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * Pauses the executor. Unlike {@link #start()}, {@link #reload()} and
     * {@link #shutdown()}, this neither creates a trace span nor switches scheduler.
     *
     * @return a Mono that pauses the executor when subscribed
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public Mono<Void> pause() {
        log.debug("pause task[{}]:[{}]", getId(), getJob());
        return Mono.fromRunnable(executor::pause);
    }

    /**
     * Shuts down the executor and then the execution context, wrapped in a trace span and
     * subscribed on {@link Schedulers#boundedElastic()}.
     *
     * @return a Mono that performs the shutdown when subscribed
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public Mono<Void> shutdown() {
        log.debug("shutdown task[{}]:[{}]", getId(), getJob());
        return Mono
            .<Void>fromRunnable(executor::shutdown)
            .then(Mono.<Void>fromRunnable(context::doShutdown))
            .as(MonoTracer.create(
                RuleConstants.Trace.shutdownNodeSpanName(getJob().getInstanceId(), getJob().getNodeId()),
                builder -> {
                    builder.setAttribute(RuleConstants.Trace.executor, getJob().getExecutor());
                }))
            .subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * Executes the given rule data if the executor is an {@link ExecutableTaskExecutor}.
     *
     * <p>The data is first passed to {@link TraceHolder#writeContextTo} with
     * {@code RuleData.setHeader} as the header setter, and the result is handed to the
     * executor. For any other executor type the call does nothing.
     *
     * @param ruleData the data to execute
     * @return the executor's result Mono, or an empty Mono if the executor is not executable
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public Mono<Void> execute(RuleData ruleData) {
        log.debug("execute task[{}]:[{}]", getId(), getJob());
        if (!(executor instanceof ExecutableTaskExecutor)) {
            return Mono.empty();
        }
        ExecutableTaskExecutor executable = (ExecutableTaskExecutor) executor;
        return TraceHolder
            .writeContextTo(ruleData, (data, key, value) -> {
                data.setHeader(key, value);
            })
            .flatMap(traced -> {
                return executable.execute(traced);
            });
    }

    /**
     * @return a Mono holding the executor's state as read when this method is called
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public Mono<Task.State> getState() {
        return Mono.just(executor.getState());
    }

    /**
     * Switches the context's debug flag.
     *
     * @param enabled the value passed to {@code context.setDebug}
     * @return a Mono that applies the flag when subscribed
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public Mono<Void> debug(boolean enabled) {
        log.debug("set task debug[{}] [{}]:[{}]", enabled, getId(), getJob());
        return Mono.fromRunnable(() -> {
            context.setDebug(enabled);
        });
    }

    /**
     * @return a Mono holding the epoch-millisecond time of the most recent state change
     *         reported by the executor, or {@code 0} if none has been reported yet
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public Mono<Long> getLastStateTime() {
        return Mono.just(lastStateTime);
    }

    /**
     * @return a Mono holding the epoch-millisecond time at which the last {@link #start()}
     *         completed successfully, or {@code 0} if the task has not been started
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public Mono<Long> getStartTime() {
        return Mono.just(startTime);
    }

    /**
     * @return the worker id supplied at construction
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public String getWorkerId() {
        return workerId;
    }

    /**
     * @return the scheduler id supplied at construction
     */
    @Override // org.jetlinks.rule.engine.api.task.Task
    public String getSchedulerId() {
        return schedulerId;
    }
}
