/*
 * Decompiled with CFR 0.152.
 */
package com.tangosol.internal.net.topic.impl.paged;

import com.oracle.coherence.common.base.Converter;
import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.util.MemorySize;
import com.oracle.coherence.common.util.Options;
import com.tangosol.coherence.config.Config;
import com.tangosol.internal.net.DebouncedFlowControl;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicChannelPublisher;
import com.tangosol.internal.net.topic.impl.paged.model.NotificationKey;
import com.tangosol.internal.net.topic.impl.paged.model.PagedPosition;
import com.tangosol.internal.util.DaemonPool;
import com.tangosol.internal.util.Daemons;
import com.tangosol.internal.util.DefaultDaemonPoolDependencies;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.io.Serializer;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.Cluster;
import com.tangosol.net.FlowControl;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Position;
import com.tangosol.net.topic.Publisher;
import com.tangosol.util.Base;
import com.tangosol.util.Binary;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.Filter;
import com.tangosol.util.MapListener;
import com.tangosol.util.filter.InKeySetFilter;
import com.tangosol.util.listener.SimpleMapListener;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PagedTopicPublisher<V>
implements Publisher<V> {
    public static final long CLOSE_TIMEOUT_SECS = TimeUnit.MILLISECONDS.toSeconds(Base.parseTime(Config.getProperty("coherence.topic.publisher.close.timeout", "30s"), 1000));
    private volatile State m_state;
    private final NamedTopic<V> m_topic;
    private PagedTopicCaches m_caches;
    private final String f_sTopicName;
    private final Converter<V, Binary> f_convValueToBinary;
    private final int f_nNotifyPostFull;
    private final Publisher.OrderBy<V> f_funcOrder;
    private final DebouncedFlowControl f_flowControl;
    protected final PagedTopicChannelPublisher[] f_aChannel;
    private final BitSet f_setOfferedChannel;
    private final DeactivationListener f_listenerDeactivation = new DeactivationListener();
    private final MapListener<NotificationKey, int[]> f_listenerNotification;
    private final Filter<int[]> f_filterListenerNotification;
    private final List<Runnable> f_listOnCloseActions = new ArrayList<Runnable>();
    private final DaemonPool f_daemon;
    private final Executor f_executor;
    private final long f_nId;
    private final Publisher.OnFailure f_onFailure;
    private final Lock f_lock = new ReentrantLock();

    @SafeVarargs
    public PagedTopicPublisher(NamedTopic<V> topic, PagedTopicCaches pagedTopicCaches, Publisher.Option<? super V> ... opts) {
        DebouncedFlowControl backlog;
        this.m_caches = Objects.requireNonNull(pagedTopicCaches, "The PagedTopicCaches parameter cannot be null");
        this.m_topic = topic;
        this.registerDeactivationListener();
        Serializer serializer = pagedTopicCaches.getSerializer();
        Cluster cluster = this.m_caches.getService().getCluster();
        Options<Publisher.Option<V>> options = Options.from(Publisher.Option.class, opts);
        this.f_nId = PagedTopicPublisher.createId(System.identityHashCode(this), cluster.getLocalMember().getId());
        this.f_convValueToBinary = value -> ExternalizableHelper.toBinary(value, serializer);
        this.f_sTopicName = pagedTopicCaches.getTopicName();
        this.f_nNotifyPostFull = options.contains((Publisher.Option<V>)((Object)Publisher.FailOnFull.class)) ? 0 : System.identityHashCode(this);
        this.f_funcOrder = this.computeOrderByOption(options);
        this.f_onFailure = options.get(Publisher.OnFailure.class);
        ChannelCount channelCount = options.get(ChannelCount.class, ChannelCount.USE_CONFIGURED);
        int cChannel = channelCount.isUseConfigured() ? topic.getChannelCount() : channelCount.getChannelCount();
        long cbBatch = this.m_caches.getDependencies().getMaxBatchSizeBytes();
        this.f_aChannel = new PagedTopicChannelPublisher[cChannel];
        this.f_setOfferedChannel = new BitSet(cChannel);
        this.f_flowControl = backlog = new DebouncedFlowControl(cbBatch * 2L, cbBatch * 3L, l -> new MemorySize(Math.abs(l)).toString());
        DefaultDaemonPoolDependencies dependencies = new DefaultDaemonPoolDependencies();
        dependencies.setName("Publisher-" + this.m_caches.getTopicName() + "-" + this.f_nId);
        dependencies.setThreadCountMin(1);
        dependencies.setThreadCount(1);
        dependencies.setThreadCountMax(Integer.MAX_VALUE);
        this.f_daemon = Daemons.newDaemonPool(dependencies);
        this.f_executor = this.f_daemon::add;
        this.f_daemon.start();
        for (int nChannel = 0; nChannel < cChannel; ++nChannel) {
            this.f_aChannel[nChannel] = new PagedTopicChannelPublisher(this.f_nId, nChannel, cChannel, this.m_caches, this.f_nNotifyPostFull, backlog, this.f_daemon, this::handlePublishError);
        }
        this.f_listenerNotification = new SimpleMapListener().addDeleteHandler(evt -> this.onNotification((int[])evt.getOldValue())).synchronous();
        InKeySetFilter inKeySetFilter = this.f_filterListenerNotification = this.f_nNotifyPostFull == 0 ? null : new InKeySetFilter(null, pagedTopicCaches.getPartitionNotifierSet(this.f_nNotifyPostFull));
        if (this.f_nNotifyPostFull != 0) {
            pagedTopicCaches.Notifications.addMapListener(this.f_listenerNotification, this.f_filterListenerNotification, false);
        }
        this.m_state = State.Active;
    }

    @Override
    public boolean isActive() {
        return this.m_state == State.Active || this.m_state == State.Disconnected;
    }

    @Override
    public CompletableFuture<Publisher.Status> publish(V value) {
        this.ensureActive();
        IllegalStateException thrown = null;
        for (int attempt = 0; attempt < 2; ++attempt) {
            try {
                PagedTopicChannelPublisher channelPublisher = this.ensureChannelPublisher(value);
                CompletableFuture<Publisher.Status> future = channelPublisher.publish(this.f_convValueToBinary.convert(value));
                future.handleAsync((status, error) -> this.handlePublished(channelPublisher.getChannel()), this.f_executor);
                return future;
            }
            catch (IllegalStateException e) {
                if (thrown == null) {
                    thrown = e;
                } else {
                    thrown.addSuppressed(e);
                }
                this.ensureActive();
                continue;
            }
        }
        throw Exceptions.ensureRuntimeException(thrown);
    }

    @Override
    public FlowControl getFlowControl() {
        return this.f_flowControl;
    }

    @Override
    public CompletableFuture<Void> flush() {
        this.ensureActive();
        return this.flushInternal(FlushMode.FLUSH);
    }

    @Override
    public void close() {
        if (this.isActive()) {
            this.closeInternal(false);
        }
    }

    @Override
    public void onClose(Runnable action) {
        this.f_listOnCloseActions.add(action);
    }

    @Override
    public int getChannelCount() {
        return this.m_topic.getChannelCount();
    }

    @Override
    public NamedTopic<V> getNamedTopic() {
        return this.m_topic;
    }

    public String getName() {
        return this.f_sTopicName;
    }

    protected void onNotification(int[] anChannel) {
        for (int nChannel : anChannel) {
            PagedTopicChannelPublisher channel = this.f_aChannel[nChannel];
            channel.onNotification();
        }
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        PagedTopicPublisher that = (PagedTopicPublisher)o;
        return this.m_caches.equals(that.m_caches);
    }

    public int hashCode() {
        return this.m_caches.hashCode();
    }

    public String toString() {
        PagedTopicCaches caches = this.m_caches;
        if (caches == null) {
            return this.getClass().getSimpleName() + "(inactive)";
        }
        int cChannels = this.f_setOfferedChannel.cardinality();
        StringBuilder buf = new StringBuilder(this.getClass().getSimpleName()).append("(topic=").append(caches.getTopicName()).append(", id=").append(this.f_nId).append(", orderBy=").append(this.f_funcOrder).append(", backlog=").append(this.f_flowControl).append(", channels=").append(cChannels);
        if (!this.f_setOfferedChannel.isEmpty()) {
            for (PagedTopicChannelPublisher channel : this.f_aChannel) {
                int nChannel = channel.getChannel();
                if (!this.f_setOfferedChannel.get(nChannel)) continue;
                buf.append("  ").append(nChannel).append(": ").append(channel);
            }
            this.f_setOfferedChannel.clear();
        }
        return buf.toString();
    }

    protected Void handlePublished(int nChannel) {
        this.f_setOfferedChannel.set(nChannel);
        return null;
    }

    protected void handlePublishError(Throwable error, int nChannel) {
        if (error != null) {
            switch (this.f_onFailure) {
                case Stop: {
                    Logger.fine("Closing publisher due to publishing error from channel " + nChannel + ", " + String.valueOf(error));
                    this.m_state = State.OnError;
                    CompletableFuture.runAsync(() -> this.closeInternal(false), Daemons.commonPool());
                    break;
                }
                case Continue: {
                    Logger.finer("Publisher set to continue on error, ignoring publishing error from channel " + nChannel + ", " + String.valueOf(error));
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private PagedTopicChannelPublisher ensureChannelPublisher(V value) {
        int nOrder = value instanceof Publisher.Orderable ? ((Publisher.Orderable)value).getOrderId() : this.f_funcOrder.getOrderId(value);
        int nChannel = Base.mod(nOrder, this.f_aChannel.length);
        PagedTopicChannelPublisher publisher = this.f_aChannel[nChannel];
        if (!publisher.isActive()) {
            if (this.f_onFailure == Publisher.OnFailure.Stop) {
                this.closeInternal(false);
                throw new IllegalStateException("This publisher is no longer active");
            }
            this.f_lock.lock();
            try {
                publisher = this.f_aChannel[nChannel];
                if (this.isActive() && !publisher.isActive()) {
                    this.m_caches.ensureConnected();
                    Logger.finer("Restarted publisher for channel " + nChannel + " topic " + this.m_caches.getTopicName() + " publisher " + this.f_nId);
                    publisher = this.f_aChannel[nChannel] = new PagedTopicChannelPublisher(this.f_nId, nChannel, this.f_aChannel.length, this.m_caches, this.f_nNotifyPostFull, this.f_flowControl, this.f_daemon, this::handlePublishError);
                }
            }
            finally {
                this.f_lock.unlock();
            }
        }
        return publisher;
    }

    private Publisher.OrderBy computeOrderByOption(Options options) {
        Iterator<Publisher.OrderBy> iter = options.getInstancesOf(Publisher.OrderBy.class).iterator();
        return iter.hasNext() ? iter.next() : Publisher.OrderBy.thread();
    }

    private void ensureActive() {
        if (!this.isActive()) {
            throw new IllegalStateException("This publisher is no longer active");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void closeInternal(boolean fDestroyed) {
        if (this.m_caches == null || this.m_state == State.Closing || this.m_state == State.Closed) {
            return;
        }
        this.f_lock.lock();
        try {
            if (this.m_caches == null || this.m_state == State.Closing || this.m_state == State.Closed) {
                return;
            }
            this.m_state = State.Closing;
            try {
                if (!fDestroyed) {
                    this.unregisterDeactivationListener();
                    if (this.f_nNotifyPostFull != 0) {
                        PagedTopicCaches pagedTopicCaches = this.m_caches;
                        if (pagedTopicCaches.Notifications.isActive()) {
                            pagedTopicCaches.Notifications.removeMapListener(this.f_listenerNotification, this.f_filterListenerNotification);
                        }
                    }
                }
                for (PagedTopicChannelPublisher channel : this.f_aChannel) {
                    channel.stop();
                }
                try {
                    this.flushInternal(fDestroyed ? FlushMode.FLUSH_DESTROY : FlushMode.FLUSH).get(CLOSE_TIMEOUT_SECS, TimeUnit.SECONDS);
                }
                catch (TimeoutException exception) {
                    this.flushInternal(FlushMode.FLUSH_CLOSE_EXCEPTIONALLY).join();
                    Logger.warn("Publisher.close: timeout after waiting " + CLOSE_TIMEOUT_SECS + " seconds for completion with flush.join(), forcing complete exceptionally");
                }
                catch (InterruptedException | ExecutionException exception) {
                    // empty catch block
                }
                for (PagedTopicChannelPublisher channel : this.f_aChannel) {
                    channel.close();
                }
            }
            finally {
                this.m_caches = null;
                Arrays.fill(this.f_aChannel, null);
                this.f_listOnCloseActions.forEach(action -> {
                    try {
                        action.run();
                    }
                    catch (Throwable t) {
                        Logger.fine(this.getClass().getName() + ".close(): handled onClose exception: " + t.getClass().getCanonicalName() + ": " + t.getMessage());
                    }
                });
                this.f_daemon.shutdown();
                this.m_state = State.Closed;
            }
        }
        finally {
            this.f_lock.unlock();
        }
    }

    private CompletableFuture<Void> flushInternal(FlushMode mode) {
        CompletableFuture[] aFuture = new CompletableFuture[this.f_aChannel.length];
        for (int i = 0; i < aFuture.length; ++i) {
            aFuture[i] = this.f_aChannel[i].flush(mode);
        }
        return CompletableFuture.allOf(aFuture);
    }

    protected void registerDeactivationListener() {
        try {
            this.m_caches.addListener(this.f_listenerDeactivation);
        }
        catch (RuntimeException runtimeException) {
            // empty catch block
        }
    }

    protected void unregisterDeactivationListener() {
        try {
            this.m_caches.removeListener(this.f_listenerDeactivation);
        }
        catch (RuntimeException runtimeException) {
            // empty catch block
        }
    }

    static long createId(long nNotificationId, long nMemberId) {
        return nMemberId << 32 | nNotificationId & 0xFFFFFFFFL;
    }

    protected class DeactivationListener
    implements PagedTopicCaches.Listener {
        protected DeactivationListener() {
        }

        @Override
        public void onConnect() {
        }

        @Override
        public void onDisconnect() {
        }

        @Override
        public void onDestroy() {
            if (PagedTopicPublisher.this.isActive()) {
                Logger.fine("Detected destroy of topic " + PagedTopicPublisher.this.f_sTopicName + ", closing publisher " + String.valueOf(PagedTopicPublisher.this));
                PagedTopicPublisher.this.closeInternal(true);
            }
        }

        @Override
        public void onRelease() {
            if (PagedTopicPublisher.this.isActive()) {
                Logger.fine("Detected release of topic " + PagedTopicPublisher.this.f_sTopicName + ", closing publisher " + String.valueOf(PagedTopicPublisher.this));
                PagedTopicPublisher.this.closeInternal(false);
            }
        }
    }

    public static class ChannelCount
    implements Publisher.Option<Object>,
    ExternalizableLite,
    PortableObject {
        public static final ChannelCount USE_CONFIGURED = new ChannelCount(-1);
        private int m_cChannel;

        public ChannelCount() {
            this(-1);
        }

        public ChannelCount(int cChannel) {
            this.m_cChannel = cChannel;
        }

        public boolean isUseConfigured() {
            return this.m_cChannel < 0;
        }

        public int getChannelCount() {
            return this.m_cChannel;
        }

        @Override
        public void readExternal(DataInput in) throws IOException {
            this.m_cChannel = in.readInt();
        }

        @Override
        public void writeExternal(DataOutput out) throws IOException {
            out.writeInt(this.m_cChannel);
        }

        @Override
        public void readExternal(PofReader in) throws IOException {
            this.m_cChannel = in.readInt(0);
        }

        @Override
        public void writeExternal(PofWriter out) throws IOException {
            out.writeInt(0, this.m_cChannel);
        }

        public static ChannelCount of(int cChannel) {
            return new ChannelCount(cChannel);
        }

        @Options.Default
        public static ChannelCount useConfigured() {
            return USE_CONFIGURED;
        }
    }

    public static enum State {
        Active,
        Closing,
        Closed,
        Disconnected,
        OnError;

    }

    static enum FlushMode {
        FLUSH,
        FLUSH_DESTROY,
        FLUSH_CLOSE_EXCEPTIONALLY;

    }

    protected static class PublishedStatus
    implements Publisher.Status {
        private final int f_nChannel;
        private final PagedPosition f_position;

        protected PublishedStatus(int nChannel, long lPage, int nOffset) {
            this.f_nChannel = nChannel;
            this.f_position = new PagedPosition(lPage, nOffset);
        }

        @Override
        public int getChannel() {
            return this.f_nChannel;
        }

        @Override
        public Position getPosition() {
            return this.f_position;
        }

        public String toString() {
            return "PublishedStatus(channel=" + this.f_nChannel + ", position=" + String.valueOf(this.f_position) + ")";
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            PublishedStatus that = (PublishedStatus)o;
            return this.f_nChannel == that.f_nChannel && Objects.equals(this.f_position, that.f_position);
        }

        public int hashCode() {
            return Objects.hash(this.f_nChannel, this.f_position);
        }
    }
}

