/*
 * Decompiled with CFR 0.152.
 */
package com.tangosol.internal.net.topic.impl.paged;

import com.oracle.coherence.common.base.Associated;
import com.oracle.coherence.common.base.Blocking;
import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.util.Duration;
import com.tangosol.internal.net.DebouncedFlowControl;
import com.tangosol.internal.net.topic.impl.paged.BatchingOperationsQueue;
import com.tangosol.internal.net.topic.impl.paged.PagedTopic;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicDependencies;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicPublisher;
import com.tangosol.internal.net.topic.impl.paged.agent.OfferProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.TailAdvancer;
import com.tangosol.internal.net.topic.impl.paged.agent.TopicInitialiseProcessor;
import com.tangosol.internal.net.topic.impl.paged.model.Page;
import com.tangosol.internal.net.topic.impl.paged.model.Usage;
import com.tangosol.internal.util.DaemonPool;
import com.tangosol.io.Serializer;
import com.tangosol.net.PagedTopicService;
import com.tangosol.net.partition.KeyPartitioningStrategy;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Publisher;
import com.tangosol.net.topic.TopicException;
import com.tangosol.net.topic.TopicPublisherException;
import com.tangosol.util.Binary;
import com.tangosol.util.InvocableMapHelper;
import com.tangosol.util.LongArray;
import com.tangosol.util.SparseArray;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

public class PagedTopicChannelPublisher {
    private static final Void VOID = null;
    private final long f_lPublisherId;
    private final int f_nChannel;
    private final int f_nChannelCount;
    private final String f_sTopicName;
    private final BiConsumer<Throwable, Integer> f_onErrorHandler;
    private PagedTopicCaches m_caches;
    private final Serializer f_serializer;
    private final KeyPartitioningStrategy f_keyPartitioningStrategy;
    private final int f_nNotifyPostFull;
    private final Usage.Key f_keyUsageSync;
    private final int f_nUsageSyncUnitOfOrder;
    private final BatchingOperationsQueue<Binary, Publisher.Status> f_batchingQueue;
    private volatile State m_state;
    private volatile long m_lTail = -1L;
    private volatile CompletableFuture<Long> futurePageId;
    private CompletableFuture<Long> m_futureMovePage;
    private long m_cOffers;
    private long m_cOffersLast;
    private long m_cAccepted;
    private long m_cAcceptedLast;
    private long m_cMisses;
    private long m_cMissesLast;
    private long m_cWait;
    private long m_cWaitsLast;
    private long m_cNotify;
    private long m_cNotifyLast;

    public PagedTopicChannelPublisher(long lPublisherId, int nChannel, int nChannelCount, PagedTopicCaches caches, int nNotifyPostFull, DebouncedFlowControl flowControl, DaemonPool pool, BiConsumer<Throwable, Integer> onErrorHandler) {
        this.f_lPublisherId = lPublisherId;
        this.f_nChannel = nChannel;
        this.f_nChannelCount = nChannelCount;
        this.f_sTopicName = caches.getTopicName();
        this.f_onErrorHandler = onErrorHandler;
        this.m_caches = caches;
        this.f_nNotifyPostFull = nNotifyPostFull;
        this.f_keyUsageSync = caches.getUsageSyncKey(nChannel);
        this.f_nUsageSyncUnitOfOrder = caches.getUnitOfOrder(this.f_keyUsageSync.getPartitionId());
        this.f_serializer = caches.getSerializer();
        this.f_keyPartitioningStrategy = caches.getService().getKeyPartitioningStrategy();
        AssociatedExecutor executor = new AssociatedExecutor(pool);
        NamedTopic.ElementCalculator calculator = caches.getElementCalculator();
        this.f_batchingQueue = new BatchingOperationsQueue(this::addQueuedElements, 1, flowControl, calculator::calculateUnits, (BatchingOperationsQueue.Executor)executor);
        this.m_state = State.Active;
    }

    public CompletableFuture<Publisher.Status> publish(Binary binValue) {
        this.ensureConnected();
        try {
            return this.f_batchingQueue.add(binValue);
        }
        catch (IllegalStateException e) {
            throw new IllegalStateException("This publisher is no longer active", e);
        }
    }

