/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.protocol;

import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.ConnectionEvents;
import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.internal.LettuceAssert;
import com.lambdaworks.redis.internal.LettuceClassUtils;
import com.lambdaworks.redis.internal.LettuceFactories;
import com.lambdaworks.redis.internal.LettuceSets;
import com.lambdaworks.redis.protocol.ChannelLogDescriptor;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.protocol.LatencyMeteredCommand;
import com.lambdaworks.redis.protocol.ProtocolKeyword;
import com.lambdaworks.redis.protocol.RedisCommand;
import com.lambdaworks.redis.protocol.RedisStateMachine;
import com.lambdaworks.redis.protocol.WithLatency;
import com.lambdaworks.redis.resource.ClientResources;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.local.LocalAddress;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;

@ChannelHandler.Sharable
public class CommandHandler<K, V>
extends ChannelDuplexHandler
implements RedisChannelWriter<K, V> {
    // Static state. These blank finals are assigned in a static initializer that is not part of this section.
    private static final InternalLogger logger;
    // Source of the per-instance commandHandlerId below.
    private static final AtomicLong CHANNEL_COUNTER;
    // Compared against promise.getClass() in addToStack() to detect void promises.
    private static final Class<?> VOID_PROMISE_CLASS;
    // Atomic accessor used for the in-flight write counter (presumably bound to the queueSize field — confirm in the static initializer).
    private static final AtomicIntegerFieldUpdater<CommandHandler> QUEUE_SIZE;
    // IOException messages that exceptionCaught() logs at DEBUG instead of INFO.
    private static final Set<String> SUPPRESS_IO_EXCEPTION_MESSAGES;
    // Identifier taken from CHANNEL_COUNTER at construction time.
    protected final long commandHandlerId = CHANNEL_COUNTER.incrementAndGet();
    protected final ClientOptions clientOptions;
    protected final ClientResources clientResources;
    // Commands accepted while not connected; flushed to the channel from activateCommandHandlerAndExecuteBufferedCommands().
    protected final Queue<RedisCommand<K, V, ?>> disconnectedBuffer;
    // Commands held back while autoFlushCommands is false; written by flushCommands().
    protected final Queue<RedisCommand<K, V, ?>> commandBuffer;
    // Number of threads currently inside write(); lockWritersExclusive() swaps 0 for -1 to mark exclusive access.
    protected final AtomicLong writers = new AtomicLong();
    // Monitor guarding lifecycle state, channel assignment and the writer-lock hand-over.
    protected final Object stateLock = new Object();
    // Sampled once in the constructor from the command latency collector.
    private final boolean latencyMetricsEnabled;
    // True when the configured request queue size is not Integer.MAX_VALUE.
    private final boolean boundedQueue;
    // Commands written to the channel and awaiting their response; decode() completes them from the head.
    protected final Deque<RedisCommand<K, V, ?>> stack = new ArrayDeque();
    // Accumulates inbound bytes across channelRead() calls until a full response can be decoded.
    protected final ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(65536);
    protected final RedisStateMachine<K, V> rsm = new RedisStateMachine();
    // Channel this handler is currently bound to; null while unregistered/inactive.
    protected volatile Channel channel;
    // Looked up from the pipeline in channelActive(); may be null if no watchdog is installed.
    private volatile ConnectionWatchdog connectionWatchdog;
    // Logger levels are sampled once in the constructor and not re-evaluated afterwards.
    private final boolean traceEnabled;
    private final boolean debugEnabled;
    // AT_LEAST_ONCE when auto-reconnect is enabled, AT_MOST_ONCE otherwise (see constructor).
    private final Reliability reliability;
    private volatile LifecycleState lifecycleState = LifecycleState.NOT_CONNECTED;
    private volatile int queueSize = 0;
    // Thread that currently holds the exclusive writer lock, or null.
    private Thread exclusiveLockOwner;
    // Receives activated()/deactivated() callbacks on connection state changes.
    private RedisChannelHandler<K, V> redisChannelHandler;
    // Last exception seen while not connected; fails commands written to the disconnected buffer.
    private volatile Throwable connectionError;
    // Cached log prefix; set to null when the channel changes so that it gets rebuilt.
    private String logPrefix;
    // When false, write() only buffers commands until flushCommands() is called.
    private boolean autoFlushCommands = true;

    public CommandHandler(ClientOptions clientOptions, ClientResources clientResources) {
        LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
        LettuceAssert.notNull(clientResources, "ClientResources must not be null");
        this.clientOptions = clientOptions;
        this.clientResources = clientResources;
        this.traceEnabled = logger.isTraceEnabled();
        this.debugEnabled = logger.isDebugEnabled();
        this.reliability = clientOptions.isAutoReconnect() ? Reliability.AT_LEAST_ONCE : Reliability.AT_MOST_ONCE;
        this.latencyMetricsEnabled = clientResources.commandLatencyCollector().isEnabled();
        this.disconnectedBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize());
        this.commandBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize());
        this.boundedQueue = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
    }

    /**
     * Sets the handler that receives activation/deactivation callbacks.
     *
     * @param redisChannelHandler the channel handler to notify; may be {@code null}
     */
    @Override
    public void setRedisChannelHandler(final RedisChannelHandler<K, V> redisChannelHandler) {
        this.redisChannelHandler = redisChannelHandler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Enables or disables automatic flushing of written commands.
     *
     * @param autoFlush {@code true} to write commands immediately, {@code false} to buffer them until
     *        {@link #flushCommands()} is called
     */
    @Override
    public void setAutoFlushCommands(boolean autoFlush) {
        synchronized (this.stateLock) {
            this.autoFlushCommands = autoFlush;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Updates the lifecycle state unless the handler has already been closed.
     *
     * @param lifecycleState the new state
     */
    protected void setState(LifecycleState lifecycleState) {
        // CLOSED is terminal: once reached, later transitions are ignored.
        if (this.lifecycleState == LifecycleState.CLOSED) {
            return;
        }
        synchronized (this.stateLock) {
            this.lifecycleState = lifecycleState;
        }
    }

    /**
     * @return the current lifecycle state
     */
    protected LifecycleState getState() {
        final LifecycleState current = this.lifecycleState;
        return current;
    }

    /**
     * @return {@code true} if the lifecycle state is {@code CLOSED}
     */
    public boolean isClosed() {
        return LifecycleState.CLOSED == this.lifecycleState;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Binds this handler to the channel being registered, moves to {@code REGISTERED} and clears the
     * receive buffer before propagating the event.
     *
     * @param ctx the channel handler context
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (this.isClosed()) {
            // Only logged; registration proceeds regardless.
            logger.debug("{} Dropping register for a closed channel", this.logPrefix());
        }

        synchronized (this.stateLock) {
            this.channel = ctx.channel();
        }

        if (this.debugEnabled) {
            // Drop the cached prefix so that it gets rebuilt for the new channel.
            this.logPrefix = null;
            logger.debug("{} channelRegistered()", this.logPrefix());
        }

        this.setState(LifecycleState.REGISTERED);
        this.buffer.clear();
        ctx.fireChannelRegistered();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Detaches this handler from its channel. Events for a channel other than the bound one are only
     * propagated. If the handler is closed, buffered commands are cancelled first.
     *
     * @param ctx the channel handler context
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        if (this.debugEnabled) {
            logger.debug("{} channelUnregistered()", this.logPrefix());
        }

        final boolean foreignChannel = this.channel != null && ctx.channel() != this.channel;
        if (foreignChannel) {
            logger.debug("{} My channel and ctx.channel mismatch. Propagating event to other listeners", this.logPrefix());
            ctx.fireChannelUnregistered();
            return;
        }

        if (this.isClosed()) {
            this.cancelCommands("Connection closed");
        }

        synchronized (this.stateLock) {
            this.channel = null;
        }
        ctx.fireChannelUnregistered();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends inbound bytes to the internal receive buffer and attempts to decode pending commands.
     *
     * <p>The inbound message is consumed by this handler (it is not passed on), so it is released on
     * every path on which it still holds a reference.
     *
     * @param ctx the channel handler context
     * @param msg the inbound message; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf input = (ByteBuf)msg;
        if (!input.isReadable() || input.refCnt() == 0) {
            logger.warn("{} Input not readable {}, {}", new Object[]{this.logPrefix(), input.isReadable(), input.refCnt()});
            // An empty but still-referenced buffer is dropped here and must be released, otherwise it
            // leaks. An already released buffer (refCnt == 0) must not be released again.
            if (input.refCnt() > 0) {
                input.release();
            }
            return;
        }
        if (this.debugEnabled) {
            logger.debug("{} Received: {} bytes, {} commands in the stack", new Object[]{this.logPrefix(), input.readableBytes(), this.stack.size()});
        }
        try {
            if (this.buffer.refCnt() < 1) {
                logger.warn("{} Ignoring received data for closed or abandoned connection", (Object)this.logPrefix());
                return;
            }
            // NOTE(review): the channel mismatch check is only effective when debug logging is enabled —
            // confirm whether the guard is meant to depend on the log level.
            if (this.debugEnabled && ctx.channel() != this.channel) {
                logger.debug("{} Ignoring data for a non-registered channel {}", (Object)this.logPrefix(), (Object)ctx.channel());
                return;
            }
            if (this.traceEnabled) {
                logger.trace("{} Buffer: {}", (Object)this.logPrefix(), (Object)input.toString(Charset.defaultCharset()).trim());
            }
            this.buffer.writeBytes(input);
            this.decode(ctx, this.buffer);
        }
        finally {
            input.release();
        }
    }

    /**
     * Decodes as many responses as the buffer currently holds and completes the matching commands in
     * stack order.
     *
     * <p>A decoding failure closes the channel and rethrows. A failure while completing a command is
     * only logged so that the remaining responses can still be processed.
     *
     * @param ctx the channel handler context
     * @param buffer the receive buffer to decode from
     */
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) {
        while (this.canDecode(buffer)) {
            RedisCommand<K, V, ?> command = this.stack.peek();
            if (this.debugEnabled) {
                logger.debug("{} Stack contains: {} commands", (Object)this.logPrefix(), (Object)this.stack.size());
            }
            try {
                // Response incomplete: keep the command at the head and wait for more bytes.
                if (!this.decode(buffer, command)) {
                    return;
                }
            }
            catch (Exception e) {
                ctx.close();
                throw e;
            }
            this.stack.poll();
            try {
                command.complete();
            }
            catch (Exception e) {
                // Use logPrefix() rather than the cached field, which may be null after a channel change.
                logger.warn("{} Unexpected exception during command completion: {}", new Object[]{this.logPrefix(), e.toString(), e});
            }
            if (buffer.refCnt() != 0) {
                buffer.discardReadBytes();
            }
            this.afterComplete(ctx, command);
        }
    }

    /**
     * Hook invoked by the decode loop after a command has been completed. The default implementation
     * does nothing.
     *
     * @param ctx the channel handler context
     * @param command the command that was just completed
     */
    protected void afterComplete(ChannelHandlerContext ctx, RedisCommand<K, V, ?> command) {
        // intentionally empty
    }

    /**
     * @param buffer the receive buffer
     * @return {@code true} if a command is awaiting a response and the buffer has readable bytes
     */
    protected boolean canDecode(ByteBuf buffer) {
        if (this.stack.isEmpty()) {
            return false;
        }
        return buffer.isReadable();
    }

    /**
     * Decodes a single response into the given command, recording latency when metrics are enabled
     * and the command carries timing information.
     *
     * @param buffer the receive buffer
     * @param command the command at the head of the stack
     * @return {@code true} if the response was decoded completely
     */
    private boolean decode(ByteBuf buffer, RedisCommand<K, V, ?> command) {
        final boolean metered = this.latencyMetricsEnabled && command instanceof WithLatency;
        if (!metered) {
            return this.rsm.decode(buffer, command, command.getOutput());
        }

        WithLatency timing = (WithLatency)((Object)command);
        // -1 marks "no response seen yet"; stamp the arrival of the first bytes only once.
        if (timing.getFirstResponse() == -1L) {
            timing.firstResponse(CommandHandler.nanoTime());
        }

        final boolean complete = this.rsm.decode(buffer, command, command.getOutput());
        if (complete) {
            this.recordLatency(timing, command.getType());
        }
        return complete;
    }

    /**
     * Reports first-response and completion latency of a command to the latency collector.
     *
     * <p>Both latencies are measured from the time the command was sent: first-response latency is
     * {@code firstResponse - sent}, completion latency is {@code now - sent}.
     *
     * @param withLatency timing information of the command; ignored if {@code null}
     * @param commandType the command type used as the metrics key
     */
    private void recordLatency(WithLatency withLatency, ProtocolKeyword commandType) {
        if (withLatency != null && this.latencyMetricsEnabled && this.channel != null && this.remote() != null) {
            // The first response arrives after the command was sent, so subtract sent from
            // firstResponse; the reverse order yields a negative latency.
            long firstResponseLatency = withLatency.getFirstResponse() - withLatency.getSent();
            long completionLatency = CommandHandler.nanoTime() - withLatency.getSent();
            this.clientResources.commandLatencyCollector().recordCommandLatency(this.local(), this.remote(), commandType, firstResponseLatency, completionLatency);
        }
    }

    /**
     * @return the remote address of the current channel
     */
    private SocketAddress remote() {
        final Channel current = this.channel;
        return current.remoteAddress();
    }

    /**
     * @return the local address of the current channel, or {@link LocalAddress#ANY} if the channel
     *         reports none
     */
    private SocketAddress local() {
        return this.channel.localAddress() != null ? this.channel.localAddress() : LocalAddress.ANY;
    }

    /**
     * Accepts a command for execution. Depending on the auto-flush setting and the connection state,
     * the command is written to the channel, queued in the disconnected buffer, or held in the command
     * buffer until {@link #flushCommands()}.
     *
     * @param command the command to write; must not be {@code null}
     * @return the command that was passed in
     */
    @Override
    public <T, C extends RedisCommand<K, V, T>> C write(C command) {
        LettuceAssert.notNull(command, "Command must not be null");
        try {
            this.incrementWriters();
            this.validateWrite();

            RedisCommand<K, V, T> outbound = this.potentiallyWrapLatencyCommand(command);
            if (!this.autoFlushCommands) {
                this.bufferCommand(outbound);
            } else if (this.isConnected()) {
                this.writeToChannel(outbound);
            } else {
                this.writeToDisconnectedBuffer(outbound);
            }
        }
        finally {
            this.decrementWriters();
            if (this.debugEnabled) {
                logger.debug("{} write() done", this.logPrefix());
            }
        }
        return command;
    }

    /**
     * Verifies that a single command may be accepted right now.
     *
     * @throws RedisException if the handler is closed, a bounded queue is full, or commands are
     *         rejected while disconnected
     */
    private void validateWrite() {
        if (this.lifecycleState == LifecycleState.CLOSED) {
            throw new RedisException("Connection is closed");
        }

        if (this.usesBoundedQueues()) {
            final String suffix = ". Commands are not accepted until the queue size drops.";
            // Each bound is checked against the size after adding one more command.
            if (QUEUE_SIZE.get(this) + 1 > this.clientOptions.getRequestQueueSize()) {
                throw new RedisException("Request queue size exceeded: " + this.clientOptions.getRequestQueueSize() + suffix);
            }
            if (this.disconnectedBuffer.size() + 1 > this.clientOptions.getRequestQueueSize()) {
                throw new RedisException("Request queue size exceeded: " + this.clientOptions.getRequestQueueSize() + suffix);
            }
            if (this.commandBuffer.size() + 1 > this.clientOptions.getRequestQueueSize()) {
                throw new RedisException("Command buffer size exceeded: " + this.clientOptions.getRequestQueueSize() + suffix);
            }
        }

        final boolean connected = this.isConnected();
        if (!connected && this.isRejectCommand()) {
            throw new RedisException("Currently not connected. Commands are rejected.");
        }
    }

    /**
     * Handles a command written while not connected: it is failed immediately if a connection error
     * is recorded, otherwise it is queued in the disconnected buffer.
     *
     * @param command the command to buffer
     */
    protected <C extends RedisCommand<K, V, T>, T> void writeToDisconnectedBuffer(C command) {
        if (this.connectionError == null) {
            if (this.debugEnabled) {
                logger.debug("{} disconnectedBufferCommand() buffering (disconnected) command {}", this.logPrefix(), command);
            }
            this.disconnectedBuffer.add(command);
        } else {
            if (this.debugEnabled) {
                logger.debug("{} disconnectedBufferCommand() Completing command {} due to connection error", this.logPrefix(), command);
            }
            command.completeExceptionally(this.connectionError);
        }
    }

    /**
     * Writes and flushes a single command, attaching the write listener that matches the configured
     * reliability mode.
     *
     * @param command the command to write
     */
    private <C extends RedisCommand<K, V, T>, T> void writeToChannel(C command) {
        // Count the command as in flight before it is handed to the channel.
        QUEUE_SIZE.incrementAndGet(this);

        if (this.reliability == Reliability.AT_MOST_ONCE) {
            ChannelFuture written = this.writeAndFlush(command);
            written.addListener((GenericFutureListener)new AtMostOnceWriteListener(command));
        } else if (this.reliability == Reliability.AT_LEAST_ONCE) {
            ChannelFuture written = this.writeAndFlush(command);
            written.addListener((GenericFutureListener)new RetryListener(command));
        }
    }

    /**
     * Writes and flushes a batch of commands, attaching the write listener that matches the
     * configured reliability mode.
     *
     * @param commands the commands to write
     */
    private void writeToChannel(Collection<? extends RedisCommand<K, V, ?>> commands) {
        // Count the whole batch as in flight before it is handed to the channel.
        QUEUE_SIZE.addAndGet(this, commands.size());

        if (this.reliability == Reliability.AT_MOST_ONCE) {
            ChannelFuture written = this.writeAndFlush(commands);
            written.addListener((GenericFutureListener)new AtMostOnceWriteListener(commands));
        } else if (this.reliability == Reliability.AT_LEAST_ONCE) {
            ChannelFuture written = this.writeAndFlush(commands);
            written.addListener((GenericFutureListener)new RetryListener(commands));
        }
    }

    /**
     * Writes a single command to the current channel and flushes it.
     *
     * @param command the command to write
     * @return the future of the write operation
     */
    private ChannelFuture writeAndFlush(RedisCommand<?, ?, ?> command) {
        if (this.debugEnabled) {
            logger.debug("{} write() writeAndFlush command {}", this.logPrefix(), command);
        }
        final ChannelFuture written = this.channel.writeAndFlush(command);
        return written;
    }

    /**
     * Writes a batch of commands to the current channel as one message and flushes it.
     *
     * @param commands the commands to write
     * @return the future of the write operation
     */
    private ChannelFuture writeAndFlush(Collection<? extends RedisCommand<?, ?, ?>> commands) {
        if (this.debugEnabled) {
            logger.debug("{} write() writeAndFlush commands {}", this.logPrefix(), commands);
        }
        final ChannelFuture written = this.channel.writeAndFlush(commands);
        return written;
    }

    /**
     * Queues a command in the command buffer; used while auto-flush is disabled.
     *
     * @param command the command to buffer
     */
    private void bufferCommand(final RedisCommand<K, V, ?> command) {
        if (this.debugEnabled) {
            logger.debug("{} bufferCommand() buffering command {}", this.logPrefix(), command);
        }
        this.commandBuffer.add(command);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers the calling thread as an active writer by incrementing the {@code writers} counter.
     * Does nothing if the calling thread currently owns the exclusive writer lock.
     */
    protected void incrementWriters() {
        // The exclusive lock owner is not counted as a regular writer.
        if (this.exclusiveLockOwner == Thread.currentThread()) {
            return;
        }
        Object object = this.stateLock;
        synchronized (object) {
            // Busy-wait while the counter is negative, i.e. while lockWritersExclusive() has
            // swapped it to -1.
            while (this.writers.get() < 0L) {
            }
            this.writers.incrementAndGet();
            return;
        }
    }

    /**
     * Unregisters the calling thread as an active writer. Does nothing if the calling thread currently
     * owns the exclusive writer lock.
     */
    protected void decrementWriters() {
        if (this.exclusiveLockOwner != Thread.currentThread()) {
            this.writers.decrementAndGet();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Acquires exclusive access over writers. The call is re-entrant for the owning thread; each
     * call is expected to be paired with {@link #unlockWritersExclusive()}.
     */
    protected void lockWritersExclusive() {
        // Re-entrant acquisition: the owner just pushes the counter further below zero.
        if (this.exclusiveLockOwner == Thread.currentThread()) {
            this.writers.decrementAndGet();
            return;
        }
        Object object = this.stateLock;
        synchronized (object) {
            // Busy-wait until no writer is registered (counter == 0), then mark exclusive with -1.
            while (!this.writers.compareAndSet(0L, -1L)) {
            }
            this.exclusiveLockOwner = Thread.currentThread();
            return;
        }
    }

    /**
     * Releases one level of the exclusive writer lock held by the calling thread. Ownership is
     * cleared once the counter is back at zero. Calls from a thread that is not the owner are ignored.
     */
    protected void unlockWritersExclusive() {
        // The increment only runs for the owner (short-circuit); reaching 0 means the outermost
        // acquisition has been released.
        if (this.exclusiveLockOwner == Thread.currentThread() && this.writers.incrementAndGet() == 0L) {
            this.exclusiveLockOwner = null;
        }
    }

    /**
     * Decides whether commands are rejected while disconnected, based on the configured disconnected
     * behavior. For any behavior other than the explicit accept/reject settings, commands are
     * rejected only when auto-reconnect is disabled.
     *
     * @return {@code true} if commands must be rejected while disconnected
     */
    private boolean isRejectCommand() {
        if (this.clientOptions == null) {
            return false;
        }
        switch (this.clientOptions.getDisconnectedBehavior()) {
            case REJECT_COMMANDS:
                return true;
            case ACCEPT_COMMANDS:
                return false;
            default:
                return !this.clientOptions.isAutoReconnect();
        }
    }

    /**
     * @return {@code true} if the lifecycle state lies between {@code CONNECTED} (inclusive) and
     *         {@code DISCONNECTED} (exclusive) in declaration order
     */
    boolean isConnected() {
        // Read the volatile state once so that both bounds are checked against the same value.
        final int current = this.lifecycleState.ordinal();
        final int lowerInclusive = LifecycleState.CONNECTED.ordinal();
        final int upperExclusive = LifecycleState.DISCONNECTED.ordinal();
        return current >= lowerInclusive && current < upperExclusive;
    }

    /**
     * Writes all commands held in the command buffer to the channel.
     */
    @Override
    public void flushCommands() {
        final Queue<RedisCommand<K, V, ?>> pending = this.commandBuffer;
        this.flushCommands(pending);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drains the given queue and writes its commands to the channel as one batch. Does nothing when
     * no channel is bound, the handler is not connected, or the queue is empty.
     *
     * @param commands the queue to drain and flush
     */
    private void flushCommands(Queue<RedisCommand<K, V, ?>> commands) {
        if (this.debugEnabled) {
            logger.debug("{} flushCommands()", (Object)this.logPrefix());
        }
        if (this.channel != null && this.isConnected()) {
            ArrayList queuedCommands;
            Object object = this.stateLock;
            synchronized (object) {
                try {
                    // Exclusive access is held only while draining the queue.
                    this.lockWritersExclusive();
                    if (commands.isEmpty()) {
                        return;
                    }
                    queuedCommands = new ArrayList(commands.size());
                    CommandHandler.drainCommands(commands, queuedCommands);
                }
                finally {
                    this.unlockWritersExclusive();
                }
            }
            if (this.debugEnabled) {
                logger.debug("{} flushCommands() Flushing {} commands", (Object)this.logPrefix(), (Object)queuedCommands.size());
            }
            // The channel write happens after both the exclusive lock and the monitor are released.
            this.writeToChannel(queuedCommands);
        }
    }

    /**
     * Outbound entry point of the pipeline: dispatches a single command or a batch of commands to the
     * matching write routine. Messages of any other type are not handled here.
     *
     * @param ctx the channel handler context
     * @param msg the outbound message
     * @param promise the promise of the write operation
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (this.debugEnabled) {
            logger.debug("{} write(ctx, {}, promise)", this.logPrefix(), msg);
        }

        if (msg instanceof RedisCommand) {
            this.writeSingleCommand(ctx, (RedisCommand)msg, promise);
        } else if (msg instanceof Collection) {
            this.writeBatch(ctx, (Collection)msg, promise);
        }
    }

    /**
     * Registers a single command on the response stack and passes it down the pipeline.
     *
     * <p>A command that is already done (for example cancelled) is not written. Its promise is
     * completed successfully so that listeners attached to the write are still notified instead of
     * waiting forever.
     *
     * @param ctx the channel handler context
     * @param command the command to write
     * @param promise the promise of the write operation
     */
    private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<K, V, ?> command, ChannelPromise promise) throws Exception {
        if (!CommandHandler.isWriteable(command)) {
            // Nothing is passed downstream, so this handler is responsible for completing the promise.
            promise.trySuccess();
            return;
        }
        this.addToStack(command, promise);
        ctx.write(command, promise);
    }

    /**
     * Registers a batch of commands on the response stack and passes the writeable ones down the
     * pipeline as one message.
     *
     * <p>Commands that are already done (for example cancelled) are filtered out. The capacity check
     * covers every writeable command of the batch, not only those preceding the first cancelled
     * one. If nothing remains to be written, the promise is completed successfully so that listeners
     * attached to the write are still notified.
     *
     * @param ctx the channel handler context
     * @param batch the commands to write
     * @param promise the promise of the write operation
     */
    private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<K, V, ?>> batch, ChannelPromise promise) throws Exception {
        // Single pass: collect every command that still needs to be written.
        List<RedisCommand<K, V, ?>> writeable = new ArrayList<>(batch.size());
        for (RedisCommand<K, V, ?> command : batch) {
            if (CommandHandler.isWriteable(command)) {
                writeable.add(command);
            }
        }
        try {
            this.validateWrite(writeable.size());
        }
        catch (Exception e) {
            // The whole batch is rejected, including commands that are already done.
            for (RedisCommand<K, V, ?> redisCommand : batch) {
                redisCommand.completeExceptionally(e);
            }
            promise.setFailure((Throwable)e);
            return;
        }
        if (writeable.isEmpty()) {
            // Nothing is passed downstream, so this handler is responsible for completing the promise.
            promise.trySuccess();
            return;
        }
        // Pass the caller's collection through untouched when nothing had to be filtered out.
        Collection<RedisCommand<K, V, ?>> toWrite = writeable.size() == batch.size() ? batch : writeable;
        for (RedisCommand<K, V, ?> command : toWrite) {
            this.addToStack(command, promise);
        }
        ctx.write(toWrite, promise);
    }

    /**
     * Registers a command on the response stack so that an incoming response can be matched to it.
     *
     * <p>On a validation failure the command is completed exceptionally, the promise is failed and
     * the exception is rethrown.
     *
     * @param command the command about to be written
     * @param promise the promise of the write operation
     */
    private void addToStack(RedisCommand<K, V, ?> command, ChannelPromise promise) {
        try {
            this.validateWrite(1);
            if (command.getOutput() == null) {
                // Commands without an output are completed right away and never put on the stack.
                command.complete();
            } else {
                if (this.usesBoundedQueues() && this.stack.size() >= this.clientOptions.getRequestQueueSize()) {
                    throw new RedisException("Internal stack size exceeded: " + this.clientOptions.getRequestQueueSize() + ". Commands are not accepted until the stack size drops.");
                }
                RedisCommand<K, V, ?> commandToUse = this.potentiallyWrapLatencyCommand(command);
                if (this.stack.contains(command)) {
                    throw new RedisException("Attempting to write duplicate command that is already enqueued: " + command);
                }
                if (promise.getClass() == VOID_PROMISE_CLASS) {
                    // A promise of the void promise class gets no listener, so enqueue immediately.
                    this.stack.add(commandToUse);
                } else {
                    // Otherwise enqueue only once the write has actually succeeded.
                    promise.addListener(future -> {
                        if (future.isSuccess()) {
                            this.stack.add(commandToUse);
                        }
                    });
                }
            }
        }
        catch (RuntimeException e) {
            command.completeExceptionally(e);
            promise.setFailure((Throwable)e);
            throw e;
        }
    }

    /**
     * @return {@code true} if a request queue size limit is configured
     */
    private boolean usesBoundedQueues() {
        final boolean bounded = this.boundedQueue;
        return bounded;
    }

    /**
     * Verifies that the response stack can take the given number of additional commands.
     *
     * @param commands number of commands about to be added
     * @throws RedisException if queues are bounded and the stack would exceed the request queue size
     */
    private void validateWrite(int commands) {
        if (!this.usesBoundedQueues()) {
            return;
        }
        if (this.stack.size() + commands > this.clientOptions.getRequestQueueSize()) {
            throw new RedisException("Internal stack size exceeded: " + this.clientOptions.getRequestQueueSize() + ". Commands are not accepted until the stack size drops.");
        }
    }

    /**
     * @param command the command to inspect
     * @return {@code true} if the command is not done yet and therefore still needs to be written
     */
    private static boolean isWriteable(RedisCommand<?, ?, ?> command) {
        if (command.isDone()) {
            return false;
        }
        return true;
    }

    /**
     * Prepares a command for latency measurement when metrics are enabled. Commands that already
     * carry timing information are re-stamped in place; all others are wrapped in a
     * {@link LatencyMeteredCommand}.
     *
     * @param command the command to prepare
     * @return the command itself, or a latency-metered wrapper around it
     */
    private <T> RedisCommand<K, V, T> potentiallyWrapLatencyCommand(RedisCommand<K, V, T> command) {
        if (this.latencyMetricsEnabled) {
            if (!(command instanceof WithLatency)) {
                LatencyMeteredCommand<K, V, T> metered = new LatencyMeteredCommand<K, V, T>(command);
                metered.firstResponse(-1L);
                metered.sent(CommandHandler.nanoTime());
                return metered;
            }
            // -1 marks "no response seen yet"; the send time is stamped now.
            WithLatency timing = (WithLatency)((Object)command);
            timing.firstResponse(-1L);
            timing.sent(CommandHandler.nanoTime());
        }
        return command;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles the pipeline user events {@code Reset} and {@code PingBeforeActivate}; every event is
     * afterwards passed to the superclass implementation.
     *
     * @param ctx the channel handler context
     * @param evt the user event
     */
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof Reset) {
            List<RedisCommand<K, V, ?>> toCancel;
            Object object = this.stateLock;
            synchronized (object) {
                try {
                    // Take the commands off the response stack under exclusive access.
                    this.lockWritersExclusive();
                    toCancel = CommandHandler.drainCommands(this.stack);
                }
                finally {
                    this.unlockWritersExclusive();
                }
            }
            // Decoder state is reset and the commands are cancelled outside of the lock.
            this.resetInternals();
            this.cancelCommands(((Reset)evt).message, toCancel);
        }
        if (evt instanceof ConnectionEvents.PingBeforeActivate) {
            ConnectionEvents.PingBeforeActivate pba = (ConnectionEvents.PingBeforeActivate)evt;
            // The command goes to the head of the stack so that the next decoded response is matched to it.
            this.stack.addFirst(pba.getCommand());
            ctx.writeAndFlush(pba.getCommand());
        }
        super.userEventTriggered(ctx, evt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks the handler as connected, re-queues pending commands, activates the channel handler and
     * flushes commands buffered while disconnected. Afterwards a
     * {@code ConnectionEvents.Activated} event is scheduled on the channel's event loop.
     *
     * @param ctx the channel handler context
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Drop the cached log prefix so that it gets rebuilt for the new channel.
        this.logPrefix = null;
        this.connectionWatchdog = null;
        if (this.debugEnabled) {
            logger.debug("{} channelActive()", (Object)this.logPrefix());
        }
        this.connectionWatchdog = CommandHandler.getConnectionWatchdog(ctx.pipeline());
        Object object = this.stateLock;
        synchronized (object) {
            try {
                this.lockWritersExclusive();
                this.setState(LifecycleState.CONNECTED);
                try {
                    // Move everything from the stack and the disconnected buffer back into the
                    // disconnected buffer, then activate and flush it.
                    this.rebuildQueue();
                    this.activateCommandHandlerAndExecuteBufferedCommands(ctx);
                }
                catch (Exception e) {
                    if (this.debugEnabled) {
                        logger.debug("{} channelActive() ran into an exception", (Object)this.logPrefix());
                    }
                    if (this.clientOptions.isCancelCommandsOnReconnectFailure()) {
                        this.reset();
                    }
                    throw e;
                }
            }
            finally {
                this.unlockWritersExclusive();
            }
        }
        super.channelActive(ctx);
        if (this.channel != null) {
            // The Activated event is fired from a task submitted to the event loop rather than inline.
            this.channel.eventLoop().submit(new Runnable(){

                @Override
                public void run() {
                    CommandHandler.this.channel.pipeline().fireUserEventTriggered((Object)new ConnectionEvents.Activated());
                }
            });
        }
        if (this.debugEnabled) {
            logger.debug("{} channelActive() done", (Object)this.logPrefix());
        }
    }

    /**
     * Looks up the first {@link ConnectionWatchdog} registered in the given pipeline.
     *
     * @param pipeline the pipeline to search; may be {@code null}
     * @return the watchdog, or {@code null} if the pipeline is {@code null} or contains none
     */
    private static ConnectionWatchdog getConnectionWatchdog(ChannelPipeline pipeline) {
        if (pipeline == null) {
            return null;
        }
        for (ChannelHandler candidate : pipeline.toMap().values()) {
            if (candidate instanceof ConnectionWatchdog) {
                return (ConnectionWatchdog)candidate;
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles loss of the connection: walks the lifecycle state to {@code DEACTIVATED}, deactivates
     * the channel handler, moves pending commands back into the disconnected buffer and resets the
     * decoder state. Events for a channel other than the bound one are only propagated.
     *
     * @param ctx the channel handler context
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (this.debugEnabled) {
            logger.debug("{} channelInactive()", (Object)this.logPrefix());
        }
        if (this.channel != null && ctx.channel() != this.channel) {
            logger.debug("{} My channel and ctx.channel mismatch. Propagating event to other listeners.", (Object)this.logPrefix());
            super.channelInactive(ctx);
            return;
        }
        Object object = this.stateLock;
        synchronized (object) {
            try {
                this.lockWritersExclusive();
                this.setState(LifecycleState.DISCONNECTED);
                if (this.redisChannelHandler != null) {
                    if (this.debugEnabled) {
                        logger.debug("{} deactivating channel handler", (Object)this.logPrefix());
                    }
                    this.setState(LifecycleState.DEACTIVATING);
                    this.redisChannelHandler.deactivated();
                }
                // Commands still awaiting a response are moved to the disconnected buffer.
                this.rebuildQueue();
                this.setState(LifecycleState.DEACTIVATED);
                this.channel = null;
            }
            finally {
                this.unlockWritersExclusive();
            }
        }
        // The decoder state machine is reset after the lock has been released.
        this.rsm.reset();
        if (this.debugEnabled) {
            logger.debug("{} channelInactive() done", (Object)this.logPrefix());
        }
        super.channelInactive(ctx);
    }

    /**
     * Moves all commands from the response stack and the disconnected buffer into the disconnected
     * buffer, stack commands first. Commands that do not fit are completed exceptionally.
     */
    private void rebuildQueue() {
        final int expected = this.stack.size() + this.disconnectedBuffer.size();
        List<RedisCommand<K, V, ?>> pending = new ArrayList<>(expected);

        // Stack entries come first so that they keep their position ahead of buffered commands.
        CommandHandler.drainCommands(this.stack, pending);
        CommandHandler.drainCommands(this.disconnectedBuffer, pending);

        try {
            this.disconnectedBuffer.addAll(pending);
        }
        catch (RuntimeException overflow) {
            if (this.debugEnabled) {
                logger.debug("{} rebuildQueue Queue overcommit. Cannot add all commands to buffer (disconnected).", this.logPrefix(), pending.size());
            }
            // Whatever made it into the buffer stays there; the remainder is failed.
            pending.removeAll(this.disconnectedBuffer);
            for (RedisCommand<K, V, ?> rejected : pending) {
                rejected.completeExceptionally(overflow);
            }
        }

        if (this.debugEnabled) {
            logger.debug("{} rebuildQueue {} command(s) added to buffer", this.logPrefix(), this.disconnectedBuffer.size());
        }
    }

    /**
     * Clears the stored connection error, binds the channel, notifies the channel handler of the
     * activation, switches to {@code ACTIVE} and flushes the disconnected buffer.
     *
     * @param ctx the channel handler context of the newly active channel
     */
    private void activateCommandHandlerAndExecuteBufferedCommands(ChannelHandlerContext ctx) {
        this.connectionError = null;
        if (this.debugEnabled) {
            logger.debug("{} activateCommandHandlerAndExecuteBufferedCommands {} command(s) buffered", this.logPrefix(), this.disconnectedBuffer.size());
        }

        this.channel = ctx.channel();

        final boolean notifyHandler = this.redisChannelHandler != null;
        if (notifyHandler) {
            if (this.debugEnabled) {
                logger.debug("{} activating channel handler", this.logPrefix());
            }
            this.setState(LifecycleState.ACTIVATING);
            this.redisChannelHandler.activated();
        }

        this.setState(LifecycleState.ACTIVE);
        this.flushCommands(this.disconnectedBuffer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Cancels all buffered commands (disconnected buffer and command buffer) with the given message
     * and resets the decoder, either via a {@code Reset} pipeline event or directly when no channel
     * is bound.
     *
     * @param message the error message set on the cancelled commands
     */
    private void cancelCommands(String message) {
        List<RedisCommand<K, V, ?>> toCancel;
        Object object = this.stateLock;
        synchronized (object) {
            try {
                // Collect the buffered commands under exclusive access.
                this.lockWritersExclusive();
                toCancel = this.prepareReset();
            }
            finally {
                this.unlockWritersExclusive();
            }
        }
        if (this.channel != null) {
            // With a channel bound, the reset is routed through the pipeline as a user event.
            this.channel.pipeline().fireUserEventTriggered((Object)new Reset(message));
        } else {
            this.resetInternals();
        }
        this.cancelCommands(message, toCancel);
    }

    /**
     * Cancels the given commands, setting the message as error on every command that has an output.
     *
     * @param message the error message to set
     * @param toCancel the commands to cancel
     */
    private void cancelCommands(String message, List<RedisCommand<K, V, ?>> toCancel) {
        toCancel.forEach(pending -> {
            if (pending.getOutput() != null) {
                pending.getOutput().setError(message);
            }
            pending.cancel();
        });
    }

    /**
     * Resets the decoder state machine and clears the receive buffer if it is still allocated.
     */
    private void resetInternals() {
        this.rsm.reset();
        // A released buffer (refCnt == 0) is left untouched.
        if (this.buffer.refCnt() <= 0) {
            return;
        }
        this.buffer.clear();
    }

    /**
     * Drains the disconnected buffer and the command buffer, in that order.
     *
     * @return the drained commands, which the caller is expected to cancel
     */
    protected List<RedisCommand<K, V, ?>> prepareReset() {
        final int expected = this.disconnectedBuffer.size() + this.commandBuffer.size();
        List<RedisCommand<K, V, ?>> drained = new ArrayList<>(expected);
        CommandHandler.drainCommands(this.disconnectedBuffer, drained);
        CommandHandler.drainCommands(this.commandBuffer, drained);
        return drained;
    }

    /**
     * Handles an exception reported for this handler.
     * <p>
     * When {@code stack} holds a command, its head is removed and completed exceptionally with {@code cause}.
     * When there is no channel, the channel is inactive, or this handler is not connected, {@code cause} is
     * stored in {@code connectionError}. The exception is logged in every case; the level starts at WARN and is
     * lowered to DEBUG once the cause has been handed to a command or stored as connection error. An
     * {@link IOException} still at WARN is logged at INFO, or at DEBUG when its message is listed in
     * {@code SUPPRESS_IO_EXCEPTION_MESSAGES}.
     *
     * @param ctx the handler context; not used by this method
     * @param cause the exception to handle
     * @throws Exception declared by the method signature
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        InternalLogLevel logLevel = InternalLogLevel.WARN;

        if (!this.stack.isEmpty()) {
            RedisCommand<K, V, ?> command = this.stack.poll();
            if (this.debugEnabled) {
                logger.debug("{} Storing exception in {}", this.logPrefix(), command);
            }
            logLevel = InternalLogLevel.DEBUG;
            try {
                command.completeExceptionally(cause);
            }
            catch (Exception ex) {
                // Use logPrefix(): the logPrefix field is initialised lazily and may still be null here.
                logger.warn("{} Unexpected exception during command completion exceptionally: {}", this.logPrefix(), ex.toString(), ex);
            }
        }

        // Snapshot the field once so the null check and isActive() see the same channel.
        Channel currentChannel = this.channel;
        if (currentChannel == null || !currentChannel.isActive() || !this.isConnected()) {
            if (this.debugEnabled) {
                logger.debug("{} Storing exception in connectionError", this.logPrefix());
            }
            logLevel = InternalLogLevel.DEBUG;
            this.connectionError = cause;
        }

        // Only an IOException that is still above INFO (i.e. not already demoted to DEBUG) is adjusted.
        if (cause instanceof IOException && logLevel.ordinal() > InternalLogLevel.INFO.ordinal()) {
            logLevel = InternalLogLevel.INFO;
            if (SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage())) {
                logLevel = InternalLogLevel.DEBUG;
            }
        }

        logger.log(logLevel, "{} Unexpected exception during request: {}", this.logPrefix(), cause.toString(), cause);
    }

    /**
     * Closes this handler.
     * <p>
     * Does nothing beyond optional debug logging when the handler is already closed. Otherwise the state is set
     * to {@link LifecycleState#CLOSED}. With a channel present, {@code PrepareClose} and {@code Close} user
     * events are fired on its pipeline, the pipeline is closed and, if the channel is still open, the call
     * waits for the close to finish. Without a channel, the {@code connectionWatchdog} (if any) is told to
     * prepare for close.
     * <p>
     * The state machine is closed and the read buffer released in all cases, including when closing the channel
     * throws. The state is already {@code CLOSED} at that point, so a later call would return early and could
     * not perform this cleanup.
     */
    @Override
    public void close() {
        if (this.debugEnabled) {
            logger.debug("{} close()", this.logPrefix());
        }
        if (this.isClosed()) {
            return;
        }
        this.setState(LifecycleState.CLOSED);

        try {
            Channel currentChannel = this.channel;
            if (currentChannel != null) {
                currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.PrepareClose());
                currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.Close());
                ChannelFuture close = currentChannel.pipeline().close();
                if (currentChannel.isOpen()) {
                    // May rethrow the cause of a failed close; cleanup below still runs.
                    close.syncUninterruptibly();
                }
            } else if (this.connectionWatchdog != null) {
                this.connectionWatchdog.prepareClose(new ConnectionEvents.PrepareClose());
            }
        }
        finally {
            this.rsm.close();
            if (this.buffer.refCnt() > 0) {
                this.buffer.release();
            }
        }
    }

    /**
     * Resets this handler by cancelling all buffered commands with the message {@code "Reset"}.
     */
    @Override
    public void reset() {
        if (debugEnabled) {
            logger.debug("{} reset()", logPrefix());
        }

        cancelCommands("Reset");
    }

    /**
     * Returns this handler to the {@link LifecycleState#NOT_CONNECTED} state.
     * <p>
     * The {@code disconnectedBuffer} and the {@code stack} are cleared. If a channel is present,
     * {@code PrepareClose} and {@code Close} user events are fired on its pipeline and the pipeline is closed
     * without waiting for completion.
     */
    public void initialState() {
        setState(LifecycleState.NOT_CONNECTED);
        disconnectedBuffer.clear();
        stack.clear();

        final Channel ch = channel;
        if (ch == null) {
            return;
        }

        ch.pipeline().fireUserEventTriggered(new ConnectionEvents.PrepareClose());
        ch.pipeline().fireUserEventTriggered(new ConnectionEvents.Close());
        ch.pipeline().close();
    }

    /**
     * Returns the prefix used in log messages of this handler.
     * <p>
     * The prefix is built on the first call from the channel's log descriptor and the hexadecimal
     * {@code commandHandlerId}, then cached in the {@code logPrefix} field. Later calls return the cached
     * value, so it reflects the channel as it was on the first call.
     *
     * @return the log prefix
     */
    protected String logPrefix() {
        if (this.logPrefix == null) {
            this.logPrefix = "[" + ChannelLogDescriptor.logDescriptor(this.channel) + ", chid=0x" + Long.toHexString(this.commandHandlerId) + ']';
        }
        return this.logPrefix;
    }

    /**
     * Removes all elements from {@code source} and returns them in polling order.
     *
     * @param source queue to drain; it is empty afterwards
     * @param <T> element type
     * @return a new list holding the drained elements; never {@code null}
     */
    protected static <T> List<T> drainCommands(Queue<T> source) {
        // Parameterized instead of a raw ArrayList so the result is a genuine List<T>.
        List<T> target = new ArrayList<>(source.size());
        CommandHandler.drainCommands(source, target);
        return target;
    }

    /**
     * Moves every element from {@code source} to {@code target}.
     * <p>
     * Elements are polled one by one until {@link Queue#poll()} returns {@code null} and are added to
     * {@code target} in that order.
     *
     * @param source queue to drain
     * @param target collection receiving the drained elements
     * @param <T> element type
     */
    protected static <T> void drainCommands(Queue<T> source, Collection<T> target) {
        for (T next = source.poll(); next != null; next = source.poll()) {
            target.add(next);
        }
    }

    /**
     * Returns the current value of {@link System#nanoTime()}.
     *
     * @return the current nano time
     */
    private static long nanoTime() {
        return System.nanoTime();
    }

    static {
        Class<?> voidPromiseClass;
        logger = InternalLoggerFactory.getInstance(CommandHandler.class);
        CHANNEL_COUNTER = new AtomicLong();
        QUEUE_SIZE = AtomicIntegerFieldUpdater.newUpdater(CommandHandler.class, "queueSize");
        SUPPRESS_IO_EXCEPTION_MESSAGES = LettuceSets.unmodifiableSet("Connection reset by peer", "Broken pipe", "Connection timed out");
        try {
            voidPromiseClass = LettuceClassUtils.forName("io.netty.channel.VoidChannelPromise");
        }
        catch (ClassNotFoundException e) {
            voidPromiseClass = null;
        }
        VOID_PROMISE_CLASS = voidPromiseClass;
    }

    /**
     * Event object carrying a message. {@code cancelCommands(String)} fires an instance as a user event on the
     * channel pipeline.
     */
    private static class Reset {
        /** Message passed to the constructor. */
        final String message;

        /**
         * @param message the message to carry with this event
         */
        public Reset(String message) {
            this.message = message;
        }
    }

    /**
     * Delivery modes.
     * <p>
     * NOTE(review): presumably selects between {@code AtMostOnceWriteListener} and {@code RetryListener} when
     * commands are written; the code using this enum is outside this section, so confirm there.
     */
    private static enum Reliability {
        AT_MOST_ONCE,
        AT_LEAST_ONCE;

    }

    /**
     * Lifecycle states of the command handler, applied through {@code setState}.
     * <p>
     * NOTE(review): the declaration order fixes each constant's {@code ordinal()}; code outside this section
     * may depend on it, so confirm before reordering or inserting constants.
     */
    public static enum LifecycleState {
        /** Set by {@code initialState()}. */
        NOT_CONNECTED,
        REGISTERED,
        CONNECTED,
        /** Set while the channel handler's {@code activated()} callback runs. */
        ACTIVATING,
        /** Set once activation finished, right before buffered commands are flushed. */
        ACTIVE,
        DISCONNECTED,
        DEACTIVATING,
        DEACTIVATED,
        /** Set by {@code close()}. */
        CLOSED;

    }

    private class RetryListener
    extends ListenerSupport
    implements GenericFutureListener<Future<Void>> {
        RetryListener(RedisCommand<K, V, ?> sentCommand) {
            super(sentCommand);
        }

        RetryListener(Collection<? extends RedisCommand<K, V, ?>> sentCommands) {
            super(sentCommands);
        }

        public void operationComplete(Future<Void> future) throws Exception {
            Throwable cause = future.cause();
            boolean success = future.isSuccess();
            this.dequeue();
            if (!success) {
                Channel channel = CommandHandler.this.channel;
                if (channel != null) {
                    channel.eventLoop().submit(this::requeueCommands);
                } else {
                    CommandHandler.this.clientResources.eventExecutorGroup().submit(this::requeueCommands);
                }
            }
            if (!success && !(cause instanceof ClosedChannelException)) {
                String message = "Unexpected exception during request: {}";
                InternalLogLevel logLevel = InternalLogLevel.WARN;
                if (cause instanceof IOException && SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage())) {
                    logLevel = InternalLogLevel.DEBUG;
                }
                logger.log(logLevel, message, (Object)cause.toString(), (Object)cause);
            }
        }

        private void requeueCommands() {
            if (this.sentCommand != null) {
                try {
                    CommandHandler.this.write(this.sentCommand);
                }
                catch (Exception e) {
                    this.complete(e);
                }
            }
            if (this.sentCommands != null) {
                for (RedisCommand command : this.sentCommands) {
                    try {
                        CommandHandler.this.write(command);
                    }
                    catch (Exception e) {
                        this.complete(e);
                    }
                }
            }
        }
    }

    private class AtMostOnceWriteListener
    extends ListenerSupport
    implements ChannelFutureListener {
        AtMostOnceWriteListener(RedisCommand<K, V, ?> sentCommand) {
            super(sentCommand);
        }

        AtMostOnceWriteListener(Collection<? extends RedisCommand<K, V, ?>> sentCommands) {
            super(sentCommands);
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            this.dequeue();
            if (future.cause() != null) {
                this.complete(future.cause());
            }
        }
    }

    /**
     * Base class for write listeners. It holds either a single sent command or a collection of sent commands;
     * the other field is {@code null}.
     */
    private class ListenerSupport {
        /** The written batch, or {@code null} when a single command was written. */
        final Collection<? extends RedisCommand<K, V, ?>> sentCommands;
        /** The written command, or {@code null} when a batch was written. */
        final RedisCommand<K, V, ?> sentCommand;

        ListenerSupport(RedisCommand<K, V, ?> sentCommand) {
            this.sentCommand = sentCommand;
            this.sentCommands = null;
        }

        ListenerSupport(Collection<? extends RedisCommand<K, V, ?>> sentCommands) {
            this.sentCommand = null;
            this.sentCommands = sentCommands;
        }

        /**
         * Subtracts the number of commands held by this listener from the handler's {@code QUEUE_SIZE} counter.
         */
        void dequeue() {
            if (this.sentCommand != null) {
                QUEUE_SIZE.decrementAndGet(CommandHandler.this);
            }
            if (this.sentCommands != null) {
                QUEUE_SIZE.addAndGet(CommandHandler.this, -this.sentCommands.size());
            }
        }

        /**
         * Completes every command held by this listener exceptionally.
         *
         * @param t the failure to complete the command(s) with
         */
        protected void complete(Throwable t) {
            if (this.sentCommand != null) {
                this.sentCommand.completeExceptionally(t);
            }
            if (this.sentCommands != null) {
                // Typed loop variable with its own name: the raw one shadowed the sentCommand field.
                for (RedisCommand<K, V, ?> command : this.sentCommands) {
                    command.completeExceptionally(t);
                }
            }
        }
    }
}

