/*
 * Decompiled with CFR 0.152.
 */
package com.tangosol.internal.net.topic.impl.paged;

import com.oracle.coherence.common.base.Converter;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.collections.Arrays;
import com.oracle.coherence.common.util.Duration;
import com.oracle.coherence.common.util.SafeClock;
import com.tangosol.coherence.config.Config;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicBackingMapManager;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicDependencies;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicSubscriber;
import com.tangosol.internal.net.topic.impl.paged.agent.EnsureSubscriptionProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.OfferProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.PollProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.SeekProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.SubscriberHeartbeatProcessor;
import com.tangosol.internal.net.topic.impl.paged.model.ContentKey;
import com.tangosol.internal.net.topic.impl.paged.model.NotificationKey;
import com.tangosol.internal.net.topic.impl.paged.model.Page;
import com.tangosol.internal.net.topic.impl.paged.model.PageElement;
import com.tangosol.internal.net.topic.impl.paged.model.PagedPosition;
import com.tangosol.internal.net.topic.impl.paged.model.PagedTopicSubscription;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberInfo;
import com.tangosol.internal.net.topic.impl.paged.model.Subscription;
import com.tangosol.internal.net.topic.impl.paged.model.Usage;
import com.tangosol.internal.net.topic.impl.paged.statistics.PagedTopicStatistics;
import com.tangosol.net.BackingMapContext;
import com.tangosol.net.BackingMapManagerContext;
import com.tangosol.net.Member;
import com.tangosol.net.PagedTopicService;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.cache.ConfigurableCacheMap;
import com.tangosol.net.partition.ObservableSplittingBackingMap;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Position;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.net.topic.TopicException;
import com.tangosol.util.Binary;
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.ConverterCollections;
import com.tangosol.util.Filter;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.InvocableMapHelper;
import com.tangosol.util.MapNotFoundException;
import com.tangosol.util.ObservableMap;
import com.tangosol.util.UUID;
import com.tangosol.util.ValueExtractor;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;

public class PagedTopicPartition {
    /** Name of the configuration property that supplies the publisher notification expiry. */
    public static final String PROP_PUBLISHER_NOTIFICATION_EXPIRY_MILLIS = "coherence.pagedTopic.publisherNotificationExpiry";

    /**
     * Value of {@link #PROP_PUBLISHER_NOTIFICATION_EXPIRY_MILLIS} parsed as a
     * {@link Duration} and expressed in milliseconds; defaults to {@code "10s"}
     * when the property is not set.
     */
    public static final long PUBLISHER_NOTIFICATION_EXPIRY_MILLIS = new Duration(Config.getProperty(PROP_PUBLISHER_NOTIFICATION_EXPIRY_MILLIS, "10s")).as(Duration.Magnitude.MILLI);

    /** The backing map manager context supplied to the constructor. */
    protected final BackingMapManagerContext f_ctxManager;

    /** The cache service of {@link #f_ctxManager}, viewed as a {@link PagedTopicService}. */
    protected final PagedTopicService f_service;

    /** The name supplied to the constructor; used as the topic name in service look-ups. */
    protected final String f_sName;

    /** The partition this instance operates on. */
    protected final int f_nPartition;

    /**
     * Create a partition-scoped view of a paged topic.
     *
     * @param ctxManager  the backing map manager context; its cache service
     *                    must be a {@link PagedTopicService}
     * @param sName       the topic name
     * @param nPartition  the partition this instance operates on
     */
    public PagedTopicPartition(BackingMapManagerContext ctxManager, String sName, int nPartition) {
        f_ctxManager = ctxManager;
        f_sName      = sName;
        f_nPartition = nPartition;
        f_service    = (PagedTopicService)ctxManager.getCacheService();
    }

    /**
     * Ensure the supplied usage entry has a publication tail and return it.
     * <p>
     * When the entry has no value a fresh {@link Usage} is used. If the
     * publication tail is still {@code -1} it is set to
     * {@code abs(topicName.hashCode() % partitionCount)} and the usage is
     * written back to the entry.
     *
     * @param entry  the usage entry to initialise
     *
     * @return the publication tail held by the usage
     */
    public long initialiseTopic(BinaryEntry<Usage.Key, Usage> entry) {
        Usage usageExisting = (Usage)entry.getValue();
        Usage usage         = usageExisting == null ? new Usage() : usageExisting;

        if (usage.getPublicationTail() != -1L) {
            // already initialised; nothing is written back
            return usage.getPublicationTail();
        }

        long lStartPage = Math.abs(f_sName.hashCode() % getPartitionCount());
        usage.setPublicationTail(lStartPage);
        entry.setValue(usage);
        return usage.getPublicationTail();
    }

    /**
     * Build a {@link PagedTopicPartition} for the topic and partition that
     * the supplied entry belongs to.
     * <p>
     * The topic name is derived from the entry's cache name and the partition
     * from the entry's binary key.
     *
     * @param entry  an entry from one of the topic's caches
     *
     * @return a new {@link PagedTopicPartition}
     */
    public static PagedTopicPartition ensureTopic(BinaryEntry entry) {
        BackingMapContext        ctxBacking = entry.getBackingMapContext();
        BackingMapManagerContext ctxManager = ctxBacking.getManagerContext();
        String                   sTopic     = PagedTopicCaches.Names.getTopicName(ctxBacking.getCacheName());
        int                      nPart      = ctxManager.getKeyPartition(entry.getBinaryKey());

        return new PagedTopicPartition(ctxManager, sTopic, nPart);
    }

    /**
     * Perform the checks made before the first element is written to a page.
     * <p>
     * Anonymous subscribers of the channel whose member has left are cleaned
     * up first. Then:
     * <ul>
     *   <li>if the page is not subscribed and consumed elements are not
     *       retained, the page is sealed and removed, and a
     *       {@code PageSealed} result is returned;</li>
     *   <li>otherwise, if a server capacity is configured and the storage
     *       size has reached it, a removal notification is requested (when
     *       {@code nNotifyPostFull} is non-zero), every channel whose tail
     *       page is also its head page has that page sealed and its insertion
     *       notifiers fired, and a {@code TopicFull} result is returned;</li>
     *   <li>otherwise {@code null} is returned and the offer may proceed.</li>
     * </ul>
     *
     * @param nChannel         the channel being offered to
     * @param lPage            the page id being offered to
     * @param page             the page being offered to
     * @param nNotifyPostFull  the notifier id to register when full, or zero for none
     * @param cElements        the number of elements in the offer
     * @param cbCapServer      the server capacity in bytes, or a non-positive value for unlimited
     * @param cbCapPage        the page capacity in bytes
     *
     * @return a result that terminates the offer, or {@code null} to continue
     */
    protected OfferProcessor.Result onStartOfPage(int nChannel, long lPage, Page page, int nNotifyPostFull, int cElements, long cbCapServer, int cbCapPage) {
        cleanupNonDurableSubscribers(peekUsage(nChannel).getAnonymousSubscribers());

        boolean fDiscard = !page.isSubscribed() && !getDependencies().isRetainConsumed();
        if (fDiscard) {
            page.setSealed(true);
            removePageIfNotRetainingElements(nChannel, lPage);
            return new OfferProcessor.Result(OfferProcessor.Result.Status.PageSealed, cElements, cbCapPage, 0);
        }

        boolean fFull = cbCapServer > 0L && getStorageBytes() >= cbCapServer;
        if (!fFull) {
            return null;
        }

        if (nNotifyPostFull != 0) {
            requestRemovalNotification(nNotifyPostFull, nChannel);
        }

        // seal every single-page channel so that its page can be released
        int cChannels = getChannelCount();
        for (int nCh = 0; nCh < cChannels; nCh++) {
            Usage usage = peekUsage(nCh);
            long  lTail = usage.getPartitionTail();
            if (lTail != -1L && lTail == usage.getPartitionHead()) {
                Page pageSeal = enlistPage(nCh, usage.getPartitionHead());
                pageSeal.setSealed(true);
                notifyAll(pageSeal.resetInsertionNotifiers());
            }
        }
        return new OfferProcessor.Result(OfferProcessor.Result.Status.TopicFull, 0, cbCapPage, -1);
    }

    /**
     * Append the processor's elements to the tail of the page held by the
     * supplied entry.
     * <p>
     * Elements are accepted in order until the list is exhausted or the
     * page's byte size reaches the effective page capacity. The page is
     * sealed when it becomes full or when the processor requests sealing.
     *
     * @param entry      the page entry to append to
     * @param processor  the processor carrying the elements and offer options
     *
     * @return the offer result: status, number of elements accepted,
     *         remaining capacity and the offset of the first accepted element
     */
    public OfferProcessor.Result offerToPageTail(BinaryEntry<Page.Key, Page> entry, OfferProcessor processor) {
        Page.Key               keyPage         = (Page.Key)entry.getKey();
        int                    nChannel        = keyPage.getChannelId();
        long                   lPage           = keyPage.getPageId();
        PagedTopicDependencies deps            = getDependencies();
        int                    cbCapPage       = deps.getPageCapacity();
        long                   cbCapServer     = deps.getServerCapacity();
        List<Binary>           listElements    = processor.getElements();
        int                    nNotifyPostFull = processor.getNotifyPostFull();
        boolean                fSealPage       = processor.isSealPage();

        if (cbCapServer > 0L && nNotifyPostFull != 0) {
            // limit the page size to half the server capacity shared across the locally owned partitions
            int cbCapPageDyn = (int)Math.min(Integer.MAX_VALUE, cbCapServer / 2L / (long)getLocalPartitionCount());
            if (cbCapPage > cbCapPageDyn) {
                cbCapPage = Math.max(1, cbCapPageDyn);
            }
        }

        Page page = ensurePage(nChannel, entry);
        if (page == null || page.isSealed()) {
            return new OfferProcessor.Result(OfferProcessor.Result.Status.PageSealed, 0, 0, -1);
        }

        if (page.getTail() == -1) {
            // the page is empty, run the start-of-page checks
            OfferProcessor.Result resultStart = onStartOfPage(nChannel, lPage, page, nNotifyPostFull, listElements.size(), cbCapServer, cbCapPage);
            if (resultStart != null) {
                return resultStart;
            }
        }

        BackingMapContext             ctxContent    = getBackingMapContext(PagedTopicCaches.Names.CONTENT);
        NamedTopic.ElementCalculator  calculator    = deps.getElementCalculator();
        long                          cMillisExpiry = deps.getElementExpiryMillis();
        int                           cbPage        = page.getByteSize();
        int                           nTailStart    = page.getTail();
        int                           nTail         = nTailStart;
        int                           cAccepted     = 0;
        long                          lTimestamp    = getClusterTime();

        for (Iterator<Binary> iter = listElements.iterator(); iter.hasNext() && cbPage < cbCapPage; ) {
            Binary      binValue        = iter.next();
            Binary      binKey          = ContentKey.toBinary(f_nPartition, nChannel, lPage, ++nTail);
            BinaryEntry binElementEntry = (BinaryEntry)ctxContent.getBackingMapEntry(binKey);

            cbPage += calculator.calculateUnits(binValue);
            binElementEntry.updateBinaryValue(PageElement.toBinary(nChannel, lPage, nTail, lTimestamp, binValue));
            if (cMillisExpiry > 0L) {
                binElementEntry.expire(cMillisExpiry);
            }
            cAccepted++;
        }

        page.setTail(nTail);
        page.setTimestampTail(lTimestamp);
        if (nTailStart == -1) {
            page.setTimestampHead(lTimestamp);
        }

        OfferProcessor.Result.Status status;
        int                          cbRemainingCapacity;
        if (cbPage >= cbCapPage || fSealPage) {
            page.setSealed(true);
            status              = OfferProcessor.Result.Status.PageSealed;
            cbRemainingCapacity = cbCapPage;
        } else {
            status              = OfferProcessor.Result.Status.Success;
            cbRemainingCapacity = cbCapPage - cbPage;
        }

        page.setByteSize(cbPage);
        notifyAll(page.resetInsertionNotifiers());
        entry.setValue(page);
        getStatistics().onPublished(nChannel, cAccepted, new PagedPosition(keyPage.getPageId(), page.getTail()));

        return new OfferProcessor.Result(status, cAccepted, cbRemainingCapacity, nTailStart + 1);
    }

