package org.jetlinks.supports.event;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ThreadLocalRandom;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;
import org.hswebframework.web.dict.EnumDict;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.Decoder;
import org.jetlinks.core.codec.Encoder;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.topic.Topic;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.core.utils.SerializeUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.function.Function4;
import reactor.util.context.ContextView;

/* loaded from: input_file:org/jetlinks/supports/event/InternalEventBus.class */
public class InternalEventBus implements EventBus {
    /** Root of the topic tree in which every {@link SubscriptionInfo} is registered. */
    protected final Topic<SubscriptionInfo> subscriptionTable = Topic.createRoot();

    private static final Logger log = LoggerFactory.getLogger(InternalEventBus.class);

    /*
     * The three thread-locals below are scratch collections used by doPublish while it
     * resolves the receivers of one publication; doPublish clears them before returning.
     *
     * Each anonymous subclass must override FastThreadLocal#initialValue(): a method with any
     * other name is never invoked, get() would then return null and the first publish on a
     * thread would fail with a NullPointerException.
     */

    /** Per-thread set of subscriptions selected to receive the event being published. */
    private static final FastThreadLocal<Set<SubscriptionInfo>> PUB_HANDLERS = new FastThreadLocal<Set<SubscriptionInfo>>() {
        @Override
        protected Set<SubscriptionInfo> initialValue() {
            return new HashSet<>();
        }
    };

    /** Per-thread set of handler objects already seen, so one handler is selected only once. */
    private static final FastThreadLocal<Set<Object>> DISTINCT_HANDLERS = new FastThreadLocal<Set<Object>>() {
        @Override
        protected Set<Object> initialValue() {
            return new HashSet<>();
        }
    };

