package org.jetlinks.supports.protocol.management;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.supports.protocol.StaticProtocolSupports;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/protocol/management/DefaultProtocolSupportManager.class */
/**
 * {@link ProtocolSupportManager} that keeps protocol definitions in a {@link ClusterCache} and
 * registers / unregisters the loaded {@link ProtocolSupport} instances on this node.
 *
 * <p>Every {@link #save(ProtocolSupportDefinition)} and {@link #remove(String)} publishes the
 * affected definition on {@link #topic}; {@link #init()} subscribes to that topic and applies each
 * received definition through {@link #init(ProtocolSupportDefinition)}.
 */
public class DefaultProtocolSupportManager extends StaticProtocolSupports implements ProtocolSupportManager {
    private static final Logger log = LoggerFactory.getLogger(DefaultProtocolSupportManager.class);

    /** Event bus topic on which changed protocol definitions are published. */
    public static final String topic = "/_sys/protocol/changed";

    /** Definition state that {@link #init(ProtocolSupportDefinition)} treats as "install"; any other value means "uninstall". */
    private static final byte STATE_INSTALL = 1;

    /** State written to a definition by {@link #remove(String)} before it is published. */
    private static final byte STATE_REMOVED = -1;

    private final EventBus eventBus;
    private final ClusterCache<String, ProtocolSupportDefinition> cache;
    private final ProtocolSupportLoader loader;

    /** Definition id -> id of the protocol support that was loaded for it. */
    private final Map<String, String> configProtocolIdMapping = new ConcurrentHashMap<>();

    /** Holds the event bus subscription so that {@link #shutdown()} can cancel it. */
    private final Disposable.Composite disposable = Disposables.composite();

    /** Upper bound for the blocking initial load performed by {@link #init()}. */
    private Duration loadTimeout = Duration.ofSeconds(30);

    /**
     * Creates a manager.
     *
     * @param eventBus              bus used to publish and receive definition changes
     * @param clusterCache          cache holding the protocol definitions
     * @param protocolSupportLoader loader that turns a definition into a {@link ProtocolSupport}
     */
    public DefaultProtocolSupportManager(EventBus eventBus,
                                         ClusterCache<String, ProtocolSupportDefinition> clusterCache,
                                         ProtocolSupportLoader protocolSupportLoader) {
        this.eventBus = eventBus;
        this.cache = clusterCache;
        this.loader = protocolSupportLoader;
    }

    /**
     * Subscribes to definition changes and then loads every cached definition whose state is
     * {@code 1}, blocking the calling thread for at most the configured load timeout.
     *
     * <p>A failure of the initial load (including a timeout) is logged and does not propagate.
     */
    public void init() {
        Subscription subscription = Subscription
                .builder()
                .topics(topic)
                .subscriberId("protocol-manager")
                .local()
                .broker()
                .build();

        this.disposable.add(this.eventBus.subscribe(
                subscription,
                payload -> init(payload.decode(ProtocolSupportDefinition.class))));

        try {
            loadAll()
                    .filter(definition -> definition.getState() == STATE_INSTALL)
                    .flatMap(definition -> init(definition)
                            .onErrorResume(err -> {
                                // One broken definition must not abort loading of the others,
                                // but the failure should still be visible in the log.
                                log.warn("init protocol[{}] error", definition.getId(), err);
                                return Mono.empty();
                            }))
                    .blockLast(this.loadTimeout);
        } catch (Throwable th) {
            log.warn("load protocol error", th);
        }
    }

    /** Cancels the event bus subscription created by {@link #init()}. */
    public void shutdown() {
        this.disposable.dispose();
    }

    /**
     * Collects all definitions into a map keyed by definition id and writes that map to the cache
     * in a single {@code putAll} call.
     *
     * <p>Note: {@link Collectors#toMap} fails with {@link IllegalStateException} when two
     * definitions share the same id; that error is emitted through the returned {@link Mono}.
     *
     * @param flux definitions to store
     * @return the result reported by the cache
     */
    @Override // org.jetlinks.supports.protocol.management.ProtocolSupportManager
    public Mono<Boolean> store(Flux<ProtocolSupportDefinition> flux) {
        return flux
                .collect(Collectors.toMap(ProtocolSupportDefinition::getId, Function.identity()))
                .flatMap(this.cache::putAll);
    }

    /**
     * @return every definition currently held by the cache
     */
    @Override // org.jetlinks.supports.protocol.management.ProtocolSupportManager
    public Flux<ProtocolSupportDefinition> loadAll() {
        return this.cache.values();
    }

