package org.jetlinks.supports.scalecube.event;

import com.alibaba.fastjson.JSON;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.NativePayload;
import org.jetlinks.core.Payload;
import org.jetlinks.core.cache.Caches;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.event.EventBroker;
import org.jetlinks.supports.event.EventConnection;
import org.jetlinks.supports.event.EventConsumer;
import org.jetlinks.supports.event.EventProducer;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/supports/scalecube/event/ScalecubeEventBusBroker.class */
public class ScalecubeEventBusBroker implements EventBroker, Disposable {
    private static final Logger log = LoggerFactory.getLogger(ScalecubeEventBusBroker.class);

    // Qualifiers of the cluster messages exchanged between brokers.
    /** Qualifier of a message whose data is a subscription to add. */
    private static final String SUB_QUALIFIER = "/jeb/_sub";
    /** Qualifier of a message whose data is a subscription to remove. */
    private static final String UNSUB_QUALIFIER = "/jeb/_unsub";
    /** Declared but not referenced anywhere else in this class. */
    private static final String HELLO_QUALIFIER = "/jeb/_hello";
    /** Qualifier of a message carrying a published topic payload. */
    private static final String PUB_QUALIFIER = "/jeb/_pub";

    // Header names used on those messages.
    /** Header holding the id of the member that sent the message. */
    private static final String FROM_HEADER = "_f";
    /** Header holding the topic of a published payload. */
    private static final String TOPIC_HEADER = "_t";
    /** Header holding the payload's own headers, serialised as JSON. */
    private static final String TOPIC_HEADER_HEADER = "_th";

