/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.rocketmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.common.CanalMessageSerializer;
import com.alibaba.otter.canal.common.MQMessageUtils;
import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.exception.CanalServerException;
import com.alibaba.otter.canal.spi.CanalMQProducer;
import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CanalRocketMQProducer
implements CanalMQProducer {
    private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQProducer.class);
    private DefaultMQProducer defaultMQProducer;
    private MQProperties mqProperties;

    @Override
    public void init(MQProperties rocketMQProperties) {
        this.mqProperties = rocketMQProperties;
        ClientRPCHook rpcHook = null;
        if (rocketMQProperties.getAliyunAccessKey().length() > 0 && rocketMQProperties.getAliyunSecretKey().length() > 0) {
            SessionCredentials sessionCredentials = new SessionCredentials();
            sessionCredentials.setAccessKey(rocketMQProperties.getAliyunAccessKey());
            sessionCredentials.setSecretKey(rocketMQProperties.getAliyunSecretKey());
            rpcHook = new ClientRPCHook(sessionCredentials);
        }
        this.defaultMQProducer = new DefaultMQProducer(rocketMQProperties.getProducerGroup(), rpcHook);
        this.defaultMQProducer.setNamesrvAddr(rocketMQProperties.getServers());
        this.defaultMQProducer.setRetryTimesWhenSendFailed(rocketMQProperties.getRetries());
        this.defaultMQProducer.setVipChannelEnabled(false);
        logger.info("##Start RocketMQ producer##");
        try {
            this.defaultMQProducer.start();
        }
        catch (MQClientException ex) {
            throw new CanalServerException("Start RocketMQ producer error", ex);
        }
    }

    @Override
    public void send(MQProperties.CanalDestination destination, Message data, CanalMQProducer.Callback callback) {
        try {
            if (!StringUtils.isEmpty((String)destination.getDynamicTopic())) {
                Map<String, Message> messageMap = MQMessageUtils.messageTopics(data, destination.getTopic(), destination.getDynamicTopic());
                for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
                    String topicName = entry.getKey().replace('.', '_');
                    Message messageSub = entry.getValue();
                    this.send(destination, topicName, messageSub);
                }
            } else {
                this.send(destination, destination.getTopic(), data);
            }
            callback.commit();
        }
        catch (Throwable e) {
            callback.rollback();
        }
    }

    public void send(MQProperties.CanalDestination destination, String topicName, Message data) throws Exception {
        if (!this.mqProperties.getFlatMessage()) {
            try {
                if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                    Message[] messages = MQMessageUtils.messagePartition(data, destination.getPartitionsNum(), destination.getPartitionHash());
                    int length = messages.length;
                    for (int i = 0; i < length; ++i) {
                        Message dataPartition = messages[i];
                        if (dataPartition == null) continue;
                        if (logger.isDebugEnabled()) {
                            logger.debug("flatMessagePart: {}, partition: {}", (Object)JSON.toJSONString((Object)dataPartition, (SerializerFeature[])new SerializerFeature[]{SerializerFeature.WriteMapNullValue}), (Object)i);
                        }
                        final int index = i;
                        try {
                            org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(topicName, CanalMessageSerializer.serializer(dataPartition, this.mqProperties.isFilterTransactionEntry()));
                            this.defaultMQProducer.send(message, new MessageQueueSelector(){

                                public MessageQueue select(List<MessageQueue> mqs, org.apache.rocketmq.common.message.Message msg, Object arg) {
                                    if (index > mqs.size()) {
                                        return mqs.get(index % mqs.size());
                                    }
                                    return mqs.get(index);
                                }
                            }, null);
                            continue;
                        }
                        catch (Exception e) {
                            logger.error("send flat message to hashed partition error", (Throwable)e);
                            throw e;
                        }
                    }
                }
                final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(topicName, CanalMessageSerializer.serializer(data, this.mqProperties.isFilterTransactionEntry()));
                if (logger.isDebugEnabled()) {
                    logger.debug("send message:{} to destination:{}, partition: {}", new Object[]{message, destination.getCanalDestination(), partition});
                }
                this.defaultMQProducer.send(message, new MessageQueueSelector(){

                    public MessageQueue select(List<MessageQueue> mqs, org.apache.rocketmq.common.message.Message msg, Object arg) {
                        if (partition > mqs.size()) {
                            return mqs.get(partition % mqs.size());
                        }
                        return mqs.get(partition);
                    }
                }, null);
            }
            catch (InterruptedException | MQBrokerException | MQClientException | RemotingException e) {
                logger.error("Send message error!", e);
                throw e;
            }
        } else {
            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(data);
            if (flatMessages != null) {
                for (FlatMessage flatMessage : flatMessages) {
                    if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage, destination.getPartitionsNum(), destination.getPartitionHash());
                        int length = partitionFlatMessage.length;
                        for (int i = 0; i < length; ++i) {
                            FlatMessage flatMessagePart = partitionFlatMessage[i];
                            if (flatMessagePart == null) continue;
                            if (logger.isDebugEnabled()) {
                                logger.debug("flatMessagePart: {}, partition: {}", (Object)JSON.toJSONString((Object)flatMessagePart, (SerializerFeature[])new SerializerFeature[]{SerializerFeature.WriteMapNullValue}), (Object)i);
                            }
                            final int index = i;
                            try {
                                org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(topicName, JSON.toJSONString((Object)flatMessagePart, (SerializerFeature[])new SerializerFeature[]{SerializerFeature.WriteMapNullValue}).getBytes());
                                this.defaultMQProducer.send(message, new MessageQueueSelector(){

                                    public MessageQueue select(List<MessageQueue> mqs, org.apache.rocketmq.common.message.Message msg, Object arg) {
                                        if (index > mqs.size()) {
                                            return mqs.get(index % mqs.size());
                                        }
                                        return mqs.get(index);
                                    }
                                }, null);
                                continue;
                            }
                            catch (Exception e) {
                                logger.error("send flat message to hashed partition error", (Throwable)e);
                                throw e;
                            }
                        }
                        continue;
                    }
                    try {
                        int partition;
                        int n = partition = destination.getPartition() != null ? destination.getPartition() : 0;
                        if (logger.isDebugEnabled()) {
                            logger.debug("send message: {} to topic: {} fixed partition: {}", new Object[]{JSON.toJSONString((Object)flatMessage, (SerializerFeature[])new SerializerFeature[]{SerializerFeature.WriteMapNullValue}), topicName, partition});
                        }
                        org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(topicName, JSON.toJSONString((Object)flatMessage, (SerializerFeature[])new SerializerFeature[]{SerializerFeature.WriteMapNullValue}).getBytes());
                        this.defaultMQProducer.send(message, new MessageQueueSelector(){

                            public MessageQueue select(List<MessageQueue> mqs, org.apache.rocketmq.common.message.Message msg, Object arg) {
                                if (partition > mqs.size()) {
                                    return mqs.get(partition % mqs.size());
                                }
                                return mqs.get(partition);
                            }
                        }, null);
                    }
                    catch (Exception e) {
                        logger.error("send flat message to fixed partition error", (Throwable)e);
                        throw e;
                    }
                }
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("send message to rocket topic: {}", (Object)destination.getTopic());
        }
    }

    @Override
    public void stop() {
        logger.info("## Stop RocketMQ producer##");
        this.defaultMQProducer.shutdown();
    }
}