    /** Per-thread map of shared subscriptions, grouped by subscriber id. */
    private static final FastThreadLocal<Map<String, List<SubscriptionInfo>>> SHARED = new FastThreadLocal<Map<String, List<SubscriptionInfo>>>() {
        @Override
        protected Map<String, List<SubscriptionInfo>> initialValue() {
            return new HashMap<>();
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jetlinks/supports/event/InternalEventBus$LocalHandler.class */
    public static class LocalHandler implements Function<TopicPayload, Mono<Void>> {
        private final int hashCode;
        private final Function<TopicPayload, Mono<Void>> handler;

        private LocalHandler(Function<TopicPayload, Mono<Void>> function) {
            this.hashCode = Objects.hashCode(this);
            this.handler = function;
        }

        public Mono<Void> apply0(TopicPayload topicPayload) {
            String topic = topicPayload.getTopic();
            try {
                return this.handler.apply(topicPayload).onErrorResume(th -> {
                    InternalEventBus.log.warn("handle publish [{}] error", topic, th);
                    return Mono.empty();
                });
            } catch (Throwable th2) {
                InternalEventBus.log.warn("handle publish [{}] error", topic, th2);
                return Mono.empty();
            }
        }

        @Override // java.util.function.Function
        public Mono<Void> apply(TopicPayload topicPayload) {
            return Mono.defer(() -> {
                return apply0(topicPayload);
            });
        }

        public int hashCode() {
            return this.hashCode;
        }
    }

    /* loaded from: input_file:org/jetlinks/supports/event/InternalEventBus$SubscriptionInfo.class */
    public static class SubscriptionInfo implements Disposable, Externalizable {
        private transient int $hashCodeCache;
        static Comparator<SubscriptionInfo> comparatorByTime = Comparator.comparingLong((v0) -> {
            return v0.getTime();
        });
        static Comparator<SubscriptionInfo> comparatorPriority = Comparator.comparingLong((v0) -> {
            return v0.getPriority();
        });
        private long time;
        private String subscriber;
        private String topic;
        private boolean cluster;
        private String clusterServerId;
        private long features;
        private transient Function<TopicPayload, Mono<Void>> handler;
        private int priority;

        public static SubscriptionInfo of(Subscription subscription, String str, Function<TopicPayload, Mono<Void>> function, boolean z) {
            SubscriptionInfo subscriptionInfo = new SubscriptionInfo();
            subscriptionInfo.time = System.currentTimeMillis();
            subscriptionInfo.topic = str;
            subscriptionInfo.subscriber = subscription.getSubscriber();
            subscriptionInfo.handler = function;
            subscriptionInfo.features = EnumDict.toMask(subscription.getFeatures());
            subscriptionInfo.cluster = z;
            subscriptionInfo.priority = subscription.getPriority();
            return subscriptionInfo;
        }

        public void dispose() {
        }

        @JsonIgnore
        public boolean isDisposed() {
            return super.isDisposed();
        }

        boolean hasFeature(Subscription.Feature feature) {
            return feature.in(this.features);
        }

        @Override // java.io.Externalizable
        public void writeExternal(ObjectOutput objectOutput) throws IOException {
            objectOutput.writeLong(this.time);
            objectOutput.writeUTF(this.subscriber);
            objectOutput.writeUTF(this.topic);
            objectOutput.writeLong(this.features);
            objectOutput.writeBoolean(this.cluster);
            SerializeUtils.writeObject(this.clusterServerId, objectOutput);
            objectOutput.writeInt(this.priority);
        }

        @Override // java.io.Externalizable
        public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
            this.time = objectInput.readLong();
            this.subscriber = objectInput.readUTF();
            this.topic = objectInput.readUTF();
            this.features = objectInput.readLong();
            this.cluster = objectInput.readBoolean();
            this.clusterServerId = (String) SerializeUtils.readObject(objectInput);
            this.priority = objectInput.readInt();
        }

        public String toString() {
            return this.subscriber + "@" + (this.clusterServerId == null ? "local" : this.clusterServerId) + "::" + this.topic;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof SubscriptionInfo)) {
                return false;
            }
            SubscriptionInfo subscriptionInfo = (SubscriptionInfo) obj;
            if (!subscriptionInfo.canEqual(this) || getTime() != subscriptionInfo.getTime() || isCluster() != subscriptionInfo.isCluster()) {
                return false;
            }
            String subscriber = getSubscriber();
            String subscriber2 = subscriptionInfo.getSubscriber();
            if (subscriber == null) {
                if (subscriber2 != null) {
                    return false;
                }
            } else if (!subscriber.equals(subscriber2)) {
                return false;
            }
            String topic = getTopic();
            String topic2 = subscriptionInfo.getTopic();
            if (topic == null) {
                if (topic2 != null) {
                    return false;
                }
            } else if (!topic.equals(topic2)) {
                return false;
            }
            String clusterServerId = getClusterServerId();
            String clusterServerId2 = subscriptionInfo.getClusterServerId();
            return clusterServerId == null ? clusterServerId2 == null : clusterServerId.equals(clusterServerId2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof SubscriptionInfo;
        }

        public int hashCode() {
            if (this.$hashCodeCache != 0) {
                return this.$hashCodeCache;
            }
            long time = getTime();
            int i = (((1 * 59) + ((int) ((time >>> 32) ^ time))) * 59) + (isCluster() ? 79 : 97);
            String subscriber = getSubscriber();
            int hashCode = (i * 59) + (subscriber == null ? 43 : subscriber.hashCode());
            String topic = getTopic();
            int hashCode2 = (hashCode * 59) + (topic == null ? 43 : topic.hashCode());
            String clusterServerId = getClusterServerId();
            int hashCode3 = (hashCode2 * 59) + (clusterServerId == null ? 43 : clusterServerId.hashCode());
            if (hashCode3 == 0) {
                hashCode3 = Integer.MIN_VALUE;
            }
            this.$hashCodeCache = hashCode3;
            return hashCode3;
        }

        public long getTime() {
            return this.time;
        }

        public String getSubscriber() {
            return this.subscriber;
        }

        public String getTopic() {
            return this.topic;
        }

        public boolean isCluster() {
            return this.cluster;
        }

        public String getClusterServerId() {
            return this.clusterServerId;
        }

        public long getFeatures() {
            return this.features;
        }

        public Function<TopicPayload, Mono<Void>> getHandler() {
            return this.handler;
        }

        public int getPriority() {
            return this.priority;
        }

        public SubscriptionInfo(long j, String str, String str2, boolean z, String str3, long j2, Function<TopicPayload, Mono<Void>> function, int i) {
            this.time = j;
            this.subscriber = str;
            this.topic = str2;
            this.cluster = z;
            this.clusterServerId = str3;
            this.features = j2;
            this.handler = function;
            this.priority = i;
        }

        public SubscriptionInfo() {
        }
    }

