package org.hswebframework.web.cache.supports;

import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import org.hswebframework.web.cache.ReactiveCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/hswebframework/web/cache/supports/RedisReactiveCache.class */
/**
 * Reactive cache that stores entries in a single Redis hash and fronts it with a local
 * {@link ReactiveCache}.
 *
 * <p>Every mutation performed through this class is written to the Redis hash, evicted from the
 * local cache, and then announced on the Redis channel {@code "_cache_changed:" + redisKey}.
 * Each instance also listens on that channel and evicts the announced key (or clears everything
 * when the message is {@code "___all"}) from its own local cache.
 *
 * @param <E> type of the cached values
 */
public class RedisReactiveCache<E> extends AbstractReactiveCache<E> {
    private static final Logger log = LoggerFactory.getLogger(RedisReactiveCache.class);

    /** Channel message meaning "clear the whole local cache" rather than a single key. */
    private static final String CLEAR_ALL_MESSAGE = "___all";

    private final ReactiveRedisOperations<Object, Object> operations;
    private final String redisKey;
    private final ReactiveCache<E> localCache;
    private final String topicName;

    /**
     * Creates the cache and immediately subscribes to the change channel.
     *
     * @param str                     name of the Redis hash holding the entries; also used to derive
     *                                the change channel name
     * @param reactiveRedisOperations Redis operations used for hash access and pub/sub
     * @param reactiveCache           local cache consulted before Redis and invalidated on change
     */
    public RedisReactiveCache(String str, ReactiveRedisOperations<Object, Object> reactiveRedisOperations, ReactiveCache<E> reactiveCache) {
        this.operations = reactiveRedisOperations;
        this.localCache = reactiveCache;
        this.redisKey = str;
        String topic = "_cache_changed:" + str;
        this.topicName = topic;
        reactiveRedisOperations
                .listenToChannel(topic)
                .map(message -> message.getMessage())
                // The message is used as the key as-is: no String cast, so a non-String key
                // cannot terminate the listener with a ClassCastException.
                .flatMap(changed -> {
                    Mono<Void> invalidation = CLEAR_ALL_MESSAGE.equals(changed)
                            ? reactiveCache.clear()
                            : reactiveCache.evict(changed);
                    // A failed local eviction is logged and skipped so the listener keeps running.
                    return invalidation.onErrorResume(th -> this.<Void>handleError(th));
                })
                .subscribe(
                        ignored -> { },
                        th -> log.error("cache change listener for topic {} terminated", topic, th));
    }

    /**
     * Looks the key up in the local cache, falling back to the Redis hash through the loader
     * handed to {@code localCache.getMono}.
     *
     * @param obj hash field / cache key
     * @return the cached value, as produced by the local cache
     */
    @Override // org.hswebframework.web.cache.supports.AbstractReactiveCache
    protected Mono<Object> getNow(Object obj) {
        // The explicit type witness gives the loader the value type E instead of Object.
        return this.localCache
                .getMono(obj, () -> this.operations.<Object, E>opsForHash().get(this.redisKey, obj))
                .cast(Object.class);
    }

    /**
     * Writes the value to the Redis hash, evicts the key locally, then publishes the key on the
     * change channel. Errors are propagated to the subscriber.
     *
     * @param obj  hash field / cache key
     * @param obj2 value to store
     * @return completion signal
     */
    @Override // org.hswebframework.web.cache.supports.AbstractReactiveCache
    public Mono<Void> putNow(Object obj, Object obj2) {
        return this.operations
                .opsForHash()
                .put(this.redisKey, obj, obj2)
                .then(this.localCache.evict(obj))
                .then(this.operations.convertAndSend(this.topicName, obj))
                .then();
    }

    /**
     * Logs the error and replaces it with an empty result, making the failed operation
     * best-effort.
     *
     * @param th  the failure to log
     * @param <T> element type expected by the calling pipeline
     * @return an empty {@link Mono}
     */
    protected <T> Mono<T> handleError(Throwable th) {
        log.error(th.getMessage(), th);
        return Mono.empty();
    }

    /**
     * Removes the given keys from the Redis hash and the local cache, then publishes one change
     * message per key. Failures are logged via {@link #handleError(Throwable)} and swallowed.
     *
     * @param iterable keys to evict; traversed once
     * @return completion signal
     */
    @Override // org.hswebframework.web.cache.supports.AbstractReactiveCache, org.hswebframework.web.cache.ReactiveCache
    public Mono<Void> evictAll(Iterable<?> iterable) {
        // Snapshot once so the caller's iterable is not traversed repeatedly.
        Object[] keys = StreamSupport.stream(iterable.spliterator(), false).toArray();
        if (keys.length == 0) {
            // Nothing to remove; avoid issuing a hash removal with no fields.
            return Mono.empty();
        }
        // Chained with then(...), not flatMap: the preceding Mono<Void> never emits a value,
        // so a flatMap callback would never run and no notification would be sent.
        Mono<Void> notifyPeers = Flux.fromArray(keys)
                .flatMap(key -> this.operations.convertAndSend(this.topicName, key))
                .then();
        return this.operations
                .opsForHash()
                .remove(this.redisKey, keys)
                .then(this.localCache.evictAll(Arrays.asList(keys)))
                .then(notifyPeers)
                .onErrorResume(th -> handleError(th));
    }

    /**
     * Reads values straight from the Redis hash, bypassing the local cache.
     *
     * @param objArr keys to read; {@code null} or empty means "all values of the hash"
     * @return the values found. Null entries in the multi-get result (presumably missing keys)
     *         are skipped. Errors of the keyed lookup are logged and swallowed; errors of the
     *         "all values" lookup are propagated.
     */
    @Override // org.hswebframework.web.cache.supports.AbstractReactiveCache, org.hswebframework.web.cache.ReactiveCache
    public Flux<E> getAll(Object... objArr) {
        if (objArr == null || objArr.length == 0) {
            return this.operations.<Object, E>opsForHash().values(this.redisKey);
        }
        return this.operations
                .<Object, E>opsForHash()
                .multiGet(this.redisKey, Arrays.asList(objArr))
                // Drop null entries: a Flux cannot emit null, so one would fail the whole lookup.
                .flatMapMany(values -> Flux.fromStream(values.stream().filter(value -> value != null)))
                .onErrorResume(th -> handleError(th));
    }

    /**
     * Removes one key from the Redis hash and the local cache, then publishes the key on the
     * change channel. Failures are logged via {@link #handleError(Throwable)} and swallowed.
     *
     * @param obj key to evict
     * @return completion signal
     */
    @Override // org.hswebframework.web.cache.supports.AbstractReactiveCache, org.hswebframework.web.cache.ReactiveCache
    public Mono<Void> evict(Object obj) {
        return this.operations
                .opsForHash()
                .remove(this.redisKey, new Object[]{obj})
                .then(this.localCache.evict(obj))
                .then(this.operations.convertAndSend(this.topicName, obj))
                .onErrorResume(th -> handleError(th))
                .then();
    }

    /**
     * Deletes the whole Redis hash, clears the local cache, then publishes the clear-all message
     * on the change channel. Failures are logged via {@link #handleError(Throwable)} and swallowed.
     *
     * @return completion signal
     */
    @Override // org.hswebframework.web.cache.supports.AbstractReactiveCache, org.hswebframework.web.cache.ReactiveCache
    public Mono<Void> clear() {
        return this.operations
                .opsForHash()
                .delete(this.redisKey)
                .then(this.localCache.clear())
                .then(this.operations.convertAndSend(this.topicName, CLEAR_ALL_MESSAGE))
                .onErrorResume(th -> handleError(th))
                .then();
    }
}
