package org.jetlinks.rule.engine.defaults;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.worker.Worker;
import org.jetlinks.rule.engine.api.worker.WorkerSelector;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/defaults/LocalScheduler.class */
/**
 * {@link Scheduler} implementation that keeps its workers and tasks in in-memory maps.
 *
 * <p>Three registries are maintained:
 * <ul>
 *   <li>{@code workers}: worker id to {@link Worker};</li>
 *   <li>{@code tasks}: task id to {@link Task};</li>
 *   <li>{@code executors}: instance id to node id to the tasks created for that node.</li>
 * </ul>
 * Look-ups never create entries in these registries; entries are only created when a job is
 * scheduled.
 */
public class LocalScheduler implements Scheduler, Disposable {
    private final String id;

    /** Default selection strategy: keep only the first candidate worker. */
    static final WorkerSelector defaultSelector = (candidates, job) -> candidates.take(1L);

    private WorkerSelector workerSelector = defaultSelector;
    private final Map<String, Worker> workers = new ConcurrentHashMap<>();
    private final Map<String, Map<String, List<Task>>> executors = new ConcurrentHashMap<>();
    private final Map<String, Task> tasks = new ConcurrentHashMap<>();

    /**
     * @param id identifier of this scheduler; returned by {@link #getId()} and passed to
     *           workers when tasks are created
     */
    public LocalScheduler(String id) {
        this.id = id;
    }

    /** @return all workers registered through {@link #addWorker(Worker)} */
    @Override
    public Flux<Worker> getWorkers() {
        return Flux.fromIterable(this.workers.values());
    }

    /**
     * @param workerId id the worker was registered under
     * @return the worker, or an empty Mono when no worker has that id
     */
    @Override
    public Mono<Worker> getWorker(String workerId) {
        return Mono.justOrEmpty(this.workers.get(workerId));
    }

    /**
     * @param scheduleJob job to check
     * @return {@code true} when at least one worker is selected for the job's executor
     */
    @Override
    public Mono<Boolean> canSchedule(ScheduleJob scheduleJob) {
        return findWorker(scheduleJob.getExecutor(), scheduleJob).hasElements();
    }

    /**
     * Finds the workers that support the given executor and lets the configured
     * {@link WorkerSelector} pick among them.
     *
     * @param executor    executor name that must be contained in the worker's supported executors
     * @param scheduleJob job handed to the selector
     * @return the workers chosen by the selector
     */
    protected Flux<Worker> findWorker(String executor, ScheduleJob scheduleJob) {
        Flux<Worker> candidates = Flux
                .fromIterable(this.workers.values())
                .filterWhen(worker -> worker
                        .getSupportExecutors()
                        .map(supported -> supported.contains(executor))
                        // a worker that reports nothing supports nothing
                        .defaultIfEmpty(false));
        return this.workerSelector.select(candidates, scheduleJob);
    }