    /**
     * Exposes a subscription as a {@link Flux}: every payload delivered to the subscription
     * is emitted downstream, and disposing the sink disposes the underlying registration.
     *
     * @param subscription the topics and options to subscribe with
     * @return a flux of the raw payloads
     */
    public Flux<TopicPayload> subscribe(Subscription subscription) {
        return Flux.create(sink -> {
            Disposable registration = subscribe(subscription, payload -> {
                sink.next(payload);
                return Mono.empty();
            });
            sink.onDispose(registration);
        });
    }

    /**
     * Subscribes and decodes every payload with {@code decoder}. A payload that fails to
     * decode is logged at ERROR level and dropped. The payload is passed to
     * {@code ReferenceCountUtil.safeRelease} after each decode attempt, successful or not.
     *
     * @param subscription the topics and options to subscribe with
     * @param decoder      converts a payload into {@code T}
     * @return a flux of the decoded values
     */
    public <T> Flux<T> subscribe(Subscription subscription, Decoder<T> decoder) {
        return subscribe(subscription).mapNotNull(payload -> {
            T decoded = null;
            try {
                decoded = payload.decode(decoder, false);
            } catch (Throwable err) {
                log.error("decode message [{}] error", payload.getTopic(), err);
            } finally {
                ReferenceCountUtil.safeRelease(payload);
            }
            // null is filtered out by mapNotNull
            return decoded;
        });
    }

    /**
     * Subscribes and decodes every payload into {@code cls}. A payload that fails to decode
     * is logged at ERROR level and dropped.
     *
     * @param subscription the topics and options to subscribe with
     * @param cls          the target type
     * @return a flux of the decoded values
     */
    public <T> Flux<T> subscribe(Subscription subscription, Class<T> cls) {
        return subscribe(subscription).mapNotNull(payload -> {
            T decoded = null;
            try {
                decoded = payload.decode(cls);
            } catch (Throwable err) {
                log.error("decode message [{}] error", payload.getTopic(), err);
            }
            // null is filtered out by mapNotNull
            return decoded;
        });
    }

    /**
     * Registers {@code function} for every topic of {@code subscription}. All topics share
     * one {@link LocalHandler} wrapper around the function. When the subscription carries
     * {@code Subscription.Feature.broker}, {@link #subscribeToCluster(SubscriptionInfo)} is
     * called for each topic as well.
     *
     * @param subscription the topics and options to subscribe with
     * @param function     handler invoked for every matching payload
     * @return a composite whose disposal removes every registration made here
     */
    public Disposable subscribe(Subscription subscription, Function<TopicPayload, Mono<Void>> function) {
        Disposable.Composite registrations = Disposables.composite();
        LocalHandler guarded = new LocalHandler(function);
        for (String topicName : subscription.getTopics()) {
            SubscriptionInfo info = SubscriptionInfo.of(subscription, topicName, guarded, false);
            log.debug("subscribe: {}", info);

            Topic<SubscriptionInfo> node = this.subscriptionTable.append(topicName);
            node.subscribe(new SubscriptionInfo[]{info});

            // undo of the registration above
            registrations.add(() -> {
                log.debug("unsubscribe: {}", info);
                node.unsubscribe(new SubscriptionInfo[]{info});
                info.dispose();
            });

            if (info.hasFeature(Subscription.Feature.broker)) {
                registrations.add(subscribeToCluster(info));
            }
        }
        return registrations;
    }