    /**
     * Obtain the {@link Usage} for a channel of this partition, enlisting its
     * backing map entry.
     * <p>
     * When an entry is available the usage (a new one if none is stored) is
     * written back to it. When no entry is available a new, detached
     * {@link Usage} is returned.
     *
     * @param nChannel  the channel
     *
     * @return the usage, never {@code null}
     */
    protected Usage enlistUsage(int nChannel) {
        Usage.Key keyUsage = new Usage.Key(getPartition(), nChannel);
        BinaryEntry<Usage.Key, Usage> entry = enlistBackingMapEntry(PagedTopicCaches.Names.USAGE, toBinaryKey(keyUsage));
        if (entry == null) {
            return new Usage();
        }

        Usage usage = (Usage)entry.getValue();
        if (usage == null) {
            usage = new Usage();
        }
        entry.setValue(usage);
        return usage;
    }

    /**
     * Obtain the {@link Usage} for a channel of this partition by peeking at
     * its backing map entry, falling back to {@link #enlistUsage(int)} when
     * no entry can be peeked.
     *
     * @param nChannel  the channel
     *
     * @return the usage held by the peeked entry, or the result of
     *         {@link #enlistUsage(int)}
     */
    protected Usage peekUsage(int nChannel) {
        Usage.Key keyUsage = new Usage.Key(getPartition(), nChannel);
        BinaryEntry<Usage.Key, Usage> entry = peekBackingMapEntry(PagedTopicCaches.Names.USAGE, toBinaryKey(keyUsage));
        if (entry == null) {
            return enlistUsage(nChannel);
        }
        return (Usage)entry.getValue();
    }

    /**
     * Return the page held by the supplied entry, creating it when absent.
     * <p>
     * A new page is only created if its id is greater than the largest page
     * id already recorded in the channel's {@link Usage}; otherwise
     * {@code null} is returned. A new page becomes the partition tail (and
     * the head, if the partition had no tail), is linked to the previous tail,
     * and has its reference count adjusted by the number of waiting subscribers.
     *
     * @param nChannel  the channel the page belongs to
     * @param entry     the page entry
     *
     * @return the existing or newly created page, or {@code null} if the page
     *         id is not beyond the recorded partition maximum
     */
    protected Page ensurePage(int nChannel, BinaryEntry<Page.Key, Page> entry) {
        if (entry.isPresent()) {
            return (Page)entry.getValue();
        }

        long  lPage = ((Page.Key)entry.getKey()).getPageId();
        Usage usage = enlistUsage(nChannel);
        if (lPage <= usage.getPartitionMax()) {
            return null;
        }

        Page page      = new Page();
        long lTailPrev = usage.getPartitionTail();
        usage.setPartitionTail(lPage);
        usage.setPartitionMax(lPage);

        boolean fFirstPage = lTailPrev == -1L;
        if (fFirstPage) {
            // the partition was empty, so the new tail is also the head
            usage.setPartitionHead(lPage);
        } else {
            // one reference on behalf of the previous tail, which is linked to this page
            page.incrementReferenceCount();
            Page pagePrev = enlistPage(nChannel, lTailPrev);
            if (pagePrev != null) {
                pagePrev.setNextPartitionPage(lPage);
            }
        }

        int cWaiting = usage.resetWaitingSubscriberCount();
        if (fFirstPage && cWaiting == 0) {
            String sCache        = entry.getBackingMapContext().getCacheName();
            String sTopic        = PagedTopicCaches.Names.getTopicName(sCache);
            long   cSubscription = f_service.getSubscriptionCount(sTopic);
            if (cSubscription > 0L) {
                cWaiting = 1;
            }
        }
        page.adjustReferenceCount(cWaiting);

        long ldtNow = getClusterTime();
        page.setTimestampHead(ldtNow);
        page.setTimestampTail(ldtNow);
        page.setPreviousPartitionPage(lTailPrev);
        entry.setValue(page);
        return page;
    }

    /**
     * For every channel, drop two adjacent, randomly chosen insertion
     * notifiers from the tail page and fire them.
     * <p>
     * Channels with no tail page, or whose tail page has fewer than two
     * insertion notifiers, are left untouched.
     */
    protected void cleanupSubscriberRegistrations() {
        int cChannel = getChannelCount();
        for (int nChannel = 0; nChannel < cChannel; ++nChannel) {
            Usage usage = enlistUsage(nChannel);
            long  lTail = usage.getPartitionTail();
            if (lTail == -1L) {
                continue;
            }

            Page page = enlistPage(nChannel, lTail);
            if (page == null) {
                continue;
            }

            int[] anNotifiers = page.getInsertionNotifiers();
            if (anNotifiers == null || anNotifiers.length < 2) {
                continue;
            }

            int   cOld   = anNotifiers.length;
            int[] anKept = new int[cOld - 2];
            // index of the first of the two notifiers to drop, in [0, cOld - 2]
            int   ofDrop = ThreadLocalRandom.current().nextInt(cOld - 1);

            System.arraycopy(anNotifiers, 0, anKept, 0, ofDrop);
            System.arraycopy(anNotifiers, ofDrop + 2, anKept, ofDrop, anKept.length - ofDrop);

            notifyAll(new int[]{anNotifiers[ofDrop], anNotifiers[ofDrop + 1]});
            page.setInsertionNotifies(anKept);
        }
    }

    /**
     * Remove the subscriptions of anonymous subscriber groups whose owning
     * member is no longer a member of the service.
     * <p>
     * Groups are bucketed by their member timestamp; every bucket whose
     * timestamp matches a current service member is discarded, and each
     * remaining group is removed via
     * {@link #removeSubscription(SubscriberGroupId, long)} with a
     * subscription id of zero.
     *
     * @param colAnon  the anonymous subscriber groups to check; may be
     *                 {@code null} or empty, in which case nothing is done
     */
    protected void cleanupNonDurableSubscribers(Collection<SubscriberGroupId> colAnon) {
        if (colAnon == null || colAnon.isEmpty()) {
            return;
        }

        // bucket the groups by the timestamp of the member that created them
        Map<Long, List<SubscriberGroupId>> mapDead = new HashMap<>();
        for (SubscriberGroupId subscriberGroupId : colAnon) {
            Long ldtMember = subscriberGroupId.getMemberTimestamp();
            mapDead.computeIfAbsent(ldtMember, k -> new ArrayList<>()).add(subscriberGroupId);
        }

        // anything owned by a member that is still present is not dead
        for (Member member : this.f_ctxManager.getCacheService().getInfo().getServiceMembers()) {
            mapDead.remove(member.getTimestamp());
        }

        // what is left belongs to departed members; iterating an empty map is a no-op
        for (List<SubscriberGroupId> listDead : mapDead.values()) {
            for (SubscriberGroupId subscriberGroupId : listDead) {
                this.removeSubscription(subscriberGroupId, 0L);
            }
        }
    }

    /**
     * @return the partition this instance operates on
     */
    public int getPartition() {
        return f_nPartition;
    }

    /**
     * @return the partition count reported by the cache service, viewed as a
     *         {@link PartitionedService}
     */
    public int getPartitionCount() {
        PartitionedService service = (PartitionedService)((Object)f_ctxManager.getCacheService());
        return service.getPartitionCount();
    }

    /**
     * @return the channel count the service reports for this topic
     */
    public int getChannelCount() {
        int cChannel = f_service.getChannelCount(f_sName);
        return cChannel;
    }

    /**
     * @return the number of partitions the service reports as owned by the
     *         local cluster member
     */
    public int getLocalPartitionCount() {
        PartitionedService service     = (PartitionedService)((Object)f_ctxManager.getCacheService());
        Member             memberLocal = service.getCluster().getLocalMember();
        return service.getOwnedPartitions(memberLocal).cardinality();
    }

    /**
     * @return the current time in milliseconds as reported by the cluster
     */
    public long getClusterTime() {
        long ldtCluster = f_ctxManager.getCacheService().getCluster().getTimeMillis();
        return ldtCluster;
    }

    /**
     * Compute the size of the content backing map as
     * {@code units * unitFactor}.
     *
     * @return the product of the backing map's units and unit factor
     *
     * @throws UnsupportedOperationException if the content backing map is not
     *         a {@link ConfigurableCacheMap}
     */
    public long getStorageBytes() {
        ObservableMap mapBack = getBackingMapContext(PagedTopicCaches.Names.CONTENT).getBackingMap();
        if (!(mapBack instanceof ConfigurableCacheMap)) {
            throw new UnsupportedOperationException();
        }

        ConfigurableCacheMap cacheBack = (ConfigurableCacheMap)mapBack;
        long cUnits  = cacheBack.getUnits();
        long nFactor = cacheBack.getUnitFactor();
        return cUnits * nFactor;
    }

    /**
     * Peek at a page without enlisting it.
     *
     * @param nChannel  the channel
     * @param lPageId   the page id
     *
     * @return the page, or {@code null} if no entry could be peeked
     */
    protected Page peekPage(int nChannel, long lPageId) {
        BinaryEntry<Page.Key, Page> entry = peekPageEntry(nChannel, lPageId);
        if (entry == null) {
            return null;
        }
        return (Page)entry.getValue();
    }

    /**
     * Peek at the backing map entry of a page.
     *
     * @param nChannel  the channel
     * @param lPageId   the page id; {@code -1} denotes "no page"
     *
     * @return the peeked entry, or {@code null} if {@code lPageId} is {@code -1}
     */
    protected BinaryEntry<Page.Key, Page> peekPageEntry(int nChannel, long lPageId) {
        if (lPageId == -1L) {
            return null;
        }
        return peekBackingMapEntry(PagedTopicCaches.Names.PAGES, toBinaryKey(new Page.Key(nChannel, lPageId)));
    }

