package org.jetlinks.supports.device.session;

import io.scalecube.services.annotations.ServiceMethod;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.device.session.DeviceSessionInfo;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.ServiceEvent;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.device.session.AbstractDeviceSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/device/session/ClusterDeviceSessionManager.class */
public class ClusterDeviceSessionManager extends AbstractDeviceSessionManager {
    private static final Logger log = LoggerFactory.getLogger(ClusterDeviceSessionManager.class);
    private final RpcManager rpcManager;
    private final Map<String, Service> services = new NonBlockingHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jetlinks/supports/device/session/ClusterDeviceSessionManager$ErrorHandleService.class */
    public static class ErrorHandleService implements Service {
        private final String id;
        private final Service service;

        private void handleError(Throwable th) {
            ClusterDeviceSessionManager.log.warn("cluster[{}] session manager is failed", this.id, th);
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Mono<Boolean> isAlive(String str) {
            return this.service.isAlive(str).onErrorResume(th -> {
                handleError(th);
                return Reactors.ALWAYS_FALSE;
            });
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Mono<Boolean> checkAlive(String str) {
            return this.service.checkAlive(str).onErrorResume(th -> {
                handleError(th);
                return Reactors.ALWAYS_FALSE;
            });
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Mono<Long> total() {
            return this.service.total().onErrorResume(th -> {
                handleError(th);
                return Reactors.ALWAYS_ZERO_LONG;
            });
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Mono<Boolean> init(String str) {
            return this.service.init(str).onErrorResume(th -> {
                handleError(th);
                return Reactors.ALWAYS_FALSE;
            });
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Mono<Long> remove(String str) {
            return this.service.remove(str).onErrorResume(th -> {
                handleError(th);
                return Reactors.ALWAYS_ZERO_LONG;
            });
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Flux<DeviceSessionInfo> sessions() {
            return this.service.sessions().onErrorResume(th -> {
                handleError(th);
                return Mono.empty();
            });
        }

        public ErrorHandleService(String str, Service service) {
            this.id = str;
            this.service = service;
        }
    }

    @io.scalecube.services.annotations.Service
    /* loaded from: input_file:org/jetlinks/supports/device/session/ClusterDeviceSessionManager$Service.class */
    public interface Service {
        @ServiceMethod
        Mono<Boolean> isAlive(String str);

        @ServiceMethod
        Mono<Boolean> checkAlive(String str);

        @ServiceMethod
        Mono<Long> total();

        @ServiceMethod
        Mono<Boolean> init(String str);

        @ServiceMethod
        Mono<Long> remove(String str);

        @ServiceMethod
        Flux<DeviceSessionInfo> sessions();
    }

    /* loaded from: input_file:org/jetlinks/supports/device/session/ClusterDeviceSessionManager$ServiceImpl.class */
    public static class ServiceImpl implements Service {
        private final Supplier<AbstractDeviceSessionManager> managerSupplier;

        private <T, Arg0> T doWith(Arg0 arg0, BiFunction<AbstractDeviceSessionManager, Arg0, T> biFunction, T t) {
            AbstractDeviceSessionManager abstractDeviceSessionManager = this.managerSupplier.get();
            return abstractDeviceSessionManager == null ? t : biFunction.apply(abstractDeviceSessionManager, arg0);
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Mono<Boolean> checkAlive(String str) {
            return (Mono) doWith(str, (abstractDeviceSessionManager, str2) -> {
                return abstractDeviceSessionManager.checkLocalAlive(str);
            }, Reactors.ALWAYS_FALSE);
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Mono<Boolean> isAlive(String str) {
            return (Mono) doWith(str, (abstractDeviceSessionManager, str2) -> {
                AbstractDeviceSessionManager.DeviceSessionRef deviceSessionRef = abstractDeviceSessionManager.localSessions.get(str);
                return deviceSessionRef == null ? Reactors.ALWAYS_FALSE : deviceSessionRef.loaded == null ? Reactors.ALWAYS_TRUE : deviceSessionRef.loaded.isAliveAsync();
            }, Reactors.ALWAYS_FALSE);
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Mono<Long> total() {
            return (Mono) doWith(null, (abstractDeviceSessionManager, obj) -> {
                return abstractDeviceSessionManager.totalSessions(true);
            }, Reactors.ALWAYS_ZERO_LONG);
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Mono<Boolean> init(String str) {
            return (Mono) doWith(str, (v0, v1) -> {
                return v0.doInit(v1);
            }, Reactors.ALWAYS_FALSE);
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Mono<Long> remove(String str) {
            return (Mono) doWith(str, (v0, v1) -> {
                return v0.removeFromCluster(v1);
            }, Reactors.ALWAYS_ZERO_LONG);
        }

        @Override // org.jetlinks.supports.device.session.ClusterDeviceSessionManager.Service
        public Flux<DeviceSessionInfo> sessions() {
            return (Flux) doWith(null, (abstractDeviceSessionManager, obj) -> {
                return abstractDeviceSessionManager.getLocalSessionInfo();
            }, Flux.empty());
        }

        public ServiceImpl(Supplier<AbstractDeviceSessionManager> supplier) {
            this.managerSupplier = supplier;
        }
    }

    public ClusterDeviceSessionManager(RpcManager rpcManager) {
        this.rpcManager = rpcManager;
    }

    @Override // org.jetlinks.supports.device.session.AbstractDeviceSessionManager
    public void init() {
        super.init();
        this.rpcManager.registerService(new ServiceImpl(() -> {
            return this;
        }));
        this.rpcManager.getServices(Service.class).subscribe(rpcService -> {
            addService(rpcService.serverNodeId(), (Service) rpcService.service());
        });
        this.rpcManager.listen(Service.class).subscribe(serviceEvent -> {
            if (serviceEvent.getType() == ServiceEvent.Type.removed) {
                this.services.remove(serviceEvent.getServerNodeId());
            } else if (serviceEvent.getType() == ServiceEvent.Type.added) {
                this.rpcManager.getService(serviceEvent.getServerNodeId(), Service.class).subscribe(service -> {
                    addService(serviceEvent.getServerNodeId(), service);
                });
            }
        });
    }

    private void addService(String str, Service service) {
        this.services.put(str, new ErrorHandleService(str, service));
    }

    @Override // org.jetlinks.supports.device.session.AbstractDeviceSessionManager
    public final String getCurrentServerId() {
        return this.rpcManager.currentServerId();
    }

    @Override // org.jetlinks.supports.device.session.AbstractDeviceSessionManager
    protected final Mono<Boolean> initSessionConnection(DeviceSession deviceSession) {
        return this.services.isEmpty() ? Reactors.ALWAYS_FALSE : getServices().concatMap(service -> {
            return service.init(deviceSession.getDeviceId());
        }).takeUntil((v0) -> {
            return v0.booleanValue();
        }).any((v0) -> {
            return v0.booleanValue();
        });
    }

    @Override // org.jetlinks.supports.device.session.AbstractDeviceSessionManager
    protected final Mono<Long> removeRemoteSession(String str) {
        return this.services.isEmpty() ? Reactors.ALWAYS_ZERO_LONG : getServices().concatMap(service -> {
            return service.remove(str);
        }).reduce((v0, v1) -> {
            return Math.addExact(v0, v1);
        });
    }

    @Override // org.jetlinks.supports.device.session.AbstractDeviceSessionManager
    protected final Mono<Long> getRemoteTotalSessions() {
        return this.services.isEmpty() ? Reactors.ALWAYS_ZERO_LONG : getServices().flatMap((v0) -> {
            return v0.total();
        }).reduce((v0, v1) -> {
            return Math.addExact(v0, v1);
        });
    }

    @Override // org.jetlinks.supports.device.session.AbstractDeviceSessionManager
    protected final Mono<Boolean> remoteSessionIsAlive(String str) {
        return this.services.isEmpty() ? Reactors.ALWAYS_FALSE : getServices().flatMap(service -> {
            return service.isAlive(str);
        }).any((v0) -> {
            return v0.booleanValue();
        }).defaultIfEmpty(false);
    }

    @Override // org.jetlinks.supports.device.session.AbstractDeviceSessionManager
    protected Mono<Boolean> checkRemoteSessionIsAlive(String str) {
        return this.services.isEmpty() ? Reactors.ALWAYS_FALSE : getServices().flatMap(service -> {
            return service.checkAlive(str);
        }).any((v0) -> {
            return v0.booleanValue();
        }).defaultIfEmpty(false);
    }

    @Override // org.jetlinks.supports.device.session.AbstractDeviceSessionManager
    protected Flux<DeviceSessionInfo> remoteSessions(String str) {
        if (ObjectUtils.isEmpty(str)) {
            return getServices().flatMap((v0) -> {
                return v0.sessions();
            });
        }
        Service service = this.services.get(str);
        return service == null ? Flux.empty() : service.sessions();
    }

    private Flux<Service> getServices() {
        return Flux.fromIterable(this.services.values());
    }
}
