package org.jetlinks.supports.cluster.event;

import io.netty.util.ReferenceCountUtil;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import io.rsocket.util.DefaultPayload;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.cluster.ServerNode;
import org.jetlinks.core.event.TopicPayload;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

@Deprecated
/* loaded from: input_file:org/jetlinks/supports/cluster/event/RedisRSocketEventBroker.class */
public class RedisRSocketEventBroker extends RedisClusterEventBroker {
    private static final Logger log = LoggerFactory.getLogger(RedisRSocketEventBroker.class);
    private final RSocketAddress address;
    private String serverId;
    private final ConcurrentMap<String, RSocket> sockets;
    private ClusterCache<String, RSocketAddress> addressCache;
    private final ConcurrentMap<String, RSocketAddress> remotes;
    private final Map<String, EmitterProcessor<TopicPayload>> remoteSink;
    private final Map<String, EmitterProcessor<TopicPayload>> localSink;
    private final Set<String> connecting;
    private final Map<String, Disposable> polling;

    public RedisRSocketEventBroker(ClusterManager clusterManager, ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, RSocketAddress rSocketAddress) {
        super(clusterManager, reactiveRedisConnectionFactory);
        this.sockets = new ConcurrentHashMap();
        this.remotes = new ConcurrentHashMap();
        this.remoteSink = new ConcurrentHashMap();
        this.localSink = new ConcurrentHashMap();
        this.connecting = new HashSet();
        this.polling = new ConcurrentHashMap();
        this.address = rSocketAddress;
        init();
    }

    private void doStartPollEvent(String str, RSocket rSocket) {
        log.debug("{} start poll broker event from {}", this.serverId, str);
        Disposable remove = this.polling.remove(str);
        if (null != remove) {
            remove.dispose();
        }
        this.polling.put(str, rSocket.requestStream(ByteBufPayload.create(this.serverId)).doOnCancel(() -> {
            rSocket.dispose();
            log.debug("{} cancel poll broker event from {}", this.serverId, str);
        }).subscribe(payload -> {
            try {
                EmitterProcessor<TopicPayload> orCreateLocalSink = getOrCreateLocalSink(str);
                if (orCreateLocalSink.hasDownstreams()) {
                    orCreateLocalSink.onNext(TopicPayload.of(payload.getMetadataUtf8(), RSocketPayload.of(payload)));
                } else {
                    ReferenceCountUtil.safeRelease(payload);
                }
            } catch (Throwable th) {
                log.error("handle broker [{}] event error", str, th);
                ReferenceCountUtil.safeRelease(payload);
            }
        }));
    }

    public void connectRemote(String str) {
        if (this.serverId.equals(str) || this.connecting.contains(str)) {
            return;
        }
        EmitterProcessor<TopicPayload> orCreateLocalSink = getOrCreateLocalSink(str);
        RSocket rSocket = this.sockets.get(str);
        if (rSocket == null || rSocket.isDisposed() || !orCreateLocalSink.hasDownstreams()) {
            this.connecting.add(str);
            RSocketConnector.create().payloadDecoder(PayloadDecoder.ZERO_COPY).reconnect(Retry.backoff(10L, Duration.ofSeconds(1L)).filter(th -> {
                return this.remotes.containsKey(str);
            }).doBeforeRetry(retrySignal -> {
                if (retrySignal.failure() != null) {
                    log.warn("reconnect rsocket event broker {}{}:{}", new Object[]{str, this.remotes.get(str), retrySignal.failure().getMessage()});
                }
            })).connect(() -> {
                RSocketAddress rSocketAddress = this.remotes.get(str);
                if (rSocketAddress == null) {
                    return null;
                }
                return TcpClientTransport.create(rSocketAddress.getPublicAddress(), rSocketAddress.getPublicPort());
            }).doOnNext(rSocket2 -> {
                RSocket put = this.sockets.put(str, rSocket2);
                if (put != null && put != rSocket2) {
                    put.dispose();
                }
                doStartPollEvent(str, rSocket2);
            }).doOnError(th2 -> {
                log.error("connect to cluster node [{}] error", str, th2);
            }).doFinally(signalType -> {
                this.connecting.remove(str);
            }).subscribe();
        } else {
            if (this.polling.get(str) == null || !this.polling.get(str).isDisposed()) {
                return;
            }
            doStartPollEvent(str, rSocket);
        }
    }

    @Override // org.jetlinks.supports.cluster.event.AbstractClusterEventBroker
    protected void handleServerNodeLeave(ServerNode serverNode) {
        this.remotes.clear();
        reloadAddresses().subscribe();
    }

    @Override // org.jetlinks.supports.cluster.event.AbstractClusterEventBroker
    protected void handleServerNodeJoin(ServerNode serverNode) {
        if (this.serverId.equals(serverNode.getId())) {
            return;
        }
        getOrCreateRemoteSink(serverNode.getId());
        this.addressCache.get(serverNode.getId()).switchIfEmpty(Mono.delay(Duration.ofSeconds(1L)).then(this.addressCache.get(serverNode.getId()))).subscribe(rSocketAddress -> {
            this.remotes.put(serverNode.getId(), rSocketAddress);
            connectRemote(serverNode.getId());
        });
    }

