package org.jetlinks.supports.device.session;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.session.DeviceSessionEvent;
import org.jetlinks.core.device.session.DeviceSessionInfo;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.ContextView;

/* loaded from: input_file:org/jetlinks/supports/device/session/AbstractDeviceSessionManager.class */
public abstract class AbstractDeviceSessionManager implements DeviceSessionManager {
    private static final Logger log = LoggerFactory.getLogger(AbstractDeviceSessionManager.class);
    private static final AtomicLongFieldUpdater<AbstractDeviceSessionManager> CLOSE_WIP = AtomicLongFieldUpdater.newUpdater(AbstractDeviceSessionManager.class, "closeWip");
    protected final Map<String, DeviceSessionRef> localSessions = new ConcurrentHashMap(2048);
    private final List<Function<DeviceSessionEvent, Mono<Void>>> sessionEventHandlers = new CopyOnWriteArrayList();
    protected final Disposable.Composite disposable = Disposables.composite();
    private Duration sessionLoadTimeout = Duration.ofSeconds(5);
    private Duration sessionCheckInterval = Duration.ofSeconds(30);
    private int sessionCheckConcurrency = Integer.getInteger("jetlinks.session.check.concurrency", Runtime.getRuntime().availableProcessors() * 64).intValue();
    private int sessionCloseConcurrency = Integer.getInteger("jetlinks.session.close.concurrency", 3000).intValue();
    protected Sinks.Many<DeviceSession> closeSink = Reactors.createMany();
    private volatile long closeWip = 0;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/jetlinks/supports/device/session/AbstractDeviceSessionManager$DeviceSessionRef.class */
    public static class DeviceSessionRef {
        private static final AtomicReferenceFieldUpdater<DeviceSessionRef, Mono> LOADER = AtomicReferenceFieldUpdater.newUpdater(DeviceSessionRef.class, Mono.class, "loader");
        private static final AtomicReferenceFieldUpdater<DeviceSessionRef, Sinks.One> AWAIT = AtomicReferenceFieldUpdater.newUpdater(DeviceSessionRef.class, Sinks.One.class, "await");
        private final AbstractDeviceSessionManager manager;
        private volatile Sinks.One<DeviceSession> await;
        public final String deviceId;
        public volatile DeviceSession loaded;
        protected volatile Mono<DeviceSession> loader;
        private volatile Disposable disposable;
        private volatile Set<String> children;

        public Set<String> children() {
            if (this.children != null) {
                return this.children;
            }
            synchronized (this) {
                if (this.children != null) {
                    return this.children;
                }
                ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
                this.children = newKeySet;
                return newKeySet;
            }
        }

        public void removeChild(String str) {
            if (this.children != null) {
                this.children.remove(str);
            }
        }

        public DeviceSessionRef(String str, AbstractDeviceSessionManager abstractDeviceSessionManager, Mono<DeviceSession> mono) {
            this.deviceId = str;
            this.manager = abstractDeviceSessionManager;
            update(mono);
        }

        public DeviceSessionRef(String str, AbstractDeviceSessionManager abstractDeviceSessionManager, DeviceSession deviceSession) {
            this.deviceId = str;
            this.manager = abstractDeviceSessionManager;
            this.loaded = deviceSession;
            this.await = Sinks.one();
            this.await.tryEmitValue(deviceSession);
        }

        public void update(Function<Mono<DeviceSession>, Mono<DeviceSession>> function) {
            update(function.apply(Mono.fromSupplier(() -> {
                return this.loaded;
            })));
        }

        public void update(Mono<DeviceSession> mono) {
            if (this.disposable != null && !this.disposable.isDisposed()) {
                this.disposable.dispose();
            }
            Sinks.One andSet = AWAIT.getAndSet(this, Sinks.one());
            if (andSet != null) {
                andSet.tryEmitEmpty();
            }
            this.loader = mono.flatMap(this::handleLoaded).timeout(this.manager.sessionLoadTimeout, Mono.error(() -> {
                return new TimeoutException("device [" + this.deviceId + "] session load timeout");
            })).switchIfEmpty(Mono.fromRunnable(this::loadEmpty)).doOnError(this::loadError).doOnNext(this::afterLoaded);
        }

        private void handleParentChanged(DeviceSession deviceSession, DeviceSession deviceSession2) {
            DeviceSessionRef deviceSessionRef = this.manager.localSessions.get(deviceSession.getDeviceId());
            DeviceSessionRef deviceSessionRef2 = this.manager.localSessions.get(deviceSession2.getDeviceId());
            if (null != deviceSessionRef) {
                deviceSessionRef.removeChild(this.deviceId);
            }
            if (null != deviceSessionRef2) {
                deviceSessionRef2.children().add(this.deviceId);
            }
        }

