/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.cluster;

import com.lambdaworks.redis.AbstractRedisClient;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisCommandExecutionException;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.ClusterConnectionProvider;
import com.lambdaworks.redis.cluster.ClusterDistributionChannelWriter;
import com.lambdaworks.redis.cluster.ClusterFutureSyncInvocationHandler;
import com.lambdaworks.redis.cluster.RedisClusterPubSubAsyncCommandsImpl;
import com.lambdaworks.redis.cluster.RedisClusterPubSubReactiveCommandsImpl;
import com.lambdaworks.redis.cluster.RedisState;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import com.lambdaworks.redis.cluster.pubsub.RedisClusterPubSubAdapter;
import com.lambdaworks.redis.cluster.pubsub.RedisClusterPubSubListener;
import com.lambdaworks.redis.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import com.lambdaworks.redis.cluster.pubsub.api.async.RedisClusterPubSubAsyncCommands;
import com.lambdaworks.redis.cluster.pubsub.api.rx.RedisClusterPubSubReactiveCommands;
import com.lambdaworks.redis.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import com.lambdaworks.redis.cluster.pubsub.api.sync.PubSubNodeSelection;
import com.lambdaworks.redis.cluster.pubsub.api.sync.RedisClusterPubSubCommands;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.models.command.CommandDetailParser;
import com.lambdaworks.redis.pubsub.PubSubOutput;
import com.lambdaworks.redis.pubsub.RedisPubSubAsyncCommandsImpl;
import com.lambdaworks.redis.pubsub.RedisPubSubReactiveCommandsImpl;
import com.lambdaworks.redis.pubsub.StatefulRedisPubSubConnection;
import com.lambdaworks.redis.pubsub.StatefulRedisPubSubConnectionImpl;
import com.lambdaworks.redis.pubsub.api.async.RedisPubSubAsyncCommands;
import com.lambdaworks.redis.pubsub.api.sync.RedisPubSubCommands;
import io.netty.channel.ChannelHandler;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/**
 * Cluster-aware Pub/Sub connection.
 *
 * <p>Extends the plain Pub/Sub connection with:
 * <ul>
 * <li>cluster-flavoured {@code async()}, {@code sync()} and {@code reactive()} command facades,</li>
 * <li>access to Pub/Sub connections of individual nodes via {@link #getConnection(String)} and
 * {@link #getConnection(String, int)},</li>
 * <li>a second set of listeners ({@link RedisClusterPubSubListener}) that additionally receive the
 * {@link RedisClusterNode} a notification is attributed to.</li>
 * </ul>
 *
 * <p>The node passed to cluster listeners may be {@code null}: it is resolved from the node id obtained in
 * {@link #activated()} and the partitions supplied through {@link #setPartitions(Partitions)}, either of which may
 * not be available yet when a notification arrives.
 *
 * @param <K> key type
 * @param <V> value type
 */
