/*
 * Decompiled with CFR 0.152.
 */
package com.tangosol.internal.net.topic.impl.paged;

import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.common.base.Logger;
import com.oracle.coherence.common.collections.ConcurrentHashMap;
import com.tangosol.internal.net.NamedCacheDeactivationListener;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicDependencies;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicSubscriber;
import com.tangosol.internal.net.topic.impl.paged.agent.EnsureSubscriptionProcessor;
import com.tangosol.internal.net.topic.impl.paged.agent.EvictSubscriber;
import com.tangosol.internal.net.topic.impl.paged.filter.UnreadTopicContentFilter;
import com.tangosol.internal.net.topic.impl.paged.model.ContentKey;
import com.tangosol.internal.net.topic.impl.paged.model.NotificationKey;
import com.tangosol.internal.net.topic.impl.paged.model.Page;
import com.tangosol.internal.net.topic.impl.paged.model.PagedPosition;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberInfo;
import com.tangosol.internal.net.topic.impl.paged.model.Subscription;
import com.tangosol.internal.net.topic.impl.paged.model.Usage;
import com.tangosol.io.ClassLoaderAware;
import com.tangosol.io.Serializer;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.Member;
import com.tangosol.net.MemberEvent;
import com.tangosol.net.MemberListener;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PagedTopicService;
import com.tangosol.net.cache.TypeAssertion;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Position;
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.Aggregators;
import com.tangosol.util.Binary;
import com.tangosol.util.ConcurrentMap;
import com.tangosol.util.Filter;
import com.tangosol.util.Filters;
import com.tangosol.util.HashHelper;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.InvocableMapHelper;
import com.tangosol.util.MapEvent;
import com.tangosol.util.NullImplementation;
import com.tangosol.util.ValueExtractor;
import com.tangosol.util.aggregator.Count;
import com.tangosol.util.aggregator.GroupAggregator;
import com.tangosol.util.extractor.ReflectionExtractor;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.NotFilter;
import java.io.PrintStream;
import java.lang.invoke.LambdaMetafactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PagedTopicCaches
implements ClassLoaderAware,
AutoCloseable {
    /** Name of the topic; assigned once in the constructor. */
    protected final String f_sTopicName;
    /** The topic service passed to the constructor; source of the serializer, class loader and caches. */
    protected final PagedTopicService f_topicService;
    /** Service name read from {@code cacheService.getInfo().getServiceName()} at construction time. */
    protected final String f_sCacheServiceName;
    /** Current lifecycle state; volatile because it is also read outside the synchronized sections. */
    private volatile State m_state;
    /** Partition count read from the service at construction time. */
    protected final int f_cPartition;
    /** The set of all caches below; set to {@code null} by the release/destroy path. */
    protected volatile Set<NamedCache> f_setCaches;
    /** Cache obtained for {@link Names#PAGES}. */
    public NamedCache<Page.Key, Page> Pages;
    /** Cache obtained for {@link Names#CONTENT}. */
    public NamedCache<Binary, Binary> Data;
    /** Cache obtained for {@link Names#SUBSCRIPTIONS}. */
    public NamedCache<Subscription.Key, Subscription> Subscriptions;
    /** Cache obtained for {@link Names#SUBSCRIBERS}. */
    public NamedCache<SubscriberInfo.Key, SubscriberInfo> Subscribers;
    /** Cache obtained for {@link Names#NOTIFICATIONS}. */
    public NamedCache<NotificationKey, int[]> Notifications;
    /** Cache obtained for {@link Names#USAGE}. */
    public NamedCache<Usage.Key, Usage> Usages;
    /** Function used to obtain a cache from a cache name and class loader. */
    private final BiFunction<String, ClassLoader, NamedCache> f_functionCache;
    /** Listener registered on the Pages cache and on the service to detect deactivation and member loss. */
    private final DeactivationListener m_deactivationListener = new DeactivationListener();
    /** Registered {@link Listener}s, held as map keys; the value is always {@code Boolean.TRUE}. */
    private final Map<Listener, Object> m_mapListener = new ConcurrentHashMap<Listener, Object>();
    /** Topic dependencies looked up from the service's topic backing map manager at construction time. */
    private final PagedTopicDependencies f_dependencies;

    /**
     * Create the caches for a topic using the service's own cache factory
     * and with the deactivation listeners registered.
     *
     * @param sName         the topic name; must not be {@code null} or empty
     * @param cacheService  the topic service; must not be {@code null}
     *
     * @throws IllegalArgumentException if either argument is invalid
     */
    public PagedTopicCaches(String sName, PagedTopicService cacheService) {
        this(sName, cacheService, null, true);
    }

    /**
     * Create the caches for a topic using the service's own cache factory.
     *
     * @param sName              the topic name; must not be {@code null} or empty
     * @param cacheService       the topic service; must not be {@code null}
     * @param registerListeners  {@code true} to register the deactivation listeners
     *
     * @throws IllegalArgumentException if {@code sName} or {@code cacheService} is invalid
     */
    public PagedTopicCaches(String sName, PagedTopicService cacheService, boolean registerListeners) {
        this(sName, cacheService, null, registerListeners);
    }

    /**
     * Create the caches for a topic, optionally with a custom cache supplier.
     *
     * @param sName              the topic name; must not be {@code null} or empty
     * @param cacheService       the topic service; must not be {@code null}
     * @param functionCache      function producing a cache for a cache name and class loader,
     *                           or {@code null} to use {@code cacheService::ensureCache}
     * @param registerListeners  {@code true} to register the deactivation listeners
     *
     * @throws IllegalArgumentException if {@code sName} or {@code cacheService} is invalid
     */
    PagedTopicCaches(String sName, PagedTopicService cacheService, BiFunction<String, ClassLoader, NamedCache> functionCache, boolean registerListeners) {
        if (sName == null || sName.isEmpty()) {
            throw new IllegalArgumentException("The name argument cannot be null or empty String");
        }
        if (cacheService == null) {
            throw new IllegalArgumentException("The cacheService argument cannot be null");
        }

        f_sTopicName = sName;
        f_topicService = cacheService;
        f_sCacheServiceName = cacheService.getInfo().getServiceName();
        f_cPartition = cacheService.getPartitionCount();
        // fall back to the service's own cache factory when no supplier was given
        f_functionCache = functionCache != null ? functionCache : cacheService::ensureCache;
        f_dependencies = cacheService.getTopicBackingMapManager().getTopicDependencies(sName);

        initializeCaches(registerListeners);
        // the state only becomes Active once all caches have been obtained
        m_state = State.Active;
    }

    /**
     * @return the serializer of the underlying topic service
     */
    public Serializer getSerializer() {
        return f_topicService.getSerializer();
    }

    /**
     * Close these caches; equivalent to {@link #release()}.
     */
    @Override
    public void close() {
        release();
    }

    /**
     * Release the caches of this topic (local resources only) and notify listeners.
     */
    public void release() {
        releaseOrDestroy(false);
    }

    /**
     * Destroy the caches of this topic and notify listeners.
     */
    public void destroy() {
        releaseOrDestroy(true);
    }

    /**
     * Determine whether these caches are usable.
     *
     * @return {@code true} if the state is {@link State#Active} or {@link State#Disconnected}
     */
    public boolean isActive() {
        // single volatile read so that both comparisons see the same value
        final State current = m_state;
        return State.Active == current || State.Disconnected == current;
    }

    /**
     * @return whether the Pages cache reports itself as destroyed
     */
    public boolean isDestroyed() {
        return Pages.isDestroyed();
    }

    /**
     * @return whether the Pages cache reports itself as released
     */
    public boolean isReleased() {
        return Pages.isReleased();
    }

    /**
     * Register a lifecycle listener.
     *
     * @param listener  the listener to notify of connect, disconnect, release and destroy
     */
    public void addListener(Listener listener) {
        m_mapListener.put(listener, Boolean.TRUE);
    }

    /**
     * Unregister a lifecycle listener.
     *
     * @param listener  the listener to remove
     */
    public void removeListener(Listener listener) {
        m_mapListener.remove(listener);
    }

    /**
     * If the state is {@link State#Disconnected}, switch it back to
     * {@link State#Active}, call {@code size()} on every cache and notify
     * all listeners via {@link Listener#onConnect()}.
     * <p>
     * The state is checked once without the lock and again while holding
     * this object's monitor, so only one thread performs the transition.
     * Exceptions thrown by a listener are logged and do not stop the
     * remaining listeners from being called.
     */
    public void ensureConnected() {
        if (m_state != State.Disconnected) {
            return;
        }
        synchronized (this) {
            if (m_state != State.Disconnected) {
                return;
            }
            m_state = State.Active;
            // NOTE(review): size() is presumably called to force each cache to reconnect - confirm
            f_setCaches.forEach(ConcurrentMap::size);
            for (Listener listener : m_mapListener.keySet()) {
                try {
                    listener.onConnect();
                }
                catch (Throwable t) {
                    Logger.err(t);
                }
            }
        }
    }

    /**
     * @return the context class loader of the underlying topic service
     */
    @Override
    public ClassLoader getContextClassLoader() {
        return f_topicService.getContextClassLoader();
    }

    /**
     * Not supported; the class loader is always taken from the service.
     *
     * @param classLoader  ignored
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void setContextClassLoader(ClassLoader classLoader) {
        throw new UnsupportedOperationException();
    }

    /**
     * @return the name of the topic
     */
    public String getTopicName() {
        return f_sTopicName;
    }

    /**
     * Compute the base page for this topic: the absolute value of the topic
     * name's hash code modulo the partition count.
     *
     * @return a value in the range {@code 0 .. partitionCount - 1}
     */
    public int getBasePage() {
        final int nHash = f_sTopicName.hashCode();
        // the remainder is taken first, so Math.abs never sees Integer.MIN_VALUE
        return Math.abs(nHash % f_cPartition);
    }

    /**
     * @return the partition count captured from the service at construction
     */
    public int getPartitionCount() {
        return f_cPartition;
    }

    /**
     * @return the channel count the service currently reports for this topic
     */
    public int getChannelCount() {
        return f_topicService.getChannelCount(f_sTopicName);
    }

    /**
     * Build one {@link NotificationKey} per partition for the given notifier.
     *
     * @param nNotifier  the notifier id to embed in every key
     *
     * @return a new mutable set holding a key for each partition index
     *         from {@code 0} to {@code partitionCount - 1}
     */
    public Set<NotificationKey> getPartitionNotifierSet(int nNotifier) {
        return IntStream.range(0, f_cPartition)
                .mapToObj(nPart -> new NotificationKey(nPart, nNotifier))
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Compute the unit-of-order id for a partition: the topic name's hash
     * code plus the partition id (with normal {@code int} wrap-around).
     *
     * @param nPartition  the partition id
     *
     * @return the unit-of-order id
     */
    public int getUnitOfOrder(int nPartition) {
        return nPartition + f_sTopicName.hashCode();
    }

    /**
     * @return the topic service these caches were created from
     */
    public PagedTopicService getService() {
        return f_topicService;
    }

    /**
     * @return the topic dependencies looked up at construction
     */
    public PagedTopicDependencies getDependencies() {
        return f_dependencies;
    }

    /**
     * Build the {@link Usage.Key} for a channel. The partition is derived
     * from a hash of the topic name's hash code and the channel id, reduced
     * modulo the partition count.
     *
     * @param nChannel  the channel id
     *
     * @return the usage key for the channel
     */
    public Usage.Key getUsageSyncKey(int nChannel) {
        final int nHash = HashHelper.hash(f_sTopicName.hashCode(), nChannel);
        final int nPartition = Math.abs(nHash % f_cPartition);
        return new Usage.Key(nPartition, nChannel);
    }

    /**
     * Determine whether a position in a channel has been committed by a
     * subscriber group.
     *
     * @param groupId   the subscriber group
     * @param nChannel  the channel id
     * @param position  the position to test
     *
     * @return {@code true} if {@code position} is a {@link PagedPosition}, the
     *         channel id is within {@code 0 .. channelCount - 1}, and the last
     *         committed position for the channel is at or after {@code position}
     */
    public boolean isCommitted(SubscriberGroupId groupId, int nChannel, Position position) {
        boolean fValid = position instanceof PagedPosition && nChannel >= 0 && nChannel < getChannelCount();
        if (!fValid) {
            return false;
        }
        Position committed = getLastCommitted(groupId).get(nChannel);
        if (committed == null) {
            return false;
        }
        return committed.compareTo(position) >= 0;
    }

    /**
     * Obtain, per channel, the maximum committed position across the
     * subscriptions of a subscriber group.
     *
     * @param subscriberGroupId  the subscriber group
     *
     * @return a map of channel id to the greatest committed position
     */
    public Map<Integer, Position> getLastCommitted(SubscriberGroupId subscriberGroupId) {
        InvocableMap.StreamingAggregator aggregatorMaxCommit = Aggregators.comparableMax(Subscription::getCommittedPosition);
        return getPositions(subscriberGroupId, aggregatorMaxCommit);
    }

    /**
     * Obtain, per channel, the minimum value produced by
     * {@link Subscription.HeadExtractor} for the given subscriber across the
     * subscriptions of a subscriber group.
     *
     * @param subscriberGroupId  the subscriber group
     * @param nSubscriberId      the subscriber id passed to the head extractor
     *
     * @return a map of channel id to head position
     */
    public Map<Integer, Position> getHeads(SubscriberGroupId subscriberGroupId, long nSubscriberId) {
        InvocableMap.StreamingAggregator aggregatorMinHead = Aggregators.comparableMin(new Subscription.HeadExtractor(nSubscriberId));
        return getPositions(subscriberGroupId, aggregatorMinHead);
    }

    /**
     * Aggregate positions from the Subscriptions cache, grouped by channel,
     * for the subscriptions whose key group id equals the given group.
     * <p>
     * Entries whose channel key is {@code -1} or whose aggregated value is
     * {@code null} are dropped from the result.
     *
     * @param subscriberGroupId  the subscriber group to select
     * @param aggregator         the aggregator applied within each channel group
     *
     * @return a new map of channel id to aggregated position
     */
    private Map<Integer, Position> getPositions(SubscriberGroupId subscriberGroupId, InvocableMap.EntryAggregator<Subscription.Key, Subscription, Position> aggregator) {
        // NOTE(review): the trailing 1 presumably selects extraction from the entry key - confirm
        ReflectionExtractor extractorChannel = new ReflectionExtractor("getChannelId", new Object[0], 1);
        ReflectionExtractor extractorGroup = new ReflectionExtractor("getGroupId", new Object[0], 1);
        EqualsFilter filter = Filters.equal(extractorGroup, subscriberGroupId);
        // handed to the group aggregator; matches positions whose page is not -1
        NotFilter filterPosition = Filters.not(Filters.equal(PagedPosition::getPage, Long.valueOf(-1L)));

        // Give the aggregation result a parameterized type so that the stream
        // pipeline below is type-checked; with a raw Map the lambda parameter
        // would be typed as Object and getKey()/getValue() would not resolve.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Map<Integer, Position> mapByChannel = (Map) this.Subscriptions.aggregate(filter, GroupAggregator.createInstance(extractorChannel, aggregator, filterPosition));

        return mapByChannel.entrySet().stream()
                .filter(e -> e.getKey() != -1 && e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Obtain, per channel, the minimum value produced by
     * {@code Page.HeadExtractor.INSTANCE} over the Pages cache.
     *
     * @return a map of channel id to head position
     */
    public Map<Integer, Position> getHeads() {
        ReflectionExtractor extractorChannel = new ReflectionExtractor("getChannelId", new Object[0], 1);
        InvocableMap.StreamingAggregator aggregatorHead = Aggregators.comparableMin(Page.HeadExtractor.INSTANCE);
        return (Map)Pages.aggregate(GroupAggregator.createInstance(extractorChannel, aggregatorHead));
    }

    /**
     * Obtain, per channel, the maximum value produced by
     * {@code Page.TailExtractor.INSTANCE} over the Pages cache.
     *
     * @return a map of channel id to tail position
     */
    public Map<Integer, Position> getTails() {
        ReflectionExtractor extractorChannel = new ReflectionExtractor("getChannelId", new Object[0], 1);
        InvocableMap.StreamingAggregator aggregatorMaxTail = Aggregators.comparableMax(Page.TailExtractor.INSTANCE);
        return (Map)Pages.aggregate(GroupAggregator.createInstance(extractorChannel, aggregatorMaxTail));
    }

    /**
     * @return the element calculator configured in the topic dependencies
     */
    public NamedTopic.ElementCalculator getElementCalculator() {
        return getDependencies().getElementCalculator();
    }

    /**
     * Ask the service for the subscribers of a named subscriber group.
     *
     * @param sGroupName  the subscriber group name
     *
     * @return the subscriber ids the service reports for the group
     */
    public Set<SubscriberId> getSubscribers(String sGroupName) {
        SubscriberGroupId groupId = SubscriberGroupId.withName(sGroupName);
        return f_topicService.getSubscribers(f_sTopicName, groupId);
    }

    /**
     * Obtain the names of the subscriber groups returned by
     * {@code getSubscriberGroupsIds(false)}.
     *
     * @return a new set of group names
     */
    public Set<String> getSubscriberGroups() {
        Set<SubscriberGroupId> setIds = getSubscriberGroupsIds(false);
        return setIds.stream()
                .map(SubscriberGroupId::getGroupName)
                .collect(Collectors.toSet());
    }

    /**
     * Obtain the subscriber group ids the service reports for this topic.
     *
     * @param fAnonymous  {@code true} to return every group; {@code false} to
     *                    keep only groups for which
     *                    {@link SubscriberGroupId#isDurable()} is {@code true}
     *
     * @return a new set of subscriber group ids
     */
    public Set<SubscriberGroupId> getSubscriberGroupsIds(boolean fAnonymous) {
        // A parameterized Stream is required here: with a raw Stream, collect()
        // returns Object and cannot be returned as Set<SubscriberGroupId>.
        Stream<SubscriberGroupId> stream = this.f_topicService.getSubscriberGroups(this.f_sTopicName).stream();
        if (!fAnonymous) {
            stream = stream.filter(SubscriberGroupId::isDurable);
        }
        return stream.collect(Collectors.toSet());
    }

    /**
     * Obtain the channel allocations of a subscriber group, read from the
     * subscription stored under partition 0, channel 0.
     *
     * @param sGroup  the subscriber group name
     *
     * @return an unmodifiable view of the subscription's allocation map, or an
     *         empty map if no such subscription exists
     */
    public Map<Long, Set<Integer>> getChannelAllocations(String sGroup) {
        SubscriberGroupId groupId = SubscriberGroupId.withName(sGroup);
        Subscription subscription = (Subscription)Subscriptions.get(new Subscription.Key(0, 0, groupId));
        if (subscription == null) {
            return Collections.emptyMap();
        }
        return Collections.unmodifiableMap(subscription.getAllocationMap());
    }

    public void printChannelAllocations(String sGroup, PrintStream out) {
        Map<Integer, String> mapMember = this.f_topicService.getCluster().getMemberSet().stream().collect(Collectors.toMap(Member::getId, (Function<Member, String>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, toString(), (Lcom/tangosol/net/Member;)Ljava/lang/String;)()));
        out.println("Subscriber channel allocations for topic \"" + this.f_sTopicName + "\" subscriber group \"" + sGroup + "\":");
        for (Map.Entry<Long, Set<Integer>> entry : this.getChannelAllocations(sGroup).entrySet()) {
            long nId = entry.getKey();
            int nMember = PagedTopicSubscriber.memberIdFromId(nId);
            out.println("SubscriberId=" + nId + " channels=" + String.valueOf(entry.getValue()) + " " + mapMember.get(nMember));
        }
    }

    /**
     * Invoke {@code EvictSubscriber.INSTANCE} against a single subscriber's
     * entry in the Subscribers cache.
     *
     * @param groupId  the subscriber's group
     * @param id       the subscriber to disconnect
     */
    public void disconnectSubscriber(SubscriberGroupId groupId, SubscriberId id) {
        SubscriberInfo.Key key = new SubscriberInfo.Key(groupId, id.getId());
        Subscribers.invoke(key, EvictSubscriber.INSTANCE);
    }

    /**
     * Disconnect all subscribers of a named group that belong to a member.
     *
     * @param sGroup   the subscriber group name
     * @param nMember  the member id
     *
     * @return the ids of the subscribers the eviction was invoked against
     */
    public long[] disconnectAllSubscribers(String sGroup, int nMember) {
        SubscriberGroupId groupId = SubscriberGroupId.withName(sGroup);
        return disconnectAllSubscribers(groupId, nMember);
    }

    /**
     * Invoke {@code EvictSubscriber.INSTANCE} against every Subscribers
     * cache entry matching both the group id and the member id.
     *
     * @param id       the subscriber group
     * @param nMember  the member id
     *
     * @return the subscriber ids taken from the keys of the invocation result
     */
    public long[] disconnectAllSubscribers(SubscriberGroupId id, int nMember) {
        Filter filterGroupAndMember = Filters.equal(SubscriberInfo.GroupIdExtractor.INSTANCE, id).and(Filters.equal(SubscriberInfo.MemberIdExtractor.INSTANCE, Integer.valueOf(nMember)));
        Map<SubscriberInfo.Key, Boolean> mapEvicted = Subscribers.invokeAll(filterGroupAndMember, EvictSubscriber.INSTANCE);
        return mapEvicted.keySet()
                .stream()
                .mapToLong(SubscriberInfo.Key::getSubscriberId)
                .toArray();
    }

    /**
     * Disconnect all subscribers of a named group.
     *
     * @param sGroup  the subscriber group name
     *
     * @return the ids of the subscribers the eviction was invoked against
     */
    public long[] disconnectAllSubscribers(String sGroup) {
        SubscriberGroupId groupId = SubscriberGroupId.withName(sGroup);
        return disconnectAllSubscribers(groupId);
    }

    /**
     * Invoke {@code EvictSubscriber.INSTANCE} against every Subscribers
     * cache entry whose group id equals the given group.
     *
     * @param id  the subscriber group
     *
     * @return the subscriber ids taken from the keys of the invocation result
     */
    public long[] disconnectAllSubscribers(SubscriberGroupId id) {
        EqualsFilter<Map.Entry<SubscriberInfo.Key, SubscriberInfo>, SubscriberGroupId> filterGroup = Filters.equal(SubscriberInfo.GroupIdExtractor.INSTANCE, id);
        Map<SubscriberInfo.Key, Boolean> mapEvicted = Subscribers.invokeAll(filterGroup, EvictSubscriber.INSTANCE);
        return mapEvicted.keySet()
                .stream()
                .mapToLong(SubscriberInfo.Key::getSubscriberId)
                .toArray();
    }

    /**
     * Disconnect every subscriber of this topic: for each group returned by
     * {@code getSubscriberGroupsIds(false)} send a closed notification using
     * {@code SubscriberId.NullSubscriber}, then clear the Subscribers cache.
     */
    public void disconnectAllSubscribers() {
        getSubscriberGroupsIds(false).forEach(groupId -> {
            long lSubscription = f_topicService.getSubscriptionId(f_sTopicName, groupId);
            PagedTopicSubscriber.notifyClosed(Subscriptions, groupId, lSubscription, SubscriberId.NullSubscriber);
        });
        Subscribers.clear();
    }

    /**
     * Ensure that a named subscriber group exists, without attaching a
     * subscriber (the subscription is initialized with
     * {@code SubscriberId.NullSubscriber} and the create-group-only flag set).
     *
     * @param sName      the subscriber group name
     * @param filter     the filter passed on to the subscription
     * @param extractor  the extractor passed on to the subscription
     */
    public void ensureSubscriberGroup(String sName, Filter<?> filter, ValueExtractor<?, ?> extractor) {
        initializeSubscription(SubscriberGroupId.withName(sName), SubscriberId.NullSubscriber, 0L, filter, extractor, false, true, false);
    }

    /**
     * Initialize a subscription across all partitions and compute the head
     * page for each channel.
     * <p>
     * If {@code lSubscription} is {@code 0} the subscription id is first
     * obtained from the service. For a group with a name, an
     * {@link EnsureSubscriptionProcessor} (phase argument {@code 0}) is invoked
     * against one {@link Subscription.Key} per partition, waiting at most the
     * subscriber timeout from the dependencies. If the page results are
     * missing, contain a {@code null}, or {@code fDisconnected} is set, the
     * work is handed to {@code initialiseSubscriptionPages}; otherwise the head
     * of each channel is the minimum page reported for that channel.
     *
     * @param subscriberGroupId  the subscriber group
     * @param subscriberId       the subscriber
     * @param lSubscription      the subscription id, or {@code 0} to have the service ensure one
     * @param filter             the subscription filter
     * @param extractor          the subscription extractor
     * @param fReconnect         passed through to the processor
     * @param fCreateGroupOnly   passed through to the processor
     * @param fDisconnected      {@code true} to force the full page initialization
     *
     * @return the head page per channel, or an empty array if the wait was
     *         interrupted or failed while these caches are no longer active
     *
     * @throws RuntimeException if the invocation times out, or is interrupted
     *         or fails while these caches are still active
     */
    protected long[] initializeSubscription(SubscriberGroupId subscriberGroupId, SubscriberId subscriberId, long lSubscription, Filter<?> filter, ValueExtractor<?, ?> extractor, boolean fReconnect, boolean fCreateGroupOnly, boolean fDisconnected) {
        try {
            long[] alHead;
            Collection<EnsureSubscriptionProcessor.Result> results;
            String sName = subscriberGroupId.getGroupName();
            HashSet<Subscription.Key> setSubKeys = new HashSet<Subscription.Key>(this.f_cPartition);
            if (lSubscription == 0L) {
                lSubscription = this.getService().ensureSubscription(this.f_sTopicName, subscriberGroupId, subscriberId, filter, extractor);
            }
            // one key per partition, all on channel 0
            for (int i = 0; i < this.f_cPartition; ++i) {
                setSubKeys.add(new Subscription.Key(i, 0, subscriberGroupId));
            }
            // NOTE(review): 0 is presumably the processor's "inquire" phase - confirm
            EnsureSubscriptionProcessor processor = new EnsureSubscriptionProcessor(0, null, filter, extractor, subscriberId, fReconnect, fCreateGroupOnly, lSubscription);
            if (sName == null) {
                // no group name: skip the first invocation and fall through
                // to the full page initialization below
                results = null;
            } else {
                CompletableFuture<Map<Subscription.Key, EnsureSubscriptionProcessor.Result>> future = InvocableMapHelper.invokeAllAsync(this.Subscriptions, setSubKeys, key -> this.getUnitOfOrder(key.getPartitionId()), processor, new BiConsumer[0]);
                try {
                    long cMillis = this.getDependencies().getSubscriberTimeoutMillis();
                    results = future.get(cMillis, TimeUnit.MILLISECONDS).values();
                }
                catch (TimeoutException e) {
                    try {
                        future.cancel(true);
                    }
                    catch (Throwable ignored) {
                        // best effort: the timeout itself is reported below
                    }
                    throw Exceptions.ensureRuntimeException(e);
                }
            }
            Collection<long[]> colPages = EnsureSubscriptionProcessor.Result.assertPages(results);
            if (colPages == null || colPages.contains(null) || fDisconnected) {
                alHead = this.initialiseSubscriptionPages(subscriberId, lSubscription, filter, extractor, fReconnect, fCreateGroupOnly, setSubKeys);
            } else {
                int cChannel = colPages.stream().mapToInt(an -> an.length).max().orElse(this.getChannelCount());
                alHead = new long[cChannel];
                for (int nChannel = 0; nChannel < cChannel; ++nChannel) {
                    int finChan = nChannel;
                    // the head of a channel is the lowest page reported by any partition
                    alHead[nChannel] = colPages.stream().mapToLong(alResult -> alResult[finChan]).min().orElse(-1L);
                }
            }
            return alHead;
        }
        catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // restore the interrupt status so that callers further up the
                // stack can still observe that the thread was interrupted
                Thread.currentThread().interrupt();
            }
            if (this.isActive()) {
                throw Exceptions.ensureRuntimeException(e);
            }
            return new long[0];
        }
    }

    /**
     * Run the remaining two {@link EnsureSubscriptionProcessor} invocations
     * for a subscription and compute the head page of every channel.
     * <p>
     * The first invocation (phase argument {@code 1}) returns, per partition,
     * an array of pages indexed by channel. The head of a channel is the
     * minimum (when reconnecting or when consumed elements are retained) or
     * otherwise the maximum of those pages, never lower than the base page.
     * The second invocation (phase argument {@code 2}) sends the computed
     * heads back to every partition.
     *
     * @param subscriberId      the subscriber
     * @param lSubscription     the subscription id
     * @param filter            the subscription filter
     * @param extractor         the subscription extractor
     * @param fReconnect        {@code true} if the subscriber is reconnecting
     * @param fCreateGroupOnly  passed through to the processor
     * @param setSubKeys        the subscription keys to invoke against
     *
     * @return the head page per channel
     *
     * @throws InterruptedException if interrupted while waiting for a result
     * @throws ExecutionException   if an invocation completes exceptionally
     * @throws RuntimeException     if an invocation times out
     */
    protected long[] initialiseSubscriptionPages(SubscriberId subscriberId, long lSubscription, Filter<?> filter, ValueExtractor<?, ?> extractor, boolean fReconnect, boolean fCreateGroupOnly, Set<Subscription.Key> setSubKeys) throws InterruptedException, ExecutionException {
        Collection<EnsureSubscriptionProcessor.Result> results;
        // NOTE(review): 1 and 2 are presumably the processor's "pin" and "advance" phases - confirm
        EnsureSubscriptionProcessor processor = new EnsureSubscriptionProcessor(1, null, filter, extractor, subscriberId, fReconnect, fCreateGroupOnly, lSubscription);
        CompletableFuture<Map<Subscription.Key, EnsureSubscriptionProcessor.Result>> future = InvocableMapHelper.invokeAllAsync(this.Subscriptions, setSubKeys, key -> this.getUnitOfOrder(key.getPartitionId()), processor, new BiConsumer[0]);
        try {
            long cMillis = this.getDependencies().getSubscriberTimeoutMillis();
            results = future.get(cMillis, TimeUnit.MILLISECONDS).values();
        }
        catch (TimeoutException e) {
            try {
                future.cancel(true);
            }
            catch (Throwable ignored) {
                // best effort: the timeout itself is reported below
            }
            throw Exceptions.ensureRuntimeException(e);
        }
        Collection<long[]> colPages = EnsureSubscriptionProcessor.Result.assertPages(results);
        // the longest result array determines the number of channels
        int cChannel = colPages.stream().mapToInt(an -> an.length).max().orElse(this.getChannelCount());
        PagedTopicDependencies dependencies = this.getDependencies();
        long lPageBase = this.getBasePage();
        long[] alHead = new long[cChannel];
        for (int nChannel = 0; nChannel < cChannel; ++nChannel) {
            int finChan = nChannel;
            if (fReconnect || dependencies.isRetainConsumed()) {
                // lowest page in the channel; arrays without this channel count as the base page
                alHead[nChannel] = colPages.stream().mapToLong(alPage -> alPage.length > finChan ? Math.max(alPage[finChan], lPageBase) : lPageBase).min().getAsLong();
            } else {
                // highest page in the channel. cChannel is the maximum array
                // length, so shorter arrays must be guarded here exactly as in
                // the branch above; they contribute the base page, which can
                // never raise the maximum.
                alHead[nChannel] = colPages.stream().mapToLong(alPage -> alPage.length > finChan ? Math.max(alPage[finChan], lPageBase) : lPageBase).max().getAsLong();
            }
        }
        processor = new EnsureSubscriptionProcessor(2, alHead, filter, extractor, subscriberId, fReconnect, fCreateGroupOnly, lSubscription);
        CompletableFuture<Map<Subscription.Key, EnsureSubscriptionProcessor.Result>> futureSub = InvocableMapHelper.invokeAllAsync(this.Subscriptions, setSubKeys, key -> this.getUnitOfOrder(key.getPartitionId()), processor, new BiConsumer[0]);
        try {
            // fixed 30 second wait, independent of the subscriber timeout
            futureSub.get(30L, TimeUnit.SECONDS);
        }
        catch (TimeoutException e) {
            throw Exceptions.ensureRuntimeException(e, "Timed out waiting for subscriptions");
        }
        return alHead;
    }

    /**
     * Obtain the caches used by this topic.
     *
     * @return an unmodifiable view of the cache set, or an empty set once the
     *         caches have been released or destroyed (the internal set is
     *         cleared to {@code null} at that point)
     */
    public Set<NamedCache> getCaches() {
        // read the volatile field once; it is nulled by the release/destroy path
        Set<NamedCache> setCaches = this.f_setCaches;
        if (setCaches == null) {
            return Collections.emptySet();
        }
        return Collections.unmodifiableSet(setCaches);
    }

    /**
     * Count the messages remaining for a subscriber group.
     * <p>
     * If the group has no subscription under partition 0, channel 0, the size
     * of the Data cache is returned. Otherwise the Data cache entries matching
     * an {@link UnreadTopicContentFilter}, built from the group's last
     * committed positions and the channel tails, are counted.
     *
     * @param id         the subscriber group
     * @param anChannel  the channels to restrict the count to; all channels
     *                   when empty
     *
     * @return the number of remaining messages
     */
    public int getRemainingMessages(SubscriberGroupId id, int ... anChannel) {
        if (!Subscriptions.containsKey(new Subscription.Key(0, 0, id))) {
            return Data.size();
        }

        Map<Integer, Position> mapHeads = getLastCommitted(id);
        Map<Integer, Position> mapTails = getTails();

        // channels without a commit get the placeholder position (-1, -1)
        for (int nChannel = 0; nChannel < getChannelCount(); ++nChannel) {
            mapHeads.putIfAbsent(nChannel, new PagedPosition(-1L, -1));
        }

        if (anChannel.length > 0) {
            List<Integer> listChannel = IntStream.of(anChannel).boxed().collect(Collectors.toList());
            mapHeads.keySet().retainAll(listChannel);
            mapTails.keySet().retainAll(listChannel);
        }

        Count counter = new Count();
        Binary binCount = (Binary)Data.aggregate(new UnreadTopicContentFilter(mapHeads, mapTails), counter);
        // the aggregation result comes back as a Binary and has to be converted
        Object oCount = f_topicService.getBackingMapManager().getContext().getValueFromInternalConverter().convert(binCount);
        return ((Number)oCount).intValue();
    }

    /**
     * Two instances are equal when they are of exactly the same class and
     * have equal topic names and equal topic services.
     *
     * @param o  the object to compare with
     *
     * @return {@code true} if {@code o} is equal to this instance
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        PagedTopicCaches other = (PagedTopicCaches)o;
        if (!f_sTopicName.equals(other.f_sTopicName)) {
            return false;
        }
        return f_topicService.equals(other.f_topicService);
    }

    /**
     * @return a hash derived from the topic name only, seeded with 31
     */
    @Override
    public int hashCode() {
        return HashHelper.hash(f_sTopicName, 31);
    }

    /**
     * @return a description containing the topic name, service name and state
     */
    @Override
    public String toString() {
        // the quote opened before the topic name is now closed after it
        return "TopicCaches(name='" + this.f_sTopicName + "', service=" + this.f_sCacheServiceName + ", state=" + String.valueOf((Object)this.m_state) + ")";
    }

    /**
     * Start the service, obtain the six caches backing the topic and record
     * them in the cache set.
     * <p>
     * All caches are obtained with the service's context class loader, except
     * the Data cache which uses {@code NullImplementation.getClassLoader()}.
     *
     * @param registerListeners  {@code true} to register the deactivation listeners
     */
    private void initializeCaches(boolean registerListeners) {
        this.f_topicService.start();
        ClassLoader loader = this.f_topicService.getContextClassLoader();
        this.Pages = this.f_functionCache.apply(Names.PAGES.cacheNameForTopicName(this.f_sTopicName), loader);
        this.Subscribers = this.f_functionCache.apply(Names.SUBSCRIBERS.cacheNameForTopicName(this.f_sTopicName), loader);
        this.Notifications = this.f_functionCache.apply(Names.NOTIFICATIONS.cacheNameForTopicName(this.f_sTopicName), loader);
        this.Usages = this.f_functionCache.apply(Names.USAGE.cacheNameForTopicName(this.f_sTopicName), loader);
        this.Subscriptions = this.f_functionCache.apply(Names.SUBSCRIPTIONS.cacheNameForTopicName(this.f_sTopicName), loader);
        this.Data = this.f_functionCache.apply(Names.CONTENT.cacheNameForTopicName(this.f_sTopicName), NullImplementation.getClassLoader());

        // Fill a local set first and assign it to the volatile field only when
        // complete. This also removes the assignment of the Set-typed field to
        // a HashSet-typed local, which does not compile.
        Set<NamedCache> setCaches = new HashSet<NamedCache>();
        setCaches.add(this.Pages);
        setCaches.add(this.Data);
        setCaches.add(this.Subscriptions);
        setCaches.add(this.Subscribers);
        setCaches.add(this.Notifications);
        setCaches.add(this.Usages);
        this.f_setCaches = setCaches;

        if (registerListeners) {
            this.ensureListeners();
        }
    }

    /**
     * Register the deactivation listener as a map listener on the Pages cache
     * and as a member listener on the service.
     */
    private void ensureListeners() {
        Pages.addMapListener(m_deactivationListener);
        f_topicService.addMemberListener(m_deactivationListener);
    }

    /**
     * Remove the deactivation listener from the Pages cache (only while that
     * cache is still active) and from the service.
     */
    private void removeListeners() {
        if (Pages.isActive()) {
            Pages.removeMapListener(m_deactivationListener);
        }
        f_topicService.removeMemberListener(m_deactivationListener);
    }

    /**
     * Release or destroy these caches if they are currently active.
     * <p>
     * The state is changed first, then every registered listener is notified
     * (exceptions from a listener are logged and do not stop the others), the
     * deactivation listener is removed, and finally every cache that is still
     * active is released or destroyed through the service.
     *
     * @param fDestroy  {@code true} to destroy the caches, {@code false} to release them
     */
    private void releaseOrDestroy(boolean fDestroy) {
        if (!this.isActive()) {
            return;
        }
        this.m_state = fDestroy ? State.Destroyed : State.Released;

        for (Listener listener : this.m_mapListener.keySet()) {
            try {
                if (fDestroy) {
                    listener.onDestroy();
                } else {
                    listener.onRelease();
                }
            }
            catch (Throwable t) {
                Logger.err(t);
            }
        }

        this.removeListeners();

        // Read the volatile field once so that the null check and the
        // iteration see the same set even if another thread clears the field.
        Set<NamedCache> setCaches = this.f_setCaches;
        if (setCaches != null) {
            Consumer<NamedCache> function = fDestroy ? this::destroyCache : this::releaseCache;
            setCaches.forEach(c -> {
                if (c.isActive()) {
                    function.accept(c);
                }
            });
            this.f_setCaches = null;
        }
    }

    /**
     * Release a cache through the service, unless it is inactive or has
     * already been released.
     *
     * @param cache  the cache to release
     */
    private void releaseCache(NamedCache<?, ?> cache) {
        if (!cache.isActive() || cache.isReleased()) {
            return;
        }
        f_topicService.releaseCache(cache);
    }

    /**
     * Destroy a cache through the service, unless it is inactive or has
     * already been destroyed.
     *
     * @param cache  the cache to destroy
     */
    private void destroyCache(NamedCache<?, ?> cache) {
        if (!cache.isActive() || cache.isDestroyed()) {
            return;
        }
        f_topicService.destroyCache(cache);
    }

    /**
     * If the state is {@link State#Active}, switch it to
     * {@link State#Disconnected} and notify all listeners via
     * {@link Listener#onDisconnect()}.
     * <p>
     * The state is checked once without the lock and again while holding
     * this object's monitor, so only one thread performs the transition.
     * Exceptions thrown by a listener are logged and do not stop the
     * remaining listeners from being called.
     */
    void disconnected() {
        if (m_state != State.Active) {
            return;
        }
        synchronized (this) {
            if (m_state != State.Active) {
                return;
            }
            m_state = State.Disconnected;
            for (Listener listener : m_mapListener.keySet()) {
                try {
                    listener.onDisconnect();
                }
                catch (Throwable t) {
                    Logger.err(t);
                }
            }
        }
    }

    /**
     * Listener registered on the Pages cache and on the topic service. It
     * releases or destroys the enclosing caches when the Pages cache is
     * released or destroyed, and marks them disconnected when the local member
     * leaves or no ownership-enabled member remains.
     */
    class DeactivationListener
    extends AbstractMapListener
    implements NamedCacheDeactivationListener,
    MemberListener {
        DeactivationListener() {
        }

        /**
         * If the source cache of the event reports itself as released or
         * destroyed, release or destroy the enclosing caches accordingly.
         *
         * @param evt  the event; its map is expected to be a {@link NamedCache}
         */
        @Override
        public void entryDeleted(MapEvent evt) {
            NamedCache cache = (NamedCache)evt.getMap();
            boolean fReleased = cache.isReleased();
            boolean fDestroyed = cache.isDestroyed();
            if (fReleased || fDestroyed) {
                PagedTopicCaches.this.releaseOrDestroy(fDestroyed);
            }
        }

        /**
         * Mark the enclosing caches as disconnected when the local member has
         * left, or when another member has left and the service reports no
         * remaining ownership-enabled members.
         *
         * @param evt  the member event; its service is expected to be a
         *             {@link DistributedCacheService}
         */
        @Override
        public void memberLeft(MemberEvent evt) {
            DistributedCacheService service = (DistributedCacheService)evt.getService();
            if (evt.isLocal()) {
                Logger.fine("Detected local member disconnect in service " + String.valueOf(PagedTopicCaches.this));
                PagedTopicCaches.this.disconnected();
            } else {
                // NOTE(review): the return value is unused; the call is kept in case it has side effects - confirm
                service.getOwnershipSenior();
                if (service.getOwnershipEnabledMembers().isEmpty()) {
                    Logger.fine("Detected loss of all storage members in service " + String.valueOf(PagedTopicCaches.this));
                    PagedTopicCaches.this.disconnected();
                }
            }
        }

        /** No action is taken when a member joins. */
        @Override
        public void memberJoined(MemberEvent evt) {
        }

        /** No action is taken when a member is leaving. */
        @Override
        public void memberLeaving(MemberEvent evt) {
        }

        /**
         * A listener is only equal to itself. References are compared
         * directly, because two distinct objects can share the same identity
         * hash code and comparing hash codes could report them as equal.
         *
         * @param oThat  the object to compare with
         *
         * @return {@code true} only if {@code oThat} is this same instance
         */
        @Override
        public boolean equals(Object oThat) {
            return this == oThat;
        }

        /**
         * @return the identity hash code, consistent with the identity-based {@code equals}
         */
        @Override
        public int hashCode() {
            return System.identityHashCode(this);
        }
    }

    /**
     * Lifecycle states of a {@link PagedTopicCaches} instance.
     */
    public static enum State {
        /** Set at the end of construction and again after a successful reconnect. */
        Active,
        /** Set when the caches are released. */
        Released,
        /** Set when the caches are destroyed. */
        Destroyed,
        /** Set when a disconnect is detected while Active; still counts as active. */
        Disconnected,
        /** Not assigned anywhere in this class. */
        Closed;

    }

    /**
     * Callback interface notified of lifecycle changes of a
     * {@link PagedTopicCaches} instance. Exceptions thrown by an
     * implementation are caught and logged by the caller.
     */
    public static interface Listener {
        /** Called after the state has changed from Active to Disconnected. */
        public void onDisconnect();

        /** Called after the state has changed from Disconnected back to Active. */
        public void onConnect();

        /** Called when the caches are being destroyed. */
        public void onDestroy();

        /** Called when the caches are being released. */
        public void onRelease();
    }

    /**
     * A pseudo-enum describing the caches that back a topic: each constant
     * carries a short name, the cache name prefix, the key and value classes
     * and the kind of storage.
     *
     * @param <K>  the key type of the cache
     * @param <V>  the value type of the cache
     */
    public static class Names<K, V> {
        /**
         * All constants. This must be declared before the constants below,
         * because each constructor call adds itself to this set.
         */
        private static final Set<Names> s_setValues = new HashSet<Names>();
        /** Prefix shared by the names of the meta-data caches. */
        public static final String METACACHE_PREFIX = "$meta$topic";
        /** Prefix shared by the names of the content caches. */
        public static final String CONTENT_PREFIX = "$topic";
        /** The cache holding the topic content. */
        public static final Names<ContentKey, Object> CONTENT = new Names<ContentKey, Object>("content", "$topic$", ContentKey.class, Object.class, Storage.Data);
        /** The cache holding the pages. */
        public static final Names<Page.Key, Page> PAGES = new Names<Page.Key, Page>("pages", "$meta$topic$pages$", Page.Key.class, Page.class, Storage.MetaData);
        /** The cache holding the subscriptions. */
        public static final Names<Subscription.Key, Subscription> SUBSCRIPTIONS = new Names<Subscription.Key, Subscription>("subscriptions", "$meta$topic$subscriptions$", Subscription.Key.class, Subscription.class, Storage.MetaData);
        /** The cache holding the subscriber information. */
        public static final Names<SubscriberInfo.Key, SubscriberInfo> SUBSCRIBERS = new Names<SubscriberInfo.Key, SubscriberInfo>("subscribers", "$meta$topic$subscribers$", SubscriberInfo.Key.class, SubscriberInfo.class, Storage.MetaData);
        /** The cache holding the notifications. */
        public static final Names<NotificationKey, int[]> NOTIFICATIONS = new Names<NotificationKey, int[]>("notifications", "$meta$topic$notifications$", NotificationKey.class, int[].class, Storage.MetaData);
        /** The cache holding the usage information. */
        public static final Names<Usage.Key, Usage> USAGE = new Names<Usage.Key, Usage>("usage", "$meta$topic$usage$", Usage.Key.class, Usage.class, Storage.Data);

        private final String f_sName;
        private final String f_sPrefix;
        private final Class<K> f_classKey;
        private final Class<V> f_classValue;
        private final TypeAssertion<K, V> f_typeAssertion;
        private final Storage f_storage;

        /**
         * Create a constant and register it in the set of all values.
         *
         * @param sName       the short name; the basis of equals, hashCode and toString
         * @param sPrefix     the cache name prefix
         * @param classKey    the key class
         * @param classValue  the value class
         * @param storage     the storage kind
         */
        private Names(String sName, String sPrefix, Class<K> classKey, Class<V> classValue, Storage storage) {
            f_sName = sName;
            f_sPrefix = sPrefix;
            f_classKey = classKey;
            f_classValue = classValue;
            f_typeAssertion = TypeAssertion.withTypes(classKey, classValue);
            f_storage = storage;
            s_setValues.add(this);
        }

        /**
         * @param sTopicName  the topic name
         *
         * @return the cache name for the topic: this constant's prefix followed by the topic name
         */
        public String cacheNameForTopicName(String sTopicName) {
            return f_sPrefix + sTopicName;
        }

        /**
         * Find the constant whose prefix the given cache name starts with.
         *
         * @param sCacheName  the cache name
         *
         * @return the matching constant
         *
         * @throws IllegalArgumentException if no constant's prefix matches
         */
        public static Names fromCacheName(String sCacheName) {
            for (Names candidate : Names.values()) {
                if (sCacheName.startsWith(candidate.f_sPrefix)) {
                    return candidate;
                }
            }
            throw new IllegalArgumentException("Cache name " + sCacheName + " is not a valid TopicCacheName");
        }

        /**
         * Strip a known prefix from a cache name.
         *
         * @param sCacheName  the cache name
         *
         * @return the cache name without the matching prefix, or the cache
         *         name unchanged if no constant's prefix matches
         */
        public static String getTopicName(String sCacheName) {
            for (Names candidate : Names.values()) {
                if (sCacheName.startsWith(candidate.f_sPrefix)) {
                    return sCacheName.substring(candidate.f_sPrefix.length());
                }
            }
            return sCacheName;
        }

        /**
         * @return an unmodifiable view of all constants
         */
        public static Set<Names> values() {
            return Collections.unmodifiableSet(s_setValues);
        }

        /**
         * @return the type assertion built from the key and value classes
         */
        public TypeAssertion<K, V> getTypeAssertion() {
            return f_typeAssertion;
        }

        /**
         * @return the cache name prefix
         */
        public String getPrefix() {
            return f_sPrefix;
        }

        /**
         * @return the key class
         */
        public Class<K> getKeyClass() {
            return f_classKey;
        }

        /**
         * @return the value class
         */
        public Class<V> getValueClass() {
            return f_classValue;
        }

        /**
         * @return the storage kind
         */
        public Storage getStorage() {
            return f_storage;
        }

        /**
         * @return {@code true} if the storage kind is {@link Storage#MetaData}
         */
        public boolean isInternal() {
            return getStorage() == Storage.MetaData;
        }

        /**
         * @param sCacheName  the cache name to test; may be {@code null}
         *
         * @return {@code true} if the name is non-null and starts with this constant's prefix
         */
        public boolean isA(String sCacheName) {
            if (sCacheName == null) {
                return false;
            }
            return sCacheName.startsWith(f_sPrefix);
        }

        /**
         * Two constants are equal when they are of the same class and have
         * the same short name.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            return f_sName.equals(((Names)o).f_sName);
        }

        /**
         * @return the hash code of the short name
         */
        @Override
        public int hashCode() {
            return f_sName.hashCode();
        }

        /**
         * @return the short name
         */
        @Override
        public String toString() {
            return f_sName;
        }

        /**
         * The kind of storage used by a cache.
         */
        public enum Storage {
            Data,
            MetaData;

        }
    }
}

