/*
 * Decompiled with CFR 0.152.
 */
package com.weibo.api.motan.transport.netty4;

import com.weibo.api.motan.codec.Codec;
import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.core.extension.ExtensionLoader;
import com.weibo.api.motan.exception.MotanErrorMsgConstant;
import com.weibo.api.motan.exception.MotanFrameworkException;
import com.weibo.api.motan.exception.MotanServiceException;
import com.weibo.api.motan.rpc.DefaultResponse;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.rpc.Response;
import com.weibo.api.motan.rpc.RpcContext;
import com.weibo.api.motan.transport.Channel;
import com.weibo.api.motan.transport.MessageHandler;
import com.weibo.api.motan.transport.netty4.CodecUtil;
import com.weibo.api.motan.transport.netty4.NettyMessage;
import com.weibo.api.motan.transport.netty4.NettyServer;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MotanFrameworkUtil;
import com.weibo.api.motan.util.NetUtils;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

public class NettyChannelHandler
extends ChannelDuplexHandler {
    private ThreadPoolExecutor threadPoolExecutor;
    private MessageHandler messageHandler;
    private Channel channel;
    private Codec codec;

    public NettyChannelHandler(Channel channel, MessageHandler messageHandler) {
        this.channel = channel;
        this.messageHandler = messageHandler;
        this.codec = (Codec)ExtensionLoader.getExtensionLoader(Codec.class).getExtension(channel.getUrl().getParameter(URLParamType.codec.getName(), URLParamType.codec.getValue()));
    }

    public NettyChannelHandler(Channel channel, MessageHandler messageHandler, ThreadPoolExecutor threadPoolExecutor) {
        this.channel = channel;
        this.messageHandler = messageHandler;
        this.threadPoolExecutor = threadPoolExecutor;
        this.codec = (Codec)ExtensionLoader.getExtensionLoader(Codec.class).getExtension(channel.getUrl().getParameter(URLParamType.codec.getName(), URLParamType.codec.getValue()));
    }

    private String getRemoteIp(ChannelHandlerContext ctx) {
        String ip = "";
        SocketAddress remote = ctx.channel().remoteAddress();
        if (remote != null) {
            try {
                ip = ((InetSocketAddress)remote).getAddress().getHostAddress();
            }
            catch (Exception e) {
                LoggerUtil.warn((String)"get remoteIp error! default will use. msg:{}, remote:{}", (Object[])new Object[]{e.getMessage(), remote.toString()});
            }
        }
        return ip;
    }

    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        block7: {
            if (msg instanceof NettyMessage) {
                if (this.threadPoolExecutor != null) {
                    try {
                        this.threadPoolExecutor.execute(new Runnable(){

                            @Override
                            public void run() {
                                NettyChannelHandler.this.processMessage(ctx, (NettyMessage)msg);
                            }
                        });
                    }
                    catch (RejectedExecutionException rejectException) {
                        if (((NettyMessage)msg).isRequest()) {
                            this.rejectMessage(ctx, (NettyMessage)msg);
                            break block7;
                        }
                        LoggerUtil.warn((String)"process thread pool is full, run in io thread, active={} poolSize={} corePoolSize={} maxPoolSize={} taskCount={} requestId={}", (Object[])new Object[]{this.threadPoolExecutor.getActiveCount(), this.threadPoolExecutor.getPoolSize(), this.threadPoolExecutor.getCorePoolSize(), this.threadPoolExecutor.getMaximumPoolSize(), this.threadPoolExecutor.getTaskCount(), ((NettyMessage)msg).getRequestId()});
                        this.processMessage(ctx, (NettyMessage)msg);
                    }
                } else {
                    this.processMessage(ctx, (NettyMessage)msg);
                }
            } else {
                LoggerUtil.error((String)("NettyChannelHandler messageReceived type not support: class=" + msg.getClass()));
                throw new MotanFrameworkException("NettyChannelHandler messageReceived type not support: class=" + msg.getClass());
            }
        }
    }

    private void rejectMessage(ChannelHandlerContext ctx, NettyMessage msg) {
        if (msg.isRequest()) {
            this.sendResponse(ctx, (Response)MotanFrameworkUtil.buildErrorResponse((Request)((Request)msg), (Exception)((Object)new MotanServiceException("process thread pool is full, reject by server: " + ctx.channel().localAddress(), MotanErrorMsgConstant.SERVICE_REJECT))));
            LoggerUtil.error((String)"process thread pool is full, reject, active={} poolSize={} corePoolSize={} maxPoolSize={} taskCount={} requestId={}", (Object[])new Object[]{this.threadPoolExecutor.getActiveCount(), this.threadPoolExecutor.getPoolSize(), this.threadPoolExecutor.getCorePoolSize(), this.threadPoolExecutor.getMaximumPoolSize(), this.threadPoolExecutor.getTaskCount(), msg.getRequestId()});
            if (this.channel instanceof NettyServer) {
                ((NettyServer)this.channel).getRejectCounter().incrementAndGet();
            }
        }
    }

    private void processMessage(ChannelHandlerContext ctx, NettyMessage msg) {
        Object result;
        long startTime = System.currentTimeMillis();
        String remoteIp = this.getRemoteIp(ctx);
        try {
            result = this.codec.decode(this.channel, remoteIp, msg.getData());
        }
        catch (Exception e) {
            LoggerUtil.error((String)("NettyDecoder decode fail! requestid" + msg.getRequestId() + ", size:" + msg.getData().length + ", ip:" + remoteIp + ", e:" + e.getMessage()));
            DefaultResponse response = MotanFrameworkUtil.buildErrorResponse((long)msg.getRequestId(), (byte)msg.getVersion().getVersion(), (Exception)e);
            if (msg.isRequest()) {
                this.sendResponse(ctx, (Response)response);
            } else {
                this.processResponse(response);
            }
            return;
        }
        if (result instanceof Request) {
            MotanFrameworkUtil.logEvent((Request)((Request)result), (String)"TRACE_SRECEIVE", (long)msg.getStartTime());
            MotanFrameworkUtil.logEvent((Request)((Request)result), (String)"TRACE_SEXECUTOR_START", (long)startTime);
            MotanFrameworkUtil.logEvent((Request)((Request)result), (String)"TRACE_SDECODE");
            this.processRequest(ctx, (Request)result);
        } else if (result instanceof Response) {
            MotanFrameworkUtil.logEvent((Response)((Response)result), (String)"TRACE_CRECEIVE", (long)msg.getStartTime());
            MotanFrameworkUtil.logEvent((Response)((Response)result), (String)"TRACE_CDECODE");
            this.processResponse(result);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processRequest(ChannelHandlerContext ctx, Request request) {
        request.setAttachment(URLParamType.host.getName(), NetUtils.getHostName((SocketAddress)ctx.channel().remoteAddress()));
        long processStartTime = System.currentTimeMillis();
        try {
            DefaultResponse response;
            Object result;
            RpcContext.init((Request)request);
            try {
                result = this.messageHandler.handle(this.channel, (Object)request);
            }
            catch (Exception e) {
                LoggerUtil.error((String)("NettyChannelHandler processRequest fail! request:" + MotanFrameworkUtil.toString((Request)request)), (Throwable)e);
                result = MotanFrameworkUtil.buildErrorResponse((Request)request, (Exception)((Object)new MotanServiceException("process request fail. errmsg:" + e.getMessage())));
            }
            if (result instanceof Response) {
                MotanFrameworkUtil.logEvent((Response)((Response)result), (String)"TRACE_PROCESS");
            }
            if (result instanceof DefaultResponse) {
                response = (DefaultResponse)result;
                response.setRpcProtocolVersion(request.getRpcProtocolVersion());
            } else {
                response = new DefaultResponse(result);
            }
            response.setRequestId(request.getRequestId());
            response.setProcessTime(System.currentTimeMillis() - processStartTime);
            ChannelFuture channelFuture = this.sendResponse(ctx, (Response)response);
            if (channelFuture != null) {
                channelFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        MotanFrameworkUtil.logEvent((Response)response, (String)"TRACE_SSEND", (long)System.currentTimeMillis());
                        response.onFinish();
                    }
                });
            }
        }
        finally {
            RpcContext.destroy();
        }
    }

    private ChannelFuture sendResponse(ChannelHandlerContext ctx, Response response) {
        byte[] msg = CodecUtil.encodeObjectToBytes(this.channel, this.codec, response);
        response.setAttachment("Content-Length", String.valueOf(msg.length));
        if (ctx.channel().isActive()) {
            return ctx.channel().writeAndFlush((Object)msg);
        }
        return null;
    }

    private void processResponse(Object msg) {
        this.messageHandler.handle(this.channel, msg);
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LoggerUtil.info((String)"NettyChannelHandler channelActive: remote={} local={}", (Object[])new Object[]{ctx.channel().remoteAddress(), ctx.channel().localAddress()});
        ctx.fireChannelActive();
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LoggerUtil.info((String)"NettyChannelHandler channelInactive: remote={} local={}", (Object[])new Object[]{ctx.channel().remoteAddress(), ctx.channel().localAddress()});
        ctx.fireChannelInactive();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LoggerUtil.error((String)"NettyChannelHandler exceptionCaught: remote={} local={} event={}", (Object[])new Object[]{ctx.channel().remoteAddress(), ctx.channel().localAddress(), cause.getMessage(), cause});
        ctx.channel().close();
    }
}