@ChannelHandler.Sharable
class StatefulRedisClusterPubSubConnectionImpl<K, V>
extends StatefulRedisPubSubConnectionImpl<K, V>
implements StatefulRedisClusterPubSubConnection<K, V> {

    /** Cluster-aware listeners; copy-on-write so they can be iterated while being modified. */
    private final List<RedisClusterPubSubListener<K, V>> clusterListeners = new CopyOnWriteArrayList<>();

    /** Fans notifications of this connection out to plain and cluster listeners. */
    private final NotifyingMessageListener multicast = new NotifyingMessageListener();

    /** Same fan-out as {@link #multicast}, but gated by {@link #nodeMessagePropagation}. */
    private final UpstreamMessageListener upstream = new UpstreamMessageListener();

    private volatile Partitions partitions;
    private volatile boolean nodeMessagePropagation = false;
    private volatile String nodeId;

    // NOTE(review): unlike the fields above this one is not volatile; confirm that inspectRedisState() and
    // getState() are only ever used in a way that guarantees visibility.
    private RedisState state;

    /**
     * Creates a new cluster Pub/Sub connection.
     *
     * @param writer the channel writer commands are sent through
     * @param codec the codec used for keys and values
     * @param timeout the command timeout
     * @param unit the unit of {@code timeout}
     */
    public StatefulRedisClusterPubSubConnectionImpl(RedisChannelWriter<K, V> writer, RedisCodec<K, V> codec, long timeout, TimeUnit unit) {
        super(writer, codec, timeout, unit);
    }

    /**
     * @return the asynchronous API of the superclass, narrowed to the cluster Pub/Sub command interface
     */
    @Override
    public RedisClusterPubSubAsyncCommands<K, V> async() {
        return (RedisClusterPubSubAsyncCommands<K, V>) super.async();
    }

    /**
     * @return a new cluster-specific asynchronous commands implementation bound to this connection and its codec
     */
    @Override
    protected RedisPubSubAsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() {
        return new RedisClusterPubSubAsyncCommandsImpl<>(this, this.codec);
    }

    /**
     * @return the synchronous API of the superclass, narrowed to the cluster Pub/Sub command interface
     */
    @Override
    public RedisClusterPubSubCommands<K, V> sync() {
        return (RedisClusterPubSubCommands<K, V>) super.sync();
    }

    /**
     * Builds the synchronous API as a dynamic proxy that implements both the cluster and the plain Pub/Sub
     * command interfaces and delegates every call to {@link #syncInvocationHandler()}.
     *
     * @return the proxy implementing the synchronous API
     */
    @Override
    @SuppressWarnings("unchecked") // Proxy.newProxyInstance returns Object; the proxy implements both interfaces
    protected RedisPubSubCommands<K, V> newRedisSyncCommandsImpl() {
        Class<?>[] interfaces = new Class<?>[]{RedisClusterPubSubCommands.class, RedisPubSubCommands.class};
        return (RedisPubSubCommands<K, V>) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, this.syncInvocationHandler());
    }

    /**
     * @return the invocation handler backing the synchronous proxy; it is handed this connection, the
     *         async/node-selection interface types and the asynchronous API
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // handler kept raw: its generic signature is not visible from this file
    private InvocationHandler syncInvocationHandler() {
        return new ClusterFutureSyncInvocationHandler(this, RedisPubSubAsyncCommands.class, PubSubNodeSelection.class, NodeSelectionPubSubCommands.class, this.async());
    }

    /**
     * @return the reactive API of the superclass, narrowed to the cluster Pub/Sub command interface
     */
    @Override
    public RedisClusterPubSubReactiveCommands<K, V> reactive() {
        return (RedisClusterPubSubReactiveCommands<K, V>) super.reactive();
    }

    /**
     * @return a new cluster-specific reactive commands implementation bound to this connection and its codec
     */
    @Override
    protected RedisPubSubReactiveCommandsImpl<K, V> newRedisReactiveCommandsImpl() {
        return new RedisClusterPubSubReactiveCommandsImpl<>(this, this.codec);
    }

    /**
     * Populates {@link #getState()} from the result of the synchronous {@code command()} call. If that call
     * fails with a {@link RedisCommandExecutionException} the state is initialised from an empty list instead.
     */
    void inspectRedisState() {
        try {
            this.state = new RedisState(CommandDetailParser.parse(this.sync().command()));
        }
        catch (RedisCommandExecutionException ignored) {
            // Deliberate fallback: continue with a state built from no command details.
            this.state = new RedisState(Collections.emptyList());
        }
    }

    /**
     * @return the state recorded by {@link #inspectRedisState()}, or {@code null} if it has not run yet
     */
    RedisState getState() {
        return this.state;
    }

    /**
     * Runs the superclass activation and then asynchronously asks for this node's id via {@code clusterMyId()}.
     * The id is stored once the call completes; until then {@code nodeId} keeps its previous value.
     */
    @Override
    public void activated() {
        super.activated();
        this.async().clusterMyId().thenAccept(id -> {
            this.nodeId = id;
        });
    }

    /**
     * Obtains the Pub/Sub connection to the node with the given id.
     *
     * @param nodeId the node id to connect to
     * @return the connection supplied by the cluster connection provider
     * @throws RedisException if no partition with {@code nodeId} is known
     */
    @Override
    @SuppressWarnings("unchecked")
    public StatefulRedisPubSubConnection<K, V> getConnection(String nodeId) {
        RedisURI redisURI = this.lookup(nodeId);
        if (redisURI == null) {
            throw new RedisException("NodeId " + nodeId + " does not belong to the cluster");
        }
        return (StatefulRedisPubSubConnection<K, V>) this.getClusterDistributionChannelWriter().getClusterConnectionProvider().getConnection(ClusterConnectionProvider.Intent.WRITE, nodeId);
    }

    /**
     * Obtains the Pub/Sub connection to the node at the given address.
     *
     * @param host the node host
     * @param port the node port
     * @return the connection supplied by the cluster connection provider
     */
    @Override
    @SuppressWarnings("unchecked")
    public StatefulRedisPubSubConnection<K, V> getConnection(String host, int port) {
        return (StatefulRedisPubSubConnection<K, V>) this.getClusterDistributionChannelWriter().getClusterConnectionProvider().getConnection(ClusterConnectionProvider.Intent.WRITE, host, port);
    }

    /**
     * Stores the partitions on this connection and passes them on to the cluster distribution channel writer.
     *
     * @param partitions the partitions to use
     */
    public void setPartitions(Partitions partitions) {
        this.partitions = partitions;
        this.getClusterDistributionChannelWriter().setPartitions(partitions);
    }

    /**
     * @return the partitions last passed to {@link #setPartitions(Partitions)}, or {@code null} if none were set
     */
    @Override
    public Partitions getPartitions() {
        return this.partitions;
    }

    /**
     * Enables or disables forwarding of notifications received through the upstream listener.
     *
     * @param enabled {@code true} to forward upstream notifications to the registered listeners
     */
    @Override
    public void setNodeMessagePropagation(boolean enabled) {
        this.nodeMessagePropagation = enabled;
    }

    /**
     * Registers a cluster-aware listener.
     *
     * @param listener the listener to add
     */
    @Override
    public void addListener(RedisClusterPubSubListener<K, V> listener) {
        this.clusterListeners.add(listener);
    }

    /**
     * Unregisters a cluster-aware listener.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeListener(RedisClusterPubSubListener<K, V> listener) {
        this.clusterListeners.remove(listener);
    }

    /**
     * Dispatches a decoded Pub/Sub notification to the plain and the cluster listeners, tagging it with the
     * node resolved by {@link #getNode()} (possibly {@code null}).
     *
     * @param output the decoded notification
     * @throws UnsupportedOperationException if the notification type is not one of the handled types
     */
    @Override
    protected void notifyListeners(PubSubOutput<K, V, V> output) {
        switch (output.type()) {
            case message:
                this.multicast.message(this.getNode(), output.channel(), output.get());
                break;
            case pmessage:
                this.multicast.message(this.getNode(), output.pattern(), output.channel(), output.get());
                break;
            case psubscribe:
                this.multicast.psubscribed(this.getNode(), output.pattern(), output.count());
                break;
            case punsubscribe:
                this.multicast.punsubscribed(this.getNode(), output.pattern(), output.count());
                break;
            case subscribe:
                this.multicast.subscribed(this.getNode(), output.channel(), output.count());
                break;
            case unsubscribe:
                this.multicast.unsubscribed(this.getNode(), output.channel(), output.count());
                break;
            default:
                throw new UnsupportedOperationException("Operation " + output.type() + " not supported");
        }
    }

    /**
     * @return the listener that forwards notifications to this connection's listeners only while node message
     *         propagation is enabled
     */
    protected RedisClusterPubSubListener<K, V> getUpstreamListener() {
        return this.upstream;
    }

    /**
     * @return this connection's channel writer, viewed as a cluster distribution channel writer
     */
    @SuppressWarnings("unchecked")
    protected ClusterDistributionChannelWriter<K, V> getClusterDistributionChannelWriter() {
        return (ClusterDistributionChannelWriter<K, V>) super.getChannelWriter();
    }

    /**
     * Resolves the cluster node this connection is attached to.
     *
     * @return the partition matching the node id obtained in {@link #activated()}, or {@code null} while the
     *         node id or the partitions are not known yet
     */
    private RedisClusterNode getNode() {
        // Snapshot both volatile fields once so the null checks and the lookup see the same values.
        String currentNodeId = this.nodeId;
        Partitions currentPartitions = this.partitions;
        if (currentNodeId == null || currentPartitions == null) {
            return null;
        }
        return currentPartitions.getPartitionByNodeId(currentNodeId);
    }

    /**
     * Finds the URI of the partition with the given node id.
     *
     * @param nodeId the node id to search for
     * @return the URI of the first matching partition, or {@code null} if there is no match or no partitions
     *         have been set
     */
    private RedisURI lookup(String nodeId) {
        Partitions currentPartitions = this.partitions;
        if (nodeId == null || currentPartitions == null) {
            return null;
        }
        for (RedisClusterNode partition : currentPartitions) {
            // Compare from the non-null side so a partition without an id cannot cause an NPE.
            if (nodeId.equals(partition.getNodeId())) {
                return partition.getUri();
            }
        }
        return null;
    }

    /**
     * Forwards every notification first to the plain Pub/Sub listeners of the connection (without node
     * information) and then to the cluster listeners (with the originating node).
     */
    private class NotifyingMessageListener
    extends RedisClusterPubSubAdapter<K, V> {

        @Override
        public void message(RedisClusterNode node, K channel, V message) {
            listeners.forEach(listener -> listener.message(channel, message));
            clusterListeners.forEach(listener -> listener.message(node, channel, message));
        }

        @Override
        public void message(RedisClusterNode node, K pattern, K channel, V message) {
            listeners.forEach(listener -> listener.message(pattern, channel, message));
            clusterListeners.forEach(listener -> listener.message(node, pattern, channel, message));
        }

        @Override
        public void subscribed(RedisClusterNode node, K channel, long count) {
            listeners.forEach(listener -> listener.subscribed(channel, count));
            clusterListeners.forEach(listener -> listener.subscribed(node, channel, count));
        }

        @Override
        public void psubscribed(RedisClusterNode node, K pattern, long count) {
            listeners.forEach(listener -> listener.psubscribed(pattern, count));
            clusterListeners.forEach(listener -> listener.psubscribed(node, pattern, count));
        }

        @Override
        public void unsubscribed(RedisClusterNode node, K channel, long count) {
            listeners.forEach(listener -> listener.unsubscribed(channel, count));
            clusterListeners.forEach(listener -> listener.unsubscribed(node, channel, count));
        }

        @Override
        public void punsubscribed(RedisClusterNode node, K pattern, long count) {
            listeners.forEach(listener -> listener.punsubscribed(pattern, count));
            clusterListeners.forEach(listener -> listener.punsubscribed(node, pattern, count));
        }
    }

    /**
     * Variant of {@link NotifyingMessageListener} that drops every notification unless node message
     * propagation has been enabled through {@link #setNodeMessagePropagation(boolean)}.
     */
    private class UpstreamMessageListener
    extends NotifyingMessageListener {

        @Override
        public void message(RedisClusterNode node, K channel, V message) {
            if (nodeMessagePropagation) {
                super.message(node, channel, message);
            }
        }

        @Override
        public void message(RedisClusterNode node, K pattern, K channel, V message) {
            if (nodeMessagePropagation) {
                super.message(node, pattern, channel, message);
            }
        }

        @Override
        public void subscribed(RedisClusterNode node, K channel, long count) {
            if (nodeMessagePropagation) {
                super.subscribed(node, channel, count);
            }
        }

        @Override
        public void psubscribed(RedisClusterNode node, K pattern, long count) {
            if (nodeMessagePropagation) {
                super.psubscribed(node, pattern, count);
            }
        }

        @Override
        public void unsubscribed(RedisClusterNode node, K channel, long count) {
            if (nodeMessagePropagation) {
                super.unsubscribed(node, channel, count);
            }
        }

        @Override
        public void punsubscribed(RedisClusterNode node, K pattern, long count) {
            if (nodeMessagePropagation) {
                super.punsubscribed(node, pattern, count);
            }
        }
    }
}

