/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.common.MQMessageUtils;
import com.alibaba.otter.canal.common.MQProperties;
import com.alibaba.otter.canal.kafka.MessageSerializer;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.spi.CanalMQProducer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CanalKafkaProducer
implements CanalMQProducer {
    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
    private Producer<String, Message> producer;
    private Producer<String, String> producer2;
    private MQProperties kafkaProperties;

    @Override
    public void init(MQProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", kafkaProperties.getAcks());
        properties.put("compression.type", kafkaProperties.getCompressionType());
        properties.put("batch.size", (Object)kafkaProperties.getBatchSize());
        properties.put("linger.ms", (Object)kafkaProperties.getLingerMs());
        properties.put("max.request.size", (Object)kafkaProperties.getMaxRequestSize());
        properties.put("buffer.memory", (Object)kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("max.in.flight.requests.per.connection", (Object)1);
        if (!kafkaProperties.getProperties().isEmpty()) {
            properties.putAll((Map<?, ?>)kafkaProperties.getProperties());
        }
        if (kafkaProperties.getTransaction()) {
            properties.put("transactional.id", "canal-transactional-id");
        } else {
            properties.put("retries", (Object)kafkaProperties.getRetries());
        }
        if (!kafkaProperties.getFlatMessage()) {
            properties.put("value.serializer", MessageSerializer.class.getName());
            this.producer = new KafkaProducer(properties);
        } else {
            properties.put("value.serializer", StringSerializer.class.getName());
            this.producer2 = new KafkaProducer(properties);
        }
        if (kafkaProperties.getTransaction()) {
            if (!kafkaProperties.getFlatMessage()) {
                this.producer.initTransactions();
            } else {
                this.producer2.initTransactions();
            }
        }
    }

    @Override
    public void stop() {
        try {
            logger.info("## stop the kafka producer");
            if (this.producer != null) {
                this.producer.close();
            }
            if (this.producer2 != null) {
                this.producer2.close();
            }
        }
        catch (Throwable e) {
            logger.warn("##something goes wrong when stopping kafka producer:", e);
        }
        finally {
            logger.info("## kafka producer is down.");
        }
    }

    @Override
    public void send(MQProperties.CanalDestination canalDestination, Message message, CanalMQProducer.Callback callback) {
        Object producerTmp = !this.kafkaProperties.getFlatMessage() ? this.producer : this.producer2;
        if (this.kafkaProperties.getTransaction()) {
            producerTmp.beginTransaction();
        }
        try {
            if (!StringUtils.isEmpty((String)canalDestination.getDynamicTopic())) {
                Map<String, Message> messageMap = MQMessageUtils.messageTopics(message, canalDestination.getTopic(), canalDestination.getDynamicTopic());
                for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
                    String topicName = entry.getKey().replace('.', '_');
                    Message messageSub = entry.getValue();
                    if (logger.isDebugEnabled()) {
                        logger.debug("## Send message to kafka topic: " + topicName);
                    }
                    this.send(canalDestination, topicName, messageSub);
                }
            } else {
                this.send(canalDestination, canalDestination.getTopic(), message);
            }
            if (this.kafkaProperties.getTransaction()) {
                producerTmp.commitTransaction();
            }
            callback.commit();
        }
        catch (Throwable e) {
            logger.error(e.getMessage(), e);
            if (this.kafkaProperties.getTransaction()) {
                producerTmp.abortTransaction();
            }
            callback.rollback();
        }
    }

    private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message) throws Exception {
        block10: {
            block9: {
                if (this.kafkaProperties.getFlatMessage()) break block9;
                ArrayList<ProducerRecord> records = new ArrayList<ProducerRecord>();
                if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                    Message[] messages = MQMessageUtils.messagePartition(message, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash());
                    int length = messages.length;
                    for (int i = 0; i < length; ++i) {
                        Message messagePartition = messages[i];
                        if (messagePartition == null) continue;
                        records.add(new ProducerRecord(topicName, Integer.valueOf(i), null, (Object)messagePartition));
                    }
                } else {
                    int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                    records.add(new ProducerRecord(topicName, Integer.valueOf(partition), null, (Object)message));
                }
                if (records.isEmpty()) break block10;
                for (ProducerRecord record : records) {
                    this.producer.send(record).get();
                }
                if (!logger.isDebugEnabled()) break block10;
                logger.debug("Send  message to kafka topic: [{}], packet: {}", (Object)topicName, (Object)message.toString());
                break block10;
            }
            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
            if (flatMessages != null) {
                for (FlatMessage flatMessage : flatMessages) {
                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash());
                        int length = partitionFlatMessage.length;
                        for (int i = 0; i < length; ++i) {
                            FlatMessage flatMessagePart = partitionFlatMessage[i];
                            if (flatMessagePart == null) continue;
                            this.produce(topicName, i, flatMessagePart);
                        }
                    } else {
                        int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                        this.produce(topicName, partition, flatMessage);
                    }
                    if (!logger.isDebugEnabled()) continue;
                    logger.debug("Send flat message to kafka topic: [{}], packet: {}", (Object)topicName, (Object)JSON.toJSONString((Object)flatMessage, (SerializerFeature[])new SerializerFeature[]{SerializerFeature.WriteMapNullValue}));
                }
            }
        }
    }

    private void produce(String topicName, int partition, FlatMessage flatMessage) throws ExecutionException, InterruptedException {
        ProducerRecord record = new ProducerRecord(topicName, Integer.valueOf(partition), null, (Object)JSON.toJSONString((Object)flatMessage, (SerializerFeature[])new SerializerFeature[]{SerializerFeature.WriteMapNullValue}));
        if (this.kafkaProperties.getTransaction()) {
            this.producer2.send(record);
        } else {
            this.producer2.send(record);
        }
    }
}

