/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory for {@link NettyPartitionRequestClient} instances.
 *
 * <p>Clients are cached per {@link ConnectionID} as {@link CompletableFuture}s, so that concurrent
 * requests for the same connection share a single connection attempt: the thread that inserts the
 * future into the map performs the connect, every other thread waits on that future.
 */
class PartitionRequestClientFactory {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientFactory.class);

    private final NettyClient nettyClient;

    /** Number of additional connection attempts made after the first one has failed. */
    private final int retryNumber;

    /** Pending or established clients, keyed by the connection they belong to. */
    private final ConcurrentMap<ConnectionID, CompletableFuture<NettyPartitionRequestClient>> clients = new ConcurrentHashMap<>();

    /**
     * Creates a factory that does not retry failed connection attempts.
     *
     * @param nettyClient client used to open channels
     */
    PartitionRequestClientFactory(NettyClient nettyClient) {
        this(nettyClient, 0);
    }

    /**
     * Creates a factory.
     *
     * @param nettyClient client used to open channels
     * @param retryNumber how many times a failed connection attempt is retried
     */
    PartitionRequestClientFactory(NettyClient nettyClient, int retryNumber) {
        this.nettyClient = nettyClient;
        this.retryNumber = retryNumber;
    }

    /**
     * Returns a client for the given connection whose reference counter has been incremented,
     * creating (and caching) it if no usable one is cached yet.
     *
     * @param connectionId the connection to obtain a client for
     * @return a client whose reference counter was successfully incremented
     * @throws IOException if the connection attempt (made by this or a concurrent caller) failed
     * @throws InterruptedException if the calling thread was interrupted while connecting or
     *     while waiting for a concurrent connection attempt
     */
    NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
        while (true) {
            // Set inside the mapping function, i.e. only for the caller that created the entry.
            AtomicBoolean isTheFirstOne = new AtomicBoolean(false);
            CompletableFuture<NettyPartitionRequestClient> clientFuture = this.clients.computeIfAbsent(connectionId, unused -> {
                isTheFirstOne.set(true);
                return new CompletableFuture<>();
            });

            if (isTheFirstOne.get()) {
                try {
                    clientFuture.complete(this.connectWithRetries(connectionId));
                } catch (InterruptedException e) {
                    // A null result tells concurrent waiters to start over with a fresh attempt.
                    clientFuture.complete(null);
                    // Do not leave the abandoned entry behind in the cache.
                    this.clients.remove(connectionId, clientFuture);
                    throw e;
                } catch (Exception e) {
                    // Complete first so that threads already waiting on this future see the
                    // failure, then evict the entry so that later requests reconnect instead of
                    // being served the cached failure forever.
                    clientFuture.completeExceptionally(e);
                    this.clients.remove(connectionId, clientFuture);
                }
            }

            NettyPartitionRequestClient client;
            try {
                client = clientFuture.get();
            } catch (ExecutionException e) {
                throw new IOException(e);
            }

            if (client == null) {
                // The connecting thread was interrupted; drop the entry (if still present) and retry.
                this.clients.remove(connectionId, clientFuture);
                continue;
            }

            if (client.incrementReferenceCounter()) {
                return client;
            }
            // The counter could not be incremented: evict this client and try again.
            this.destroyPartitionRequestClient(connectionId, client);
        }
    }

    /**
     * Connects to the given address, retrying up to {@code retryNumber} times when the attempt
     * fails with a {@link RemoteTransportException}.
     *
     * @throws CompletionException wrapping the last {@link RemoteTransportException} once all
     *     attempts have failed
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    private NettyPartitionRequestClient connectWithRetries(ConnectionID connectionId) throws InterruptedException {
        int tried = 0;
        while (true) {
            try {
                return this.connect(connectionId);
            } catch (RemoteTransportException e) {
                tried++;
                LOG.error("Failed {} times to connect to {}", tried, connectionId.getAddress(), e);
                if (tried > this.retryNumber) {
                    throw new CompletionException(e);
                }
            }
        }
    }

    /**
     * Opens a channel to the remote address and wraps it in a {@link NettyPartitionRequestClient}.
     *
     * @throws RemoteTransportException if anything other than an interrupt goes wrong
     * @throws InterruptedException if the thread is interrupted while waiting for the channel
     */
    private NettyPartitionRequestClient connect(ConnectionID connectionId) throws RemoteTransportException, InterruptedException {
        try {
            // sync() rethrows the cause of a failed connect; await() would only wait for
            // completion, so a failed attempt would neither be reported nor retried here.
            Channel channel = this.nettyClient.connect(connectionId.getAddress()).sync().channel();
            NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
            return new NettyPartitionRequestClient(channel, clientHandler, connectionId, this);
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            throw new RemoteTransportException("Connecting to remote task manager '" + connectionId.getAddress() + "' has failed. This might indicate that the remote task manager has been lost.", connectionId.getAddress(), e);
        }
    }

    /**
     * For a connection that is still being established, registers a callback that disposes the
     * client once it is available if it is unused, and removes it from the cache in that case.
     * Entries that are absent or already completed are left untouched.
     */
    void closeOpenChannelConnections(ConnectionID connectionId) {
        CompletableFuture<NettyPartitionRequestClient> entry = this.clients.get(connectionId);
        if (entry != null && !entry.isDone()) {
            entry.thenAccept(client -> {
                // client is null when the connecting thread was interrupted.
                if (client != null && client.disposeIfNotUsed()) {
                    this.clients.remove(connectionId, entry);
                }
            });
        }
    }

    /** Returns the number of entries (pending or completed) currently held in the cache. */
    int getNumberOfActiveClients() {
        return this.clients.size();
    }

    /**
     * Removes the cached entry for the given connection, but only if it has completed with exactly
     * the given client.
     */
    void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestClient client) {
        CompletableFuture<NettyPartitionRequestClient> future = this.clients.get(connectionId);
        if (future != null && future.isDone()) {
            future.thenAccept(futureClient -> {
                if (client.equals(futureClient)) {
                    this.clients.remove(connectionId, future);
                }
            });
        }
    }
}

