/*
 * Decompiled with CFR 0.152.
 */
package io.jboot.components.mq.qpidmq;

import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.components.mq.Jbootmq;
import io.jboot.components.mq.JbootmqBase;
import io.jboot.components.mq.JbootmqConfig;
import io.jboot.components.mq.qpidmq.JbootQpidmqConfig;
import io.jboot.components.mq.qpidmq.QpidmqMessageContext;
import io.jboot.exception.JbootException;
import io.jboot.exception.JbootIllegalConfigException;
import io.jboot.utils.ArrayUtil;
import io.jboot.utils.ConfigUtil;
import io.jboot.utils.StrUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.Connection;

/**
 * {@link Jbootmq} implementation that sends and receives messages through a Qpid broker
 * using the JMS API.
 *
 * <p>A single connection is opened in the constructor. Every send opens a short-lived
 * session; every listened channel gets one session with two consumers (a queue address
 * and a topic address), each drained by its own receiver thread.
 */
public class JbootQpidmqImpl extends JbootmqBase implements Jbootmq {
    private static final Log LOG = Log.getLog(JbootQpidmqImpl.class);

    /** Message property carrying the length of the serialized payload. */
    private static final String DATA_LEN_PROPERTY = "data-len";

    /** Time-to-live applied to every produced message, in milliseconds. */
    private static final long MESSAGE_TTL_MILLIS = 30000L;

    private final Connection connection;
    private final boolean serializerEnable;
    private final JbootQpidmqConfig qpidConfig;

    /**
     * Every receiver thread started by {@link #startReceiveMsgThread()}, so that all of
     * them (not only the most recently created pair) can be interrupted on stop.
     */
    private final List<Thread> receiverThreads = new ArrayList<>();

