package org.jetlinks.core.utils;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import javax.annotation.Nonnull;
import lombok.NonNull;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

/* loaded from: input_file:org/jetlinks/core/utils/FluxUtils.class */
public class FluxUtils {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jetlinks/core/utils/FluxUtils$BufferRateSubscriber.class */
    public static class BufferRateSubscriber<T> extends BaseSubscriber<Tuple2<Long, T>> {
        int bufferSize;
        int rate;
        volatile List<T> bufferArray;
        FluxSink<List<T>> sink;
        Duration timeout;
        Scheduler timer = Schedulers.parallel();
        Disposable timerDispose;
        private final BiPredicate<T, List<T>> flushCondition;

        BufferRateSubscriber(FluxSink<List<T>> fluxSink, int i, int i2, Duration duration, BiPredicate<T, List<T>> biPredicate) {
            this.sink = fluxSink;
            this.bufferSize = i;
            this.rate = i2;
            this.timeout = duration;
            this.flushCondition = biPredicate;
            newBuffer();
        }

        protected List<T> newBuffer() {
            List<T> list = this.bufferArray;
            this.bufferArray = new ArrayList(this.bufferSize);
            return list;
        }

        protected void hookFinally(@Nonnull SignalType signalType) {
            doFlush();
        }

        void doFlush() {
            if (this.bufferArray.size() > 0) {
                this.sink.next(newBuffer());
            }
            request(this.bufferSize);
            if (this.timerDispose == null || this.timerDispose.isDisposed()) {
                return;
            }
            this.timerDispose.dispose();
        }

        protected void hookOnSubscribe(@Nonnull Subscription subscription) {
            request(this.bufferSize);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Multi-variable type inference failed */
        public void hookOnNext(Tuple2<Long, T> tuple2) {
            this.bufferArray.add(tuple2.getT2());
            if (((Long) tuple2.getT1()).longValue() > this.rate) {
                doFlush();
                return;
            }
            if (this.flushCondition.test(tuple2.getT2(), this.bufferArray)) {
                doFlush();
            } else if (this.timerDispose == null || this.timerDispose.isDisposed()) {
                this.timerDispose = this.timer.schedule(this::doFlush, this.timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }

    public static <T> Function<Flux<T>, Flux<T>> distinct(Function<T, ?> function, Duration duration) {
        return flux -> {
            return flux.distinct(function, () -> {
                return Caffeine.newBuilder().expireAfter(new Expiry<Object, Object>() { // from class: org.jetlinks.core.utils.FluxUtils.1
                    public long expireAfterCreate(@NonNull Object obj, @NonNull Object obj2, long j) {
                        if (obj == null) {
                            throw new NullPointerException("key is marked non-null but is null");
                        }
                        if (obj2 == null) {
                            throw new NullPointerException("value is marked non-null but is null");
                        }
                        return duration.toNanos();
                    }

                    public long expireAfterUpdate(@NonNull Object obj, @NonNull Object obj2, long j, long j2) {
                        if (obj == null) {
                            throw new NullPointerException("key is marked non-null but is null");
                        }
                        if (obj2 == null) {
                            throw new NullPointerException("value is marked non-null but is null");
                        }
                        return j2;
                    }

                    public long expireAfterRead(@NonNull Object obj, @NonNull Object obj2, long j, long j2) {
                        if (obj == null) {
                            throw new NullPointerException("key is marked non-null but is null");
                        }
                        if (obj2 == null) {
                            throw new NullPointerException("value is marked non-null but is null");
                        }
                        return j2;
                    }
                }).build().asMap();
            }, (concurrentMap, obj) -> {
                return concurrentMap.put(obj, 1) == null;
            }, (v0) -> {
                v0.clear();
            });
        };
    }

    public static <S, T> Function<Flux<S>, Flux<T>> safeMap(Function<S, T> function) {
        return flux -> {
            return flux.mapNotNull(function);
        };
    }

    public static <T> Flux<List<T>> bufferRate(Flux<T> flux, int i, Duration duration) {
        return bufferRate(flux, i, 100, duration);
    }

    public static <T> Flux<List<T>> bufferRate(Flux<T> flux, int i, int i2, Duration duration) {
        return Flux.create(fluxSink -> {
            BufferRateSubscriber bufferRateSubscriber = new BufferRateSubscriber(fluxSink, i2, i, duration, (obj, list) -> {
                return list.size() >= i2;
            });
            flux.elapsed().subscribe(bufferRateSubscriber);
            fluxSink.onDispose(bufferRateSubscriber);
        });
    }

    public static <T> Flux<List<T>> bufferRate(Flux<T> flux, int i, int i2, Duration duration, BiPredicate<T, List<T>> biPredicate) {
        return Flux.create(fluxSink -> {
            BufferRateSubscriber bufferRateSubscriber = new BufferRateSubscriber(fluxSink, i2, i, duration, (obj, list) -> {
                return biPredicate.test(obj, list) || list.size() >= i2;
            });
            flux.elapsed().subscribe(bufferRateSubscriber);
            fluxSink.onDispose(bufferRateSubscriber);
        });
    }
}