    /**
     * Obtain the backing map entry of a page; when the entry holds a page the
     * page is written straight back to the entry.
     *
     * @param nChannel  the channel
     * @param lPage     the page id
     *
     * @return the page entry
     */
    protected BinaryEntry<Page.Key, Page> enlistPageEntry(int nChannel, long lPage) {
        BackingMapContext ctxPages = getBackingMapContext(PagedTopicCaches.Names.PAGES);
        BinaryEntry       entry    = (BinaryEntry)ctxPages.getBackingMapEntry(toBinaryKey(new Page.Key(nChannel, lPage)));

        Page pageExisting = (Page)entry.getValue();
        if (pageExisting != null) {
            entry.setValue(pageExisting);
        }
        return entry;
    }

    /**
     * Obtain a page via {@link #enlistPageEntry(int, long)}.
     *
     * @param nChannel  the channel
     * @param lPage     the page id
     *
     * @return the value of the enlisted page entry
     */
    protected Page enlistPage(int nChannel, long lPage) {
        BinaryEntry<Page.Key, Page> entry = enlistPageEntry(nChannel, lPage);
        return (Page)entry.getValue();
    }

    /**
     * Remove a page unless the topic is configured to retain consumed
     * elements.
     *
     * @param nChannel  the channel
     * @param lPage     the page id
     *
     * @return {@code false} if consumed elements are retained, otherwise the
     *         result of {@link #removePage(int, long)}
     */
    public boolean removePageIfNotRetainingElements(int nChannel, long lPage) {
        boolean fRetain = getDependencies().isRetainConsumed();
        return !fRetain && removePage(nChannel, lPage);
    }

    /**
     * Remove a page, its elements and its bookkeeping from this partition.
     * <p>
     * All content entries from the page's tail down to offset zero are
     * removed, the channel's {@link Usage} head/tail pointers are updated,
     * the page entry is removed and the usage's removal notifiers are fired.
     *
     * @param nChannel  the channel
     * @param lPage     the page id
     *
     * @return {@code true} if the page existed and was removed, {@code false}
     *         if there was no such page
     */
    public boolean removePage(int nChannel, long lPage) {
        BinaryEntry<Page.Key, Page> entryNext;
        Page pageNext;
        BinaryEntry<Page.Key, Page> entryPage = this.enlistPageEntry(nChannel, lPage);
        Page page = (Page)entryPage.getValue();
        if (page == null) {
            return false;
        }
        // remove every element of the page, tail first
        BackingMapContext ctxElem = this.getBackingMapContext(PagedTopicCaches.Names.CONTENT);
        for (int nPos = page.getTail(); nPos >= 0; --nPos) {
            ctxElem.getBackingMapEntry(ContentKey.toBinary(this.f_nPartition, nChannel, lPage, nPos)).remove(false);
        }
        Usage usage = this.enlistUsage(nChannel);
        if (usage.getPartitionTail() == lPage) {
            // the page was the tail: the partition is now empty for this channel
            usage.setPartitionHead(-1L);
            usage.setPartitionTail(-1L);
        } else {
            // otherwise advance the head to the page that follows
            usage.setPartitionHead(page.getNextPartitionPage());
        }
        long lPageNext = page.getNextPartitionPage();
        // NOTE(review): the entry enlisted below is keyed by lPage (the page being removed),
        // not lPageNext, so the decrement is applied to the removed page rather than to its
        // successor. This looks like it was meant to be lPageNext, but callers such as the
        // loop in removeSubscription decrement the next page themselves - confirm the intended
        // reference-count protocol before changing it.
        if (lPageNext != -1L && (pageNext = (Page)(entryNext = this.enlistPageEntry(nChannel, lPage)).getValue()) != null) {
            pageNext.decrementReferenceCount();
            entryNext.setValue(pageNext);
        }
        entryPage.remove(false);
        // fire the removal notifiers registered on the usage
        this.notifyAll(usage.resetRemovalNotifiers());
        return true;
    }

    /**
     * Fire the supplied notifiers by removing their entries, where present,
     * from the notifications backing map of this partition.
     *
     * @param anNotify  the notifier ids; {@code null} is treated as "none"
     */
    public void notifyAll(int[] anNotify) {
        if (anNotify == null) {
            return;
        }

        BackingMapContext ctxNotify = getBackingMapContext(PagedTopicCaches.Names.NOTIFICATIONS);
        int               nPart     = getPartition();
        for (int i = 0; i < anNotify.length; i++) {
            NotificationKey     keyNotify = new NotificationKey(nPart, anNotify[i]);
            InvocableMap.Entry  entry     = ctxNotify.getBackingMapEntry(toBinaryKey(keyNotify));
            if (entry.isPresent()) {
                entry.remove(false);
            }
        }
    }

    /**
     * Remove a subscriber group's subscription from every channel of this
     * partition and release the page references it held.
     * <p>
     * If a non-zero subscription id is supplied and the service still knows
     * it, the subscription is destroyed in the service first.
     *
     * @param subscriberGroupId  the subscriber group to remove
     * @param lSubscriptionId    the service-level subscription id, or zero for none
     */
    public void removeSubscription(SubscriberGroupId subscriberGroupId, long lSubscriptionId) {
        if (lSubscriptionId != 0L && f_service.hasSubscription(lSubscriptionId)) {
            f_service.destroySubscription(lSubscriptionId);
        }

        BackingMapContext ctxSubscriptions = getBackingMapContext(PagedTopicCaches.Names.SUBSCRIPTIONS);
        int               cChannels        = getChannelCount();

        for (int nChannel = 0; nChannel < cChannels; ++nChannel) {
            Subscription.Key keySub       = new Subscription.Key(getPartition(), nChannel, subscriberGroupId);
            BinaryEntry      entrySub     = (BinaryEntry)ctxSubscriptions.getBackingMapEntry(toBinaryKey(keySub));
            Subscription     subscription = (Subscription)entrySub.getValue();
            if (subscription == null) {
                // NOTE(review): this ends the whole method rather than moving on to the next channel
                return;
            }

            Usage usage = enlistUsage(nChannel);
            entrySub.remove(false);
            if (subscriberGroupId.getMemberTimestamp() != 0L) {
                usage.removeAnonymousSubscriber(subscriberGroupId);
            }

            long lPage = subscription.getPage();
            Page page;
            if (lPage == -1L) {
                page = null;
            } else {
                page = enlistPage(nChannel, lPage);
            }

            if (subscription.getPosition() == Integer.MAX_VALUE || page == null) {
                // the subscription's page is unavailable or fully read; step to the page that follows
                if (page == null) {
                    lPage = usage.getPartitionHead();
                } else {
                    lPage = page.getNextPartitionPage();
                }

                if (lPage == -1L) {
                    usage.decrementWaitingSubscriberCount();
                    page = null;
                } else {
                    page = enlistPage(nChannel, lPage);
                }
            }

            // release references page by page, removing each page that drops to zero
            while (page != null && page.decrementReferenceCount() == 0) {
                removePageIfNotRetainingElements(nChannel, lPage);
                lPage = page.getNextPartitionPage();
                if (lPage == -1L) {
                    page = null;
                } else {
                    page = enlistPage(nChannel, lPage);
                }
            }
        }
    }