    /** Cluster used to send messages and to look up members. */
    final ExtendedCluster cluster;
    /** Composite disposed by {@link #dispose()} and queried by {@link #isDisposed()}. */
    private final Disposable.Composite disposable = Disposables.composite();
    /** Connections to remote members, keyed by member id. */
    private final Map<String, MemberEventConnection> cachedConnections = new NonBlockingHashMap<>();
    /** Announces connections created after start-up; exposed through {@link #accept()}. */
    private final Sinks.Many<EventConnection> connections = Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);
    /**
     * Messages received from a sender id that could not yet be resolved to a cluster member,
     * keyed by that sender id. They are replayed when a connection for the member is created.
     * NOTE(review): presumably entries expire after 10 minutes - confirm against {@code Caches.newCache}.
     */
    private final Map<String, List<Message>> earlyMessage = Caches.newCache(Duration.ofMinutes(10));

    /* loaded from: input_file:org/jetlinks/supports/scalecube/event/ScalecubeEventBusBroker$MemberEventConnection.class */
    /**
     * Event-bus connection to one remote cluster member.
     * <p>
     * Payloads pushed into {@link #sink()} are sent to the member as {@code /jeb/_pub} cluster
     * messages. Messages received from the member are emitted by the enclosing broker into the
     * sinks behind {@link #subscribe()}, {@link #handleSubscribe()} and {@link #handleUnSubscribe()}.
     */
    public class MemberEventConnection implements EventConnection, EventProducer, EventConsumer {
        /** Remote member this connection talks to; replaced via {@link #setMember(Member)}. */
        private Member member;
        /** Subscriptions requested by the remote member. */
        private final Sinks.Many<Subscription> subscriptions = Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);
        /** Subscriptions cancelled by the remote member. */
        private final Sinks.Many<Subscription> unSubscriptions = Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);
        /** Payloads published by the remote member. */
        private final Sinks.Many<TopicPayload> subscriber = Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);
        /** Resources released by {@link #dispose()}. */
        private final Disposable.Composite disposable = Disposables.composite();
        /** Sink whose items are forwarded to the remote member. */
        private FluxSink<TopicPayload> publisher;

        /**
         * Creates a connection and starts the pipeline that forwards everything pushed into
         * {@link #sink()} to the member.
         *
         * @param member remote member to connect to
         */
        public MemberEventConnection(Member member) {
            this.member = member;
            // The explicit <TopicPayload> witness is required: without it the element type is
            // inferred as Object and neither the sink assignment nor doPublish type-checks.
            doOnDispose(Flux.<TopicPayload>create(fluxSink -> {
                this.publisher = fluxSink;
            }).flatMap(this::doPublish).subscribe());
        }

        /**
         * Sends one payload to the remote member as a {@code /jeb/_pub} message.
         * Failures are logged and swallowed so that one bad payload does not terminate the
         * forwarding pipeline.
         *
         * @param topicPayload payload to forward
         * @return completes when the send finishes, or empty if building or sending failed
         */
        private Mono<Void> doPublish(TopicPayload topicPayload) {
            Object data;
            try {
                String topic = topicPayload.getTopic();
                if (topicPayload.getPayload() instanceof NativePayload) {
                    // Native payloads are sent as the wrapped object; the payload is released
                    // once that object has been taken out.
                    data = topicPayload.getPayload().getNativeObject();
                    topicPayload.release();
                } else {
                    data = topicPayload.getBytes();
                }
                // Payload headers travel as a JSON string; null when the payload has none.
                String headersJson = null;
                if (topicPayload.getHeaders() != null) {
                    headersJson = JSON.toJSONString(topicPayload.getHeaders());
                }
                Message message = Message.builder()
                        .qualifier(ScalecubeEventBusBroker.PUB_QUALIFIER)
                        .header(ScalecubeEventBusBroker.TOPIC_HEADER, topic)
                        .header(ScalecubeEventBusBroker.TOPIC_HEADER_HEADER, headersJson)
                        .header(ScalecubeEventBusBroker.FROM_HEADER, ScalecubeEventBusBroker.this.cluster.member().id())
                        .data(data)
                        .build();
                return ScalecubeEventBusBroker.this.cluster.send(this.member, message).onErrorResume(error -> {
                    ScalecubeEventBusBroker.log.error(error.getMessage(), error);
                    return Mono.empty();
                });
            } catch (Throwable error) {
                ScalecubeEventBusBroker.log.error(error.getMessage(), error);
                return Mono.empty();
            }
        }

        /** @return the id of the remote member */
        @Override // org.jetlinks.supports.event.EventConnection
        public String getId() {
            return this.member.id();
        }

        /** @return {@code true} while the cluster still knows a member with this connection's id */
        @Override // org.jetlinks.supports.event.EventConnection
        public boolean isAlive() {
            return ScalecubeEventBusBroker.this.cluster.member(this.member.id()).isPresent();
        }

        /**
         * Registers a resource to be disposed together with this connection.
         *
         * @param disposable resource to release on {@link #dispose()}
         */
        @Override // org.jetlinks.supports.event.EventConnection
        public void doOnDispose(Disposable disposable) {
            this.disposable.add(disposable);
        }

        /** @return the enclosing broker */
        @Override // org.jetlinks.supports.event.EventConnection
        public EventBroker getBroker() {
            return ScalecubeEventBusBroker.this;
        }

        /** @return subscriptions requested by the remote member */
        @Override // org.jetlinks.supports.event.EventConsumer
        public Flux<Subscription> handleSubscribe() {
            return this.subscriptions.asFlux();
        }

        /** @return subscriptions cancelled by the remote member */
        @Override // org.jetlinks.supports.event.EventConsumer
        public Flux<Subscription> handleUnSubscribe() {
            return this.unSubscriptions.asFlux();
        }

        /** @return sink whose items are forwarded to the remote member */
        @Override // org.jetlinks.supports.event.EventConsumer
        public FluxSink<TopicPayload> sink() {
            return this.publisher;
        }

        /**
         * Sends a subscription to the remote member.
         *
         * @param subscription subscription to announce
         * @return completes when the message has been sent
         */
        @Override // org.jetlinks.supports.event.EventProducer
        public Mono<Void> subscribe(Subscription subscription) {
            return ScalecubeEventBusBroker.this.cluster.send(this.member, ScalecubeEventBusBroker.this.createMessage(ScalecubeEventBusBroker.SUB_QUALIFIER, subscription)).then();
        }

        /**
         * Sends a subscription cancellation to the remote member.
         *
         * @param subscription subscription to cancel
         * @return completes when the message has been sent
         */
        @Override // org.jetlinks.supports.event.EventProducer
        public Mono<Void> unsubscribe(Subscription subscription) {
            return ScalecubeEventBusBroker.this.cluster.send(this.member, ScalecubeEventBusBroker.this.createMessage(ScalecubeEventBusBroker.UNSUB_QUALIFIER, subscription)).then();
        }

        /** @return payloads published by the remote member */
        @Override // org.jetlinks.supports.event.EventProducer
        public Flux<TopicPayload> subscribe() {
            return this.subscriber.asFlux();
        }

        /** Disposes every resource registered through {@link #doOnDispose(Disposable)}. */
        public void dispose() {
            this.disposable.dispose();
        }

        /** @return {@code alias@address} of the remote member */
        public String toString() {
            return this.member.alias() + "@" + this.member.address();
        }

        /**
         * Creates a connection around an existing sink; no forwarding pipeline is started.
         *
         * @param member   remote member to connect to
         * @param fluxSink sink returned by {@link #sink()}
         */
        public MemberEventConnection(Member member, FluxSink<TopicPayload> fluxSink) {
            this.member = member;
            this.publisher = fluxSink;
        }

        /**
         * Replaces the member this connection targets.
         *
         * @param member new member descriptor
         */
        public void setMember(Member member) {
            this.member = member;
        }
    }

    /**
     * Creates a broker on top of the given cluster and immediately installs its cluster
     * message handler.
     *
     * @param extendedCluster cluster used to exchange event-bus messages
     */
    public ScalecubeEventBusBroker(ExtendedCluster extendedCluster) {
        cluster = extendedCluster;
        init();
    }

    /**
     * Registers the cluster message handler and creates connections for the members that are
     * already part of the cluster.
     * <p>
     * The handler routes incoming messages to the sender's connection, parks messages whose
     * sender is not yet a known member, and keeps the connection cache in step with
     * membership events.
     */
    private void init() {
        this.cluster.handler(extendedCluster -> new ClusterMessageHandler() { // from class: org.jetlinks.supports.scalecube.event.ScalecubeEventBusBroker.1
            public void onMessage(Message message) {
                String from = message.header(FROM_HEADER);
                if (StringUtils.isEmpty(from)) {
                    // Not an event-bus message (no sender header): ignore.
                    return;
                }
                MemberEventConnection connection = ScalecubeEventBusBroker.this.getOrCreateConnection(from);
                if (connection != null) {
                    ScalecubeEventBusBroker.this.handleMessage(connection, message);
                    return;
                }
                // The sender is not a known member yet: keep the message until its connection is created.
                log.info("received early message {} {}", from, message.data());
                ScalecubeEventBusBroker.this.earlyMessage
                        .computeIfAbsent(from, ignored -> new CopyOnWriteArrayList<>())
                        .add(message);
            }

            public void onGossip(Message message) {
                // Gossips are handled exactly like direct messages.
                onMessage(message);
            }

            public void onMembershipEvent(MembershipEvent membershipEvent) {
                if (membershipEvent.isLeaving() || membershipEvent.isRemoved()) {
                    String memberId = membershipEvent.member().id();
                    ScalecubeEventBusBroker.this.earlyMessage.remove(memberId);
                    MemberEventConnection removed = ScalecubeEventBusBroker.this.cachedConnections.remove(memberId);
                    if (removed != null) {
                        log.debug("remove event broker {}", membershipEvent.member().address());
                        removed.dispose();
                    }
                }
                if (membershipEvent.isAdded() || membershipEvent.isUpdated()) {
                    ScalecubeEventBusBroker.this.getOrCreateConnection(membershipEvent.member());
                }
            }
        });
        // Connections for members present at start-up are cached directly and are not announced
        // through the connections sink. computeIfAbsent avoids building (and subscribing) a
        // connection that would be thrown away when the id is already cached.
        for (Member member : this.cluster.otherMembers()) {
            this.cachedConnections.computeIfAbsent(member.id(), id -> new MemberEventConnection(member));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves a member id to its connection, creating the connection when needed.
     *
     * @param str id of the remote member
     * @return the connection, or {@code null} when the cluster knows no member with that id
     */
    public MemberEventConnection getOrCreateConnection(String str) {
        return this.cluster
                .member(str)
                .map(found -> getOrCreateConnection(found))
                .orElse(null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Dispatches a message received from a remote member to the matching sink of its connection.
     * Messages with any other qualifier are ignored.
     *
     * @param memberEventConnection connection of the member that sent the message
     * @param message               received cluster message
     */
    public void handleMessage(MemberEventConnection memberEventConnection, Message message) {
        String qualifier = message.qualifier();
        if (Objects.equals(qualifier, PUB_QUALIFIER)) {
            String topic = message.header(TOPIC_HEADER);
            String headersJson = message.header(TOPIC_HEADER_HEADER);
            Object data = message.data();
            // Raw bytes become a byte payload; anything else is wrapped as a native payload.
            TopicPayload payload;
            if (data instanceof byte[]) {
                payload = TopicPayload.of(topic, Payload.of((byte[]) data));
            } else {
                payload = TopicPayload.of(topic, NativePayload.of(data));
            }
            if (StringUtils.hasText(headersJson)) {
                payload.addHeader(JSON.parseObject(headersJson));
            }
            log.trace("publish from {} : {}", memberEventConnection, topic);
            memberEventConnection.subscriber.emitNext(payload, Reactors.emitFailureHandler());
        } else if (Objects.equals(qualifier, SUB_QUALIFIER)) {
            log.debug("subscribe from {} : {}", memberEventConnection, message.data());
            memberEventConnection.subscriptions.emitNext(message.data(), Reactors.emitFailureHandler());
        } else if (Objects.equals(qualifier, UNSUB_QUALIFIER)) {
            log.debug("unsubscribe from {} : {}", memberEventConnection, message.data());
            memberEventConnection.unSubscriptions.emitNext(message.data(), Reactors.emitFailureHandler());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the connection for a member, creating and announcing it on first use.
     * <p>
     * An existing connection is updated with the given member descriptor and returned.
     * A new connection is emitted to the connections sink, and any messages that were parked
     * for this member before it was known are replayed into it.
     * NOTE(review): creation, announcement and replay all run inside the map's {@code compute}
     * function; confirm the map never invokes that function more than once per call.
     *
     * @param member remote member
     * @return the cached or newly created connection
     */
    public MemberEventConnection getOrCreateConnection(Member member) {
        return this.cachedConnections.compute(member.id(), (id, existing) -> {
            if (existing != null) {
                existing.setMember(member);
                return existing;
            }
            log.debug("add event broker {}", member.address());
            // Must not reuse the lambda parameter's name: redeclaring it is a compile error.
            MemberEventConnection created = new MemberEventConnection(member);
            // Retry the emission only for these two failure results.
            this.connections.emitNext(created, (signalType, emitResult) ->
                    emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED
                            || emitResult == Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER);
            List<Message> pending = this.earlyMessage.remove(member.id());
            if (pending != null) {
                for (Message early : pending) {
                    handleMessage(created, early);
                }
                pending.clear();
            }
            return created;
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a cluster message stamped with the local member's id as sender.
     *
     * @param str qualifier of the message
     * @param obj data carried by the message
     * @return the built message
     */
    public Message createMessage(String str, Object obj) {
        String localMemberId = this.cluster.member().id();
        return Message.builder()
                .qualifier(str)
                .data(obj)
                .header(FROM_HEADER, localMemberId)
                .build();
    }

    /**
     * @return the fixed identifier of this broker, {@code "scalecube"}
     */
    @Override // org.jetlinks.supports.event.EventBroker
    public String getId() {
        return "scalecube";
    }

    /**
     * Streams the connections of this broker: first those held in the connection cache, then
     * every connection announced through the connections sink.
     *
     * @return flux of event connections
     */
    @Override // org.jetlinks.supports.event.EventBroker
    public Flux<EventConnection> accept() {
        Flux<MemberEventConnection> cached = Flux.fromIterable(this.cachedConnections.values());
        Flux<EventConnection> announced = this.connections.asFlux();
        return Flux.concat(cached, announced);
    }

    /**
     * Disposes the broker-level composite.
     * Nothing in this class adds to that composite, and the cached member connections are not
     * disposed here.
     */
    public void dispose() {
        this.disposable.dispose();
    }

    /**
     * @return {@code true} once the broker-level composite has been disposed
     */
    public boolean isDisposed() {
        return this.disposable.isDisposed();
    }
}
