package org.jetlinks.rule.engine.cluster.balancer;

import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.scheduler.SchedulerSelector;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.cluster.RuleInstance;
import org.jetlinks.rule.engine.cluster.SchedulerRegistry;
import org.jetlinks.rule.engine.cluster.TaskSnapshotRepository;
import org.jetlinks.rule.engine.defaults.ScheduleJobCompiler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/cluster/balancer/DefaultSchedulerLoadBalancer.class */
public class DefaultSchedulerLoadBalancer implements SchedulerLoadBalancer {
    private static final Logger log = LoggerFactory.getLogger(DefaultSchedulerLoadBalancer.class);
    private boolean autoReBalance;
    protected final EventBus eventBus;
    protected final SchedulerRegistry registry;
    protected final TaskSnapshotRepository snapshotRepository;
    protected final SchedulerSelector schedulerSelector;

    public DefaultSchedulerLoadBalancer(EventBus eventBus, SchedulerRegistry schedulerRegistry, TaskSnapshotRepository taskSnapshotRepository) {
        this(eventBus, schedulerRegistry, taskSnapshotRepository, SchedulerSelector.selectAll);
    }

    public DefaultSchedulerLoadBalancer(EventBus eventBus, SchedulerRegistry schedulerRegistry, TaskSnapshotRepository taskSnapshotRepository, SchedulerSelector schedulerSelector) {
        this.autoReBalance = true;
        this.eventBus = eventBus;
        this.registry = schedulerRegistry;
        this.snapshotRepository = taskSnapshotRepository;
        this.schedulerSelector = schedulerSelector;
    }

    public void setup() {
        setupAsync().block(Duration.ofSeconds(30L));
    }

    public Mono<Void> setupAsync() {
        return Flux.fromIterable(this.registry.getLocalSchedulers()).flatMap(scheduler -> {
            return this.snapshotRepository.findBySchedulerId(scheduler.getId()).filterWhen(taskSnapshot -> {
                return canSchedule(scheduler, taskSnapshot.getJob()).flatMap(bool -> {
                    return !bool.booleanValue() ? this.snapshotRepository.removeTaskById(taskSnapshot.getId()).thenReturn(false) : Mono.just(true);
                });
            }).flatMap(taskSnapshot2 -> {
                return scheduler.schedule(taskSnapshot2.getJob()).flatMap(task -> {
                    return taskSnapshot2.getState() == Task.State.running ? task.start() : Mono.empty();
                }).onErrorResume(th -> {
                    log.debug(th.getMessage(), th);
                    return Mono.empty();
                });
            });
        }).doOnError(th -> {
            log.debug(th.getMessage(), th);
        }).then();
    }

    public void cleanup() {
    }

    @Override // org.jetlinks.rule.engine.cluster.balancer.SchedulerLoadBalancer
    public Mono<Void> reBalance(List<Scheduler> list, boolean z) {
        return CollectionUtils.isEmpty(list) ? Mono.empty() : Mono.error(new UnsupportedOperationException());
    }

    @Override // org.jetlinks.rule.engine.cluster.balancer.SchedulerLoadBalancer
    public Mono<Void> reBalance(List<Scheduler> list, RuleInstance ruleInstance, boolean z) {
        Map map = (Map) new ScheduleJobCompiler(ruleInstance.getId(), ruleInstance.getModel()).compile().stream().collect(Collectors.toMap((v0) -> {
            return v0.getNodeId();
        }, Function.identity()));
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        Iterator<Scheduler> it = list.iterator();
        while (it.hasNext()) {
            concurrentHashMap.put(it.next(), new ConcurrentHashMap(map));
        }
        return Flux.fromIterable(list).flatMap(scheduler -> {
            Map map2 = (Map) concurrentHashMap.get(scheduler);
            return scheduler.getSchedulingTask(ruleInstance.getId()).doOnNext(task -> {
            }).thenMany(Flux.defer(() -> {
                return createTask(scheduler, map2.values());
            }));
        }).collectList().flatMap(list2 -> {
            Flux flatMap = Flux.fromIterable(list2).flatMap(task -> {
                log.info("schedule new task[id={} instanceId={} ,nodeId={},executor={}] in scheduler[{}]", new Object[]{task.getId(), task.getJob().getInstanceId(), task.getJob().getNodeId(), task.getJob().getExecutor(), task.getSchedulerId()});
                return task.start().thenReturn(task);
            }).flatMap((v0) -> {
                return v0.dump();
            });
            TaskSnapshotRepository taskSnapshotRepository = this.snapshotRepository;
            taskSnapshotRepository.getClass();
            return (Mono) flatMap.as((v1) -> {
                return r1.saveTaskSnapshots(v1);
            });
        });
    }

    private Flux<Task> createTask(Scheduler scheduler, Collection<ScheduleJob> collection) {
        Flux filterWhen = Flux.fromIterable(collection).filterWhen(scheduleJob -> {
            return canSchedule(scheduler, scheduleJob);
        });
        scheduler.getClass();
        return filterWhen.flatMap(scheduler::schedule);
    }

    protected Mono<Boolean> canSchedule(Scheduler scheduler, ScheduleJob scheduleJob) {
        return Flux.merge(new Publisher[]{scheduler.canSchedule(scheduleJob), this.schedulerSelector.test(scheduler, scheduleJob)}).defaultIfEmpty(false).all((v0) -> {
            return v0.booleanValue();
        });
    }

    public void setAutoReBalance(boolean z) {
        this.autoReBalance = z;
    }
}