    /**
     * Ensure the subscription identified by the key exists in this partition,
     * performing the step selected by the processor's phase.
     * <ul>
     *   <li>Phase {@code 0}: inspect existing subscriptions. Returns
     *       {@code null} as soon as a channel has no subscription or no
     *       subscription head; otherwise reports a page per channel and
     *       rejects attempts to change the filter or converter.</li>
     *   <li>Phase {@code 1}: create the subscription on channels where it is
     *       missing, pinning it to the partition's head or tail page (or
     *       registering it as waiting when there is no such page).</li>
     *   <li>Phase {@code 2}: for subscriptions without a subscription head,
     *       advance through the pages up to the head supplied by the
     *       processor and record that head.</li>
     * </ul>
     * In every other case the channel's result is the page of the
     * subscription's rollback position, or {@code -1} when its offset is
     * {@code Integer.MAX_VALUE}.
     * <p>
     * Unless the processor only asks for the group to be created, channel
     * ownership is then refreshed from the channel-zero subscription.
     *
     * @param key        the subscription key identifying the subscriber group
     * @param processor  the processor carrying the phase and subscriber details
     *
     * @return one page id per channel, or {@code null} when phase {@code 0}
     *         finds a channel that is not yet subscribed
     *
     * @throws IllegalStateException     if the identified subscription has been destroyed
     * @throws IllegalArgumentException  if "create group only" is requested for an anonymous subscriber
     * @throws TopicException            if the filter or converter differs from the existing subscription
     */
    public long[] ensureSubscription(Subscription.Key key, EnsureSubscriptionProcessor processor) {
        int nPhase = processor.getPhase();
        long[] alSubscriptionHeadGlobal = processor.getPages();
        Filter<?> filter = processor.getFilter();
        ValueExtractor<?, ?> converter = processor.getConverter();
        SubscriberId subscriberId = processor.getSubscriberId();
        boolean fReconnect = processor.isReconnect();
        boolean fCreateGroupOnly = processor.isCreateGroupOnly();
        long lSubscriptionId = processor.getSubscriptionId();
        BackingMapContext ctxSubscriptions = this.getBackingMapContext(PagedTopicCaches.Names.SUBSCRIPTIONS);
        PagedTopicDependencies dependencies = this.getDependencies();
        SubscriberGroupId subscriberGroupId = key.getGroupId();
        // a non-zero member timestamp marks an anonymous subscriber group
        boolean fAnonymous = subscriberGroupId.getMemberTimestamp() != 0L;
        int cChannel = this.getChannelCount();
        long[] alResult = new long[cChannel];

        if (!fAnonymous) {
            if (lSubscriptionId == 0L) {
                lSubscriptionId = this.f_service.ensureSubscription(this.f_sName, subscriberGroupId, subscriberId, filter, converter);
                processor.setSubscriptionId(lSubscriptionId);
            } else if (this.f_service.isSubscriptionDestroyed(lSubscriptionId)) {
                throw new IllegalStateException("The subscriber group " + subscriberGroupId.getGroupName() + " (id=" + lSubscriptionId + ") has been destroyed");
            }
        }
        if (fCreateGroupOnly) {
            if (fAnonymous) {
                throw new IllegalArgumentException("Cannot specify create group only action for an anonymous subscriber");
            }
            fReconnect = false;
        }

        // obtain every channel's subscription entry up front, before any clean-up runs
        Map<Integer, BinaryEntry> mapSubscriptionByChannel = new HashMap<Integer, BinaryEntry>();
        for (int nChannel = 0; nChannel < alResult.length; ++nChannel) {
            BinaryEntry entrySub = (BinaryEntry)ctxSubscriptions.getBackingMapEntry(this.toBinaryKey(new Subscription.Key(this.getPartition(), nChannel, subscriberGroupId)));
            mapSubscriptionByChannel.put(nChannel, entrySub);
        }

        if (nPhase == 0) {
            this.cleanupSubscriberRegistrations();
        } else if (nPhase == 1) {
            this.cleanupNonDurableSubscribers(this.enlistUsage(0).getAnonymousSubscribers());
        }

        Subscription subscriptionZero = null;
        for (int nChannel = 0; nChannel < alResult.length; ++nChannel) {
            BinaryEntry entrySub = mapSubscriptionByChannel.get(nChannel);
            Subscription subscription = (Subscription)entrySub.getValue();
            Usage usage = this.enlistUsage(nChannel);
            if (nPhase == 0) {
                if (subscription == null || subscription.getSubscriptionHead() == -1L) {
                    return null;
                }
                if (filter != null && !Objects.equals(subscription.getFilter(), filter)) {
                    throw new TopicException("Cannot change the Filter in existing Subscriber group \"" + subscriberGroupId.getGroupName() + "\" current=" + String.valueOf(subscription.getFilter()) + " new=" + String.valueOf(filter));
                }
                if (converter != null && !Objects.equals(subscription.getConverter(), converter)) {
                    // report the converter values (the message previously printed the filter values)
                    throw new TopicException("Cannot change the converter in existing Subscriber group \"" + subscriberGroupId.getGroupName() + "\" current=" + String.valueOf(subscription.getConverter()) + " new=" + String.valueOf(converter));
                }
                PagedPosition headPosition = subscription.getHeadPosition();
                alResult[nChannel] = headPosition.getPage() == -1L ? subscription.getSubscriptionHead() : subscription.getRollbackPosition().getPage();
            } else if (nPhase == 1 && subscription == null) {
                subscription = new Subscription(cChannel);
                if (fAnonymous) {
                    this.enlistUsage(nChannel);
                    usage.addAnonymousSubscriber(subscriberGroupId);
                }
                // start from the head when consumed elements are retained, otherwise from the tail
                long lPage = dependencies.isRetainConsumed() ? usage.getPartitionHead() : usage.getPartitionTail();
                if (lPage == -1L) {
                    // no page to pin: wait for the next page and remember the last known one
                    usage.incrementWaitingSubscriberCount();
                    lPage = usage.getPartitionMax();
                    subscription.setPage(lPage);
                    if (lPage != -1L) {
                        subscription.setPosition(Integer.MAX_VALUE);
                    }
                } else {
                    this.enlistPage(nChannel, lPage).incrementReferenceCount();
                    subscription.setPage(lPage);
                }
                subscription.setFilter(filter);
                subscription.setConverter(converter);
                entrySub.setValue(subscription);
                alResult[nChannel] = lPage;
            } else if (nPhase == 2 && subscription.getSubscriptionHead() == -1L) {
                // NOTE(review): subscription is dereferenced here without a null check
                PagedPosition position = subscription.getRollbackPosition();
                long lPage = position.getPage();
                Page page = lPage == -1L ? null : this.enlistPage(nChannel, lPage);
                long lHead = usage.getPartitionHead();
                if (page == null && lHead != -1L) {
                    lPage = lHead;
                    page = this.enlistPage(nChannel, lPage);
                    subscription.setPage(lPage);
                    subscription.setPosition(0);
                }
                // walk forward to the global head, releasing each page that is passed
                while (lPage < alSubscriptionHeadGlobal[nChannel] && page != null) {
                    long lPageNext = page.getNextPartitionPage();
                    if (page.decrementReferenceCount() == 0) {
                        this.removePageIfNotRetainingElements(nChannel, lPage);
                    }
                    if (lPageNext == -1L) {
                        page = null;
                        usage.incrementWaitingSubscriberCount();
                        subscription.setPosition(Integer.MAX_VALUE);
                        continue;
                    }
                    lPage = lPageNext;
                    page = this.enlistPage(nChannel, lPage);
                    subscription.setPage(lPage);
                    page.incrementReferenceCount();
                }
                if (lPage == alSubscriptionHeadGlobal[nChannel] && page != null) {
                    int nPos = dependencies.isRetainConsumed() ? 0 : page.getTail() + 1;
                    subscription.setPosition(nPos);
                }
                subscription.setSubscriptionHead(alSubscriptionHeadGlobal[nChannel]);
                entrySub.setValue(subscription);
            } else {
                PagedPosition position = subscription.getRollbackPosition();
                alResult[nChannel] = position.getOffset() == Integer.MAX_VALUE ? -1L : position.getPage();
            }

            if (!fCreateGroupOnly) {
                if (nChannel == 0) {
                    // the channel-zero subscription holds the channel allocations for the group
                    subscriptionZero = subscription;
                    if (fAnonymous) {
                        if (!subscriptionZero.hasSubscriber(subscriberId)) {
                            subscriptionZero.assignAll(subscriberId, cChannel, this.getMemberSet());
                        }
                    } else {
                        fReconnect = subscriptionZero.hasSubscriber(subscriberId);
                        PagedTopicSubscription pagedTopicSubscription = this.f_service.getSubscription(lSubscriptionId);
                        subscriptionZero.update(pagedTopicSubscription);
                    }
                }
                SubscriberId nOwner = subscriptionZero.getChannelOwner(nChannel);
                subscription.setOwningSubscriber(nOwner);
                this.getStatistics().getSubscriberGroupStatistics(subscriberGroupId).setOwningSubscriber(nChannel, nOwner);
                if (fReconnect && Objects.equals(nOwner, subscriberId)) {
                    subscription.rollback();
                }
            }
            entrySub.setValue(subscription);
        }
        return alResult;
    }

    /**
     * Remove a subscriber (or all subscribers) from a subscriber group and
     * refresh the channel ownership recorded in this partition.
     * <p>
     * The removal is applied to the first subscription found while scanning
     * the channels; every found subscription then has its owner set from that
     * subscription's channel allocations. If the channel count cannot be
     * obtained because of an {@link IllegalArgumentException}, nothing is done.
     *
     * @param key           the subscription key identifying the group
     * @param subscriberId  the subscriber to remove;
     *                      {@code SubscriberId.NullSubscriber} removes all subscribers
     */
    public void closeSubscription(Subscription.Key key, SubscriberId subscriberId) {
        int cChannel;
        try {
            cChannel = getChannelCount();
        }
        catch (IllegalArgumentException ignored) {
            return;
        }

        BackingMapContext ctxSubscriptions  = getBackingMapContext(PagedTopicCaches.Names.SUBSCRIPTIONS);
        SubscriberGroupId subscriberGroupId = key.getGroupId();
        int               cParts            = getPartitionCount();
        int               nSyncPartition    = Subscription.getSyncPartition(subscriberGroupId, 0, cParts);
        // removals are only logged from the group's sync partition
        boolean           fSyncPartition    = key.getPartitionId() == nSyncPartition;
        String            sGroup            = subscriberGroupId.getGroupName();
        Subscription      subscriptionZero  = null;

        for (int nChannel = 0; nChannel < cChannel; ++nChannel) {
            Subscription.Key keySub       = new Subscription.Key(getPartition(), nChannel, subscriberGroupId);
            BinaryEntry      entrySub     = (BinaryEntry)ctxSubscriptions.getBackingMapEntry(toBinaryKey(keySub));
            Subscription     subscription = (Subscription)entrySub.getValue();
            if (subscription == null) {
                continue;
            }

            if (subscriptionZero == null) {
                subscriptionZero = subscription;

                Map<Integer, Set<SubscriberId>> mapRemoved;
                if (SubscriberId.NullSubscriber.equals(subscriberId)) {
                    mapRemoved = subscriptionZero.removeAllSubscribers(cChannel, getMemberSet());
                } else {
                    mapRemoved = subscriptionZero.removeSubscriber(subscriberId, cChannel, getMemberSet());
                }

                if (fSyncPartition && !mapRemoved.isEmpty()) {
                    logRemoval(mapRemoved, f_sName, sGroup);
                }
                entrySub.setValue(subscription);
            }

            SubscriberId owner = subscriptionZero.getChannelOwner(nChannel);
            subscription.setOwningSubscriber(owner);
            getStatistics().getSubscriberGroupStatistics(subscriberGroupId).setOwningSubscriber(nChannel, owner);
            entrySub.setValue(subscription);
        }
    }

    /**
     * Log, at the finest level, one message per map entry listing the
     * subscribers that were removed.
     *
     * @param mapRemoved  the removed subscribers, keyed by the value logged as {@code owningMember}
     * @param sTopic      the topic name
     * @param sGroup      the subscriber group name
     */
    private void logRemoval(Map<Integer, Set<SubscriberId>> mapRemoved, String sTopic, String sGroup) {
        for (Map.Entry<Integer, Set<SubscriberId>> entry : mapRemoved.entrySet()) {
            String sMember      = String.valueOf(entry.getKey());
            String sSubscribers = PagedTopicSubscriber.subscriberIdToString((Collection<SubscriberId>)entry.getValue());

            Logger.finest("Removed the following subscribers from topic '" + sTopic + "' owningMember=" + sMember + " [Group='" + sGroup + "' Subscribers=" + sSubscribers + "]");
        }
    }

    /**
     * @return the service members reported by the cache service's info
     */
    private Set<Member> getMemberSet() {
        Set<Member> setMember = f_ctxManager.getCacheService().getInfo().getServiceMembers();
        return setMember;
    }

    /**
     * Record a subscriber heartbeat in the supplied entry.
     * <p>
     * The existing {@link SubscriberInfo} is updated, or a new one created
     * when the entry is absent, and the entry is set to expire after the
     * configured subscriber timeout. A connection timestamp of zero is
     * replaced with the current safe-clock time.
     *
     * @param entry      the subscriber info entry; must also be a {@link BinaryEntry}
     * @param processor  the processor carrying the heartbeat details
     */
    public void heartbeat(InvocableMap.Entry<SubscriberInfo.Key, SubscriberInfo> entry, SubscriberHeartbeatProcessor processor) {
        UUID uuid          = processor.getUuid();
        long lSubscription = processor.getSubscription();
        long lTimestamp    = processor.getConnectionTimestamp();
        PagedTopicDependencies dependencies = getDependencies();

        SubscriberInfo info;
        if (entry.isPresent()) {
            info = entry.getValue();
        } else {
            info = new SubscriberInfo();
        }

        long cMillis = dependencies.getSubscriberTimeoutMillis();
        if (lTimestamp == 0L) {
            lTimestamp = SafeClock.INSTANCE.getSafeTimeMillis();
        }

        info.setlConnectionTimestamp(lTimestamp);
        info.setLastHeartbeat(LocalDateTime.now());
        info.setTimeoutMillis(cMillis);
        info.setOwningUid(uuid);
        info.setSubscriptionId(lSubscription);

        entry.setValue(info);
        // let the entry lapse if no further heartbeat arrives within the timeout
        ((BinaryEntry)entry).expire(cMillis);
    }

