/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Set;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.ChunkFetchFailure;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.ChunkFetchSuccess;
import org.apache.spark.network.protocol.Encodable;
import org.apache.spark.network.protocol.RequestMessage;
import org.apache.spark.network.protocol.RpcFailure;
import org.apache.spark.network.protocol.RpcRequest;
import org.apache.spark.network.protocol.RpcResponse;
import org.apache.spark.network.server.MessageHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.util.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.base.Throwables;
import org.spark-project.guava.collect.Sets;

/**
 * Handles {@link RequestMessage}s arriving on a single channel and writes the
 * corresponding response back on that same channel.
 *
 * <p>Two request kinds are supported: {@link ChunkFetchRequest}, answered from the
 * {@link StreamManager} obtained from the {@link RpcHandler}, and {@link RpcRequest},
 * delegated to the {@link RpcHandler} itself. Every stream id that was ever requested
 * through this handler is remembered so the stream manager can be notified when the
 * channel goes away.
 */
public class TransportRequestHandler
extends MessageHandler<RequestMessage> {
    private static final Logger logger = LoggerFactory.getLogger(TransportRequestHandler.class);

    /** Channel on which requests arrive and responses are written. */
    private final Channel channel;

    /** Client handed to the RPC handler together with each request. */
    private final TransportClient reverseClient;

    /** Receives RPC payloads and produces their responses. */
    private final RpcHandler rpcHandler;

    /** Source of the chunks returned for fetch requests; obtained from {@link #rpcHandler}. */
    private final StreamManager streamManager;

    /** Ids of all streams for which a chunk was requested through this handler. */
    private final Set<Long> streamIds;

    /**
     * Creates a handler bound to one channel.
     *
     * @param channel       channel used to write responses
     * @param reverseClient client passed to {@code rpcHandler} with every RPC and on termination
     * @param rpcHandler    handler for RPC requests; also supplies the stream manager
     */
    public TransportRequestHandler(Channel channel, TransportClient reverseClient, RpcHandler rpcHandler) {
        this.channel = channel;
        this.reverseClient = reverseClient;
        this.rpcHandler = rpcHandler;
        this.streamManager = rpcHandler.getStreamManager();
        this.streamIds = Sets.newHashSet();
    }

    /**
     * Records an exception reported for this channel. No recovery is attempted here;
     * the cause is logged at debug level so that it is not dropped silently.
     *
     * @param cause the exception that was reported
     */
    @Override
    public void exceptionCaught(Throwable cause) {
        logger.debug("Exception reported for connection from " + NettyUtils.getRemoteAddress(channel), cause);
    }

    /**
     * Notifies the stream manager about every stream seen on this channel and then
     * notifies the RPC handler that the connection is gone.
     *
     * <p>A failure while terminating one stream is logged and does not prevent the
     * remaining streams, or the RPC handler, from being notified.
     */
    @Override
    public void channelUnregistered() {
        for (long streamId : streamIds) {
            try {
                streamManager.connectionTerminated(streamId);
            }
            catch (RuntimeException e) {
                // Keep going: the other streams and the RPC handler still need their callback.
                logger.error("Error while invoking StreamManager#connectionTerminated() on stream id " + streamId, e);
            }
        }
        rpcHandler.connectionTerminated(reverseClient);
    }

    /**
     * Dispatches a request to the matching processing method.
     *
     * @param request the incoming request
     * @throws IllegalArgumentException if the request is neither a chunk fetch nor an RPC
     */
    @Override
    public void handle(RequestMessage request) {
        if (request instanceof ChunkFetchRequest) {
            processFetchRequest((ChunkFetchRequest)request);
        } else if (request instanceof RpcRequest) {
            processRpcRequest((RpcRequest)request);
        } else {
            throw new IllegalArgumentException("Unknown request type: " + request);
        }
    }

    /**
     * Looks up the requested chunk and responds with {@link ChunkFetchSuccess}, or with
     * {@link ChunkFetchFailure} carrying the stack trace if the lookup throws.
     */
    private void processFetchRequest(ChunkFetchRequest req) {
        String client = NettyUtils.getRemoteAddress(channel);
        // Remember the stream before the lookup so it is cleaned up even if the lookup fails.
        streamIds.add(req.streamChunkId.streamId);
        logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId);

        ManagedBuffer buf;
        try {
            buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
        }
        catch (Exception e) {
            logger.error(String.format("Error opening block %s for request from %s", req.streamChunkId, client), e);
            respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
            return;
        }
        respond(new ChunkFetchSuccess(req.streamChunkId, buf));
    }

    /**
     * Hands the RPC payload to the {@link RpcHandler}. The callback given to the handler
     * turns its outcome into an {@link RpcResponse} or {@link RpcFailure}; an exception
     * thrown by {@code receive} itself is logged and also reported as {@link RpcFailure}.
     */
    private void processRpcRequest(final RpcRequest req) {
        try {
            rpcHandler.receive(reverseClient, req.message, new RpcResponseCallback(){

                @Override
                public void onSuccess(byte[] response) {
                    respond(new RpcResponse(req.requestId, response));
                }

                @Override
                public void onFailure(Throwable e) {
                    respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
                }
            });
        }
        catch (Exception e) {
            logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
            respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
        }
    }

    /**
     * Writes and flushes {@code result} to the channel. When the write completes, success
     * is logged at trace level; on failure the error is logged and the channel is closed.
     */
    private void respond(final Encodable result) {
        // Resolved up front and through the same helper processFetchRequest uses, instead of
        // dereferencing channel.remoteAddress() directly, which may be null once the channel
        // is no longer connected (e.g. when an RPC callback responds late).
        final String remoteAddress = NettyUtils.getRemoteAddress(channel);
        channel.writeAndFlush(result).addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    logger.trace("Sent result {} to client {}", result, remoteAddress);
                } else {
                    logger.error(String.format("Error sending result %s to %s; closing connection", result, remoteAddress), future.cause());
                    channel.close();
                }
            }
        });
    }
}

