package org.jetlinks.supports.scalecube.event;

import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.scalecube.services.annotations.Service;
import io.scalecube.services.annotations.ServiceMethod;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.jetlinks.core.NativePayload;
import org.jetlinks.core.Payload;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.RpcService;
import org.jetlinks.core.rpc.ServiceEvent;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.supports.event.EventBroker;
import org.jetlinks.supports.event.EventConnection;
import org.jetlinks.supports.event.EventConsumer;
import org.jetlinks.supports.event.EventProducer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/supports/scalecube/event/ClusterEventBusBroker.class */
public class ClusterEventBusBroker implements EventBroker, Disposable {
    private static final Logger log = LoggerFactory.getLogger(ClusterEventBusBroker.class);
    private final RpcManager rpcManager;
    private final Map<String, RpcEventConnection> connections = new ConcurrentHashMap();
    private final Sinks.Many<EventConnection> acceptSink = Reactors.createMany();

    @Service
    /* loaded from: input_file:org/jetlinks/supports/scalecube/event/ClusterEventBusBroker$Api.class */
    public interface Api {
        @ServiceMethod
        Mono<Void> sub(ByteBuf byteBuf);

        @ServiceMethod
        Mono<Void> unsub(ByteBuf byteBuf);

        @ServiceMethod
        Mono<Void> pub(ByteBuf byteBuf);
    }

    /* loaded from: input_file:org/jetlinks/supports/scalecube/event/ClusterEventBusBroker$ApiImpl.class */
    public class ApiImpl implements Api {
        public ApiImpl() {
        }

