/*
 * Decompiled with CFR 0.152.
 */
package io.jboot.components.mq.aliyunmq;

import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.components.mq.Jbootmq;
import io.jboot.components.mq.JbootmqBase;
import io.jboot.components.mq.JbootmqConfig;
import io.jboot.components.mq.aliyunmq.AliyunmqMessageContext;
import io.jboot.components.mq.aliyunmq.JbootAliyunmqConfig;
import io.jboot.exception.JbootIllegalConfigException;
import io.jboot.utils.ConfigUtil;
import io.jboot.utils.StrUtil;
import java.util.Map;
import java.util.Properties;

/**
 * {@link Jbootmq} implementation backed by the Aliyun ONS client.
 *
 * <p>Two kinds of traffic are handled:
 * <ul>
 *   <li><b>queue</b> messages: sent by {@link #enqueue(Object, String)} to the topic named
 *       exactly like the channel;</li>
 *   <li><b>broadcast</b> messages: sent by {@link #publish(Object, String)} to the topic
 *       {@code broadcastChannelPrefix + channel} and received by a consumer created with
 *       {@code MessageModel=BROADCASTING}.</li>
 * </ul>
 * The consumers subscribe to the same topic names the corresponding send method uses.
 */
public class JbootAliyunmqImpl
extends JbootmqBase
implements Jbootmq {
    private static final Log LOG = Log.getLog(JbootAliyunmqImpl.class);

    /** Lazily created by {@link #getProducer()}; volatile so the unlocked null check is safe. */
    private volatile Producer producer;
    /** Consumer for queue topics (plain channel names). */
    private Consumer consumer;
    /** Consumer for broadcast topics (prefixed channel names). */
    private Consumer broadcastConsumer;
    private JbootAliyunmqConfig aliyunmqConfig;

    /**
     * Resolves the Aliyun specific configuration for this MQ instance.
     *
     * <p>When {@code config.getTypeName()} is not blank, the configuration registered under
     * that name is used; otherwise the default {@link JbootAliyunmqConfig} is loaded.
     *
     * @param config the generic MQ configuration
     * @throws JbootIllegalConfigException if a type name is given but no configuration model
     *         with that name exists
     */
    public JbootAliyunmqImpl(JbootmqConfig config) {
        super(config);
        String typeName = config.getTypeName();
        if (StrUtil.isNotBlank(typeName)) {
            Map<String, JbootAliyunmqConfig> configModels = ConfigUtil.getConfigModels(JbootAliyunmqConfig.class);
            if (!configModels.containsKey(typeName)) {
                throw new JbootIllegalConfigException("Please config \"jboot.mq.aliyun." + typeName + ".addr\" in your jboot.properties.");
            }
            this.aliyunmqConfig = configModels.get(typeName);
        } else {
            this.aliyunmqConfig = Jboot.config(JbootAliyunmqConfig.class);
        }
    }

    /** Starts the queue consumer first, then the broadcast consumer. */
    @Override
    protected void onStartListening() {
        this.startQueueConsumer();
        this.startBroadCastConsumer();
    }

    /** Shuts down every consumer that was started and clears the references. */
    @Override
    protected void onStopListening() {
        if (this.consumer != null) {
            this.consumer.shutdown();
            this.consumer = null;
        }
        if (this.broadcastConsumer != null) {
            this.broadcastConsumer.shutdown();
            this.broadcastConsumer = null;
        }
    }

    /**
     * Creates and starts the consumer for queue messages. It subscribes to the plain channel
     * names, i.e. the topics {@link #enqueue(Object, String)} sends to.
     */
    public void startQueueConsumer() {
        Consumer queueConsumer = ONSFactory.createConsumer(this.createProperties());
        this.consumer = queueConsumer;
        this.subscribeChannels(queueConsumer, false);
        queueConsumer.start();
    }

    /**
     * Creates and starts the consumer for broadcast messages. It subscribes to the prefixed
     * channel names, i.e. the topics {@link #publish(Object, String)} sends to, and is held
     * separately from the queue consumer so both can be shut down.
     */
    public void startBroadCastConsumer() {
        Properties properties = this.createProperties();
        properties.put("MessageModel", "BROADCASTING");
        Consumer created = ONSFactory.createConsumer(properties);
        this.broadcastConsumer = created;
        this.subscribeChannels(created, true);
        created.start();
    }

    /**
     * Subscribes {@code target} to one topic per configured channel.
     *
     * @param target    the consumer to register the subscriptions on
     * @param broadcast {@code true} to subscribe to {@code broadcastChannelPrefix + channel},
     *                  {@code false} to subscribe to the channel name itself
     */
    private void subscribeChannels(Consumer target, boolean broadcast) {
        String subExpression = this.aliyunmqConfig.getSubscribeSubExpression();
        for (String channel : this.channels) {
            String topic = broadcast ? this.aliyunmqConfig.getBroadcastChannelPrefix() + channel : channel;
            target.subscribe(topic, subExpression, (message, consumeContext) -> {
                AliyunmqMessageContext context = new AliyunmqMessageContext(this, message, consumeContext);
                // Listeners are always notified with the logical (unprefixed) channel name.
                this.notifyListeners(channel, this.getSerializer().deserialize(message.getBody()), context);
                return context.getReturnAction();
            });
        }
    }

    /**
     * Sends a queue message to the topic named {@code toChannel}.
     *
     * @param message   a ready-made {@link Message} (sent unchanged) or any object, which is
     *                  serialized and wrapped in a new {@link Message}
     * @param toChannel the target channel
     */
    @Override
    public void enqueue(Object message, String toChannel) {
        this.sendToTopic(message, toChannel);
    }

    /**
     * Sends a broadcast message to the topic {@code broadcastChannelPrefix + toChannel}.
     *
     * @param message   a ready-made {@link Message} (sent unchanged, its own topic is kept) or
     *                  any object, which is serialized and wrapped in a new {@link Message}
     * @param toChannel the target channel (without prefix)
     */
    @Override
    public void publish(Object message, String toChannel) {
        this.sendToTopic(message, this.aliyunmqConfig.getBroadcastChannelPrefix() + toChannel);
    }

    /**
     * Shared send path of {@link #enqueue} and {@link #publish}: wraps the payload if needed,
     * sends it through the producer and logs a warning when no result is returned.
     */
    private void sendToTopic(Object message, String topic) {
        Message sendMsg;
        if (message instanceof Message) {
            // Caller supplied a complete message; topic and tag are taken from it as-is.
            sendMsg = (Message)message;
        } else {
            byte[] bytes = this.getSerializer().serialize(message);
            sendMsg = new Message(topic, "*", bytes);
        }
        SendResult result = this.getProducer().send(sendMsg);
        if (result == null) {
            LOG.warn("Rockect mq send message fail!!!");
        }
    }

    /**
     * Returns the producer, creating and starting it on first use.
     *
     * <p>Creation is guarded by a lock on this instance with a second null check inside, so
     * at most one producer is created through this method.
     *
     * @return the started producer
     */
    public Producer getProducer() {
        Producer current = this.producer;
        if (current == null) {
            synchronized (this) {
                if (this.producer == null) {
                    this.createProducer();
                }
                current = this.producer;
            }
        }
        return current;
    }

    /**
     * Creates a new producer, starts it and stores it in this instance.
     *
     * <p>The field is assigned only after {@code start()} returned, so other threads never
     * observe a producer that has not been started.
     */
    public void createProducer() {
        Producer created = ONSFactory.createProducer(this.createProperties());
        created.start();
        this.producer = created;
    }

    /**
     * Builds the client properties shared by the producer and the consumers from the
     * Aliyun configuration.
     *
     * @return a fresh, caller-owned {@link Properties} instance
     */
    public Properties createProperties() {
        Properties properties = new Properties();
        properties.put("AccessKey", this.aliyunmqConfig.getAccessKey());
        properties.put("SecretKey", this.aliyunmqConfig.getSecretKey());
        properties.put("ProducerId", this.aliyunmqConfig.getProducerId());
        properties.put("NAMESRV_ADDR", this.aliyunmqConfig.getAddr());
        properties.put("InstanceName", this.aliyunmqConfig.getInstanceName());
        properties.setProperty("SendMsgTimeoutMillis", this.aliyunmqConfig.getSendMsgTimeoutMillis());
        return properties;
    }
}

