package org.jetlinks.supports.cluster.redis;

import java.util.concurrent.atomic.AtomicBoolean;
import org.jetlinks.core.cluster.ClusterTopic;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/cluster/redis/RedisClusterTopic.class */
public class RedisClusterTopic<T> implements ClusterTopic<T> {
    private final String topicName;
    private final ReactiveRedisOperations<Object, T> operations;
    private Disposable disposable;
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private final FluxProcessor<ClusterTopic.TopicMessage<T>, ClusterTopic.TopicMessage<T>> processor = EmitterProcessor.create(false);
    private final FluxSink<ClusterTopic.TopicMessage<T>> sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);

    public RedisClusterTopic(String str, ReactiveRedisOperations<Object, T> reactiveRedisOperations) {
        this.topicName = str;
        this.operations = reactiveRedisOperations;
    }

    private void doSubscribe() {
        if (this.subscribed.compareAndSet(false, true)) {
            this.disposable = this.operations.listenToPattern(new String[]{this.topicName}).subscribe(message -> {
                if (this.processor.hasDownstreams()) {
                    this.sink.next(new ClusterTopic.TopicMessage<T>() { // from class: org.jetlinks.supports.cluster.redis.RedisClusterTopic.1
                        public String getTopic() {
                            return (String) message.getChannel();
                        }

                        public T getMessage() {
                            return (T) message.getMessage();
                        }
                    });
                } else {
                    this.disposable.dispose();
                    this.subscribed.compareAndSet(true, false);
                }
            });
        }
    }

    public Flux<ClusterTopic.TopicMessage<T>> subscribePattern() {
        return this.processor.doOnSubscribe(subscription -> {
            doSubscribe();
        });
    }

    public Mono<Integer> publish(Publisher<? extends T> publisher) {
        return Flux.from(publisher).flatMap(obj -> {
            return this.operations.convertAndSend(this.topicName, obj);
        }).last(1L).map((v0) -> {
            return v0.intValue();
        });
    }
}
