package org.jetlinks.supports.config;

import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.Value;
import org.jetlinks.core.Values;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.config.ConfigStorage;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.utils.Reactors;
import org.springframework.util.CollectionUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/supports/config/LocalCacheClusterConfigStorage.class */
/**
 * {@link ConfigStorage} that keeps a per-key, in-process {@link Cache} in front of a
 * {@link ClusterCache}.
 *
 * <p>Reads are answered from the local cache when a fresh value is present and are otherwise
 * loaded from the cluster cache. Writes and removals go to the cluster cache, drop the affected
 * local entries and publish a {@code CacheNotify} on the event bus topic
 * {@code /_sys/cluster_cache}.
 */
public class LocalCacheClusterConfigStorage implements ConfigStorage {
    /** Local cache entries, keyed by config key. */
    private final Map<String, Cache> caches = new NonBlockingHashMap<>();
    /** Identifier passed to every {@code CacheNotify} created by this storage. */
    private final String id;
    private final EventBus eventBus;
    private final ClusterCache<String, Object> clusterCache;
    /** Local entry lifetime in milliseconds; values {@code <= 0} disable expiry. */
    private long expires;
    /** Optional callback run by {@link #clearLocalCache(CacheNotify)} for "clear" notifications. */
    private final Runnable doOnClear;
    private static final AtomicReferenceFieldUpdater<Cache, Mono> CACHE_REF = AtomicReferenceFieldUpdater.newUpdater(Cache.class, Mono.class, "ref");
    private static final AtomicIntegerFieldUpdater<Cache> CACHE_VERSION = AtomicIntegerFieldUpdater.newUpdater(Cache.class, "version");
    private static final AtomicReferenceFieldUpdater<Cache, Disposable> CACHE_LOADER = AtomicReferenceFieldUpdater.newUpdater(Cache.class, Disposable.class, "loader");
    /** Marker stored in {@link Cache#cached} when a key was looked up and has no value. */
    public static final Value NULL = Value.simple((Object) null);

    /**
     * Local cache slot for a single key.
     *
     * <p>{@code version} is incremented by {@link #setValue(Value)} and {@link #clear()}; loaders
     * compare it with the value observed when they started so that a result which raced with a
     * newer update is not stored.
     */
    public class Cache {
        final String key;
        /** Time of the last update in epoch milliseconds; only maintained when expiry is enabled. */
        long t;
        volatile int version;
        /** Last known value, {@link LocalCacheClusterConfigStorage#NULL} for "known absent", or null. */
        volatile Value cached;
        /** Mono handed to readers: either the cached value or the pending load. */
        volatile Mono<Value> ref;
        /** Sink behind {@link #ref} while a load is pending. */
        Sinks.One<Value> sink;
        /** Subscription of the pending cluster load, if any. */
        volatile Disposable loader;

        public Cache(String str) {
            this.key = str;
            updateTime();
        }

        /** @return true when expiry is enabled and the entry is older than the configured lifetime */
        boolean isExpired() {
            return expires > 0 && System.currentTimeMillis() - this.t > expires;
        }

        /**
         * Returns the Mono for this key, starting a reload first when the entry is expired or has
         * no Mono yet.
         */
        Mono<Value> getRef() {
            if (isExpired() || this.ref == null) {
                reload();
            }
            return this.ref;
        }

        /** @return the cached {@link Value}, or null when nothing is cached or the entry expired */
        public Value getCached() {
            return isExpired() ? null : this.cached;
        }

        /** @return the raw cached object, or null when nothing usable is cached */
        public Object getCachedValue() {
            Value current = getCached();
            return current == null ? null : current.get();
        }

        /** Stamps the entry with the current time; a no-op when expiry is disabled. */
        void updateTime() {
            if (expires > 0) {
                this.t = System.currentTimeMillis();
            }
        }

        /** Wraps {@code obj} in a {@link Value} (null stays null) and stores it. */
        void setValue(Object obj) {
            setValue(obj == null ? null : Value.simple(obj));
        }

        /**
         * Stores {@code value} as the current state, bumps the version and completes any pending
         * load with it. A null {@code value} is remembered as {@link LocalCacheClusterConfigStorage#NULL}.
         */
        void setValue(Value value) {
            updateTime();
            CACHE_VERSION.incrementAndGet(this);
            this.ref = Mono.justOrEmpty(value);
            this.cached = value == null ? NULL : value;
            // cached must be assigned before dispose(): dispose() emits it to the pending sink.
            dispose();
        }

