package org.springframework.integration.channel;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/springframework/integration/channel/FluxMessageChannel.class */
public class FluxMessageChannel extends AbstractMessageChannel implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {
    private final ReplayProcessor<Boolean> subscribedSignal = ReplayProcessor.create(1);
    private final Disposable.Composite upstreamSubscriptions = Disposables.composite();
    private final EmitterProcessor<Message<?>> processor = EmitterProcessor.create(1, false);
    private final FluxSink<Message<?>> sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);

    @Override // org.springframework.integration.channel.AbstractMessageChannel
    protected boolean doSend(Message<?> message, long j) {
        Assert.state(this.processor.hasDownstreams(), () -> {
            return "The [" + this + "] doesn't have subscribers to accept messages";
        });
        this.sink.next(message);
        return true;
    }

    public void subscribe(Subscriber<? super Message<?>> subscriber) {
        this.processor.doFinally(signalType -> {
            this.subscribedSignal.onNext(Boolean.valueOf(this.processor.hasDownstreams()));
        }).subscribe(subscriber);
        this.subscribedSignal.onNext(Boolean.valueOf(this.processor.hasDownstreams()));
    }

    @Override // org.springframework.integration.channel.ReactiveStreamsSubscribableChannel
    public void subscribeTo(Publisher<? extends Message<?>> publisher) {
        this.upstreamSubscriptions.add(Flux.from(publisher).delaySubscription(this.subscribedSignal.filter((v0) -> {
            return v0.booleanValue();
        }).next()).publishOn(Schedulers.boundedElastic()).doOnNext(message -> {
            try {
                send(message);
            } catch (Exception e) {
                this.logger.warn("Error during processing event: " + message, e);
            }
        }).subscribe());
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.integration.support.management.IntegrationManagement
    public void destroy() {
        this.subscribedSignal.onNext(false);
        this.upstreamSubscriptions.dispose();
        this.processor.onComplete();
        super.destroy();
    }
}