        private void handleSubs(ByteBuf byteBuf, BiConsumer<RpcEventConnection, Subscription> biConsumer) {
            try {
                ObjectInput createInput = ClusterEventBusBroker.this.createInput(byteBuf);
                Throwable th = null;
                try {
                    try {
                        RpcEventConnection rpcEventConnection = (RpcEventConnection) ClusterEventBusBroker.this.connections.get(createInput.readUTF());
                        if (null != rpcEventConnection) {
                            Subscription subscription = new Subscription();
                            subscription.readExternal(createInput);
                            biConsumer.accept(rpcEventConnection, subscription);
                        }
                        if (createInput != null) {
                            if (0 != 0) {
                                try {
                                    createInput.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                createInput.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (createInput != null) {
                        if (th != null) {
                            try {
                                createInput.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            createInput.close();
                        }
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                ClusterEventBusBroker.log.error("Error handling subscription", th5);
            }
        }

        @Override // org.jetlinks.supports.scalecube.event.ClusterEventBusBroker.Api
        public Mono<Void> sub(ByteBuf byteBuf) {
            handleSubs(byteBuf, (rpcEventConnection, subscription) -> {
                rpcEventConnection.subscriptions.emitNext(subscription, Reactors.emitFailureHandler());
            });
            return Mono.empty();
        }

        @Override // org.jetlinks.supports.scalecube.event.ClusterEventBusBroker.Api
        public Mono<Void> unsub(ByteBuf byteBuf) {
            handleSubs(byteBuf, (rpcEventConnection, subscription) -> {
                rpcEventConnection.subscriptions.emitNext(subscription, Reactors.emitFailureHandler());
            });
            return Mono.empty();
        }

        @Override // org.jetlinks.supports.scalecube.event.ClusterEventBusBroker.Api
        public Mono<Void> pub(ByteBuf byteBuf) {
            try {
                ObjectInput createInput = ClusterEventBusBroker.this.createInput(byteBuf);
                Throwable th = null;
                try {
                    try {
                        RpcEventConnection rpcEventConnection = (RpcEventConnection) ClusterEventBusBroker.this.connections.get(createInput.readUTF());
                        if (null != rpcEventConnection) {
                            String readUTF = createInput.readUTF();
                            SerializeUtils.readMap(createInput, (v0) -> {
                                return Maps.newLinkedHashMapWithExpectedSize(v0);
                            });
                            Object readObject = SerializeUtils.readObject(createInput);
                            rpcEventConnection.producer.emitNext(readObject instanceof ByteBuf ? TopicPayload.of(readUTF, Payload.of((ByteBuf) readObject)) : TopicPayload.of(readUTF, NativePayload.of(readObject)), Reactors.emitFailureHandler());
                        }
                        if (createInput != null) {
                            if (0 != 0) {
                                try {
                                    createInput.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                createInput.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Throwable th3) {
                ClusterEventBusBroker.log.error("Error handling subscription", th3);
            }
            return Mono.empty();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jetlinks/supports/scalecube/event/ClusterEventBusBroker$RpcEventConnection.class */
    public class RpcEventConnection implements EventConnection, EventProducer, EventConsumer {
        private final String id;
        private final Api api;
        private final Disposable.Composite disposable = Disposables.composite();
        private final Sinks.Many<TopicPayload> consumer = Reactors.createMany(Integer.MAX_VALUE, false);
        private final Sinks.Many<TopicPayload> producer = Reactors.createMany(Integer.MAX_VALUE, false);
        private final Sinks.Many<Subscription> subscriptions = Reactors.createMany(Integer.MAX_VALUE, false);
        private final Sinks.Many<Subscription> unSubscriptions = Reactors.createMany(Integer.MAX_VALUE, false);
        private FluxSink<TopicPayload> sink;

        public RpcEventConnection(String str, Api api) {
            this.id = str;
            this.api = api;
            doOnDispose(Flux.create(fluxSink -> {
                this.sink = fluxSink;
            }).flatMap(topicPayload -> {
                ByteBuf encodePayload = encodePayload(topicPayload);
                return null != encodePayload ? this.api.pub(encodePayload) : Mono.empty();
            }).subscribe());
        }

        @Override // org.jetlinks.supports.event.EventConnection
        public String getId() {
            return this.id;
        }

        @Override // org.jetlinks.supports.event.EventConnection
        public boolean isAlive() {
            return true;
        }

        @Override // org.jetlinks.supports.event.EventConnection
        public void doOnDispose(Disposable disposable) {
            this.disposable.add(disposable);
        }

        @Override // org.jetlinks.supports.event.EventConnection
        public EventBroker getBroker() {
            return ClusterEventBusBroker.this;
        }

        public void dispose() {
            this.disposable.dispose();
            this.subscriptions.tryEmitComplete();
            this.unSubscriptions.tryEmitComplete();
            this.consumer.tryEmitComplete();
            this.sink.complete();
        }

        public boolean isDisposed() {
            return this.disposable.isDisposed();
        }

        @Override // org.jetlinks.supports.event.EventProducer
        public Mono<Void> subscribe(Subscription subscription) {
            return this.api.sub(encodeSubscription(subscription));
        }

        @Override // org.jetlinks.supports.event.EventProducer
        public Mono<Void> unsubscribe(Subscription subscription) {
            return this.api.unsub(encodeSubscription(subscription));
        }

        @Override // org.jetlinks.supports.event.EventProducer
        public Flux<TopicPayload> subscribe() {
            return this.producer.asFlux();
        }

        @Override // org.jetlinks.supports.event.EventConsumer
        public Flux<Subscription> handleSubscribe() {
            return this.subscriptions.asFlux();
        }

        @Override // org.jetlinks.supports.event.EventConsumer
        public Flux<Subscription> handleUnSubscribe() {
            return this.unSubscriptions.asFlux();
        }

        @Override // org.jetlinks.supports.event.EventConsumer
        public FluxSink<TopicPayload> sink() {
            return this.sink;
        }

        /* JADX WARN: Failed to calculate best type for var: r7v1 ??
        java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
         */
        /* JADX WARN: Failed to calculate best type for var: r7v1 ??
        java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
        	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
         */
        /* JADX WARN: Failed to calculate best type for var: r8v0 ??
        java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
         */
        /* JADX WARN: Failed to calculate best type for var: r8v0 ??
        java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
        	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
         */
        /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
        	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
        	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
        	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
        	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
         */
        /* JADX WARN: Not initialized variable reg: 7, insn: 0x0095: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r7 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:28:0x0095 */
        /* JADX WARN: Not initialized variable reg: 8, insn: 0x0099: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r8 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:30:0x0099 */
        /* JADX WARN: Type inference failed for: r7v1, types: [java.io.ObjectOutput] */
        /* JADX WARN: Type inference failed for: r8v0, types: [java.lang.Throwable] */
        private ByteBuf encodePayload(TopicPayload topicPayload) {
            ?? r7;
            ?? r8;
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
            try {
                try {
                    ObjectOutput createOutput = ClusterEventBusBroker.this.createOutput(buffer);
                    Throwable th = null;
                    createOutput.writeUTF(ClusterEventBusBroker.this.rpcManager.currentServerId());
                    createOutput.writeUTF(topicPayload.getTopic());
                    SerializeUtils.writeKeyValue(topicPayload.getHeaders(), createOutput);
                    NativePayload payload = topicPayload.getPayload();
                    if (payload instanceof NativePayload) {
                        SerializeUtils.writeObject(payload.getNativeObject(), createOutput);
                    } else {
                        createOutput.write(payload.getBytes());
                    }
                    if (createOutput != null) {
                        if (0 != 0) {
                            try {
                                createOutput.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createOutput.close();
                        }
                    }
                } catch (Throwable th3) {
                    if (r7 != 0) {
                        if (r8 != 0) {
                            try {
                                r7.close();
                            } catch (Throwable th4) {
                                r8.addSuppressed(th4);
                            }
                        } else {
                            r7.close();
                        }
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                ClusterEventBusBroker.log.error(th5.getMessage(), th5);
            }
            return buffer;
        }

        private ByteBuf encodeSubscription(Subscription subscription) {
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
            ObjectOutput createOutput = ClusterEventBusBroker.this.createOutput(buffer);
            Throwable th = null;
            try {
                try {
                    createOutput.writeUTF(ClusterEventBusBroker.this.rpcManager.currentServerId());
                    subscription.writeExternal(createOutput);
                    if (createOutput != null) {
                        if (0 != 0) {
                            try {
                                createOutput.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createOutput.close();
                        }
                    }
                    return buffer;
                } finally {
                }
            } finally {
            }
        }

        public RpcEventConnection(String str, Api api, FluxSink<TopicPayload> fluxSink) {
            this.id = str;
            this.api = api;
            this.sink = fluxSink;
        }
    }

    public ClusterEventBusBroker(RpcManager rpcManager) {
        this.rpcManager = rpcManager;
        rpcManager.listen(Api.class).subscribe(this::handleServiceEvent);
        rpcManager.getServices(Api.class).subscribe(this::handleService);
        rpcManager.registerService(new ApiImpl());
    }

    private void handleServiceEvent(ServiceEvent serviceEvent) {
        if (serviceEvent.getType() == ServiceEvent.Type.removed) {
            disposeConnection(this.connections.remove(serviceEvent.getServerNodeId()));
        } else {
            this.rpcManager.getService(serviceEvent.getServerNodeId(), Api.class).subscribe(api -> {
                handleService(serviceEvent, api);
            });
        }
    }

    protected void handleService(ServiceEvent serviceEvent, Api api) {
        try {
            RpcEventConnection rpcEventConnection = new RpcEventConnection(serviceEvent.getServerNodeId(), api);
            RpcEventConnection put = this.connections.put(serviceEvent.getServerNodeId(), rpcEventConnection);
            if (put != null) {
                disposeConnection(put);
            } else if (this.acceptSink.currentSubscriberCount() > 0) {
                this.acceptSink.emitNext(rpcEventConnection, Reactors.emitFailureHandler());
            }
        } catch (Throwable th) {
            log.warn("register service error {}", serviceEvent.getServiceId(), th);
        }
    }

    protected void handleService(RpcService<Api> rpcService) {
        try {
            RpcEventConnection rpcEventConnection = new RpcEventConnection(rpcService.serverNodeId(), (Api) rpcService.service());
            RpcEventConnection put = this.connections.put(rpcService.serverNodeId(), rpcEventConnection);
            if (put != null) {
                disposeConnection(put);
            } else if (this.acceptSink.currentSubscriberCount() > 0) {
                this.acceptSink.emitNext(rpcEventConnection, Reactors.emitFailureHandler());
            }
        } catch (Throwable th) {
            log.warn("register service error {}", rpcService.serverNodeId(), th);
        }
    }

    private void disposeConnection(RpcEventConnection rpcEventConnection) {
        if (null != rpcEventConnection) {
            rpcEventConnection.dispose();
        }
    }

    @Override // org.jetlinks.supports.event.EventBroker
    public String getId() {
        return "rpc-cluster-broker";
    }

    @Override // org.jetlinks.supports.event.EventBroker
    public Flux<EventConnection> accept() {
        return Flux.concat(new Publisher[]{Flux.fromIterable(this.connections.values()), this.acceptSink.asFlux()});
    }

    public void dispose() {
        this.connections.values().forEach(this::disposeConnection);
    }

    protected ObjectOutput createOutput(ByteBuf byteBuf) {
        return new ObjectOutputStream(new ByteBufOutputStream(byteBuf));
    }

    protected ObjectInput createInput(ByteBuf byteBuf) {
        return new ObjectInputStream(new ByteBufInputStream(byteBuf, true));
    }
}
