package org.jetlinks.supports.scalecube;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.commons.collections.CollectionUtils;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.Reactors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;

/* loaded from: input_file:org/jetlinks/supports/scalecube/ExtendedClusterImpl.class */
public class ExtendedClusterImpl implements ExtendedCluster {
    private static final Logger log = LoggerFactory.getLogger(ExtendedClusterImpl.class);
    // Qualifier of the internal gossip used to announce a member's features.
    private static final String FEATURE_QUALIFIER = "_c_fts_q";
    // Header of the feature gossip that carries the announcing member's id.
    private static final String FEATURE_FROM = "_c_fts_f";
    // The wrapped scalecube cluster; most public methods delegate to it.
    private final ClusterImpl real;
    // Sink subscribed to by listenMessage(...).
    private final Sinks.Many<Message> messageSink;
    // Sink subscribed to by listenGossip(...).
    private final Sinks.Many<Message> gossipSink;
    // Sink exposed by listenMembership(); fed from the dispatcher's onMembershipEvent.
    private final Sinks.Many<MembershipEvent> membershipEvents;
    // Handlers registered through handler(...); invoked for every dispatched callback.
    private final List<ClusterMessageHandler> handlers;
    // Set to true at the beginning of start(); updateMetadata(...) branches on it.
    private volatile boolean started;
    // Work queued before start (see updateMetadata); subscribed and then cleared by start().
    private final List<Mono<Void>> startThen;
    // Messages received while the cache window is open; replayed to newly registered handlers.
    private final List<Message> messageCache;
    // Epoch millis until which incoming messages are cached; start() sets it to "now + 30s".
    // NOTE(review): not volatile although it is written in start() and read in the dispatcher.
    private long cacheEndWithTime;
    // Features registered on this node through registerFeatures(...).
    private final List<String> localFeatures;
    // Feature name -> ids (and aliases, when present) of the members that announced it.
    private final Map<String, Set<String>> featureMembers;
    // Holds the periodic feature-broadcast subscription created by startBroadcastFeature().
    private final Disposable.Composite disposable;

    /* loaded from: input_file:org/jetlinks/supports/scalecube/ExtendedClusterImpl$ClusterMessageHandlerDispatcher.class */
    class ClusterMessageHandlerDispatcher implements ClusterMessageHandler {
        ClusterMessageHandlerDispatcher() {
        }

        public void onMessage(Message message) {
            if (System.currentTimeMillis() <= ExtendedClusterImpl.this.cacheEndWithTime && ExtendedClusterImpl.this.messageCache.size() < 2048) {
                ExtendedClusterImpl.this.messageCache.add(message);
            }
            ExtendedClusterImpl.this.messageSink.emitNext(message, Reactors.emitFailureHandler());
            ExtendedClusterImpl.this.doHandler(message, (v0, v1) -> {
                v0.onMessage(v1);
            });
        }

        public void onGossip(Message message) {
            if (ExtendedClusterImpl.FEATURE_QUALIFIER.equals(message.qualifier())) {
                ExtendedClusterImpl.this.member(message.header(ExtendedClusterImpl.FEATURE_FROM)).ifPresent(member -> {
                    ExtendedClusterImpl.this.addFeature(member, (Collection) message.data());
                });
            } else {
                ExtendedClusterImpl.this.messageSink.emitNext(message, Reactors.emitFailureHandler());
                ExtendedClusterImpl.this.doHandler(message, (v0, v1) -> {
                    v0.onGossip(v1);
                });
            }
        }

        public void onMembershipEvent(MembershipEvent membershipEvent) {
            ExtendedClusterImpl.this.membershipEvents.emitNext(membershipEvent, Reactors.emitFailureHandler());
            ExtendedClusterImpl.this.doHandler(membershipEvent, (v0, v1) -> {
                v0.onMembershipEvent(v1);
            });
            if (membershipEvent.isRemoved() || membershipEvent.isLeaving()) {
                ExtendedClusterImpl.this.removeFeature(membershipEvent.member());
            }
            if (membershipEvent.isAdded()) {
                ExtendedClusterImpl.this.broadcastFeature().subscribe();
            }
        }
    }

    /**
     * Creates an extended cluster around a new {@link ClusterImpl} built from the given configuration.
     *
     * @param clusterConfig configuration passed to the {@link ClusterImpl} constructor
     */
    public ExtendedClusterImpl(ClusterConfig clusterConfig) {
        this(new ClusterImpl(clusterConfig));
    }