    /**
     * Writes the definition to the cache and then publishes it on {@link #topic}.
     *
     * @param protocolSupportDefinition definition to save
     * @return the result reported by the cache {@code put}, emitted after the publish completes
     */
    @Override // org.jetlinks.supports.protocol.management.ProtocolSupportManager
    public Mono<Boolean> save(ProtocolSupportDefinition protocolSupportDefinition) {
        return this.cache
                .put(protocolSupportDefinition.getId(), protocolSupportDefinition)
                .flatMap(saved -> this.eventBus
                        .publish(topic, protocolSupportDefinition)
                        .thenReturn(saved));
    }

    /**
     * Publishes the cached definition with its state set to {@code -1} (if one is cached for the
     * id) and then removes the id from the cache.
     *
     * @param str definition id
     * @return the result reported by the cache {@code remove}
     */
    @Override // org.jetlinks.supports.protocol.management.ProtocolSupportManager
    public Mono<Boolean> remove(String str) {
        return this.cache
                .get(str)
                .doOnNext(definition -> definition.setState(STATE_REMOVED))
                .flatMap(definition -> this.eventBus.publish(topic, definition))
                .then(this.cache.remove(str));
    }

    /**
     * Applies a definition on this node: installs it when its state is {@code 1}, otherwise
     * uninstalls it. The work is deferred until subscription and wrapped in a trace named
     * {@code /protocol/{id}/init}.
     *
     * <p>Failures are logged and reported to {@link #loadError(ProtocolSupportDefinition, Throwable)};
     * they are not emitted as an error signal by this method's own logic.
     *
     * @param protocolSupportDefinition definition to apply
     * @return a {@link Mono} that completes when the definition has been processed
     */
    public Mono<Void> init(ProtocolSupportDefinition protocolSupportDefinition) {
        Mono<Void> task = Mono.defer(() -> doInit(protocolSupportDefinition));
        return task.as(MonoTracer.create("/protocol/" + protocolSupportDefinition.getId() + "/init"));
    }

    /** Chooses between the unregister-by-known-id shortcut and the load-then-apply path. */
    private Mono<Void> doInit(ProtocolSupportDefinition definition) {
        boolean install = definition.getState() == STATE_INSTALL;
        String operation = install ? "install" : "uninstall";
        try {
            if (!install) {
                String protocolId = this.configProtocolIdMapping.get(definition.getId());
                if (protocolId != null) {
                    // The protocol id loaded for this definition is already known, so it can be
                    // unregistered directly without loading the protocol again.
                    // NOTE(review): the mapping entry is intentionally left in place, as in the
                    // previous implementation — confirm whether it should be removed here.
                    log.debug("uninstall protocol:{}", definition);
                    unRegister(protocolId);
                    return Mono.empty();
                }
            }
            return loadAndApply(definition, operation, install);
        } catch (Throwable th) {
            log.error("{} protocol error", operation, th);
            loadError(definition, th);
            return Mono.empty();
        }
    }

    /** Loads the protocol support for the definition, initialises it and registers or unregisters it. */
    private Mono<Void> loadAndApply(ProtocolSupportDefinition definition, String operation, boolean install) {
        Consumer<ProtocolSupport> action = install ? this::register : this::unRegister;
        log.debug("{} protocol:{}", operation, definition);
        return this.loader
                .load(definition)
                .doOnNext(support -> {
                    support.init(definition.getConfiguration());
                    log.debug("{} protocol[{}] success: {}", operation, definition.getId(), support);
                    // Remember which protocol id belongs to this definition so that a later
                    // uninstall can unregister by id.
                    this.configProtocolIdMapping.put(definition.getId(), support.getId());
                    action.accept(afterLoaded(definition, support));
                })
                .onErrorResume(th -> {
                    log.error("{} protocol[{}] error", operation, definition.getId(), th);
                    loadError(definition, th);
                    return Mono.empty();
                })
                .then();
    }

    /**
     * Hook invoked when installing or uninstalling a definition fails. The default implementation
     * does nothing.
     *
     * @param protocolSupportDefinition definition that failed
     * @param th                        the failure
     */
    protected void loadError(ProtocolSupportDefinition protocolSupportDefinition, Throwable th) {
    }

    /**
     * Hook invoked after a protocol support has been loaded and initialised; the returned instance
     * is the one that gets registered or unregistered. The default implementation returns the
     * loaded instance unchanged.
     *
     * @param protocolSupportDefinition definition the support was loaded from
     * @param protocolSupport           the loaded support
     * @return the support to register or unregister
     */
    protected ProtocolSupport afterLoaded(ProtocolSupportDefinition protocolSupportDefinition, ProtocolSupport protocolSupport) {
        return protocolSupport;
    }

    /**
     * Sets the maximum time {@link #init()} blocks while loading the cached definitions.
     *
     * @param duration new timeout
     */
    public void setLoadTimeout(Duration duration) {
        this.loadTimeout = duration;
    }
}
