package org.jetlinks.supports.cluster;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.CommonDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.cluster.AbstractDeviceOperationBroker;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;

@Deprecated
/* loaded from: input_file:org/jetlinks/supports/cluster/ClusterDeviceOperationBroker.class */
public class ClusterDeviceOperationBroker extends AbstractDeviceOperationBroker {
    private static final Logger log = LoggerFactory.getLogger(ClusterDeviceOperationBroker.class);
    private static final String QUALIFIER_REPLY = "cdob_r";
    private static final String QUALIFIER_SEND = "cdob_s";
    final ExtendedCluster cluster;
    final DeviceSessionManager sessionManager;
    private final Map<AbstractDeviceOperationBroker.AwaitKey, RepayableDeviceMessage<?>> awaits = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(5)).removalListener(removalNotification -> {
        if (removalNotification.getCause() == RemovalCause.EXPIRED) {
            try {
                log.debug("discard await reply message[{}] message,{}", removalNotification.getKey(), removalNotification.getValue());
            } catch (Throwable th) {
            }
        }
    }).build().asMap();
    private final Sinks.Many<Message> sendToDevice = Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);

    /* loaded from: input_file:org/jetlinks/supports/cluster/ClusterDeviceOperationBroker$ClusterHandler.class */
    class ClusterHandler implements ClusterMessageHandler {
        ClusterHandler() {
        }

        public void onGossip(io.scalecube.cluster.transport.api.Message message) {
            onMessage(message);
        }

        public void onMessage(io.scalecube.cluster.transport.api.Message message) {
            if (ClusterDeviceOperationBroker.QUALIFIER_SEND.equals(message.qualifier())) {
                TraceHolder.copyContext(message.headers(), (Message) message.data(), (v0, v1, v2) -> {
                    v0.addHeader(v1, v2);
                });
                ClusterDeviceOperationBroker.this.handleSendToDevice((Message) message.data()).contextWrite(TraceHolder.readToContext(Context.empty(), message.headers())).subscribe();
            } else if (ClusterDeviceOperationBroker.QUALIFIER_REPLY.equals(message.qualifier())) {
                TraceHolder.copyContext(message.headers(), (Message) message.data(), (v0, v1, v2) -> {
                    v0.addHeader(v1, v2);
                });
                ClusterDeviceOperationBroker.this.handleReply((DeviceMessageReply) message.data());
            }
        }
    }

    public ClusterDeviceOperationBroker(ExtendedCluster extendedCluster, DeviceSessionManager deviceSessionManager) {
        this.cluster = extendedCluster;
        this.sessionManager = deviceSessionManager;
        extendedCluster.handler(extendedCluster2 -> {
            return new ClusterHandler();
        });
    }

    private String currentServerId() {
        return this.sessionManager.getCurrentServerId();
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    public Flux<DeviceStateInfo> getDeviceState(String str, Collection<String> collection) {
        return Flux.fromIterable(collection).flatMap(str2 -> {
            return this.sessionManager.checkAlive(str2, false).map(bool -> {
                return new DeviceStateInfo(str2, bool.booleanValue() ? (byte) 1 : (byte) -1);
            });
        });
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    public Flux<DeviceMessageReply> handleReply(String str, String str2, Duration duration) {
        return super.handleReply(str, str2, duration).doOnCancel(() -> {
            this.awaits.remove(getAwaitReplyKey(str, str2));
        });
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    public Mono<Integer> send(String str, Publisher<? extends Message> publisher) {
        if (currentServerId().equals(str)) {
            return Flux.from(publisher).flatMap(this::handleSendToDevice).then(Reactors.ALWAYS_ONE);
        }
        Member member = getMember(str);
        return null == member ? Reactors.ALWAYS_ZERO : Flux.from(publisher).flatMap(message -> {
            message.addHeader(Headers.sendFrom, this.sessionManager.getCurrentServerId());
            addAwaitReplyKey(message);
            return this.cluster.send(member, io.scalecube.cluster.transport.api.Message.builder().qualifier(QUALIFIER_SEND).data(message).build());
        }).then(Reactors.ALWAYS_ONE);
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    public Mono<Integer> send(Publisher<? extends BroadcastMessage> publisher) {
        return Reactors.ALWAYS_ZERO;
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    public Flux<Message> handleSendToDeviceMessage(String str) {
        return this.sendToDevice.asFlux();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Mono<Void> handleSendToDevice(Message message) {
        addAwaitReplyKey(message);
        if (this.sendToDevice.currentSubscriberCount() == 0) {
            log.warn("no handler for message {}", message);
            return doReply(createReply(message).error(ErrorCode.SYSTEM_ERROR));
        }
        try {
            this.sendToDevice.emitNext(message, Reactors.emitFailureHandler());
            return Mono.empty();
        } catch (Throwable th) {
            return doReply(createReply(message).error(th));
        }
    }

    private void addAwaitReplyKey(Message message) {
        if (!(message instanceof RepayableDeviceMessage) || ((Boolean) message.getHeader(Headers.sendAndForget).orElse(false)).booleanValue()) {
            return;
        }
        RepayableDeviceMessage<?> repayableDeviceMessage = (RepayableDeviceMessage) message;
        this.awaits.put(getAwaitReplyKey(repayableDeviceMessage), repayableDeviceMessage);
    }

    private DeviceMessageReply createReply(Message message) {
        return message instanceof RepayableDeviceMessage ? ((RepayableDeviceMessage) message).newReply() : new CommonDeviceMessageReply().messageId(message.getMessageId());
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    protected Mono<Void> doReply(DeviceMessageReply deviceMessageReply) {
        RepayableDeviceMessage<?> remove = this.awaits.remove(getAwaitReplyKey(deviceMessageReply));
        Member member = null;
        if (null != remove) {
            member = (Member) remove.getHeader(Headers.sendFrom).map(this::getMember).orElse(null);
        }
        Function function = member2 -> {
            return this.cluster.send(member2, io.scalecube.cluster.transport.api.Message.builder().qualifier(QUALIFIER_REPLY).data(deviceMessageReply).build());
        };
        return null != member ? (Mono) function.apply(member) : Flux.fromIterable(this.cluster.otherMembers()).flatMap(function).then();
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    public Disposable handleGetDeviceState(String str, Function<Publisher<String>, Flux<DeviceStateInfo>> function) {
        return Disposables.disposed();
    }

    public Member getMember(String str) {
        for (Member member : this.cluster.otherMembers()) {
            if (Objects.equals(member.id(), str) || Objects.equals(member.alias(), str)) {
                return member;
            }
        }
        return null;
    }
}
