/*
 * Decompiled with CFR 0.152.
 */
package com.tangosol.internal.net.topic.impl.paged;

import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.base.TimeHelper;
import com.oracle.coherence.common.util.Options;
import com.oracle.coherence.common.util.SafeClock;
import com.oracle.coherence.common.util.Sentry;
import com.tangosol.coherence.config.Config;
import com.tangosol.internal.net.DebouncedFlowControl;
import com.tangosol.internal.net.metrics.Meter;
import com.tangosol.internal.net.topic.impl.paged.BatchingOperationsQueue;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicDependencies;
import com.tangosol.internal.net.topic.impl.paged.agent.CloseSubscriptionProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.CommitProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.DestroySubscriptionProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.EvictSubscriber;
import com.tangosol.internal.net.topic.impl.paged.agent.HeadAdvancer;
import com.tangosol.internal.net.topic.impl.paged.agent.PollProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.SeekProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.SubscriberHeartbeatProcessor;
import com.tangosol.internal.net.topic.impl.paged.model.ContentKey;
import com.tangosol.internal.net.topic.impl.paged.model.NotificationKey;
import com.tangosol.internal.net.topic.impl.paged.model.Page;
import com.tangosol.internal.net.topic.impl.paged.model.PageElement;
import com.tangosol.internal.net.topic.impl.paged.model.PagedPosition;
import com.tangosol.internal.net.topic.impl.paged.model.PagedTopicSubscription;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberInfo;
import com.tangosol.internal.net.topic.impl.paged.model.Subscription;
import com.tangosol.internal.util.Daemons;
import com.tangosol.io.Serializer;
import com.tangosol.net.Cluster;
import com.tangosol.net.FlowControl;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PagedTopicService;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.events.EventDispatcher;
import com.tangosol.net.events.EventDispatcherAwareInterceptor;
import com.tangosol.net.events.partition.cache.EntryEvent;
import com.tangosol.net.events.partition.cache.PartitionedCacheDispatcher;
import com.tangosol.net.management.MBeanHelper;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Position;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.net.topic.TopicException;
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.Base;
import com.tangosol.util.Binary;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.Filter;
import com.tangosol.util.Filters;
import com.tangosol.util.Gate;
import com.tangosol.util.InvocableMapHelper;
import com.tangosol.util.Listeners;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
import com.tangosol.util.SparseArray;
import com.tangosol.util.TaskDaemon;
import com.tangosol.util.ThreadGateLite;
import com.tangosol.util.UUID;
import com.tangosol.util.ValueExtractor;
import com.tangosol.util.aggregator.ComparableMin;
import com.tangosol.util.aggregator.GroupAggregator;
import com.tangosol.util.aggregator.LongMin;
import com.tangosol.util.extractor.ReflectionExtractor;
import com.tangosol.util.filter.InKeySetFilter;
import com.tangosol.util.listener.SimpleMapListener;
import java.io.PrintStream;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EventListener;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class PagedTopicSubscriber<V>
implements Subscriber<V>,
AutoCloseable {
    public static final int STATE_INITIAL = 0;
    public static final int STATE_CONNECTING = 1;
    public static final int STATE_CONNECTED = 2;
    public static final int STATE_DISCONNECTED = 3;
    public static final int STATE_CLOSING = 4;
    public static final int STATE_CLOSED = 5;
    public static final String[] STATES = new String[]{"Initial", "Connecting", "Connected", "Disconnected", "CLosing", "Closed"};
    public static final long CLOSE_TIMEOUT_SECS = TimeUnit.MILLISECONDS.toSeconds(Base.parseTime(Config.getProperty("coherence.topic.subscriber.close.timeout", "30s"), 1000));
    public static final long INIT_TIMEOUT_SECS = TimeUnit.MILLISECONDS.toSeconds(Base.parseTime(Config.getProperty("coherence.topic.subscriber.init.timeout", "30s"), 1000));
    private final NamedTopic<?> f_topic;
    protected final PagedTopicCaches m_caches;
    protected final boolean f_fAnonymous;
    protected final SubscriberId f_id;
    protected final SubscriberInfo.Key f_key;
    protected final SubscriberHeartbeatProcessor f_heartbeatProcessor;
    protected final Filter<V> f_filter;
    protected final ValueExtractor<V, ?> f_extractor;
    protected final Serializer f_serializer;
    protected final SubscriberGroupId f_subscriberGroupId;
    protected long m_subscriptionId;
    protected volatile long m_connectionTimestamp;
    protected final int f_nNotificationId;
    protected final Filter<Object> f_filterNotification;
    protected final boolean f_fCompleteOnEmpty;
    private final Gate<?> f_gate;
    private final Gate<?> f_gateState;
    private final Lock f_receiveLock = new ReentrantLock();
    private volatile int m_nState = 0;
    private volatile boolean m_fForceReconnect;
    protected final ConcurrentLinkedDeque<CommittableElement> m_queueValuesPrefetched = new ConcurrentLinkedDeque();
    protected final BatchingOperationsQueue<Request, ?> f_queueReceiveOrders;
    protected final DebouncedFlowControl f_backlog;
    protected volatile PagedTopicChannel[] m_aChannel;
    protected volatile int[] m_aChannelOwned;
    protected volatile int m_nChannel;
    protected final TaskDaemon f_daemon;
    private final Executor f_executor;
    protected final TaskDaemon f_daemonChannels;
    private final Executor f_executorChannels;
    private final SimpleMapListener f_listenerNotification;
    protected final ChannelListener m_listenerChannelAllocation;
    protected final Subscriber.ChannelOwnershipListener[] m_aChannelOwnershipListener;
    protected long m_cPolls;
    protected long m_cPollsLast;
    protected long m_cValues;
    protected long m_cValuesLast;
    protected long m_cWait;
    protected long m_cWaitsLast;
    protected long m_cMisses;
    protected long m_cMissesLast;
    protected long m_cNotify;
    protected long m_cNotifyLast;
    protected final DeactivationListener f_listenerDeactivation = new DeactivationListener();
    protected final GroupDeactivationListener m_listenerGroupDeactivation;
    private final List<Runnable> f_listOnCloseActions = new ArrayList<Runnable>();
    private CommittableElement m_elementEmpty;
    private final Meter m_cReceived = new Meter();
    private final Meter m_cReceivedEmpty = new Meter();
    private final Meter m_cReceivedError = new Meter();
    private final Meter m_cSubscribe = new Meter();
    private final Meter m_cDisconnect = new Meter();
    private final ReconnectTask f_taskReconnect;
    private final String f_sIdentifyingName;
    private final Listeners f_stateListeners = new Listeners();
    private final LongAdder f_cReceiveRequests = new LongAdder();
    private final LongAdder f_cCancelled = new LongAdder();

    public <T> PagedTopicSubscriber(NamedTopic<?> topic, PagedTopicCaches caches, Subscriber.Option<? super T, V> ... options) {
        this.f_topic = Objects.requireNonNull(topic);
        this.m_caches = Objects.requireNonNull(caches, "The TopicCaches parameter cannot be null");
        Options<Subscriber.Option<Class<Subscriber.CompleteOnEmpty>, V>> optionsMap = Options.from(Subscriber.Option.class, options);
        Subscriber.Name nameOption = optionsMap.get(Subscriber.Name.class, null);
        String sName = nameOption == null ? null : nameOption.getName();
        this.f_fAnonymous = sName == null;
        this.m_listenerGroupDeactivation = new GroupDeactivationListener();
        this.f_serializer = this.m_caches.getSerializer();
        this.f_listenerNotification = new SimpleMapListener().addDeleteHandler(this::onChannelPopulatedNotification);
        this.f_gate = new ThreadGateLite();
        this.f_gateState = new ThreadGateLite();
        Subscriber.ChannelOwnershipListeners listeners = optionsMap.get(Subscriber.ChannelOwnershipListeners.class, Subscriber.ChannelOwnershipListeners.none());
        this.m_aChannelOwnershipListener = listeners.getListeners().toArray(new Subscriber.ChannelOwnershipListener[0]);
        PagedTopicService cacheService = this.m_caches.getService();
        Cluster cluster = cacheService.getCluster();
        Member member = cluster.getLocalMember();
        boolean fWarn = false;
        WithNotificationId withId = optionsMap.get(WithNotificationId.class);
        if (withId == null) {
            this.f_nNotificationId = System.identityHashCode(this);
        } else {
            this.f_nNotificationId = withId.getId();
            fWarn = true;
        }
        this.f_fCompleteOnEmpty = optionsMap.contains((Subscriber.Option<Class<Subscriber.CompleteOnEmpty>, V>)((Object)Subscriber.CompleteOnEmpty.class));
        this.f_filterNotification = new InKeySetFilter<Object>(null, this.m_caches.getPartitionNotifierSet(this.f_nNotificationId));
        this.f_id = new SubscriberId(this.f_nNotificationId, member.getId(), member.getUuid());
        this.f_subscriberGroupId = this.f_fAnonymous ? SubscriberGroupId.anonymous() : SubscriberGroupId.withName(sName);
        this.f_key = new SubscriberInfo.Key(this.f_subscriberGroupId, this.f_id.getId());
        this.f_heartbeatProcessor = new SubscriberHeartbeatProcessor();
        this.m_listenerChannelAllocation = new ChannelListener(this.f_id, new PagedTopicSubscription.Key(this.f_topic.getName(), this.f_subscriberGroupId));
        Subscriber.Filtered filtered = optionsMap.get(Subscriber.Filtered.class);
        this.f_filter = filtered == null ? null : filtered.getFilter();
        Subscriber.Convert convert = optionsMap.get(Subscriber.Convert.class);
        this.f_extractor = convert == null ? null : convert.getExtractor();
        this.f_taskReconnect = new ReconnectTask(this);
        this.f_daemon = new TaskDaemon("PagedTopic:Subscriber:" + this.m_caches.getTopicName() + ":" + this.f_id.getId());
        this.f_executor = new TaskDaemon("PagedTopic:Subscriber:" + this.m_caches.getTopicName() + ":Receive:" + this.f_id.getId());
        this.f_daemonChannels = new TaskDaemon("PagedTopic:Subscriber:" + this.m_caches.getTopicName() + ":Channels:" + this.f_id.getId());
        this.f_executorChannels = this.f_daemonChannels::executeTask;
        this.f_daemon.start();
        this.f_daemonChannels.start();
        long cBacklog = cluster.getDependencies().getPublisherCloggedCount();
        this.f_backlog = new DebouncedFlowControl(cBacklog * 2L / 3L, cBacklog);
        this.f_queueReceiveOrders = new BatchingOperationsQueue(this::trigger, 1, this.f_backlog, v -> 1L, BatchingOperationsQueue.Executor.fromTaskDaemon(this.f_daemon));
        int cChannel = this.m_caches.getChannelCount();
        this.m_aChannel = this.initializeChannels(this.m_caches, cChannel, this.f_subscriberGroupId);
        WithIdentifyingName withIdentifyingName = optionsMap.get(WithIdentifyingName.class);
        this.f_sIdentifyingName = withIdentifyingName == null ? null : withIdentifyingName.getName();
        this.registerChannelAllocationListener();
        this.registerDeactivationListener();
        this.registerMBean();
        if (fWarn) {
            Logger.warn("Subscriber " + String.valueOf(this.f_id) + " is being created with a custom notification id " + this.f_nNotificationId);
        }
        this.ensureConnected();
    }

    public long getId() {
        return this.f_id.getId();
    }

    public SubscriberId getSubscriberId() {
        return this.f_id;
    }

    public long getSubscriptionId() {
        return this.m_subscriptionId;
    }

    public String getIdentifyingName() {
        return this.f_sIdentifyingName;
    }

    public int getNotificationId() {
        return this.f_nNotificationId;
    }

    public SubscriberInfo.Key getKey() {
        return this.f_key;
    }

    public boolean isAnonymous() {
        return this.f_fAnonymous;
    }

    public long getBacklog() {
        return this.f_backlog.getBacklog();
    }

    public long getMaxBacklog() {
        return this.f_backlog.getExcessiveLimit();
    }

    public Filter<V> getFilter() {
        return this.f_filter;
    }

    public ValueExtractor<V, ?> getConverter() {
        return this.f_extractor;
    }

    public Serializer getSerializer() {
        return this.f_serializer;
    }

    public boolean isCompleteOnEmpty() {
        return this.f_fCompleteOnEmpty;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void printChannels(PrintStream out) {
        Gate<?> gate = this.f_gate;
        gate.enter(-1L);
        try {
            out.println("Owned: " + Arrays.toString(this.m_aChannelOwned));
            for (int c = 0; c < this.m_aChannel.length; ++c) {
                out.printf("%d: %s current=%b\n", c, this.m_aChannel[c], c == this.m_nChannel);
            }
        }
        finally {
            gate.exit();
        }
    }

    public void printPreFetchCache(PrintStream out) {
        Gate<?> gate = this.f_gate;
        gate.enter(-1L);
        try {
            out.println("Pre-Fetch Cache: ");
            this.m_queueValuesPrefetched.forEach(out::println);
        }
        finally {
            gate.exit();
        }
    }

    @Override
    public <T> NamedTopic<T> getNamedTopic() {
        return this.f_topic;
    }

    @Override
    public CompletableFuture<Subscriber.Element<V>> receive() {
        this.ensureActive();
        CompletableFuture<Subscriber.Element<V>> future = this.f_queueReceiveOrders.add(ReceiveRequest.SINGLE);
        this.f_cReceiveRequests.add(1L);
        future.handle((e, error) -> {
            if (error instanceof CancellationException) {
                if (!(error instanceof BatchingOperationsQueue.OperationCancelledException)) {
                    Logger.err("Receive cancelled", error);
                }
                this.f_cCancelled.add(1L);
            }
            return null;
        });
        return future;
    }

    @Override
    public CompletableFuture<List<Subscriber.Element<V>>> receive(int cBatch) {
        this.ensureActive();
        CompletableFuture<List<Subscriber.Element<V>>> future = this.f_queueReceiveOrders.add(new ReceiveRequest(true, cBatch));
        this.f_cReceiveRequests.add(1L);
        future.handle((e, error) -> {
            if (error instanceof CancellationException) {
                if (!(error instanceof BatchingOperationsQueue.OperationCancelledException)) {
                    Logger.err("Receive cancelled", error);
                }
                this.f_cCancelled.add(1L);
            }
            return null;
        });
        return future;
    }

    public Optional<Subscriber.Element<V>> peek(int nChannel) {
        this.ensureActive();
        Map<Integer, Position> map = this.getHeads();
        Position position = map.get(nChannel);
        if (position != null) {
            Optional<CommittableElement> optional = this.m_queueValuesPrefetched.stream().filter(e -> e.getPosition().equals(position)).findFirst();
            if (optional.isPresent()) {
                return Optional.of(optional.get().getElement());
            }
            PagedPosition pagedPosition = (PagedPosition)position;
            ContentKey key = new ContentKey(nChannel, pagedPosition.getPage(), pagedPosition.getOffset());
            Binary binary = (Binary)this.m_caches.Data.get(key.toBinary(this.m_caches.getPartitionCount()));
            return binary == null ? Optional.empty() : Optional.of(PageElement.fromBinary(binary, this.m_caches.getSerializer()));
        }
        return Optional.empty();
    }

    @Override
    public CompletableFuture<Subscriber.CommitResult> commitAsync(int nChannel, Position position) {
        this.ensureActive();
        try {
            if (position instanceof PagedPosition) {
                return this.commitInternal(nChannel, (PagedPosition)position, null);
            }
            throw new IllegalArgumentException("Invalid position type");
        }
        catch (Throwable t) {
            CompletableFuture<Subscriber.CommitResult> future = new CompletableFuture<Subscriber.CommitResult>();
            future.completeExceptionally(t);
            return future;
        }
    }

    @Override
    public CompletableFuture<Map<Integer, Subscriber.CommitResult>> commitAsync(Map<Integer, Position> mapPositions) {
        this.ensureActive();
        HashMap<Integer, Subscriber.CommitResult> mapResult = new HashMap<Integer, Subscriber.CommitResult>();
        HashMap<Integer, PagedPosition> mapCommit = new HashMap<Integer, PagedPosition>();
        for (Map.Entry<Integer, Position> entry : mapPositions.entrySet()) {
            Integer nChannel = entry.getKey();
            Position position = entry.getValue();
            if (position instanceof PagedPosition) {
                mapCommit.put(nChannel, (PagedPosition)position);
                continue;
            }
            mapResult.put(nChannel, new Subscriber.CommitResult((int)nChannel, position, Subscriber.CommitResultStatus.Rejected));
        }
        CompletableFuture[] aFuture = (CompletableFuture[])mapCommit.entrySet().stream().map(e -> this.commitInternal((Integer)e.getKey(), (PagedPosition)e.getValue(), mapResult)).toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(aFuture).handle((_void, err) -> mapResult);
    }

    @Override
    public int[] getChannels() {
        return this.getChannelSet().stream().mapToInt(i -> i).toArray();
    }

    public Set<Integer> getChannelSet() {
        if (this.m_nState == 2) {
            Gate<?> gate = this.f_gate;
            gate.enter(-1L);
            try {
                if (this.m_nState == 2) {
                    Set<Integer> set = Arrays.stream(this.m_aChannel).filter(PagedTopicChannel::isOwned).map(c -> c.subscriberPartitionSync.getChannelId()).collect(Collectors.toSet());
                    return set;
                }
            }
            finally {
                gate.exit();
            }
        }
        return Collections.emptySet();
    }

    @Override
    public boolean isOwner(int nChannel) {
        if (this.m_nState == 2) {
            return nChannel >= 0 && this.m_aChannel[nChannel].isOwned();
        }
        return false;
    }

    @Override
    public int getChannelCount() {
        return this.m_aChannel == null ? this.m_caches.getChannelCount() : this.m_aChannel.length;
    }

    @Override
    public FlowControl getFlowControl() {
        return this.f_backlog;
    }

    @Override
    public void onClose(Runnable action) {
        this.f_listOnCloseActions.add(action);
    }

    @Override
    public boolean isActive() {
        return this.m_nState != 5 && this.m_nState != 4;
    }

    @Override
    public Map<Integer, Position> getLastCommitted() {
        this.ensureActive();
        CompletableFuture<?> future = this.f_queueReceiveOrders.addFirst(new GetPositionRequest(PositionType.Committed));
        return (Map)future.join();
    }

    @Override
    public Map<Integer, Position> getHeads() {
        this.ensureActive();
        CompletableFuture<?> future = this.f_queueReceiveOrders.addFirst(new GetPositionRequest(PositionType.Head));
        return (Map)future.join();
    }

    @Override
    public Map<Integer, Position> getTails() {
        this.ensureActive();
        CompletableFuture<?> future = this.f_queueReceiveOrders.addFirst(new GetPositionRequest(PositionType.Tail));
        return (Map)future.join();
    }

    @Override
    public Position seek(int nChannel, Position position) {
        this.ensureActive();
        CompletableFuture<?> future = this.f_queueReceiveOrders.addFirst(new SeekRequest(Collections.singletonMap(nChannel, position)));
        Map map = (Map)future.join();
        return (Position)map.get(nChannel);
    }

    @Override
    public Map<Integer, Position> seek(Map<Integer, Position> mapPosition) {
        this.ensureActive();
        CompletableFuture<?> future = this.f_queueReceiveOrders.addFirst(new SeekRequest(mapPosition));
        return (Map)future.join();
    }

    @Override
    public Position seek(int nChannel, Instant timestamp) {
        this.ensureActive();
        CompletableFuture<?> future = this.f_queueReceiveOrders.addFirst(new SeekRequest(timestamp, nChannel));
        return (Position)((Map)future.join()).get(nChannel);
    }

    @Override
    public Map<Integer, Position> seekToHead(int ... anChannel) {
        this.ensureActiveAnOwnedChannels(anChannel);
        return (Map)this.f_queueReceiveOrders.addFirst(new SeekRequest(SeekType.Head, anChannel)).join();
    }

    @Override
    public Map<Integer, Position> seekToTail(int ... anChannel) {
        this.ensureActiveAnOwnedChannels(anChannel);
        return (Map)this.f_queueReceiveOrders.addFirst(new SeekRequest(SeekType.Tail, anChannel)).join();
    }

    @Override
    public int getRemainingMessages() {
        int[] anChannel = this.getChannels();
        return this.m_caches.getRemainingMessages(this.f_subscriberGroupId, anChannel);
    }

    @Override
    public int getRemainingMessages(int nChannel) {
        if (this.isOwner(nChannel)) {
            return this.m_caches.getRemainingMessages(this.f_subscriberGroupId, nChannel);
        }
        return 0;
    }

    @Override
    public void close() {
        this.closeInternal(false);
    }

    public String toString() {
        if (this.m_nState == 5) {
            return this.getClass().getSimpleName() + "(inactive)";
        }
        long cPollsNow = this.m_cPolls;
        long cValuesNow = this.m_cValues;
        long cMissesNow = this.m_cMisses;
        long cWaitNow = this.m_cWait;
        long cNotifyNow = this.m_cNotify;
        long cPoll = cPollsNow - this.m_cPollsLast;
        long cValues = cValuesNow - this.m_cValuesLast;
        long cMisses = cMissesNow - this.m_cMissesLast;
        long cWait = cWaitNow - this.m_cWaitsLast;
        long cNotify = cNotifyNow - this.m_cNotifyLast;
        this.m_cPollsLast = cPollsNow;
        this.m_cValuesLast = cValuesNow;
        this.m_cMissesLast = cMissesNow;
        this.m_cWaitsLast = cWaitNow;
        this.m_cNotifyLast = cNotifyNow;
        PagedTopicChannel[] aChannel = this.m_aChannel;
        long cChannelsPolled = Arrays.stream(aChannel).filter(PagedTopicChannel::isPolled).count();
        String sChannelsPolled = Arrays.toString(Arrays.stream(aChannel).filter(PagedTopicChannel::isPolled).mapToInt(PagedTopicChannel::getId).toArray());
        long cChannelsHit = Arrays.stream(aChannel).filter(PagedTopicChannel::isHit).count();
        String sChannelsHit = Arrays.toString(Arrays.stream(aChannel).filter(PagedTopicChannel::isHit).mapToInt(PagedTopicChannel::getId).toArray());
        String sState = this.getStateName();
        String sName = this.f_sIdentifyingName == null ? "" : ", name=" + this.f_sIdentifyingName;
        return this.getClass().getSimpleName() + "(topic=" + this.m_caches.getTopicName() + sName + ", id=" + String.valueOf(this.f_id) + ", group=" + String.valueOf(this.f_subscriberGroupId) + ", subscriptionId=" + this.m_subscriptionId + ", durable=" + !this.f_fAnonymous + ", state=" + sState + ", prefetched=" + this.m_queueValuesPrefetched.size() + ", backlog=" + String.valueOf(this.f_backlog) + ", subscriptions=" + this.m_cSubscribe.getCount() + ", disconnections=" + this.m_cDisconnect.getCount() + ", received=" + this.m_cReceived.getCount() + ", receivedEmpty=" + this.m_cReceivedEmpty.getCount() + ", receivedError=" + this.m_cReceivedError.getCount() + ", channelAllocation=" + (this.f_fAnonymous ? "[ALL]" : Arrays.toString(this.m_aChannelOwned)) + ", totalChannelsPolled=" + cPollsNow + ", channelsPolledSinceReallocation=" + sChannelsPolled + cChannelsPolled + ", channelsHit=" + sChannelsHit + cChannelsHit + "/" + cChannelsPolled + ", batchSize=" + cValues / Math.max(1L, cPoll - cMisses) + ", values=" + cValuesNow + ", notifications=" + cNotifyNow + ", waitNotifyRate=" + cWait * 100L / Math.max(1L, cPoll) + "/" + cNotify * 100L / Math.max(1L, cPoll) + "%)";
    }

    private PagedTopicChannel[] initializeChannels(PagedTopicCaches caches, int cChannel, SubscriberGroupId subscriberGroupId) {
        return this.initializeChannels(caches, cChannel, subscriberGroupId, null);
    }

    private PagedTopicChannel[] initializeChannels(PagedTopicCaches caches, int cChannel, SubscriberGroupId subscriberGroupId, PagedTopicChannel[] existing) {
        if (existing != null && existing.length >= cChannel) {
            return existing;
        }
        try (Sentry ignored = this.f_gate.close();){
            PagedTopicChannel[] aChannel = new PagedTopicChannel[cChannel];
            int cPart = caches.getPartitionCount();
            for (int nChannel = 0; nChannel < cChannel; ++nChannel) {
                if (existing != null && nChannel < existing.length) {
                    aChannel[nChannel] = existing[nChannel];
                    continue;
                }
                aChannel[nChannel] = new PagedTopicChannel();
                aChannel[nChannel].subscriberPartitionSync = Subscription.createSyncKey(subscriberGroupId, nChannel, cPart);
            }
            PagedTopicChannel[] pagedTopicChannelArray = aChannel;
            return pagedTopicChannelArray;
        }
    }

    public Subscriber.Channel getChannel(int nChannel) {
        return nChannel < this.m_aChannel.length ? this.m_aChannel[nChannel] : new Subscriber.Channel.EmptyChannel(nChannel);
    }

    public long getCancelled() {
        return this.f_cCancelled.longValue();
    }

    public long getReceiveRequests() {
        return this.f_cReceiveRequests.longValue();
    }

    public long getReceived() {
        return this.m_cReceived.getCount();
    }

    public double getReceivedMeanRate() {
        return this.m_cReceived.getMeanRate();
    }

    public double getReceivedOneMinuteRate() {
        return this.m_cReceived.getOneMinuteRate();
    }

    public double getReceivedFiveMinuteRate() {
        return this.m_cReceived.getFiveMinuteRate();
    }

    public double getReceivedFifteenMinuteRate() {
        return this.m_cReceived.getFifteenMinuteRate();
    }

    public long getReceivedEmpty() {
        return this.m_cReceivedEmpty.getCount();
    }

    public long getReceivedError() {
        return this.m_cReceivedError.getCount();
    }

    public long getDisconnectCount() {
        return this.m_cDisconnect.getCount();
    }

    public int getAutoReconnectTaskCount() {
        return this.f_taskReconnect.getExecutionCount();
    }

    public long getPolls() {
        return this.m_cPolls;
    }

    public long getElementsPolled() {
        return this.m_cValues;
    }

    public long getWaitCount() {
        return this.m_cWait;
    }

    public long getMisses() {
        return this.m_cMisses;
    }

    public long getNotify() {
        return this.m_cNotify;
    }

    public boolean isCommitted(int nChannel, Position position) {
        this.ensureActive();
        return this.m_caches.isCommitted(this.f_subscriberGroupId, nChannel, position);
    }

    protected void initialise() throws InterruptedException, ExecutionException, TimeoutException {
        this.ensureActive();
        if (this.m_nState == 2) {
            return;
        }
        try (Sentry ignored = this.f_gate.close();){
            boolean fReconnect;
            boolean bl = fReconnect = this.m_nState == 3;
            while (this.m_nState != 2 && this.isActive()) {
                boolean fDisconnected;
                long[] alHead;
                int cChannel;
                int nPrevState = this.setState(1);
                if (fReconnect) {
                    Logger.finest("Reconnecting subscriber " + String.valueOf(this));
                }
                this.m_caches.ensureConnected();
                if (!this.f_fAnonymous) {
                    PagedTopicService service = this.m_caches.getService();
                    if (this.m_subscriptionId == 0L) {
                        this.m_subscriptionId = service.ensureSubscription(this.f_topic.getName(), this.f_subscriberGroupId, this.f_id, this.f_filter, this.f_extractor);
                    } else {
                        service.ensureSubscription(this.f_topic.getName(), this.m_subscriptionId, this.f_id, this.m_fForceReconnect);
                        if (service.isSubscriptionDestroyed(this.m_subscriptionId)) {
                            this.close();
                            throw new IllegalStateException("The subscriber group \"" + String.valueOf(this.f_subscriberGroupId) + "\" (id=" + this.m_subscriptionId + ") this subscriber was previously subscribed to has been destroyed");
                        }
                    }
                    PagedTopicSubscription subscription = service.getSubscription(this.m_subscriptionId);
                    this.m_connectionTimestamp = subscription != null ? subscription.getSubscriberTimestamp(this.f_id) : SafeClock.INSTANCE.getSafeTimeMillis();
                    this.heartbeat(false);
                }
                if ((cChannel = (alHead = this.m_caches.initializeSubscription(this.f_subscriberGroupId, this.f_id, this.m_subscriptionId, this.f_filter, this.f_extractor, fReconnect, false, fDisconnected = nPrevState == 3)).length) > this.m_aChannel.length) {
                    PagedTopicChannel[] aChannel = this.m_aChannel;
                    this.m_aChannel = this.initializeChannels(this.m_caches, cChannel, this.f_subscriberGroupId, aChannel);
                }
                for (int nChannel = 0; nChannel < cChannel; ++nChannel) {
                    PagedTopicChannel channel = this.m_aChannel[nChannel];
                    channel.m_lHead = alHead[nChannel];
                    channel.m_nNext = -1;
                    channel.setPopulated();
                }
                if (this.f_fAnonymous) {
                    TreeSet<Integer> listChannel = new TreeSet<Integer>();
                    for (int i = 0; i < cChannel; ++i) {
                        listChannel.add(i);
                    }
                    this.updateChannelOwnership(listChannel, false);
                } else {
                    SortedSet setChannel;
                    PagedTopicSubscription pagedTopicSubscription = this.m_caches.getService().getSubscription(this.m_subscriptionId);
                    if (pagedTopicSubscription != null) {
                        setChannel = pagedTopicSubscription.getOwnedChannels(this.f_id);
                    } else {
                        CompletableFuture future = this.m_caches.Subscriptions.async().get(this.m_aChannel[0].subscriberPartitionSync);
                        try {
                            Subscription subscription = (Subscription)future.get(INIT_TIMEOUT_SECS, TimeUnit.SECONDS);
                            setChannel = Arrays.stream(subscription.getChannels(this.f_id, cChannel)).boxed().collect(Collectors.toCollection(TreeSet::new));
                        }
                        catch (TimeoutException e) {
                            future.cancel(true);
                            if (future.isDone() && !future.isCompletedExceptionally()) {
                                Subscription subscription = (Subscription)future.get(INIT_TIMEOUT_SECS, TimeUnit.SECONDS);
                                setChannel = Arrays.stream(subscription.getChannels(this.f_id, cChannel)).boxed().collect(Collectors.toCollection(TreeSet::new));
                            }
                            throw e;
                        }
                    }
                    this.updateChannelOwnership(setChannel, false);
                }
                this.heartbeat();
                this.registerNotificationListener();
                if (!this.casState(1, 2)) continue;
                this.switchChannel();
            }
            this.m_cSubscribe.mark();
        }
    }

    private void trigger(int cBatch) {
        this.receiveInternal(this.f_queueReceiveOrders, cBatch);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void receiveInternal(BatchingOperationsQueue<Request, ?> queueRequest, Integer cBatch) {
        if (this.isActive()) {
            this.ensureConnected();
        }
        if (!queueRequest.isBatchComplete() || queueRequest.fillCurrentBatch(cBatch)) {
            this.heartbeat();
            this.complete(queueRequest);
            int nChannel = this.ensureOwnedChannel();
            if (!queueRequest.isBatchComplete() && nChannel >= 0) {
                PagedTopicChannel channel = this.m_aChannel[nChannel];
                long lVersion = channel.getVersion();
                long lHead = channel.m_lHead == -1L ? this.getSubscriptionHead(channel) : channel.m_lHead;
                int nPart = ((PartitionedService)((Object)this.m_caches.Subscriptions.getCacheService())).getKeyPartitioningStrategy().getKeyPartition(new Page.Key(nChannel, lHead));
                InvocableMapHelper.invokeAsync(this.m_caches.Subscriptions, new Subscription.Key(nPart, nChannel, this.f_subscriberGroupId), this.m_caches.getUnitOfOrder(nPart), new PollProcessor(lHead, Integer.MAX_VALUE, this.f_nNotificationId, this.f_id), this.f_executor, (result, e) -> this.onReceiveResult(channel, lVersion, lHead, (PollProcessor.Result)result, (Throwable)e)).handleAsync((r, e) -> {
                    if (e != null) {
                        Logger.err(e);
                        return null;
                    }
                    if (!this.m_queueValuesPrefetched.isEmpty()) {
                        this.complete(queueRequest);
                    }
                    this.trigger(cBatch);
                    return null;
                }, this.f_daemon::executeTask);
            } else if (this.m_queueValuesPrefetched.isEmpty() && nChannel < 0) {
                Gate<?> gate = this.f_gate;
                gate.enter(-1L);
                try {
                    if (this.switchChannel()) {
                        this.receiveInternal(queueRequest, cBatch);
                    }
                    if (this.f_fCompleteOnEmpty) {
                        this.m_queueValuesPrefetched.add(this.getEmptyElement());
                        this.complete(queueRequest);
                    }
                    queueRequest.resetTrigger();
                }
                finally {
                    gate.exit();
                }
            } else {
                this.receiveInternal(queueRequest, cBatch);
            }
        }
    }

    private long getSubscriptionHead(PagedTopicChannel channel) {
        Subscription subscription = (Subscription)this.m_caches.Subscriptions.get(channel.subscriberPartitionSync);
        if (subscription == null) {
            try {
                this.initialise();
            }
            catch (Throwable e) {
                throw Exceptions.ensureRuntimeException(e);
            }
            return this.m_aChannel[channel.getId()].m_lHead;
        }
        return subscription.getSubscriptionHead();
    }

    protected void complete(BatchingOperationsQueue<Request, ?> queueRequest) {
        LinkedList<Request> queueBatch = queueRequest.getCurrentBatchValues();
        this.complete(queueRequest, queueBatch);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void complete(BatchingOperationsQueue<Request, ?> queueRequest, LinkedList<Request> queueBatch) {
        block13: {
            ConcurrentLinkedDeque<CommittableElement> queuePrefetched = this.m_queueValuesPrefetched;
            Request firstRequest = queueBatch.peek();
            while (firstRequest instanceof FunctionalRequest) {
                ((FunctionalRequest)queueBatch.poll()).execute(this, queueRequest);
                firstRequest = queueBatch.peek();
            }
            int cValues = 0;
            int cRequest = queueBatch.size();
            if (this.isActive() && !queuePrefetched.isEmpty()) {
                Gate<?> gate = this.f_gate;
                gate.enter(-1L);
                try {
                    SparseArray aValues = new SparseArray();
                    CommittableElement element = queuePrefetched.peek();
                    if (element != null && element.isEmpty()) {
                        queuePrefetched.poll();
                        while (cValues < cRequest) {
                            Request request = queueBatch.get(cValues);
                            if (!(request instanceof ReceiveRequest)) continue;
                            ReceiveRequest receiveRequest = (ReceiveRequest)request;
                            if (!receiveRequest.isBatch()) {
                                aValues.set(cValues, null);
                            } else {
                                aValues.set(cValues, Collections.emptyList());
                            }
                            ++cValues;
                        }
                        queueRequest.completeElements(cValues, aValues, this::onReceiveError, this::onReceiveComplete);
                        break block13;
                    }
                    while (this.m_nState == 2 && cValues < cRequest && !queuePrefetched.isEmpty()) {
                        Request request = queueBatch.get(cValues);
                        if (!(request instanceof ReceiveRequest)) continue;
                        ReceiveRequest receiveRequest = (ReceiveRequest)request;
                        if (receiveRequest.isBatch()) {
                            CommittableElement e;
                            int cElement = receiveRequest.getElementCount();
                            LinkedList<CommittableElement> list = new LinkedList<CommittableElement>();
                            for (int i = 0; i < cElement && !queuePrefetched.isEmpty(); ++i) {
                                element = queuePrefetched.poll();
                                if (element == null || element.isEmpty() || !this.isOwner(element.getChannel())) continue;
                                list.add(element);
                            }
                            ++cValues;
                            boolean fCompleted = queueRequest.completeElement(list, this::onReceiveComplete);
                            if (fCompleted) continue;
                            while ((e = (CommittableElement)list.pollLast()) != null) {
                                queuePrefetched.offerFirst(e);
                            }
                            continue;
                        }
                        element = queuePrefetched.poll();
                        if (element == null || element.isEmpty() || !this.isOwner(element.getChannel())) continue;
                        ++cValues;
                        boolean fCompleted = queueRequest.completeElement(element, this::onReceiveComplete);
                        if (fCompleted) continue;
                        queuePrefetched.offerFirst(element);
                    }
                }
                finally {
                    gate.exit();
                }
            }
        }
    }

    public int getReceiveQueueSize() {
        return this.f_queueReceiveOrders.size();
    }

    private Throwable onReceiveError(Throwable err, Object o) {
        this.m_cReceived.mark();
        this.m_cReceivedError.mark();
        return new TopicException(err);
    }

    private void onReceiveComplete(Object o) {
        if (o == null) {
            this.m_cReceived.mark();
            this.m_cReceivedEmpty.mark();
            return;
        }
        if (o instanceof Subscriber.Element) {
            this.m_cReceived.mark();
            this.onReceiveComplete((Subscriber.Element)o);
        } else if (o instanceof Collection) {
            this.m_cReceived.mark();
            for (Subscriber.Element e : (Collection)o) {
                this.onReceiveComplete(e);
            }
        }
    }

    private void onReceiveComplete(Subscriber.Element<?> element) {
        int c = element.getChannel();
        if (this.m_aChannel[c].isOwned()) {
            this.m_aChannel[c].m_lastReceived = (PagedPosition)element.getPosition();
            this.m_aChannel[c].m_cReceived.mark();
        }
    }

    private CompletableFuture<Subscriber.CommitResult> commitInternal(int nChannel, PagedPosition position, Map<Integer, Subscriber.CommitResult> mapResult) {
        try {
            long lPage = position.getPage();
            int cPart = this.m_caches.getPartitionCount();
            int nPart = ((PartitionedService)((Object)this.m_caches.Subscriptions.getCacheService())).getKeyPartitioningStrategy().getKeyPartition(new Page.Key(nChannel, lPage));
            this.scheduleHeadIncrement(this.m_aChannel[nChannel], lPage - 1L).join();
            Set<Subscription.Key> setKeys = this.m_aChannel[nChannel].ensureSubscriptionKeys(cPart, this.f_subscriberGroupId);
            CompletableFuture<Map> future = CompletableFuture.supplyAsync(() -> this.m_caches.Subscriptions.invokeAll(setKeys, new CommitProcessor(position, this.f_id)), Daemons.commonPool());
            return future.handle((map, err) -> {
                Subscriber.CommitResult result;
                if (err == null) {
                    Subscription.Key key = new Subscription.Key(nPart, nChannel, this.f_subscriberGroupId);
                    result = (Subscriber.CommitResult)map.get(key);
                } else {
                    Logger.err("Commit failure", err);
                    result = new Subscriber.CommitResult(nChannel, position, Subscriber.CommitResultStatus.Rejected, (Throwable)err);
                }
                if (mapResult != null) {
                    mapResult.put(nChannel, result);
                }
                this.m_aChannel[nChannel].committed(position);
                return result;
            });
        }
        catch (Throwable thrown) {
            CompletableFuture<Subscriber.CommitResult> future = new CompletableFuture<Subscriber.CommitResult>();
            future.completeExceptionally(thrown);
            return future;
        }
    }

    private Position seekChannel(int nChannel, PagedPosition pagedPosition) {
        SeekProcessor.Result result = this.seekInternal(nChannel, pagedPosition);
        Position seekPosition = this.updateSeekedChannel(nChannel, result);
        return seekPosition == null ? PagedPosition.NULL_POSITION : seekPosition;
    }

    private Position updateSeekedChannel(int nChannel, SeekProcessor.Result result) {
        PagedPosition positionHead = result.getHead();
        PagedPosition seekPosition = result.getSeekPosition();
        if (positionHead != null) {
            this.m_aChannel[nChannel].m_lHead = positionHead.getPage();
            this.m_aChannel[nChannel].m_nNext = positionHead.getOffset();
        }
        this.m_queueValuesPrefetched.removeIf(e -> e.getChannel() == nChannel);
        return seekPosition;
    }

    private SeekProcessor.Result seekInternal(int nChannel, PagedPosition position) {
        Set<Subscription.Key> setKeys = this.m_aChannel[nChannel].ensureSubscriptionKeys(this.m_caches.getPartitionCount(), this.f_subscriberGroupId);
        Map<Subscription.Key, SeekProcessor.Result> mapResult = this.m_caches.Subscriptions.invokeAll(setKeys, new SeekProcessor(position, this.f_id));
        return mapResult.values().stream().filter(Objects::nonNull).sorted().findFirst().orElse(null);
    }

    private Map<Integer, Position> seekInternal(SeekRequest request) {
        SeekType type = request.getType();
        int[] anChannel = request.getChannels();
        if (anChannel == null || anChannel.length == 0) {
            return new HashMap<Integer, Position>();
        }
        this.ensureActiveAnOwnedChannels(anChannel);
        switch (type.ordinal()) {
            case 0: {
                ReflectionExtractor extractorChannel = new ReflectionExtractor("getChannelId", new Object[0], 1);
                ReflectionExtractor extractorPage = new ReflectionExtractor("getPageId", new Object[0], 1);
                Map mapHeads = (Map)this.m_caches.Pages.aggregate(GroupAggregator.createInstance(extractorChannel, new LongMin(extractorPage)));
                HashMap<Integer, Position> mapSeek = new HashMap<Integer, Position>();
                for (int nChannel : anChannel) {
                    mapSeek.put(nChannel, new PagedPosition((Long)mapHeads.get(nChannel), -1));
                }
                return this.seekInternal(mapSeek);
            }
            case 1: {
                Map<Integer, Position> mapTails = this.m_caches.getTails();
                return this.seekInternal(this.filterForChannel(mapTails, anChannel));
            }
            case 2: {
                return this.seekInternal(request.getPositions());
            }
            case 3: {
                HashMap<Integer, Position> mapPosition = new HashMap<Integer, Position>();
                Instant instant = request.getInstant();
                for (int nChannel : request.getChannels()) {
                    Position position = this.seekInternal(nChannel, instant);
                    mapPosition.put(nChannel, position);
                }
                return mapPosition;
            }
        }
        throw new IllegalArgumentException("Invalid SeekType " + String.valueOf((Object)type));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Map<Integer, Position> seekInternal(Map<Integer, Position> mapPosition) {
        this.ensureActive();
        List<Integer> listUnallocated = mapPosition.keySet().stream().filter(c -> !this.isOwner((int)c)).toList();
        if (!listUnallocated.isEmpty()) {
            throw new IllegalStateException("Subscriber is not allocated channels " + String.valueOf(listUnallocated));
        }
        try {
            this.f_queueReceiveOrders.pause();
            HashMap<Integer, PagedPosition> mapSeek = new HashMap<Integer, PagedPosition>();
            for (Map.Entry<Integer, Position> entry : mapPosition.entrySet()) {
                Integer nChannel = entry.getKey();
                Position position = entry.getValue();
                if (position instanceof PagedPosition) {
                    mapSeek.put(nChannel, (PagedPosition)position);
                    continue;
                }
                throw new IllegalArgumentException("Invalid position type for channel " + nChannel);
            }
            HashMap<Integer, Position> mapResult = new HashMap<Integer, Position>();
            for (Map.Entry entry : mapSeek.entrySet()) {
                int nChannel = (Integer)entry.getKey();
                SeekProcessor.Result result = this.seekInternal(nChannel, (PagedPosition)entry.getValue());
                Position seekPosition = this.updateSeekedChannel(nChannel, result);
                mapResult.put(nChannel, seekPosition);
            }
            HashMap<Integer, Position> hashMap = mapResult;
            return hashMap;
        }
        finally {
            this.f_queueReceiveOrders.resetTrigger();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Position seekInternal(int nChannel, Instant timestamp) {
        if (!this.isOwner(nChannel)) {
            throw new IllegalStateException("Subscriber is not allocated channel " + nChannel);
        }
        Objects.requireNonNull(timestamp);
        try {
            this.f_queueReceiveOrders.pause();
            ValueExtractor extractorChannel = Page.ElementExtractor.chained(Subscriber.Element::getChannel);
            ValueExtractor extractorTimestamp = Page.ElementExtractor.chained(Subscriber.Element::getTimestamp);
            ValueExtractor extractorPosition = Page.ElementExtractor.chained(Subscriber.Element::getPosition);
            Binary bin = (Binary)this.m_caches.Data.aggregate(Filters.equal(extractorChannel, Integer.valueOf(nChannel)).and(Filters.greater(extractorTimestamp, timestamp)), new ComparableMin(extractorPosition));
            PagedPosition position = (PagedPosition)this.m_caches.getService().getBackingMapManager().getContext().getValueFromInternalConverter().convert(bin);
            if (position == null) {
                Position position2 = this.seekToTail(nChannel).get(nChannel);
                return position2;
            }
            int nOffset = position.getOffset();
            PagedPosition positionSeek = nOffset == 0 ? new PagedPosition(position.getPage() - 1L, Integer.MAX_VALUE) : new PagedPosition(position.getPage(), nOffset - 1);
            Position position3 = this.seekChannel(nChannel, positionSeek);
            return position3;
        }
        finally {
            this.f_queueReceiveOrders.resetTrigger();
        }
    }

    private void ensureActiveAnOwnedChannels(int ... anChannel) {
        List<Integer> listUnallocated;
        this.ensureActive();
        if (anChannel != null && anChannel.length > 0 && !(listUnallocated = Arrays.stream(anChannel).filter(c -> !this.isOwner(c)).boxed().toList()).isEmpty()) {
            throw new IllegalArgumentException("One or more channels are not allocated to this subscriber " + String.valueOf(listUnallocated));
        }
    }

    private Map<Integer, Position> filterForChannel(Map<Integer, Position> mapPosition, int ... anChannel) {
        HashMap<Integer, Position> mapChannel = new HashMap<Integer, Position>();
        for (int nChannel : anChannel) {
            Position position = mapPosition.get(nChannel);
            if (position == null) continue;
            mapChannel.put(nChannel, position);
        }
        return mapChannel;
    }

    private Map<Integer, Position> getHeadsInternal() {
        this.ensureActive();
        HashMap<Integer, Position> mapHeads = new HashMap<Integer, Position>();
        int[] anChannel = this.getChannels();
        for (int nChannel : anChannel) {
            mapHeads.put(nChannel, this.m_aChannel[nChannel].getHead());
        }
        Object object = this.m_queueValuesPrefetched.iterator();
        while (object.hasNext()) {
            CommittableElement element = (CommittableElement)object.next();
            int nChannel = element.getChannel();
            if (!mapHeads.containsKey(nChannel)) continue;
            Position positionCurrent = (Position)mapHeads.get(nChannel);
            PagedPosition position = (PagedPosition)element.getPosition();
            if (nChannel == -1 || position == null || position.getPage() == -1L || positionCurrent != null && positionCurrent.compareTo(position) <= 0) continue;
            mapHeads.put(nChannel, position);
        }
        return mapHeads;
    }

    private Map<Integer, Position> getTailsInternal() {
        int[] anChannel;
        Map<Integer, Position> map = this.m_caches.getTails();
        HashMap<Integer, Position> mapTails = new HashMap<Integer, Position>();
        for (int nChannel : anChannel = this.getChannels()) {
            mapTails.put(nChannel, map.getOrDefault(nChannel, this.m_aChannel[nChannel].getHead()));
        }
        return mapTails;
    }

    private Map<Integer, Position> getLastCommittedInternal() {
        this.ensureActive();
        Map<Integer, Position> mapCommit = this.m_caches.getLastCommitted(this.f_subscriberGroupId);
        int[] anChannels = this.m_aChannelOwned;
        HashMap<Integer, Position> mapResult = new HashMap<Integer, Position>();
        for (int nChannel : anChannels) {
            mapResult.put(nChannel, mapCommit.getOrDefault(nChannel, PagedPosition.NULL_POSITION));
        }
        return mapResult;
    }

    protected void ensureActive() {
        if (!this.isActive()) {
            throw new IllegalStateException("The subscriber is not active");
        }
    }

    public int getState() {
        return this.m_nState;
    }

    public String getStateName() {
        return PagedTopicSubscriber.getStateName(this.m_nState);
    }

    public static String getStateName(int nState) {
        if (nState >= 0 && nState < STATES.length) {
            return STATES[nState];
        }
        return "Unknown (" + nState + ")";
    }

    protected int setState(int nState) {
        try (Sentry ignored = this.f_gateState.close();){
            int nPrevState = this.m_nState;
            this.m_nState = nState;
            if (nPrevState != nState) {
                this.notifyStateChange(nState, nPrevState);
            }
            int n = nPrevState;
            return n;
        }
    }

    protected boolean casState(int nPrevState, int nState) {
        if (this.m_nState == nPrevState) {
            try (Sentry ignored = this.f_gateState.close();){
                if (this.m_nState == nPrevState) {
                    this.m_nState = nState;
                    this.notifyStateChange(nState, nPrevState);
                    boolean bl = true;
                    return bl;
                }
            }
        }
        return false;
    }

    private void notifyStateChange(int nState, int nPrevState) {
        for (EventListener listener : this.f_stateListeners.listeners()) {
            try {
                ((StateListener)listener).onStateChange(this, nState, nPrevState);
            }
            catch (Throwable t) {
                Logger.err(t);
            }
        }
    }

    public void addStateListener(StateListener listener) {
        this.f_stateListeners.add(listener);
    }

    public void removeStateListener(StateListener listener) {
        this.f_stateListeners.remove(listener);
    }

    public void connect() {
        this.ensureActive();
        this.ensureConnected();
    }

    protected void reconnectInternal() {
        if (this.getState() == 2 || !this.isActive()) {
            return;
        }
        try {
            if (this.m_caches.getService().isSuspended()) {
                Logger.finest("Skipping reconnect task, service is suspended for subscriber " + String.valueOf(this));
            } else if (this.f_queueReceiveOrders.size() > 0) {
                Logger.finest("Running reconnect task, reconnecting " + String.valueOf(this));
                this.ensureConnected();
                this.f_queueReceiveOrders.triggerOperations();
            } else {
                Logger.finest("Skipping reconnect task, no pending receives for subscriber " + String.valueOf(this));
            }
        }
        catch (Throwable t) {
            Logger.finest("Failed to reconnect subscriber " + String.valueOf(this), t);
        }
    }

    protected void ensureConnected() {
        if (this.isActive() && this.m_nState != 2) {
            try (Sentry ignored = this.f_gate.close();){
                this.ensureActive();
                PagedTopicDependencies dependencies = this.m_caches.getDependencies();
                long retry = dependencies.getReconnectRetryMillis();
                long now = System.currentTimeMillis();
                long timeout = now + dependencies.getReconnectTimeoutMillis();
                Throwable error = null;
                if (this.m_nState != 2) {
                    while (now < timeout && this.isActive()) {
                        try {
                            if (this.m_caches.getService().isSuspended()) {
                                Logger.finer("Skipping ensureConnected, service is suspended " + String.valueOf(this));
                                break;
                            }
                            this.m_caches.ensureConnected();
                            this.initialise();
                            error = null;
                            break;
                        }
                        catch (Throwable thrown) {
                            error = thrown;
                            if (error instanceof TopicException) break;
                            now = System.currentTimeMillis();
                            if (now >= timeout) continue;
                            Logger.info("Failed to reconnect subscriber, will retry in " + retry + " millis " + String.valueOf(this) + " due to " + error.getMessage());
                            Logger.finest(error);
                            try {
                                Thread.sleep(retry);
                            }
                            catch (InterruptedException interruptedException) {}
                        }
                    }
                    if (error == null) {
                        this.f_queueReceiveOrders.triggerOperations();
                    }
                }
                if (error != null) {
                    throw Exceptions.ensureRuntimeException(error);
                }
            }
        }
    }

    public boolean isDisconnected() {
        return this.m_nState == 3;
    }

    public boolean isConnected() {
        return this.m_nState == 2;
    }

    public void disconnect() {
        this.disconnectInternal(false);
    }

    /*
     * Unable to fully structure code
     */
    private void disconnectInternal(boolean fForceReconnect) {
        block10: {
            nTimestamp = this.m_connectionTimestamp;
            if (!this.isActive()) break block10;
            while (true) lbl-1000:
            // 6 sources

            {
                if ((nState = this.m_nState) == 2) {
                    ignored = this.f_gate.close();
                    try {
                        if (this.m_connectionTimestamp != nTimestamp) {
                            return;
                        }
                        if (!this.isActive() || !this.casState(nState, 3)) ** GOTO lbl-1000
                        this.m_fForceReconnect = fForceReconnect;
                        this.m_cDisconnect.mark();
                        if (!this.f_fAnonymous) {
                            this.m_listenerChannelAllocation.reset();
                        }
                        this.m_queueValuesPrefetched.clear();
                        dependencies = this.m_caches.getDependencies();
                        cWaitMillis = dependencies.getReconnectWaitMillis();
                        Logger.finest("Disconnected Subscriber " + String.valueOf(this));
                        this.f_daemon.scheduleTask(this.f_taskReconnect, TimeHelper.getSafeTimeMillis() + cWaitMillis);
                    }
                    finally {
                        if (ignored == null) ** GOTO lbl-1000
                        ignored.close();
                    }
                    continue;
                }
                if (nState == 3 || this.casState(nState, 3)) break;
            }
        }
    }

    public SubscriberGroupId getSubscriberGroupId() {
        return this.f_subscriberGroupId;
    }

    public void notifyChannel(int nChannel) {
        this.onChannelPopulatedNotification(new int[]{nChannel});
    }

    private void onChannelPopulatedNotification(MapEvent<?, ?> evt) {
        CompletableFuture.runAsync(() -> this.onChannelPopulatedNotification((int[])evt.getOldValue()), this.f_executorChannels);
    }

    protected void onChannelPopulatedNotification(int[] anChannel) {
        boolean fWasEmpty;
        if (anChannel == null || anChannel.length == 0) {
            return;
        }
        try (Sentry ignored = this.f_gate.close();){
            if (this.m_aChannel == null || !this.isActive()) {
                return;
            }
            ++this.m_cNotify;
            int nChannelCurrent = this.m_nChannel;
            fWasEmpty = nChannelCurrent < 0 || this.m_aChannel[nChannelCurrent].isEmpty();
            for (int nChannel : anChannel) {
                this.m_aChannel[nChannel].onChannelPopulatedNotification();
            }
        }
        CommittableElement element = this.m_queueValuesPrefetched.peek();
        if (element != null && element.isEmpty()) {
            this.m_queueValuesPrefetched.poll();
        }
        if (fWasEmpty) {
            this.switchChannel();
            this.f_queueReceiveOrders.triggerOperations();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onChannelEmpty(int nChannel, long lVersion) {
        if (this.isConnected()) {
            Gate<?> gate = this.f_gate;
            gate.enter(-1L);
            try {
                if (this.m_aChannel == null || !this.isActive() || !this.isConnected()) {
                    return;
                }
                this.m_aChannel[nChannel].setEmpty(lVersion);
            }
            finally {
                gate.exit();
            }
        }
    }

    protected CompletableFuture<Long> scheduleHeadIncrement(PagedTopicChannel channel, long lHeadAssumed) {
        if (this.isActive()) {
            return InvocableMapHelper.invokeAsync(this.m_caches.Subscriptions, channel.subscriberPartitionSync, this.m_caches.getUnitOfOrder(channel.subscriberPartitionSync.getPartitionId()), new HeadAdvancer(lHeadAssumed + 1L), this.f_executor, (lPriorHeadRemote, e2) -> {
                if (lPriorHeadRemote < lHeadAssumed + 1L) {
                    channel.m_fContended = false;
                } else {
                    if (lHeadAssumed != -1L && !channel.m_fContended) {
                        channel.m_fContended = true;
                    }
                    if (lPriorHeadRemote > channel.m_lHead) {
                        channel.m_lHead = lPriorHeadRemote;
                        channel.m_nNext = -1;
                    }
                }
            });
        }
        return CompletableFuture.completedFuture(-1L);
    }

    @Override
    public void heartbeat() {
        this.heartbeat(true);
    }

    private void heartbeat(boolean fAsync) {
        if (!this.f_fAnonymous) {
            UUID uuid = this.m_caches.getService().getCluster().getLocalMember().getUuid();
            this.f_heartbeatProcessor.setUuid(uuid);
            this.f_heartbeatProcessor.setSubscription(this.m_subscriptionId);
            this.f_heartbeatProcessor.setlConnectionTimestamp(this.m_connectionTimestamp);
            if (fAsync) {
                this.m_caches.Subscribers.async().invoke(this.f_key, this.f_heartbeatProcessor);
            } else {
                this.m_caches.Subscribers.invoke(this.f_key, this.f_heartbeatProcessor);
            }
        }
    }

    private void updateChannelOwnership(SortedSet<Integer> setChannel, boolean fLost) {
        block29: {
            if (!this.isActive()) {
                return;
            }
            int[] aChannel = setChannel.stream().mapToInt(i -> i).toArray();
            int nMaxChannel = setChannel.stream().mapToInt(i -> i).max().orElse(this.getChannelCount() - 1);
            try (Sentry ignored = this.f_gate.close();){
                CommittableElement element;
                if (!this.isActive()) {
                    return;
                }
                PagedTopicChannel[] aExistingChannel = this.m_aChannel;
                if (nMaxChannel >= aExistingChannel.length) {
                    Logger.finer(() -> String.format("Disconnecting subscriber %d on topic %s due to increase in channel count from %d to %d", this.f_id.getId(), this.f_topic.getName(), aExistingChannel.length, nMaxChannel));
                    this.disconnectInternal(true);
                    return;
                }
                if (Arrays.equals(this.m_aChannelOwned, aChannel)) break block29;
                Set<Integer> setRevoked = new HashSet();
                HashSet<Integer> setAdded = new HashSet<Integer>(setChannel);
                if (this.m_aChannelOwned != null && this.m_aChannelOwned.length > 0) {
                    for (int nChannel : this.m_aChannelOwned) {
                        setRevoked.add(nChannel);
                        setAdded.remove(nChannel);
                    }
                    setChannel.forEach(setRevoked::remove);
                }
                setRevoked = Collections.unmodifiableSet(setRevoked);
                Set<Integer> setAssigned = Set.copyOf(setChannel);
                Logger.finest(String.format("Subscriber %d (name=%s) channel allocation changed, assigned=%s added=%s revoked=%s", this.f_id.getId(), this.f_sIdentifyingName, setAssigned, setAdded, setRevoked));
                this.m_aChannelOwned = aChannel;
                if (!this.f_fAnonymous) {
                    int c;
                    if (this.m_nState == 0) {
                        for (PagedTopicChannel channel : this.m_aChannel) {
                            channel.setUnowned();
                            channel.setPopulated();
                        }
                    }
                    for (PagedTopicChannel channel : this.m_aChannel) {
                        channel.m_fContended = false;
                        channel.setUnowned();
                        channel.setPopulated();
                    }
                    for (PagedTopicChannel c2 : (Object)this.m_aChannelOwned) {
                        PagedTopicChannel channel = this.m_aChannel[c2];
                        channel.m_fContended = false;
                        channel.setOwned();
                        channel.setPopulated();
                    }
                    Object object = setAdded.iterator();
                    while (object.hasNext()) {
                        c = (Integer)object.next();
                        PagedTopicChannel channel = this.m_aChannel[c];
                        channel.clearPolled();
                        channel.clearHit();
                    }
                    object = setRevoked.iterator();
                    while (object.hasNext()) {
                        c = (Integer)object.next();
                        PagedTopicChannel channel = this.m_aChannel[c];
                        channel.clearPolled();
                        channel.clearHit();
                    }
                }
                if ((element = this.m_queueValuesPrefetched.peek()) != null && element.isEmpty()) {
                    this.m_queueValuesPrefetched.poll();
                }
                for (Subscriber.ChannelOwnershipListener listener : this.m_aChannelOwnershipListener) {
                    if (!setRevoked.isEmpty()) {
                        try {
                            if (fLost) {
                                listener.onChannelsLost(setRevoked);
                            } else {
                                listener.onChannelsRevoked(setRevoked);
                            }
                        }
                        catch (Throwable t) {
                            Logger.err(t);
                        }
                    }
                    if (setAssigned.isEmpty()) continue;
                    try {
                        listener.onChannelsAssigned(setAssigned);
                    }
                    catch (Throwable t) {
                        Logger.err(t);
                    }
                }
                this.onChannelPopulatedNotification(this.m_aChannelOwned);
            }
        }
    }

    protected int ensureOwnedChannel() {
        if (this.m_nChannel >= 0 && this.m_aChannel[this.m_nChannel].isOwned()) {
            return this.m_nChannel;
        }
        this.switchChannel();
        return this.m_nChannel;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean switchChannel() {
        if (this.m_aChannel == null || !this.isActive() || !this.isConnected()) {
            return false;
        }
        Gate<?> gate = this.f_gate;
        gate.enter(-1L);
        try {
            int i;
            int nChannelStart;
            if (this.m_aChannel == null || !this.isActive() || !this.isConnected()) {
                boolean bl = false;
                return bl;
            }
            if (this.m_aChannelOwned.length == 0) {
                this.m_nChannel = -1;
                boolean bl = false;
                return bl;
            }
            if (this.m_aChannelOwned.length == 1) {
                int nChannel = this.m_aChannelOwned[0];
                if (this.m_aChannel[nChannel].isEmpty()) {
                    this.m_nChannel = -1;
                    boolean bl = false;
                    return bl;
                }
                this.m_nChannel = nChannel;
                boolean bl = true;
                return bl;
            }
            int nChannel = nChannelStart = this.m_nChannel;
            for (i = 0; i < this.m_aChannelOwned.length && this.m_aChannelOwned[i] <= nChannel; ++i) {
                if (this.m_aChannelOwned[i] != nChannel) continue;
                ++i;
                break;
            }
            if (i >= this.m_aChannelOwned.length) {
                i = 0;
            }
            nChannel = this.m_aChannelOwned[i];
            for (int cTried = 0; nChannel != nChannelStart && cTried < this.m_aChannel.length && (!this.m_aChannel[nChannel].isOwned() || this.m_aChannel[nChannel].isEmpty()); ++cTried) {
                if (++nChannel != this.m_aChannel.length) continue;
                nChannel = 0;
            }
            if (this.m_aChannel[nChannel].isOwned() && !this.m_aChannel[nChannel].isEmpty()) {
                this.m_nChannel = nChannel;
                boolean bl = true;
                return bl;
            }
            this.m_nChannel = -1;
            boolean bl = false;
            return bl;
        }
        finally {
            gate.exit();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void onReceiveResult(PagedTopicChannel channel, long lVersion, long lPageId, PollProcessor.Result result, Throwable e) {
        int nChannel = channel.subscriberPartitionSync.getChannelId();
        this.f_receiveLock.lock();
        try {
            if (e == null) {
                Queue<Binary> queueValues = result.getElements();
                int cReceived = queueValues.size();
                int cRemaining = result.getRemainingElementCount();
                int nNext = result.getNextIndex();
                channel.setPolled();
                ++this.m_cPolls;
                if (cReceived == 0) {
                    ++this.m_cMisses;
                } else if (!queueValues.isEmpty()) {
                    channel.setHit();
                    this.m_cValues += (long)cReceived;
                    channel.adjustPolls(cReceived);
                    queueValues.stream().map(bin -> new CommittableElement((Binary)bin, nChannel)).forEach(this.m_queueValuesPrefetched::add);
                    if (!this.m_queueValuesPrefetched.isEmpty()) {
                        long nTime = System.currentTimeMillis();
                        channel.setFirstPolled((PagedPosition)this.m_queueValuesPrefetched.getFirst().getPosition(), nTime);
                        channel.setLastPolled((PagedPosition)this.m_queueValuesPrefetched.getLast().getPosition(), nTime);
                    }
                }
                channel.m_nNext = nNext;
                if (cRemaining == -1) {
                    if (lPageId >= channel.m_lHead && lPageId != -1L) {
                        channel.m_lHead = lPageId + 1L;
                        channel.m_nNext = 0;
                    }
                    if (lPageId == -1L) {
                        this.scheduleHeadIncrement(channel, lPageId);
                    }
                    this.switchChannel();
                } else if (cRemaining == 0 || cRemaining == -3) {
                    if (cRemaining == 0) {
                        this.onChannelEmpty(nChannel, lVersion);
                    }
                    if (!this.switchChannel()) {
                        if (this.f_fCompleteOnEmpty) {
                            this.m_queueValuesPrefetched.add(this.getEmptyElement());
                        } else {
                            ++this.m_cWait;
                        }
                    }
                } else if (cRemaining == -2) {
                    this.disconnectInternal(true);
                }
            } else {
                this.f_queueReceiveOrders.handleError((err, bin) -> e, BatchingOperationsQueue.OnErrorAction.CompleteWithException);
            }
        }
        finally {
            this.f_receiveLock.unlock();
        }
    }

    public static void destroy(PagedTopicCaches caches, SubscriberGroupId groupId, long lSubscriptionId) {
        PagedTopicService service = caches.getService();
        if (lSubscriptionId == 0L && !groupId.isAnonymous()) {
            lSubscriptionId = service.getSubscriptionId(caches.getTopicName(), groupId);
        }
        service.destroySubscription(lSubscriptionId);
        if (caches.isActive() && caches.Subscriptions.isActive()) {
            int cParts = service.getPartitionCount();
            HashSet<Subscription.Key> setSubParts = new HashSet<Subscription.Key>(cParts);
            for (int i = 0; i < cParts; ++i) {
                setSubParts.add(new Subscription.Key(i, 0, groupId));
            }
            DestroySubscriptionProcessor processor = new DestroySubscriptionProcessor(lSubscriptionId);
            InvocableMapHelper.invokeAllAsync(caches.Subscriptions, setSubParts, key -> caches.getUnitOfOrder(key.getPartitionId()), processor, new BiConsumer[0]).join();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeInternal(boolean fDestroyed) {
        if (this.m_nState != 5) {
            try (Sentry ignored = this.f_gate.close();){
                this.setState(4);
            }
            try {
                this.unregisterMBean();
                this.f_queueReceiveOrders.close();
                this.f_queueReceiveOrders.cancelAllAndClose("Subscriber has been closed", null);
                try {
                    this.flushInternal(fDestroyed ? FlushMode.FLUSH_DESTROY : FlushMode.FLUSH).get(CLOSE_TIMEOUT_SECS, TimeUnit.SECONDS);
                }
                catch (TimeoutException e) {
                    this.flushInternal(FlushMode.FLUSH_CLOSE_EXCEPTIONALLY).join();
                    Logger.warn("Subscriber.close: timeout after waiting " + CLOSE_TIMEOUT_SECS + " seconds for completion with flush.join(), forcing complete exceptionally");
                }
                catch (InterruptedException | ExecutionException exception) {
                    // empty catch block
                }
                if (!fDestroyed) {
                    this.unregisterDeactivationListener();
                    this.unregisterChannelAllocationListener();
                    this.unregisterNotificationListener();
                    PagedTopicSubscriber.notifyClosed(this.m_caches.Subscriptions, this.f_subscriberGroupId, this.m_subscriptionId, this.f_id);
                    this.removeSubscriberEntry();
                } else {
                    PagedTopicSubscriber.notifyClosed(this.m_caches.Subscriptions, this.f_subscriberGroupId, this.m_subscriptionId, this.f_id);
                }
                if (!fDestroyed && this.f_subscriberGroupId.getMemberTimestamp() != 0L) {
                    PagedTopicSubscriber.destroy(this.m_caches, this.f_subscriberGroupId, this.m_subscriptionId);
                }
            }
            finally {
                this.setState(5);
                this.f_listOnCloseActions.forEach(action -> {
                    try {
                        action.run();
                    }
                    catch (Throwable t) {
                        Logger.finest(this.getClass().getName() + ".close(): handled onClose exception: " + t.getClass().getCanonicalName() + ": " + t.getMessage());
                    }
                });
                this.f_daemon.stop(true);
                this.f_daemonChannels.stop(false);
            }
        }
    }

    private CompletableFuture<Void> flushInternal(FlushMode mode) {
        String sTopicName = this.m_caches.getTopicName();
        String sDescription = null;
        switch (mode.ordinal()) {
            case 1: {
                sDescription = "Topic " + sTopicName + " was destroyed";
            }
            case 2: {
                String sReason = sDescription != null ? sDescription : "Force Close of Subscriber " + String.valueOf(this.f_id) + " for topic " + sTopicName;
                BiFunction<Throwable, Request, Throwable> fn = (err, bin) -> new TopicException(sReason, (Throwable)err);
                Arrays.stream(this.m_aChannel).forEach(channel -> this.f_queueReceiveOrders.handleError(fn, BatchingOperationsQueue.OnErrorAction.CompleteWithException));
                return CompletableFuture.completedFuture(null);
            }
        }
        return this.f_queueReceiveOrders.flush();
    }

    static void notifyClosed(NamedCache<Subscription.Key, Subscription> cache, SubscriberGroupId subscriberGroupId, long lSubscriptionId, SubscriberId subscriberId) {
        block9: {
            PagedTopicSubscription subscription;
            PagedTopicService service = (PagedTopicService)cache.getCacheService();
            if (lSubscriptionId == 0L) {
                String sTopicName = PagedTopicCaches.Names.getTopicName(cache.getCacheName());
                lSubscriptionId = service.getSubscriptionId(sTopicName, subscriberGroupId);
            }
            if ((subscription = service.getSubscription(lSubscriptionId)) != null && subscription.hasSubscriber(subscriberId)) {
                service.destroySubscription(lSubscriptionId, subscriberId);
            }
            if (!cache.isActive()) {
                return;
            }
            try {
                int cParts = service.getPartitionCount();
                ArrayList<Subscription.Key> listSubParts = new ArrayList<Subscription.Key>(cParts);
                for (int i = 0; i < cParts; ++i) {
                    listSubParts.add(new Subscription.Key(i, 0, subscriberGroupId));
                }
                if (cache.isActive()) {
                    try {
                        cache.invokeAll(listSubParts, new CloseSubscriptionProcessor(subscriberId));
                    }
                    catch (Exception exception) {}
                }
            }
            catch (Throwable t) {
                if (!cache.isActive()) break block9;
                String sId = SubscriberId.NullSubscriber.equals(subscriberId) ? "<ALL>" : PagedTopicSubscriber.idToString(subscriberId.getId());
                Logger.fine("Caught exception closing subscription for subscriber " + sId + " in group " + subscriberGroupId.getGroupName(), t);
            }
        }
    }

    protected void removeSubscriberEntry() {
        NamedCache<SubscriberInfo.Key, SubscriberInfo> cache = this.m_caches.Subscribers;
        if (!cache.isActive()) {
            return;
        }
        try {
            cache.invoke(this.f_key, EvictSubscriber.INSTANCE);
        }
        catch (Throwable t) {
            Logger.err(t);
        }
    }

    protected void registerChannelAllocationListener() {
        try {
            this.m_caches.f_topicService.addSubscriptionListener(this.m_listenerChannelAllocation);
        }
        catch (RuntimeException e) {
            Logger.err(e);
        }
    }

    protected void unregisterChannelAllocationListener() {
        try {
            this.m_caches.f_topicService.removeSubscriptionListener(this.m_listenerChannelAllocation);
        }
        catch (RuntimeException e) {
            Logger.err(e);
        }
    }

    protected void registerNotificationListener() {
        if (this.m_caches.Notifications.isActive()) {
            this.m_caches.Notifications.addMapListener((MapListener<NotificationKey, int[]>)this.f_listenerNotification, this.f_filterNotification, false);
        }
    }

    protected void unregisterNotificationListener() {
        if (this.m_caches.Notifications.isActive()) {
            this.m_caches.Notifications.removeMapListener((MapListener<NotificationKey, int[]>)this.f_listenerNotification, this.f_filterNotification);
        }
    }

    protected void registerDeactivationListener() {
        try {
            GroupDeactivationListener listenerGroup;
            if (!this.f_fAnonymous && (listenerGroup = this.m_listenerGroupDeactivation) != null) {
                this.m_caches.Subscriptions.addMapListener((MapListener<Subscription.Key, Subscription>)listenerGroup, this.m_aChannel[0].subscriberPartitionSync, true);
            }
            this.m_caches.addListener(this.f_listenerDeactivation);
        }
        catch (RuntimeException runtimeException) {
            // empty catch block
        }
    }

    protected void unregisterDeactivationListener() {
        try {
            GroupDeactivationListener listenerGroup = this.m_listenerGroupDeactivation;
            if (listenerGroup != null && this.m_caches.Subscriptions.isActive()) {
                this.m_caches.Subscriptions.removeMapListener((MapListener<Subscription.Key, Subscription>)listenerGroup, this.m_aChannel[0].subscriberPartitionSync);
            }
            this.m_caches.removeListener(this.f_listenerDeactivation);
        }
        catch (RuntimeException runtimeException) {
            // empty catch block
        }
    }

    protected void registerMBean() {
        MBeanHelper.registerSubscriberMBean(this);
    }

    protected void unregisterMBean() {
        MBeanHelper.unregisterSubscriberMBean(this);
    }

    CommittableElement getEmptyElement() {
        if (this.m_elementEmpty == null) {
            Binary binValue = ExternalizableHelper.toBinary(null, this.f_serializer);
            Binary binElement = PageElement.toBinary(-1, 0L, 0, 0L, binValue);
            this.m_elementEmpty = new CommittableElement(binElement, -1);
        }
        return this.m_elementEmpty;
    }

    public static long createId(long nNotificationId, long nMemberId) {
        return nMemberId << 32 | nNotificationId & 0xFFFFFFFFL;
    }

    public static int memberIdFromId(long nId) {
        return (int)(nId >> 32);
    }

    public static String idToString(long nId) {
        return nId + "/" + PagedTopicSubscriber.memberIdFromId(nId);
    }

    public static String idToString(SubscriberId id) {
        return id.getId() + "/" + id.getMemberId();
    }

    public static String idToString(Collection<Long> setId) {
        return setId.stream().map(PagedTopicSubscriber::idToString).collect(Collectors.joining(","));
    }

    public static String subscriberIdToString(Collection<SubscriberId> setId) {
        return setId.stream().map(PagedTopicSubscriber::idToString).collect(Collectors.joining(","));
    }

    public static int notificationIdFromId(long nId) {
        return (int)(nId & 0xFFFFFFFFL);
    }

    public static Subscriber.Option withIdentifyingName(String sName) {
        return new WithIdentifyingName(sName);
    }

    public static Subscriber.Option withNotificationId(int nId) {
        return () -> nId;
    }

    protected class DeactivationListener
    implements PagedTopicCaches.Listener {
        protected DeactivationListener() {
        }

        @Override
        public void onConnect() {
        }

        @Override
        public void onDisconnect() {
            PagedTopicSubscriber.this.disconnectInternal(false);
        }

        @Override
        public void onDestroy() {
            Logger.finest("Detected destroy of topic " + PagedTopicSubscriber.this.m_caches.getTopicName() + ", closing subscriber " + String.valueOf(PagedTopicSubscriber.this));
            CompletableFuture.runAsync(() -> PagedTopicSubscriber.this.closeInternal(true), PagedTopicSubscriber.this.f_executor);
        }

        @Override
        public void onRelease() {
            Logger.finest("Detected release of topic " + PagedTopicSubscriber.this.m_caches.getTopicName() + ", closing subscriber " + String.valueOf(PagedTopicSubscriber.this));
            CompletableFuture.runAsync(() -> PagedTopicSubscriber.this.closeInternal(true), PagedTopicSubscriber.this.f_executor);
        }
    }

    protected class GroupDeactivationListener
    extends AbstractMapListener {
        protected GroupDeactivationListener() {
        }

        @Override
        public void entryDeleted(MapEvent evt) {
            if (PagedTopicSubscriber.this.isActive()) {
                Logger.finest("Detected removal of subscriber group " + PagedTopicSubscriber.this.f_subscriberGroupId.getGroupName() + ", closing subscriber " + String.valueOf(PagedTopicSubscriber.this));
                CompletableFuture.runAsync(() -> PagedTopicSubscriber.this.closeInternal(true), PagedTopicSubscriber.this.f_executor);
            }
        }
    }

    public static interface WithNotificationId<V, U>
    extends Subscriber.Option<V, U> {
        public int getId();
    }

    protected class ChannelListener
    implements PagedTopicSubscription.Listener {
        private CountDownLatch m_latch;
        private final SubscriberId f_id;
        private final PagedTopicSubscription.Key f_key;

        public ChannelListener(SubscriberId id, PagedTopicSubscription.Key key) {
            this.f_id = id;
            this.f_key = key;
            this.m_latch = new CountDownLatch(1);
        }

        @Override
        public void onUpdate(PagedTopicSubscription subscription) {
            if (Objects.equals(subscription.getKey(), this.f_key)) {
                PagedTopicSubscriber.this.f_daemonChannels.executeTask(() -> this.onChannelAllocation(subscription));
            }
        }

        @Override
        public void onDelete(PagedTopicSubscription subscription) {
            if (Objects.equals(subscription.getKey(), this.f_key)) {
                PagedTopicSubscriber.this.f_daemonChannels.executeTask(() -> PagedTopicSubscriber.this.updateChannelOwnership(PagedTopicSubscription.NO_CHANNELS, true));
            }
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ChannelListener that = (ChannelListener)o;
            return Objects.equals(this.f_id, that.f_id) && Objects.equals(this.f_key, that.f_key);
        }

        public int hashCode() {
            return Objects.hash(this.f_id, this.f_key);
        }

        public void reset() {
            if (this.m_latch.getCount() == 0L) {
                PagedTopicSubscriber.this.updateChannelOwnership(PagedTopicSubscription.NO_CHANNELS, true);
                this.m_latch = new CountDownLatch(1);
            }
        }

        private void onChannelAllocation(PagedTopicSubscription subscription) {
            if (!PagedTopicSubscriber.this.isActive()) {
                return;
            }
            SortedSet<Integer> setChannel = null;
            if (subscription.hasSubscriber(this.f_id)) {
                setChannel = subscription.getOwnedChannels(this.f_id);
            }
            if (setChannel != null && setChannel.isEmpty()) {
                PagedTopicSubscriber.this.updateChannelOwnership(PagedTopicSubscription.NO_CHANNELS, true);
            } else if (setChannel != null) {
                PagedTopicSubscriber.this.updateChannelOwnership(setChannel, false);
                this.m_latch.countDown();
            } else if (PagedTopicSubscriber.this.isActive() && !PagedTopicSubscriber.this.f_fAnonymous && PagedTopicSubscriber.this.isConnected()) {
                Logger.finest("Disconnecting Subscriber (null channel set) " + String.valueOf(PagedTopicSubscriber.this));
                PagedTopicSubscriber.this.updateChannelOwnership(PagedTopicSubscription.NO_CHANNELS, true);
                PagedTopicSubscriber.this.disconnectInternal(false);
            }
        }
    }

    protected static class ReconnectTask
    implements Runnable {
        private final PagedTopicSubscriber<?> m_subscriber;
        private final AtomicInteger f_cExecution = new AtomicInteger();

        protected ReconnectTask(PagedTopicSubscriber<?> subscriber) {
            this.m_subscriber = subscriber;
        }

        @Override
        public void run() {
            this.m_subscriber.reconnectInternal();
            this.f_cExecution.incrementAndGet();
        }

        public int getExecutionCount() {
            return this.f_cExecution.get();
        }
    }

    public static class PagedTopicChannel
    implements Subscriber.Channel {
        public static final int HEAD_UNKNOWN = -1;
        volatile long m_lHead = -1L;
        AtomicLong m_lVersion = new AtomicLong();
        AtomicLong m_cNotify = new AtomicLong();
        int m_nNext = -1;
        volatile boolean m_fEmpty;
        Subscription.Key subscriberPartitionSync;
        boolean m_fContended;
        volatile boolean m_fOwned = true;
        Set<Subscription.Key> m_setSubscriptionKeys;
        PagedPosition m_lastReceived;
        long m_cPolls;
        PagedPosition m_firstPolled;
        long m_firstPolledTimestamp;
        PagedPosition m_lastPolled;
        long m_lastPolledTimestamp;
        PagedPosition m_lastCommit;
        long m_cCommited;
        Meter m_cReceived = new Meter();
        boolean m_fPolled;
        boolean m_fHit;
        private final Lock m_lock = new ReentrantLock();

        @Override
        public int getId() {
            return this.subscriberPartitionSync.getChannelId();
        }

        @Override
        public PagedPosition getLastCommit() {
            return this.m_lastCommit;
        }

        @Override
        public long getCommitCount() {
            return this.m_cCommited;
        }

        public void committed(PagedPosition position) {
            this.m_lastCommit = position;
            ++this.m_cCommited;
        }

        @Override
        public PagedPosition getLastReceived() {
            return this.m_lastReceived;
        }

        @Override
        public long getReceiveCount() {
            return this.m_cReceived.getCount();
        }

        public void received(PagedPosition position) {
            this.m_lastReceived = position;
            this.m_cReceived.mark();
        }

        @Override
        public long getPolls() {
            return this.m_cPolls;
        }

        public void adjustPolls(long c) {
            this.m_cPolls += c;
        }

        @Override
        public Position getFirstPolled() {
            return this.m_firstPolled;
        }

        public void setFirstPolled(PagedPosition position, long nTimestamp) {
            if (this.m_firstPolled == null) {
                this.m_firstPolled = position;
                this.m_firstPolledTimestamp = nTimestamp;
            }
        }

        @Override
        public long getFirstPolledTimestamp() {
            return this.m_firstPolledTimestamp;
        }

        @Override
        public Position getLastPolled() {
            return this.m_lastPolled;
        }

        public void setLastPolled(PagedPosition position, long nTimestamp) {
            this.m_lastPolled = position;
            this.m_lastPolledTimestamp = nTimestamp;
        }

        @Override
        public long getLastPolledTimestamp() {
            return this.m_lastPolledTimestamp;
        }

        @Override
        public boolean isEmpty() {
            return this.m_fEmpty;
        }

        @Override
        public boolean isOwned() {
            return this.m_fOwned;
        }

        @Override
        public int getOwnedCode() {
            return this.isOwned() ? 1 : 0;
        }

        @Override
        public PagedPosition getHead() {
            if (this.m_lHead == -1L) {
                return PagedPosition.NULL_POSITION;
            }
            return new PagedPosition(this.m_lHead, this.m_nNext);
        }

        @Override
        public long getReceived() {
            return this.m_cReceived.getCount();
        }

        @Override
        public double getReceivedMeanRate() {
            return this.m_cReceived.getMeanRate();
        }

        @Override
        public double getReceivedOneMinuteRate() {
            return this.m_cReceived.getOneMinuteRate();
        }

        @Override
        public double getReceivedFiveMinuteRate() {
            return this.m_cReceived.getFiveMinuteRate();
        }

        @Override
        public double getReceivedFifteenMinuteRate() {
            return this.m_cReceived.getFifteenMinuteRate();
        }

        protected void setEmpty(long lVersion) {
            this.m_lock.lock();
            try {
                if (this.m_lVersion.get() == lVersion) {
                    this.m_fEmpty = true;
                }
            }
            finally {
                this.m_lock.unlock();
            }
        }

        protected long getVersion() {
            this.m_lock.lock();
            try {
                long l = this.m_lVersion.get();
                return l;
            }
            finally {
                this.m_lock.unlock();
            }
        }

        protected void setOwned() {
            this.m_fOwned = true;
        }

        protected void setUnowned() {
            this.m_fOwned = false;
        }

        protected void onChannelPopulatedNotification() {
            this.m_cNotify.incrementAndGet();
            this.setPopulated();
        }

        protected void setPopulated() {
            this.m_lock.lock();
            try {
                this.m_lVersion.incrementAndGet();
                this.m_fEmpty = false;
            }
            finally {
                this.m_lock.unlock();
            }
        }

        public long getNotify() {
            return this.m_cNotify.get();
        }

        public void setPolled() {
            this.m_fPolled = true;
        }

        public void clearPolled() {
            this.m_fPolled = false;
        }

        public boolean isPolled() {
            return this.m_fPolled;
        }

        public void setHit() {
            this.m_fHit = true;
        }

        public void clearHit() {
            this.m_fHit = false;
        }

        public boolean isHit() {
            return this.m_fHit;
        }

        protected Set<Subscription.Key> ensureSubscriptionKeys(int nPart, SubscriberGroupId subscriberGroupId) {
            if (this.m_setSubscriptionKeys == null) {
                int nChannel = this.subscriberPartitionSync.getChannelId();
                HashSet<Subscription.Key> setKeys = new HashSet<Subscription.Key>();
                for (int p = 0; p < nPart; ++p) {
                    setKeys.add(new Subscription.Key(p, nChannel, subscriberGroupId));
                }
                this.m_setSubscriptionKeys = setKeys;
            }
            return this.m_setSubscriptionKeys;
        }

        public String toString() {
            return "Channel=" + this.subscriberPartitionSync.getChannelId() + ", owned=" + this.m_fOwned + ", empty=" + this.m_fEmpty + ", version=" + this.m_lVersion.get() + ", head=" + this.m_lHead + ", next=" + this.m_nNext + ", polls=" + this.m_cPolls + ", received=" + this.m_cReceived.getCount() + ", committed=" + this.m_cCommited + ", first=" + String.valueOf(this.m_firstPolled) + ", firstTimestamp=" + this.m_firstPolledTimestamp + ", last=" + String.valueOf(this.m_lastPolled) + ", lastTimestamp=" + this.m_lastPolledTimestamp + ", contended=" + this.m_fContended;
        }
    }

    public static class WithIdentifyingName
    implements Subscriber.Option {
        private final String f_sName;

        public WithIdentifyingName(String sName) {
            this.f_sName = sName;
        }

        public String getName() {
            return this.f_sName;
        }
    }

    protected static class ReceiveRequest
    implements Request {
        public static final ReceiveRequest SINGLE = new ReceiveRequest(false, 1);
        private final boolean f_fBatch;
        private final int f_cElement;

        protected ReceiveRequest(boolean fBatch, int cElement) {
            this.f_fBatch = fBatch;
            this.f_cElement = cElement;
        }

        public boolean isBatch() {
            return this.f_fBatch;
        }

        public int getElementCount() {
            return this.f_cElement;
        }
    }

    protected class CommittableElement
    implements Subscriber.Element<V> {
        public static final int EMPTY = -1;
        private final PageElement<V> m_element;
        private final int f_nChannel;

        protected CommittableElement(Binary binValue, int nChannel) {
            this.m_element = PageElement.fromBinary(binValue, PagedTopicSubscriber.this.f_serializer);
            this.f_nChannel = nChannel;
        }

        PageElement<V> getElement() {
            return this.m_element;
        }

        @Override
        public V getValue() {
            return this.m_element.getValue();
        }

        @Override
        public Binary getBinaryValue() {
            return this.m_element.getBinaryValue();
        }

        @Override
        public int getChannel() {
            return this.f_nChannel;
        }

        @Override
        public Position getPosition() {
            return this.m_element.getPosition();
        }

        @Override
        public Instant getTimestamp() {
            return this.m_element.getTimestamp();
        }

        @Override
        public CompletableFuture<Subscriber.CommitResult> commitAsync() {
            try {
                return PagedTopicSubscriber.this.commitAsync(this.getChannel(), this.getPosition());
            }
            catch (Throwable e) {
                Subscriber.CommitResult result = new Subscriber.CommitResult(0, null, e);
                return CompletableFuture.completedFuture(result);
            }
        }

        public boolean isEmpty() {
            return this.getChannel() == -1;
        }

        public String toString() {
            return "Element(channel=" + this.f_nChannel + ", position=" + String.valueOf(this.getPosition()) + ", timestamp=" + String.valueOf(this.getTimestamp()) + ", value=" + String.valueOf(this.getValue()) + ")";
        }
    }

    protected static class GetPositionRequest
    extends FunctionalRequest {
        private final PositionType f_type;

        public GetPositionRequest(PositionType type) {
            this.f_type = type;
        }

        @Override
        protected void execute(PagedTopicSubscriber<?> subscriber, BatchingOperationsQueue<Request, ?> queue) {
            queue.completeElement(switch (this.f_type.ordinal()) {
                case 0 -> subscriber.getHeadsInternal();
                case 1 -> subscriber.getTailsInternal();
                case 2 -> subscriber.getLastCommittedInternal();
                default -> throw new IllegalStateException("Unexpected value: " + String.valueOf((Object)this.f_type));
            }, this::onRequestComplete);
        }
    }

    protected static enum PositionType {
        Head,
        Tail,
        Committed;

    }

    protected static class SeekRequest
    extends FunctionalRequest {
        protected final SeekType m_type;
        protected final Map<Integer, Position> m_mapPosition;
        protected final Instant m_instant;
        protected final int[] m_anChannel;

        public SeekRequest(SeekType type, int ... anChannel) {
            this(type, null, null, anChannel);
        }

        public SeekRequest(Map<Integer, Position> mapPosition) {
            this(SeekType.Position, mapPosition, null, new int[0]);
        }

        public SeekRequest(Instant instant, int ... anChannel) {
            this(SeekType.Instant, null, instant, anChannel);
        }

        private SeekRequest(SeekType type, Map<Integer, Position> mapPosition, Instant instant, int ... anChannel) {
            switch (type.ordinal()) {
                case 2: {
                    if (mapPosition == null) {
                        throw new IllegalArgumentException("Seek request of type " + String.valueOf((Object)type) + " require a position");
                    }
                    anChannel = mapPosition.keySet().stream().mapToInt(Integer::intValue).toArray();
                    break;
                }
                case 3: {
                    if (instant != null) break;
                    throw new IllegalArgumentException("Seek request of type " + String.valueOf((Object)type) + " require an instant");
                }
            }
            this.m_type = type;
            this.m_mapPosition = mapPosition;
            this.m_instant = instant;
            this.m_anChannel = anChannel;
        }

        @Override
        protected void execute(PagedTopicSubscriber<?> subscriber, BatchingOperationsQueue<Request, ?> queueBatch) {
            Map<Integer, Position> map = subscriber.seekInternal(this);
            queueBatch.completeElement(map, this::onRequestComplete);
        }

        public SeekType getType() {
            return this.m_type;
        }

        public Map<Integer, Position> getPositions() {
            return this.m_mapPosition;
        }

        public Instant getInstant() {
            return this.m_instant;
        }

        public int[] getChannels() {
            return this.m_anChannel;
        }
    }

    protected static enum SeekType {
        Head,
        Tail,
        Position,
        Instant;

    }

    protected static interface Request {
    }

    protected static abstract class FunctionalRequest
    implements Request {
        protected FunctionalRequest() {
        }

        protected abstract void execute(PagedTopicSubscriber<?> var1, BatchingOperationsQueue<Request, ?> var2);

        protected void onRequestComplete(Object o) {
        }

        protected Throwable onRequestError(Throwable err, Object o) {
            return new TopicException(err);
        }
    }

    public static interface StateListener
    extends EventListener {
        public void onStateChange(PagedTopicSubscriber<?> var1, int var2, int var3);
    }

    static enum FlushMode {
        FLUSH,
        FLUSH_DESTROY,
        FLUSH_CLOSE_EXCEPTIONALLY;

    }

    public static class TimeoutInterceptor
    implements EventDispatcherAwareInterceptor<EntryEvent<SubscriberInfo.Key, SubscriberInfo>> {
        private static final AtomicInteger f_instance = new AtomicInteger();
        private final Executor f_executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            String sName = "PagedTopic:SubscriberTimeoutInterceptor:" + f_instance.incrementAndGet();
            return Base.makeThread(null, runnable, sName);
        });

        @Override
        public void introduceEventDispatcher(String sIdentifier, EventDispatcher dispatcher) {
            String sCacheName;
            if (dispatcher instanceof PartitionedCacheDispatcher && PagedTopicCaches.Names.SUBSCRIBERS.equals(PagedTopicCaches.Names.fromCacheName(sCacheName = ((PartitionedCacheDispatcher)dispatcher).getCacheName()))) {
                dispatcher.addEventInterceptor(sIdentifier, this, Collections.singleton(EntryEvent.Type.REMOVED), true);
            }
        }

        @Override
        public void onEvent(EntryEvent<SubscriberInfo.Key, SubscriberInfo> event) {
            if (event.getType() == EntryEvent.Type.REMOVED) {
                SubscriberInfo.Key key = event.getKey();
                long nId = key.getSubscriberId();
                SubscriberGroupId groupId = key.getGroupId();
                Logger.finest(String.format("Cleaning up subscriber %d in group '%s' owned by member %d", key.getSubscriberId(), groupId.getGroupName(), PagedTopicSubscriber.memberIdFromId(nId)));
                this.f_executor.execute(() -> this.processSubscriberRemoval(event));
            }
        }

        private void processSubscriberRemoval(EntryEvent<SubscriberInfo.Key, SubscriberInfo> event) {
            SubscriberInfo.Key key = event.getKey();
            SubscriberInfo info = event.getOriginalValue();
            long nId = key.getSubscriberId();
            SubscriberGroupId groupId = key.getGroupId();
            String sTopicName = PagedTopicCaches.Names.getTopicName(event.getCacheName());
            String sSubscriptions = PagedTopicCaches.Names.SUBSCRIPTIONS.cacheNameForTopicName(sTopicName);
            PagedTopicService topicService = (PagedTopicService)event.getService();
            int nMember = PagedTopicSubscriber.memberIdFromId(nId);
            long lTimestamp = info.getConnectionTimestamp();
            if (event.getEntry().isSynthetic()) {
                Logger.finest(String.format("Subscriber expired after %d ms - groupId='%s', memberId=%d, notificationId=%d, last heartbeat at %s", info.getTimeoutMillis(), groupId.getGroupName(), nMember, PagedTopicSubscriber.notificationIdFromId(nId), info.getLastHeartbeat()));
            } else {
                boolean fManual = topicService.getInfo().getServiceMembers().stream().anyMatch(m -> m.getId() == nMember);
                Object sReason = fManual ? "manual removal of subscriber(s)" : "departure of member " + nMember;
                Logger.finest(String.format("Subscriber %d in group '%s' removed due to %s", nId, groupId.getGroupName(), sReason));
            }
            SubscriberId subscriberId = new SubscriberId(nId, info.getOwningUid());
            long lSubscriptionId = topicService.getSubscriptionId(sTopicName, groupId);
            PagedTopicSubscription subscription = topicService.getSubscription(lSubscriptionId);
            if (subscription == null || subscription.getSubscriberTimestamp(subscriberId) <= lTimestamp) {
                PagedTopicSubscriber.notifyClosed(topicService.ensureCache(sSubscriptions, null), groupId, lSubscriptionId, subscriberId);
            }
        }
    }
}

