/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

final class RemoteClusterConnection
extends AbstractComponent
implements TransportConnectionListener,
Closeable {
    /** Transport service used to send requests, open connections and to register this instance as a connection listener. */
    private final TransportService transportService;
    /** Connection profile built in the constructor; passed to connectToNode and the source of the handshake timeout. */
    private final ConnectionProfile remoteProfile;
    /** The remote nodes this connection currently tracks as connected. */
    private final ConnectedNodes connectedNodes;
    /** Alias of the remote cluster; used in log/error messages and in the reported connection info. */
    private final String clusterAlias;
    /** Limit that is checked against the number of tracked nodes before connecting to a further remote node. */
    private final int maxNumRemoteConnections;
    /** Filter a remote node has to pass before it is connected to. */
    private final Predicate<DiscoveryNode> nodePredicate;
    /** Seed nodes to sniff the remote cluster from; replaced as a whole by updateSeedNodes and read by the connect task. */
    private volatile List<DiscoveryNode> seedNodes;
    /** Queues connect listeners and runs (re)connect rounds. */
    private final ConnectHandler connectHandler;
    /** Remote cluster name; set from a cluster state response while still unset and then used to validate handshakes. */
    private SetOnce<ClusterName> remoteClusterName = new SetOnce();

    /**
     * Creates the connection object for one remote cluster and registers it as a connection
     * listener on the given transport service. The constructor itself does not trigger a connect.
     *
     * @param settings                node settings; the TCP connect timeout is read from them and used
     *                                for both the connect and the handshake timeout of the remote profile
     * @param clusterAlias            alias under which the remote cluster is known
     * @param seedNodes               nodes used to sniff the remote cluster; the list is copied, so later
     *                                modifications of the passed list are not observed by this instance
     * @param transportService        transport service used for all communication
     * @param maxNumRemoteConnections limit checked before connecting to a further remote node
     * @param nodePredicate           filter a remote node has to pass before it is connected to
     */
    RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
        super(settings);
        this.transportService = transportService;
        this.maxNumRemoteConnections = maxNumRemoteConnections;
        this.nodePredicate = nodePredicate;
        this.clusterAlias = clusterAlias;
        ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
        builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
        builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
        // 6 connections for REG and PING requests, 0 for BULK, STATE and RECOVERY.
        builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING);
        builder.addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY);
        this.remoteProfile = builder.build();
        this.connectedNodes = new ConnectedNodes(clusterAlias);
        // Snapshot the seed nodes the same way updateSeedNodes does: an unmodifiable *view* of the
        // caller's list would still change (or break a running iteration) if the caller mutated it.
        this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
        this.connectHandler = new ConnectHandler();
        transportService.addConnectionListener(this);
    }

    /**
     * Replaces the seed nodes with an unmodifiable snapshot of the given list and then hands the
     * listener to the connect handler.
     *
     * @param seedNodes       the new seed nodes
     * @param connectListener listener passed on to {@code ConnectHandler#connect}
     */
    synchronized void updateSeedNodes(List<DiscoveryNode> seedNodes, ActionListener<Void> connectListener) {
        final List<DiscoveryNode> snapshot = new ArrayList<>(seedNodes);
        this.seedNodes = Collections.unmodifiableList(snapshot);
        connectHandler.connect(connectListener);
    }

    /**
     * Stops tracking the disconnected node. If the node was tracked and the number of tracked
     * nodes is now below the configured maximum, a connect round is forced.
     */
    @Override
    public void onNodeDisconnected(DiscoveryNode node) {
        if (!connectedNodes.remove(node)) {
            // not one of our nodes - nothing to refill
            return;
        }
        if (connectedNodes.size() < maxNumRemoteConnections) {
            connectHandler.forceConnect();
        }
    }

    /**
     * Fetches the search shards for the given request from a connected remote node. When no node
     * is tracked as connected, a connect is attempted first and the request is sent once that
     * succeeded; a connect failure is reported to the listener.
     */
    public void fetchSearchShards(ClusterSearchShardsRequest searchRequest, ActionListener<ClusterSearchShardsResponse> listener) {
        if (connectedNodes.size() != 0) {
            fetchShardsInternal(searchRequest, listener);
            return;
        }
        ActionListener<Void> afterConnect = ActionListener.wrap(x -> fetchShardsInternal(searchRequest, listener), listener::onFailure);
        ensureConnected(afterConnect);
    }

    /**
     * Completes the listener with {@code null} right away if at least one node is tracked as
     * connected; otherwise passes the listener to the connect handler.
     */
    public void ensureConnected(ActionListener<Void> voidActionListener) {
        if (connectedNodes.size() != 0) {
            voidActionListener.onResponse(null);
            return;
        }
        connectHandler.connect(voidActionListener);
    }

    /**
     * Sends the search shards request to the next node handed out by {@code connectedNodes} and
     * forwards the response or the failure to the listener. The handler names "search" as its executor.
     */
    private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, final ActionListener<ClusterSearchShardsResponse> listener) {
        final DiscoveryNode target = connectedNodes.get();
        final TransportResponseHandler<ClusterSearchShardsResponse> forwardingHandler = new TransportResponseHandler<ClusterSearchShardsResponse>() {

            @Override
            public ClusterSearchShardsResponse newInstance() {
                return new ClusterSearchShardsResponse();
            }

            @Override
            public void handleResponse(ClusterSearchShardsResponse shardsResponse) {
                listener.onResponse(shardsResponse);
            }

            @Override
            public void handleException(TransportException failure) {
                listener.onFailure(failure);
            }

            @Override
            public String executor() {
                return "search";
            }
        };
        transportService.sendRequest(target, "indices:admin/shards/search_shards", searchShardsRequest, forwardingHandler);
    }

    /**
     * Requests the nodes part of the cluster state from a connected remote node and hands the
     * listener a lookup function backed by the returned {@code DiscoveryNodes}
     * (string key, presumably the node id, to {@code DiscoveryNode}).
     * If no node is tracked as connected, a connect is attempted first. Any exception thrown
     * while starting the request is reported through {@code listener.onFailure}.
     */
    void collectNodes(final ActionListener<Function<String, DiscoveryNode>> listener) {
        final Runnable requestNodes = () -> {
            final ClusterStateRequest stateRequest = new ClusterStateRequest();
            stateRequest.clear();
            stateRequest.nodes(true);
            stateRequest.local(true);
            final DiscoveryNode target = this.connectedNodes.get();
            final TransportResponseHandler<ClusterStateResponse> handler = new TransportResponseHandler<ClusterStateResponse>() {

                @Override
                public ClusterStateResponse newInstance() {
                    return new ClusterStateResponse();
                }

                @Override
                public void handleResponse(ClusterStateResponse stateResponse) {
                    final DiscoveryNodes remoteNodes = stateResponse.getState().nodes();
                    listener.onResponse(remoteNodes::get);
                }

                @Override
                public void handleException(TransportException failure) {
                    listener.onFailure(failure);
                }

                @Override
                public String executor() {
                    return "same";
                }
            };
            this.transportService.sendRequest(target, "cluster:monitor/state", stateRequest, TransportRequestOptions.EMPTY, handler);
        };
        try {
            if (this.connectedNodes.size() != 0) {
                requestNodes.run();
            } else {
                this.ensureConnected(ActionListener.wrap(x -> requestNodes.run(), listener::onFailure));
            }
        } catch (Exception ex) {
            listener.onFailure(ex);
        }
    }

    /**
     * Returns a proxy connection for the given remote node. Requests sent through it travel over
     * the connection to one of the tracked nodes, using the proxy action name and a request
     * wrapped for {@code remoteClusterNode}. The returned connection reports
     * {@code remoteClusterNode} as its node and the underlying connection's version.
     */
    Transport.Connection getConnection(final DiscoveryNode remoteClusterNode) {
        final Transport.Connection underlying = this.transportService.getConnection(this.connectedNodes.get());
        return new Transport.Connection() {

            @Override
            public DiscoveryNode getNode() {
                return remoteClusterNode;
            }

            @Override
            public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
                final String proxyAction = TransportActionProxy.getProxyAction(action);
                final TransportRequest wrapped = TransportActionProxy.wrapRequest(remoteClusterNode, request);
                underlying.sendRequest(requestId, proxyAction, wrapped, options);
            }

            @Override
            public void close() throws IOException {
                // the proxy does not own the underlying connection
                assert false : "proxy connections must not be closed";
            }

            @Override
            public Version getVersion() {
                return underlying.getVersion();
            }
        };
    }

    /**
     * Returns the transport service's connection to the next node handed out by {@code connectedNodes}.
     */
    Transport.Connection getConnection() {
        return transportService.getConnection(connectedNodes.get());
    }

    /** Closes the connect handler. */
    @Override
    public void close() throws IOException {
        connectHandler.close();
    }

    /** Returns whether the connect handler has been closed. */
    public boolean isClosed() {
        return connectHandler.isClosed();
    }

    /**
     * Asserts that the connect handler's single permit is available, i.e. no connect round holds it.
     *
     * @return always {@code true}, so the call can itself be used inside an {@code assert}
     */
    boolean assertNoRunningConnections() {
        final int freePermits = connectHandler.running.availablePermits();
        assert freePermits == 1;
        return true;
    }

    /** Returns whether the given node is currently tracked as connected. */
    boolean isNodeConnected(DiscoveryNode node) {
        return connectedNodes.contains(node);
    }

    /**
     * Returns the next tracked node.
     *
     * @throws IllegalStateException if no node is tracked
     */
    DiscoveryNode getConnectedNode() {
        return connectedNodes.get();
    }

    /** Adds the node to the tracked nodes without connecting to it and without checking the maximum. */
    void addConnectedNode(DiscoveryNode node) {
        connectedNodes.add(node);
    }

    /**
     * Reports information about this remote connection to the listener.
     * <p>
     * Without any tracked node the info is returned immediately with empty address lists and zero
     * connected nodes. Otherwise a nodes info request (http section only) is sent to a tracked
     * node; the publish addresses of tracked nodes with http info are collected and, if these are
     * fewer than the maximum number of connections, topped up with addresses of other nodes
     * accepted by the node predicate.
     */
    public void getConnectionInfo(final ActionListener<RemoteConnectionInfo> listener) {
        final Optional<DiscoveryNode> target = this.connectedNodes.getAny();
        if (!target.isPresent()) {
            // not connected - answer right away
            listener.onResponse(new RemoteConnectionInfo(this.clusterAlias, Collections.emptyList(), Collections.emptyList(), this.maxNumRemoteConnections, 0, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(this.settings)));
            return;
        }
        final NodesInfoRequest infoRequest = new NodesInfoRequest();
        infoRequest.clear();
        infoRequest.http(true);
        final TransportResponseHandler<NodesInfoResponse> handler = new TransportResponseHandler<NodesInfoResponse>() {

            @Override
            public NodesInfoResponse newInstance() {
                return new NodesInfoResponse();
            }

            @Override
            public void handleResponse(NodesInfoResponse response) {
                final Set<TransportAddress> addresses = new HashSet<>();
                for (NodeInfo nodeInfo : response.getNodes()) {
                    if (RemoteClusterConnection.this.connectedNodes.contains(nodeInfo.getNode()) && nodeInfo.getHttp() != null) {
                        addresses.add(nodeInfo.getHttp().getAddress().publishAddress());
                    }
                }
                if (addresses.size() < RemoteClusterConnection.this.maxNumRemoteConnections) {
                    // too few tracked nodes expose http - fill up with other eligible nodes
                    for (NodeInfo nodeInfo : response.getNodes()) {
                        if (RemoteClusterConnection.this.nodePredicate.test(nodeInfo.getNode()) && nodeInfo.getHttp() != null) {
                            addresses.add(nodeInfo.getHttp().getAddress().publishAddress());
                        }
                        if (addresses.size() == RemoteClusterConnection.this.maxNumRemoteConnections) {
                            break;
                        }
                    }
                }
                final List<TransportAddress> seedAddresses = RemoteClusterConnection.this.seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList());
                listener.onResponse(new RemoteConnectionInfo(RemoteClusterConnection.this.clusterAlias, seedAddresses, new ArrayList<>(addresses), RemoteClusterConnection.this.maxNumRemoteConnections, RemoteClusterConnection.this.connectedNodes.size(), RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(RemoteClusterConnection.this.settings)));
            }

            @Override
            public void handleException(TransportException failure) {
                listener.onFailure(failure);
            }

            @Override
            public String executor() {
                return "same";
            }
        };
        this.transportService.sendRequest(target.get(), "cluster:monitor/nodes/info", infoRequest, handler);
    }

    /** Returns the number of nodes currently tracked as connected. */
    int getNumNodesConnected() {
        return connectedNodes.size();
    }

    /**
     * Synchronized set of nodes that hands its members out one after the other.
     * An iterator over the set is kept between calls; it is discarded whenever the set actually
     * changes and restarted once it is exhausted while the set is non-empty.
     */
    private static class ConnectedNodes implements Supplier<DiscoveryNode> {
        private final Set<DiscoveryNode> nodeSet = new HashSet<>();
        private final String clusterAlias;
        /** Current position in {@link #nodeSet}; {@code null} means "start over on next use". */
        private Iterator<DiscoveryNode> currentIterator = null;

        private ConnectedNodes(String clusterAlias) {
            this.clusterAlias = clusterAlias;
        }

        /**
         * Returns the next node.
         *
         * @throws IllegalStateException if the set is empty
         */
        public synchronized DiscoveryNode get() {
            ensureIteratorAvailable();
            if (!currentIterator.hasNext()) {
                throw new IllegalStateException("No node available for cluster: " + clusterAlias);
            }
            return currentIterator.next();
        }

        /** Removes the node; resets the iterator if the set changed. Returns whether the set changed. */
        synchronized boolean remove(DiscoveryNode node) {
            if (!nodeSet.remove(node)) {
                return false;
            }
            currentIterator = null;
            return true;
        }

        /** Adds the node; resets the iterator if the set changed. Returns whether the set changed. */
        synchronized boolean add(DiscoveryNode node) {
            if (!nodeSet.add(node)) {
                return false;
            }
            currentIterator = null;
            return true;
        }

        synchronized int size() {
            return nodeSet.size();
        }

        synchronized boolean contains(DiscoveryNode node) {
            return nodeSet.contains(node);
        }

        /** Like {@link #get()} but returns an empty optional instead of throwing when the set is empty. */
        synchronized Optional<DiscoveryNode> getAny() {
            ensureIteratorAvailable();
            if (!currentIterator.hasNext()) {
                return Optional.empty();
            }
            return Optional.of(currentIterator.next());
        }

        /** Creates the iterator if there is none, or restarts it when it is exhausted and the set is non-empty. */
        private synchronized void ensureIteratorAvailable() {
            final boolean missing = currentIterator == null;
            if (missing || (!currentIterator.hasNext() && !nodeSet.isEmpty())) {
                currentIterator = nodeSet.iterator();
            }
        }
    }

    private class ConnectHandler
    implements Closeable {
        /** Single permit held while a connect round is in flight; acquired in connect(), released when the round's listener completes. */
        private final Semaphore running = new Semaphore(1);
        /** Set once by close(); checked in connect() to fail drained listeners instead of starting a round. */
        private final AtomicBoolean closed = new AtomicBoolean(false);
        /** Listeners waiting for the next connect round (capacity 100); also used as the monitor guarding queue and permit handling. */
        private final BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<ActionListener<Void>>(100);
        /** Wraps the blocking connect work so that close() can cancel it. */
        private final CancellableThreads cancellableThreads = new CancellableThreads();

        private ConnectHandler() {
        }

        /** Starts a connect round only if listeners are queued (no listener added, not forced). */
        void maybeConnect() {
            connect(null, false);
        }

        /**
         * Queues the listener (if non-null) and starts a connect round unless one is already running.
         * A {@code null} listener with an empty queue is a no-op.
         */
        void connect(ActionListener<Void> connectListener) {
            final boolean forceRun = false;
            connect(connectListener, forceRun);
        }

        /** Starts a connect round even when no listener is queued, unless one is already running. */
        void forceConnect() {
            final boolean forceRun = true;
            connect(null, forceRun);
        }

        /**
         * Queues the given listener and, if no round is currently running, starts a connect round
         * that will notify every listener queued so far.
         * <ul>
         * <li>A listener that does not fit into the queue is failed with a
         *     {@link RejectedExecutionException}.</li>
         * <li>Without {@code forceRun} nothing happens while the queue is empty.</li>
         * <li>If the handler is closed, the drained listeners are failed with an
         *     {@code AlreadyClosedException} and no round is started.</li>
         * </ul>
         *
         * @param connectListener listener to notify once a round finished, may be {@code null}
         * @param forceRun        whether to run a round even if no listener is waiting
         */
        private void connect(ActionListener<Void> connectListener, boolean forceRun) {
            final List<ActionListener<Void>> drained;
            synchronized (this.queue) {
                if (connectListener != null && !this.queue.offer(connectListener)) {
                    connectListener.onFailure(new RejectedExecutionException("connect queue is full"));
                    return;
                }
                if (!forceRun && this.queue.isEmpty()) {
                    return;
                }
                if (!this.running.tryAcquire()) {
                    // a round is in flight; queued listeners are picked up when it completes
                    return;
                }
                drained = new ArrayList<>();
                this.queue.drainTo(drained);
                if (this.closed.get()) {
                    this.running.release();
                    ActionListener.onFailure(drained, new AlreadyClosedException("connect handler is already closed"));
                    return;
                }
            }
            // fork outside of the monitor
            this.forkConnect(drained);
        }

        /**
         * Submits one connect round to the "management" executor. The round sniffs the remote
         * cluster starting from the current seed nodes. However the round ends, the permit is
         * released, the given listeners are notified and {@link #maybeConnect()} is called to
         * serve listeners queued in the meantime.
         *
         * @param toNotify listeners to notify with the outcome of this round
         */
        private void forkConnect(final Collection<ActionListener<Void>> toNotify) {
            final ExecutorService management = RemoteClusterConnection.this.transportService.getThreadPool().executor("management");
            management.submit(new AbstractRunnable() {

                @Override
                public void onFailure(Exception e) {
                    ConnectHandler.this.releaseAndNotifyFailure(toNotify, e);
                }

                @Override
                protected void doRun() throws Exception {
                    final ActionListener<Void> roundListener = ActionListener.wrap(
                        x -> ConnectHandler.this.releaseAndNotifyResponse(toNotify, x),
                        e -> ConnectHandler.this.releaseAndNotifyFailure(toNotify, e));
                    ConnectHandler.this.collectRemoteNodes(RemoteClusterConnection.this.seedNodes.iterator(), RemoteClusterConnection.this.transportService, roundListener);
                }
            });
        }

        /** Releases the permit, reports success to the listeners and then looks for queued work. */
        private void releaseAndNotifyResponse(Collection<ActionListener<Void>> toNotify, Void response) {
            synchronized (this.queue) {
                this.running.release();
            }
            try {
                ActionListener.onResponse(toNotify, response);
            } finally {
                this.maybeConnect();
            }
        }

        /** Releases the permit, reports the failure to the listeners and then looks for queued work. */
        private void releaseAndNotifyFailure(Collection<ActionListener<Void>> toNotify, Exception failure) {
            synchronized (this.queue) {
                this.running.release();
            }
            try {
                ActionListener.onFailure(toNotify, failure);
            } finally {
                this.maybeConnect();
            }
        }

        /**
         * Tries the next seed node: opens a single-channel connection, performs the handshake
         * (validating the cluster name once it is known), connects to the handshake node if it is
         * eligible and there is room, and then requests the remote cluster state. The response is
         * processed by a {@code SniffClusterStateResponseHandler}, which also owns closing the
         * seed connection from then on.
         * <p>
         * The listener is notified exactly once per outcome of this method: on interruption, on
         * cancellation, when no seed node is left, or when the last seed node failed. If a seed
         * node fails and another one is available, only the retry reports to the listener.
         *
         * @param seedNodes        remaining seed nodes to try; advanced by this method
         * @param transportService transport service used to connect and send the request
         * @param listener         notified when sniffing finished or finally failed
         */
        void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes, TransportService transportService, ActionListener<Void> listener) {
            if (Thread.currentThread().isInterrupted()) {
                listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
                // the listener has been completed - do not continue and notify it a second time
                return;
            }
            try {
                if (seedNodes.hasNext()) {
                    this.cancellableThreads.executeIO(() -> {
                        DiscoveryNode seedNode = seedNodes.next();
                        Transport.Connection connection = transportService.openConnection(seedNode, ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
                        boolean success = false;
                        try {
                            DiscoveryNode handshakeNode;
                            try {
                                handshakeNode = transportService.handshake(connection, RemoteClusterConnection.this.remoteProfile.getHandshakeTimeout().millis(), c -> RemoteClusterConnection.this.remoteClusterName.get() == null ? true : c.equals(RemoteClusterConnection.this.remoteClusterName.get()));
                            } catch (IllegalStateException ex) {
                                RemoteClusterConnection.this.logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected cluster name {}", (Object)connection.getNode(), RemoteClusterConnection.this.remoteClusterName.get()), (Throwable)ex);
                                throw ex;
                            }
                            if (RemoteClusterConnection.this.nodePredicate.test(handshakeNode) && RemoteClusterConnection.this.connectedNodes.size() < RemoteClusterConnection.this.maxNumRemoteConnections) {
                                transportService.connectToNode(handshakeNode, RemoteClusterConnection.this.remoteProfile);
                                RemoteClusterConnection.this.connectedNodes.add(handshakeNode);
                            }
                            ClusterStateRequest request = new ClusterStateRequest();
                            request.clear();
                            request.nodes(true);
                            ThreadPool threadPool = transportService.getThreadPool();
                            ThreadContext threadContext = threadPool.getThreadContext();
                            // the response handler receives the connection and closes it once the response (or failure) arrived
                            TransportService.ContextRestoreResponseHandler<ClusterStateResponse> responseHandler = new TransportService.ContextRestoreResponseHandler<ClusterStateResponse>(threadContext.newRestorableContext(false), new SniffClusterStateResponseHandler(transportService, connection, listener, seedNodes, this.cancellableThreads));
                            // send the request from a stashed context that is marked as system context
                            try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
                                threadContext.markAsSystemContext();
                                transportService.sendRequest(connection, "cluster:monitor/state", (TransportRequest)request, TransportRequestOptions.EMPTY, responseHandler);
                            }
                            success = true;
                        } finally {
                            if (!success) {
                                // nothing took ownership of the connection
                                connection.close();
                            }
                        }
                    });
                } else {
                    listener.onFailure(new IllegalStateException("no seed node left"));
                }
            } catch (CancellableThreads.ExecutionCancelledException ex) {
                // cancelled (handler closed) - fail the listener and step out
                listener.onFailure(ex);
            } catch (IOException | IllegalStateException | ConnectTransportException ex) {
                if (seedNodes.hasNext()) {
                    RemoteClusterConnection.this.logger.debug(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)ex);
                    // the retry is now responsible for completing the listener
                    this.collectRemoteNodes(seedNodes, transportService, listener);
                } else {
                    // only report the failure when there is no seed node left to try; notifying
                    // after a retry would complete the listener (and release the permit) twice
                    listener.onFailure(ex);
                }
            }
        }

        /**
         * Closes the handler once: cancels the cancellable connect work, waits for the permit
         * (acquire followed by release) and then calls {@link #maybeConnect()} so that listeners
         * still queued are drained and failed by connect(). Later calls do nothing. If the
         * waiting thread is interrupted, the interrupt flag is restored.
         */
        @Override
        public void close() throws IOException {
            if (!this.closed.compareAndSet(false, true)) {
                return;
            }
            try {
                this.cancellableThreads.cancel("connect handler is closed");
                this.running.acquire();
                this.running.release();
                this.maybeConnect();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /** Returns whether {@link #close()} has been called. */
        final boolean isClosed() {
            return closed.get();
        }

        private class SniffClusterStateResponseHandler
        implements TransportResponseHandler<ClusterStateResponse> {
            private final TransportService transportService;
            private final Transport.Connection connection;
            private final ActionListener<Void> listener;
            private final Iterator<DiscoveryNode> seedNodes;
            private final CancellableThreads cancellableThreads;

            SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection, ActionListener<Void> listener, Iterator<DiscoveryNode> seedNodes, CancellableThreads cancellableThreads) {
                this.transportService = transportService;
                this.connection = connection;
                this.listener = listener;
                this.seedNodes = seedNodes;
                this.cancellableThreads = cancellableThreads;
            }

            @Override
            public ClusterStateResponse newInstance() {
                return new ClusterStateResponse();
            }

            @Override
            public void handleResponse(ClusterStateResponse response) {
                assert (!this.transportService.getThreadPool().getThreadContext().isSystemContext()) : "context is a system context";
                try {
                    if (RemoteClusterConnection.this.remoteClusterName.get() == null) {
                        assert (response.getClusterName().value() != null);
                        RemoteClusterConnection.this.remoteClusterName.set((Object)response.getClusterName());
                    }
                    try (Transport.Connection theConnection = this.connection;){
                        this.cancellableThreads.executeIO(() -> {
                            DiscoveryNodes nodes = response.getState().nodes();
                            Iterable nodesIter = nodes.getNodes()::valuesIt;
                            for (DiscoveryNode node : nodesIter) {
                                if (!RemoteClusterConnection.this.nodePredicate.test(node) || RemoteClusterConnection.this.connectedNodes.size() >= RemoteClusterConnection.this.maxNumRemoteConnections) continue;
                                try {
                                    this.transportService.connectToNode(node, RemoteClusterConnection.this.remoteProfile);
                                    RemoteClusterConnection.this.connectedNodes.add(node);
                                }
                                catch (IllegalStateException | ConnectTransportException ex) {
                                    RemoteClusterConnection.this.logger.debug(() -> new ParameterizedMessage("failed to connect to node {}", (Object)node), (Throwable)ex);
                                }
                            }
                        });
                    }
                    this.listener.onResponse(null);
                }
                catch (CancellableThreads.ExecutionCancelledException ex) {
                    this.listener.onFailure(ex);
                }
                catch (Exception ex) {
                    RemoteClusterConnection.this.logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)ex);
                    ConnectHandler.this.collectRemoteNodes(this.seedNodes, this.transportService, this.listener);
                }
            }

            @Override
            public void handleException(TransportException exp) {
                assert (!this.transportService.getThreadPool().getThreadContext().isSystemContext()) : "context is a system context";
                RemoteClusterConnection.this.logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)exp);
                try {
                    IOUtils.closeWhileHandlingException((Closeable[])new Closeable[]{this.connection});
                }
                finally {
                    ConnectHandler.this.collectRemoteNodes(this.seedNodes, this.transportService, this.listener);
                }
            }

            @Override
            public String executor() {
                return "management";
            }
        }
    }
}

