/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.impl.connection.tcp;

import com.hazelcast.client.AuthenticationException;
import com.hazelcast.client.ClientNotAllowedInClusterException;
import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.HazelcastClientOfflineException;
import com.hazelcast.client.LoadBalancer;
import com.hazelcast.client.UnsupportedClusterVersionException;
import com.hazelcast.client.UnsupportedRoutingModeException;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientConnectionStrategyConfig;
import com.hazelcast.client.config.ClientNetworkConfig;
import com.hazelcast.client.config.ClientTpcConfig;
import com.hazelcast.client.config.ConnectionRetryConfig;
import com.hazelcast.client.config.RoutingMode;
import com.hazelcast.client.impl.clientside.CandidateClusterContext;
import com.hazelcast.client.impl.clientside.ClientLoggingService;
import com.hazelcast.client.impl.clientside.ClusterDiscoveryService;
import com.hazelcast.client.impl.clientside.HazelcastClientInstance;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.clientside.LifecycleServiceImpl;
import com.hazelcast.client.impl.clientside.SubsetMembersView;
import com.hazelcast.client.impl.connection.AddressProvider;
import com.hazelcast.client.impl.connection.Addresses;
import com.hazelcast.client.impl.connection.ClientConnection;
import com.hazelcast.client.impl.connection.ClientConnectionManager;
import com.hazelcast.client.impl.connection.tcp.AuthenticationKeyValuePairConstants;
import com.hazelcast.client.impl.connection.tcp.AuthenticationResponse;
import com.hazelcast.client.impl.connection.tcp.TcpClientConnection;
import com.hazelcast.client.impl.connection.tcp.TpcChannelClientConnectionAdapter;
import com.hazelcast.client.impl.connection.tcp.TpcChannelCloseListener;
import com.hazelcast.client.impl.connection.tcp.TpcChannelConnector;
import com.hazelcast.client.impl.connection.tcp.WaitStrategy;
import com.hazelcast.client.impl.management.ClientConnectionProcessListener;
import com.hazelcast.client.impl.management.ClientConnectionProcessListenerRegistry;
import com.hazelcast.client.impl.management.ManagementCenterService;
import com.hazelcast.client.impl.protocol.AuthenticationStatus;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.ClientAuthenticationCodec;
import com.hazelcast.client.impl.protocol.codec.ClientAuthenticationCustomCodec;
import com.hazelcast.client.impl.spi.ClientClusterService;
import com.hazelcast.client.impl.spi.impl.ClientExecutionServiceImpl;
import com.hazelcast.client.impl.spi.impl.ClientInvocation;
import com.hazelcast.client.impl.spi.impl.ClientInvocationFuture;
import com.hazelcast.client.impl.spi.impl.ClientPartitionServiceImpl;
import com.hazelcast.client.properties.ClientProperty;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.internal.networking.Channel;
import com.hazelcast.internal.networking.ChannelErrorHandler;
import com.hazelcast.internal.networking.ChannelInitializer;
import com.hazelcast.internal.networking.nio.NioNetworking;
import com.hazelcast.internal.nio.ConnectionListener;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.util.AddressUtil;
import com.hazelcast.internal.util.EmptyStatement;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.IterableUtil;
import com.hazelcast.internal.util.RuntimeAvailableProcessors;
import com.hazelcast.internal.util.ThreadAffinity;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.internal.util.executor.LoggingScheduledExecutor;
import com.hazelcast.internal.util.executor.PoolExecutorThreadFactory;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.nio.SocketInterceptor;
import com.hazelcast.security.Credentials;
import com.hazelcast.security.PasswordCredentials;
import com.hazelcast.security.TokenCredentials;
import com.hazelcast.spi.exception.TargetDisconnectedException;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.properties.HazelcastProperty;
import com.hazelcast.sql.impl.CoreQueryUtils;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.annotation.Nonnull;