    /**
     * Schedules a job: tasks already registered for the job's instance/node are unregistered
     * and shut down first, then new tasks are created on the selected workers.
     *
     * @param scheduleJob job to schedule
     * @return the newly created tasks; errors with {@link UnsupportedOperationException} when
     *         no worker is selected for the job's executor
     */
    @Override
    // The raw cast is kept from the original: the declared type of the trace transformer is
    // not visible here, so the result of as(..) is narrowed back to Flux<Task> unchecked.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Flux<Task> schedule(ScheduleJob scheduleJob) {
        return (Flux) Flux
                .fromIterable(getExecutor(scheduleJob.getInstanceId(), scheduleJob.getNodeId()))
                .flatMap(task -> {
                    removeTask(task);
                    return task.shutdown();
                })
                .thenMany(createExecutor(scheduleJob))
                .as(RuleConstants.Trace.traceFlux(scheduleJob, "schedule"));
    }

    /**
     * Shuts down every task of an instance, one after another, delaying errors until all
     * tasks have been processed, and then forgets the instance.
     *
     * @param instanceId id of the instance whose tasks are shut down
     * @return completes when all shutdowns have terminated
     */
    @Override
    public Mono<Void> shutdown(String instanceId) {
        return getSchedulingTask(instanceId)
                // conditional remove: never evict a newer task registered under the same id
                .doOnNext(task -> this.tasks.remove(task.getId(), task))
                .concatMapDelayError(Task::shutdown)
                .doAfterTerminate(() -> clearExecutor(instanceId))
                .then();
    }

    /**
     * Unregisters and shuts down a single task. Nothing happens until the returned Mono is
     * subscribed, so an unsubscribed call cannot unregister a task without stopping it.
     *
     * @param taskId id of the task
     * @return completes when the task has shut down, or immediately when the id is unknown
     */
    @Override
    public Mono<Void> shutdownTask(String taskId) {
        return Mono.defer(() -> {
            Task removed = removeTask(taskId);
            if (removed == null) {
                return Mono.<Void>empty();
            }
            return removed.shutdown().then();
        });
    }

    /** Creates tasks for the job on the selected workers and registers each created task. */
    private Flux<Task> createExecutor(ScheduleJob scheduleJob) {
        return findWorker(scheduleJob.getExecutor(), scheduleJob)
                .switchIfEmpty(Mono.error(() ->
                        new UnsupportedOperationException("unsupported executor:" + scheduleJob.getExecutor())))
                .flatMap(worker -> worker.createTask(this.id, scheduleJob))
                .doOnNext(this::addTask);
    }

    /**
     * Registers a task. A different task previously registered under the same id is removed
     * from its executor list and shut down (fire-and-forget) so it is no longer reported.
     */
    private void addTask(Task task) {
        Task previous = this.tasks.put(task.getId(), task);
        if (previous != null && previous != task) {
            removeFromExecutor(previous);
            previous.shutdown().subscribe();
        }
        getExecutor(task.getJob().getInstanceId(), task.getJob().getNodeId()).add(task);
    }

    /**
     * Atomically unregisters the task with the given id.
     *
     * @return the removed task, or {@code null} when the id is unknown
     */
    private Task removeTask(String taskId) {
        Task removed = this.tasks.remove(taskId);
        if (removed != null) {
            removeFromExecutor(removed);
        }
        return removed;
    }

    /** Unregisters the given task; a different task registered under the same id is kept. */
    private void removeTask(Task task) {
        this.tasks.remove(task.getId(), task);
        removeFromExecutor(task);
    }

    /** Removes the task from its instance/node list without creating missing entries. */
    private void removeFromExecutor(Task task) {
        Map<String, List<Task>> nodes = this.executors.get(task.getJob().getInstanceId());
        if (nodes == null) {
            return;
        }
        List<Task> nodeTasks = nodes.get(task.getJob().getNodeId());
        if (nodeTasks != null) {
            nodeTasks.remove(task);
        }
    }

    /**
     * Lists the tasks of one instance. The registry is read when the Flux is subscribed and
     * no entry is created for an unknown instance.
     *
     * @param instanceId id of the instance
     * @return the tasks registered for the instance, possibly none
     */
    @Override
    public Flux<Task> getSchedulingTask(String instanceId) {
        return Flux.defer(() -> currentTasks(instanceId));
    }

    /** Snapshot-free view of the tasks currently registered for an instance. */
    private Flux<Task> currentTasks(String instanceId) {
        Map<String, List<Task>> nodes = this.executors.get(instanceId);
        if (nodes == null) {
            return Flux.empty();
        }
        return Flux.fromIterable(nodes.values()).flatMapIterable(Function.identity());
    }

    /**
     * @param taskId id of the task
     * @return the task, or an empty Mono when the id is unknown
     */
    @Override
    public Mono<Task> getTask(String taskId) {
        return Mono.justOrEmpty(this.tasks.get(taskId));
    }

    /** @return the tasks of all instances */
    @Override
    public Flux<Task> getSchedulingTasks() {
        return Flux.fromIterable(this.executors.values())
                .flatMapIterable(Map::values)
                .flatMapIterable(Function.identity());
    }

    /** @return the number of tasks emitted by {@link #getSchedulingTasks()} */
    @Override
    public Mono<Long> totalTask() {
        return getSchedulingTasks().count();
    }

    /** Returns the task list of an instance/node, creating it when absent. */
    private List<Task> getExecutor(String instanceId, String nodeId) {
        return getExecutor(instanceId).computeIfAbsent(nodeId, key -> new CopyOnWriteArrayList<>());
    }

    /** Drops all executor bookkeeping of an instance. */
    private void clearExecutor(String instanceId) {
        this.executors.remove(instanceId);
    }

    /** Returns the node map of an instance, creating it when absent. */
    private Map<String, List<Task>> getExecutor(String instanceId) {
        return this.executors.computeIfAbsent(instanceId, key -> new ConcurrentHashMap<>());
    }

    /**
     * Registers a worker under its id, replacing any worker previously registered with that id.
     *
     * @param worker worker to register
     */
    public void addWorker(Worker worker) {
        this.workers.put(worker.getId(), worker);
    }

    /**
     * Shuts down every registered task (fire-and-forget) and empties the task registries,
     * mirroring what {@link #shutdown(String)} does for a single instance.
     */
    @Override
    public void dispose() {
        Iterator<Task> it = this.tasks.values().iterator();
        while (it.hasNext()) {
            Task task = it.next();
            // unregister first so a disposed scheduler does not keep reporting the task
            it.remove();
            task.shutdown().subscribe();
        }
        this.executors.clear();
    }

    /** @return the id given at construction */
    @Override
    public String getId() {
        return this.id;
    }

    /**
     * Replaces the strategy used by {@link #findWorker(String, ScheduleJob)} to pick workers.
     *
     * @param workerSelector new selector
     */
    public void setWorkerSelector(WorkerSelector workerSelector) {
        this.workerSelector = workerSelector;
    }
}