    public Mono<Void> reloadAddresses() {
        return this.addressCache.entries().doOnNext(entry -> {
            this.remotes.put(entry.getKey(), entry.getValue());
            connectRemote((String) entry.getKey());
        }).then();
    }

    protected Mono<Payload> topicPayloadToRSocketPayload(TopicPayload topicPayload) {
        try {
            try {
                Mono<Payload> just = Mono.just(DefaultPayload.create(topicPayload.getBody().nioBuffer(), ByteBuffer.wrap(topicPayload.getTopic().getBytes())));
                ReferenceCountUtil.safeRelease(topicPayload);
                return just;
            } catch (Exception e) {
                log.error(e.getMessage(), e);
                ReferenceCountUtil.safeRelease(topicPayload);
                return Mono.empty();
            }
        } catch (Throwable th) {
            ReferenceCountUtil.safeRelease(topicPayload);
            throw th;
        }
    }

    public void init() {
        this.addressCache = this.clusterManager.getCache("__rsocket_addresses");
        this.serverId = this.clusterManager.getCurrentServerId();
        CloseableChannel closeableChannel = (CloseableChannel) RSocketServer.create(SocketAcceptor.forRequestStream(payload -> {
            String dataUtf8 = payload.getDataUtf8();
            log.debug("{} handle broker[{}] event request", this.serverId, dataUtf8);
            ReferenceCountUtil.safeRelease(payload);
            EmitterProcessor<TopicPayload> orCreateRemoteSink = getOrCreateRemoteSink(dataUtf8);
            return orCreateRemoteSink.hasDownstreams() ? Flux.empty() : orCreateRemoteSink.doOnCancel(() -> {
                log.debug("stop handle broker[{}] event request", dataUtf8);
            }).flatMap(this::topicPayloadToRSocketPayload);
        })).bind(TcpServerTransport.create(this.address.getPort())).doOnError(th -> {
            log.error(th.getMessage(), th);
        }).block();
        if (closeableChannel == null) {
            throw new IllegalStateException("start rsocket server" + this.address + " error");
        }
        this.disposable.add(closeableChannel);
        this.addressCache.put(this.serverId, this.address).block(Duration.ofSeconds(10L));
        reloadAddresses().block(Duration.ofSeconds(10L));
        this.disposable.add(Flux.interval(Duration.ofSeconds(10L)).flatMap(l -> {
            return reloadAddresses().onErrorContinue((th2, obj) -> {
            });
        }).subscribe());
        super.startup();
    }

    @Override // org.jetlinks.supports.cluster.event.AbstractClusterEventBroker
    public void shutdown() {
        super.shutdown();
        this.addressCache.remove(this.serverId).block();
        this.sockets.values().forEach((v0) -> {
            v0.dispose();
        });
    }

    @Override // org.jetlinks.supports.cluster.event.AbstractClusterEventBroker
    public void startup() {
    }

    private EmitterProcessor<TopicPayload> getOrCreateRemoteSink(String str) {
        return this.remoteSink.compute(str, (str2, emitterProcessor) -> {
            return (emitterProcessor == null || emitterProcessor.isDisposed()) ? EmitterProcessor.create(Integer.MAX_VALUE, false) : emitterProcessor;
        });
    }

    private EmitterProcessor<TopicPayload> getOrCreateLocalSink(String str) {
        return this.localSink.compute(str, (str2, emitterProcessor) -> {
            return (emitterProcessor == null || emitterProcessor.isDisposed()) ? EmitterProcessor.create(Integer.MAX_VALUE, false) : emitterProcessor;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.jetlinks.supports.cluster.event.RedisClusterEventBroker, org.jetlinks.supports.cluster.event.AbstractClusterEventBroker
    public Flux<TopicPayload> listen(String str, String str2) {
        return Flux.merge(new Publisher[]{getOrCreateLocalSink(str2), super.listen(str, str2)});
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.jetlinks.supports.cluster.event.RedisClusterEventBroker, org.jetlinks.supports.cluster.event.AbstractClusterEventBroker
    public Mono<Void> dispatch(String str, String str2, TopicPayload topicPayload) {
        if (!this.remotes.containsKey(str2)) {
            ReferenceCountUtil.safeRelease(topicPayload);
            return Mono.empty();
        }
        EmitterProcessor<TopicPayload> emitterProcessor = this.remoteSink.get(str2);
        if (emitterProcessor != null && emitterProcessor.hasDownstreams() && !emitterProcessor.isDisposed()) {
            emitterProcessor.onNext(topicPayload);
            return Mono.empty();
        }
        log.debug("no rsocket broker [{}] event listener,fallback to redis", str2);
        connectRemote(str2);
        return super.dispatch(str, str2, topicPayload);
    }
}
