/*
 * Decompiled with CFR 0.152.
 */
package com.tangosol.internal.net.topic.impl.paged.agent;

import com.oracle.coherence.common.base.Logger;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches;
import com.tangosol.internal.net.topic.impl.paged.PagedTopicSubscriber;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberInfo;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.partition.PartitionSet;
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.UUID;
import com.tangosol.util.filter.AlwaysFilter;
import com.tangosol.util.filter.PartitionedFilter;
import com.tangosol.util.processor.AbstractEvolvableProcessor;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CleanupSubscribers
extends AbstractEvolvableProcessor<SubscriberInfo.Key, SubscriberInfo, Boolean> {
    public void execute(DistributedCacheService service) {
        this.execute(service, null);
    }

    public void execute(DistributedCacheService service, PartitionSet parts) {
        if (!service.isRunning()) {
            return;
        }
        ClassLoader loader = service.getContextClassLoader();
        Enumeration enumeration = service.getCacheNames();
        HashSet<String> setTopic = new HashSet<String>();
        while (enumeration.hasMoreElements()) {
            String sCacheName = (String)enumeration.nextElement();
            String sTopicName = PagedTopicCaches.Names.getTopicName(sCacheName);
            setTopic.add(sTopicName);
        }
        for (String sTopic : setTopic) {
            try {
                if (!service.isRunning()) break;
                PartitionSet partsCleanup = parts == null ? service.getOwnedPartitions(service.getCluster().getLocalMember()) : parts;
                PartitionedFilter filter = new PartitionedFilter(AlwaysFilter.INSTANCE(), partsCleanup);
                NamedCache cache = service.ensureCache(PagedTopicCaches.Names.SUBSCRIBERS.cacheNameForTopicName(sTopic), loader);
                if (!cache.isActive()) continue;
                cache.async().invokeAll(filter, this).handle((mapResult, err) -> {
                    if (err == null) {
                        if (!mapResult.isEmpty()) {
                            int nMember;
                            HashMap<Integer, Map> mapRemoved = new HashMap<Integer, Map>();
                            for (SubscriberInfo.Key key : mapResult.keySet()) {
                                nMember = PagedTopicSubscriber.memberIdFromId(key.getSubscriberId());
                                mapRemoved.computeIfAbsent(nMember, k -> new HashMap()).computeIfAbsent(key.getGroupId(), k -> new ArrayList()).add(key.getSubscriberId());
                            }
                            for (Map.Entry entry : mapRemoved.entrySet()) {
                                nMember = (Integer)entry.getKey();
                                String sMsg = ((Map)entry.getValue()).entrySet().stream().map(e -> "[Group='" + ((SubscriberGroupId)e.getKey()).getGroupName() + "' Subscribers=" + PagedTopicSubscriber.idToString((Collection)e.getValue()) + "]").collect(Collectors.joining(", "));
                                Logger.info("Removed the following subscribers from topic " + sTopic + " due to departure of member " + nMember + " " + sMsg);
                            }
                        }
                    } else if (cache.isActive()) {
                        Logger.err("Caught exception cleaning up subscribers in topic " + sTopic, err);
                    }
                    return null;
                });
            }
            catch (Throwable t) {
                Logger.err("Caught exception cleaning up subscribers in topic " + sTopic, t);
            }
        }
    }

    @Override
    public Map<SubscriberInfo.Key, Boolean> processAll(Set<? extends InvocableMap.Entry<SubscriberInfo.Key, SubscriberInfo>> setEntries) {
        HashMap<SubscriberInfo.Key, Boolean> mapResult = new HashMap<SubscriberInfo.Key, Boolean>();
        BinaryEntry binaryEntry = setEntries.stream().findFirst().map(InvocableMap.Entry::asBinaryEntry).orElse(null);
        if (binaryEntry != null) {
            DistributedCacheService service = (DistributedCacheService)binaryEntry.getContext().getCacheService();
            Map<Integer, Member> mapMember = service.getInfo().getServiceMembers().stream().collect(Collectors.toMap(Member::getId, m -> m));
            for (InvocableMap.Entry<SubscriberInfo.Key, SubscriberInfo> entry : setEntries) {
                if (!this.process(entry, mapMember).booleanValue()) continue;
                mapResult.put(entry.getKey(), true);
            }
        }
        return mapResult;
    }

    @Override
    public Boolean process(InvocableMap.Entry<SubscriberInfo.Key, SubscriberInfo> entry) {
        if (entry.isPresent()) {
            DistributedCacheService service = (DistributedCacheService)entry.asBinaryEntry().getContext().getCacheService();
            Map<Integer, Member> mapMember = service.getInfo().getServiceMembers().stream().collect(Collectors.toMap(Member::getId, m -> m));
            return this.process(entry, mapMember);
        }
        return false;
    }

    @Override
    public int getImplVersion() {
        return 0;
    }

    @Override
    public void readExternal(PofReader in) throws IOException {
    }

    @Override
    public void writeExternal(PofWriter out) throws IOException {
    }

    Boolean process(InvocableMap.Entry<SubscriberInfo.Key, SubscriberInfo> entry, Map<Integer, Member> mapMember) {
        if (entry.isPresent()) {
            long nId = entry.getKey().getSubscriberId();
            SubscriberInfo info = entry.getValue();
            UUID uuid = info.getOwningUid();
            int nMember = PagedTopicSubscriber.memberIdFromId(nId);
            Member member = mapMember.get(nMember);
            boolean fRemove = false;
            if (member == null) {
                fRemove = true;
            } else if (uuid != null && !member.getUuid().equals(uuid)) {
                fRemove = true;
            } else if (Instant.ofEpochMilli(member.getTimestamp()).atZone(ZoneId.systemDefault()).toLocalDateTime().isAfter(info.getLastHeartbeat())) {
                fRemove = true;
            }
            if (fRemove) {
                entry.remove(false);
                return true;
            }
        }
        return false;
    }
}

