/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.stream.binder.rocketmq.consuming;

import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class RocketMQListenerBindingContainer
implements InitializingBean,
RocketMQListenerContainer,
SmartLifecycle {
    private static final Logger log = LoggerFactory.getLogger(RocketMQListenerBindingContainer.class);
    private long suspendCurrentQueueTimeMillis = 1000L;
    private int delayLevelWhenNextConsume = 0;
    private String nameServer;
    private String consumerGroup;
    private String topic;
    private int consumeThreadMax = 64;
    private String charset = "UTF-8";
    private RocketMQListener rocketMQListener;
    private RocketMQHeaderMapper headerMapper;
    private DefaultMQPushConsumer consumer;
    private boolean running;
    private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
    private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
    private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
    private ConsumeMode consumeMode;
    private SelectorType selectorType;
    private String selectorExpression;
    private MessageModel messageModel;

    public RocketMQListenerBindingContainer(ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
        this.rocketMQConsumerProperties = rocketMQConsumerProperties;
        this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
        this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
        ConsumeMode consumeMode = this.consumeMode = ((RocketMQConsumerProperties)rocketMQConsumerProperties.getExtension()).getOrderly() != false ? ConsumeMode.ORDERLY : ConsumeMode.CONCURRENTLY;
        if (StringUtils.isEmpty((Object)((RocketMQConsumerProperties)rocketMQConsumerProperties.getExtension()).getSql())) {
            this.selectorType = SelectorType.TAG;
            this.selectorExpression = ((RocketMQConsumerProperties)rocketMQConsumerProperties.getExtension()).getTags();
        } else {
            this.selectorType = SelectorType.SQL92;
            this.selectorExpression = ((RocketMQConsumerProperties)rocketMQConsumerProperties.getExtension()).getSql();
        }
        this.messageModel = ((RocketMQConsumerProperties)rocketMQConsumerProperties.getExtension()).getBroadcasting() != false ? MessageModel.BROADCASTING : MessageModel.CLUSTERING;
    }

    public void setupMessageListener(RocketMQListener<?> rocketMQListener) {
        this.rocketMQListener = rocketMQListener;
    }

    public void destroy() throws Exception {
        this.setRunning(false);
        if (Objects.nonNull(this.consumer)) {
            this.consumer.shutdown();
        }
        log.info("container destroyed, {}", (Object)this.toString());
    }

    public void afterPropertiesSet() throws Exception {
        this.initRocketMQPushConsumer();
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    public void start() {
        if (this.isRunning()) {
            throw new IllegalStateException("container already running. " + this.toString());
        }
        try {
            this.consumer.start();
        }
        catch (MQClientException e) {
            throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
        }
        this.setRunning(true);
        log.info("running container: {}", (Object)this.toString());
    }

    public void stop() {
        if (this.isRunning()) {
            if (Objects.nonNull(this.consumer)) {
                this.consumer.shutdown();
            }
            this.setRunning(false);
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    private void initRocketMQPushConsumer() throws MQClientException {
        Assert.notNull((Object)this.rocketMQListener, (String)"Property 'rocketMQListener' is required");
        Assert.notNull((Object)this.consumerGroup, (String)"Property 'consumerGroup' is required");
        Assert.notNull((Object)this.nameServer, (String)"Property 'nameServer' is required");
        Assert.notNull((Object)this.topic, (String)"Property 'topic' is required");
        String ak = this.rocketBinderConfigurationProperties.getAccessKey();
        String sk = this.rocketBinderConfigurationProperties.getSecretKey();
        if (!StringUtils.isEmpty((Object)ak) && !StringUtils.isEmpty((Object)sk)) {
            AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
            this.consumer = new DefaultMQPushConsumer(this.consumerGroup, (RPCHook)rpcHook, (AllocateMessageQueueStrategy)new AllocateMessageQueueAveragely(), this.rocketBinderConfigurationProperties.isEnableMsgTrace(), this.rocketBinderConfigurationProperties.getCustomizedTraceTopic());
            this.consumer.setInstanceName(RocketMQUtil.getInstanceName((RPCHook)rpcHook, (String)(this.topic + "|" + UtilAll.getPid())));
            this.consumer.setVipChannelEnabled(false);
        } else {
            this.consumer = new DefaultMQPushConsumer(this.consumerGroup, this.rocketBinderConfigurationProperties.isEnableMsgTrace(), this.rocketBinderConfigurationProperties.getCustomizedTraceTopic());
        }
        this.consumer.setNamesrvAddr(this.nameServer);
        this.consumer.setConsumeThreadMax(this.rocketMQConsumerProperties.getConcurrency());
        this.consumer.setConsumeThreadMin(this.rocketMQConsumerProperties.getConcurrency());
        switch (this.messageModel) {
            case BROADCASTING: {
                this.consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
                break;
            }
            case CLUSTERING: {
                this.consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
                break;
            }
            default: {
                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
            }
        }
        switch (this.selectorType) {
            case TAG: {
                this.consumer.subscribe(this.topic, this.selectorExpression);
                break;
            }
            case SQL92: {
                this.consumer.subscribe(this.topic, MessageSelector.bySql((String)this.selectorExpression));
                break;
            }
            default: {
                throw new IllegalArgumentException("Property 'selectorType' was wrong.");
            }
        }
        switch (this.consumeMode) {
            case ORDERLY: {
                this.consumer.setMessageListener((MessageListener)new DefaultMessageListenerOrderly());
                break;
            }
            case CONCURRENTLY: {
                this.consumer.setMessageListener((MessageListener)new DefaultMessageListenerConcurrently());
                break;
            }
            default: {
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
            }
        }
        if (this.rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener)this.rocketMQListener).prepareStart((Object)this.consumer);
        }
    }

    public String toString() {
        return "RocketMQListenerBindingContainer{consumerGroup='" + this.consumerGroup + '\'' + ", nameServer='" + this.nameServer + '\'' + ", topic='" + this.topic + '\'' + ", consumeMode=" + this.consumeMode + ", selectorType=" + this.selectorType + ", selectorExpression='" + this.selectorExpression + '\'' + ", messageModel=" + this.messageModel + '}';
    }

    public long getSuspendCurrentQueueTimeMillis() {
        return this.suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }

    public int getDelayLevelWhenNextConsume() {
        return this.delayLevelWhenNextConsume;
    }

    public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
        this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
    }

    public String getNameServer() {
        return this.nameServer;
    }

    public void setNameServer(String nameServer) {
        this.nameServer = nameServer;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getConsumeThreadMax() {
        return this.consumeThreadMax;
    }

    public void setConsumeThreadMax(int consumeThreadMax) {
        this.consumeThreadMax = consumeThreadMax;
    }

    public String getCharset() {
        return this.charset;
    }

    public void setCharset(String charset) {
        this.charset = charset;
    }

    public RocketMQListener getRocketMQListener() {
        return this.rocketMQListener;
    }

    public void setRocketMQListener(RocketMQListener rocketMQListener) {
        this.rocketMQListener = rocketMQListener;
    }

    public DefaultMQPushConsumer getConsumer() {
        return this.consumer;
    }

    public void setConsumer(DefaultMQPushConsumer consumer) {
        this.consumer = consumer;
    }

    public ExtendedConsumerProperties<RocketMQConsumerProperties> getRocketMQConsumerProperties() {
        return this.rocketMQConsumerProperties;
    }

    public ConsumeMode getConsumeMode() {
        return this.consumeMode;
    }

    public SelectorType getSelectorType() {
        return this.selectorType;
    }

    public String getSelectorExpression() {
        return this.selectorExpression;
    }

    public MessageModel getMessageModel() {
        return this.messageModel;
    }

    public RocketMQHeaderMapper getHeaderMapper() {
        return this.headerMapper;
    }

    public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
        this.headerMapper = headerMapper;
    }

    private Message convertToSpringMessage(MessageExt messageExt) {
        int reconsumeTimes = messageExt.getReconsumeTimes();
        messageExt.putUserProperty("rocketmq_RECONSUME_TIMES", String.valueOf(reconsumeTimes));
        Message message = RocketMQUtil.convertToSpringMessage((MessageExt)messageExt);
        return MessageBuilder.fromMessage((Message)message).copyHeaders((Map)this.headerMapper.toHeaders(messageExt.getProperties())).build();
    }

    public class DefaultMessageListenerOrderly
    implements MessageListenerOrderly {
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", (Object)messageExt);
                try {
                    long now = System.currentTimeMillis();
                    RocketMQListenerBindingContainer.this.rocketMQListener.onMessage((Object)RocketMQListenerBindingContainer.this.convertToSpringMessage(messageExt));
                    long costTime = System.currentTimeMillis() - now;
                    log.info("consume {} message key:[{}] cost: {} ms", new Object[]{messageExt.getMsgId(), messageExt.getKeys(), costTime});
                }
                catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}", (Object)messageExt, (Object)e);
                    context.setSuspendCurrentQueueTimeMillis(RocketMQListenerBindingContainer.this.suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

    public class DefaultMessageListenerConcurrently
    implements MessageListenerConcurrently {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", (Object)messageExt);
                try {
                    long now = System.currentTimeMillis();
                    RocketMQListenerBindingContainer.this.rocketMQListener.onMessage((Object)RocketMQListenerBindingContainer.this.convertToSpringMessage(messageExt));
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} message key:[{}] cost: {} ms", new Object[]{messageExt.getMsgId(), messageExt.getKeys(), costTime});
                }
                catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}", (Object)messageExt, (Object)e);
                    context.setDelayLevelWhenNextConsume(RocketMQListenerBindingContainer.this.delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
}