        /**
         * Discards the cached value and starts loading the key from the cluster cache. Readers
         * obtain the result through {@link #ref}, which is backed by a fresh sink.
         */
        synchronized void reload() {
            this.cached = null;
            dispose();
            final int expectedVersion = this.version;
            // Keep the sink in a local: clear()/dispose() null out the field, so the callbacks
            // below must not read this.sink.
            final Sinks.One<Value> pending = Sinks.one();
            this.sink = pending;
            CACHE_REF.set(this, pending.asMono());
            this.loader = clusterCache
                .get(this.key)
                .switchIfEmpty(Mono.fromRunnable(() -> {
                    if (expectedVersion == this.version) {
                        setValue((Value) null);
                    } else {
                        clear();
                    }
                }))
                .subscribe(loaded -> {
                    if (this.version == expectedVersion) {
                        setValue(loaded);
                    } else {
                        clear();
                    }
                }, error -> {
                    // Emit the failure first: clear() completes the sink as empty, and emitting
                    // on an already terminated sink has no effect.
                    pending.tryEmitError(error);
                    clear();
                });
        }

        /** Drops the cached value and Mono, cancels any pending load and bumps the version. */
        void clear() {
            dispose();
            this.cached = null;
            CACHE_VERSION.incrementAndGet(this);
            CACHE_REF.set(this, null);
        }

        /**
         * Cancels the pending load (if any) and completes the pending sink (if any) with the
         * current cached value, or empty when there is none.
         */
        void dispose() {
            Disposable running = CACHE_LOADER.getAndSet(this, null);
            if (running != null) {
                running.dispose();
            }
            Sinks.One<Value> pending = this.sink;
            this.sink = null;
            if (pending == null) {
                return;
            }
            Value current = this.cached;
            if (current == null || current.get() == null) {
                pending.tryEmitEmpty();
            } else {
                pending.tryEmitValue(current);
            }
        }
    }

    private Cache createCache(String str) {
        return new Cache(str);
    }

    /** Returns the local entry for {@code str}, creating it when absent. */
    private Cache getOrCreateCache(String str) {
        return this.caches.computeIfAbsent(str, this::createCache);
    }

    /** Returns the value for {@code str}, served locally when possible. */
    public Mono<Value> getConfig(String str) {
        return getOrCreateCache(str).getRef();
    }

    /**
     * Builds a {@link Values} over the local entries whose key is in {@code collection} and whose
     * cached raw value is non-null.
     *
     * <p>The Guava transform/filter calls return views of {@link #caches}.
     * NOTE(review): whether {@code Values.of} copies the map is not visible here.
     */
    Values wrapCache(Collection<String> collection) {
        Map<String, Object> rawValues = Maps.transformValues(this.caches, Cache::getCachedValue);
        return Values.of(Maps.filterEntries(
            rawValues,
            entry -> entry.getValue() != null && collection.contains(entry.getKey())));
    }

    /**
     * Returns the values for the requested keys. Keys that are not cached locally are fetched
     * from the cluster cache and stored locally; keys the cluster cache does not return are
     * remembered as absent.
     *
     * @param collection keys to look up
     */
    public Mono<Values> getConfigs(Collection<String> collection) {
        int cachedCount = 0;
        for (String key : collection) {
            if (getOrCreateCache(key).getCached() != null) {
                cachedCount++;
            }
        }
        if (cachedCount == collection.size()) {
            return Mono.just(wrapCache(collection));
        }
        Values localValues = wrapCache(collection);
        HashSet<String> missing = new HashSet<>(collection);
        missing.removeAll(localValues.getAllValues().keySet());
        if (missing.isEmpty()) {
            return Mono.just(localValues);
        }
        // Snapshot versions so a concurrent update made while the load is in flight wins.
        Map<String, Integer> versions = Maps.newHashMapWithExpectedSize(this.caches.size());
        for (Map.Entry<String, Cache> entry : this.caches.entrySet()) {
            versions.put(entry.getKey(), entry.getValue().version);
        }
        return this.clusterCache
            .get(missing)
            .<Map<String, Cache>>reduce(new HashMap<>(), (loaded, entry) -> {
                String key = entry.getKey();
                Object value = entry.getValue();
                Cache cache = getOrCreateCache(key);
                updateValue(cache, versions.getOrDefault(key, cache.version), value);
                if (value != null) {
                    loaded.put(key, cache);
                }
                return loaded;
            })
            .defaultIfEmpty(Collections.emptyMap())
            .doOnNext(loaded -> {
                // Whatever is still missing was not returned with a value: cache it as absent.
                missing.removeAll(loaded.keySet());
                for (String key : missing) {
                    Cache cache = getOrCreateCache(key);
                    updateValue(cache, versions.getOrDefault(key, cache.version), null);
                }
            })
            .thenReturn(localValues);
    }

