package org.jetlinks.core.utils;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

/* loaded from: input_file:org/jetlinks/core/utils/SerialFlux.class */
public class SerialFlux<T> {
    static final AtomicReferenceFieldUpdater<SerialFlux, Pending> WIP = AtomicReferenceFieldUpdater.newUpdater(SerialFlux.class, Pending.class, "wip");
    static final AtomicReferenceFieldUpdater<Pending.PendingSubscriber, CoreSubscriber> ACTUAL = AtomicReferenceFieldUpdater.newUpdater(Pending.PendingSubscriber.class, CoreSubscriber.class, "actual");
    final Queue<Pending<T>> queue;
    volatile Pending<T> wip;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jetlinks/core/utils/SerialFlux$Pending.class */
    public static class Pending<T> extends FluxOperator<T, T> {
        private final SerialFlux<T> main;
        private final Pending<T>.PendingSubscriber subscriber;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/jetlinks/core/utils/SerialFlux$Pending$PendingSubscriber.class */
        public class PendingSubscriber extends BaseSubscriber<T> {
            volatile CoreSubscriber<? super T> actual = null;
            long requested = Long.MAX_VALUE;

            PendingSubscriber() {
            }

            protected void hookFinally(@Nonnull SignalType signalType) {
                complete();
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean isCompleted() {
                return SerialFlux.ACTUAL.get(this) == this;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public void complete() {
                SerialFlux.ACTUAL.set(this, this);
                if (SerialFlux.WIP.compareAndSet(Pending.this.main, Pending.this, null)) {
                    Pending.this.main.drain();
                }
            }

            @Nonnull
            public Context currentContext() {
                return (this.actual == null || isCompleted()) ? super.currentContext() : this.actual.currentContext();
            }

            protected void hookOnNext(@Nonnull T t) {
                CoreSubscriber<? super T> coreSubscriber = this.actual;
                if (isCompleted() || coreSubscriber == null) {
                    Operators.onDiscard(t, currentContext());
                } else {
                    coreSubscriber.onNext(t);
                }
            }

            protected void hookOnComplete() {
                CoreSubscriber<? super T> coreSubscriber;
                if (isCompleted() || (coreSubscriber = this.actual) == null) {
                    return;
                }
                coreSubscriber.onComplete();
            }

            protected void hookOnError(@Nonnull Throwable th) {
                CoreSubscriber<? super T> coreSubscriber;
                if (isCompleted() || (coreSubscriber = this.actual) == null) {
                    return;
                }
                coreSubscriber.onError(th);
            }

            protected void hookOnSubscribe(@Nonnull Subscription subscription) {
                subscription.request(this.requested);
            }

            /* JADX INFO: Access modifiers changed from: private */
            public synchronized void addSubscriber(CoreSubscriber<? super T> coreSubscriber) {
                if (SerialFlux.ACTUAL.compareAndSet(this, null, coreSubscriber)) {
                    coreSubscriber.onSubscribe(new Subscription() { // from class: org.jetlinks.core.utils.SerialFlux.Pending.PendingSubscriber.1
                        public void request(long j) {
                            PendingSubscriber.this.requested = j;
                        }

                        public void cancel() {
                            if (PendingSubscriber.this.upstream() == null) {
                                PendingSubscriber.this.dispose();
                                PendingSubscriber.this.complete();
                            } else {
                                PendingSubscriber.this.upstream().cancel();
                                PendingSubscriber.this.complete();
                            }
                        }
                    });
                } else {
                    Operators.error(coreSubscriber, new IllegalStateException("SerialFlux allows only a single Subscriber"));
                }
            }
        }

        protected Pending(SerialFlux<T> serialFlux, Flux<? extends T> flux) {
            super(flux);
            this.subscriber = new PendingSubscriber();
            this.main = serialFlux;
        }

        public Object scanUnsafe(@Nonnull Scannable.Attr attr) {
            return attr == Scannable.Attr.TERMINATED ? Boolean.valueOf(this.subscriber.isDisposed()) : super.scanUnsafe(attr);
        }

        public void subscribe(@Nonnull CoreSubscriber<? super T> coreSubscriber) {
            this.subscriber.addSubscriber(coreSubscriber);
            this.main.drain();
        }

        void doSubscribe() {
            if (!this.subscriber.isCompleted()) {
                this.source.subscribe(this.subscriber);
            } else {
                SerialFlux.WIP.compareAndSet(this.main, this, null);
                this.main.drain();
            }
        }
    }

    public SerialFlux() {
        this(Queues.small());
    }

    public SerialFlux(Supplier<Queue<Pending<T>>> supplier) {
        this.queue = supplier.get();
    }

    public int size() {
        return this.queue.size();
    }

    public Flux<T> join(Flux<T> flux) {
        Pending<T> pending = new Pending<>(this, flux);
        return !this.queue.offer(pending) ? Flux.error(new IllegalStateException("pending queue is full")) : pending;
    }

    void drain() {
        if (this.wip != null) {
            return;
        }
        while (true) {
            AtomicReferenceFieldUpdater<SerialFlux, Pending> atomicReferenceFieldUpdater = WIP;
            Pending<T> poll = this.queue.poll();
            if (atomicReferenceFieldUpdater.compareAndSet(this, null, poll)) {
                if (poll != null) {
                    poll.doSubscribe();
                    return;
                }
                return;
            } else if (poll != null && !this.queue.offer(poll)) {
                poll.doSubscribe();
                return;
            }
        }
    }
}
