/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport;

import java.io.IOException;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Compression;
import org.elasticsearch.transport.OutboundMessage;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.StatsTracker;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportLogger;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;

final class OutboundHandler {
    private static final Logger logger = LogManager.getLogger(OutboundHandler.class);
    private final String nodeName;
    private final Version version;
    private final String[] features;
    private final StatsTracker statsTracker;
    private final ThreadPool threadPool;
    private final BigArrays bigArrays;
    private final Compression.Scheme configuredCompressionScheme;
    private volatile long slowLogThresholdMs = Long.MAX_VALUE;
    private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;

    OutboundHandler(String nodeName, Version version, String[] features, StatsTracker statsTracker, ThreadPool threadPool, BigArrays bigArrays, Compression.Scheme compressionScheme) {
        this.nodeName = nodeName;
        this.version = version;
        this.features = features;
        this.statsTracker = statsTracker;
        this.threadPool = threadPool;
        this.bigArrays = bigArrays;
        this.configuredCompressionScheme = compressionScheme;
    }

    void setSlowLogThreshold(TimeValue slowLogThreshold) {
        this.slowLogThresholdMs = slowLogThreshold.getMillis();
    }

    void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
        this.internalSend(channel, bytes, null, listener);
    }

    void sendRequest(DiscoveryNode node, TcpChannel channel, long requestId, String action, TransportRequest request, TransportRequestOptions options, Version channelVersion, boolean compressRequest, boolean isHandshake) throws IOException, TransportException {
        Version version = Version.min(this.version, channelVersion);
        Compression.Scheme compressionScheme = compressRequest ? this.configuredCompressionScheme : null;
        OutboundMessage.Request message = new OutboundMessage.Request(this.threadPool.getThreadContext(), this.features, request, version, action, requestId, isHandshake, compressionScheme);
        if (!request.tryIncRef()) {
            assert (false) : "request [" + request + "] has been released already";
            throw new AlreadyClosedException("request [" + request + "] has been released already");
        }
        ActionListener<Void> listener = ActionListener.wrap(() -> {
            try {
                this.messageListener.onRequestSent(node, requestId, action, request, options);
            }
            finally {
                request.decRef();
            }
        });
        this.sendMessage(channel, message, listener);
    }

    void sendResponse(Version nodeVersion, Set<String> features, TcpChannel channel, long requestId, String action, TransportResponse response, boolean compressResponse, boolean isHandshake) throws IOException {
        Version version = Version.min(this.version, nodeVersion);
        Compression.Scheme compressionScheme = compressResponse ? this.configuredCompressionScheme : null;
        OutboundMessage.Response message = new OutboundMessage.Response(this.threadPool.getThreadContext(), features, response, version, requestId, isHandshake, compressionScheme);
        ActionListener<Void> listener = ActionListener.wrap(() -> this.messageListener.onResponseSent(requestId, action, response));
        this.sendMessage(channel, message, listener);
    }

    void sendErrorResponse(Version nodeVersion, Set<String> features, TcpChannel channel, long requestId, String action, Exception error) throws IOException {
        Version version = Version.min(this.version, nodeVersion);
        TransportAddress address = new TransportAddress(channel.getLocalAddress());
        RemoteTransportException tx = new RemoteTransportException(this.nodeName, address, action, error);
        OutboundMessage.Response message = new OutboundMessage.Response(this.threadPool.getThreadContext(), features, tx, version, requestId, false, null);
        ActionListener<Void> listener = ActionListener.wrap(() -> this.messageListener.onResponseSent(requestId, action, error));
        this.sendMessage(channel, message, listener);
    }

    private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, ActionListener<Void> listener) throws IOException {
        BytesReference message;
        ReleasableBytesStreamOutput bytesStreamOutput = new ReleasableBytesStreamOutput(this.bigArrays);
        ActionListener<Void> wrappedListener = ActionListener.runBefore(listener, bytesStreamOutput::close);
        try {
            message = networkMessage.serialize(bytesStreamOutput);
        }
        catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("failed to serialize outbound message [{}]", (Object)networkMessage), (Throwable)e);
            wrappedListener.onFailure(e);
            throw e;
        }
        this.internalSend(channel, message, networkMessage, wrappedListener);
    }

    private void internalSend(final TcpChannel channel, BytesReference reference, final @Nullable OutboundMessage message, final ActionListener<Void> listener) {
        final long startTime = this.threadPool.relativeTimeInMillis();
        channel.getChannelStats().markAccessed(startTime);
        final long messageSize = reference.length();
        TransportLogger.logOutboundMessage(channel, reference);
        try (ThreadContext.StoredContext existing = this.threadPool.getThreadContext().stashContext();){
            channel.sendMessage(reference, new ActionListener<Void>(){

                @Override
                public void onResponse(Void v) {
                    OutboundHandler.this.statsTracker.markBytesWritten(messageSize);
                    listener.onResponse(v);
                    this.maybeLogSlowMessage(true);
                }

                @Override
                public void onFailure(Exception e) {
                    if (NetworkExceptionHelper.isCloseConnectionException(e)) {
                        logger.debug(() -> new ParameterizedMessage("send message failed [channel: {}]", (Object)channel), (Throwable)e);
                    } else {
                        logger.warn(() -> new ParameterizedMessage("send message failed [channel: {}]", (Object)channel), (Throwable)e);
                    }
                    listener.onFailure(e);
                    this.maybeLogSlowMessage(false);
                }

                private void maybeLogSlowMessage(boolean success) {
                    long took;
                    long logThreshold = OutboundHandler.this.slowLogThresholdMs;
                    if (logThreshold > 0L && (took = OutboundHandler.this.threadPool.relativeTimeInMillis() - startTime) > logThreshold) {
                        logger.warn("sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn threshold of [{}ms] with success [{}]", (Object)message, (Object)messageSize, (Object)channel, (Object)took, (Object)logThreshold, (Object)success);
                    }
                }
            });
        }
        catch (RuntimeException ex) {
            listener.onFailure(ex);
            CloseableChannel.closeChannel(channel);
            throw ex;
        }
    }

    void setMessageListener(TransportMessageListener listener) {
        if (this.messageListener != TransportMessageListener.NOOP_LISTENER) {
            throw new IllegalStateException("Cannot set message listener twice");
        }
        this.messageListener = listener;
    }
}