    /**
     * Poll elements for a subscriber from a page of the channel identified by
     * the subscription entry's key.
     * <p>
     * The method returns early, without reading any elements, with:
     * <ul>
     *   <li>{@code notAllocated(0)} when the channel id is outside
     *       {@code [0, channel count)};</li>
     *   <li>{@code unknownSubscriber()} when the entry is absent, holds a
     *       {@code null} value, or the channel count differs between the two
     *       reads made around the subscription lookup;</li>
     *   <li>{@code notAllocated(Integer.MAX_VALUE)} when the channel owner is
     *       not {@code subscriberId};</li>
     *   <li>{@code exhausted(subscription)} when {@code lPage} is {@code -1},
     *       lies before the committed page or before the subscription's current
     *       page, is the committed page with nothing left after the committed
     *       offset, was previously marked with position
     *       {@code Integer.MAX_VALUE}, or cannot be ensured.</li>
     * </ul>
     * Otherwise elements are read from the subscription's position up to the
     * page tail, limited by {@code cReqValues} and by the configured maximum
     * batch size in bytes, applying the subscription's filter and converter.
     *
     * @param entrySubscription  the subscription entry for the group and channel
     * @param lPage              the page to poll from
     * @param cReqValues         the maximum number of elements to return
     * @param nNotifierId        the notifier id registered on the page when the
     *                           poll reads past the tail of an unsealed page
     * @param subscriberId       the identifier of the polling subscriber
     *
     * @return the poll result
     */
    public PollProcessor.Result pollFromPageHead(BinaryEntry<Subscription.Key, Subscription> entrySubscription, long lPage, int cReqValues, int nNotifierId, SubscriberId subscriberId) {
        PollProcessor.Result result;
        int nPos;
        int nPosCommitted;
        SubscriberId owner;
        int cChannel;
        PagedTopicDependencies dependencies = this.getDependencies();
        Subscription.Key keySubscription = (Subscription.Key)entrySubscription.getKey();
        int nChannel = keySubscription.getChannelId();
        // reject a channel id that is outside the current channel range
        if (nChannel >= (cChannel = this.getChannelCount()) || nChannel < 0) {
            return PollProcessor.Result.notAllocated(0);
        }
        // no subscription state is stored under this key
        if (!entrySubscription.isPresent() || entrySubscription.getValue() == null) {
            return PollProcessor.Result.unknownSubscriber();
        }
        PagedTopicSubscription pagedTopicSubscription = this.getPagedTopicSubscription(entrySubscription);
        // the channel count is read a second time after the subscription lookup;
        // a different value is reported to the caller as an unknown subscriber
        int cChannelActual = this.getChannelCount();
        if (cChannel != cChannelActual) {
            return PollProcessor.Result.unknownSubscriber();
        }
        Subscription subscription = (Subscription)entrySubscription.getValue();
        // the owner is taken from the service-level subscription when one exists,
        // otherwise from the stored subscription
        // NOTE(review): subscriberId2 is never read - looks like a decompiler artifact
        SubscriberId subscriberId2 = owner = pagedTopicSubscription != null ? pagedTopicSubscription.getOwningSubscriber(nChannel) : subscription.getOwningSubscriber();
        if (!Objects.equals(owner, subscriberId)) {
            return PollProcessor.Result.notAllocated(Integer.MAX_VALUE);
        }
        // record the caller as owning subscriber if the stored channel owner differs
        if (!Objects.equals(subscriberId, subscription.getChannelOwner(nChannel))) {
            subscription.setOwningSubscriber(subscriberId);
        }
        subscription.setLastPolledSubscriber(subscriberId);
        // -1 is used throughout this method as the "no page" id
        if (lPage == -1L) {
            return PollProcessor.Result.exhausted(subscription);
        }
        Page page = this.peekPage(nChannel, lPage);
        PagedPosition posCommitted = subscription.getCommittedPosition();
        long lPageCommitted = posCommitted == null ? -1L : posCommitted.getPage();
        // NOTE(review): n is never read - looks like a decompiler artifact
        int n = nPosCommitted = posCommitted == null ? -1 : posCommitted.getOffset();
        // the requested page precedes the committed page
        if (lPage < lPageCommitted) {
            this.checkForPageCleanup(entrySubscription, lPage, page);
            return PollProcessor.Result.exhausted(subscription);
        }
        // the requested page is the committed page and it is either missing or
        // sealed with its tail at or before the committed offset
        if (lPage == lPageCommitted && (page == null || page.isSealed() && page.getTail() <= nPosCommitted)) {
            this.checkForPageCleanup(entrySubscription, lPage, page);
            return PollProcessor.Result.exhausted(subscription);
        }
        long lPageThis = subscription.getPage();
        if (lPage == lPageThis) {
            // polling the subscription's current page: resume from the stored position
            nPos = subscription.getPosition();
            // a stored position of Integer.MAX_VALUE is set below when a sealed page
            // with no successor has been fully read
            if (lPage == -1L || nPos == Integer.MAX_VALUE) {
                return PollProcessor.Result.exhausted(subscription);
            }
        } else {
            // the requested page is behind the subscription's current page
            if (lPage < lPageThis) {
                return PollProcessor.Result.exhausted(subscription);
            }
            // the requested page is ahead of the subscription; ensure it exists
            if (page == null && (page = this.ensurePage(nChannel, this.enlistPageEntry(nChannel, lPage))) == null) {
                return PollProcessor.Result.exhausted(subscription);
            }
            // first poll from this page: start at offset zero
            nPos = 0;
            subscription.setPage(lPage);
            subscription.setPosition(nPos);
        }
        entrySubscription.setValue(subscription);
        // NOTE(review): page can still be null here when lPage == lPageThis and
        // peekPage found nothing - confirm that case cannot occur
        int nPosTail = page.getTail();
        BackingMapContext ctxElements = this.getBackingMapContext(PagedTopicCaches.Names.CONTENT);
        LinkedList<Binary> listValues = new LinkedList<Binary>();
        Filter<?> filter = subscription.getFilter();
        Function<?, ?> fnConvert = subscription.getConverter();
        int cbResult = 0;
        long cbLimit = dependencies.getMaxBatchSizeBytes();
        Converter converterFrom = this.getValueFromInternalConverter();
        Converter converterTo = this.getValueToInternalConverter();
        // never start at or before the committed offset within the committed page
        if (lPage == lPageCommitted && nPosCommitted != -1) {
            nPos = Math.max(nPos, nPosCommitted + 1);
        }
        // read until enough elements are collected, the tail is passed,
        // or the accumulated byte size reaches the batch limit
        while (cReqValues > 0 && nPos <= nPosTail && (long)cbResult < cbLimit) {
            Binary binPosKey = ContentKey.toBinary(this.f_nPartition, nChannel, lPage, nPos);
            BinaryEntry entryElement = (BinaryEntry)ctxElements.getReadOnlyEntry(binPosKey);
            Binary binValue = entryElement.getBinaryValue();
            // fall back to the backing map entry when the read-only entry has no binary value
            if (binValue == null && (entryElement = ctxElements.getBackingMapEntry(binPosKey).asBinaryEntry()).isPresent()) {
                binValue = entryElement.getBinaryValue();
            }
            // skip missing elements and elements rejected by the subscription's filter
            if (binValue != null && (filter == null || InvocableMapHelper.evaluateEntry(filter, entryElement))) {
                if (fnConvert != null) {
                    binValue = PageElement.fromBinary(binValue, converterFrom).convert(fnConvert, converterTo);
                }
                // a null result after conversion is not returned
                if (binValue != null) {
                    listValues.add(binValue);
                    cbResult += binValue.length();
                    --cReqValues;
                }
            }
            ++nPos;
        }
        long lPageNext = lPage;
        int nPosNext = nPos;
        if (nPos > nPosTail && page.isSealed()) {
            // the sealed page has been read past its tail: move on to the next page
            page = this.enlistPage(nChannel, lPage);
            lPageNext = page.getNextPartitionPage();
            nPosNext = 0;
            if (lPageNext == -1L) {
                // there is no next page yet: count this subscriber as waiting and
                // mark the current page as exhausted for this subscription
                this.enlistUsage(nChannel).incrementWaitingSubscriberCount();
                subscription.setPosition(Integer.MAX_VALUE);
            } else {
                subscription.setPage(lPageNext);
                subscription.setPosition(0);
            }
            // a remaining count of -1 is reported for a fully read sealed page
            result = new PollProcessor.Result(-1, nPos, listValues, subscription.getSubscriptionHead());
        } else {
            subscription.setPosition(nPos);
            if (nPos > nPosTail) {
                // read past the tail of an unsealed page: re-read the tail from the
                // enlisted page and register for an insertion notification only
                // if the tail has not moved in the meantime
                Page pageEnlisted = this.enlistPage(nChannel, lPage);
                int nPosTailLatest = pageEnlisted.getTail();
                if (nPosTail == nPosTailLatest) {
                    this.requestInsertionNotification(pageEnlisted, nNotifierId, nChannel);
                }
                nPosTail = nPosTailLatest;
            }
            // the first argument is the number of positions left between nPos and the tail
            result = new PollProcessor.Result(nPosTail - nPos + 1, nPos, listValues, subscription.getSubscriptionHead());
        }
        // publish the poll to the subscriber group statistics
        SubscriberGroupId subscriberGroupId = keySubscription.getGroupId();
        PagedPosition posHead = new PagedPosition(lPageNext, nPosNext);
        this.getStatistics().getSubscriberGroupStatistics(subscriberGroupId).onPolled(nChannel, listValues.size(), posHead);
        return result;
    }

    /**
     * Release this subscription's reference on a page and remove the page when
     * nothing else refers to it.
     * <p>
     * All subscriptions stored in the same partition map are scanned. If no
     * other subscription on the same channel has a subscription head at or
     * before {@code lPage}, the page's reference count is decremented, and when
     * it reaches zero the removal is logged and
     * {@code removePageIfNotRetainingElements} is called.
     *
     * @param entry  the subscription entry the check is made on behalf of
     * @param lPage  the id of the page being checked
     * @param page   the page, or {@code null} in which case nothing is done
     */
    public void checkForPageCleanup(BinaryEntry<Subscription.Key, Subscription> entry, long lPage, Page page) {
        if (page != null) {
            Subscription.Key keyThis = (Subscription.Key) entry.getKey();
            int nPartition = keyThis.getPartitionId();
            ObservableSplittingBackingMap mapSplitting = (ObservableSplittingBackingMap) entry.getBackingMapContext().getBackingMap();
            Map mapPartition = mapSplitting.getPartitionMap(nPartition);
            BackingMapManagerContext ctx = entry.getContext();

            // view of the partition's binary map with keys and values converted to objects
            Map mapSubscriptions = ConverterCollections.getMap(
                    mapPartition,
                    ctx.getKeyFromInternalConverter(),
                    ctx.getKeyToInternalConverter(),
                    ctx.getValueFromInternalConverter(),
                    ctx.getValueToInternalConverter());

            int nChannel = keyThis.getChannelId();
            int cOthers = 0;
            for (Object oEntry : mapSubscriptions.entrySet()) {
                Map.Entry entryOther = (Map.Entry) oEntry;
                Subscription.Key keyOther = (Subscription.Key) entryOther.getKey();
                // count other subscriptions on this channel whose head has not passed the page
                if (keyOther.getChannelId() == nChannel
                        && !Objects.equals(keyThis, keyOther)
                        && ((Subscription) entryOther.getValue()).getSubscriptionHead() <= lPage) {
                    cOthers++;
                }
            }

            // the reference count is only decremented when no other subscription was counted
            if (cOthers == 0 && page.decrementReferenceCount() == 0) {
                Logger.fine(String.format("Removing previously fully committed page. Channel=%d Page=%d Group=%s", nChannel, lPage, keyThis.getGroupId().getGroupName()));
                removePageIfNotRetainingElements(nChannel, lPage);
            }
        }
    }