    /** Stores {@code obj} in {@code cache} only if its version still equals {@code i}. */
    private void updateValue(Cache cache, int i, Object obj) {
        if (cache != null && cache.version == i) {
            cache.setValue(obj);
        }
    }

    /**
     * Writes all entries of {@code map} to the cluster cache and notifies about the changed keys.
     *
     * @return {@code Reactors.ALWAYS_TRUE} for an empty map, otherwise a Mono emitting true
     */
    public Mono<Boolean> setConfigs(Map<String, Object> map) {
        if (CollectionUtils.isEmpty(map)) {
            return Reactors.ALWAYS_TRUE;
        }
        map.forEach((key, value) -> getOrCreateCache(key).setValue(value));
        return this.clusterCache
            .putAll(map)
            .then(notify(CacheNotify.expires(this.id, map.keySet())))
            .thenReturn(true);
    }

    /**
     * Writes a single value. A null key yields {@code Reactors.ALWAYS_FALSE}; a null value is
     * treated as {@link #remove(String)}.
     */
    public Mono<Boolean> setConfig(String str, Object obj) {
        if (str == null) {
            return Reactors.ALWAYS_FALSE;
        }
        if (obj == null) {
            return remove(str);
        }
        getOrCreateCache(str).setValue(obj);
        return this.clusterCache.put(str, obj).then(notifyRemoveKey(str)).thenReturn(true);
    }

    /** Removes {@code str} from the cluster cache and notifies about it. */
    public Mono<Boolean> remove(String str) {
        return this.clusterCache.remove(str).then(notifyRemoveKey(str)).thenReturn(true);
    }

    /** Removes {@code str} from the cluster cache and emits the removed value, if there was one. */
    public Mono<Value> getAndRemove(String str) {
        return this.clusterCache
            .getAndRemove(str)
            .flatMap(removed -> notify(CacheNotify.expires(this.id, Collections.singleton(str))).thenReturn(removed))
            .map(Value::simple);
    }

    /** Removes all given keys from the cluster cache and notifies about them. */
    public Mono<Boolean> remove(Collection<String> collection) {
        return this.clusterCache
            .remove(collection)
            .then(notify(CacheNotify.expires(this.id, collection)))
            .thenReturn(true);
    }

    /** Clears the cluster cache and publishes a "clear" notification. */
    public Mono<Boolean> clear() {
        return this.clusterCache.clear().then(notify(CacheNotify.clear(this.id))).thenReturn(true);
    }

    /**
     * Drops local entries named by {@code cacheNotify}, or all of them when it carries no keys,
     * and runs the {@code doOnClear} callback when the notification is a "clear" and a callback
     * was supplied.
     */
    public void clearLocalCache(CacheNotify cacheNotify) {
        Collection<String> keys = cacheNotify.getKeys();
        if (CollectionUtils.isEmpty(keys)) {
            this.caches.clear();
        } else {
            keys.forEach(this.caches::remove);
        }
        if (cacheNotify.isClear() && this.doOnClear != null) {
            this.doOnClear.run();
        }
    }

    /**
     * Applies {@code cacheNotify} to the local cache right away (when this method is called, not
     * when the returned Mono is subscribed) and returns a Mono that publishes it on the
     * {@code /_sys/cluster_cache} topic.
     */
    Mono<Void> notify(CacheNotify cacheNotify) {
        clearLocalCache(cacheNotify);
        return this.eventBus.publish("/_sys/cluster_cache", cacheNotify).then();
    }

    /** Shorthand for {@link #notify(CacheNotify)} with an "expires" notification for one key. */
    Mono<Void> notifyRemoveKey(String str) {
        return notify(CacheNotify.expires(this.id, Collections.singleton(str)));
    }

    /** Drops the whole local cache and publishes an "expires all" notification. */
    public Mono<Void> refresh() {
        return notify(CacheNotify.expiresAll(this.id));
    }

    /** Drops the given keys from the local cache and publishes an "expires" notification. */
    public Mono<Void> refresh(Collection<String> collection) {
        return notify(CacheNotify.expires(this.id, collection));
    }

    /**
     * @param str          identifier placed into every published {@code CacheNotify}
     * @param eventBus     bus used to publish cache notifications
     * @param clusterCache backing cluster cache
     * @param j            local entry lifetime in milliseconds; {@code <= 0} disables expiry
     * @param runnable     optional callback for "clear" notifications, may be null
     */
    public LocalCacheClusterConfigStorage(String str, EventBus eventBus, ClusterCache<String, Object> clusterCache, long j, Runnable runnable) {
        this.id = str;
        this.eventBus = eventBus;
        this.clusterCache = clusterCache;
        this.expires = j;
        this.doOnClear = runnable;
    }
}
