/*
 * Decompiled with CFR 0.152.
 */
package io.jboot.components.mq.redismq;

import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.components.mq.Jbootmq;
import io.jboot.components.mq.JbootmqBase;
import io.jboot.components.mq.JbootmqConfig;
import io.jboot.components.mq.redismq.JbootRedismqConfig;
import io.jboot.components.mq.redismq.RedismqMessageContext;
import io.jboot.exception.JbootIllegalConfigException;
import io.jboot.support.redis.JbootRedis;
import io.jboot.support.redis.JbootRedisManager;
import io.jboot.utils.ConfigUtil;
import io.jboot.utils.StrUtil;
import java.util.Map;
import redis.clients.jedis.BinaryJedisPubSub;

/**
 * Redis-backed {@link Jbootmq} implementation.
 *
 * <p>Two delivery paths are wired up when listening starts:
 * <ul>
 *   <li>publish/subscribe: {@link #publish(Object, String)} serializes the message and
 *       publishes it; a {@link BinaryJedisPubSub} subscriber deserializes incoming payloads
 *       and forwards them to the registered listeners;</li>
 *   <li>queue: {@link #enqueue(Object, String)} pushes onto a Redis list and a dedicated
 *       polling thread (this class is its {@link Runnable}) pops entries from every
 *       configured channel and forwards them to the registered listeners.</li>
 * </ul>
 */
public class JbootRedismqImpl extends JbootmqBase implements Jbootmq, Runnable {

    private static final Log LOG = Log.getLog(JbootRedismqImpl.class);

    /** Pause between two polling rounds of the dequeue thread, in milliseconds. */
    private static final long DEQUEUE_INTERVAL_MILLIS = 1L;

    /** Redis client used for both pub/sub and list operations. */
    private final JbootRedis redis;

    /** Polling thread created by {@link #onStartListening()}; {@code null} while not listening. */
    private Thread dequeueThread;

    /** Pub/sub subscriber created by {@link #onStartListening()}; {@code null} while not listening. */
    private BinaryJedisPubSub jedisPubSub;

    /**
     * Creates the MQ and resolves the Redis client it will use.
     *
     * <p>When {@code config.getTypeName()} is not blank, the Redis MQ configuration registered
     * under that name is used; otherwise the default {@link JbootRedismqConfig} is used. If the
     * selected configuration reports {@code isConfigOk()}, a client is obtained for it from
     * {@link JbootRedisManager}; otherwise the application-wide client from
     * {@link Jboot#getRedis()} is used.
     *
     * @param config the MQ configuration
     * @throws JbootIllegalConfigException if a type name is given but no configuration is
     *         registered under it, or if no Redis client could be resolved
     */
    public JbootRedismqImpl(JbootmqConfig config) {
        super(config);

        JbootRedismqConfig redisConfig;
        String typeName = config.getTypeName();
        if (StrUtil.isNotBlank(typeName)) {
            Map<String, JbootRedismqConfig> configModels = ConfigUtil.getConfigModels(JbootRedismqConfig.class);
            if (!configModels.containsKey(typeName)) {
                throw new JbootIllegalConfigException("Please config \"jboot.mq.redis." + typeName + ".host\" in your jboot.properties.");
            }
            redisConfig = configModels.get(typeName);
        } else {
            redisConfig = Jboot.config(JbootRedismqConfig.class);
        }

        this.redis = redisConfig.isConfigOk() ? JbootRedisManager.me().getRedis(redisConfig) : Jboot.getRedis();
        if (this.redis == null) {
            throw new JbootIllegalConfigException("can not use redis mq (redis mq is default), please config jboot.redis.host=your-host , or use other mq component. ");
        }
    }

    /**
     * Subscribes to every configured channel and starts the queue polling thread.
     */
    @Override
    protected void onStartListening() {
        // Kept as Object[] so the array is passed through to keysToBytesArray as-is.
        Object[] channels = this.channels.toArray(new String[0]);

        this.jedisPubSub = new BinaryJedisPubSub() {
            @Override
            public void onMessage(byte[] channel, byte[] message) {
                notifyListeners(
                        redis.bytesToKey(channel),
                        getSerializer().deserialize(message),
                        new RedismqMessageContext(JbootRedismqImpl.this));
            }
        };
        this.redis.subscribe(this.jedisPubSub, this.redis.keysToBytesArray(channels));

        this.dequeueThread = new Thread(this, "redis-dequeue-thread");
        this.dequeueThread.start();
    }

    /**
     * Cancels the pub/sub subscription and interrupts the polling thread.
     *
     * <p>Safe to call when listening was never started, and safe to call repeatedly: both
     * handles are cleared once they have been stopped.
     */
    @Override
    protected void onStopListening() {
        if (this.jedisPubSub != null) {
            this.jedisPubSub.unsubscribe();
            this.jedisPubSub = null;
        }
        if (this.dequeueThread != null) {
            this.dequeueThread.interrupt();
            this.dequeueThread = null;
        }
    }

    /**
     * Pushes {@code message} onto the Redis list named {@code toChannel}.
     *
     * <p>NOTE(review): this uses {@code lpush} while {@link #doExecuteDequeue()} uses
     * {@code lpop}; if these map directly to the Redis commands, the newest message is
     * consumed first (LIFO). Confirm whether FIFO delivery was intended.
     *
     * @param message   the message to enqueue
     * @param toChannel the target channel (Redis list key)
     */
    @Override
    public void enqueue(Object message, String toChannel) {
        this.redis.lpush(toChannel, message);
    }

    /**
     * Serializes {@code message} and publishes it on {@code toChannel}.
     *
     * @param message   the message to publish
     * @param toChannel the target pub/sub channel
     */
    @Override
    public void publish(Object message, String toChannel) {
        this.redis.publish(this.redis.keyToBytes(toChannel), this.getSerializer().serialize(message));
    }

    /**
     * Polling loop of the dequeue thread: runs one dequeue round, sleeps briefly, and repeats
     * while the MQ is started.
     *
     * <p>An interrupt (as sent by {@link #onStopListening()}) ends the loop immediately and
     * leaves the thread's interrupt status set. Any other exception is logged and polling
     * continues.
     */
    @Override
    public void run() {
        while (this.isStarted) {
            try {
                doExecuteDequeue();
                Thread.sleep(DEQUEUE_INTERVAL_MILLIS);
            } catch (InterruptedException ex) {
                // Interruption is the shutdown signal, not an error: restore the flag and exit.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception ex) {
                LOG.error(ex.toString(), ex);
            }
        }
    }

    /**
     * Runs a single dequeue round: pops at most one entry from each configured channel and
     * forwards every non-null entry to the registered listeners.
     */
    public void doExecuteDequeue() {
        for (String channel : this.channels) {
            Object data = this.redis.lpop(channel);
            if (data != null) {
                notifyListeners(channel, data, new RedismqMessageContext(this));
            }
        }
    }
}

