/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.rpc.protocol.tri.stream;

import com.google.protobuf.Any;
import com.google.protobuf.Message;
import com.google.rpc.DebugInfo;
import com.google.rpc.Status;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.concurrent.Future;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.HeaderFilter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.PathResolver;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.tri.ExceptionUtils;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
import org.apache.dubbo.rpc.protocol.tri.call.ReflectionAbstractServerCall;
import org.apache.dubbo.rpc.protocol.tri.call.StubAbstractServerCall;
import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.HeaderQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.TextDataQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
import org.apache.dubbo.rpc.protocol.tri.frame.Deframer;
import org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder;
import org.apache.dubbo.rpc.protocol.tri.stream.AbstractStream;
import org.apache.dubbo.rpc.protocol.tri.stream.ServerStream;
import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;
import org.apache.dubbo.rpc.protocol.tri.transport.AbstractH2TransportListener;
import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
import org.apache.dubbo.rpc.protocol.tri.transport.WriteQueue;

public class TripleServerStream
extends AbstractStream
implements ServerStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(TripleServerStream.class);
    public final ServerTransportObserver transportObserver = new ServerTransportObserver();
    private final WriteQueue writeQueue;
    private final PathResolver pathResolver;
    private final List<HeaderFilter> filters;
    private final String acceptEncoding;
    private boolean headerSent;
    private boolean trailersSent;
    private volatile boolean reset;
    private ServerStream.Listener listener;
    private final InetSocketAddress remoteAddress;
    private final Channel channel;
    private Deframer deframer;

    public TripleServerStream(Channel channel, FrameworkModel frameworkModel, Executor executor, PathResolver pathResolver, String acceptEncoding, List<HeaderFilter> filters) {
        super(executor, frameworkModel);
        this.channel = channel;
        this.pathResolver = pathResolver;
        this.acceptEncoding = acceptEncoding;
        this.filters = filters;
        this.writeQueue = new WriteQueue(channel);
        this.remoteAddress = (InetSocketAddress)channel.remoteAddress();
    }

    @Override
    public SocketAddress remoteAddress() {
        return this.remoteAddress;
    }

    @Override
    public void request(int n) {
        this.deframer.request(n);
    }

    public ChannelFuture reset(Http2Error cause) {
        return this.writeQueue.enqueue(CancelQueueCommand.createCommand(cause), true);
    }

    public ChannelFuture sendHeader(Http2Headers headers) {
        if (this.reset) {
            return this.writeQueue.failure(new IllegalStateException("Stream already reset, no more headers allowed"));
        }
        if (this.headerSent) {
            return this.writeQueue.failure(new IllegalStateException("Header already sent"));
        }
        if (this.trailersSent) {
            return this.writeQueue.failure(new IllegalStateException("Trailers already sent"));
        }
        this.headerSent = true;
        return this.writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers, false)).addListener(f -> {
            if (!f.isSuccess()) {
                this.reset(Http2Error.INTERNAL_ERROR);
            }
        });
    }

    @Override
    public Future<?> cancelByLocal(TriRpcStatus status) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Cancel stream:%s by local: %s", this.channel, status));
        }
        return this.reset(Http2Error.CANCEL);
    }

    public ChannelFuture complete(TriRpcStatus status, Map<String, Object> attachments) {
        Http2Headers trailers = this.getTrailers(status, attachments);
        return this.sendTrailers(trailers);
    }

    private ChannelFuture sendTrailers(Http2Headers trailers) {
        if (this.reset) {
            return this.writeQueue.failure(new IllegalStateException("Stream already reset, no more trailers allowed"));
        }
        if (this.trailersSent) {
            return this.writeQueue.failure(new IllegalStateException("Trailers already sent"));
        }
        this.headerSent = true;
        this.trailersSent = true;
        return this.writeQueue.enqueue(HeaderQueueCommand.createHeaders(trailers, true)).addListener(f -> {
            if (!f.isSuccess()) {
                this.reset(Http2Error.INTERNAL_ERROR);
            }
        });
    }

    private Http2Headers getTrailers(TriRpcStatus rpcStatus, Map<String, Object> attachments) {
        DefaultHttp2Headers headers = new DefaultHttp2Headers();
        if (!this.headerSent) {
            headers.status((CharSequence)HttpResponseStatus.OK.codeAsText());
            headers.set((Object)HttpHeaderNames.CONTENT_TYPE, (Object)"application/grpc+proto");
        }
        StreamUtils.convertAttachment(headers, attachments);
        headers.set((Object)TripleHeaderEnum.STATUS_KEY.getHeader(), (Object)String.valueOf(rpcStatus.code.code));
        if (rpcStatus.isOk()) {
            return headers;
        }
        String grpcMessage = this.getGrpcMessage(rpcStatus);
        grpcMessage = TriRpcStatus.encodeMessage(TriRpcStatus.limitSizeTo1KB(grpcMessage));
        headers.set((Object)TripleHeaderEnum.MESSAGE_KEY.getHeader(), (Object)grpcMessage);
        Status.Builder builder = Status.newBuilder().setCode(rpcStatus.code.code).setMessage(grpcMessage);
        Throwable throwable = rpcStatus.cause;
        if (throwable == null) {
            Status status = builder.build();
            headers.set((Object)TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(), (Object)StreamUtils.encodeBase64ASCII(status.toByteArray()));
            return headers;
        }
        DebugInfo debugInfo = DebugInfo.newBuilder().addAllStackEntries(ExceptionUtils.getStackFrameList(throwable, 6)).build();
        builder.addDetails(Any.pack((Message)debugInfo));
        Status status = builder.build();
        headers.set((Object)TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(), (Object)StreamUtils.encodeBase64ASCII(status.toByteArray()));
        return headers;
    }

    private String getGrpcMessage(TriRpcStatus status) {
        if (StringUtils.isNotEmpty(status.description)) {
            return status.description;
        }
        return Optional.ofNullable(status.cause).map(Throwable::getMessage).orElse("unknown");
    }

    public ChannelFuture sendMessage(byte[] message, int compressFlag) {
        if (this.reset) {
            return this.writeQueue.failure(new IllegalStateException("Stream already reset, no more body allowed"));
        }
        if (!this.headerSent) {
            return this.writeQueue.failure(new IllegalStateException("Headers did not sent before send body"));
        }
        if (this.trailersSent) {
            return this.writeQueue.failure(new IllegalStateException("Trailers already sent, no more body allowed"));
        }
        return this.writeQueue.enqueue(DataQueueCommand.createGrpcCommand(message, false, compressFlag));
    }

    private void responsePlainTextError(int code, TriRpcStatus status) {
        Http2Headers headers = (Http2Headers)((Http2Headers)((Http2Headers)new DefaultHttp2Headers(true).status((CharSequence)String.valueOf(code)).setInt((Object)TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)).set((Object)TripleHeaderEnum.MESSAGE_KEY.getHeader(), (Object)status.description)).set((Object)TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), (Object)"text/plain; encoding=utf-8");
        this.writeQueue.enqueue(HeaderQueueCommand.createHeaders(headers, false));
        this.writeQueue.enqueue(TextDataQueueCommand.createCommand(status.description, true));
    }

    private void responseErr(TriRpcStatus status) {
        Http2Headers trailers = (Http2Headers)((Http2Headers)((Http2Headers)new DefaultHttp2Headers().status((CharSequence)HttpResponseStatus.OK.codeAsText()).set((Object)HttpHeaderNames.CONTENT_TYPE, (Object)"application/grpc+proto")).setInt((Object)TripleHeaderEnum.STATUS_KEY.getHeader(), status.code.code)).set((Object)TripleHeaderEnum.MESSAGE_KEY.getHeader(), (Object)status.toEncodedMessage());
        this.sendTrailers(trailers);
    }

    private Invoker<?> getInvoker(Http2Headers headers, String serviceName) {
        String version = headers.contains((Object)TripleHeaderEnum.SERVICE_VERSION.getHeader()) ? ((CharSequence)headers.get((Object)TripleHeaderEnum.SERVICE_VERSION.getHeader())).toString() : null;
        String group = headers.contains((Object)TripleHeaderEnum.SERVICE_GROUP.getHeader()) ? ((CharSequence)headers.get((Object)TripleHeaderEnum.SERVICE_GROUP.getHeader())).toString() : null;
        String key = URL.buildKey(serviceName, group, version);
        Invoker<?> invoker = this.pathResolver.resolve(key);
        if (invoker == null) {
            invoker = this.pathResolver.resolve(URL.buildKey(serviceName, group, "1.0.0"));
        }
        if (invoker == null) {
            invoker = this.pathResolver.resolve(serviceName);
        }
        return invoker;
    }

    public class ServerTransportObserver
    extends AbstractH2TransportListener
    implements H2TransportListener {
        private boolean supportContentType(String contentType) {
            if (contentType == null) {
                return false;
            }
            return contentType.startsWith("application/grpc");
        }

        @Override
        public void onHeader(Http2Headers headers, boolean endStream) {
            TripleServerStream.this.executor.execute(() -> this.processHeader(headers, endStream));
        }

        private void processHeader(Http2Headers headers, boolean endStream) {
            String compressorStr;
            if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
                TripleServerStream.this.responsePlainTextError(HttpResponseStatus.METHOD_NOT_ALLOWED.code(), TriRpcStatus.INTERNAL.withDescription(String.format("Method '%s' is not supported", headers.method())));
                return;
            }
            if (headers.path() == null) {
                TripleServerStream.this.responsePlainTextError(HttpResponseStatus.NOT_FOUND.code(), TriRpcStatus.fromCode(TriRpcStatus.Code.UNIMPLEMENTED.code).withDescription("Expected path but is missing"));
                return;
            }
            String path = headers.path().toString();
            if (path.charAt(0) != '/') {
                TripleServerStream.this.responsePlainTextError(HttpResponseStatus.NOT_FOUND.code(), TriRpcStatus.fromCode(TriRpcStatus.Code.UNIMPLEMENTED.code).withDescription(String.format("Expected path to start with /: %s", path)));
                return;
            }
            CharSequence contentType = HttpUtil.getMimeType((CharSequence)((CharSequence)headers.get((Object)HttpHeaderNames.CONTENT_TYPE)));
            if (contentType == null) {
                TripleServerStream.this.responsePlainTextError(HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(), TriRpcStatus.fromCode(TriRpcStatus.Code.INTERNAL.code).withDescription("Content-Type is missing from the request"));
                return;
            }
            String contentString = contentType.toString();
            if (!this.supportContentType(contentString)) {
                TripleServerStream.this.responsePlainTextError(HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(), TriRpcStatus.fromCode(TriRpcStatus.Code.INTERNAL.code).withDescription(String.format("Content-Type '%s' is not supported", contentString)));
                return;
            }
            String[] parts = path.split("/");
            if (parts.length != 3) {
                TripleServerStream.this.responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription("Bad path format:" + path));
                return;
            }
            String serviceName = parts[1];
            String originalMethodName = parts[2];
            Invoker invoker = TripleServerStream.this.getInvoker(headers, serviceName);
            if (invoker == null) {
                TripleServerStream.this.responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription("Service not found:" + serviceName));
                return;
            }
            if (endStream) {
                return;
            }
            DeCompressor deCompressor = DeCompressor.NONE;
            CharSequence messageEncoding = (CharSequence)headers.get((Object)TripleHeaderEnum.GRPC_ENCODING.getHeader());
            if (null != messageEncoding && !"identity".equals(compressorStr = messageEncoding.toString())) {
                DeCompressor compressor = DeCompressor.getCompressor(TripleServerStream.this.frameworkModel, compressorStr);
                if (null == compressor) {
                    TripleServerStream.this.responseErr(TriRpcStatus.fromCode(TriRpcStatus.Code.UNIMPLEMENTED.code).withDescription(String.format("Grpc-encoding '%s' is not supported", compressorStr)));
                    return;
                }
                deCompressor = compressor;
            }
            try {
                ServerDecoderListener listener = new ServerDecoderListener();
                TripleServerStream.this.deframer = new TriDecoder(deCompressor, listener);
            }
            catch (Throwable t) {
                TripleServerStream.this.responseErr(TriRpcStatus.INTERNAL.withCause(t));
                return;
            }
            Map<String, Object> requestMetadata = this.headersToMap(headers);
            boolean hasStub = TripleServerStream.this.pathResolver.hasNativeStub(path);
            if (hasStub) {
                TripleServerStream.this.listener = new StubAbstractServerCall(invoker, TripleServerStream.this, TripleServerStream.this.frameworkModel, TripleServerStream.this.acceptEncoding, serviceName, originalMethodName, TripleServerStream.this.executor);
            } else {
                TripleServerStream.this.listener = new ReflectionAbstractServerCall(invoker, (ServerStream)TripleServerStream.this, TripleServerStream.this.frameworkModel, TripleServerStream.this.acceptEncoding, serviceName, originalMethodName, TripleServerStream.this.filters, TripleServerStream.this.executor);
            }
            TripleServerStream.this.listener.onHeader(requestMetadata);
            if (TripleServerStream.this.listener == null) {
                TripleServerStream.this.deframer.close();
            }
        }

        @Override
        public void onData(ByteBuf data, boolean endStream) {
            TripleServerStream.this.executor.execute(() -> this.doOnData(data, endStream));
        }

        private void doOnData(ByteBuf data, boolean endStream) {
            if (TripleServerStream.this.deframer == null) {
                return;
            }
            TripleServerStream.this.deframer.deframe(data);
            if (endStream) {
                TripleServerStream.this.deframer.close();
            }
        }

        @Override
        public void cancelByRemote(long errorCode) {
            TripleServerStream.this.reset = true;
            if (!TripleServerStream.this.trailersSent) {
                TripleServerStream.this.reset(Http2Error.valueOf((long)errorCode));
            }
            if (TripleServerStream.this.listener == null) {
                return;
            }
            TripleServerStream.this.executor.execute(() -> TripleServerStream.this.listener.onCancelByRemote(TriRpcStatus.CANCELLED.withDescription("Canceled by client ,errorCode=" + errorCode)));
        }

        private class ServerDecoderListener
        implements TriDecoder.Listener {
            private ServerDecoderListener() {
            }

            @Override
            public void onRawMessage(byte[] data) {
                TripleServerStream.this.listener.onMessage(data);
            }

            @Override
            public void close() {
                if (TripleServerStream.this.listener != null) {
                    TripleServerStream.this.listener.onComplete();
                }
            }
        }
    }
}