    /**
     * Commit a position for the subscription stored in the given entry.
     * <p>
     * The result status is:
     * <ul>
     *   <li>{@code Rejected} when the entry is absent, the position is not a
     *       {@link PagedPosition}, the channel is not owned by
     *       {@code subscriberId} while only owned commits are allowed, or the
     *       position has not been read by the subscription yet;</li>
     *   <li>{@code AlreadyCommitted} when the position is at or before the
     *       committed position, or no page could be found for it;</li>
     *   <li>{@code NothingToCommit} when the page reached by walking back from
     *       the subscription's page is after the commit page and is not sealed
     *       and empty;</li>
     *   <li>{@code Committed} or {@code Unowned} after a successful commit,
     *       depending on whether {@code subscriberId} owns the channel.</li>
     * </ul>
     * A successful commit updates the committed and rollback positions,
     * publishes the commit to the statistics and releases references on pages
     * that are now fully committed.
     *
     * @param entrySubscription  the subscription entry for the group and channel
     * @param position           the position to commit
     * @param subscriberId       the identifier of the committing subscriber
     *
     * @return the result of the commit request
     */
    public Subscriber.CommitResult commitPosition(BinaryEntry<Subscription.Key, Subscription> entrySubscription, Position position, SubscriberId subscriberId) {
        PagedPosition rollbackPosition;
        long lPrev;
        Subscription.Key keySubscription = (Subscription.Key)entrySubscription.getKey();
        int nChannel = keySubscription.getChannelId();
        if (!entrySubscription.isPresent()) {
            return new Subscriber.CommitResult(nChannel, position, Subscriber.CommitResultStatus.Rejected, new IllegalStateException("Unknown subscriber group " + String.valueOf(keySubscription.getGroupId())));
        }
        if (!(position instanceof PagedPosition)) {
            return new Subscriber.CommitResult(nChannel, position, Subscriber.CommitResultStatus.Rejected, new IllegalArgumentException("Invalid position type"));
        }
        PagedTopicSubscription pagedTopicSubscription = this.getPagedTopicSubscription(entrySubscription);
        Subscription subscription = (Subscription)entrySubscription.getValue();
        // the owner is taken from the service-level subscription when one exists,
        // otherwise from the stored subscription
        SubscriberId owner = pagedTopicSubscription != null ? pagedTopicSubscription.getOwningSubscriber(nChannel) : subscription.getOwningSubscriber();
        boolean fOwned = Objects.equals(owner, subscriberId);
        if (!fOwned && this.getDependencies().isOnlyOwnedCommits()) {
            return new Subscriber.CommitResult(nChannel, position, Subscriber.CommitResultStatus.Rejected, new IllegalStateException("Attempted to commit a position in an unowned channel"));
        }
        long lPage = subscription.getPage();
        PagedPosition pagedPosition = (PagedPosition)position;
        long lPageCommit = pagedPosition.getPage();
        int nPosCommit = pagedPosition.getOffset();
        int nOffset = subscription.getPosition();
        // reject a position the subscription has not read: the subscription has no page,
        // its page is behind the commit page (unless its position is Integer.MAX_VALUE),
        // or it is on the commit page but behind the commit offset
        if (lPage == -1L || lPage < lPageCommit && nOffset != Integer.MAX_VALUE || lPage == lPageCommit && nOffset < nPosCommit) {
            return new Subscriber.CommitResult(nChannel, position, Subscriber.CommitResultStatus.Rejected, new IllegalArgumentException("Attempted to commit an unread position"));
        }
        // NOTE(review): pollFromPageHead treats getCommittedPosition() as possibly null;
        // here it is passed to compareTo and dereferenced below without a check - confirm
        PagedPosition committedPosition = subscription.getCommittedPosition();
        if (position.compareTo(committedPosition) <= 0) {
            return new Subscriber.CommitResult(nChannel, position, Subscriber.CommitResultStatus.AlreadyCommitted);
        }
        // walk backwards from the subscription's page along the previous-page links
        // until the commit page is reached, a page has no predecessor, or a page is missing
        Page page = this.peekPage(nChannel, lPage);
        long lPageActual = lPage;
        while (page != null && lPageActual > lPageCommit && (lPrev = page.getPreviousPartitionPage()) != -1L) {
            lPageActual = lPrev;
            page = this.peekPage(nChannel, lPageActual);
        }
        // a missing page is reported as already committed
        if (page == null) {
            return new Subscriber.CommitResult(nChannel, position, Subscriber.CommitResultStatus.AlreadyCommitted);
        }
        if (lPageActual == lPageCommit) {
            // clamp the commit offset to the tail of the commit page
            if (page.getTail() < nPosCommit) {
                nPosCommit = page.getTail();
            }
        } else if (lPageActual < lPageCommit || page.isSealedAndEmpty()) {
            // the commit page itself was not reached: commit the tail of the page that was
            lPageCommit = lPageActual;
            nPosCommit = page.getTail();
        } else {
            return new Subscriber.CommitResult(nChannel, position, Subscriber.CommitResultStatus.NothingToCommit);
        }
        // re-check against the committed position now that the commit position was adjusted
        if (committedPosition.getPage() != -1L && committedPosition.compareTo(new PagedPosition(lPageCommit, nPosCommit)) >= 0) {
            return new Subscriber.CommitResult(nChannel, position, Subscriber.CommitResultStatus.AlreadyCommitted);
        }
        LinkedList<Long> listPagesDereference = new LinkedList<Long>();
        PagedPosition prevRollbackPosition = subscription.getRollbackPosition();
        long lPagePrevRollback = prevRollbackPosition.getPage();
        PagedPosition commitPosition = new PagedPosition(lPageCommit, nPosCommit);
        if (page.isSealed() && nPosCommit >= page.getTail()) {
            // the whole sealed page is committed: it becomes a candidate for dereferencing
            Page pageNext;
            listPagesDereference.add(lPageActual);
            long lPageNext = page.getNextPartitionPage();
            // NOTE(review): page2 is never read - looks like a decompiler artifact
            Page page2 = pageNext = lPageNext == -1L ? null : this.peekPage(nChannel, lPageNext);
            // also skip over following pages that are sealed with a tail of -1
            while (pageNext != null && pageNext.isSealed() && pageNext.getTail() == -1) {
                lPageActual = lPageNext;
                listPagesDereference.add(lPageActual);
                lPageNext = pageNext.getNextPartitionPage();
                pageNext = lPageNext != -1L ? this.peekPage(nChannel, lPageNext) : null;
            }
            // roll back to the start of the next page, or to (-1, Integer.MAX_VALUE) when there is none
            rollbackPosition = lPageNext == -1L ? new PagedPosition(lPageNext, Integer.MAX_VALUE) : new PagedPosition(lPageNext, 0);
        } else {
            // roll back to the element following the committed one
            rollbackPosition = new PagedPosition(lPageCommit, nPosCommit + 1);
        }
        // the subscription head is set from the requested position's page,
        // not from the possibly adjusted lPageCommit
        subscription.setSubscriptionHead(pagedPosition.getPage());
        subscription.setCommittedPosition(commitPosition, rollbackPosition);
        this.getStatistics().getSubscriberGroupStatistics(keySubscription.getGroupId()).onCommitted(nChannel, commitPosition);
        entrySubscription.setValue(subscription);
        // collect the pages before the committed page, following previous-page links
        // for as long as the page exists and its id is not below the previous rollback page
        long lPagePrev = page.getPreviousPartitionPage();
        BinaryEntry<Page.Key, Page> entry = this.peekPageEntry(nChannel, lPagePrev);
        while (entry != null && entry.isPresent() && lPagePrev >= lPagePrevRollback) {
            listPagesDereference.add(((Page.Key)entry.getKey()).getPageId());
            lPagePrev = ((Page)entry.getValue()).getPreviousPartitionPage();
            entry = lPagePrev > -1L ? this.peekPageEntry(nChannel, lPagePrev) : null;
        }
        if (!listPagesDereference.isEmpty()) {
            long lPageNext;
            BinaryEntry<Page.Key, Page> entryFirst;
            Page pageFirst;
            // remember the first collected page before entries are removed from the list
            long lPageFirst = (Long)listPagesDereference.peekFirst();
            // release this subscription's reference on the last collected page
            long lPageDereference = (Long)listPagesDereference.removeLast();
            BinaryEntry<Page.Key, Page> entryDereference = this.enlistPageEntry(nChannel, lPageDereference);
            Page pageDereference = (Page)entryDereference.getValue();
            if (pageDereference.decrementReferenceCount() == 0) {
                this.removePageIfNotRetainingElements(nChannel, ((Page.Key)entryDereference.getKey()).getPageId());
            }
            if (!pageDereference.isSubscribed()) {
                // keep taking pages from the end of the list and removing them
                // for as long as each one has a reference count of exactly 1
                while (!listPagesDereference.isEmpty() && (pageDereference = (Page)(entryDereference = this.enlistPageEntry(nChannel, lPageDereference = ((Long)listPagesDereference.removeLast()).longValue())).getValue()).getReferenceCount() == 1) {
                    this.removePageIfNotRetainingElements(nChannel, ((Page.Key)entryDereference.getKey()).getPageId());
                }
            }
            // NOTE(review): page3 is never read - looks like a decompiler artifact
            Page page3 = pageFirst = (entryFirst = this.enlistPageEntry(nChannel, lPageFirst)).isPresent() ? (Page)entryFirst.getValue() : null;
            // if the first collected page still exists and is subscribed,
            // add a reference to the page that follows it
            if (pageFirst != null && pageFirst.isSubscribed() && (lPageNext = pageFirst.getNextPartitionPage()) != -1L) {
                this.enlistPage(nChannel, lPageNext).incrementReferenceCount();
            }
        }
        Subscriber.CommitResultStatus status = fOwned ? Subscriber.CommitResultStatus.Committed : Subscriber.CommitResultStatus.Unowned;
        return new Subscriber.CommitResult(nChannel, position, status);
    }