public class TcpClientConnectionManager
implements ClientConnectionManager,
MembershipListener {
    /**
     * Boolean property (default {@code false}) read once in the constructor. When it is
     * {@code true}, {@code tryConnectToMemberList} returns {@code false} without trying any member.
     */
    public static final HazelcastProperty SKIP_MEMBER_LIST_DURING_RECONNECTION = new HazelcastProperty("hazelcast.client.internal.skip.member.list.during.reconnection", false);
    // Number of I/O threads chosen by findThreadCount on machines with more than SMALL_MACHINE_PROCESSOR_COUNT processors.
    private static final int DEFAULT_IO_THREAD_COUNT = 3;
    // Core pool size of the internal scheduled executor built by createExecutorService.
    private static final int EXECUTOR_CORE_POOL_SIZE = 10;
    // Processor-count threshold used by findThreadCount.
    private static final int SMALL_MACHINE_PROCESSOR_COUNT = 8;
    private static final int SQL_CONNECTION_RANDOM_ATTEMPTS = 10;
    // Source of the ids handed to newly created TcpClientConnection instances.
    protected final AtomicInteger connectionIdGen = new AtomicInteger();
    // Set to true by start() and back to false by shutdown().
    private final AtomicBoolean isAlive = new AtomicBoolean();
    private final ILogger logger;
    // Socket connect timeout; a configured value of 0 is stored as Integer.MAX_VALUE.
    private final int connectionTimeoutMillis;
    private final HazelcastClientInstanceImpl client;
    private final Collection<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
    private final ClientConnectionProcessListenerRegistry connectionProcessListenerRunner;
    private final NioNetworking networking;
    private final long authenticationTimeout;
    // "MCJVM" when the Management Center client-mode property is set, otherwise "JVM".
    private final String connectionType;
    private final UUID clientUuid = UuidUtil.newUnsecureUUID();
    // Configured outbound ports, rotated by acquireOutboundPort() while synchronizing on the list itself.
    private final LinkedList<Integer> outboundPorts = new LinkedList<>();
    private final Set<String> labels;
    // Size of outboundPorts captured at construction time; 0 means "let the OS pick a port".
    private final int outboundPortCount;
    private final boolean failoverConfigProvided;
    private final ScheduledExecutorService executor;
    private final boolean shuffleMemberList;
    private final WaitStrategy waitStrategy;
    private final ClusterDiscoveryService clusterDiscoveryService;
    private final boolean asyncStart;
    private final ClientConnectionStrategyConfig.ReconnectMode reconnectMode;
    private final LoadBalancer loadBalancer;
    private final RoutingMode routingMode;
    private final boolean isTpcAwareClient;
    private final boolean skipMemberListDuringReconnection;
    private final ClientClusterService clientClusterService;
    private volatile Credentials currentCredentials;
    // Lock object used by the visible code when it updates clientState / connectToClusterTaskSubmitted.
    private final Object clientStateMutex = new Object();
    // Open connections; getOrConnectToMember looks entries up by the member's UUID.
    private final ConcurrentMap<UUID, TcpClientConnection> activeConnections = new ConcurrentHashMap<UUID, TcpClientConnection>();
    private volatile ClientState clientState = ClientState.INITIAL;
    // True while a connect-to-cluster task is queued or running on the internal executor.
    private volatile boolean connectToClusterTaskSubmitted;
    private boolean establishedInitialClusterConnection;

    /**
     * Builds the connection manager from the client's configuration and properties.
     * No connection is opened here; networking is only created, not started.
     *
     * @param client the owning client instance; its config, properties, logging service,
     *               load balancer, discovery service and cluster service are read
     * @throws IllegalStateException if TPC is enabled while the routing mode is not
     *                               {@code ALL_MEMBERS} (see {@code decideRoutingMode})
     */
    public TcpClientConnectionManager(HazelcastClientInstanceImpl client) {
        this.client = client;
        ClientConfig config = client.getClientConfig();
        HazelcastProperties properties = client.getProperties();
        this.loadBalancer = client.getLoadBalancer();
        this.labels = Collections.unmodifiableSet(config.getLabels());
        LoggingService loggingService = client.getLoggingService();
        this.logger = loggingService.getLogger(ClientConnectionManager.class);
        this.connectionType = properties.getBoolean(ManagementCenterService.MC_CLIENT_MODE_PROP) ? "MCJVM" : "JVM";
        // The routing mode has to be known before initNetworking() runs: initNetworking() calls
        // findThreadCount(), which reads this.routingMode. Assigning it later left the field null
        // during that call, so the SINGLE_MEMBER single-thread shortcut never applied.
        // Validating first also means an invalid TPC/routing combination fails before any
        // networking or executor resources have been created.
        this.routingMode = TcpClientConnectionManager.decideRoutingMode(config);
        this.connectionTimeoutMillis = this.initConnectionTimeoutMillis();
        this.networking = this.initNetworking();
        this.outboundPorts.addAll(this.getOutboundPorts());
        this.outboundPortCount = this.outboundPorts.size();
        this.authenticationTimeout = properties.getPositiveMillisOrDefault(ClientProperty.HEARTBEAT_TIMEOUT);
        // failoverConfigProvided must be set before initializeWaitStrategy(), which reads it.
        this.failoverConfigProvided = client.getFailoverConfig() != null;
        this.executor = this.createExecutorService();
        this.clusterDiscoveryService = client.getClusterDiscoveryService();
        this.waitStrategy = this.initializeWaitStrategy(config);
        this.shuffleMemberList = properties.getBoolean(ClientProperty.SHUFFLE_MEMBER_LIST);
        this.isTpcAwareClient = config.getTpcConfig().isEnabled();
        this.asyncStart = config.getConnectionStrategyConfig().isAsyncStart();
        this.reconnectMode = config.getConnectionStrategyConfig().getReconnectMode();
        this.connectionProcessListenerRunner = new ClientConnectionProcessListenerRegistry(client);
        this.skipMemberListDuringReconnection = properties.getBoolean(SKIP_MEMBER_LIST_DURING_RECONNECTION);
        this.clientClusterService = client.getClientClusterService();
    }

    /**
     * Reads the configured routing mode and rejects a TPC-enabled configuration that does
     * not use {@code ALL_MEMBERS} routing.
     *
     * @param config the client configuration to inspect
     * @return the routing mode found in the cluster routing configuration
     * @throws IllegalStateException if TPC is enabled and the mode is not {@code ALL_MEMBERS}
     */
    private static RoutingMode decideRoutingMode(ClientConfig config) {
        RoutingMode selected = config.getNetworkConfig().getClusterRoutingConfig().getRoutingMode();
        boolean tpcEnabled = config.getTpcConfig().isEnabled();
        if (tpcEnabled && selected != RoutingMode.ALL_MEMBERS) {
            throw new IllegalStateException("TPC is enabled but our RoutingMode is " + selected + " instead of ALL_MEMBERS!");
        }
        return selected;
    }

    /**
     * Resolves the socket connect timeout from the network configuration.
     *
     * @return the configured timeout, or {@link Integer#MAX_VALUE} when it is configured as 0
     */
    private int initConnectionTimeoutMillis() {
        int configuredTimeout = client.getClientConfig().getNetworkConfig().getConnectionTimeout();
        if (configuredTimeout == 0) {
            return Integer.MAX_VALUE;
        }
        return configuredTimeout;
    }

    /**
     * Creates the internal scheduled executor used by this manager.
     * Threads are named {@code <clientName>.internal-} and use the configured class loader.
     * A rejected task is logged at finest level and reported with a
     * {@link RejectedExecutionException}.
     *
     * @return a new executor with {@code EXECUTOR_CORE_POOL_SIZE} core threads
     */
    private ScheduledExecutorService createExecutorService() {
        ClassLoader classLoader = client.getClientConfig().getClassLoader();
        PoolExecutorThreadFactory threadFactory = new PoolExecutorThreadFactory(client.getName() + ".internal-", classLoader);
        // Use the named constant rather than repeating its literal value.
        return new LoggingScheduledExecutor(logger, EXECUTOR_CORE_POOL_SIZE, threadFactory, (task, pool) -> {
            String message = "Internal executor rejected task: " + task + ", because client is shutting down...";
            logger.finest(message);
            throw new RejectedExecutionException(message);
        });
    }

    /**
     * Resolves the outbound ports from the network configuration by passing both the
     * explicit port list and the textual port definitions to {@code AddressUtil.getOutboundPorts}.
     *
     * @return the ports returned by {@code AddressUtil.getOutboundPorts}
     */
    private Collection<Integer> getOutboundPorts() {
        ClientNetworkConfig network = client.getClientConfig().getNetworkConfig();
        Collection<Integer> explicitPorts = network.getOutboundPorts();
        Collection<String> portDefinitions = network.getOutboundPortDefinitions();
        return AddressUtil.getOutboundPorts(explicitPorts, portDefinitions);
    }

    /**
     * @return the networking instance created by {@code initNetworking()} during construction
     */
    public NioNetworking getNetworking() {
        return networking;
    }

    /**
     * Builds the {@link NioNetworking} instance from client properties.
     * Input and output thread counts come from the respective properties, passed through
     * {@code findThreadCount}; the remaining settings are copied from the client.
     *
     * @return a new, not yet started networking instance
     */
    protected NioNetworking initNetworking() {
        HazelcastProperties properties = client.getProperties();
        int requestedInputThreads = properties.getInteger(ClientProperty.IO_INPUT_THREAD_COUNT);
        int requestedOutputThreads = properties.getInteger(ClientProperty.IO_OUTPUT_THREAD_COUNT);
        int inputThreadCount = findThreadCount(requestedInputThreads);
        int outputThreadCount = findThreadCount(requestedOutputThreads);
        NioNetworking.Context context = new NioNetworking.Context()
                .loggingService(client.getLoggingService())
                .metricsRegistry(client.getMetricsRegistry())
                .threadNamePrefix(client.getName())
                .errorHandler(new ClientChannelErrorHandler())
                .inputThreadCount(inputThreadCount)
                .inputThreadAffinity(ThreadAffinity.newSystemThreadAffinity("hazelcast.client.io.input.thread.affinity"))
                .outputThreadCount(outputThreadCount)
                .outputThreadAffinity(ThreadAffinity.newSystemThreadAffinity("hazelcast.client.io.output.thread.affinity"))
                .balancerIntervalSeconds(properties.getInteger(ClientProperty.IO_BALANCER_INTERVAL_SECONDS))
                .writeThroughEnabled(properties.getBoolean(ClientProperty.IO_WRITE_THROUGH_ENABLED))
                .concurrencyDetection(client.getConcurrencyDetection());
        return new NioNetworking(context);
    }

    /**
     * Chooses the number of I/O threads for one direction (input or output).
     *
     * <p>NOTE: {@code initNetworking()} calls this method; when that happens during
     * construction, the {@code SINGLE_MEMBER} shortcut only applies if {@code routingMode}
     * has already been assigned at that point.
     *
     * @param configuredThreadCount the configured value; {@code -1} means "not configured"
     * @return the configured value when it is not {@code -1}; otherwise 1 for
     *         {@code SINGLE_MEMBER} routing, {@code DEFAULT_IO_THREAD_COUNT} when more than
     *         {@code SMALL_MACHINE_PROCESSOR_COUNT} processors are available, and 1 otherwise
     */
    private int findThreadCount(int configuredThreadCount) {
        if (configuredThreadCount != -1) {
            return configuredThreadCount;
        }
        if (routingMode == RoutingMode.SINGLE_MEMBER) {
            return 1;
        }
        // Named constants replace the literals 8 and 3 that duplicated their values.
        boolean largeMachine = RuntimeAvailableProcessors.get() > SMALL_MACHINE_PROCESSOR_COUNT;
        return largeMachine ? DEFAULT_IO_THREAD_COUNT : 1;
    }

    /**
     * Creates the back-off strategy used between connection attempts from the connection
     * retry configuration.
     *
     * <p>A cluster connect timeout of {@code -1} is replaced by 120000 ms when a failover
     * configuration was provided, and by {@link Long#MAX_VALUE} otherwise.
     *
     * @param clientConfig the client configuration holding the retry settings
     * @return the configured wait strategy
     */
    private WaitStrategy initializeWaitStrategy(ClientConfig clientConfig) {
        ConnectionRetryConfig retry = clientConfig.getConnectionStrategyConfig().getConnectionRetryConfig();
        long connectTimeoutMillis = retry.getClusterConnectTimeoutMillis();
        if (connectTimeoutMillis == -1L) {
            if (failoverConfigProvided) {
                connectTimeoutMillis = 120000L;
            } else {
                connectTimeoutMillis = Long.MAX_VALUE;
            }
        }
        return new WaitStrategy(
                retry.getInitialBackoffMillis(),
                retry.getMaxBackoffMillis(),
                retry.getMultiplier(),
                connectTimeoutMillis,
                retry.getJitter(),
                logger);
    }

    /**
     * Marks the manager as alive and starts networking.
     * Calls made while the manager is already alive do nothing.
     */
    public synchronized void start() {
        boolean becameAlive = isAlive.compareAndSet(false, true);
        if (becameAlive) {
            startNetworking();
        }
    }

    /**
     * Schedules a {@code ConnectToAllClusterMembersTask} to run every second (after a
     * one-second initial delay). Does nothing in {@code SINGLE_MEMBER} routing mode.
     *
     * @param sync when {@code true}, first tries to connect to every member of the effective
     *             member list on the calling thread; failures of individual members are ignored
     */
    public void tryConnectToAllClusterMembers(boolean sync) {
        if (routingMode != RoutingMode.SINGLE_MEMBER) {
            if (sync) {
                for (Member candidate : client.getClientClusterService().getEffectiveMemberList()) {
                    try {
                        getOrConnectToMember(candidate, false);
                    } catch (Exception ignored) {
                        // Best effort: the periodic task scheduled below tries again.
                        EmptyStatement.ignore(ignored);
                    }
                }
            }
            executor.scheduleWithFixedDelay(new ConnectToAllClusterMembersTask(), 1L, 1L, TimeUnit.SECONDS);
        }
    }

    /**
     * Starts the networking layer by calling {@code restart()} on it.
     */
    protected void startNetworking() {
        networking.restart();
    }

    /**
     * Shuts the manager down; does nothing when it is not alive.
     *
     * <p>In order: stops the internal executor and waits for its termination, closes every
     * active connection, stops networking, drops all connection listeners and destroys the
     * current candidate cluster context.
     */
    public synchronized void shutdown() {
        boolean wasAlive = isAlive.compareAndSet(true, false);
        if (!wasAlive) {
            return;
        }
        executor.shutdownNow();
        ClientExecutionServiceImpl.awaitExecutorTermination("cluster", executor, logger);
        for (ClientConnection open : activeConnections.values()) {
            open.close("Hazelcast client is shutting down", null);
        }
        stopNetworking();
        connectionListeners.clear();
        clusterDiscoveryService.current().destroy();
    }

    /**
     * Stops the networking layer by calling {@code shutdown()} on it.
     */
    protected void stopNetworking() {
        networking.shutdown();
    }

    /**
     * Starts the current candidate cluster context and connects to the cluster.
     * With async start the connection attempt is handed to the internal executor;
     * otherwise it runs on the calling thread.
     */
    public void connectToCluster() {
        clusterDiscoveryService.current().start();
        if (!asyncStart) {
            doConnectToCluster();
            return;
        }
        submitConnectToClusterTask();
    }

    /**
     * Submits a connect-to-cluster task to the internal executor unless one is already
     * pending (tracked by {@code connectToClusterTaskSubmitted}).
     *
     * <p>The task runs {@code doConnectToCluster()}, then clears the flag under
     * {@code clientStateMutex} and resubmits itself if there is still no active connection.
     * Any failure of the task is logged and triggers a client shutdown on a separate thread.
     *
     * @throws RejectedExecutionException if the executor does not accept the task
     */
    private void submitConnectToClusterTask() {
        if (connectToClusterTaskSubmitted) {
            return;
        }
        // Raise the flag BEFORE handing the task over. connectToCluster() calls this method
        // without holding clientStateMutex, so with the flag set afterwards a fast task could
        // clear it first and the late "true" would stick, suppressing all later submissions.
        connectToClusterTaskSubmitted = true;
        try {
            executor.submit(() -> {
                try {
                    doConnectToCluster();
                    synchronized (clientStateMutex) {
                        connectToClusterTaskSubmitted = false;
                        if (activeConnections.isEmpty()) {
                            // NOTE(review): gated on the fine level but logged as a warning;
                            // kept as-is, confirm which level is intended.
                            if (logger.isFineEnabled()) {
                                logger.warning("No connection to cluster: " + clientClusterService.getClusterId());
                            }
                            submitConnectToClusterTask();
                        }
                    }
                } catch (Throwable e) {
                    logger.warning("Could not connect to any cluster, shutting down the client", e);
                    shutdownWithExternalThread();
                }
            });
        } catch (RejectedExecutionException e) {
            // Nothing was queued, so the flag must not claim otherwise.
            connectToClusterTaskSubmitted = false;
            throw e;
        }
    }

    /**
     * Connects to the current candidate cluster and, if that fails, moves on to the next
     * candidate clusters offered by the discovery service.
     *
     * <p>If the current cluster cannot be reached but an active connection exists by the
     * time the state mutex is taken, the method returns without switching clusters.
     *
     * @throws IllegalStateException if no candidate cluster could be connected to
     */
    private void doConnectToCluster() {
        CandidateClusterContext current = clusterDiscoveryService.current();
        logger.info("Trying to connect to cluster: " + current.getClusterName());
        boolean connected = doConnectToCandidateCluster(current, false);
        if (connected) {
            connectionProcessListenerRunner.onClusterConnectionSucceeded(current.getClusterName());
            return;
        }
        synchronized (clientStateMutex) {
            if (!activeConnections.isEmpty()) {
                return;
            }
            clientState = ClientState.SWITCHING_CLUSTER;
        }
        boolean switched = clusterDiscoveryService.tryNextCluster(this::destroyCurrentClusterConnectionAndTryNextCluster);
        if (!switched) {
            String reason;
            if (client.getLifecycleService().isRunning()) {
                reason = "Unable to connect to any cluster.";
            } else {
                reason = "Client is being shutdown.";
            }
            throw new IllegalStateException(reason);
        }
    }

    /**
     * Callback passed to {@code ClusterDiscoveryService.tryNextCluster}: tears down the
     * current candidate context, starts the next one and tries to connect to it.
     * On success a {@code CLIENT_CHANGED_CLUSTER} lifecycle event is fired.
     *
     * @param currentContext the context being abandoned
     * @param nextContext    the context to try next
     * @return {@code true} if a connection to the next cluster was established
     */
    private Boolean destroyCurrentClusterConnectionAndTryNextCluster(CandidateClusterContext currentContext, CandidateClusterContext nextContext) {
        currentContext.destroy();
        client.onTryToConnectNextCluster();
        nextContext.start();
        ClientLoggingService clientLogging = (ClientLoggingService) client.getLoggingService();
        clientLogging.updateClusterName(nextContext.getClusterName());
        logger.info("Trying to connect to next cluster: " + nextContext.getClusterName());
        boolean connected = doConnectToCandidateCluster(nextContext, true);
        if (connected) {
            fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_CHANGED_CLUSTER);
        }
        return connected;
    }

    /**
     * Applies {@code getOrConnectFunction} to {@code target} and maps failures to the
     * outcome expected by the cluster connection loop. Every failure is logged as a warning.
     *
     * @param target               the member or address to connect to
     * @param getOrConnectFunction performs the actual connection attempt
     * @param addressTranslator    passed to the connection process listeners on failure
     * @param <A>                  the target type
     * @return the connection, or {@code null} when the attempt failed with a
     *         {@code TargetDisconnectedException} or any other exception not rethrown below
     * @throws RuntimeException for unsupported cluster version, unsupported routing mode and
     *                          invalid configuration (via {@code ExceptionUtil.rethrow}), and
     *                          {@code ClientNotAllowedInClusterException} as-is
     */
    <A> ClientConnection connect(A target, Function<A, ClientConnection> getOrConnectFunction, Function<A, Address> addressTranslator) {
        try {
            logger.info("Trying to connect to " + target);
            return getOrConnectFunction.apply(target);
        } catch (UnsupportedClusterVersionException | UnsupportedRoutingModeException | InvalidConfigurationException fatal) {
            logger.warning("Exception during initial connection to " + target + ": " + fatal);
            throw ExceptionUtil.rethrow(fatal);
        } catch (ClientNotAllowedInClusterException notAllowed) {
            logger.warning("Exception during initial connection to " + target + ": " + notAllowed);
            throw notAllowed;
        } catch (TargetDisconnectedException disconnected) {
            logger.warning("Exception during initial connection to " + target + ": " + disconnected);
            connectionProcessListenerRunner.onRemoteClosedConnection(addressTranslator, target);
            return null;
        } catch (Exception other) {
            logger.warning("Exception during initial connection to " + target + ": " + other);
            connectionProcessListenerRunner.onConnectionAttemptFailed(addressTranslator, target);
            return null;
        }
    }

    /**
     * Fires the given lifecycle state through the client's lifecycle service.
     *
     * @param state the state to publish
     */
    private void fireLifecycleEvent(LifecycleEvent.LifecycleState state) {
        ((LifecycleServiceImpl) client.getLifecycleService()).fireLifecycleEvent(state);
    }

    /**
     * Repeatedly tries to open a connection to the given candidate cluster until one
     * succeeds or the wait strategy gives up.
     *
     * <p>Each round first tries the known member list, then every address supplied by the
     * context's address provider that was not already tried in the same round.
     *
     * @param context                the candidate cluster to connect to
     * @param switchingToNextCluster forwarded to the connect calls
     * @return {@code true} as soon as one connection is established, {@code false} when the
     *         attempts were exhausted or stopped by a not-allowed / invalid-configuration error
     * @throws InvalidConfigurationException      if the routing mode is unsupported
     * @throws ClientNotAllowedInClusterException if the cluster version is unsupported
     */
    private boolean doConnectToCandidateCluster(CandidateClusterContext context, boolean switchingToNextCluster) {
        Set<Address> allTried = new HashSet<>();
        try {
            waitStrategy.reset();
            do {
                Set<Address> triedThisRound = new HashSet<>();
                if (tryConnectToMemberList(switchingToNextCluster, triedThisRound)) {
                    return true;
                }
                for (Address candidate : getPossibleMemberAddresses(context.getAddressProvider())) {
                    checkClientActive();
                    if (triedThisRound.add(candidate)) {
                        connectionProcessListenerRunner.onAttemptingToConnectToTarget(this::translate, candidate);
                        ClientConnection established = connect(candidate, o -> getOrConnectToAddress(o, switchingToNextCluster), this::translate);
                        if (established != null) {
                            return true;
                        }
                    }
                }
                allTried.addAll(triedThisRound);
                // With nothing to try, neither loop above ran the liveness check, so run it here.
                if (triedThisRound.isEmpty()) {
                    checkClientActive();
                }
            } while (waitStrategy.sleep());
        } catch (ClientNotAllowedInClusterException | InvalidConfigurationException stop) {
            logger.warning("Stopped trying on the cluster: " + context.getClusterName() + " reason: " + stop.getMessage());
        } catch (UnsupportedRoutingModeException unsupportedRouting) {
            connectionProcessListenerRunner.onClusterConnectionFailed(context.getClusterName());
            logger.warning("Stopped trying on the cluster: " + context.getClusterName() + " reason: " + unsupportedRouting.getMessage());
            throw new InvalidConfigurationException(unsupportedRouting.getMessage());
        } catch (UnsupportedClusterVersionException unsupportedVersion) {
            connectionProcessListenerRunner.onClusterConnectionFailed(context.getClusterName());
            logger.warning("Stopped trying on the cluster: " + context.getClusterName() + " reason: " + unsupportedVersion.getMessage());
            throw new ClientNotAllowedInClusterException(unsupportedVersion.getMessage());
        }
        connectionProcessListenerRunner.onClusterConnectionFailed(context.getClusterName());
        logger.info("Unable to connect to any address from the cluster with name: " + context.getClusterName() + ". The following addresses were tried: " + allTried);
        return false;
    }

    /**
     * Tries to connect to the members of the effective member list, in shuffled order when
     * member-list shuffling is enabled, and stops at the first success.
     *
     * @param switchingToNextCluster   forwarded to {@code getOrConnectToMember}
     * @param triedAddressesPerAttempt receives the address of every member that is tried
     * @return {@code true} if a connection was established; {@code false} if none succeeded
     *         or the member list is skipped by configuration
     */
    private boolean tryConnectToMemberList(boolean switchingToNextCluster, Set<Address> triedAddressesPerAttempt) {
        if (skipMemberListDuringReconnection) {
            return false;
        }
        List<Member> members = new ArrayList<>(client.getClientClusterService().getEffectiveMemberList());
        if (shuffleMemberList) {
            Collections.shuffle(members);
        }
        for (Member candidate : members) {
            checkClientActive();
            triedAddressesPerAttempt.add(candidate.getAddress());
            connectionProcessListenerRunner.onAttemptingToConnectToTarget(this::translate, candidate);
            ClientConnection established = connect(candidate, o -> getOrConnectToMember(o, switchingToNextCluster), this::translate);
            if (established != null) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the connection type string chosen at construction time
     */
    @Override
    public String getConnectionType() {
        return connectionType;
    }

    /**
     * Verifies that invocations may currently be sent; returns normally only when the
     * client is initialized on the cluster and has at least one active connection.
     *
     * @throws HazelcastClientOfflineException if the client is still in its initial state with
     *                                         async start, or is past it with ASYNC reconnect mode
     * @throws IOException                     in every other case without a usable connection
     */
    @Override
    public void checkInvocationAllowed() throws IOException {
        // Read the volatile state once so all checks below see the same value.
        ClientState observed = clientState;
        if (observed == ClientState.INITIALIZED_ON_CLUSTER && !activeConnections.isEmpty()) {
            return;
        }
        if (observed == ClientState.INITIAL) {
            if (!asyncStart) {
                throw new IOException("No connection found to cluster since the client is starting.");
            }
            throw new HazelcastClientOfflineException();
        }
        if (reconnectMode != ClientConnectionStrategyConfig.ReconnectMode.ASYNC) {
            throw new IOException("No connection found to cluster.");
        }
        throw new HazelcastClientOfflineException();
    }

    /**
     * @return {@code true} when the client state is {@code INITIALIZED_ON_CLUSTER}
     */
    @Override
    public boolean clientInitializedOnCluster() {
        return ClientState.INITIALIZED_ON_CLUSTER == clientState;
    }

    /**
     * Loads candidate addresses from the given provider, primary addresses first, then
     * secondary ones, without duplicates. Both lists are shuffled in place first when
     * member-list shuffling is enabled.
     *
     * @param addressProvider the provider to query
     * @return the addresses in iteration order; empty when the provider failed with anything
     *         other than a {@link NullPointerException} (which is rethrown)
     */
    Collection<Address> getPossibleMemberAddresses(AddressProvider addressProvider) {
        Set<Address> candidates = new LinkedHashSet<>();
        try {
            Addresses loaded = addressProvider.loadAddresses(connectionProcessListenerRunner);
            if (shuffleMemberList) {
                Collections.shuffle(loaded.primary());
                Collections.shuffle(loaded.secondary());
            }
            candidates.addAll(loaded.primary());
            candidates.addAll(loaded.secondary());
        } catch (NullPointerException npe) {
            // Deliberately not swallowed by the generic handler below.
            throw npe;
        } catch (Exception providerFailure) {
            logger.warning("Exception from AddressProvider: " + addressProvider, providerFailure);
        }
        return candidates;
    }

    /**
     * Shuts the client down from a freshly started thread named
     * {@code <clientName>.clientShutdown-}. Exceptions thrown by the shutdown are logged
     * at severe level.
     */
    private void shutdownWithExternalThread() {
        Runnable shutdownTask = () -> {
            try {
                client.getLifecycleService().shutdown();
            } catch (Exception failure) {
                logger.severe("Exception during client shutdown", failure);
            }
        };
        Thread shutdownThread = new Thread(shutdownTask, client.getName() + ".clientShutdown-");
        shutdownThread.start();
    }

    /**
     * Returns the live view of the currently active connections (the values view of the
     * backing map, not a copy).
     *
     * @return the active connections
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Collection<ClientConnection> getActiveConnections() {
        // The map holds TcpClientConnection values, and Collection<TcpClientConnection> is not a
        // Collection<ClientConnection>. The raw cast widens the element type while still
        // returning the same live view, so callers observe later changes to the map.
        return (Collection) activeConnections.values();
    }

    /**
     * @return {@code true} between a successful {@code start()} and the next {@code shutdown()}
     */
    @Override
    public boolean isAlive() {
        return isAlive.get();
    }

    /**
     * @return the UUID generated for this client when the manager was created
     */
    @Override
    public UUID getClientUuid() {
        return clientUuid;
    }

    /**
     * Looks up the active connection registered under the given UUID.
     *
     * @param uuid the key to look up
     * @return the connection, or {@code null} when none is registered under that UUID
     */
    @Override
    public ClientConnection getActiveConnection(@Nonnull UUID uuid) {
        return activeConnections.get(uuid);
    }

    /**
     * Returns the active connection whose remote address equals {@code address}, or opens,
     * authenticates and registers a new one to the translated address.
     *
     * @param address                the (untranslated) address to connect to
     * @param switchingToNextCluster forwarded to {@code onAuthenticated}
     * @return the existing or newly authenticated connection
     */
    TcpClientConnection getOrConnectToAddress(@Nonnull Address address, boolean switchingToNextCluster) {
        for (ClientConnection existing : getActiveConnections()) {
            if (existing.getRemoteAddress().equals(address)) {
                return (TcpClientConnection) existing;
            }
        }
        Address translated = translate(address);
        TcpClientConnection fresh = createSocketConnection(translated);
        AuthenticationResponse authentication = authenticateOnCluster(fresh);
        return onAuthenticated(fresh, authentication, switchingToNextCluster);
    }

    /**
     * Returns the active connection registered under the member's UUID, or opens,
     * authenticates and registers a new one to the member's translated address.
     *
     * @param member                 the member to connect to
     * @param switchingToNextCluster forwarded to {@code onAuthenticated}
     * @return the existing or newly authenticated connection
     */
    TcpClientConnection getOrConnectToMember(@Nonnull Member member, boolean switchingToNextCluster) {
        TcpClientConnection existing = activeConnections.get(member.getUuid());
        if (existing != null) {
            return existing;
        }
        TcpClientConnection fresh = createSocketConnection(translate(member));
        AuthenticationResponse authentication = authenticateOnCluster(fresh);
        return onAuthenticated(fresh, authentication, switchingToNextCluster);
    }

    /**
     * Notifies every registered connection listener on the internal executor.
     * Nothing happens when the manager is not alive; a rejection by the executor is
     * ignored and stops the remaining notifications.
     *
     * @param connection the connection the event is about
     * @param isAdded    {@code true} to signal "added", {@code false} to signal "removed"
     */
    private void fireConnectionEvent(TcpClientConnection connection, boolean isAdded) {
        if (!isAlive()) {
            return;
        }
        try {
            for (ConnectionListener listener : connectionListeners) {
                Runnable notification = isAdded
                        ? () -> listener.connectionAdded(connection)
                        : () -> listener.connectionRemoved(connection);
                executor.execute(notification);
            }
        } catch (RejectedExecutionException rejected) {
            EmptyStatement.ignore(rejected);
        }
    }

    /**
     * @return {@code true} when no outbound ports were configured
     */
    private boolean useAnyOutboundPort() {
        return 0 == outboundPortCount;
    }

    /**
     * Picks the next configured outbound port in round-robin order: the head of the list
     * is removed, appended to the tail and returned, all while holding the list's monitor.
     *
     * @return the next port, or 0 when no outbound ports were configured
     */
    private int acquireOutboundPort() {
        if (useAnyOutboundPort()) {
            return 0;
        }
        synchronized (outboundPorts) {
            Integer next = outboundPorts.removeFirst();
            outboundPorts.addLast(next);
            return next;
        }
    }

    /**
     * Binds the socket to a local port.
     *
     * <p>Without configured outbound ports the socket is bound to port 0. Otherwise up to
     * twice as many attempts as there are configured ports are made, taking ports in
     * round-robin order; each failed attempt is logged at finest level.
     *
     * @param socket the socket to bind
     * @throws IOException the failure of the last attempt when every attempt failed
     */
    private void bindSocketToPort(Socket socket) throws IOException {
        if (useAnyOutboundPort()) {
            socket.bind(new InetSocketAddress(0));
            return;
        }
        IOException lastFailure = null;
        int maxAttempts = outboundPortCount * 2;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            int port = acquireOutboundPort();
            if (port == 0) {
                return;
            }
            try {
                socket.bind(new InetSocketAddress(port));
                return;
            } catch (IOException bindFailure) {
                lastFailure = bindFailure;
                logger.finest("Could not bind port[ %s]: %s", port, bindFailure.getMessage());
            }
        }
        if (lastFailure != null) {
            throw lastFailure;
        }
    }

    /**
     * Opens a socket to {@code target}, registers it with the networking layer and wraps it
     * in a new {@link TcpClientConnection}.
     *
     * <p>The current cluster context supplies the channel initializer and the optional
     * socket interceptor. For TPC-aware clients the context is also stored in the
     * connection's attribute map. On any failure the socket channel is closed, the
     * exception is logged at finest level and rethrown via {@code ExceptionUtil.rethrow}.
     *
     * @param target the already translated address to connect to
     * @return the started, not yet authenticated connection
     */
    protected TcpClientConnection createSocketConnection(Address target) {
        CandidateClusterContext clusterContext = clusterDiscoveryService.current();
        SocketChannel rawChannel = null;
        try {
            rawChannel = SocketChannel.open();
            Socket socket = rawChannel.socket();
            bindSocketToPort(socket);
            Channel channel = networking.register(clusterContext.getChannelInitializer(), rawChannel, true);
            channel.attributeMap().put(Address.class, target);
            channel.connect(new InetSocketAddress(target.getInetAddress(), target.getPort()), connectionTimeoutMillis);
            TcpClientConnection connection = new TcpClientConnection(client, connectionIdGen.incrementAndGet(), channel);
            if (isTpcAwareClient) {
                connection.attributeMap().put(CandidateClusterContext.class, clusterContext);
            }
            rawChannel.configureBlocking(true);
            SocketInterceptor interceptor = clusterContext.getSocketInterceptor();
            if (interceptor != null) {
                interceptor.onConnect(socket);
            }
            channel.start();
            return connection;
        } catch (Exception failure) {
            IOUtil.closeResource(rawChannel);
            logger.finest(failure);
            throw ExceptionUtil.rethrow(failure);
        }
    }

    /**
     * Opens and starts an additional channel to {@code address} for the given connection.
     *
     * <p>The channel gets a {@code TpcChannelCloseListener} and its attribute map is
     * populated with the address, the owning connection and a
     * {@code TpcChannelClientConnectionAdapter}. On any failure the socket channel is
     * closed, the exception is logged at finest level and rethrown via
     * {@code ExceptionUtil.rethrow}.
     *
     * @param address    the address to connect to
     * @param connection the connection the channel belongs to
     * @return the started channel
     */
    private Channel createTpcChannel(Address address, TcpClientConnection connection) {
        SocketChannel rawChannel = null;
        try {
            rawChannel = SocketChannel.open();
            bindSocketToPort(rawChannel.socket());
            ChannelInitializer initializer = clusterDiscoveryService.current().getChannelInitializer();
            Channel tpcChannel = networking.register(initializer, rawChannel, true);
            tpcChannel.addCloseListener(new TpcChannelCloseListener(client));
            ConcurrentMap attributes = tpcChannel.attributeMap();
            attributes.put(Address.class, address);
            attributes.put(TcpClientConnection.class, connection);
            attributes.put(TpcChannelClientConnectionAdapter.class, new TpcChannelClientConnectionAdapter(tpcChannel));
            tpcChannel.connect(new InetSocketAddress(address.getHost(), address.getPort()), connectionTimeoutMillis);
            tpcChannel.start();
            return tpcChannel;
        } catch (Exception failure) {
            IOUtil.closeResource(rawChannel);
            logger.finest(failure);
            throw ExceptionUtil.rethrow(failure);
        }
    }

    /**
     * Translates a member into the address the client should connect to, using the
     * address provider of the current cluster context.
     *
     * @param member the member to translate
     * @return the translated address
     */
    private Address translate(Member member) {
        return this.translate(member, (provider, candidate) -> provider.translate(candidate));
    }

    /**
     * Translates an address into the address the client should connect to, using the
     * address provider of the current cluster context.
     *
     * @param address the address to translate
     * @return the translated address
     */
    private Address translate(Address address) {
        return this.translate(address, (provider, candidate) -> provider.translate(candidate));
    }

    /**
     * Applies {@code translateFunction} to the current cluster context's address provider
     * and the given target.
     *
     * <p>Any failure, including a {@code null} translation result, is logged at warning
     * level and rethrown through {@link ExceptionUtil#rethrow}.
     *
     * @param target            the object to translate
     * @param translateFunction the provider method used to translate {@code target}
     * @param <T>               the type of the object being translated
     * @return the translated, non-null address
     */
    private <T> Address translate(T target, BiFunctionEx<AddressProvider, T, Address> translateFunction) {
        CandidateClusterContext currentContext = this.clusterDiscoveryService.current();
        AddressProvider addressProvider = currentContext.getAddressProvider();
        try {
            // The target is handed over as-is: it is of type T, not an AddressProvider,
            // so it must not be cast to AddressProvider before the call.
            Address translatedAddress = translateFunction.apply(addressProvider, target);
            if (translatedAddress == null) {
                throw new HazelcastException("Address Provider " + String.valueOf(addressProvider.getClass()) + " could not translate " + String.valueOf(target));
            }
            return translatedAddress;
        }
        catch (Exception e) {
            this.logger.warning("Failed to translate " + String.valueOf(target) + " via address provider " + e.getMessage());
            throw ExceptionUtil.rethrow(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles the closing of a connection: notifies the invocation service, removes the
     * connection from the active connection map and, when it was the last one, moves the
     * client to the disconnected state and triggers a cluster reconnection.
     *
     * @param connection the connection that was closed
     */
    void onConnectionClose(TcpClientConnection connection) {
        this.client.getInvocationService().onConnectionClose(connection);

        Address remoteAddress = connection.getRemoteAddress();
        UUID remoteUuid = connection.getRemoteUuid();
        if (remoteAddress == null) {
            this.logger.finest("Destroying %s, but it has end-point set to null -> not removing it from a connection map", connection);
            return;
        }

        synchronized (this.clientStateMutex) {
            // Only the exact (uuid -> connection) mapping is removed; anything else is left alone.
            if (!this.activeConnections.remove(remoteUuid, connection)) {
                this.logger.finest("Destroying a connection, but there is no mapping %s:%s -> %s in the connection map.", remoteAddress, remoteUuid, connection);
                return;
            }

            this.clientClusterService.getSubsetMembers().onConnectionRemoved(connection);
            this.logger.info("Removed connection to endpoint: " + remoteAddress + ":" + remoteUuid + ", connection: " + connection);

            if (this.activeConnections.isEmpty()) {
                // The disconnected lifecycle event is fired only if the client had been fully initialized.
                if (this.clientState == ClientState.INITIALIZED_ON_CLUSTER) {
                    this.fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_DISCONNECTED);
                }
                this.clientState = ClientState.DISCONNECTED_FROM_CLUSTER;
                this.triggerClusterReconnection();
            }
            this.fireConnectionEvent(connection, false);
        }
    }

    /**
     * Starts a reconnection attempt to the cluster, or shuts the client down when
     * reconnecting is disabled or the connect task cannot be submitted.
     */
    private void triggerClusterReconnection() {
        boolean reconnectDisabled = this.reconnectMode == ClientConnectionStrategyConfig.ReconnectMode.OFF;
        if (reconnectDisabled) {
            this.logger.info("RECONNECT MODE is off. Shutting down the client.");
            this.shutdownWithExternalThread();
            return;
        }

        if (!this.client.getLifecycleService().isRunning()) {
            return;
        }

        try {
            this.submitConnectToClusterTask();
        }
        catch (RejectedExecutionException rejected) {
            // The task could not be submitted, so the client is shut down instead.
            this.shutdownWithExternalThread();
        }
    }

    /**
     * Registers a listener for connection events.
     *
     * @param connectionListener the listener to add; must not be {@code null}
     * @throws NullPointerException if {@code connectionListener} is {@code null}
     */
    @Override
    public void addConnectionListener(ConnectionListener connectionListener) {
        Objects.requireNonNull(connectionListener, "connectionListener cannot be null");
        this.connectionListeners.add(connectionListener);
    }

    /**
     * Registers a listener with the connection process listener runner.
     *
     * @param listener the listener to add
     */
    @Override
    public void addClientConnectionProcessListener(ClientConnectionProcessListener listener) {
        this.connectionProcessListenerRunner.addListener(listener);
    }

    /**
     * Returns the routing mode held by this connection manager.
     *
     * @return the routing mode
     */
    @Override
    public RoutingMode getRoutingMode() {
        return this.routingMode;
    }

    /**
     * Returns the credentials most recently stored by {@code encodeAuthenticationRequest}.
     *
     * @return the current credentials (whatever the field holds before the first
     *         authentication request is encoded is not visible here)
     */
    public Credentials getCurrentCredentials() {
        return this.currentCredentials;
    }

    /**
     * Closes every active connection with a {@link TargetDisconnectedException} stating
     * that the client is switching cluster.
     */
    public void reset() {
        final String closeCause = "Closing since client is switching cluster";
        for (TcpClientConnection openConnection : this.activeConnections.values()) {
            // Each connection gets its own exception instance as the close cause.
            TargetDisconnectedException cause = new TargetDisconnectedException(closeCause);
            openConnection.close(null, cause);
        }
    }

    /**
     * Picks a connection to use for an invocation that has no specific target.
     *
     * <p>Unless the routing mode is {@code SINGLE_MEMBER}, the load balancer is asked for a
     * member first and the active connection to that member is returned if there is one.
     * Otherwise the first active connection is returned.
     *
     * @return a connection, or {@code null} when there is no active connection
     */
    @Override
    public ClientConnection getRandomConnection() {
        if (this.routingMode != RoutingMode.SINGLE_MEMBER) {
            Member balancedMember = this.loadBalancer.next();
            if (balancedMember != null) {
                ClientConnection balancedConnection = (ClientConnection)this.activeConnections.get(balancedMember.getUuid());
                if (balancedConnection != null) {
                    return balancedConnection;
                }
            }
        }
        // Fall back to whichever active connection comes first.
        return IterableUtil.getFirst(this.getActiveConnections(), null);
    }

    /**
     * Picks a connection to use for SQL.
     *
     * <p>Unless the routing mode is {@code SINGLE_MEMBER}, up to 10 attempts are made to
     * pick a member via {@code CoreQueryUtils.memberOfLargerSameVersionGroup} and return
     * the active connection to it. If that yields nothing, the active connections are
     * scanned for one whose member is known and is not a lite member; failing that, the
     * first active connection encountered is returned.
     *
     * @return a connection, or {@code null} when there is no active connection
     */
    @Override
    public ClientConnection getConnectionForSql() {
        if (this.routingMode != RoutingMode.SINGLE_MEMBER) {
            for (int attempt = 0; attempt < 10; ++attempt) {
                Member picked = CoreQueryUtils.memberOfLargerSameVersionGroup(this.client.getClientClusterService().getEffectiveMemberList(), null);
                if (picked == null) {
                    break;
                }
                ClientConnection pickedConnection = (ClientConnection)this.activeConnections.get(picked.getUuid());
                if (pickedConnection != null) {
                    return pickedConnection;
                }
            }
        }

        ClientConnection fallback = null;
        for (Map.Entry entry : this.activeConnections.entrySet()) {
            ClientConnection candidate = (ClientConnection)entry.getValue();
            if (fallback == null) {
                // Remember the first connection seen in case no data member is found.
                fallback = candidate;
            }
            Member owner = this.client.getClientClusterService().getMember((UUID)entry.getKey());
            if (owner != null && !owner.isLiteMember()) {
                return candidate;
            }
        }
        return fallback;
    }

    /**
     * Sends an authentication request over the given connection and waits for the
     * response, up to {@code authenticationTimeout} milliseconds.
     *
     * <p>On any failure the connection is closed and the failure is rethrown through
     * {@link ExceptionUtil#rethrow}.
     *
     * @param connection the connection to authenticate
     * @return the decoded authentication response
     */
    private AuthenticationResponse authenticateOnCluster(TcpClientConnection connection) {
        ClientMessage authRequest = this.encodeAuthenticationRequest(connection.getInitAddress());
        ClientInvocation invocation = new ClientInvocation((HazelcastClientInstance)this.client, authRequest, null, connection);
        ClientInvocationFuture pendingResponse = invocation.invokeUrgent();
        try {
            ClientMessage reply = (ClientMessage)pendingResponse.get(this.authenticationTimeout, TimeUnit.MILLISECONDS);
            return AuthenticationResponse.from(reply);
        }
        catch (Exception failure) {
            connection.close("Failed to authenticate connection", failure);
            throw ExceptionUtil.rethrow(failure);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes a received authentication response for a connection: validates it,
     * registers the connection as active and updates the client state accordingly.
     *
     * @param connection             the connection the response was received on
     * @param response               the authentication response from the member
     * @param switchingToNextCluster whether this attempt is part of a switch to another cluster
     * @return {@code connection}, or the already registered connection to the same member
     *         (in which case {@code connection} has been closed as a duplicate)
     */
    private TcpClientConnection onAuthenticated(TcpClientConnection connection, AuthenticationResponse response, boolean switchingToNextCluster) {
        Object object = this.clientStateMutex;
        // Validation, registration and client-state changes below all run while holding clientStateMutex.
        synchronized (object) {
            boolean clusterIdChanged;
            // Closes the connection and throws if the response is not an acceptable authentication.
            this.checkAuthenticationResponse(connection, response);
            connection.setRemoteAddress(response.getAddress());
            connection.setRemoteUuid(response.getMemberUuid());
            connection.setClusterUuid(response.getClusterId());
            TcpClientConnection existingConnection = (TcpClientConnection)this.activeConnections.get(response.getMemberUuid());
            if (existingConnection != null) {
                // A connection to this member is already registered: drop the new one and reuse the existing one.
                connection.close("Duplicate connection to same member with uuid : " + String.valueOf(response.getMemberUuid()), null);
                return existingConnection;
            }
            UUID newClusterId = response.getClusterId();
            UUID currentClusterId = this.clientClusterService.getClusterId();
            if (this.logger.isFineEnabled()) {
                this.logger.fine("Checking the cluster: %s, current cluster: %s", newClusterId, currentClusterId);
            }
            // A null current cluster id is not treated as a change.
            boolean bl = clusterIdChanged = currentClusterId != null && !newClusterId.equals(currentClusterId);
            if (clusterIdChanged) {
                // Throws (after closing the connection) when connecting to a different cluster is not allowed right now.
                this.checkClientStateOnClusterIdChange(connection, switchingToNextCluster);
                this.logger.warning("Switching from current cluster: " + String.valueOf(currentClusterId) + " to new cluster: " + String.valueOf(newClusterId));
                this.client.onConnectionToNewCluster();
            }
            this.checkClientState(connection, switchingToNextCluster);
            List<Integer> tpcPorts = response.getTpcPorts();
            if (this.isTpcAwareClient && tpcPorts != null && !tpcPorts.isEmpty()) {
                this.connectTpcPorts(connection, tpcPorts, response.getTpcToken());
            }
            // Captured before the put so we know whether this is the first active connection.
            boolean connectionsEmpty = this.activeConnections.isEmpty();
            this.activeConnections.put(response.getMemberUuid(), connection);
            this.updateClusterViewMetaDataIfAvailable(connection, response);
            if (connectionsEmpty) {
                this.clientClusterService.onClusterConnect(newClusterId);
                if (this.establishedInitialClusterConnection) {
                    // Not the very first cluster connection: mark as connected and send the client
                    // state to the cluster on the executor.
                    this.clientState = ClientState.CONNECTED_TO_CLUSTER;
                    this.executor.execute(() -> {
                        this.initializeClientOnCluster(newClusterId);
                        this.client.collectAndSendStatsNow();
                    });
                } else {
                    this.establishedInitialClusterConnection = true;
                    if (!this.asyncStart) {
                        // Initial connection without async start: the client is considered initialized right away.
                        this.clientState = ClientState.INITIALIZED_ON_CLUSTER;
                        this.fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_CONNECTED);
                    } else {
                        this.executor.execute(() -> this.initializeClientOnCluster(this.clientClusterService.getClusterId()));
                    }
                }
            }
            this.logger.info("Authenticated with server " + String.valueOf(response.getAddress()) + ":" + String.valueOf(response.getMemberUuid()) + ", server version: " + response.getServerHazelcastVersion() + ", local address: " + String.valueOf(connection.getLocalSocketAddress()));
            this.fireConnectionEvent(connection, true);
        }
        // NOTE(review): presumably covers a connection that was closed before it was put into
        // activeConnections, so that its mapping still gets removed — confirm.
        if (!connection.isAlive()) {
            this.onConnectionClose(connection);
        }
        return connection;
    }

    /**
     * Applies the optional cluster view data carried by an authentication response:
     * member list, partition table and key-value pairs.
     *
     * @param connection the authenticated connection
     * @param response   the authentication response
     * @throws UnsupportedClusterVersionException if the response has no key-value pairs
     *         while the routing mode is {@code MULTI_MEMBER}
     */
    private void updateClusterViewMetaDataIfAvailable(TcpClientConnection connection, AuthenticationResponse response) {
        if (response.isMemberListVersionExists()) {
            this.client.getClientClusterService().handleMembersViewEvent(
                    response.getMemberListVersion(), response.getMemberInfos(), connection.getClusterUuid());
        }

        if (response.isPartitionListVersionExists()) {
            this.client.getClientPartitionService().handlePartitionsViewEvent(
                    connection, response.getPartitions(), response.getPartitionListVersion());
        }

        if (!response.isKeyValuePairsExists()) {
            if (this.routingMode == RoutingMode.MULTI_MEMBER) {
                throw new UnsupportedClusterVersionException(AuthenticationKeyValuePairConstants.ROUTING_MODE_NOT_SUPPORTED_MESSAGE);
            }
            return;
        }

        // The pairs are handed out behind a read-only view.
        Map<String, String> readOnlyPairs = Collections.unmodifiableMap(response.getKeyValuePairs());
        this.client.getClientClusterService().updateOnAuth(connection.getClusterUuid(), connection.getRemoteUuid(), readOnlyPairs);
        this.client.getCPGroupViewService().initializeKnownLeaders(connection.getRemoteUuid(), connection.getRemoteAddress(), readOnlyPairs);
    }

    /**
     * Rejects a connection whose cluster-switch intent does not match the current client
     * state: the attempt must be a switching attempt exactly when the client state is
     * {@code SWITCHING_CLUSTER}.
     *
     * @param connection             the connection being authenticated; closed on rejection
     * @param switchingToNextCluster whether the attempt is part of a cluster switch
     * @throws AuthenticationException if the attempt does not match the client state
     */
    private void checkClientState(TcpClientConnection connection, boolean switchingToNextCluster) {
        boolean switchInProgress = this.clientState == ClientState.SWITCHING_CLUSTER;
        if (switchInProgress == switchingToNextCluster) {
            return;
        }

        String reason = switchInProgress
                ? "There is a cluster switch in progress. This connection attempt initiated before the progress and not allowed to be authenticated."
                : "The cluster switch is already completed. This connection attempt is not allowed to be authenticated.";
        connection.close(reason, null);
        throw new AuthenticationException(reason);
    }

    /**
     * Validates an authentication response. For every status other than
     * {@code AUTHENTICATED}, and for a partition count mismatch, the connection is closed
     * and an exception is thrown.
     *
     * @param connection the connection the response was received on
     * @param response   the authentication response to validate
     * @throws AuthenticationException            if credentials failed or the status is not supported
     * @throws ClientNotAllowedInClusterException if the client is not allowed in the cluster, or the
     *                                            partition count is rejected by the partition service
     */
    private void checkAuthenticationResponse(TcpClientConnection connection, AuthenticationResponse response) {
        AuthenticationStatus authenticationStatus = AuthenticationStatus.getById(response.getStatus());
        // A failover configuration against a cluster without failover support is handled as "not allowed".
        if (this.failoverConfigProvided && !response.isFailoverSupported()) {
            this.logger.warning("Cluster does not support failover. This feature is available in Hazelcast Enterprise");
            authenticationStatus = AuthenticationStatus.NOT_ALLOWED_IN_CLUSTER;
        }
        switch (authenticationStatus) {
            case AUTHENTICATED: {
                this.connectionProcessListenerRunner.onAuthenticationSuccess(connection.getInitAddress());
                break;
            }
            case CREDENTIALS_FAILED: {
                AuthenticationException authException = new AuthenticationException("Authentication failed. The configured cluster name on the client (see ClientConfig.setClusterName()) does not match the one configured in the cluster or the credentials set in the Client security config could not be authenticated");
                // Here the connection is closed before the listeners are notified.
                connection.close("Failed to authenticate connection", authException);
                this.connectionProcessListenerRunner.onCredentialsFailed(connection.getInitAddress());
                throw authException;
            }
            case NOT_ALLOWED_IN_CLUSTER: {
                // Here the listeners are notified before the connection is closed.
                this.connectionProcessListenerRunner.onClientNotAllowedInCluster(connection.getInitAddress());
                ClientNotAllowedInClusterException notAllowedException = new ClientNotAllowedInClusterException("Client is not allowed in the cluster");
                connection.close("Failed to authenticate connection", notAllowedException);
                throw notAllowedException;
            }
            default: {
                AuthenticationException exception = new AuthenticationException("Authentication status code not supported. status: " + String.valueOf((Object)authenticationStatus));
                connection.close("Failed to authenticate connection", exception);
                throw exception;
            }
        }
        // Only reached for AUTHENTICATED: the partition count reported by the member must be accepted
        // by the partition service.
        ClientPartitionServiceImpl partitionService = (ClientPartitionServiceImpl)this.client.getClientPartitionService();
        if (!partitionService.checkAndSetPartitionCount(response.getPartitionCount())) {
            ClientNotAllowedInClusterException exception = new ClientNotAllowedInClusterException("Client can not work with this cluster because it has a different partition count. Expected partition count: " + partitionService.getPartitionCount() + ", Member partition count: " + response.getPartitionCount());
            connection.close("Failed to authenticate connection", exception);
            throw exception;
        }
    }

    /**
     * Decides whether a connection to a cluster with a different id may proceed.
     *
     * <p>If there are still active connections, the connection is rejected. If there are
     * none, it is rejected only when a failover configuration is provided and this
     * attempt is not part of a cluster switch.
     *
     * @param connection             the connection being authenticated; closed on rejection
     * @param switchingToNextCluster whether the attempt is part of a cluster switch
     * @throws IllegalStateException              if active connections exist
     * @throws ClientNotAllowedInClusterException if failover is configured and this is not a switch attempt
     */
    private void checkClientStateOnClusterIdChange(TcpClientConnection connection, boolean switchingToNextCluster) {
        if (!this.activeConnections.isEmpty()) {
            String reason = "Connection does not belong to this cluster";
            connection.close(reason, null);
            throw new IllegalStateException(reason);
        }

        boolean mustSwitchExplicitly = this.failoverConfigProvided && !switchingToNextCluster;
        if (mustSwitchExplicitly) {
            String reason = "Force to hard cluster switch";
            connection.close(reason, null);
            throw new ClientNotAllowedInClusterException(reason);
        }
    }

    /**
     * Builds the authentication request for the given member address.
     *
     * <p>New credentials are obtained from the current cluster context's credentials
     * factory and stored in {@code currentCredentials}. Password credentials are encoded
     * with the password codec; token credentials and any other credentials (serialized
     * via the serialization service) are encoded with the custom codec.
     *
     * @param toAddress the address the request is destined for
     * @return the encoded authentication request
     */
    private ClientMessage encodeAuthenticationRequest(Address toAddress) {
        InternalSerializationService serializationService = this.client.getSerializationService();
        String clientVersion = BuildInfoProvider.getBuildInfo().getVersion();

        CandidateClusterContext clusterContext = this.clusterDiscoveryService.current();
        Credentials credentials = clusterContext.getCredentialsFactory().newCredentials(toAddress);
        String clusterName = clusterContext.getClusterName();
        this.currentCredentials = credentials;

        boolean cpDirectToLeader = this.client.getCPGroupViewService().isDirectToLeaderEnabled();
        // The routing mode is sent as the ordinal of its enum constant.
        byte routingModeByte = (byte)this.client.getConnectionManager().getRoutingMode().ordinal();

        if (credentials instanceof PasswordCredentials) {
            return this.encodePasswordCredentialsRequest(clusterName, (PasswordCredentials)credentials, serializationService.getVersion(), clientVersion, routingModeByte, cpDirectToLeader);
        }

        byte[] secretBytes = credentials instanceof TokenCredentials
                ? ((TokenCredentials)credentials).getToken()
                : serializationService.toDataWithSchema(credentials).toByteArray();
        return this.encodeCustomCredentialsRequest(clusterName, secretBytes, serializationService.getVersion(), clientVersion, routingModeByte, cpDirectToLeader);
    }

    /**
     * Encodes an authentication request carrying a user name and password.
     *
     * @param clusterName          the name of the cluster to authenticate against
     * @param credentials          the name/password credentials
     * @param serializationVersion the serialization version to report
     * @param clientVersion        the client version string to report
     * @param routingMode          the routing mode as a byte
     * @param cpDirectToLeader     the CP direct-to-leader flag to report
     * @return the encoded request
     */
    private ClientMessage encodePasswordCredentialsRequest(String clusterName, PasswordCredentials credentials, byte serializationVersion, String clientVersion, byte routingMode, boolean cpDirectToLeader) {
        String username = credentials.getName();
        String password = credentials.getPassword();
        return ClientAuthenticationCodec.encodeRequest(
                clusterName,
                username,
                password,
                this.clientUuid,
                this.connectionType,
                serializationVersion,
                clientVersion,
                this.client.getName(),
                this.labels,
                routingMode,
                cpDirectToLeader);
    }

    /**
     * Encodes an authentication request carrying opaque secret bytes.
     *
     * @param clusterName          the name of the cluster to authenticate against
     * @param secretBytes          the secret to send (a token or serialized credentials)
     * @param serializationVersion the serialization version to report
     * @param clientVersion        the client version string to report
     * @param routingMode          the routing mode as a byte
     * @param cpDirectToLeader     the CP direct-to-leader flag to report
     * @return the encoded request
     */
    private ClientMessage encodeCustomCredentialsRequest(String clusterName, byte[] secretBytes, byte serializationVersion, String clientVersion, byte routingMode, boolean cpDirectToLeader) {
        return ClientAuthenticationCustomCodec.encodeRequest(
                clusterName,
                secretBytes,
                this.clientUuid,
                this.connectionType,
                serializationVersion,
                clientVersion,
                this.client.getName(),
                this.labels,
                routingMode,
                cpDirectToLeader);
    }

    /**
     * Verifies that the client's lifecycle service is running.
     *
     * @throws HazelcastClientNotActiveException if the lifecycle service is not running
     */
    protected void checkClientActive() {
        boolean running = this.client.getLifecycleService().isRunning();
        if (running) {
            return;
        }
        throw new HazelcastClientNotActiveException();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends the client state to the cluster and, if the client is still on the same
     * cluster afterwards, marks the client as initialized and fires
     * {@code CLIENT_CONNECTED}.
     *
     * <p>The cluster id is compared under {@code clientStateMutex} both before and after
     * the state is sent; the send itself runs outside the mutex. On failure the method
     * resubmits itself to the executor as long as the cluster id still matches.
     *
     * @param targetClusterId the id of the cluster the state is meant for
     */
    private void initializeClientOnCluster(@Nonnull UUID targetClusterId) {
        try {
            UUID clusterId;
            Object object = this.clientStateMutex;
            synchronized (object) {
                clusterId = this.clientClusterService.getClusterId();
                // Skip the send entirely if the client has moved to another cluster in the meantime.
                if (!targetClusterId.equals(clusterId)) {
                    this.logger.warning("Won't send client state to cluster: " + String.valueOf(targetClusterId) + " Because switched to a new cluster: " + String.valueOf(clusterId));
                    return;
                }
            }
            // Runs without holding clientStateMutex.
            this.client.sendStateToCluster();
            object = this.clientStateMutex;
            synchronized (object) {
                // Re-read the cluster id: it may have changed while the state was being sent.
                clusterId = this.clientClusterService.getClusterId();
                if (targetClusterId.equals(clusterId)) {
                    if (this.logger.isFineEnabled()) {
                        this.logger.fine("Client state is sent to cluster: %s", targetClusterId);
                    }
                    this.clientState = ClientState.INITIALIZED_ON_CLUSTER;
                    this.fireLifecycleEvent(LifecycleEvent.LifecycleState.CLIENT_CONNECTED);
                } else if (this.logger.isFineEnabled()) {
                    // NOTE(review): this warning is only emitted when fine logging is enabled — confirm that is intended.
                    this.logger.warning("Cannot set client state to " + String.valueOf((Object)ClientState.INITIALIZED_ON_CLUSTER) + " because current cluster id: " + String.valueOf(clusterId) + " is different than expected cluster id: " + String.valueOf(targetClusterId));
                }
            }
        }
        catch (Exception e) {
            String clusterName = this.clusterDiscoveryService.current().getClusterName();
            this.logger.warning("Failure during sending state to the cluster.", e);
            Object object = this.clientStateMutex;
            synchronized (object) {
                UUID clusterId = this.clientClusterService.getClusterId();
                if (targetClusterId.equals(clusterId)) {
                    // NOTE(review): the retry message is also gated on fine logging being enabled — confirm.
                    if (this.logger.isFineEnabled()) {
                        this.logger.warning("Retrying sending state to the cluster: " + String.valueOf(targetClusterId) + ", name: " + clusterName);
                    }
                    // Retry by resubmitting this method; no delay or attempt limit is applied here.
                    this.executor.execute(() -> this.initializeClientOnCluster(targetClusterId));
                }
            }
        }
    }

    /**
     * Selects the TPC ports to use for the given connection and initiates a
     * {@link TpcChannelConnector} for them.
     *
     * @param connection the authenticated connection the TPC channels belong to
     * @param tpcPorts   the TPC ports reported by the member
     * @param tpcToken   the TPC token reported by the member
     */
    private void connectTpcPorts(TcpClientConnection connection, List<Integer> tpcPorts, byte[] tpcToken) {
        ClientTpcConfig tpcConfig = this.client.getClientConfig().getTpcConfig();
        List<Integer> selectedPorts = TcpClientConnectionManager.getTargetTpcPorts(tpcPorts, tpcConfig);
        new TpcChannelConnector(
                this.client,
                this.authenticationTimeout,
                this.clientUuid,
                connection,
                selectedPorts,
                tpcToken,
                this.executor,
                this::createTpcChannel,
                this.client.getLoggingService()).initiate();
    }

    /**
     * Chooses which TPC ports to connect to.
     *
     * <p>When the configured connection count is zero, or is at least the number of
     * available ports, the given list itself is returned. Otherwise that many distinct
     * ports are drawn at random from a copy of the list.
     *
     * @param tpcPorts  the available TPC ports
     * @param tpcConfig the client TPC configuration providing the connection count
     * @return the ports to connect to
     */
    static List<Integer> getTargetTpcPorts(List<Integer> tpcPorts, ClientTpcConfig tpcConfig) {
        int wanted = tpcConfig.getConnectionCount();
        if (wanted == 0 || wanted >= tpcPorts.size()) {
            return tpcPorts;
        }

        // Draw from a copy so the caller's list is left untouched while ports are removed.
        List<Integer> remaining = new ArrayList<Integer>(tpcPorts);
        List<Integer> chosen = new ArrayList<Integer>(wanted);
        ThreadLocalRandom random = ThreadLocalRandom.current();
        while (chosen.size() < wanted) {
            int position = random.nextInt(remaining.size());
            chosen.add(remaining.remove(position));
        }
        return chosen;
    }

    /**
     * Closes active connections to members that are not part of the current subset
     * members view, provided that every subset member already has an active connection.
     *
     * <p>Connections that the invocation service reports as in use are left open.
     */
    private void tryCloseConnectionsToMembersNotInSubset() {
        SubsetMembersView subsetMembersView = this.client.getClientClusterService().getSubsetMembers().getSubsetMembersView();
        // A missing view is treated as an empty subset, which makes the check below bail out.
        Set<UUID> subsetMembers = subsetMembersView == null ? Collections.<UUID>emptySet() : subsetMembersView.members();
        if (!this.haveAllSubsetMembersConnected(subsetMembers)) {
            return;
        }
        for (Member member : this.client.getClientClusterService().getMemberList()) {
            if (subsetMembers.contains(member.getUuid())) {
                continue;
            }
            TcpClientConnection candidateForClosure = (TcpClientConnection)this.activeConnections.get(member.getUuid());
            if (candidateForClosure == null || this.client.getInvocationService().isConnectionInUse(candidateForClosure)) {
                continue;
            }
            candidateForClosure.close("Connection is closed because it is not relevant for the current MULTI_MEMBER configuration", null);
        }
    }

    /**
     * Tells whether every given subset member has an entry in the active connection map.
     *
     * @param subsetMembers the uuids of the subset members
     * @return {@code false} if {@code subsetMembers} is empty or any of its members has no
     *         active connection; {@code true} otherwise
     */
    private boolean haveAllSubsetMembersConnected(Collection<UUID> subsetMembers) {
        // An empty subset never counts as "all connected".
        return !subsetMembers.isEmpty()
                && this.activeConnections.keySet().containsAll(subsetMembers);
    }

    /**
     * Membership callback for an added member; this implementation does nothing.
     *
     * @param membershipEvent the membership event (unused)
     */
    @Override
    public void memberAdded(MembershipEvent membershipEvent) {
        // No action is taken here when a member is added.
        // NOTE(review): connections to new members appear to be opened by ConnectToAllClusterMembersTask — confirm.
    }

    /**
     * Membership callback for a removed member: closes the active connection to that
     * member, if there is one.
     *
     * @param membershipEvent the event describing the removed member
     */
    @Override
    public void memberRemoved(MembershipEvent membershipEvent) {
        ClientConnection staleConnection = this.getActiveConnection(membershipEvent.getMember().getUuid());
        if (staleConnection == null) {
            return;
        }
        TargetDisconnectedException cause = new TargetDisconnectedException("The client has closed the connection to this member, after receiving a member left event from the cluster. " + staleConnection);
        staleConnection.close(null, cause);
    }

    /**
     * State of the client with respect to the cluster it is connected to, as tracked by
     * the {@code clientState} field.
     */
    private static enum ClientState {
        // Not assigned in the part of the file reviewed here; presumably the state before any connection — TODO confirm.
        INITIAL,
        // Set in onAuthenticated when a first active connection is registered after the initial
        // cluster connection had already been established; the client state is then sent asynchronously.
        CONNECTED_TO_CLUSTER,
        // Set once the client state has been sent to the cluster, or directly on the initial
        // connection when async start is off; CLIENT_CONNECTED is fired alongside it.
        INITIALIZED_ON_CLUSTER,
        // Set in onConnectionClose when the last active connection has been removed.
        DISCONNECTED_FROM_CLUSTER,
        // Checked by checkClientState and ConnectToAllClusterMembersTask; where it is set is not visible here.
        SWITCHING_CLUSTER;

    }

    /**
     * Channel error handler that closes the client connection associated with the
     * failing channel.
     */
    private class ClientChannelErrorHandler
    implements ChannelErrorHandler {
        private ClientChannelErrorHandler() {
        }

        /**
         * Logs the error when there is no channel (or the cause is an
         * {@link OutOfMemoryError}) and otherwise closes the connection found in the
         * channel's attribute map with a reason derived from the cause.
         *
         * @param channel the channel the error occurred on; may be {@code null}
         * @param cause   the error
         */
        @Override
        public void onError(Channel channel, Throwable cause) {
            if (channel == null) {
                TcpClientConnectionManager.this.logger.severe(cause);
                return;
            }

            if (cause instanceof OutOfMemoryError) {
                TcpClientConnectionManager.this.logger.severe(cause);
            }

            ConcurrentMap attributes = channel.attributeMap();
            boolean tpcChannel = attributes.containsKey(TpcChannelClientConnectionAdapter.class);
            ClientConnection owner = (ClientConnection)attributes.get(TcpClientConnection.class);

            // A TPC channel whose connection has no TPC channels set yet is closed directly.
            if (tpcChannel && owner.getTpcChannels() == null) {
                IOUtil.closeResource(channel);
            }

            String reason = cause instanceof EOFException
                    ? "Connection closed by the other side"
                    : "Exception in " + owner + ", thread=" + Thread.currentThread().getName();
            owner.close(reason, cause);
        }
    }

    /**
     * Task that submits a connection attempt for every effective cluster member that has
     * no active connection yet and, in {@code MULTI_MEMBER} routing mode, afterwards
     * tries to close connections to members outside the subset.
     */
    private class ConnectToAllClusterMembersTask
    implements Runnable {
        /** Uuids of members for which a connection attempt is currently submitted. */
        private final Set<UUID> connectingAddresses = ConcurrentHashMap.newKeySet();

        private ConnectToAllClusterMembersTask() {
        }

        @Override
        public void run() {
            if (!TcpClientConnectionManager.this.client.getLifecycleService().isRunning()) {
                return;
            }

            for (Member member : TcpClientConnectionManager.this.client.getClientClusterService().getEffectiveMemberList()) {
                // Stop the whole run as soon as the client is switching cluster or disconnected.
                if (TcpClientConnectionManager.this.clientState == ClientState.SWITCHING_CLUSTER || TcpClientConnectionManager.this.clientState == ClientState.DISCONNECTED_FROM_CLUSTER) {
                    return;
                }

                UUID memberUuid = member.getUuid();
                boolean alreadyConnected = TcpClientConnectionManager.this.activeConnections.get(memberUuid) != null;
                // Skip members that are connected or already have an attempt in flight.
                if (alreadyConnected || !this.connectingAddresses.add(memberUuid)) {
                    continue;
                }
                TcpClientConnectionManager.this.executor.submit(() -> {
                    this.connectAndRelease(member, memberUuid);
                });
            }

            if (TcpClientConnectionManager.this.getRoutingMode() == RoutingMode.MULTI_MEMBER) {
                TcpClientConnectionManager.this.tryCloseConnectionsToMembersNotInSubset();
            }
        }

        /**
         * Attempts to connect to the given member, logging any failure, and always removes
         * the member's uuid from {@code connectingAddresses} afterwards.
         */
        private void connectAndRelease(Member member, UUID memberUuid) {
            try {
                if (!TcpClientConnectionManager.this.client.getLifecycleService().isRunning()) {
                    return;
                }
                TcpClientConnectionManager.this.getOrConnectToMember(member, false);
            }
            catch (Exception failure) {
                // The stack trace is included only when fine logging is enabled.
                if (TcpClientConnectionManager.this.logger.isFineEnabled()) {
                    TcpClientConnectionManager.this.logger.warning("Could not connect to member " + memberUuid, failure);
                } else {
                    TcpClientConnectionManager.this.logger.warning("Could not connect to member " + memberUuid + ", reason " + failure);
                }
            }
            finally {
                this.connectingAddresses.remove(memberUuid);
            }
        }
    }
}

