/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.cluster;

import com.lambdaworks.redis.GeoArgs;
import com.lambdaworks.redis.GeoWithin;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.AbstractNodeSelection;
import com.lambdaworks.redis.cluster.AsyncClusterConnectionProvider;
import com.lambdaworks.redis.cluster.ClusterConnectionProvider;
import com.lambdaworks.redis.cluster.ClusterDistributionChannelWriter;
import com.lambdaworks.redis.cluster.NodeSelectionInvocationHandler;
import com.lambdaworks.redis.cluster.StatefulRedisClusterPubSubConnectionImpl;
import com.lambdaworks.redis.cluster.api.NodeSelectionSupport;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import com.lambdaworks.redis.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import com.lambdaworks.redis.cluster.pubsub.api.rx.NodeSelectionPubSubReactiveCommands;
import com.lambdaworks.redis.cluster.pubsub.api.rx.PubSubReactiveNodeSelection;
import com.lambdaworks.redis.cluster.pubsub.api.rx.RedisClusterPubSubReactiveCommands;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandType;
import com.lambdaworks.redis.pubsub.RedisPubSubReactiveCommandsImpl;
import com.lambdaworks.redis.pubsub.StatefulRedisPubSubConnection;
import com.lambdaworks.redis.pubsub.api.rx.RedisPubSubReactiveCommands;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import rx.Observable;

/**
 * Reactive pub/sub command implementation for a cluster pub/sub connection.
 *
 * <p>On top of the commands inherited from {@link RedisPubSubReactiveCommandsImpl} this class:
 * <ul>
 * <li>dispatches {@code georadius} / {@code georadiusbymember} to their {@code _ro} variants
 * whenever the connection state reports the {@link CommandType#GEORADIUS_RO} command, and</li>
 * <li>exposes {@link #nodes(Predicate)} to obtain a proxy-backed selection over the cluster
 * nodes matching a predicate.</li>
 * </ul>
 *
 * @param <K> key type
 * @param <V> value type
 */