    /**
     * Move the subscription stored in the given entry so that the next element
     * it reads is the one following {@code position}.
     * <p>
     * Starting from the subscription's current page (or, when it has none, the
     * channel's partition head), the page chain is walked backwards or forwards
     * towards the requested page. The subscription's page and position are then
     * set according to where the walk ended, and the committed position is
     * pulled back if it would otherwise lie after the new subscription position.
     * <p>
     * A page that cannot be found while walking forwards is handled the same
     * way as one that cannot be found while walking backwards: the walk stops
     * and the subscription is positioned at {@code Integer.MAX_VALUE} on the
     * last page id reached, instead of failing with a
     * {@link NullPointerException}.
     *
     * @param entrySubscription  the subscription entry for the group and channel
     * @param position           the position to seek to
     * @param subscriberId       the identifier of the seeking subscriber
     *
     * @return a result holding the new head position and the position seeked to
     *
     * @throws IllegalStateException if {@code subscriberId} is not the owning
     *                               subscriber recorded in the subscription
     */
    public SeekProcessor.Result seekPosition(BinaryEntry<Subscription.Key, Subscription> entrySubscription, PagedPosition position, SubscriberId subscriberId) {
        PagedPosition positionSeek;
        PagedPosition positionHead;
        long lPageRollback;
        Page page;
        Subscription subscription = (Subscription)entrySubscription.getValue();
        int nChannel = ((Subscription.Key)entrySubscription.getKey()).getChannelId();
        long lPage = subscription.getPage();
        long lPageSeek = position.getPage();
        int nOffsetSeek = position.getOffset();
        if (!Objects.equals(subscription.getOwningSubscriber(), subscriberId)) {
            throw new IllegalStateException("Subscriber is not allocated channel " + nChannel);
        }
        if (lPage > lPageSeek) {
            // the subscription is ahead of the target: walk back along the previous-page
            // links for as long as the previous page is not before the target page
            page = this.peekPage(nChannel, lPage);
            lPageRollback = lPage;
            while (page != null && (lPageRollback = page.getPreviousPartitionPage()) != -1L && lPageRollback >= lPageSeek) {
                lPage = lPageRollback;
                page = this.peekPage(nChannel, lPage);
            }
        } else {
            // a subscription without a page starts from the channel's partition head;
            // if there is no head either, there is nothing to seek over
            if (lPage == -1L && (lPage = this.enlistUsage(nChannel).getPartitionHead()) == -1L) {
                return new SeekProcessor.Result(null, new PagedPosition(0L, 0));
            }
            // walk forward along the next-page links until the target page is reached or
            // the chain ends; stop on a missing page so that the "page == null" branch
            // below handles it rather than dereferencing null
            page = this.peekPage(nChannel, lPage);
            lPageRollback = lPage;
            while (page != null && lPageRollback < lPageSeek && (lPageRollback = page.getNextPartitionPage()) != -1L) {
                lPage = lPageRollback;
                page = this.peekPage(nChannel, lPage);
            }
        }
        if (page == null) {
            // the page reached does not exist: mark the subscription as exhausted on it
            subscription.setPage(lPage);
            subscription.setPosition(Integer.MAX_VALUE);
            positionHead = new PagedPosition(lPageRollback, 0);
            positionSeek = new PagedPosition(lPage - 1L, Integer.MAX_VALUE);
        } else if (lPage == lPageSeek) {
            if (page.isSealed() && nOffsetSeek >= page.getTail()) {
                // the target is at or past the tail of a sealed page: continue on the
                // next page, or mark the page as exhausted when there is none
                lPageRollback = page.getNextPartitionPage();
                if (lPageRollback == -1L) {
                    subscription.setPage(lPage);
                    subscription.setPosition(Integer.MAX_VALUE);
                } else {
                    subscription.setPage(lPageRollback);
                    subscription.setPosition(0);
                }
                positionHead = new PagedPosition(lPageSeek + 1L, 0);
                positionSeek = new PagedPosition(lPageSeek, page.getTail());
            } else {
                // the target is inside the page: the next read is the following offset
                subscription.setPage(lPage);
                subscription.setPosition(nOffsetSeek + 1);
                positionHead = new PagedPosition(lPage, nOffsetSeek + 1);
                positionSeek = new PagedPosition(lPage, nOffsetSeek);
            }
        } else if (lPage < lPageSeek) {
            // the chain ended before the target page was reached
            if (page.isSealed()) {
                lPageRollback = page.getNextPartitionPage();
                if (lPageRollback == -1L) {
                    subscription.setPage(lPage);
                    subscription.setPosition(Integer.MAX_VALUE);
                } else {
                    subscription.setPage(lPageRollback);
                    subscription.setPosition(0);
                }
                positionHead = new PagedPosition(lPageSeek + 1L, 0);
                positionSeek = new PagedPosition(lPageSeek, page.getTail());
            } else {
                // the last page is still open: position just after its current tail
                subscription.setPage(lPage);
                int nTail = page.getTail();
                int nTailNext = nTail + 1;
                subscription.setPosition(nTailNext);
                positionHead = new PagedPosition(lPage, nTailNext);
                positionSeek = new PagedPosition(lPage, nTail);
            }
        } else {
            // the walk back stopped on a page after the target: start at its first element
            subscription.setPage(lPage);
            subscription.setPosition(0);
            positionHead = new PagedPosition(lPage, 0);
            positionSeek = new PagedPosition(lPage - 1L, Integer.MAX_VALUE);
        }
        // if the committed position is now after the subscription position, move it back so
        // that it ends immediately before the new position
        PagedPosition committed = subscription.getCommittedPosition();
        long lPageCommitted = committed.getPage();
        int nOffsetCommitted = committed.getOffset();
        long lPageSub = subscription.getPage();
        int nOffsetSub = subscription.getPosition();
        if (lPageCommitted > lPageSub || lPageCommitted == lPageSub && nOffsetCommitted > nOffsetSub) {
            PagedPosition posCommit;
            PagedPosition posRollback = new PagedPosition(lPageSub, nOffsetSub);
            if (nOffsetSub == 0) {
                // NOTE(review): the committed offset is taken from the tail of the
                // subscription's page while the page id is that page's predecessor - confirm
                page = this.peekPage(nChannel, lPageSub);
                posCommit = new PagedPosition(page.getPreviousPartitionPage(), page.getTail());
            } else {
                posCommit = new PagedPosition(lPageSub, nOffsetSub - 1);
            }
            subscription.setCommittedPosition(posCommit, posRollback);
        }
        entrySubscription.setValue(subscription);
        return new SeekProcessor.Result(positionHead, positionSeek);
    }

    /**
     * Move the subscription stored in the given entry to a position chosen by
     * comparing pages and their elements with a timestamp.
     * <p>
     * Starting from the subscription's current page, the page chain is walked
     * backwards or forwards according to the sign of
     * {@code Page.compareTimestamp(lTimestamp)}. When a page compares as
     * {@code 0}, its elements are scanned by their timestamps to pick the
     * offset. The committed position is then pulled back if it would otherwise
     * lie after the new subscription position.
     *
     * @param entrySubscription  the subscription entry for the group and channel
     * @param lTimestamp         the timestamp to seek to; compared with
     *                           {@code PageElement.getTimestampMillis()}
     * @param nSubscriberId      the id of the seeking subscriber, compared with
     *                           the id of the subscription's owning subscriber
     *
     * @return a result holding the new head position and the position seeked to
     *
     * @throws IllegalStateException if the subscription has no owning subscriber
     *                               or the owner's id is not {@code nSubscriberId}
     */
    public SeekProcessor.Result seekTimestamp(BinaryEntry<Subscription.Key, Subscription> entrySubscription, long lTimestamp, long nSubscriberId) {
        PagedPosition positionSeek;
        PagedPosition positionHead;
        Subscription subscription = (Subscription)entrySubscription.getValue();
        int nChannel = ((Subscription.Key)entrySubscription.getKey()).getChannelId();
        long lPage = subscription.getPage();
        SubscriberId owner = subscription.getOwningSubscriber();
        if (owner == null || owner.getId() != nSubscriberId) {
            throw new IllegalStateException("Subscriber is not allocated channel " + nChannel);
        }
        // a subscription without a page has nothing to seek over
        if (lPage == -1L) {
            return new SeekProcessor.Result(null, new PagedPosition(0L, 0));
        }
        Page page = this.peekPage(nChannel, lPage);
        // presumably positive means the page is after the timestamp and negative
        // means it is before - TODO confirm against Page.compareTimestamp
        int nMatch = page == null ? 0 : page.compareTimestamp(lTimestamp);
        long lPageRollback = lPage;
        if (nMatch > 0) {
            // walk back along the previous-page links while pages compare >= 0
            while (page != null) {
                lPageRollback = page.getPreviousPartitionPage();
                nMatch = page.compareTimestamp(lTimestamp);
                if (lPageRollback != -1L && nMatch >= 0) {
                    lPage = lPageRollback;
                    page = this.peekPage(nChannel, lPage);
                    continue;
                }
                break;
            }
        } else if (nMatch < 0) {
            // walk forward along the next-page links while pages compare < 0
            while (page != null) {
                lPageRollback = page.getNextPartitionPage();
                nMatch = page.compareTimestamp(lTimestamp);
                if (lPageRollback != -1L && nMatch < 0) {
                    lPage = lPageRollback;
                    page = this.peekPage(nChannel, lPage);
                    continue;
                }
                break;
            }
        }
        if (page == null) {
            // the page reached does not exist: mark the subscription as exhausted on it
            subscription.setPage(lPage);
            subscription.setPosition(Integer.MAX_VALUE);
            positionHead = new PagedPosition(lPageRollback, 0);
            positionSeek = new PagedPosition(lPage - 1L, Integer.MAX_VALUE);
        } else if (nMatch == 0) {
            int nPos;
            BackingMapContext ctxElements = this.getBackingMapContext(PagedTopicCaches.Names.CONTENT);
            Converter converterFrom = this.getValueFromInternalConverter();
            long lElementTime = 0L;
            // scan element timestamps from the start of the page; the loop ends once an
            // element's timestamp is not below lTimestamp, with nPos already incremented
            // past that element
            // NOTE(review): the bound is nPos < tail whereas pollFromPageHead reads up to
            // and including the tail - confirm the last element is meant to be skipped
            for (nPos = 0; nPos < page.getTail() && lElementTime < lTimestamp; ++nPos) {
                Binary binPosKey = ContentKey.toBinary(this.f_nPartition, nChannel, lPage, nPos);
                BinaryEntry entryElement = (BinaryEntry)ctxElements.getReadOnlyEntry(binPosKey);
                PageElement element = PageElement.fromBinary(entryElement.getBinaryValue(), converterFrom);
                lElementTime = element.getTimestampMillis();
            }
            if (nPos >= page.getTail() && page.isSealed()) {
                // the scan reached the tail of a sealed page: continue on the next page,
                // or mark the page as exhausted when there is none
                lPageRollback = page.getNextPartitionPage();
                if (lPageRollback == -1L) {
                    subscription.setPage(lPage);
                    subscription.setPosition(Integer.MAX_VALUE);
                } else {
                    subscription.setPage(lPageRollback);
                    subscription.setPosition(0);
                }
                positionHead = new PagedPosition(lPage + 1L, 0);
                positionSeek = new PagedPosition(lPage, page.getTail());
            } else {
                subscription.setPage(lPage);
                subscription.setPosition(nPos);
                positionHead = new PagedPosition(lPage, nPos);
                if (nPos > 0) {
                    positionSeek = new PagedPosition(lPage, nPos - 1);
                } else {
                    // positioned at the first offset: the seeked position is the tail of the
                    // previous page, or Integer.MAX_VALUE when that page cannot be found
                    long lPagePrev = page.getPreviousPartitionPage();
                    Page pagePrev = lPagePrev == -1L ? null : this.peekPage(nChannel, lPagePrev);
                    int nTail = pagePrev == null ? Integer.MAX_VALUE : pagePrev.getTail();
                    positionSeek = new PagedPosition(lPagePrev, nTail);
                }
            }
        } else if (nMatch < 0) {
            // the forward walk ended on a page that still compares < 0
            if (page.isSealed()) {
                lPageRollback = page.getNextPartitionPage();
                if (lPageRollback == -1L) {
                    subscription.setPage(lPage);
                    subscription.setPosition(Integer.MAX_VALUE);
                } else {
                    subscription.setPage(lPageRollback);
                    subscription.setPosition(0);
                }
                positionHead = new PagedPosition(lPage + 1L, 0);
                positionSeek = new PagedPosition(lPage, page.getTail());
            } else {
                // the last page is still open: position just after its current tail
                subscription.setPage(lPage);
                int nTail = page.getTail();
                int nTailNext = nTail + 1;
                subscription.setPosition(nTailNext);
                positionHead = new PagedPosition(lPage, nTailNext);
                positionSeek = new PagedPosition(lPage, nTail);
            }
        } else {
            // the backward walk ended on a page that still compares > 0
            subscription.setPage(lPage);
            subscription.setPosition(0);
            positionHead = new PagedPosition(lPage + 1L, 0);
            positionSeek = new PagedPosition(lPage, page.getTail());
        }
        // if the committed position is now after the subscription position, move it back so
        // that it ends immediately before the new position
        PagedPosition committed = subscription.getCommittedPosition();
        long lPageCommitted = committed.getPage();
        int nOffsetCommitted = committed.getOffset();
        long lPageSub = subscription.getPage();
        int nOffsetSub = subscription.getPosition();
        if (lPageCommitted > lPageSub || lPageCommitted == lPageSub && nOffsetCommitted > nOffsetSub) {
            PagedPosition posCommit;
            PagedPosition posRollback = new PagedPosition(lPageSub, nOffsetSub);
            if (nOffsetSub == 0) {
                // NOTE(review): peekPage may return null here, and the committed offset is
                // taken from the tail of the subscription's page while the page id is that
                // page's predecessor - confirm both
                page = this.peekPage(nChannel, lPageSub);
                posCommit = new PagedPosition(page.getPreviousPartitionPage(), page.getTail());
            } else {
                posCommit = new PagedPosition(lPageSub, nOffsetSub - 1);
            }
            subscription.setCommittedPosition(posCommit, posRollback);
        }
        entrySubscription.setValue(subscription);
        return new SeekProcessor.Result(positionHead, positionSeek);
    }

