/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.redis.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.impl.pool.ConnectionListener;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.ResponseType;
import io.vertx.redis.client.impl.ArrayQueue;
import io.vertx.redis.client.impl.ParserHandler;
import io.vertx.redis.client.impl.RequestImpl;
import io.vertx.redis.client.impl.types.ErrorType;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class RedisConnectionImpl
implements RedisConnection,
ParserHandler {
    private static final String BASE_ADDRESS = "io.vertx.redis";
    private static final Logger LOG = LoggerFactory.getLogger(RedisConnectionImpl.class);
    private static final ErrorType CONNECTION_CLOSED = ErrorType.create("CONNECTION_CLOSED");
    private final ConnectionListener<RedisConnection> listener;
    private final ContextInternal context;
    private final EventBus eventBus;
    private final NetSocket netSocket;
    private final ArrayQueue waiting;
    private final int recycleTimeout;
    private Handler<Throwable> onException;
    private Handler<Void> onEnd;
    private Handler<Response> onMessage;

    public RedisConnectionImpl(Vertx vertx, ContextInternal context, ConnectionListener<RedisConnection> connectionListener, NetSocket netSocket, RedisOptions options) {
        this.listener = connectionListener;
        this.eventBus = vertx.eventBus();
        this.context = context;
        this.netSocket = netSocket;
        this.waiting = new ArrayQueue(options.getMaxWaitingHandlers());
        this.recycleTimeout = options.getPoolRecycleTimeout();
    }

    void forceClose() {
        this.listener.onEvict();
        this.netSocket.close();
    }

    @Override
    public void close() {
        long expired = this.recycleTimeout > 0 ? System.currentTimeMillis() + (long)this.recycleTimeout : 0L;
        this.listener.onRecycle(expired);
    }

    @Override
    public boolean pendingQueueFull() {
        return this.waiting.isFull();
    }

    @Override
    public RedisConnection exceptionHandler(Handler<Throwable> handler) {
        this.onException = handler;
        return this;
    }

    @Override
    public RedisConnection endHandler(Handler<Void> handler) {
        this.onEnd = handler;
        return this;
    }

    @Override
    public RedisConnection handler(Handler<Response> handler) {
        this.onMessage = handler;
        return this;
    }

    @Override
    public RedisConnection pause() {
        this.netSocket.pause();
        return this;
    }

    @Override
    public RedisConnection resume() {
        this.netSocket.resume();
        return this;
    }

    @Override
    public RedisConnection fetch(long size) {
        return this;
    }

    @Override
    public RedisConnection send(Request request, Handler<AsyncResult<Response>> handler) {
        boolean voidCmd = request.command().isVoid();
        if (!voidCmd && this.waiting.isFull()) {
            handler.handle((Object)Future.failedFuture((String)"Redis waiting Queue is full"));
            return this;
        }
        Buffer message = ((RequestImpl)request).encode();
        Promise promise = Promise.promise();
        promise.future().setHandler(handler);
        this.context.runOnContext(v -> {
            if (!voidCmd) {
                if (this.waiting.isFull()) {
                    handler.handle((Object)Future.failedFuture((String)"Redis waiting Queue is full"));
                    return;
                }
                this.waiting.offer(promise);
            }
            this.netSocket.write(message, write -> {
                if (write.failed()) {
                    this.fatal(write.cause());
                } else if (voidCmd) {
                    promise.complete();
                }
            });
        });
        return this;
    }

    @Override
    public RedisConnection batch(List<Request> commands, Handler<AsyncResult<List<Response>>> handler) {
        if (this.waiting.freeSlots() < commands.size()) {
            handler.handle((Object)Future.failedFuture((String)"Redis waiting Queue is full"));
            return this;
        }
        ArrayList<Promise> callbacks = new ArrayList<Promise>(commands.size());
        ArrayList replies = new ArrayList(commands.size());
        AtomicInteger count = new AtomicInteger(commands.size());
        AtomicBoolean failed = new AtomicBoolean(false);
        Buffer messages = Buffer.buffer();
        int i = 0;
        while (i < commands.size()) {
            int index = i++;
            RequestImpl req = (RequestImpl)commands.get(index);
            req.encode(messages);
            Promise p = Promise.promise();
            callbacks.add(index, p);
            p.future().setHandler(command -> {
                if (!failed.get()) {
                    if (command.failed()) {
                        failed.set(true);
                        if (handler != null) {
                            handler.handle((Object)Future.failedFuture((Throwable)command.cause()));
                        }
                        return;
                    }
                    replies.add(index, command.result());
                    if (count.decrementAndGet() == 0 && handler != null) {
                        handler.handle((Object)Future.succeededFuture((Object)replies));
                    }
                }
            });
        }
        this.context.runOnContext(v -> {
            if (this.waiting.freeSlots() < callbacks.size()) {
                handler.handle((Object)Future.failedFuture((String)"Redis waiting Queue is full"));
                return;
            }
            for (Promise callback : callbacks) {
                this.waiting.offer(callback);
            }
            this.netSocket.write(messages, write -> {
                if (write.failed()) {
                    this.fatal(write.cause());
                }
            });
        });
        return this;
    }

    @Override
    public void handle(Response reply) {
        if (this.waiting.isEmpty()) {
            if (this.onMessage != null) {
                this.context.runOnContext(v -> this.onMessage.handle((Object)reply));
            } else {
                if (reply.type() == ResponseType.MULTI) {
                    if (reply.size() == 3 && "message".equals(reply.get(0).toString())) {
                        this.eventBus.send("io.vertx.redis." + reply.get(1).toString(), (Object)new JsonObject().put("status", "OK").put("value", new JsonObject().put("channel", reply.get(1).toString()).put("message", reply.get(2).toString())));
                        return;
                    }
                    if (reply.size() == 4 && "pmessage".equals(reply.get(0).toString())) {
                        this.eventBus.send("io.vertx.redis." + reply.get(1).toString(), (Object)new JsonObject().put("status", "OK").put("value", new JsonObject().put("pattern", reply.get(1).toString()).put("channel", reply.get(2).toString()).put("message", reply.get(3).toString())));
                        return;
                    }
                }
                LOG.warn((Object)("No handler waiting for message: " + reply));
            }
            return;
        }
        this.context.runOnContext(v -> {
            Promise req = (Promise)this.waiting.poll();
            if (req != null) {
                if (reply == null) {
                    try {
                        req.complete();
                    }
                    catch (RuntimeException e) {
                        this.fail(e);
                    }
                    return;
                }
                if (reply.type() == ResponseType.ERROR) {
                    try {
                        req.fail((Throwable)((ErrorType)reply));
                    }
                    catch (RuntimeException e) {
                        this.fail(e);
                    }
                    return;
                }
                try {
                    req.complete((Object)reply);
                }
                catch (RuntimeException e) {
                    this.fail(e);
                }
            } else {
                LOG.error((Object)("No handler waiting for message: " + reply));
            }
        });
    }

    public void end(Void v) {
        this.cleanupQueue(CONNECTION_CLOSED);
        if (this.onEnd != null) {
            this.context.runOnContext(x -> this.onEnd.handle((Object)v));
        }
    }

    @Override
    public void fail(Throwable t) {
        block3: {
            try {
                this.listener.onEvict();
            }
            catch (RejectedExecutionException e) {
                if (this.onException == null) break block3;
                this.context.runOnContext(x -> this.onException.handle((Object)e));
            }
        }
        if (this.onException != null) {
            this.context.runOnContext(x -> this.onException.handle((Object)t));
        }
    }

    @Override
    public void fatal(Throwable t) {
        block3: {
            this.cleanupQueue(t);
            try {
                this.listener.onEvict();
            }
            catch (RejectedExecutionException e) {
                if (this.onException == null) break block3;
                this.context.runOnContext(x -> this.onException.handle((Object)e));
            }
        }
        if (this.onException != null) {
            this.context.runOnContext(x -> this.onException.handle((Object)t));
        }
    }

    private void cleanupQueue(Throwable t) {
        this.context.runOnContext(v -> {
            Promise req;
            while ((req = (Promise)this.waiting.poll()) != null) {
                if (t == null) continue;
                try {
                    req.fail(t);
                }
                catch (RuntimeException e) {
                    LOG.warn((Object)"Exception during cleanup", (Throwable)e);
                }
            }
        });
    }
}