    /**
     * Resolves the Qpid configuration (named config when {@code config.getTypeName()} is
     * not blank, otherwise the default one), then opens and starts the broker connection.
     *
     * @param config the generic MQ configuration
     * @throws JbootIllegalConfigException if a type name is given but no matching Qpid config exists
     * @throws JbootException if the connection cannot be created or started
     */
    public JbootQpidmqImpl(JbootmqConfig config) {
        super(config);
        String typeName = config.getTypeName();
        if (StrUtil.isNotBlank(typeName)) {
            Map<String, JbootQpidmqConfig> configModels = ConfigUtil.getConfigModels(JbootQpidmqConfig.class);
            if (!configModels.containsKey(typeName)) {
                throw new JbootIllegalConfigException("Please config \"jboot.mq.qpid." + typeName + ".host\" in your jboot.properties.");
            }
            this.qpidConfig = configModels.get(typeName);
        } else {
            this.qpidConfig = Jboot.config(JbootQpidmqConfig.class);
        }
        this.serializerEnable = this.qpidConfig.isSerializerEnable();

        Connection conn = null;
        try {
            conn = new AMQConnection(this.getConnectionUrl());
            conn.start();
        } catch (Exception e) {
            // Do not leave a half-opened connection behind when start() fails.
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception ignored) {
                    // Best effort: the original failure below is the one worth reporting.
                }
            }
            throw new JbootException("can not connection qpidmq server", e);
        }
        this.connection = conn;
    }

    /**
     * Starts the receiver threads for all configured channels.
     *
     * @throws JbootException wrapping any failure raised while creating sessions or consumers
     */
    @Override
    protected void onStartListening() {
        try {
            this.startReceiveMsgThread();
        } catch (Exception e) {
            throw new JbootException(e.toString(), e);
        }
    }

    /**
     * Interrupts every receiver thread that was started. Safe to call when no thread was
     * ever created (for example when there are no channels).
     */
    @Override
    protected void onStopListening() {
        for (Thread receiver : this.receiverThreads) {
            receiver.interrupt();
        }
        this.receiverThreads.clear();
    }

    /**
     * Sends {@code message} to the queue address derived from {@code toChannel}.
     *
     * @param message   the payload (see {@link #sendMsg(String, Object)} for how it is encoded)
     * @param toChannel the channel name
     */
    @Override
    public void enqueue(Object message, String toChannel) {
        this.sendMsg(this.getQueueAddr(toChannel), message);
    }

    /**
     * Sends {@code message} to the topic address derived from {@code toChannel}.
     *
     * @param message   the payload (see {@link #sendMsg(String, Object)} for how it is encoded)
     * @param toChannel the channel name
     */
    @Override
    public void publish(Object message, String toChannel) {
        this.sendMsg(this.getTopicAddr(toChannel), message);
    }

    /**
     * Sends one message to the given address string.
     *
     * <p>Encoding: a JMS {@link Message} is sent as-is; otherwise, when the serializer is
     * disabled the payload is cast to {@link String} and sent as a text message; otherwise
     * it is serialized and sent as a bytes message carrying a {@code data-len} property.
     *
     * <p>Failures are logged and not propagated. The session opened for the send is always
     * closed afterwards.
     *
     * @param addr    the destination address string
     * @param message the payload to send
     */
    public void sendMsg(String addr, Object message) {
        Session session = null;
        try {
            session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = new AMQAnyDestination(addr);
            MessageProducer producer = session.createProducer(destination);
            producer.setTimeToLive(MESSAGE_TTL_MILLIS);
            producer.send(this.toJmsMessage(session, message));
        } catch (Exception e) {
            LOG.error(e.toString(), e);
        } finally {
            // A session is created per send; without this close every send leaks one.
            if (session != null) {
                try {
                    session.close();
                } catch (Exception e) {
                    LOG.error(e.toString(), e);
                }
            }
        }
    }

    /** Converts the payload into the JMS message that is put on the wire. */
    private Message toJmsMessage(Session session, Object message) throws Exception {
        if (message instanceof Message) {
            return (Message) message;
        }
        if (!this.serializerEnable) {
            return session.createTextMessage((String) message);
        }
        byte[] data = this.getSerializer().serialize(message);
        BytesMessage bytesMessage = session.createBytesMessage();
        bytesMessage.setIntProperty(DATA_LEN_PROPERTY, data.length);
        bytesMessage.writeBytes(data);
        return bytesMessage;
    }

    /**
     * Builds the connection URL from the Qpid configuration: credentials, virtual host,
     * round-robin failover and one {@code tcp://} broker entry per comma-separated host.
     * Blank host entries are skipped and surrounding whitespace is trimmed.
     *
     * <p>The returned string embeds the configured password; avoid logging it.
     *
     * @return the connection URL
     */
    public String getConnectionUrl() {
        StringBuilder url = new StringBuilder();
        url.append("amqp://");
        url.append(this.qpidConfig.getUsername());
        url.append(":");
        url.append(this.qpidConfig.getPassword());
        url.append("@");
        url.append("/");
        url.append(this.qpidConfig.getVirtualHost());
        url.append("?failover='roundrobin'");
        url.append("&brokerlist='");
        for (String h : this.qpidConfig.getHost().split(",")) {
            if (StrUtil.isBlank(h)) {
                continue;
            }
            // Trim so that "a, b" does not produce "tcp:// b;".
            url.append("tcp://").append(h.trim()).append(";");
        }
        url.append("'");
        return url.toString();
    }

    /**
     * Builds the queue address string for a channel: {@code ADDR:<channel>;{create:always}}.
     *
     * @param channel the channel name
     * @return the queue address string
     */
    public String getQueueAddr(String channel) {
        return new StringBuilder()
                .append("ADDR:")
                .append(channel)
                .append(";{create:always}")
                .toString();
    }

    /**
     * Builds the topic address string for a channel: {@code ADDR:amq.topic/<channel>}.
     *
     * @param channel the channel name
     * @return the topic address string
     */
    public String getTopicAddr(String channel) {
        return new StringBuilder()
                .append("ADDR:amq.topic/")
                .append(channel)
                .toString();
    }

    /**
     * For every configured channel, creates one session with a queue consumer and a topic
     * consumer, and starts one receiver thread per consumer. Does nothing when there are
     * no channels.
     *
     * @throws Exception if a session, destination or consumer cannot be created
     */
    public void startReceiveMsgThread() throws Exception {
        if (ArrayUtil.isNullOrEmpty(this.channels)) {
            return;
        }
        for (String channel : this.channels) {
            Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination queue = new AMQAnyDestination(this.getQueueAddr(channel));
            this.startReceiver(session.createConsumer(queue), channel, "jboot-qpidmq-queue-" + channel);

            Destination topic = new AMQAnyDestination(this.getTopicAddr(channel));
            this.startReceiver(session.createConsumer(topic), channel, "jboot-qpidmq-topic-" + channel);
        }
    }

    /** Starts a receiver thread for the consumer and remembers it so it can be interrupted later. */
    private void startReceiver(MessageConsumer consumer, String channel, String threadName) {
        Thread receiver = new Thread(new ReceiveMsgThread(consumer, channel, this.serializerEnable), threadName);
        this.receiverThreads.add(receiver);
        receiver.start();
    }

    /**
     * Receive loop for one consumer: blocks on {@code receive()}, decodes each message and
     * hands it to the registered listeners while the enclosing instance is started.
     */
    private class ReceiveMsgThread implements Runnable {
        private final MessageConsumer consumer;
        private final String channel;
        private final boolean serializerEnable;

        public ReceiveMsgThread(MessageConsumer consumer, String channel, boolean serializerEnable) {
            this.consumer = consumer;
            this.channel = channel;
            this.serializerEnable = serializerEnable;
        }

        @Override
        public void run() {
            try {
                while (JbootQpidmqImpl.this.isStarted) {
                    Message message;
                    try {
                        message = this.consumer.receive();
                    } catch (Exception e) {
                        // The consumer itself failed; retrying would only spin, so stop this loop.
                        LOG.error(e.toString(), e);
                        break;
                    }
                    if (message == null) {
                        continue;
                    }
                    try {
                        Object payload = this.decode(message);
                        if (payload != null) {
                            JbootQpidmqImpl.this.notifyListeners(this.channel, payload, new QpidmqMessageContext(JbootQpidmqImpl.this, message));
                        }
                    } catch (Exception e) {
                        // One undecodable message must not terminate the whole receiver.
                        LOG.error(e.toString(), e);
                    }
                }
            } finally {
                try {
                    this.consumer.close();
                } catch (Exception e) {
                    LOG.error(e.toString(), e);
                }
            }
        }

        /**
         * Extracts the payload: the text when the serializer is disabled, otherwise the
         * deserialized bytes. Returns {@code null} when fewer bytes than announced by the
         * {@code data-len} property could be read.
         */
        private Object decode(Message message) throws Exception {
            if (!this.serializerEnable) {
                return ((TextMessage) message).getText();
            }
            BytesMessage bytesMessage = (BytesMessage) message;
            int dataLen = bytesMessage.getIntProperty(DATA_LEN_PROPERTY);
            byte[] data = new byte[dataLen];
            if (dataLen != bytesMessage.readBytes(data)) {
                return null;
            }
            return JbootQpidmqImpl.this.getSerializer().deserialize(data);
        }
    }
}

