/*
 * Decompiled with CFR 0.152.
 */
package com.tangosol.internal.net.topic.impl.paged;

import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.base.NonBlocking;
import com.tangosol.internal.net.DebouncedFlowControl;
import com.tangosol.util.Gate;
import com.tangosol.util.LongArray;
import com.tangosol.util.NullImplementation;
import com.tangosol.util.TaskDaemon;
import com.tangosol.util.ThreadGateLite;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BatchingOperationsQueue<V, R> {
    /** Trigger value written by {@link #resetTrigger()}; {@link #triggerOperations(int)} only fires the batch function from this value. */
    public static final int TRIGGER_OPEN = 0;
    /** Trigger value that {@link #triggerOperations(int)} and {@link #resume()} switch the trigger to. */
    public static final int TRIGGER_CLOSED = 1;
    /** Trigger value written by {@link #pause()}; {@link #resume()} only succeeds from this value. */
    public static final int TRIGGER_WAIT = 2;
    /** Callback invoked with this queue and a batch size whenever {@link #triggerOperations(int)} wins the trigger. */
    private final BiConsumer<BatchingOperationsQueue<V, R>, Integer> f_functionBatch;
    /** Batch size passed to the batch function by {@code add}, the retry path of {@code handleError} and the no-arg {@code triggerOperations}. */
    private final int f_cbInitialBatch;
    /** Elements that have been added but not yet moved into the current batch. */
    private final Deque<Element> f_queuePending;
    /** Elements that make up the batch currently being processed. */
    private final Deque<Element> f_queueCurrentBatch;
    /** Running total of the backlog-calculator sizes of the elements in the current batch. */
    private long m_cbCurrentBatch;
    /** Gate entered by {@code add} and closed by the operations that restructure the queues. */
    private final Gate<?> f_gate;
    /** Trigger state; holds one of the {@code TRIGGER_*} values and starts at 0. */
    private final AtomicInteger f_lockTrigger = new AtomicInteger(0);
    /** Flow control that is told about every change in the pending backlog. */
    private final DebouncedFlowControl f_backlog;
    /** Computes the backlog contribution of a value; never null (defaults to a constant 1). */
    private final ToLongFunction<V> f_backlogCalculator;
    /** Executor used by elements to complete their futures; never null (defaults to same-thread execution). */
    private final Executor f_executor;
    /** Cleared by {@link #close()}; while false, {@link #assertActive()} rejects new additions. */
    private boolean m_fActive = true;

    /**
     * Create a queue whose batch function only needs the batch size.
     * <p>
     * The backlog is tracked by a new {@code DebouncedFlowControl(cbInitialBatch, Integer.MAX_VALUE)},
     * every value counts as a backlog of 1, and futures are completed on the calling thread.
     *
     * @param functionBatch   the callback that receives the batch size when a batch is triggered
     * @param cbInitialBatch  the batch size passed to the callback when an add triggers it
     */
    public BatchingOperationsQueue(Consumer<Integer> functionBatch, int cbInitialBatch) {
        this((BatchingOperationsQueue<V, R> queue, Integer cBatch) -> functionBatch.accept(cBatch),
                cbInitialBatch,
                new DebouncedFlowControl(cbInitialBatch, Integer.MAX_VALUE),
                (V value) -> 1L,
                Runnable::run);
    }

    /**
     * Create a queue that reports its backlog to the supplied flow control.
     * <p>
     * Every value counts as a backlog of 1 and futures are completed on the calling thread.
     *
     * @param functionBatch   the callback that receives the batch size when a batch is triggered
     * @param cbInitialBatch  the batch size passed to the callback when an add triggers it
     * @param backlog         the flow control notified of backlog changes
     */
    public BatchingOperationsQueue(Consumer<Integer> functionBatch, int cbInitialBatch, DebouncedFlowControl backlog) {
        this(functionBatch,
                cbInitialBatch,
                backlog,
                (V value) -> 1L,
                Runnable::run);
    }

    /**
     * Create a queue from a batch function that only needs the batch size, adapting it
     * to the two-argument form used by the primary constructor.
     *
     * @param functionBatch      the callback that receives the batch size when a batch is triggered
     * @param cbInitialBatch     the batch size passed to the callback when an add triggers it
     * @param backlog            the flow control notified of backlog changes
     * @param backlogCalculator  computes the backlog contribution of a value (null for a constant 1)
     * @param executor           completes element futures (null for same-thread execution)
     */
    public BatchingOperationsQueue(Consumer<Integer> functionBatch, int cbInitialBatch, DebouncedFlowControl backlog, ToLongFunction<V> backlogCalculator, Executor executor) {
        this((BatchingOperationsQueue<V, R> queue, Integer cBatch) -> functionBatch.accept(cBatch),
                cbInitialBatch,
                backlog,
                backlogCalculator,
                executor);
    }

    /**
     * Primary constructor; all other constructors delegate here.
     *
     * @param functionBatch      the callback that receives this queue and a batch size when a batch is triggered
     * @param cbInitialBatch     the batch size passed to the callback when an add triggers it
     * @param backlog            the flow control notified of backlog changes
     * @param backlogCalculator  computes the backlog contribution of a value; when null every value counts as 1
     * @param executor           completes element futures; when null {@link Executor#sameThread()} is used
     */
    public BatchingOperationsQueue(BiConsumer<BatchingOperationsQueue<V, R>, Integer> functionBatch, int cbInitialBatch, DebouncedFlowControl backlog, ToLongFunction<V> backlogCalculator, Executor executor) {
        f_functionBatch = functionBatch;
        f_cbInitialBatch = cbInitialBatch;
        f_queuePending = new ConcurrentLinkedDeque<>();
        f_queueCurrentBatch = new ConcurrentLinkedDeque<>();
        f_gate = new ThreadGateLite();
        f_backlog = backlog;

        if (backlogCalculator == null) {
            f_backlogCalculator = value -> 1L;
        } else {
            f_backlogCalculator = backlogCalculator;
        }

        if (executor == null) {
            f_executor = Executor.sameThread();
        } else {
            f_executor = executor;
        }

        // Start with the trigger in its initial (open) state.
        resetTrigger();
    }

    /**
     * Append a value to the tail of the pending queue.
     *
     * @param value  the value to queue
     * @return the future that is completed when the value's element is completed
     * @throws IllegalStateException if this queue has been closed
     */
    public CompletableFuture<R> add(V value) {
        final boolean fFirst = false;
        return add(value, fFirst);
    }

    /**
     * Insert a value at the head of the pending queue.
     *
     * @param value  the value to queue
     * @return the future that is completed when the value's element is completed
     * @throws IllegalStateException if this queue has been closed
     */
    public CompletableFuture<R> addFirst(V value) {
        final boolean fFirst = true;
        return add(value, fFirst);
    }

    /**
     * Mark this queue inactive so that later additions are rejected by {@link #assertActive()}.
     * <p>
     * The flag is cleared while the gate is closed; queued elements are left untouched.
     */
    public void close() {
        final Gate<?> gateQueue = getGate();
        gateQueue.close(-1L);
        try {
            m_fActive = false;
        } finally {
            gateQueue.open();
        }
    }

    /**
     * Obtain a future that completes once every element that is currently queued
     * (in the current batch or pending) and not yet done has completed.
     * <p>
     * The snapshot of outstanding futures is taken while the gate is closed. The
     * outcome of the individual futures is discarded: the returned future always
     * completes normally with {@code null}, even if some elements fail or are cancelled.
     *
     * @return a future that completes when the snapshot of outstanding elements is done
     */
    public CompletableFuture<Void> flush() {
        Gate<?> gate = getGate();
        gate.close(-1L);
        try {
            CompletableFuture<?>[] aFutures = Stream.concat(getCurrentBatch().stream(), getPending().stream())
                    .map(Element::getFuture)
                    .filter(future -> !future.isDone())
                    .toArray(CompletableFuture[]::new);

            // handle(...) swallows any failure so that flush itself never completes exceptionally.
            return CompletableFuture.allOf(aFutures).handle((ignored, throwable) -> null);
        } finally {
            gate.open();
        }
    }

    /**
     * Collect the values of the current-batch elements that are not yet done.
     *
     * @return a new list, in queue order, of the values still awaiting completion
     */
    public LinkedList<V> getCurrentBatchValues() {
        Stream<Element> streamOutstanding = getCurrentBatch().stream()
                .filter(element -> !element.isDone());

        return streamOutstanding
                .map(Element::getValue)
                .collect(Collectors.toCollection(LinkedList::new));
    }

    /**
     * Purge completed elements from the current batch and report whether anything is left.
     * <p>
     * Note that this is not a pure query: done elements are removed as a side effect.
     *
     * @return true if the current batch has no element still awaiting completion
     */
    public boolean isBatchComplete() {
        boolean fComplete = purgeCurrentBatch();
        return fComplete;
    }

    /**
     * @return the number of elements in the current batch plus the number of pending elements
     */
    public int size() {
        int cCurrent = getCurrentBatchSize();
        int cPending = getPendingSize();
        return cCurrent + cPending;
    }

    /**
     * @return the number of elements held in the current batch queue (done or not)
     */
    public int getCurrentBatchSize() {
        return f_queueCurrentBatch.size();
    }

    /**
     * @return the number of elements held in the pending queue
     */
    public int getPendingSize() {
        return f_queuePending.size();
    }

    /**
     * Apply an error-handling policy to the queued elements while the gate is closed.
     * <ul>
     *   <li>{@code Retry}: move the not-yet-done elements of the current batch back to the
     *       head of the pending queue, reset the trigger and trigger a new batch;</li>
     *   <li>{@code Complete[AndClose]}: complete every outstanding element with {@code null};</li>
     *   <li>{@code CompleteWithException[AndClose]}: complete every outstanding element with the
     *       throwable produced by {@code function} (which is handed a {@code null} cause);</li>
     *   <li>{@code Cancel[AndClose]}: cancel every outstanding element.</li>
     * </ul>
     * The {@code ...AndClose} variants additionally close this queue.
     *
     * @param function  creates the throwable used to fail or cancel an element from a
     *                  cause and the element's value
     * @param action    the policy to apply; {@code null} is treated as {@code CompleteWithException}
     */
    public void handleError(BiFunction<Throwable, V, Throwable> function, OnErrorAction action) {
        Gate<?> gate = getGate();
        gate.close(-1L);
        try {
            OnErrorAction actionEffective = action == null ? OnErrorAction.CompleteWithException : action;

            // Switch on the constants themselves rather than their ordinals so the
            // dispatch stays correct if the enum is ever reordered.
            switch (actionEffective) {
                case Retry:
                    requeueCurrentBatch();
                    break;
                case Complete:
                    doErrorAction(e -> e.complete(null, null), false);
                    break;
                case CompleteAndClose:
                    doErrorAction(e -> e.complete(null, null), true);
                    break;
                case CompleteWithException:
                    doErrorAction(e -> e.completeExceptionally(null, function), false);
                    break;
                case CompleteWithExceptionAndClose:
                    doErrorAction(e -> e.completeExceptionally(null, function), true);
                    break;
                case Cancel:
                    doErrorAction(e -> e.cancel(function, null), false);
                    break;
                case CancelAndClose:
                    doErrorAction(e -> e.cancel(function, null), true);
                    break;
                default:
                    break;
            }
        } finally {
            gate.open();
        }
    }

    /**
     * Drain the current batch (newest first) back onto the head of the pending queue,
     * dropping elements that are already done, then re-arm and fire the trigger.
     */
    private void requeueCurrentBatch() {
        Deque<Element> queueCurrent = getCurrentBatch();
        while (!queueCurrent.isEmpty()) {
            Element element = queueCurrent.pollLast();
            long cb = f_backlogCalculator.applyAsLong(element.getValue());
            m_cbCurrentBatch -= cb;
            if (!element.isDone()) {
                // The element becomes pending again, so it counts towards the backlog again.
                f_backlog.adjustBacklog(cb);
                getPending().offerFirst(element);
            }
        }
        resetTrigger();
        triggerOperations(f_cbInitialBatch);
    }

    /**
     * Cancel every outstanding element and close this queue.
     *
     * @param sReason   the message for the cancellation exception (may be null or empty)
     * @param function  optionally converts the cancellation exception and the element's value
     *                  into the throwable used to complete the element (may be null)
     */
    public void cancelAllAndClose(String sReason, BiFunction<Throwable, V, Throwable> function) {
        final Gate<?> gateQueue = getGate();
        gateQueue.close(-1L);
        try {
            Consumer<Element> cancelElement = element -> element.cancel(function, sReason);
            doErrorAction(cancelElement, true);
        } finally {
            gateQueue.open();
        }
    }

    /**
     * @return true until {@link #close()} has been called
     */
    public boolean isActive() {
        return m_fActive;
    }

    /**
     * Factory for the element that wraps a queued value; subclasses may override it.
     *
     * @param value  the value being queued
     * @return a new element holding the value and a fresh future
     */
    protected Element createElement(V value) {
        Element element = new Element(value);
        return element;
    }

    /**
     * Move elements from the pending queue into the current batch until the batch's
     * accumulated size reaches {@code cbMaxElements} or the pending queue is exhausted.
     * <p>
     * Each polled element has its size computed and recorded, and the backlog is reduced
     * by that size whether or not the element is still outstanding; elements that are
     * already done are dropped rather than batched. If the current batch is empty
     * afterwards the trigger is reset.
     *
     * @param cbMaxElements  the target accumulated size of the current batch
     * @return true if the current batch contains at least one element (or was already
     *         at or above the target size), false if there is nothing to process
     */
    public boolean fillCurrentBatch(int cbMaxElements) {
        // Fast path, evaluated outside the gate: the batch is already full.
        if (m_cbCurrentBatch >= cbMaxElements) {
            return true;
        }

        Gate<?> gate = getGate();
        gate.close(-1L);
        try {
            Deque<Element> queueCurrent = getCurrentBatch();
            Deque<Element> queuePending = getPending();

            for (Element element = queuePending.poll(); element != null; element = queuePending.poll()) {
                V value = element.getValue();
                long lSize = f_backlogCalculator.applyAsLong(value);
                element.setSize(lSize);

                // The element is no longer pending, so take it out of the backlog.
                try (NonBlocking nb = new NonBlocking()) {
                    f_backlog.adjustBacklog(-lSize);
                }

                if (!element.isDone()) {
                    queueCurrent.add(element);
                    m_cbCurrentBatch += lSize;
                    if (m_cbCurrentBatch >= cbMaxElements) {
                        // Batch is full; leave the remaining elements pending.
                        break;
                    }
                }
            }

            if (queueCurrent.isEmpty()) {
                resetTrigger();
                return false;
            }
            return true;
        } finally {
            gate.open();
        }
    }

    /**
     * Put the trigger back into the {@link #TRIGGER_OPEN} state so that the next call to
     * {@link #triggerOperations(int)} can fire the batch function.
     */
    protected void resetTrigger() {
        getTrigger().set(TRIGGER_OPEN);
    }

    /**
     * Unconditionally put the trigger into the {@link #TRIGGER_WAIT} state; while in that
     * state {@link #triggerOperations(int)} does not fire the batch function.
     */
    protected void pause() {
        getTrigger().set(TRIGGER_WAIT);
    }

    /**
     * Atomically move the trigger from {@link #TRIGGER_WAIT} to {@link #TRIGGER_CLOSED}.
     *
     * @return true if the trigger was in the wait state and has been switched, false otherwise
     */
    public boolean resume() {
        return getTrigger().compareAndSet(TRIGGER_WAIT, TRIGGER_CLOSED);
    }

    /**
     * Trigger a batch using the initial batch size, raised to 1 if it is smaller.
     */
    protected void triggerOperations() {
        int cBatch = Math.max(f_cbInitialBatch, 1);
        triggerOperations(cBatch);
    }

    /**
     * Invoke the batch function if, and only if, this call moves the trigger from
     * {@link #TRIGGER_OPEN} to {@link #TRIGGER_CLOSED}.
     * <p>
     * The batch function is called on the calling thread.
     *
     * @param cBatchSize  the batch size handed to the batch function
     */
    protected void triggerOperations(int cBatchSize) {
        AtomicInteger trigger = getTrigger();
        // Cheap read first; only attempt the CAS when the trigger looks open.
        if (trigger.get() == TRIGGER_OPEN && trigger.compareAndSet(TRIGGER_OPEN, TRIGGER_CLOSED)) {
            f_functionBatch.accept(this, cBatchSize);
        }
    }

    /**
     * Remove the element at the head of the current batch and, unless it is already
     * done, complete it synchronously with the given result.
     * <p>
     * The batch's accumulated size is reduced by the element's calculated size
     * (nothing is subtracted for a null value).
     *
     * @param oValue      the result to complete the element's future with; it must be an
     *                    instance of {@code R} (the cast is unchecked)
     * @param onComplete  optional callback invoked with the result if the future was completed
     * @return true if this call completed the element's future, false if the batch was
     *         empty or the element was already done
     */
    @SuppressWarnings("unchecked")
    public boolean completeElement(Object oValue, Consumer<R> onComplete) {
        Element element = getCurrentBatch().poll();
        if (element == null) {
            return false;
        }

        V value = element.getValue();
        if (value != null) {
            m_cbCurrentBatch -= f_backlogCalculator.applyAsLong(value);
        }

        if (element.isDone()) {
            return false;
        }
        return element.completeSynchronous((R) oValue, onComplete);
    }

    /**
     * Complete up to {@code cComplete} elements from the head of the current batch when
     * there are no per-element errors; delegates to the five-argument overload with an
     * empty error array obtained from {@code NullImplementation.getLongArray()}.
     *
     * @param cComplete   the number of elements to take from the current batch
     * @param aValues     the results, indexed by position within this call
     * @param function    creates the throwable for an element that fails
     * @param onComplete  optional callback invoked with each result
     */
    public void completeElements(int cComplete, LongArray<R> aValues, BiFunction<Throwable, V, Throwable> function, Consumer<R> onComplete) {
        LongArray<Throwable> aNoErrors = NullImplementation.getLongArray();
        completeElements(cComplete, aNoErrors, aValues, function, onComplete);
    }

    /**
     * Take up to {@code cComplete} elements from the head of the current batch and complete
     * each one with either its error or its result.
     * <p>
     * For position {@code i}, a non-null entry in {@code aErrors} fails the element (via
     * {@code errFunction}); otherwise the element is completed with {@code aValues.get(i)}.
     * Elements that are already done are removed without being completed again. The batch's
     * accumulated size is reduced for every element removed (nothing is subtracted for a
     * null value). If the batch runs out of elements the remaining positions are skipped.
     *
     * @param cComplete    the number of elements to take from the current batch
     * @param aErrors      per-position errors; may be null, meaning no errors
     * @param aValues      per-position results; may be null, in which case elements
     *                     without an error are completed with {@code null}
     * @param errFunction  creates the throwable used to fail an element from its error and value
     * @param onComplete   optional callback invoked with each result
     */
    public void completeElements(int cComplete, LongArray<Throwable> aErrors, LongArray<R> aValues, BiFunction<Throwable, V, Throwable> errFunction, Consumer<R> onComplete) {
        Deque<Element> queueCurrent = getCurrentBatch();
        for (int i = 0; i < cComplete; ++i) {
            Element element = queueCurrent.poll();
            if (element == null) {
                continue;
            }

            V value = element.getValue();
            if (value != null) {
                m_cbCurrentBatch -= f_backlogCalculator.applyAsLong(value);
            }

            if (element.isDone()) {
                continue;
            }

            Throwable error = aErrors == null ? null : aErrors.get(i);
            if (error == null) {
                // Guard against a null result array so a polled element is never left uncompleted.
                R oValue = aValues == null ? null : aValues.get(i);
                element.complete(oValue, onComplete);
            } else {
                element.completeExceptionally(error, errFunction);
            }
        }
    }

    /**
     * @return the gate that guards structural changes to this queue
     */
    protected Gate<?> getGate() {
        return f_gate;
    }

    /**
     * @return the live deque of elements in the batch currently being processed
     */
    protected Deque<Element> getCurrentBatch() {
        return f_queueCurrentBatch;
    }

    /**
     * @return the live deque of elements waiting to be moved into a batch
     */
    protected Deque<Element> getPending() {
        return f_queuePending;
    }

    /**
     * @return the atomic holder of the current trigger state
     */
    protected AtomicInteger getTrigger() {
        return f_lockTrigger;
    }

    /**
     * @return a description listing the current and pending element counts, the trigger
     *         state name and the backlog flow control
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("BatchingOperationsQueue(current=");
        sb.append(getCurrentBatch().size());
        sb.append(", pending=").append(getPending().size());
        sb.append(", trigger=").append(triggerToString(getTrigger().get()));
        sb.append(", backlog=").append(String.valueOf(f_backlog));
        sb.append(")");
        return sb.toString();
    }

    /**
     * Wrap a value in an element, queue it at the head or tail of the pending queue,
     * trigger batch processing and record the value in the backlog.
     *
     * @param value   the value to queue
     * @param fFirst  true to insert at the head of the pending queue, false to append
     * @return the future of the newly queued element
     * @throws IllegalStateException if this queue has been closed
     */
    private CompletableFuture<R> add(V value, boolean fFirst) {
        final Element elementNew = createElement(value);
        final Gate<?> gateQueue = getGate();

        gateQueue.enter(-1L);
        try {
            assertActive();
            Deque<Element> queuePending = getPending();
            if (fFirst) {
                queuePending.addFirst(elementNew);
            } else {
                queuePending.add(elementNew);
            }
        } finally {
            gateQueue.exit();
        }

        // Outside the gate: fire the batch function if the trigger is open,
        // then account for the new value in the backlog (in that order).
        triggerOperations(f_cbInitialBatch);
        long cbValue = f_backlogCalculator.applyAsLong(value);
        f_backlog.adjustBacklog(cbValue);

        return elementNew.getFuture();
    }

    /**
     * Verify that this queue has not been closed.
     *
     * @throws IllegalStateException if {@link #isActive()} is false
     */
    protected void assertActive() {
        if (isActive()) {
            return;
        }
        throw new IllegalStateException("This batching queue is no longer active");
    }

    /**
     * Apply an action to every element (current batch first, then pending) that is not
     * yet done, zero the current batch size and remove the pending elements' sizes from
     * the backlog; then either close this queue or reset the trigger.
     * <p>
     * The elements themselves are not removed from either queue by this method.
     *
     * @param action  the action to apply to each outstanding element
     * @param fClose  true to close this queue afterwards, false to reset the trigger
     */
    protected void doErrorAction(Consumer<Element> action, boolean fClose) {
        Deque<Element> queueCurrent = getCurrentBatch();
        Deque<Element> queuePending = getPending();

        boolean fNothingQueued = queueCurrent.isEmpty() && queuePending.isEmpty();
        if (!fNothingQueued) {
            Stream.concat(queueCurrent.stream(), queuePending.stream())
                    .filter(element -> !element.isDone())
                    .forEach(element -> action.accept(element));

            m_cbCurrentBatch = 0L;

            try (NonBlocking nb = new NonBlocking()) {
                long cbPending = queuePending.stream()
                        .mapToLong(element -> f_backlogCalculator.applyAsLong(element.getValue()))
                        .sum();
                f_backlog.adjustBacklog(-cbPending);
            }
        }

        if (fClose) {
            close();
            return;
        }
        resetTrigger();
    }

    /**
     * Translate a trigger state value into its constant name for diagnostics.
     *
     * @param n  the trigger state value
     * @return the name of the matching {@code TRIGGER_*} constant, or
     *         {@code "TRIGGER_UNKNOWN"} for any other value
     */
    private String triggerToString(int n) {
        switch (n) {
            case TRIGGER_OPEN:
                return "TRIGGER_OPEN";
            case TRIGGER_CLOSED:
                return "TRIGGER_CLOSED";
            case TRIGGER_WAIT:
                return "TRIGGER_WAIT";
            default:
                return "TRIGGER_UNKNOWN";
        }
    }

    /**
     * Remove every done element from the current batch, reducing the batch's accumulated
     * size by each removed element's recorded size.
     *
     * @return true if, after purging, the current batch has no element awaiting completion
     */
    private boolean purgeCurrentBatch() {
        // Nothing to purge; an empty batch is trivially complete.
        if (f_queueCurrentBatch.isEmpty()) {
            return true;
        }

        final Gate<?> gateQueue = getGate();
        gateQueue.close(-1L);
        try {
            Iterator<Element> iter = f_queueCurrentBatch.iterator();
            long cbRemaining = m_cbCurrentBatch;
            while (iter.hasNext()) {
                Element element = iter.next();
                if (element.isDone()) {
                    iter.remove();
                    cbRemaining -= element.getSize();
                }
            }
            m_cbCurrentBatch = cbRemaining;
        } finally {
            gateQueue.open();
        }

        LinkedList<V> listOutstanding = getCurrentBatchValues();
        return listOutstanding.isEmpty();
    }

    /**
     * Strategy that decides where element futures are completed.
     */
    public static interface Executor {
        /**
         * Run the given task.
         *
         * @param task  the task to run
         */
        public void execute(Runnable task);

        /**
         * Submit a task that completes the future with a value and, if that completion
         * succeeded and a callback was supplied, passes the value to the callback.
         * Anything thrown by the callback is logged and not propagated.
         *
         * @param future      the future to complete
         * @param oValue      the value to complete the future with
         * @param onComplete  optional callback to receive the value
         * @param <R>         the result type
         */
        default public <R> void complete(CompletableFuture<R> future, R oValue, Consumer<R> onComplete) {
            execute(() -> {
                if (!future.complete(oValue) || onComplete == null) {
                    return;
                }
                try {
                    onComplete.accept(oValue);
                } catch (Throwable t) {
                    Logger.err(t);
                }
            });
        }

        /**
         * Submit a task that completes the future exceptionally.
         *
         * @param future  the future to complete
         * @param t       the throwable to complete the future with
         */
        default public void completeExceptionally(CompletableFuture<?> future, Throwable t) {
            execute(() -> {
                future.completeExceptionally(t);
            });
        }

        /**
         * @return an executor that runs each task immediately on the calling thread
         */
        public static Executor sameThread() {
            return Runnable::run;
        }

        /**
         * @param daemon  the daemon whose {@code executeTask} method receives each task
         * @return an executor that hands each task to the daemon
         */
        public static Executor fromTaskDaemon(TaskDaemon daemon) {
            return daemon::executeTask;
        }
    }

    /**
     * The policies that {@code handleError} can apply to the queued elements.
     */
    public static enum OnErrorAction {
        /** Move the outstanding elements of the current batch back to the pending queue and trigger a new batch. */
        Retry,
        /** Complete every outstanding element with a null result. */
        Complete,
        /** As {@link #Complete}, then close the queue. */
        CompleteAndClose,
        /** Complete every outstanding element exceptionally; also used when no action is given. */
        CompleteWithException,
        /** As {@link #CompleteWithException}, then close the queue. */
        CompleteWithExceptionAndClose,
        /** Cancel every outstanding element. */
        Cancel,
        /** As {@link #Cancel}, then close the queue. */
        CancelAndClose;

    }

    /**
     * A queued value together with the future that reports its outcome.
     * <p>
     * Futures are completed through the enclosing queue's executor, so the future itself
     * may complete later than the element's done flag is set.
     */
    public class Element {
        /** The future handed back to the caller that queued the value. */
        private final CompletableFuture<R> f_future;
        /** The queued value. */
        private final V f_value;
        /** Set once completion has been requested or the future has completed. */
        private volatile boolean m_fDone = false;
        /** Set when the future completes with a {@link CancellationException}. */
        private volatile boolean m_fCancelled = false;
        /** The size recorded by {@link #setSize(long)}; zero until then. */
        private long m_cbSize;

        /**
         * Create an element for a value with a new, incomplete future.
         *
         * @param value  the value being queued
         */
        public Element(V value) {
            this.f_value = value;
            this.f_future = new CompletableFuture<>();
            // Mirror the future's outcome into the flags, whoever completes it.
            this.f_future.handle((r, error) -> {
                this.m_fCancelled = error instanceof CancellationException;
                this.m_fDone = true;
                return null;
            });
        }

        /**
         * @return the queued value
         */
        public V getValue() {
            return this.f_value;
        }

        /**
         * @return the future that reports this element's outcome
         */
        public CompletableFuture<R> getFuture() {
            return this.f_future;
        }

        /**
         * @return true if completion has been requested or the future is done
         */
        public boolean isDone() {
            return this.m_fDone || this.f_future.isDone();
        }

        /**
         * @return true if the future completed with a cancellation
         */
        public boolean isCancelled() {
            return this.m_fCancelled || this.f_future.isCancelled();
        }

        /**
         * Mark this element done and complete its future via the queue's executor.
         * Does nothing if the element is already marked done.
         *
         * @param result      the result to complete the future with
         * @param onComplete  optional callback passed on to the executor
         */
        public void complete(R result, Consumer<R> onComplete) {
            if (!this.m_fDone) {
                this.m_fDone = true;
                BatchingOperationsQueue.this.f_executor.complete(this.f_future, result, onComplete);
            }
        }

        /**
         * Mark this element done and complete its future on the calling thread, bypassing
         * the queue's executor. Anything thrown by the callback is logged, not propagated.
         *
         * @param result      the result to complete the future with
         * @param onComplete  optional callback invoked with the result if the future was completed
         * @return true if this call completed the future
         */
        public boolean completeSynchronous(R result, Consumer<R> onComplete) {
            boolean fCompleted = false;
            if (!this.m_fDone) {
                this.m_fDone = true;
                fCompleted = this.f_future.complete(result);
                if (fCompleted && onComplete != null) {
                    try {
                        onComplete.accept(result);
                    }
                    catch (Throwable t) {
                        Logger.err(t);
                    }
                }
            }
            return fCompleted;
        }

        /**
         * Mark this element done and fail its future, via the queue's executor, with the
         * throwable produced by {@code function}. Does nothing if already marked done.
         * <p>
         * {@code function} must not be null: it is invoked after the element has been
         * marked done, so a failure in it leaves the future uncompleted.
         *
         * @param throwable  the cause handed to {@code function} (may be null)
         * @param function   creates the throwable used to fail the future from the cause and the value
         */
        public void completeExceptionally(Throwable throwable, BiFunction<Throwable, V, Throwable> function) {
            if (!this.m_fDone) {
                this.m_fDone = true;
                BatchingOperationsQueue.this.f_executor.completeExceptionally(this.f_future, function.apply(throwable, this.f_value));
            }
        }

        /**
         * Fail this element's future, via the queue's executor, with a cancellation.
         * Does nothing if the element is already marked done.
         * <p>
         * Unlike the other completion methods the done flag is not set here directly;
         * it is set by the future's callback once the executor has completed the future.
         *
         * @param function  optionally converts the cancellation exception and the value into
         *                  the throwable to use; when null the cancellation exception is used as is
         * @param sReason   the cancellation message (null or empty for none)
         */
        public void cancel(BiFunction<Throwable, V, Throwable> function, String sReason) {
            if (!this.m_fDone) {
                OperationCancelledException exception = sReason != null && !sReason.isEmpty() ? new OperationCancelledException(sReason) : new OperationCancelledException();
                // The function may return any Throwable, so the local cannot be typed as the cancellation exception.
                Throwable throwable = function == null ? exception : function.apply(exception, this.f_value);
                BatchingOperationsQueue.this.f_executor.completeExceptionally(this.f_future, throwable);
            }
        }

        /**
         * @return the size recorded for this element
         */
        public long getSize() {
            return this.m_cbSize;
        }

        /**
         * @param cbSize  the size to record for this element
         */
        public void setSize(long cbSize) {
            this.m_cbSize = cbSize;
        }
    }

    /**
     * The {@link CancellationException} used when an element is cancelled.
     */
    protected static class OperationCancelledException
    extends CancellationException {
        /**
         * Create a cancellation exception without a message.
         */
        public OperationCancelledException() {
            super();
        }

        /**
         * Create a cancellation exception with a message.
         *
         * @param message  the reason for the cancellation
         */
        public OperationCancelledException(String message) {
            super(message);
        }
    }
}

