/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zeppelin.cluster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.MembershipConfig;
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.discovery.ManagedNodeDiscoveryService;
import io.atomix.cluster.discovery.NodeDiscoveryProvider;
import io.atomix.cluster.impl.DefaultClusterMembershipService;
import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
import io.atomix.cluster.messaging.BroadcastService;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.primitive.PrimitiveState;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
import io.atomix.utils.net.Address;
import java.io.File;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.cluster.BroadcastServiceAdapter;
import org.apache.zeppelin.cluster.ClusterManager;
import org.apache.zeppelin.cluster.ClusterMonitor;
import org.apache.zeppelin.cluster.event.ClusterEventListener;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.cluster.meta.ClusterMetaType;
import org.apache.zeppelin.cluster.protocol.RaftServerMessagingProtocol;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterManagerServer
extends ClusterManager {
    private static Logger LOGGER = LoggerFactory.getLogger(ClusterManagerServer.class);
    private static ClusterManagerServer instance = null;
    protected RaftServer raftServer = null;
    protected MessagingService messagingService = null;
    private List<ClusterEventListener> clusterIntpEventListeners = new ArrayList<ClusterEventListener>();
    private List<ClusterEventListener> clusterNoteEventListeners = new ArrayList<ClusterEventListener>();
    private List<ClusterEventListener> clusterAuthEventListeners = new ArrayList<ClusterEventListener>();
    private List<ClusterEventListener> clusterIntpSettingEventListeners = new ArrayList<ClusterEventListener>();
    public static String CLUSTER_INTP_EVENT_TOPIC = "CLUSTER_INTP_EVENT_TOPIC";
    public static String CLUSTER_NOTE_EVENT_TOPIC = "CLUSTER_NOTE_EVENT_TOPIC";
    public static String CLUSTER_AUTH_EVENT_TOPIC = "CLUSTER_AUTH_EVENT_TOPIC";
    public static String CLUSTER_INTP_SETTING_EVENT_TOPIC = "CLUSTER_INTP_SETTING_EVENT_TOPIC";
    private BiFunction<Address, byte[], byte[]> subscribeClusterIntpEvent = (address, data) -> {
        String message = new String((byte[])data);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("subscribeClusterIntpEvent() {}", (Object)message);
        }
        for (ClusterEventListener eventListener : this.clusterIntpEventListeners) {
            eventListener.onClusterEvent(message);
        }
        return null;
    };
    private BiFunction<Address, byte[], byte[]> subscribeClusterNoteEvent = (address, data) -> {
        String message = new String((byte[])data);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("subscribeClusterNoteEvent() {}", (Object)message);
        }
        for (ClusterEventListener eventListener : this.clusterNoteEventListeners) {
            eventListener.onClusterEvent(message);
        }
        return null;
    };
    private BiFunction<Address, byte[], byte[]> subscribeClusterAuthEvent = (address, data) -> {
        String message = new String((byte[])data);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("subscribeClusterAuthEvent() {}", (Object)message);
        }
        for (ClusterEventListener eventListener : this.clusterAuthEventListeners) {
            eventListener.onClusterEvent(message);
        }
        return null;
    };
    private BiFunction<Address, byte[], byte[]> subscribeIntpSettingEvent = (address, data) -> {
        String message = new String((byte[])data);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("subscribeIntpSettingEvent() {}", (Object)message);
        }
        for (ClusterEventListener eventListener : this.clusterIntpSettingEventListeners) {
            eventListener.onClusterEvent(message);
        }
        return null;
    };

    private ClusterManagerServer(ZeppelinConfiguration zConf) {
        super(zConf);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static ClusterManagerServer getInstance(ZeppelinConfiguration zConf) {
        Class<ClusterManagerServer> clazz = ClusterManagerServer.class;
        synchronized (ClusterManagerServer.class) {
            if (instance == null) {
                instance = new ClusterManagerServer(zConf);
            }
            // ** MonitorExit[var1_1] (shouldn't be in output)
            return instance;
        }
    }

    @Override
    public void start() {
        if (!this.zConf.isClusterMode()) {
            return;
        }
        this.initThread();
        String clusterName = this.getClusterNodeName();
        this.clusterMonitor = new ClusterMonitor(this);
        this.clusterMonitor.start(ClusterMetaType.SERVER_META, clusterName);
        super.start();
    }

    @VisibleForTesting
    public void initTestCluster(String clusterAddrList, String host, int port) {
        this.isTest = true;
        this.zeplServerHost = host;
        this.raftServerPort = port;
        this.clusterNodes.clear();
        this.raftAddressMap.clear();
        this.clusterMemberIds.clear();
        String[] cluster = clusterAddrList.split(",");
        for (int i = 0; i < cluster.length; ++i) {
            String[] parts = cluster[i].split(":");
            String clusterHost = parts[0];
            int clusterPort = Integer.valueOf(parts[1]);
            String memberId = clusterHost + ":" + clusterPort;
            Address address = Address.from((String)clusterHost, (int)clusterPort);
            Node node = Node.builder().withId(memberId).withAddress(address).build();
            this.clusterNodes.add(node);
            this.raftAddressMap.put(MemberId.from((String)memberId), address);
            this.clusterMemberIds.add(MemberId.from((String)memberId));
        }
    }

    @Override
    public boolean raftInitialized() {
        return null != this.raftServer && this.raftServer.isRunning() && null != this.raftClient && null != this.raftSessionClient && this.raftSessionClient.getState() == PrimitiveState.CONNECTED;
    }

    @Override
    public boolean isClusterLeader() {
        return null != this.raftServer && this.raftServer.isRunning() && this.raftServer.isLeader();
    }

    private void initThread() {
        new Thread(new Runnable(){

            @Override
            public void run() {
                LOGGER.info("RaftServer run() >>>");
                Address address = Address.from((String)ClusterManagerServer.this.zeplServerHost, (int)ClusterManagerServer.this.raftServerPort);
                Member member = Member.builder((MemberId)MemberId.from((String)(ClusterManagerServer.this.zeplServerHost + ":" + ClusterManagerServer.this.raftServerPort))).withAddress(address).build();
                ClusterManagerServer.this.messagingService = (MessagingService)NettyMessagingService.builder().withAddress(address).build().start().join();
                RaftServerMessagingProtocol protocol = new RaftServerMessagingProtocol(ClusterManagerServer.this.messagingService, ClusterManager.protocolSerializer, ClusterManagerServer.this.raftAddressMap::get);
                BootstrapService bootstrapService = new BootstrapService(){

                    public MessagingService getMessagingService() {
                        return ClusterManagerServer.this.messagingService;
                    }

                    public BroadcastService getBroadcastService() {
                        return new BroadcastServiceAdapter();
                    }
                };
                DefaultClusterMembershipService clusterService = new DefaultClusterMembershipService(member, (ManagedNodeDiscoveryService)new DefaultNodeDiscoveryService(bootstrapService, (Node)member, (NodeDiscoveryProvider)new BootstrapDiscoveryProvider(ClusterManagerServer.this.clusterNodes)), bootstrapService, new MembershipConfig());
                File atomixDateDir = Files.createTempDir();
                atomixDateDir.deleteOnExit();
                RaftServer.Builder builder = RaftServer.builder((MemberId)member.id()).withMembershipService((ClusterMembershipService)clusterService).withProtocol((RaftServerProtocol)protocol).withStorage(RaftStorage.builder().withStorageLevel(StorageLevel.MEMORY).withDirectory(atomixDateDir).withSerializer(ClusterManager.storageSerializer).withMaxSegmentSize(0x100000).build());
                ClusterManagerServer.this.raftServer = (RaftServer)builder.build();
                ClusterManagerServer.this.raftServer.bootstrap((Collection)ClusterManagerServer.this.clusterMemberIds);
                ClusterManagerServer.this.messagingService.registerHandler(CLUSTER_INTP_EVENT_TOPIC, ClusterManagerServer.this.subscribeClusterIntpEvent, MoreExecutors.directExecutor());
                ClusterManagerServer.this.messagingService.registerHandler(CLUSTER_NOTE_EVENT_TOPIC, ClusterManagerServer.this.subscribeClusterNoteEvent, MoreExecutors.directExecutor());
                ClusterManagerServer.this.messagingService.registerHandler(CLUSTER_AUTH_EVENT_TOPIC, ClusterManagerServer.this.subscribeClusterAuthEvent, MoreExecutors.directExecutor());
                ClusterManagerServer.this.messagingService.registerHandler(CLUSTER_INTP_SETTING_EVENT_TOPIC, ClusterManagerServer.this.subscribeIntpSettingEvent, MoreExecutors.directExecutor());
                HashMap<String, Object> meta = new HashMap<String, Object>();
                String nodeName = ClusterManagerServer.this.getClusterNodeName();
                meta.put(ClusterMeta.NODE_NAME, nodeName);
                meta.put(ClusterMeta.SERVER_HOST, ClusterManagerServer.this.zeplServerHost);
                meta.put(ClusterMeta.SERVER_PORT, ClusterManagerServer.this.raftServerPort);
                meta.put(ClusterMeta.SERVER_START_TIME, LocalDateTime.now());
                ClusterManagerServer.this.putClusterMeta(ClusterMetaType.SERVER_META, nodeName, meta);
                LOGGER.info("RaftServer run() <<<");
            }
        }).start();
    }

    @Override
    public void shutdown() {
        if (!this.zConf.isClusterMode()) {
            return;
        }
        try {
            this.deleteClusterMeta(ClusterMetaType.SERVER_META, this.getClusterNodeName());
            Thread.sleep(300L);
            if (this.clusterMonitor != null) {
                this.clusterMonitor.shutdown();
            }
            Thread.sleep(300L);
        }
        catch (InterruptedException e) {
            LOGGER.error(e.getMessage(), (Throwable)e);
        }
        if (null != this.raftServer && this.raftServer.isRunning()) {
            try {
                this.raftServer.shutdown().get(3L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), (Throwable)e);
            }
            catch (ExecutionException e) {
                LOGGER.error(e.getMessage(), (Throwable)e);
            }
            catch (TimeoutException e) {
                LOGGER.error(e.getMessage(), (Throwable)e);
            }
        }
        super.shutdown();
        instance = null;
    }

    public HashMap<String, Object> getIdleNodeMeta() {
        HashMap<String, Object> idleNodeMeta = null;
        HashMap<String, HashMap<String, Object>> clusterMeta = this.getClusterMeta(ClusterMetaType.SERVER_META, "");
        long memoryIdle = 0L;
        for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
            long memoryUsed;
            long memoryCapacity;
            long idle;
            HashMap<String, Object> meta = entry.getValue();
            String status = (String)meta.get(ClusterMeta.STATUS);
            if (null == status || StringUtils.isEmpty((CharSequence)status) || status.equals(ClusterMeta.OFFLINE_STATUS) || (idle = (memoryCapacity = ((Long)meta.get(ClusterMeta.MEMORY_CAPACITY)).longValue()) - (memoryUsed = ((Long)meta.get(ClusterMeta.MEMORY_USED)).longValue())) <= memoryIdle) continue;
            memoryIdle = idle;
            idleNodeMeta = meta;
        }
        return idleNodeMeta;
    }

    public void unicastClusterEvent(String host, int port, String topic, String msg) {
        LOGGER.info("send unicastClusterEvent host:{} port:{} topic:{} message:{}", new Object[]{host, port, topic, msg});
        Address address = Address.from((String)host, (int)port);
        CompletableFuture response = this.messagingService.sendAndReceive(address, topic, msg.getBytes(), Duration.ofSeconds(2L));
        response.whenComplete((r, e) -> {
            if (null == e) {
                LOGGER.error(e.getMessage(), e);
            }
        });
    }

    public void broadcastClusterEvent(String topic, String msg) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("send broadcastClusterEvent message {}", (Object)msg);
        }
        for (Node node : this.clusterNodes) {
            if (StringUtils.equals((CharSequence)node.address().host(), (CharSequence)this.zeplServerHost) && node.address().port() == this.raftServerPort) continue;
            CompletableFuture response = this.messagingService.sendAndReceive(node.address(), topic, msg.getBytes(), Duration.ofSeconds(2L));
            response.whenComplete((r, e) -> {
                if (null == e) {
                    LOGGER.error(e.getMessage(), e);
                } else {
                    LOGGER.info("broadcastClusterNoteEvent success! {}", (Object)msg);
                }
            });
        }
    }

    public void addClusterEventListeners(String topic, ClusterEventListener listener) {
        if (StringUtils.equals((CharSequence)topic, (CharSequence)CLUSTER_INTP_EVENT_TOPIC)) {
            this.clusterIntpEventListeners.add(listener);
        } else if (StringUtils.equals((CharSequence)topic, (CharSequence)CLUSTER_NOTE_EVENT_TOPIC)) {
            this.clusterNoteEventListeners.add(listener);
        } else if (StringUtils.equals((CharSequence)topic, (CharSequence)CLUSTER_AUTH_EVENT_TOPIC)) {
            this.clusterAuthEventListeners.add(listener);
        } else if (StringUtils.equals((CharSequence)topic, (CharSequence)CLUSTER_INTP_SETTING_EVENT_TOPIC)) {
            this.clusterIntpSettingEventListeners.add(listener);
        } else {
            LOGGER.error("Unknow cluster event topic : {}", (Object)topic);
        }
    }
}

