/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.client;

import io.netty.channel.Channel;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.network.client.ChunkFetchFailureException;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.protocol.ChunkFetchFailure;
import org.apache.spark.network.protocol.ChunkFetchSuccess;
import org.apache.spark.network.protocol.ResponseMessage;
import org.apache.spark.network.protocol.RpcFailure;
import org.apache.spark.network.protocol.RpcResponse;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.server.MessageHandler;
import org.apache.spark.network.util.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.annotations.VisibleForTesting;

/**
 * Client-side handler for {@link ResponseMessage}s arriving on a single channel.
 *
 * <p>Callers register a callback per outstanding chunk fetch (keyed by {@link StreamChunkId})
 * or RPC (keyed by request id). When a matching response arrives via {@link #handle}, the
 * callback is removed from the registry and invoked exactly once. When the channel is
 * unregistered or an exception is reported, every callback still registered is failed.
 *
 * <p>Both registries are {@link ConcurrentHashMap}s, and a callback is always claimed with a
 * single {@code remove} call so that it cannot be invoked both by a response and by a
 * concurrent removal/failure path.
 */
public class TransportResponseHandler
extends MessageHandler<ResponseMessage> {
    private final Logger logger = LoggerFactory.getLogger(TransportResponseHandler.class);
    private final Channel channel;
    private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches;
    private final Map<Long, RpcResponseCallback> outstandingRpcs;

    /**
     * Creates a handler bound to the given channel.
     *
     * @param channel channel whose remote address is used in log and error messages
     */
    public TransportResponseHandler(Channel channel) {
        this.channel = channel;
        this.outstandingFetches = new ConcurrentHashMap<StreamChunkId, ChunkReceivedCallback>();
        this.outstandingRpcs = new ConcurrentHashMap<Long, RpcResponseCallback>();
    }

    /**
     * Registers a callback for an outstanding chunk fetch, replacing any callback previously
     * registered under the same id.
     *
     * @param streamChunkId id of the requested chunk
     * @param callback callback to invoke when the fetch succeeds or fails
     */
    public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
        this.outstandingFetches.put(streamChunkId, callback);
    }

    /**
     * Unregisters the callback for a chunk fetch without invoking it.
     *
     * @param streamChunkId id of the chunk whose callback should be dropped
     */
    public void removeFetchRequest(StreamChunkId streamChunkId) {
        this.outstandingFetches.remove(streamChunkId);
    }

    /**
     * Registers a callback for an outstanding RPC, replacing any callback previously
     * registered under the same request id.
     *
     * @param requestId id of the RPC request
     * @param callback callback to invoke when the RPC succeeds or fails
     */
    public void addRpcRequest(long requestId, RpcResponseCallback callback) {
        this.outstandingRpcs.put(requestId, callback);
    }

    /**
     * Unregisters the callback for an RPC without invoking it.
     *
     * @param requestId id of the RPC whose callback should be dropped
     */
    public void removeRpcRequest(long requestId) {
        this.outstandingRpcs.remove(requestId);
    }

    /**
     * Fails every registered fetch and RPC callback with {@code cause}, then empties both
     * registries.
     *
     * <p>An exception thrown by one callback is logged and does not prevent the remaining
     * callbacks from being notified or the registries from being cleared.
     */
    private void failOutstandingRequests(Throwable cause) {
        for (Map.Entry<StreamChunkId, ChunkReceivedCallback> entry : this.outstandingFetches.entrySet()) {
            try {
                entry.getValue().onFailure(entry.getKey().chunkIndex, cause);
            } catch (Exception e) {
                this.logger.warn("ChunkReceivedCallback.onFailure throws exception", e);
            }
        }
        for (Map.Entry<Long, RpcResponseCallback> entry : this.outstandingRpcs.entrySet()) {
            try {
                entry.getValue().onFailure(cause);
            } catch (Exception e) {
                this.logger.warn("RpcResponseCallback.onFailure throws exception", e);
            }
        }
        this.outstandingFetches.clear();
        this.outstandingRpcs.clear();
    }

    /**
     * Fails all outstanding requests with an {@link IOException} if any remain when the
     * channel is unregistered.
     */
    @Override
    public void channelUnregistered() {
        // Read the count once so the logged value is the one the check was made on.
        int outstanding = this.numOutstandingRequests();
        if (outstanding > 0) {
            String remoteAddress = NettyUtils.getRemoteAddress(this.channel);
            this.logger.error("Still have {} requests outstanding when connection from {} is closed", outstanding, remoteAddress);
            this.failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
        }
    }

    /**
     * Fails all outstanding requests with {@code cause} if any remain.
     *
     * @param cause the exception reported for this channel
     */
    @Override
    public void exceptionCaught(Throwable cause) {
        int outstanding = this.numOutstandingRequests();
        if (outstanding > 0) {
            String remoteAddress = NettyUtils.getRemoteAddress(this.channel);
            this.logger.error("Still have {} requests outstanding when connection from {} is closed", outstanding, remoteAddress);
            this.failOutstandingRequests(cause);
        }
    }

    /**
     * Dispatches a response to the callback registered for it.
     *
     * <p>The callback is removed from its registry before being invoked. A response with no
     * registered callback is logged at WARN level and otherwise ignored. The buffer of a
     * {@link ChunkFetchSuccess} is released in every case.
     *
     * @param message the response received from the remote side
     * @throws IllegalStateException if the message is not one of the four known response types
     */
    @Override
    public void handle(ResponseMessage message) {
        String remoteAddress = NettyUtils.getRemoteAddress(this.channel);
        if (message instanceof ChunkFetchSuccess) {
            ChunkFetchSuccess resp = (ChunkFetchSuccess)message;
            // remove() claims the callback atomically; get()+remove() could race with
            // removeFetchRequest()/failOutstandingRequests() and fire the callback twice.
            ChunkReceivedCallback listener = this.outstandingFetches.remove(resp.streamChunkId);
            if (listener == null) {
                this.logger.warn("Ignoring response for block {} from {} since it is not outstanding", resp.streamChunkId, remoteAddress);
                resp.buffer.release();
            } else {
                try {
                    listener.onSuccess(resp.streamChunkId.chunkIndex, resp.buffer);
                } finally {
                    // Release even when the callback throws, otherwise the buffer leaks.
                    resp.buffer.release();
                }
            }
        } else if (message instanceof ChunkFetchFailure) {
            ChunkFetchFailure resp = (ChunkFetchFailure)message;
            ChunkReceivedCallback listener = this.outstandingFetches.remove(resp.streamChunkId);
            if (listener == null) {
                this.logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding", new Object[]{resp.streamChunkId, remoteAddress, resp.errorString});
            } else {
                listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException("Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
            }
        } else if (message instanceof RpcResponse) {
            RpcResponse resp = (RpcResponse)message;
            RpcResponseCallback listener = this.outstandingRpcs.remove(resp.requestId);
            if (listener == null) {
                this.logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding", new Object[]{resp.requestId, remoteAddress, resp.response.length});
            } else {
                listener.onSuccess(resp.response);
            }
        } else if (message instanceof RpcFailure) {
            RpcFailure resp = (RpcFailure)message;
            RpcResponseCallback listener = this.outstandingRpcs.remove(resp.requestId);
            if (listener == null) {
                this.logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding", new Object[]{resp.requestId, remoteAddress, resp.errorString});
            } else {
                listener.onFailure(new RuntimeException(resp.errorString));
            }
        } else {
            throw new IllegalStateException("Unknown response type: " + message.type());
        }
    }

    /**
     * Returns the number of fetch and RPC callbacks currently registered.
     *
     * <p>The two registry sizes are read separately, so the sum is only a snapshot when the
     * registries are being modified concurrently.
     *
     * @return total count of outstanding fetches plus outstanding RPCs
     */
    @VisibleForTesting
    public int numOutstandingRequests() {
        return this.outstandingFetches.size() + this.outstandingRpcs.size();
    }
}