    /**
     * Called by {@code subscribe(Subscription, Function)} for each registration that carries
     * {@code Subscription.Feature.broker}. This implementation performs no work and returns
     * an already disposed {@link Disposable}; being {@code protected}, it can be overridden
     * by subclasses.
     *
     * @param subscriptionInfo the registration that was just added to the subscription table
     * @return a handle that is added to the composite returned to the subscriber
     */
    protected Disposable subscribeToCluster(SubscriptionInfo subscriptionInfo) {
        return Disposables.disposed();
    }

    /**
     * Publishes every element emitted by {@code publisher} to the subscribers of topic
     * {@code str}. The receivers are resolved once, when this method is called; only
     * registrations that are flagged as cluster or carry
     * {@code Subscription.Feature.local} are considered.
     *
     * @param str       the topic to publish to
     * @param publisher source of the events
     * @return the {@code Mono} produced by {@code doPublish}, which emits the number of
     *         selected subscribers once all events have been handled
     */
    public <T> Mono<Long> publish(String str, Publisher<T> publisher) {
        return doPublish(
            str,
            publisher,
            (topic, events, targets, ctx) -> Flux
                .from(events)
                .flatMap(event -> publishFromLocal(topic, event, targets, ctx))
                .then(),
            info -> info.isCluster() || info.hasFeature(Subscription.Feature.local));
    }

    /**
     * Publishes a single event and subscribes to the publication on {@code scheduler}.
     *
     * <p>The event is forwarded as-is. Casting it to {@code String} before delegating, as an
     * earlier revision did, throws {@code ClassCastException} for every non-String event.
     *
     * @param str       the topic to publish to
     * @param t         the event
     * @param scheduler scheduler the returned {@code Mono} is subscribed on
     * @return the result of {@code publish(String, T)}, subscribed on {@code scheduler}
     */
    public <T> Mono<Long> publish(String str, T t, Scheduler scheduler) {
        return publish(str, t).subscribeOn(scheduler);
    }

    /**
     * Publishes a single event. The {@code encoder} is not used by this implementation.
     *
     * <p>The event is forwarded as-is. Casting it to {@code String} before delegating, as an
     * earlier revision did, throws {@code ClassCastException} for every non-String event.
     *
     * @param str     the topic to publish to
     * @param encoder ignored
     * @param t       the event
     * @return the result of {@code publish(String, T)}
     */
    public <T> Mono<Long> publish(String str, Encoder<T> encoder, T t) {
        return publish(str, t);
    }

    /**
     * Publishes a single event to the subscribers of topic {@code str}. Only registrations
     * that are flagged as cluster or carry {@code Subscription.Feature.local} are considered.
     *
     * @param str the topic to publish to
     * @param t   the event
     * @return the {@code Mono} produced by {@code doPublish}, which emits the number of
     *         selected subscribers
     */
    public <T> Mono<Long> publish(String str, T t) {
        Predicate<SubscriptionInfo> eligible =
            info -> info.isCluster() || info.hasFeature(Subscription.Feature.local);
        return doPublish(
            str,
            t,
            (topic, event, targets, ctx) -> publishFromLocal(topic, event, targets, ctx),
            eligible);
    }

