/*
 * Decompiled with CFR 0.152.
 */
package io.jboot.components.mq.rocketmq;

import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.components.mq.Jbootmq;
import io.jboot.components.mq.JbootmqBase;
import io.jboot.components.mq.JbootmqConfig;
import io.jboot.components.mq.rocketmq.JbootRocketmqConfig;
import io.jboot.components.mq.rocketmq.RokectmqMessageContext;
import io.jboot.exception.JbootIllegalConfigException;
import io.jboot.utils.ConfigUtil;
import io.jboot.utils.StrUtil;
import java.util.Map;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

/**
 * RocketMQ-backed {@link Jbootmq} implementation.
 *
 * <p>Two push consumers are started when listening begins: one subscribed to the
 * configured channels as-is, and one in {@link MessageModel#BROADCASTING} mode
 * subscribed to the same channels with the configured broadcast prefix prepended.
 * The producer is created lazily on the first send.
 */
public class JbootRocketmqImpl
extends JbootmqBase
implements Jbootmq {
    private static final Log LOG = Log.getLog(JbootRocketmqImpl.class);

    private JbootRocketmqConfig rocketmqConfig;

    // volatile: read outside the lock in getMQProducer(), so the reference must be
    // safely published to other threads once the producer has been started.
    private volatile MQProducer mqProducer;

    private DefaultMQPushConsumer queueConsumer;
    private DefaultMQPushConsumer broadcastConsumer;

    /**
     * Resolves the RocketMQ configuration to use.
     *
     * <p>When {@code config.getTypeName()} is not blank, the configuration model
     * registered under that name is used; otherwise the default
     * {@link JbootRocketmqConfig} is loaded.
     *
     * @param config the generic MQ configuration
     * @throws JbootIllegalConfigException if a type name is given but no matching
     *         configuration model exists
     */
    public JbootRocketmqImpl(JbootmqConfig config) {
        super(config);
        String typeName = config.getTypeName();
        if (StrUtil.isNotBlank(typeName)) {
            Map<String, JbootRocketmqConfig> configModels = ConfigUtil.getConfigModels(JbootRocketmqConfig.class);
            if (!configModels.containsKey(typeName)) {
                throw new JbootIllegalConfigException("Please config \"jboot.mq.rocket." + typeName + ".namesrvAddr\" in your jboot.properties.");
            }
            this.rocketmqConfig = configModels.get(typeName);
        } else {
            this.rocketmqConfig = Jboot.config(JbootRocketmqConfig.class);
        }
    }

    /**
     * Starts the queue consumer and then the broadcast consumer. A client failure is
     * logged rather than propagated; if the queue consumer fails to start, the
     * broadcast consumer is not started.
     */
    @Override
    protected void onStartListening() {
        try {
            this.startQueueConsumer();
            this.startBroadcastConsumer();
        }
        catch (MQClientException e) {
            LOG.error(e.toString(), e);
        }
    }

    /**
     * Shuts down whichever consumers have been created.
     */
    @Override
    protected void onStopListening() {
        if (this.queueConsumer != null) {
            this.queueConsumer.shutdown();
        }
        if (this.broadcastConsumer != null) {
            this.broadcastConsumer.shutdown();
        }
    }

    /**
     * Creates, configures and starts the consumer for queue messages. Each received
     * message is deserialized and dispatched to the listeners of its topic.
     *
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public void startQueueConsumer() throws MQClientException {
        this.queueConsumer = new DefaultMQPushConsumer(this.rocketmqConfig.getConsumerGroup());
        this.queueConsumer.setNamesrvAddr(this.rocketmqConfig.getNamesrvAddr());
        this.queueConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        if (StrUtil.isNotBlank(this.rocketmqConfig.getNamespace())) {
            this.queueConsumer.setNamespace(this.rocketmqConfig.getNamespace());
        }
        if (this.rocketmqConfig.getConsumeMessageBatchMaxSize() != null) {
            this.queueConsumer.setConsumeMessageBatchMaxSize(this.rocketmqConfig.getConsumeMessageBatchMaxSize().intValue());
        }
        for (String channel : this.channels) {
            this.queueConsumer.subscribe(channel, this.rocketmqConfig.getSubscribeSubExpression());
        }
        this.queueConsumer.registerMessageListener((msgs, context) -> {
            RokectmqMessageContext msgContext = new RokectmqMessageContext(this, msgs, context);
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt messageExt : msgs) {
                    this.notifyListeners(messageExt.getTopic(), this.getSerializer().deserialize(messageExt.getBody()), msgContext);
                }
            }
            // The status is read from the context after all listeners have run.
            return msgContext.getReturnStatus();
        });
        this.queueConsumer.start();
    }

    /**
     * Creates, configures and starts the consumer for broadcast messages. It uses a
     * consumer group and topics that carry the configured broadcast prefix, and
     * strips that prefix from the topic before dispatching to listeners.
     *
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public void startBroadcastConsumer() throws MQClientException {
        this.broadcastConsumer = new DefaultMQPushConsumer(this.rocketmqConfig.getBroadcastChannelPrefix() + this.rocketmqConfig.getConsumerGroup());
        this.broadcastConsumer.setNamesrvAddr(this.rocketmqConfig.getNamesrvAddr());
        if (StrUtil.isNotBlank(this.rocketmqConfig.getNamespace())) {
            this.broadcastConsumer.setNamespace(this.rocketmqConfig.getNamespace());
        }
        this.broadcastConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        this.broadcastConsumer.setMessageModel(MessageModel.BROADCASTING);
        if (this.rocketmqConfig.getConsumeMessageBatchMaxSize() != null) {
            this.broadcastConsumer.setConsumeMessageBatchMaxSize(this.rocketmqConfig.getConsumeMessageBatchMaxSize().intValue());
        }
        for (String channel : this.channels) {
            this.broadcastConsumer.subscribe(this.rocketmqConfig.getBroadcastChannelPrefix() + channel, this.rocketmqConfig.getSubscribeSubExpression());
        }
        // Length of the prefix that publish() prepends; removed again on receipt.
        int len = this.rocketmqConfig.getBroadcastChannelPrefix().length();
        this.broadcastConsumer.registerMessageListener((msgs, context) -> {
            RokectmqMessageContext rokectMqMessageInfo = new RokectmqMessageContext(this, msgs, context);
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt messageExt : msgs) {
                    String topic = messageExt.getTopic();
                    this.notifyListeners(topic.substring(len), this.getSerializer().deserialize(messageExt.getBody()), rokectMqMessageInfo);
                }
            }
            return rokectMqMessageInfo.getReturnStatus();
        });
        this.broadcastConsumer.start();
    }

    /**
     * Sends {@code message} to the topic named {@code toChannel}.
     */
    @Override
    public void enqueue(Object message, String toChannel) {
        this.doSendMessage(message, toChannel);
    }

    /**
     * Sends {@code message} to the broadcast topic for {@code toChannel}, i.e. the
     * channel name with the configured broadcast prefix prepended.
     */
    @Override
    public void publish(Object message, String toChannel) {
        this.doSendMessage(message, this.rocketmqConfig.getBroadcastChannelPrefix() + toChannel);
    }

    /**
     * Sends a message through the lazily created producer. Failures are logged and
     * not propagated to the caller.
     *
     * @param message the payload; if it already is a RocketMQ {@link Message} it is
     *        sent unchanged (and {@code topic} is not applied), otherwise it is
     *        serialized and wrapped in a new {@link Message} for {@code topic}
     * @param topic the destination topic for payloads that are not a {@link Message}
     */
    public void doSendMessage(Object message, String topic) {
        try {
            Message sendMsg = message instanceof Message
                    ? (Message)message
                    : new Message(topic, this.getSerializer().serialize(message));
            SendResult result = this.getMQProducer().send(sendMsg);
            if (result == null) {
                LOG.warn("Rockect mq send message fail!!!");
            }
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the calling thread instead of silently
            // discarding it along with the other failures.
            Thread.currentThread().interrupt();
            LOG.error(e.toString(), e);
        }
        catch (Exception e) {
            LOG.error(e.toString(), e);
        }
    }

    /**
     * Returns the shared producer, creating and starting it on first use. Creation
     * is guarded by double-checked locking on this instance.
     *
     * @return the started producer
     * @throws MQClientException if the producer cannot be started; nothing is
     *         cached in that case, so a later call tries again
     */
    public MQProducer getMQProducer() throws MQClientException {
        if (this.mqProducer == null) {
            synchronized (this) {
                if (this.mqProducer == null) {
                    this.createMqProducer();
                }
            }
        }
        return this.mqProducer;
    }

    /**
     * Builds a producer from the configuration, starts it and stores it as the
     * shared producer. Optional settings are applied only when configured.
     *
     * @throws MQClientException if the producer fails to start; the shared producer
     *         field is left untouched in that case
     */
    public void createMqProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(this.rocketmqConfig.getProducerGroup());
        producer.setNamesrvAddr(this.rocketmqConfig.getNamesrvAddr());
        if (StrUtil.isNotBlank(this.rocketmqConfig.getNamespace())) {
            producer.setNamespace(this.rocketmqConfig.getNamespace());
        }
        if (StrUtil.isNotBlank(this.rocketmqConfig.getInstanceName())) {
            producer.setInstanceName(this.rocketmqConfig.getInstanceName());
        }
        if (StrUtil.isNotBlank(this.rocketmqConfig.getClientIP())) {
            producer.setClientIP(this.rocketmqConfig.getClientIP());
        }
        if (StrUtil.isNotBlank(this.rocketmqConfig.getCreateTopicKey())) {
            producer.setCreateTopicKey(this.rocketmqConfig.getCreateTopicKey());
        }
        if (this.rocketmqConfig.getUseTLS() != null) {
            producer.setUseTLS(this.rocketmqConfig.getUseTLS().booleanValue());
        }
        if (this.rocketmqConfig.getSendLatencyFaultEnable() != null) {
            producer.setSendLatencyFaultEnable(this.rocketmqConfig.getSendLatencyFaultEnable().booleanValue());
        }
        if (this.rocketmqConfig.getSendMessageWithVIPChannel() != null) {
            producer.setSendMessageWithVIPChannel(this.rocketmqConfig.getSendMessageWithVIPChannel().booleanValue());
        }
        if (this.rocketmqConfig.getSendMsgTimeout() != null) {
            producer.setSendMsgTimeout(this.rocketmqConfig.getSendMsgTimeout().intValue());
        }
        if (this.rocketmqConfig.getRetryAnotherBrokerWhenNotStoreOK() != null) {
            producer.setRetryAnotherBrokerWhenNotStoreOK(this.rocketmqConfig.getRetryAnotherBrokerWhenNotStoreOK().booleanValue());
        }
        if (this.rocketmqConfig.getRetryTimesWhenSendAsyncFailed() != null) {
            producer.setRetryTimesWhenSendAsyncFailed(this.rocketmqConfig.getRetryTimesWhenSendAsyncFailed().intValue());
        }
        if (this.rocketmqConfig.getRetryTimesWhenSendFailed() != null) {
            producer.setRetryTimesWhenSendFailed(this.rocketmqConfig.getRetryTimesWhenSendFailed().intValue());
        }
        // Start first, publish second: other threads must never observe a producer
        // that has not been started, and a failed start must not be cached.
        producer.start();
        this.mqProducer = producer;
    }
}