    /**
     * Wraps an existing {@link ClusterImpl}. All internal state starts empty, and the wrapped
     * cluster is re-derived with a {@link ClusterMessageHandlerDispatcher} as its handler.
     *
     * @param clusterImpl the cluster to wrap; the instance returned by its {@code handler(...)}
     *                    call is the one kept
     */
    public ExtendedClusterImpl(ClusterImpl clusterImpl) {
        // Reactive fan-out points.
        this.messageSink = Sinks.many().multicast().directBestEffort();
        this.gossipSink = Sinks.many().multicast().directBestEffort();
        this.membershipEvents = Sinks.many().multicast().directBestEffort();

        // Bookkeeping collections.
        this.handlers = new CopyOnWriteArrayList<>();
        this.startThen = new CopyOnWriteArrayList<>();
        this.messageCache = new CopyOnWriteArrayList<>();
        this.localFeatures = new CopyOnWriteArrayList<>();
        this.featureMembers = new ConcurrentHashMap<>();
        this.disposable = Disposables.composite();

        this.real = clusterImpl.handler(ignored -> new ClusterMessageHandlerDispatcher());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records that {@code member} provides every feature in {@code collection}. The member is
     * stored under its id and, when it has a non-blank alias, under the alias as well.
     *
     * @param member     the member announcing the features
     * @param collection feature names; nothing happens when it is {@code null} or empty
     */
    public void addFeature(Member member, Collection<String> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        String alias = member.alias();
        log.debug("register cluster [{}] feature:{}", alias == null ? member.id() : alias, collection);
        for (String feature : collection) {
            Set<String> holders =
                this.featureMembers.computeIfAbsent(feature, ignored -> ConcurrentHashMap.newKeySet());
            holders.add(member.id());
            if (StringUtils.hasText(alias)) {
                holders.add(alias);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes {@code member} (id and, when non-blank, alias) from the holder set of every known
     * feature.
     *
     * @param member the member whose features are dropped
     */
    public void removeFeature(Member member) {
        this.featureMembers.values().forEach(holders -> {
            boolean wasRegistered = holders.remove(member.id());
            if (wasRegistered) {
                String alias = member.alias();
                log.debug("remove cluster [{}] features", alias == null ? member.id() : alias);
            }
            String alias = member.alias();
            if (StringUtils.hasText(alias)) {
                holders.remove(alias);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Invokes {@code biConsumer} once per registered handler, in registration order, passing the
     * handler and {@code t}.
     *
     * @param t          the event to deliver
     * @param biConsumer the handler callback to invoke
     * @param <T>        event type
     */
    public <T> void doHandler(T t, BiConsumer<ClusterMessageHandler, T> biConsumer) {
        for (ClusterMessageHandler registered : this.handlers) {
            biConsumer.accept(registered, t);
        }
    }

    /**
     * Builds a handler from {@code function} (called with this cluster), registers it and replays
     * the cached messages to it.
     *
     * @param function factory producing the handler
     * @return this instance
     */
    @Override // org.jetlinks.supports.scalecube.ExtendedCluster
    public ExtendedClusterImpl handler(Function<ExtendedCluster, ClusterMessageHandler> function) {
        // Registration and cache replay are shared with the handler-instance overload.
        return handler(function.apply(this));
    }

    /**
     * Registers {@code clusterMessageHandler} and then replays the currently cached messages to it.
     *
     * @param clusterMessageHandler the handler to add
     * @return this instance
     */
    @Override // org.jetlinks.supports.scalecube.ExtendedCluster
    public ExtendedClusterImpl handler(ClusterMessageHandler clusterMessageHandler) {
        this.handlers.add(clusterMessageHandler);
        // Late handlers still get the messages cached so far.
        writeCacheMessage(clusterMessageHandler);
        return this;
    }

    /**
     * Replays every cached message to {@code clusterMessageHandler} via {@code onMessage}.
     *
     * @param clusterMessageHandler the handler receiving the cached messages
     */
    private void writeCacheMessage(ClusterMessageHandler clusterMessageHandler) {
        for (Message cached : this.messageCache) {
            clusterMessageHandler.onMessage(cached);
        }
    }

    /**
     * Marks the cluster as started, opens a 30 second message-cache window and returns a
     * {@link Mono} that, when subscribed, starts the wrapped cluster, broadcasts the local
     * features, runs and clears the work queued in {@code startThen}, and finally schedules the
     * periodic feature broadcast.
     *
     * @return a {@link Mono} emitting this cluster once all start-up steps have completed
     */
    public Mono<ExtendedCluster> start() {
        this.started = true;
        this.cacheEndWithTime = System.currentTimeMillis() + Duration.ofSeconds(30L).toMillis();

        Mono<Void> joinAndAnnounce = this.real.start().then(Mono.defer(this::broadcastFeature));
        Mono<Void> runQueuedWork = Flux
            .fromIterable(this.startThen)
            .flatMap(Function.identity())
            .then(Mono.fromRunnable(this.startThen::clear));
        Mono<Void> schedulePeriodicBroadcast = Mono.fromRunnable(this::startBroadcastFeature);

        return joinAndAnnounce
            .then(runQueuedWork)
            .then(schedulePeriodicBroadcast)
            .thenReturn(this);
    }

    /**
     * Blocking variant of {@link #start()}: blocks the calling thread until start-up completes.
     *
     * @return this instance
     */
    public ExtendedCluster startAwait() {
        start().block();
        return this;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Gossips the local feature list to the cluster, tagged with the feature qualifier and this
     * member's id as the sender header.
     *
     * @return a {@link Mono} completing when {@link #spreadGossip(Message)} completes
     */
    public Mono<Void> broadcastFeature() {
        Message announcement = Message
            .builder()
            .qualifier(FEATURE_QUALIFIER)
            .header(FEATURE_FROM, member().id())
            .data(this.localFeatures)
            .build();
        return spreadGossip(announcement).then();
    }

    /**
     * Registers the local features for the local member and schedules a periodic re-broadcast
     * (first after 10 seconds, then every 30 seconds). The subscription is kept in
     * {@code disposable}.
     */
    private void startBroadcastFeature() {
        addFeature(member(), this.localFeatures);
        this.disposable.add(
            Flux.interval(Duration.ofSeconds(10L), Duration.ofSeconds(30L))
                .flatMap(tick -> broadcastFeature().onErrorResume(err -> {
                    // Best effort: keep the timer alive, but leave a trace of the failure.
                    log.warn("broadcast cluster features failed", err);
                    return Mono.empty();
                }))
                .subscribe());
    }

    /**
     * Returns the membership events published by the dispatcher as a {@link Flux}.
     *
     * @return flux view of the membership event sink
     */
    @Override // org.jetlinks.supports.scalecube.ExtendedCluster
    public Flux<MembershipEvent> listenMembership() {
        return this.membershipEvents.asFlux();
    }

    /**
     * Subscribes {@code biFunction} to the message sink for messages whose qualifier equals {@code str}.
     *
     * @param str        qualifier to match
     * @param biFunction handler invoked with the message and this cluster
     * @return handle that cancels the subscription
     */
    @Override // org.jetlinks.supports.scalecube.ExtendedCluster
    public Disposable listenMessage(@Nonnull String str, BiFunction<Message, ExtendedCluster, Mono<Void>> biFunction) {
        return listen(this.messageSink, str, biFunction);
    }

    /**
     * Subscribes {@code biFunction} to the gossip sink for messages whose qualifier equals {@code str}.
     *
     * @param str        qualifier to match
     * @param biFunction handler invoked with the message and this cluster
     * @return handle that cancels the subscription
     */
    @Override // org.jetlinks.supports.scalecube.ExtendedCluster
    public Disposable listenGossip(@Nonnull String str, BiFunction<Message, ExtendedCluster, Mono<Void>> biFunction) {
        return listen(this.gossipSink, str, biFunction);
    }

    /**
     * Subscribes {@code biFunction} to {@code many} for messages whose qualifier equals
     * {@code str}. The trace context read from the message headers is written into the handler's
     * Reactor context. Any handler failure is logged and dropped, so one bad message cannot end
     * the subscription.
     *
     * @param many       the sink to listen on
     * @param str        qualifier to match
     * @param biFunction handler invoked with the message and this cluster
     * @return handle that cancels the subscription
     */
    private Disposable listen(Sinks.Many<Message> many, @Nonnull String str, BiFunction<Message, ExtendedCluster, Mono<Void>> biFunction) {
        return many
            .asFlux()
            .filter(message -> Objects.equals(str, message.qualifier()))
            .flatMap(message -> Mono
                // defer: a handler that throws synchronously or returns null becomes an error
                // signal handled below instead of terminating the whole listener.
                .defer(() -> biFunction
                    .apply(message, this)
                    .contextWrite(TraceHolder.readToContext(Context.empty(), message.headers())))
                .onErrorResume(err -> {
                    log.error(err.getMessage(), err);
                    return Mono.empty();
                }))
            .subscribe();
    }

    /**
     * Returns the address reported by the wrapped cluster.
     */
    public Address address() {
        return this.real.address();
    }

    /**
     * Sends {@code message} to {@code member}. When tracing is enabled, the current trace context
     * is first written into the headers of a copy of the message.
     *
     * @param member  the recipient
     * @param message the message to send
     * @return the {@link Mono} returned by the wrapped cluster's {@code send}
     */
    public Mono<Void> send(Member member, Message message) {
        if (!TraceHolder.isEnabled()) {
            return this.real.send(member, message);
        }
        return TraceHolder
            .writeContextTo(Message.with(message), Message.Builder::header)
            .flatMap(traced -> this.real.send(member, traced.build()));
    }

    /**
     * Sends {@code message} to {@code address}. When tracing is enabled, the current trace
     * context is first written into the headers of a copy of the message.
     *
     * @param address the recipient address
     * @param message the message to send
     * @return the {@link Mono} returned by the wrapped cluster's {@code send}
     */
    public Mono<Void> send(Address address, Message message) {
        if (!TraceHolder.isEnabled()) {
            return this.real.send(address, message);
        }
        return TraceHolder
            .writeContextTo(Message.with(message), Message.Builder::header)
            .flatMap(traced -> this.real.send(address, traced.build()));
    }

    /**
     * Sends {@code message} to {@code address} and waits for the response. When tracing is
     * enabled, the current trace context is written into the headers of a copy of the message and
     * that copy is the one sent.
     *
     * @param address the recipient address
     * @param message the request
     * @return the {@link Mono} returned by the wrapped cluster's {@code requestResponse}
     */
    public Mono<Message> requestResponse(Address address, Message message) {
        if (!TraceHolder.isEnabled()) {
            return this.real.requestResponse(address, message);
        }
        return TraceHolder
            .writeContextTo(Message.with(message), Message.Builder::header)
            // Send the rebuilt message: the original does not carry the trace headers.
            .flatMap(traced -> this.real.requestResponse(address, traced.build()));
    }

    /**
     * Sends {@code message} to {@code member} and waits for the response. When tracing is
     * enabled, the current trace context is written into the headers of a copy of the message and
     * that copy is the one sent.
     *
     * @param member  the recipient
     * @param message the request
     * @return the {@link Mono} returned by the wrapped cluster's {@code requestResponse}
     */
    public Mono<Message> requestResponse(Member member, Message message) {
        if (!TraceHolder.isEnabled()) {
            return this.real.requestResponse(member, message);
        }
        return TraceHolder
            .writeContextTo(Message.with(message), Message.Builder::header)
            // Send the rebuilt message: the original does not carry the trace headers.
            .flatMap(traced -> this.real.requestResponse(member, traced.build()));
    }

    /**
     * Spreads {@code message} as a gossip. When tracing is enabled, the current trace context is
     * written into the headers of a copy of the message and that copy is the one spread.
     *
     * @param message the gossip to spread
     * @return the {@link Mono} returned by the wrapped cluster's {@code spreadGossip}
     */
    public Mono<String> spreadGossip(Message message) {
        if (!TraceHolder.isEnabled()) {
            return this.real.spreadGossip(message);
        }
        return TraceHolder
            .writeContextTo(Message.with(message), Message.Builder::header)
            // Spread the rebuilt message: the original does not carry the trace headers.
            .flatMap(traced -> this.real.spreadGossip(traced.build()));
    }

    /**
     * Returns the no-argument {@code metadata()} result of the wrapped cluster.
     */
    public <T> Optional<T> metadata() {
        return this.real.metadata();
    }

    /**
     * Returns the metadata of {@code member} as reported by the wrapped cluster.
     *
     * @param member the member whose metadata is requested
     */
    public <T> Optional<T> metadata(Member member) {
        return this.real.metadata(member);
    }

    /**
     * Returns the no-argument {@code member()} result of the wrapped cluster.
     */
    public Member member() {
        return this.real.member();
    }

    /**
     * Looks up a member through the wrapped cluster's {@code member(String)}.
     *
     * @param str lookup key passed to the wrapped cluster; the dispatcher passes a member id here
     */
    public Optional<Member> member(String str) {
        return this.real.member(str);
    }

    /**
     * Looks up a member by address through the wrapped cluster.
     *
     * @param address the address to look up
     */
    public Optional<Member> member(Address address) {
        return this.real.member(address);
    }

    /**
     * Returns the members reported by the wrapped cluster.
     */
    public Collection<Member> members() {
        return this.real.members();
    }

    /**
     * Returns the {@code otherMembers()} result of the wrapped cluster.
     */
    public Collection<Member> otherMembers() {
        return this.real.otherMembers();
    }

    /**
     * Updates the local member's metadata. Once the cluster has been started, the update is
     * delegated directly. Before that, it is queued and executed by {@link #start()}, and an
     * empty {@link Mono} is returned.
     *
     * @param t   the new metadata
     * @param <T> metadata type
     * @return the wrapped cluster's update {@link Mono} when started, otherwise an empty
     *         {@link Mono}; never {@code null}
     */
    public <T> Mono<Void> updateMetadata(T t) {
        if (this.started) {
            return this.real.updateMetadata(t);
        }
        this.startThen.add(this.real.updateMetadata(t));
        // A reactive method must not return null: callers that subscribe or chain would get an NPE.
        return Mono.empty();
    }

    /**
     * Stops the periodic feature broadcast held in {@code disposable} and shuts down the wrapped
     * cluster.
     */
    public void shutdown() {
        // Cancel the interval first so it stops gossiping through a cluster that is going away.
        this.disposable.dispose();
        this.real.shutdown();
    }

    /**
     * Returns the {@code onShutdown()} {@link Mono} of the wrapped cluster.
     */
    public Mono<Void> onShutdown() {
        return this.real.onShutdown();
    }

    /**
     * Returns the {@code isShutdown()} flag of the wrapped cluster.
     */
    public boolean isShutdown() {
        return this.real.isShutdown();
    }

    /**
     * Appends {@code collection} to the local feature list that {@code broadcastFeature()} gossips.
     * NOTE(review): the local feature table is only filled from this list by
     * {@code startBroadcastFeature()}; confirm whether features registered after start should
     * also be added to it.
     *
     * @param collection feature names to register
     */
    @Override // org.jetlinks.supports.scalecube.ExtendedCluster
    public void registerFeatures(Collection<String> collection) {
        this.localFeatures.addAll(collection);
    }

    /**
     * Lists the members known to provide the feature {@code str}: matching other members first,
     * followed by the local member when it matches. Members are matched by id.
     *
     * @param str the feature name
     * @return the matching members, or an empty list when no holder is recorded for the feature
     */
    @Override // org.jetlinks.supports.scalecube.ExtendedCluster
    public List<Member> featureMembers(String str) {
        Set<String> holders = this.featureMembers.get(str);
        if (holders == null || holders.isEmpty()) {
            return Collections.emptyList();
        }
        Collection<Member> others = otherMembers();
        List<Member> matched = new ArrayList<>(others.size() + 1);
        others.stream()
            .filter(candidate -> holders.contains(candidate.id()))
            .forEach(matched::add);
        Member self = member();
        if (holders.contains(self.id())) {
            matched.add(self);
        }
        return matched;
    }

    /**
     * Tells whether the member identified by {@code str} is recorded as a holder of the feature
     * {@code str2}.
     *
     * @param str  member id or alias
     * @param str2 feature name
     * @return {@code true} when the feature has a holder entry equal to {@code str}
     */
    @Override // org.jetlinks.supports.scalecube.ExtendedCluster
    public boolean supportFeature(String str, String str2) {
        Set<String> holders = this.featureMembers.get(str2);
        return holders != null && !holders.isEmpty() && holders.contains(str);
    }

    /**
     * Bridge overload forwarding to {@code handler(Function<ExtendedCluster, ClusterMessageHandler>)}.
     * NOTE(review): marked bridge/synthetic by the decompiler, so it was presumably generated by
     * the compiler for the covariant return type rather than written by hand; confirm before keeping it.
     */
    @Override // org.jetlinks.supports.scalecube.ExtendedCluster
    public /* bridge */ /* synthetic */ ExtendedCluster handler(Function function) {
        return handler((Function<ExtendedCluster, ClusterMessageHandler>) function);
    }
}