    private void ensureConnected() {
        long now = System.currentTimeMillis();
        long retry = PagedTopic.DEFAULT_RECONNECT_TIMEOUT_SECONDS.as(Duration.Magnitude.MILLI);
        long timeout = now * 2L;
        Throwable error = null;
        while (now < timeout) {
            PagedTopicCaches caches = this.m_caches;
            if (this.m_state != State.Active || caches == null) {
                return;
            }
            try {
                PagedTopicDependencies dependencies = caches.getDependencies();
                retry = dependencies.getReconnectRetryMillis();
                timeout = now + dependencies.getReconnectTimeoutMillis();
                caches.ensureConnected();
                PagedTopicService service = caches.getService();
                if (service.isSuspended()) {
                    Blocking.sleep(100L);
                    break;
                }
                int cActual = service.ensureChannelCount(this.f_sTopicName, this.f_nChannel + 1, this.f_nChannelCount);
                if (this.f_nChannel >= cActual) {
                    Logger.warn(() -> String.format("This publisher is publishing to channel %d, but the topic is configured with %d channels", this.f_nChannel, cActual));
                }
                error = null;
                break;
            }
            catch (Throwable thrown) {
                error = thrown;
                if (error instanceof TopicException) break;
                now = System.currentTimeMillis();
                if (now >= timeout) continue;
                Logger.finer("Failed to reconnect publisher, will retry in " + retry + " millis " + String.valueOf(this) + " due to " + error.getMessage());
                try {
                    Thread.sleep(retry);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        if (error != null) {
            throw Exceptions.ensureRuntimeException(error);
        }
    }

    public boolean isActive() {
        return this.m_state == State.Active;
    }

    public synchronized void stop() {
        if (this.m_state == State.Active) {
            this.m_state = State.Closing;
            this.f_batchingQueue.close();
        }
    }

    public synchronized void close() {
        if (this.m_state == State.Closing) {
            this.m_state = State.Closed;
            this.f_batchingQueue.cancelAllAndClose("Publisher has been closed", null);
            this.m_caches = null;
        }
    }

    public CompletableFuture<Void> flush(PagedTopicPublisher.FlushMode mode) {
        String sDescription = null;
        switch (mode) {
            case FLUSH_DESTROY: {
                sDescription = "Topic " + this.f_sTopicName + " was destroyed";
            }
            case FLUSH_CLOSE_EXCEPTIONALLY: {
                String sReason = sDescription != null ? sDescription : "Force Close of Publisher " + this.f_lPublisherId + " channel " + this.f_nChannel + " for topic " + this.f_sTopicName;
                BiFunction<Throwable, Binary, Throwable> fn = TopicPublisherException.createFactory(this.f_serializer, sReason);
                this.f_batchingQueue.handleError(fn, BatchingOperationsQueue.OnErrorAction.CompleteWithException);
                return this.f_batchingQueue.flush();
            }
        }
        return this.f_batchingQueue.flush();
    }

    public int getChannel() {
        return this.f_nChannel;
    }

    protected void onNotification() {
        ++this.m_cNotify;
        if (this.f_batchingQueue.resume()) {
            ++this.m_cWait;
            this.addQueuedElements(1);
        }
    }

    protected void addQueuedElements(int cbMaxElements) {
        if (this.f_batchingQueue.fillCurrentBatch(cbMaxElements)) {
            ((CompletableFuture)this.ensurePageId().thenAccept(_void -> this.addInternal(this.m_lTail))).handle(this::handleError);
        }
    }

    protected void addInternal(long lPageId) {
        LinkedList<Binary> listBinary = this.f_batchingQueue.getCurrentBatchValues();
        if (listBinary.isEmpty()) {
            return;
        }
        PagedTopicCaches caches = this.m_caches;
        Page.Key keyPage = new Page.Key(this.f_keyUsageSync.getChannelId(), lPageId);
        int nPart = this.f_keyPartitioningStrategy.getKeyPartition(keyPage);
        InvocableMapHelper.invokeAsync(caches.Pages, keyPage, caches.getUnitOfOrder(nPart), new OfferProcessor(listBinary, this.f_nNotifyPostFull, false), (result, e) -> {
            if (e == null) {
                this.handleOfferCompletion((OfferProcessor.Result)result, lPageId);
            } else {
                this.handleError(null, (Throwable)e);
            }
        });
    }

    protected void handleOfferCompletion(OfferProcessor.Result result, long lPageId) {
        LongArray<Throwable> aErrors = result.getErrors();
        SparseArray<PagedTopicPublisher.PublishedStatus> aMetadata = new SparseArray<PagedTopicPublisher.PublishedStatus>();
        int cAccepted = result.getAcceptedCount();
        int nChannel = this.f_keyUsageSync.getChannelId();
        ++this.m_cOffers;
        this.m_cAccepted += (long)cAccepted;
        if (cAccepted == 0) {
            ++this.m_cMisses;
        }
        if (this.f_nNotifyPostFull == 0 && result.getStatus() == OfferProcessor.Result.Status.TopicFull) {
            int ceBatch = this.f_batchingQueue.getCurrentBatch().size();
            IllegalStateException error = new IllegalStateException("the topic is at capacity");
            if (aErrors == null) {
                aErrors = new SparseArray<Throwable>();
            }
            while (cAccepted < ceBatch) {
                ++cAccepted;
                aErrors.add(error);
            }
        } else {
            int nOffset = result.getOffset();
            for (long i = 0L; i < (long)cAccepted; ++i) {
                if (aErrors != null && aErrors.get(i) != null) continue;
                aMetadata.set(i, new PagedTopicPublisher.PublishedStatus(nChannel, lPageId, nOffset++));
            }
        }
        this.f_batchingQueue.completeElements(cAccepted, aErrors, aMetadata, TopicPublisherException.createFactory(this.f_serializer), null);
        this.handleIndividualErrors(aErrors);
        if (this.m_state != State.Closed) {
            switch (result.getStatus()) {
                case PageSealed: {
                    ((CompletableFuture)this.moveToNextPage(lPageId).thenRun(() -> this.addQueuedElements(result.getPageCapacity()))).handle(this::handleError);
                    break;
                }
                case TopicFull: {
                    if (this.f_nNotifyPostFull != 0) {
                        this.f_batchingQueue.pause();
                        break;
                    }
                }
                default: {
                    this.addQueuedElements(result.getPageCapacity());
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Void handleError(Object ignored, Throwable throwable) {
        if (throwable != null) {
            PagedTopicChannelPublisher pagedTopicChannelPublisher = this;
            synchronized (pagedTopicChannelPublisher) {
                this.stop();
                if (this.f_onErrorHandler != null) {
                    try {
                        this.f_onErrorHandler.accept(throwable, this.f_nChannel);
                    }
                    catch (Throwable t) {
                        Logger.err(t);
                    }
                }
                this.f_batchingQueue.handleError(TopicPublisherException.createFactory(this.f_serializer, throwable.getMessage()), BatchingOperationsQueue.OnErrorAction.CancelAndClose);
                this.close();
            }
        }
        return VOID;
    }

    protected void handleIndividualErrors(LongArray<Throwable> aErrors) {
        if (aErrors == null || aErrors.isEmpty()) {
            return;
        }
        Throwable throwable = aErrors.get(aErrors.getFirstIndex());
        this.handleError(VOID, throwable);
    }

    protected CompletableFuture<Long> ensurePageId() {
        if (this.futurePageId == null) {
            return this.initializePageId();
        }
        return this.futurePageId;
    }

    private synchronized CompletableFuture<Long> initializePageId() {
        if (this.futurePageId == null) {
            this.futurePageId = InvocableMapHelper.invokeAsync(this.m_caches.Usages, this.f_keyUsageSync, this.m_caches.getUnitOfOrder(this.f_keyUsageSync.getPartitionId()), new TopicInitialiseProcessor(), new BiConsumer[0]);
        }
        return this.futurePageId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected CompletableFuture<Long> moveToNextPage(long lPage) {
        long lPageCurrent = this.m_lTail;
        if (lPageCurrent > lPage) {
            return CompletableFuture.completedFuture(lPageCurrent);
        }
        PagedTopicChannelPublisher pagedTopicChannelPublisher = this;
        synchronized (pagedTopicChannelPublisher) {
            lPageCurrent = this.m_lTail;
            if (lPageCurrent > lPage) {
                return CompletableFuture.completedFuture(lPageCurrent);
            }
            CompletableFuture<Long> futureResult = this.m_futureMovePage;
            if (futureResult == null) {
                this.m_futureMovePage = futureResult = InvocableMapHelper.invokeAsync(this.m_caches.Usages, this.f_keyUsageSync, this.f_nUsageSyncUnitOfOrder, new TailAdvancer(lPage + 1L), (result, e) -> {
                    if (e == null) {
                        this.updatePageId((long)result);
                    } else {
                        this.handleError(result, (Throwable)e);
                    }
                });
            }
            return futureResult;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void updatePageId(long lPageNew) {
        if (this.m_lTail < lPageNew) {
            PagedTopicChannelPublisher pagedTopicChannelPublisher = this;
            synchronized (pagedTopicChannelPublisher) {
                if (this.m_lTail < lPageNew) {
                    this.m_lTail = lPageNew;
                }
            }
        }
        this.m_futureMovePage = null;
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        PagedTopicChannelPublisher that = (PagedTopicChannelPublisher)o;
        return this.f_lPublisherId == that.f_lPublisherId && this.f_nChannel == that.f_nChannel && Objects.equals(this.m_caches, that.m_caches);
    }

    public int hashCode() {
        return Objects.hash(this.f_lPublisherId, this.f_nChannel, this.m_caches);
    }

    public String toString() {
        long cOffersNow = this.m_cOffers;
        long cAcceptedNow = this.m_cAccepted;
        long cMissesNow = this.m_cMisses;
        long cWaitNow = this.m_cWait;
        long cNotifyNow = this.m_cNotify;
        long cOffers = cOffersNow - this.m_cOffersLast;
        long cAccepted = cAcceptedNow - this.m_cAcceptedLast;
        long cMisses = cMissesNow - this.m_cMissesLast;
        long cWait = cWaitNow - this.m_cWaitsLast;
        long cNotify = cNotifyNow - this.m_cNotifyLast;
        this.m_cOffersLast = cOffersNow;
        this.m_cAcceptedLast = cAcceptedNow;
        this.m_cMissesLast = cMissesNow;
        this.m_cWaitsLast = cWaitNow;
        this.m_cNotifyLast = cNotifyNow;
        return this.getClass().getSimpleName() + "(topic=" + this.f_sTopicName + ", channel=" + this.f_nChannel + ", state=" + String.valueOf((Object)this.m_state) + ", publisher=" + this.f_lPublisherId + ", batchSize=" + cAccepted / Math.max(1L, cOffers - cMisses) + ", hitRate=" + (cOffers - cMisses) * 100L / Math.max(1L, cOffers) + "%, waitNotifyRate=" + cWait * 100L / Math.max(1L, cOffers) + "/" + cNotify * 100L / Math.max(1L, cOffers) + "%)";
    }

    protected class AssociatedExecutor
    implements BatchingOperationsQueue.Executor {
        private final DaemonPool f_pool;

        public AssociatedExecutor(DaemonPool pool) {
            this.f_pool = pool;
        }

        @Override
        public void execute(Runnable runnable) {
            this.f_pool.add(new AssociatedTask(runnable));
        }
    }

    public static enum State {
        Active,
        Closing,
        Closed;

    }

    protected class AssociatedTask
    implements Runnable,
    Associated<Integer> {
        private final Runnable f_task;

        public AssociatedTask(Runnable task) {
            this.f_task = task;
        }

        @Override
        public Integer getAssociatedKey() {
            return PagedTopicChannelPublisher.this.f_nChannel;
        }

        @Override
        public void run() {
            this.f_task.run();
        }
    }
}

