/*
 * Decompiled with CFR 0.152.
 */
package io.jboot.components.mq.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
import io.jboot.Jboot;
import io.jboot.app.JbootApplicationConfig;
import io.jboot.components.mq.Jbootmq;
import io.jboot.components.mq.JbootmqBase;
import io.jboot.components.mq.JbootmqConfig;
import io.jboot.components.mq.rabbitmq.JbootRabbitmqConfig;
import io.jboot.components.mq.rabbitmq.RabbitmqMessageContext;
import io.jboot.exception.JbootException;
import io.jboot.exception.JbootIllegalConfigException;
import io.jboot.utils.ConfigUtil;
import io.jboot.utils.StrUtil;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * RabbitMQ-backed {@link Jbootmq} implementation.
 *
 * <p>One {@link Connection} is opened in the constructor. Channels are created lazily by
 * {@link #getChannel(String, boolean)} and cached per (channel name, mode) pair, where the mode
 * is either "queue" (a plain queue named after the channel) or "broadcast" (an exchange named
 * after the channel with a prefixed queue bound to it).
 *
 * <p>NOTE(review): cached {@link Channel} instances are handed to every caller of
 * {@link #enqueue(Object, String)} / {@link #publish(Object, String)}; the RabbitMQ Java client
 * guide advises against concurrent publishing on a shared channel. Confirm that callers
 * serialize their publishes or add per-channel locking.
 */
public class JbootRabbitmqImpl extends JbootmqBase implements Jbootmq {
    private Connection connection;
    /** Cache of opened channels, keyed by {@code toChannel + queueMode}. */
    private final Map<String, Channel> channelMap = new ConcurrentHashMap<String, Channel>();
    private JbootRabbitmqConfig rabbitmqConfig;
    // Never assigned or read inside this class; kept as-is because this file is decompiled output.
    private JbootApplicationConfig appConfig;

    /**
     * Resolves the RabbitMQ configuration and opens the connection.
     *
     * <p>When {@code config.getTypeName()} is not blank, the configuration model registered under
     * that name is used; otherwise the default {@link JbootRabbitmqConfig} is loaded.
     *
     * @param config the generic MQ configuration
     * @throws JbootIllegalConfigException if a type name is given but no configuration model
     *         exists for it
     * @throws JbootException if the connection to the RabbitMQ server cannot be established
     */
    public JbootRabbitmqImpl(JbootmqConfig config) {
        super(config);
        String typeName = config.getTypeName();
        if (StrUtil.isNotBlank(typeName)) {
            Map<String, JbootRabbitmqConfig> configModels = ConfigUtil.getConfigModels(JbootRabbitmqConfig.class);
            if (!configModels.containsKey(typeName)) {
                throw new JbootIllegalConfigException("Please config \"jboot.mq.rabbitmq." + typeName + ".host\" in your jboot.properties.");
            }
            this.rabbitmqConfig = configModels.get(typeName);
        } else {
            this.rabbitmqConfig = Jboot.config(JbootRabbitmqConfig.class);
        }
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(this.rabbitmqConfig.getHost());
            factory.setPort(this.rabbitmqConfig.getPort());
            // Optional settings: only override the factory defaults when a value is configured.
            if (StrUtil.isNotBlank(this.rabbitmqConfig.getVirtualHost())) {
                factory.setVirtualHost(this.rabbitmqConfig.getVirtualHost());
            }
            if (StrUtil.isNotBlank(this.rabbitmqConfig.getUsername())) {
                factory.setUsername(this.rabbitmqConfig.getUsername());
            }
            if (StrUtil.isNotBlank(this.rabbitmqConfig.getPassword())) {
                factory.setPassword(this.rabbitmqConfig.getPassword());
            }
            this.connection = factory.newConnection();
        }
        catch (Exception e) {
            throw new JbootException("Can not connect to rabbitmq server "
                    + this.rabbitmqConfig.getHost() + ":" + this.rabbitmqConfig.getPort(), e);
        }
    }

    /**
     * Registers a consumer for every configured channel: on the broadcast queue when broadcast
     * is enabled, and on the plain queue when queue mode is enabled.
     *
     * @throws JbootException if a channel cannot be created or a consumer cannot be registered
     */
    @Override
    protected void onStartListening() {
        for (String toChannel : this.channels) {
            if (this.rabbitmqConfig.isBroadcastEnable()) {
                Channel broadcastChannel = this.getChannel(toChannel, false);
                this.bindChannel(broadcastChannel, this.buildBroadcastChannelName(toChannel), toChannel);
            }
            if (this.rabbitmqConfig.isQueueEnable()) {
                Channel queueChannel = this.getChannel(toChannel, true);
                this.bindChannel(queueChannel, toChannel, toChannel);
            }
        }
    }

    /**
     * Aborts the connection and drops the cached channels, which belong to that connection and
     * are unusable once it is gone.
     */
    @Override
    protected void onStopListening() {
        this.connection.abort();
        this.channelMap.clear();
    }

    /**
     * Starts consuming from queue {@code name} on {@code channel}. Each delivery is deserialized
     * with the configured serializer and dispatched to the listeners registered for
     * {@code orginaChannelName}.
     *
     * @param channel the channel to consume on; {@code null} makes this call a no-op
     * @param name the name of the queue to consume from
     * @param orginaChannelName the logical channel name reported to listeners
     * @throws JbootException if the consumer cannot be registered
     */
    public void bindChannel(final Channel channel, String name, final String orginaChannelName) {
        if (channel == null) {
            return;
        }
        try {
            channel.basicConsume(name, this.rabbitmqConfig.isAutoAck(), (Consumer)new DefaultConsumer(channel){

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    Object o = JbootRabbitmqImpl.this.getSerializer().deserialize(body);
                    JbootRabbitmqImpl.this.notifyListeners(orginaChannelName, o, new RabbitmqMessageContext(JbootRabbitmqImpl.this, channel, consumerTag, envelope, properties));
                }
            });
        }
        catch (IOException e) {
            // Previously only printed; a failed registration means the listener silently never fires.
            throw new JbootException("Can not consume rabbit mq queue \"" + name + "\".", e);
        }
    }

    /**
     * Returns the cached channel for the given name and mode, creating and declaring it first
     * when it does not exist yet.
     *
     * <p>In queue mode a queue named {@code toChannel} is declared. In broadcast mode a prefixed
     * queue and an exchange named {@code toChannel} are declared and bound together using the
     * configured routing key.
     *
     * @param toChannel the logical channel name
     * @param queueMode {@code true} for queue mode, {@code false} for broadcast mode
     * @return the channel, never {@code null}
     * @throws JbootException if the channel cannot be created or its declarations fail
     */
    public synchronized Channel getChannel(String toChannel, boolean queueMode) {
        String cacheKey = toChannel + queueMode;
        Channel channel = this.channelMap.get(cacheKey);
        if (channel != null) {
            return channel;
        }
        try {
            channel = this.connection.createChannel();
            if (channel == null) {
                // createChannel() may return null when the connection has no channel number left.
                throw new IllegalStateException("No rabbit mq channel available on the connection.");
            }
            if (queueMode) {
                channel.queueDeclare(toChannel, this.rabbitmqConfig.isQueueDeclareDurable(), this.rabbitmqConfig.isQueueDeclareExclusive(), this.rabbitmqConfig.isQueueDeclareAutoDelete(), null);
            } else {
                String broadcastQueue = this.buildBroadcastChannelName(toChannel);
                channel.queueDeclare(broadcastQueue, this.rabbitmqConfig.isBroadcastQueueDeclareDurable(), this.rabbitmqConfig.isBroadcastQueueDeclareExclusive(), this.rabbitmqConfig.isBroadcastQueueDeclareAutoDelete(), null);
                channel.exchangeDeclare(toChannel, this.resolveBroadcastExchangeType(), this.rabbitmqConfig.isBroadcastExchangeDeclareDurable());
                channel.queueBind(broadcastQueue, toChannel, this.rabbitmqConfig.getBroadcastChannelRoutingKey());
            }
        }
        catch (Exception ex) {
            // Do not leak a half-initialized channel on the shared connection.
            this.abortQuietly(channel);
            throw new JbootException("Can not create rabbit mq channel.", ex);
        }
        this.channelMap.put(cacheKey, channel);
        return channel;
    }

    /** Maps the configured exchange type name to its enum constant, defaulting to FANOUT when nothing matches. */
    private BuiltinExchangeType resolveBroadcastExchangeType() {
        String configured = this.rabbitmqConfig.getBroadcastExchangeDeclareExchangeType();
        for (BuiltinExchangeType type : BuiltinExchangeType.values()) {
            if (type.getType().equals(configured)) {
                return type;
            }
        }
        return BuiltinExchangeType.FANOUT;
    }

    /** Aborts a channel during error cleanup, ignoring any failure so the original error is the one reported. */
    private void abortQuietly(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.abort();
        }
        catch (Exception ignored) {
            // Best effort: the channel may already have been closed by the failure being handled.
        }
    }

    /**
     * Builds the name of the queue used for broadcast delivery of the given channel.
     *
     * @param channel the logical channel name
     * @return the configured broadcast prefix followed by {@code channel}
     */
    public String buildBroadcastChannelName(String channel) {
        return this.rabbitmqConfig.getBroadcastChannelPrefix() + channel;
    }

    /**
     * Serializes {@code message} and publishes it to the queue {@code toChannel}, using the empty
     * exchange name with {@code toChannel} as the routing key.
     *
     * @param message the message to send
     * @param toChannel the target queue name
     * @throws JbootException if the channel cannot be obtained or the publish fails
     */
    @Override
    public void enqueue(Object message, String toChannel) {
        Channel channel = this.getChannel(toChannel, true);
        try {
            byte[] bytes = this.getSerializer().serialize(message);
            channel.basicPublish("", toChannel, MessageProperties.BASIC, bytes);
        }
        catch (IOException e) {
            // Previously only printed, which made message loss invisible to the caller.
            throw new JbootException("Can not enqueue message to rabbit mq channel \"" + toChannel + "\".", e);
        }
    }

    /**
     * Serializes {@code message} and publishes it to the exchange named {@code toChannel} with an
     * empty routing key.
     *
     * @param message the message to send
     * @param toChannel the target exchange name
     * @throws JbootException if the channel cannot be obtained or the publish fails
     */
    @Override
    public void publish(Object message, String toChannel) {
        Channel channel = this.getChannel(toChannel, false);
        try {
            byte[] bytes = this.getSerializer().serialize(message);
            channel.basicPublish(toChannel, "", MessageProperties.BASIC, bytes);
        }
        catch (IOException e) {
            // Previously only printed, which made message loss invisible to the caller.
            throw new JbootException("Can not publish message to rabbit mq channel \"" + toChannel + "\".", e);
        }
    }
}