        private Mono<DeviceSession> handleLoaded(DeviceSession deviceSession) {
            DeviceSession deviceSession2 = this.loaded;
            this.loaded = deviceSession;
            await().tryEmitValue(deviceSession);
            handleParent(deviceSessionRef -> {
                deviceSessionRef.children().add(deviceSession.getDeviceId());
            });
            if (deviceSession.isWrapFrom(ChildrenDeviceSession.class)) {
                deviceSession.unwrap(ChildrenDeviceSession.class).doOnParentChanged(this::handleParentChanged);
            }
            return deviceSession2 == null ? this.manager.doRegister(deviceSession).then(this.manager.handleSessionCompute0(null, deviceSession)) : this.manager.handleSessionCompute0(deviceSession2, deviceSession);
        }

        private void afterLoaded(DeviceSession deviceSession) {
            if (!deviceSession.equals(this.loaded)) {
                this.loaded.close();
            }
            this.loaded = deviceSession;
        }

        protected void handleParent(Consumer<DeviceSessionRef> consumer) {
            DeviceSessionRef deviceSessionRef;
            if (!this.loaded.isWrapFrom(ChildrenDeviceSession.class) || null == (deviceSessionRef = this.manager.localSessions.get(this.loaded.unwrap(ChildrenDeviceSession.class).getParent().getDeviceId()))) {
                return;
            }
            consumer.accept(deviceSessionRef);
        }