    /**
     * Hands {@code topicPayload} to the handler of every entry in {@code list} and combines
     * the resulting {@code Mono}s.
     *
     * <p>With a single entry the handler's {@code Mono} is returned directly. Otherwise the
     * results are grouped by subscription priority: groups are run one after another in
     * ascending priority order, and the members of one group are merged.
     *
     * @param str          topic, used for trace logging only
     * @param topicPayload payload passed to every handler
     * @param list         subscriptions to deliver to
     * @param contextView  not used by this implementation
     * @return a {@code Mono} completing when all handlers have completed; an empty
     *         {@code Mono} when {@code list} is empty or the last handler returned {@code null}
     */
    protected Mono<Void> doPublish0(String str, TopicPayload topicPayload, List<SubscriptionInfo> list, ContextView contextView) {
        final int total = list.size();
        // priority buckets are only needed when several results have to be combined
        final TreeMap<Integer, List<Mono<Void>>> byPriority = total == 1 ? null : new TreeMap<>();

        Mono<Void> latest = null;
        for (SubscriptionInfo target : list) {
            log.trace("publish {} to {}", str, target);
            latest = target.handler.apply(topicPayload);
            if (total > 1) {
                byPriority
                    .computeIfAbsent(target.priority, key -> new ArrayList<>(total))
                    .add(latest);
            }
        }

        if (latest == null) {
            return Mono.empty();
        }
        if (total == 1) {
            return latest;
        }
        if (byPriority.size() == 1) {
            // every subscription has the same priority: one plain merge is enough
            return Flux.merge(byPriority.get(list.get(0).priority)).then();
        }
        return Flux
            .fromIterable(byPriority.values())
            .concatMap(bucket -> Flux.merge(bucket))
            .then();
    }

    /**
     * Wraps {@code t} into a {@link TopicPayload} (no encoder is supplied), lets
     * {@code TraceHolder} write the trace context into the payload headers and delivers the
     * payload through {@code doPublish0}.
     *
     * @param str         the topic being published to
     * @param t           the event
     * @param list        subscriptions to deliver to
     * @param contextView reactor context the trace information is read from
     * @return the {@code Mono} returned by {@code doPublish0}
     */
    private <T> Mono<Void> publishFromLocal(String str, T t, List<SubscriptionInfo> list, ContextView contextView) {
        TopicPayload message = TopicPayload.of(str, Payload.of(t, (Encoder<T>) null));
        TraceHolder.writeContextTo(contextView, message, (target, key, value) -> {
            target.addHeader(key, value);
        });
        return doPublish0(str, message, list, contextView);
    }

    /**
     * Publishes every element of {@code publisher} by delegating to
     * {@code publish(String, Publisher)}. The {@code encoder} is not used by this
     * implementation.
     *
     * @param str       the topic to publish to
     * @param encoder   ignored
     * @param publisher source of the events
     * @return the result of {@code publish(String, Publisher)}
     */
    public <T> Mono<Long> publish(String str, Encoder<T> encoder, Publisher<? extends T> publisher) {
        Mono<Long> result = publish(str, publisher);
        return result;
    }

    /**
     * Publishes every element of {@code publisher} and subscribes to the publication on
     * {@code scheduler}. The {@code encoder} is not used by this implementation.
     *
     * <p>The scheduler is applied with {@code subscribeOn}, matching
     * {@code publish(String, T, Scheduler)}; it used to be silently ignored here.
     *
     * @param str       the topic to publish to
     * @param encoder   ignored
     * @param publisher source of the events
     * @param scheduler scheduler the returned {@code Mono} is subscribed on
     * @return the result of {@code publish(String, Publisher)}, subscribed on {@code scheduler}
     */
    public <T> Mono<Long> publish(String str, Encoder<T> encoder, Publisher<? extends T> publisher, Scheduler scheduler) {
        Mono<Long> result = publish(str, publisher);
        return result.subscribeOn(scheduler);
    }

