/*
 * Decompiled with CFR 0.152.
 */
package net.oschina.j2cache.redis;

import java.util.Properties;
import net.oschina.j2cache.ClusterPolicy;
import net.oschina.j2cache.Command;
import net.oschina.j2cache.redis.RedisUtils;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;

public class RedisPubSubClusterPolicy
extends JedisPubSub
implements ClusterPolicy {
    private static final Logger log = LoggerFactory.getLogger(RedisPubSubClusterPolicy.class);
    private JedisPool client;
    private String channel;
    private String host;
    private int port;
    private int timeout;
    private String password;

    public RedisPubSubClusterPolicy(String channel, Properties props) {
        this.channel = channel;
        String node = props.getProperty("channel.host");
        if (node == null || node.trim().length() == 0) {
            node = props.getProperty("hosts").split(",")[0];
        }
        String[] infos = node.split(":");
        this.host = infos[0];
        this.port = infos.length > 1 ? Integer.parseInt(infos[1]) : 6379;
        this.timeout = Integer.parseInt((String)props.getOrDefault((Object)"timeout", "2000"));
        this.password = props.getProperty("password");
        if (this.password != null && this.password.trim().length() == 0) {
            this.password = null;
        }
        JedisPoolConfig config = RedisUtils.newPoolConfig(props, null);
        this.client = new JedisPool((GenericObjectPoolConfig)config, this.host, this.port, this.timeout, this.password);
    }

    @Override
    public void connect(Properties props) {
        long ct = System.currentTimeMillis();
        try (Jedis jedis = this.client.getResource();){
            jedis.publish(this.channel, Command.join().json());
        }
        new Thread(() -> {
            while (true) {
                try (Jedis jedis = this.client.getResource();){
                    jedis.subscribe((JedisPubSub)this, new String[]{this.channel});
                    log.info("Disconnect to redis channel:" + this.channel);
                    return;
                }
                catch (JedisConnectionException e) {
                    log.error("Failed connect to redis, reconnect it.", (Throwable)e);
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException ie) {
                        return;
                    }
                }
            }
        }, "RedisSubscribeThread").start();
        log.info("Connected to redis channel:" + this.channel + ", time " + (System.currentTimeMillis() - ct) + " ms.");
    }

    @Override
    public void disconnect() {
        try (Jedis jedis = this.client.getResource();){
            this.unsubscribe();
            jedis.publish(this.channel, Command.quit().json());
        }
    }

    @Override
    public void sendEvictCmd(String region, String ... keys) {
        try (Jedis jedis = this.client.getResource();){
            jedis.publish(this.channel, new Command(2, region, keys).json());
        }
    }

    @Override
    public void sendClearCmd(String region) {
        try (Jedis jedis = this.client.getResource();){
            jedis.publish(this.channel, new Command(3, region, "").json());
        }
    }

    public void onMessage(String channel, String message) {
        try {
            Command cmd = Command.parse(message);
            if (cmd == null || cmd.isLocal()) {
                return;
            }
            switch (cmd.getOperator()) {
                case 1: {
                    log.info("Node-" + cmd.getSrc() + " joined to " + this.channel);
                    break;
                }
                case 2: {
                    this.evict(cmd.getRegion(), cmd.getKeys());
                    log.debug("Received cache evict message, region=" + cmd.getRegion() + ",key=" + String.join((CharSequence)",", cmd.getKeys()));
                    break;
                }
                case 3: {
                    this.clear(cmd.getRegion());
                    log.debug("Received cache clear message, region=" + cmd.getRegion());
                    break;
                }
                case 4: {
                    log.info("Node-" + cmd.getSrc() + " quit to " + this.channel);
                    break;
                }
                default: {
                    log.warn("Unknown message type = " + cmd.getOperator());
                    break;
                }
            }
        }
        catch (Exception e) {
            log.error("Failed to handle received msg", (Throwable)e);
        }
    }
}