        protected Mono<Void> checkChildren() {
            if (this.children == null) {
                return Mono.empty();
            }
            Flux fromIterable = Flux.fromIterable(this.children);
            AbstractDeviceSessionManager abstractDeviceSessionManager = this.manager;
            abstractDeviceSessionManager.getClass();
            return fromIterable.flatMap(str -> {
                return abstractDeviceSessionManager.checkSessionAlive(str);
            }).then();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Mono<Long> close(DeviceSession deviceSession) {
            if (this.loaded != deviceSession || !this.manager.localSessions.remove(this.deviceId, this)) {
                return Reactors.ALWAYS_ZERO_LONG;
            }
            if (this.disposable != null && !this.disposable.isDisposed()) {
                this.disposable.dispose();
            }
            return doClose(this.loaded);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Mono<Long> close() {
            if (this.disposable != null && !this.disposable.isDisposed()) {
                this.disposable.dispose();
            }
            DeviceSession deviceSession = this.loaded;
            return deviceSession != null ? doClose(deviceSession) : Reactors.ALWAYS_ZERO_LONG;
        }

        private Mono<Long> doClose(DeviceSession deviceSession) {
            handleParent(deviceSessionRef -> {
                deviceSessionRef.removeChild(deviceSession.getDeviceId());
            });
            return this.manager.closeSession(deviceSession).then(checkChildren()).then(Reactors.ALWAYS_ONE_LONG);
        }

        private void loadError(Throwable th) {
            if (this.loaded != null) {
                this.loaded.close();
            }
            await().tryEmitError(th);
            this.manager.localSessions.remove(this.deviceId, this);
        }

        private void loadEmpty() {
            if (this.loaded != null) {
                this.loaded.close();
            }
            await().tryEmitEmpty();
            this.manager.localSessions.remove(this.deviceId, this);
        }

        private void tryLoad(ContextView contextView) {
            Mono andSet = LOADER.getAndSet(this, null);
            if (andSet != null) {
                this.disposable = andSet.contextWrite(contextView).subscribe();
            }
        }

        public Mono<DeviceSession> ref() {
            return Mono.deferContextual(contextView -> {
                tryLoad(contextView);
                return await().asMono();
            });
        }

        private Sinks.One<DeviceSession> await() {
            return AWAIT.get(this);
        }
    }

    public abstract String getCurrentServerId();

    protected abstract Mono<Boolean> initSessionConnection(DeviceSession deviceSession);

    protected abstract Mono<Long> removeRemoteSession(String str);

    protected abstract Mono<Long> getRemoteTotalSessions();

    protected abstract Mono<Boolean> remoteSessionIsAlive(String str);

    protected abstract Mono<Boolean> checkRemoteSessionIsAlive(String str);

    protected abstract Flux<DeviceSessionInfo> remoteSessions(String str);

    public void init() {
        Scheduler newSingle = Schedulers.newSingle("device-session-checker");
        this.disposable.add(newSingle);
        this.disposable.add(Flux.interval(this.sessionCheckInterval, newSingle).onBackpressureDrop().concatMap(l -> {
            return executeInterval();
        }).subscribe());
        this.disposable.add(this.closeSink.asFlux().bufferTimeout(1000, Duration.ofSeconds(1L)).onBackpressureBuffer().concatMap(list -> {
            return Flux.fromIterable(list).filter(deviceSession -> {
                return !this.localSessions.containsKey(deviceSession.getDeviceId());
            }).flatMap(this::closeSessionSafe).then();
        }, 0).subscribe());
    }

    protected Mono<Void> executeInterval() {
        return checkSession().onErrorResume(th -> {
            return Mono.empty();
        });
    }

    public void shutdown() {
        this.disposable.dispose();
    }

    public Mono<DeviceSession> getSession(String str) {
        return getSession(str, true);
    }

    public Mono<DeviceSession> getSession(String str, boolean z) {
        DeviceSessionRef deviceSessionRef;
        if (!StringUtils.isEmpty(str) && (deviceSessionRef = this.localSessions.get(str)) != null) {
            return z ? deviceSessionRef.ref().filterWhen(this::checkSessionAlive) : deviceSessionRef.ref();
        }
        return Mono.empty();
    }

    public Flux<DeviceSession> getSessions() {
        return Flux.fromIterable(this.localSessions.values()).flatMap((v0) -> {
            return v0.ref();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Mono<Boolean> checkSessionAlive(String str) {
        DeviceSessionRef deviceSessionRef = this.localSessions.get(str);
        return (deviceSessionRef == null || deviceSessionRef.loaded == null) ? Reactors.ALWAYS_FALSE : checkSessionAlive(deviceSessionRef.loaded);
    }

    private Mono<Boolean> checkSessionAlive(DeviceSession deviceSession) {
        return deviceSession == null ? Reactors.ALWAYS_FALSE : deviceSession.isAliveAsync().defaultIfEmpty(true).flatMap(bool -> {
            return !bool.booleanValue() ? removeLocalSession(deviceSession).thenReturn(false) : Reactors.ALWAYS_TRUE;
        });
    }

    public final Mono<Long> remove(String str, boolean z) {
        return z ? removeLocalSession(str) : Flux.merge(new Publisher[]{removeLocalSession(str), removeRemoteSession(str)}).reduce((v0, v1) -> {
            return Math.addExact(v0, v1);
        });
    }

    public final Mono<Boolean> isAlive(String str, boolean z) {
        Mono<Boolean> hasElement = getSession(str).hasElement();
        return z ? hasElement : hasElement.flatMap(bool -> {
            return bool.booleanValue() ? Reactors.ALWAYS_TRUE : remoteSessionIsAlive(str);
        });
    }

    public Mono<Boolean> checkAlive(String str, boolean z) {
        Mono<Boolean> checkLocalAlive = checkLocalAlive(str);
        return z ? checkLocalAlive : checkLocalAlive.flatMap(bool -> {
            return bool.booleanValue() ? Reactors.ALWAYS_TRUE : checkRemoteSessionIsAlive(str);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Mono<Boolean> checkLocalAlive(String str) {
        return getSession(str).flatMap(deviceSession -> {
            return deviceSession.getOperator() == null ? Reactors.ALWAYS_FALSE : syncConnectionInfo(deviceSession.getOperator(), deviceSession);
        }).defaultIfEmpty(false);
    }

    protected final Mono<Boolean> syncConnectionInfo(DeviceOperator deviceOperator, DeviceSession deviceSession) {
        return deviceOperator.online(getCurrentServerId(), (String) deviceSession.getClientAddress().map((v0) -> {
            return String.valueOf(v0);
        }).orElse(""), -1L).thenReturn(true);
    }

    public final Mono<Long> totalSessions(boolean z) {
        Mono<Long> just = Mono.just(Long.valueOf(this.localSessions.size()));
        return z ? just : Mono.zip(just, getRemoteTotalSessions(), (v0, v1) -> {
            return Math.addExact(v0, v1);
        });
    }

    public final Flux<DeviceSessionInfo> getSessionInfo() {
        return Flux.concat(new Publisher[]{getLocalSessionInfo(), remoteSessions(null)});
    }

    public final Flux<DeviceSessionInfo> getSessionInfo(String str) {
        return getCurrentServerId().equals(str) ? getLocalSessionInfo() : remoteSessions(str);
    }

    public final Flux<DeviceSessionInfo> getLocalSessionInfo() {
        return Flux.fromIterable(this.localSessions.values()).mapNotNull(deviceSessionRef -> {
            return deviceSessionRef.loaded;
        }).map(deviceSession -> {
            return DeviceSessionInfo.of(getCurrentServerId(), deviceSession);
        });
    }

    public Mono<DeviceSession> compute(@Nonnull String str, Mono<DeviceSession> mono, Function<DeviceSession, Mono<DeviceSession>> function) {
        DeviceSessionRef compute = this.localSessions.compute(str, (str2, deviceSessionRef) -> {
            if (deviceSessionRef == null) {
                if (mono == null) {
                    return null;
                }
                return new DeviceSessionRef(str2, this, (Mono<DeviceSession>) mono);
            }
            if (function == null) {
                return deviceSessionRef;
            }
            deviceSessionRef.update(mono2 -> {
                return mono2.flatMap(function);
            });
            return deviceSessionRef;
        });
        return compute == null ? Mono.empty() : compute.ref();
    }

    public final Mono<DeviceSession> compute(@Nonnull String str, @Nonnull Function<Mono<DeviceSession>, Mono<DeviceSession>> function) {
        return this.localSessions.compute(str, (str2, deviceSessionRef) -> {
            if (deviceSessionRef == null) {
                return new DeviceSessionRef(str2, this, (Mono<DeviceSession>) function.apply(Mono.empty()));
            }
            deviceSessionRef.update((Function<Mono<DeviceSession>, Mono<DeviceSession>>) function);
            return deviceSessionRef;
        }).ref();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Mono<DeviceSession> handleSessionCompute0(DeviceSession deviceSession, DeviceSession deviceSession2) {
        if (deviceSession == null || !deviceSession.isChanged(deviceSession2) || deviceSession2.getOperator() == null) {
            return handleSessionCompute(deviceSession, deviceSession2);
        }
        log.info("device [{}] session [{}] changed to [{}]", new Object[]{deviceSession.getDeviceId(), deviceSession, deviceSession2});
        deviceSession.close();
        return deviceSession2.getOperator().online(getCurrentServerId(), (String) deviceSession2.getClientAddress().map((v0) -> {
            return v0.toString();
        }).orElse(null), -1L).then(handleSessionCompute(deviceSession, deviceSession2));
    }

    protected Mono<DeviceSession> handleSessionCompute(DeviceSession deviceSession, DeviceSession deviceSession2) {
        return Mono.just(deviceSession2);
    }

    protected final Mono<Void> closeSessionSafe(DeviceSession deviceSession) {
        return closeSession0(deviceSession).onErrorResume(th -> {
            log.warn("close session [{}] error", deviceSession.getDeviceId(), th);
            return Mono.empty();
        });
    }

    private Mono<Void> closeSession0(DeviceSession deviceSession) {
        try {
            deviceSession.close();
        } catch (Throwable th) {
        }
        if (deviceSession.getOperator() != null) {
            return initSessionConnection(deviceSession).flatMap(bool -> {
                if (bool.booleanValue() || this.localSessions.containsKey(deviceSession.getDeviceId())) {
                    log.info("device [{}] session [{}] closed,but session still exists!", deviceSession.getDeviceId(), deviceSession);
                    return fireEvent(DeviceSessionEvent.of(DeviceSessionEvent.Type.unregister, deviceSession, true));
                }
                log.info("device [{}] session [{}] closed", deviceSession.getDeviceId(), deviceSession);
                return deviceSession.getOperator().offline().then(fireEvent(DeviceSessionEvent.of(DeviceSessionEvent.Type.unregister, deviceSession, false)));
            }).doAfterTerminate(() -> {
                CLOSE_WIP.decrementAndGet(this);
            });
        }
        CLOSE_WIP.decrementAndGet(this);
        return Mono.empty();
    }

    protected final Mono<Void> closeSession(DeviceSession deviceSession) {
        return (CLOSE_WIP.incrementAndGet(this) <= ((long) this.sessionCloseConcurrency) || !this.closeSink.tryEmitNext(deviceSession).isSuccess()) ? closeSession0(deviceSession) : Mono.empty();
    }

    private Mono<Long> removeLocalSession(DeviceSession deviceSession) {
        DeviceSessionRef deviceSessionRef = this.localSessions.get(deviceSession.getDeviceId());
        return null == deviceSessionRef ? Reactors.ALWAYS_ZERO_LONG : deviceSessionRef.close(deviceSession);
    }

    protected final Mono<Long> removeLocalSession(String str) {
        DeviceSessionRef remove = this.localSessions.remove(str);
        return remove != null ? remove.close() : Reactors.ALWAYS_ZERO_LONG;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Mono<DeviceSession> doRegister(DeviceSession deviceSession) {
        return deviceSession.getOperator() == null ? Mono.empty() : remoteSessionIsAlive(deviceSession.getDeviceId()).flatMap(bool -> {
            return deviceSession.getOperator().online(getCurrentServerId(), (String) deviceSession.getClientAddress().map((v0) -> {
                return v0.toString();
            }).orElse(null), bool.booleanValue() ? -1L : deviceSession.connectTime()).then(fireEvent(DeviceSessionEvent.of(DeviceSessionEvent.Type.register, deviceSession, bool.booleanValue())));
        }).thenReturn(deviceSession);
    }

    protected Mono<Void> fireEvent(DeviceSessionEvent deviceSessionEvent) {
        return this.sessionEventHandlers.isEmpty() ? Mono.empty() : Flux.fromIterable(this.sessionEventHandlers).flatMap(function -> {
            return Mono.defer(() -> {
                return (Mono) function.apply(deviceSessionEvent);
            }).onErrorResume(th -> {
                log.error("fire session event error {}", deviceSessionEvent, th);
                return Mono.empty();
            });
        }).then();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Boolean> doInit(String str) {
        DeviceSession deviceSession;
        DeviceOperator operator;
        DeviceSessionRef deviceSessionRef = this.localSessions.get(str);
        return (deviceSessionRef == null || (deviceSession = deviceSessionRef.loaded) == null || (operator = deviceSessionRef.loaded.getOperator()) == null) ? Mono.empty() : operator.online(getCurrentServerId(), (String) deviceSession.getClientAddress().map((v0) -> {
            return String.valueOf(v0);
        }).orElse(""), -1L).thenReturn(true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Long> removeFromCluster(String str) {
        DeviceSessionRef remove = this.localSessions.remove(str);
        if (remove != null) {
            remove.disposable.dispose();
            DeviceSession deviceSession = remove.loaded;
            if (remove.loaded != null) {
                deviceSession.close();
                if (deviceSession.getOperator() == null) {
                    return Reactors.ALWAYS_ONE_LONG;
                }
                Mono connectionServerId = deviceSession.getOperator().getConnectionServerId();
                String currentServerId = getCurrentServerId();
                currentServerId.getClass();
                return connectionServerId.map((v1) -> {
                    return r1.equals(v1);
                }).defaultIfEmpty(false).flatMap(bool -> {
                    Mono empty = Mono.empty();
                    if (bool.booleanValue()) {
                        empty = deviceSession.getOperator().offline().then();
                    }
                    return empty.then(fireEvent(DeviceSessionEvent.of(DeviceSessionEvent.Type.unregister, deviceSession, !bool.booleanValue())));
                }).thenReturn(1L);
            }
        }
        return Reactors.ALWAYS_ZERO_LONG;
    }

    public Disposable listenEvent(Function<DeviceSessionEvent, Mono<Void>> function) {
        this.sessionEventHandlers.add(function);
        return () -> {
            this.sessionEventHandlers.remove(function);
        };
    }

    protected Mono<Void> checkSession() {
        return Flux.fromIterable(this.localSessions.values()).filter(deviceSessionRef -> {
            return deviceSessionRef.loaded != null;
        }).flatMap(deviceSessionRef2 -> {
            return checkSessionAlive(deviceSessionRef2.loaded).onErrorResume(th -> {
                log.warn("check session alive error", th);
                return Mono.empty();
            });
        }, this.sessionCheckConcurrency).then();
    }

    public void setSessionLoadTimeout(Duration duration) {
        this.sessionLoadTimeout = duration;
    }

    public Duration getSessionLoadTimeout() {
        return this.sessionLoadTimeout;
    }

    public Duration getSessionCheckInterval() {
        return this.sessionCheckInterval;
    }

    public void setSessionCheckInterval(Duration duration) {
        this.sessionCheckInterval = duration;
    }

    public void setSessionCheckConcurrency(int i) {
        this.sessionCheckConcurrency = i;
    }

    public void setSessionCloseConcurrency(int i) {
        this.sessionCloseConcurrency = i;
    }
}