    /**
     * Register a notifier on a page and record the channel against that
     * notifier in the notifications cache.
     * <p>
     * The notifier id is added to the page's insertion notifiers. If a
     * notification entry can be enlisted, the channel is inserted into its
     * {@code int[]} value and the entry is set to expire after the configured
     * notification timeout.
     *
     * @param page              the page to add the insertion notifier to
     * @param nNotifyPostEmpty  the notifier id
     * @param nChannel          the channel to record in the notification entry
     */
    protected void requestInsertionNotification(Page page, int nNotifyPostEmpty, int nChannel) {
        page.addInsertionNotifier(nNotifyPostEmpty);

        NotificationKey keyNotify = new NotificationKey(getPartition(), nNotifyPostEmpty);
        BinaryEntry<NotificationKey, int[]> entryNotify = enlistBackingMapEntry(PagedTopicCaches.Names.NOTIFICATIONS, toBinaryKey(keyNotify));
        if (entryNotify == null) {
            return;
        }

        int[] anChannels = Arrays.binaryInsert((int[]) entryNotify.getValue(), nChannel);
        entryNotify.setValue(anChannels);
        entryNotify.expire(getDependencies().getNotificationTimeout());
    }

    /**
     * Register a removal notifier on every channel of this partition that has
     * a partition head.
     * <p>
     * For each such channel the notifier id is added to the channel's
     * {@link Usage}, and, if a notification entry can be enlisted,
     * {@code nChannel} is inserted into its {@code int[]} value and the entry
     * is set to expire after {@code PUBLISHER_NOTIFICATION_EXPIRY_MILLIS}.
     *
     * @param nNotifyPostFull  the notifier id
     * @param nChannel         the channel to record in the notification entry
     */
    protected void requestRemovalNotification(int nNotifyPostFull, int nChannel) {
        int cChannels = getChannelCount();
        for (int nIndex = 0; nIndex < cChannels; nIndex++) {
            Usage usageChannel = enlistUsage(nIndex);
            // channels whose partition head is -1 are skipped
            if (usageChannel.getPartitionHead() != -1L) {
                usageChannel.addRemovalNotifier(nNotifyPostFull);

                NotificationKey keyNotify = new NotificationKey(getPartition(), nNotifyPostFull);
                BinaryEntry<NotificationKey, int[]> entryNotify = enlistBackingMapEntry(PagedTopicCaches.Names.NOTIFICATIONS, toBinaryKey(keyNotify));
                if (entryNotify != null) {
                    int[] anChannels = Arrays.binaryInsert((int[]) entryNotify.getValue(), nChannel);
                    entryNotify.setValue(anChannels);
                    entryNotify.expire(PUBLISHER_NOTIFICATION_EXPIRY_MILLIS);
                }
            }
        }
    }

    /**
     * Obtain the dependencies of this partition's topic from the backing map
     * manager.
     *
     * @return the {@link PagedTopicDependencies} registered under this topic's name
     */
    public PagedTopicDependencies getDependencies() {
        return ((PagedTopicBackingMapManager) f_ctxManager.getManager()).getTopicDependencies(f_sName);
    }

    /**
     * Obtain the statistics of this partition's topic from the backing map
     * manager.
     *
     * @return the {@link PagedTopicStatistics} registered under this topic's name
     */
    protected PagedTopicStatistics getStatistics() {
        return ((PagedTopicBackingMapManager) f_ctxManager.getManager()).getStatistics(f_sName);
    }

    /**
     * Look up the service-level subscription for the subscriber group named in
     * a subscription entry's key.
     *
     * @param entry  the subscription entry whose key supplies the group id
     *
     * @return whatever the service returns for the subscription id resolved
     *         from this topic's name and the group id
     */
    protected PagedTopicSubscription getPagedTopicSubscription(BinaryEntry<Subscription.Key, Subscription> entry) {
        Subscription.Key keySubscription = (Subscription.Key) entry.getKey();
        long lSubscriptionId = f_service.getSubscriptionId(f_sName, keySubscription.getGroupId());
        return f_service.getSubscription(lSubscriptionId);
    }

    /**
     * Obtain the backing map entry for a key in one of this topic's caches.
     *
     * @param cacheName  identifies which of the topic's caches to use
     * @param binaryKey  the binary key of the entry
     * @param <K>        the cache key type
     * @param <V>        the cache value type
     *
     * @return the entry obtained from the cache's backing map context, or
     *         {@code null} if no context is available
     */
    protected <K, V> BinaryEntry<K, V> enlistBackingMapEntry(PagedTopicCaches.Names<K, V> cacheName, Binary binaryKey) {
        BackingMapContext ctxCache = getBackingMapContext(cacheName);
        if (ctxCache != null) {
            return (BinaryEntry) ctxCache.getBackingMapEntry(binaryKey);
        }
        return null;
    }

    /**
     * Obtain the read-only entry for a key in one of this topic's caches.
     *
     * @param cacheName  identifies which of the topic's caches to use
     * @param binaryKey  the binary key of the entry
     * @param <K>        the cache key type
     * @param <V>        the cache value type
     *
     * @return the read-only entry obtained from the cache's backing map context
     */
    protected <K, V> BinaryEntry<K, V> peekBackingMapEntry(PagedTopicCaches.Names<K, V> cacheName, Binary binaryKey) {
        BackingMapContext ctxCache = getBackingMapContext(cacheName);
        return (BinaryEntry) ctxCache.getReadOnlyEntry(binaryKey);
    }

    /**
     * Obtain the backing map context of one of this topic's caches.
     *
     * @param cacheName  identifies which of the topic's caches to use; the
     *                   actual cache name is derived from this topic's name
     *
     * @return the backing map context, never {@code null}
     *
     * @throws MapNotFoundException if the manager context has no backing map
     *                              context for the derived cache name
     */
    protected BackingMapContext getBackingMapContext(PagedTopicCaches.Names cacheName) {
        String sCache = cacheName.cacheNameForTopicName(f_sName);
        BackingMapContext ctxCache = f_ctxManager.getBackingMapContext(sCache);
        if (ctxCache != null) {
            return ctxCache;
        }
        throw new MapNotFoundException(sCache);
    }

    /**
     * Obtain the manager context's converter from keys to their internal form.
     *
     * @param <F>  the type of key being converted
     *
     * @return the key-to-internal converter
     */
    protected <F> Converter<F, Binary> getKeyToInternalConverter() {
        return f_ctxManager.getKeyToInternalConverter();
    }

    /**
     * Convert an object to a binary key using the manager context's
     * key-to-internal converter.
     *
     * @param o  the key object to convert
     *
     * @return the converted {@link Binary} key
     */
    public Binary toBinaryKey(Object o) {
        Object oInternal = f_ctxManager.getKeyToInternalConverter().convert(o);
        return (Binary) oInternal;
    }

    /**
     * Obtain the manager context's converter from internal values to objects.
     *
     * @param <F>  the type of value produced by the converter
     *
     * @return the value-from-internal converter
     */
    protected <F> Converter<Binary, F> getValueFromInternalConverter() {
        return f_ctxManager.getValueFromInternalConverter();
    }

    /**
     * Obtain the manager context's converter from values to their internal form.
     *
     * @param <F>  the type of value being converted
     *
     * @return the value-to-internal converter
     */
    protected <F> Converter<F, Binary> getValueToInternalConverter() {
        return f_ctxManager.getValueToInternalConverter();
    }
}