    /**
     * Resolves the subscriptions that must receive a publication on topic {@code str} and
     * returns a {@code Mono} that performs the delivery through {@code function4}.
     *
     * <p>Resolution happens synchronously inside this call and uses the thread-local scratch
     * collections {@code SHARED}, {@code DISTINCT_HANDLERS} and {@code PUB_HANDLERS}; they are
     * cleared before the method returns, whether it completes normally or throws. The two
     * callbacks given to {@code findTopic} receive the scratch collections as arguments
     * instead of capturing them.
     *
     * <p>Selection rules:
     * <ul>
     *   <li>a subscription must pass {@code predicate}, and a handler object is accepted only
     *       once per publication;</li>
     *   <li>subscriptions with {@code Subscription.Feature.shared} are grouped by subscriber
     *       id and exactly one member per group is chosen, see
     *       {@link #selectSharedSubscriber(List)};</li>
     *   <li>all other accepted subscriptions are selected directly.</li>
     * </ul>
     *
     * @param str       the topic to publish to
     * @param t         the value handed to {@code function4}
     * @param function4 performs the delivery; invoked when the returned {@code Mono} is
     *                  subscribed, with a snapshot of the selected subscriptions and the
     *                  subscriber's context
     * @param predicate filter applied to every candidate subscription
     * @return {@code Reactors.ALWAYS_ZERO_LONG} when nothing was selected, otherwise a
     *         {@code Mono} emitting the number of selected subscriptions after
     *         {@code function4} has completed
     */
    private <T> Mono<Long> doPublish(String str, T t, Function4<String, T, List<SubscriptionInfo>, ContextView, Mono<Void>> function4, Predicate<SubscriptionInfo> predicate) {
        Map<String, List<SubscriptionInfo>> sharedGroups = SHARED.get();
        Set<Object> seenHandlers = DISTINCT_HANDLERS.get();
        Set<SubscriptionInfo> selected = PUB_HANDLERS.get();
        try {
            this.subscriptionTable.findTopic(str, predicate, sharedGroups, seenHandlers, selected,
                (filter, groups, seen, ready, topic) -> {
                    Set<SubscriptionInfo> subscribers = topic.getSubscribers();
                    if (subscribers.isEmpty()) {
                        return;
                    }
                    for (SubscriptionInfo info : subscribers) {
                        // seen.add(..) is false when this handler was already accepted
                        if (!filter.test(info) || !seen.add(info.handler)) {
                            continue;
                        }
                        if (info.hasFeature(Subscription.Feature.shared)) {
                            groups.computeIfAbsent(info.subscriber, key -> new ArrayList<>(8)).add(info);
                        } else {
                            ready.add(info);
                        }
                    }
                },
                (filter, groups, seen, ready) -> {
                    if (groups.isEmpty()) {
                        return;
                    }
                    for (List<SubscriptionInfo> group : groups.values()) {
                        SubscriptionInfo chosen = selectSharedSubscriber(group);
                        if (chosen != null) {
                            ready.add(chosen);
                        }
                    }
                });

            if (selected.isEmpty()) {
                return Reactors.ALWAYS_ZERO_LONG;
            }
            // snapshot: the thread-local set is cleared below, before the Mono is subscribed
            List<SubscriptionInfo> targets = new ArrayList<>(selected);
            return Mono
                .deferContextual(ctx -> function4.apply(str, t, targets, ctx))
                .thenReturn(Long.valueOf(targets.size()));
        } finally {
            sharedGroups.clear();
            seenHandlers.clear();
            selected.clear();
        }
    }

    /**
     * Picks the single member of a shared-subscription group that receives the event.
     *
     * <p>The features of the first member decide the strategy: with
     * {@code sharedLocalFirst} the first non-cluster member wins; otherwise, or when the
     * group has no non-cluster member, {@code sharedOldest} selects the member with the
     * smallest time (the group is sorted in place) and any other group gets a random member.
     *
     * <p>Returning exactly one member matters: letting the local-first choice fall through
     * to the oldest/random choice could select two members of the same group and deliver
     * the event twice.
     *
     * @param group members of one shared group
     * @return the chosen member, or {@code null} for an empty group
     */
    private static SubscriptionInfo selectSharedSubscriber(List<SubscriptionInfo> group) {
        int size = group.size();
        if (size == 0) {
            return null;
        }
        SubscriptionInfo first = group.get(0);
        if (size == 1) {
            return first;
        }
        if (first.hasFeature(Subscription.Feature.sharedLocalFirst)) {
            for (SubscriptionInfo candidate : group) {
                if (!candidate.isCluster()) {
                    return candidate;
                }
            }
            // no local member: fall back to the strategies below
        }
        if (first.hasFeature(Subscription.Feature.sharedOldest)) {
            group.sort(SubscriptionInfo.comparatorByTime);
            return group.get(0);
        }
        return group.get(ThreadLocalRandom.current().nextInt(0, size));
    }
}