class RedisClusterPubSubReactiveCommandsImpl<K, V>
extends RedisPubSubReactiveCommandsImpl<K, V>
implements RedisClusterPubSubReactiveCommands<K, V> {
    /**
     * Creates the command facade; both arguments are handed to the superclass unchanged.
     *
     * @param connection the cluster pub/sub connection the commands operate on
     * @param codec the codec used for keys and values
     */
    public RedisClusterPubSubReactiveCommandsImpl(StatefulRedisClusterPubSubConnection<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
    }

    /**
     * Issues {@code georadius_ro} if the connection state reports {@code GEORADIUS_RO},
     * otherwise the regular {@code georadius}.
     */
    @Override
    public Observable<V> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
        return this.hasReadOnlyGeoRadius()
                ? super.georadius_ro(key, longitude, latitude, distance, unit)
                : super.georadius(key, longitude, latitude, distance, unit);
    }

    /**
     * Issues {@code georadius_ro} (with {@code geoArgs}) if the connection state reports
     * {@code GEORADIUS_RO}, otherwise the regular {@code georadius}.
     */
    @Override
    public Observable<GeoWithin<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) {
        return this.hasReadOnlyGeoRadius()
                ? super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs)
                : super.georadius(key, longitude, latitude, distance, unit, geoArgs);
    }

    /**
     * Issues {@code georadiusbymember_ro} if the connection state reports {@code GEORADIUS_RO},
     * otherwise the regular {@code georadiusbymember}.
     */
    @Override
    public Observable<V> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
        return this.hasReadOnlyGeoRadius()
                ? super.georadiusbymember_ro(key, member, distance, unit)
                : super.georadiusbymember(key, member, distance, unit);
    }

    /**
     * Issues {@code georadiusbymember_ro} (with {@code geoArgs}) if the connection state reports
     * {@code GEORADIUS_RO}, otherwise the regular {@code georadiusbymember}.
     */
    @Override
    public Observable<GeoWithin<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) {
        return this.hasReadOnlyGeoRadius()
                ? super.georadiusbymember_ro(key, member, distance, unit, geoArgs)
                : super.georadiusbymember(key, member, distance, unit, geoArgs);
    }

    /**
     * Returns the underlying connection narrowed to its cluster pub/sub implementation type.
     *
     * @return the connection returned by the superclass, cast to
     *         {@link StatefulRedisClusterPubSubConnectionImpl}
     * @throws ClassCastException if the superclass holds a connection of a different type
     */
    @Override
    @SuppressWarnings("unchecked") // the narrowing cast may be reported as unchecked depending on the superclass signature
    public StatefulRedisClusterPubSubConnectionImpl<K, V> getStatefulConnection() {
        return (StatefulRedisClusterPubSubConnectionImpl<K, V>)super.getStatefulConnection();
    }

    /**
     * Creates a selection over the cluster nodes accepted by {@code predicate}.
     *
     * <p>The returned object is a dynamic proxy implementing
     * {@link NodeSelectionPubSubReactiveCommands} and {@link PubSubReactiveNodeSelection}; calls
     * are handled by a {@link NodeSelectionInvocationHandler} using the {@code REACTIVE}
     * execution model.
     *
     * @param predicate filter applied to the nodes of the current partitions
     * @return the proxy-backed node selection
     */
    @Override
    @SuppressWarnings("unchecked") // a Proxy instance can only be cast to a parameterized type unchecked
    public PubSubReactiveNodeSelection<K, V> nodes(Predicate<RedisClusterNode> predicate) {
        StaticPubSubReactiveNodeSelection<K, V> selection = new StaticPubSubReactiveNodeSelection<>(this.getStatefulConnection(), predicate);
        NodeSelectionInvocationHandler handler = new NodeSelectionInvocationHandler(selection, RedisPubSubReactiveCommands.class, NodeSelectionInvocationHandler.ExecutionModel.REACTIVE);
        Class<?>[] proxiedInterfaces = new Class<?>[]{NodeSelectionPubSubReactiveCommands.class, PubSubReactiveNodeSelection.class};
        return (PubSubReactiveNodeSelection<K, V>)Proxy.newProxyInstance(NodeSelectionSupport.class.getClassLoader(), proxiedInterfaces, (InvocationHandler)handler);
    }

    /**
     * Tells whether the connection state reports the {@code GEORADIUS_RO} command.
     *
     * <p>NOTE(review): the {@code georadiusbymember} overloads key off this same
     * {@code GEORADIUS_RO} check, exactly as the code did before this helper was extracted;
     * confirm whether a dedicated by-member capability check is wanted.
     *
     * @return {@code true} if the read-only geo radius variants should be used
     */
    private boolean hasReadOnlyGeoRadius() {
        // getStatefulConnection() is already narrowed to the implementation type, no cast needed.
        return this.getStatefulConnection().getState().hasCommand(CommandType.GEORADIUS_RO);
    }

    /**
     * Node selection whose member nodes are computed once, at construction time, from the
     * partitions of the given connection.
     *
     * @param <K> key type
     * @param <V> value type
     */
    private static class StaticPubSubReactiveNodeSelection<K, V>
    extends AbstractNodeSelection<RedisPubSubReactiveCommands<K, V>, NodeSelectionPubSubReactiveCommands<K, V>, K, V>
    implements PubSubReactiveNodeSelection<K, V> {
        /** Snapshot of the nodes that matched the selector when the selection was created. */
        private final List<RedisClusterNode> redisClusterNodes;
        /** Channel writer of the originating connection; source of the connection provider. */
        private final ClusterDistributionChannelWriter<K, V> writer;

        /**
         * @param globalConnection cluster connection supplying the partitions and the channel
         *        writer; must be a {@link StatefulRedisClusterPubSubConnectionImpl}
         * @param selector filter deciding which partition nodes belong to the selection
         * @throws ClassCastException if {@code globalConnection} is not the implementation type
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public StaticPubSubReactiveNodeSelection(StatefulRedisClusterPubSubConnection<K, V> globalConnection, Predicate<RedisClusterNode> selector) {
            this.redisClusterNodes = globalConnection.getPartitions().getPartitions().stream().filter(selector).collect(Collectors.toList());
            // NOTE(review): the raw cast is kept as in the original; the declared return type of
            // getClusterDistributionChannelWriter() is not visible from this file.
            this.writer = ((StatefulRedisClusterPubSubConnectionImpl)globalConnection).getClusterDistributionChannelWriter();
        }

        /**
         * Resolves the reactive API of the connection to {@code redisClusterNode}.
         *
         * @return a future completing with {@code reactive()} of the node connection
         */
        @Override
        protected CompletableFuture<RedisPubSubReactiveCommands<K, V>> getApi(RedisClusterNode redisClusterNode) {
            return this.getConnection(redisClusterNode).thenApply(StatefulRedisPubSubConnection::reactive);
        }

        /**
         * @return the nodes captured when this selection was constructed
         */
        @Override
        protected List<RedisClusterNode> nodes() {
            return this.redisClusterNodes;
        }

        /**
         * Asynchronously obtains a connection to the host and port of the node's URI, requested
         * with {@code WRITE} intent from the writer's connection provider.
         *
         * @return a future completing with the connection cast to
         *         {@link StatefulRedisPubSubConnection}
         * @throws ClassCastException if the provider is not an
         *         {@link AsyncClusterConnectionProvider}
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        protected CompletableFuture<StatefulRedisPubSubConnection<K, V>> getConnection(RedisClusterNode redisClusterNode) {
            RedisURI uri = redisClusterNode.getUri();
            // Going through Object keeps the downcast compilable whatever type
            // getClusterConnectionProvider() is declared to return.
            AsyncClusterConnectionProvider async = (AsyncClusterConnectionProvider)((Object)this.writer.getClusterConnectionProvider());
            // NOTE(review): raw cast kept as in the original; the element type of the future
            // returned by getConnectionAsync is not visible from this file.
            return async.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, uri.getHost(), uri.getPort()).thenApply(it -> (StatefulRedisPubSubConnection)it);
        }
    }
}

