/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageOrderlyByGroupService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueueGroup;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.QueueGroup;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.QueuePair;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.Pair;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageAccessor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageConst;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.com.google.common.base.Objects;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * A single merge pass over one {@link QueueGroup}.
 *
 * <p>When run, the request verifies that the group's process-queue group has not been
 * dropped and (outside of broadcasting mode) is locked with a non-expired lock, then tries
 * to merge the not-yet-merged messages of every queue in the group into one ordered
 * sequence and hands that sequence to the {@link ConsumeMessageOrderlyByGroupService}.
 *
 * <p>Whenever a pass cannot proceed, the request records an {@link InterruptCode} and
 * raises its interrupted flag. This class never clears the flag itself; callers reset it
 * through {@link #setInterrupted(boolean)}.
 *
 * <p>Two requests are equal when their queue groups are equal.
 *
 * <p>NOTE(review): the interrupted flag, interrupt code and merge counter are plain fields;
 * whether they are read from other threads is not visible in this file.
 */
public class MergeRequest implements Runnable {
    private static final InternalLogger LOG = ClientLogger.getLog();
    private final ConsumeMessageOrderlyByGroupService cs;
    private QueueGroup queueGroup;
    private InterruptCode interruptCode;
    private boolean isInterrupted = false;
    /** Number of merge attempts since the last attempt that reached {@link #mergeMessage(Set)}. */
    private int tryMergeCount = 0;

    /**
     * @param queueGroup the queue group whose queues are merged by this request
     * @param cs         the consume service that supplies the consumer and receives merged messages
     */
    public MergeRequest(QueueGroup queueGroup, ConsumeMessageOrderlyByGroupService cs) {
        this.queueGroup = queueGroup;
        this.cs = cs;
    }

    public void setInterrupted(boolean interrupted) {
        this.isInterrupted = interrupted;
    }

    public boolean isInterrupted() {
        return this.isInterrupted;
    }

    public InterruptCode getInterruptCode() {
        return this.interruptCode;
    }

    public void setInterruptCode(InterruptCode interruptCode) {
        this.interruptCode = interruptCode;
    }

    /**
     * Performs one merge pass.
     *
     * <p>Outcomes, in the order they are checked:
     * <ul>
     *   <li>group dropped: interrupted with {@link InterruptCode#REMOVE_REQUEST};</li>
     *   <li>not broadcasting and the lock is missing or expired: interrupted with
     *       {@link InterruptCode#LOCK_LATER};</li>
     *   <li>{@link #tryMergeMessage(Set)} returned {@code null}: interrupted with
     *       {@link InterruptCode#UPDATE_LATER};</li>
     *   <li>nothing was merged: interrupted with {@link InterruptCode#REMOVE_REQUEST} only if
     *       the queue group reports that all of its queues are empty (otherwise any interrupt
     *       state set by {@code tryMergeMessage} is left as is);</li>
     *   <li>otherwise the merged messages are submitted to the consume service.</li>
     * </ul>
     */
    @Override
    public void run() {
        // The lock state is sampled before the dropped check, exactly as the original
        // two-branch layout did; broadcasting mode never consults the lock.
        boolean broadcasting =
            MessageModel.BROADCASTING.equals(this.cs.getDefaultMQPushConsumerImpl().messageModel());
        boolean mayConsume = broadcasting
            || (this.queueGroup.getProcessQueueGroup().isLocked()
                && !this.queueGroup.getProcessQueueGroup().isLockExpired());

        if (this.queueGroup.getProcessQueueGroup().getProcessQueueStatus()
                != ProcessQueueGroup.ProcessQueueGroupStatus.NOT_DROPPED) {
            LOG.warn("the message queue group not be able to consume, because it's dropped. {}", (Object)this.queueGroup.getMessageQueueGroup());
            this.interruptWith(InterruptCode.REMOVE_REQUEST);
            return;
        }
        if (!mayConsume) {
            LOG.warn("the message queue group not locked or lock expired, so consume later, {}", (Object)this.queueGroup.getMessageQueueGroup());
            this.interruptWith(InterruptCode.LOCK_LATER);
            return;
        }

        // The lock state is read again here; these checks only fire if it changed since the
        // sample taken above.
        if (this.isClusteringModel() && !this.queueGroup.getProcessQueueGroup().isLocked()) {
            LOG.warn("the message queue group not locked, so consume later, {}", (Object)this.queueGroup.getMessageQueueGroup());
            this.interruptWith(InterruptCode.LOCK_LATER);
            return;
        }
        if (this.isClusteringModel() && this.queueGroup.getProcessQueueGroup().isLockExpired()) {
            LOG.warn("the message queue group lock expired, so consume later, {}", (Object)this.queueGroup.getMessageQueueGroup());
            this.interruptWith(InterruptCode.LOCK_LATER);
            return;
        }

        List<Pair<QueuePair, List<MessageExt>>> msgPairList = this.tryMergeMessage(this.queueGroup.getQueuePairs());
        if (msgPairList == null) {
            this.interruptWith(InterruptCode.UPDATE_LATER);
            return;
        }
        if (msgPairList.isEmpty()) {
            if (this.queueGroup.isAllQueueGroupEmpty()) {
                this.interruptWith(InterruptCode.REMOVE_REQUEST);
            }
            return;
        }
        this.cs.submitConsumeRequest(msgPairList, this.queueGroup.getMessageQueueGroup());
    }

    /** Records {@code code} and raises the interrupted flag, in that order. */
    private void interruptWith(InterruptCode code) {
        this.setInterruptCode(code);
        this.setInterrupted(true);
    }

    /** Returns whether the consumer currently reports the clustering message model. */
    private boolean isClusteringModel() {
        return MessageModel.CLUSTERING.equals(this.cs.getDefaultMQPushConsumerImpl().messageModel());
    }

    /**
     * Decides whether a merge is possible right now and, if so, performs it.
     *
     * <p>Queues that have unmerged messages get their safe merge size set to that count.
     * For queues without unmerged messages:
     * <ul>
     *   <li>if every queue is in that state, nothing is merged;</li>
     *   <li>on the first attempt since the last merge, the request is interrupted with
     *       {@link InterruptCode#MERGE_LATER} and nothing is merged;</li>
     *   <li>on later attempts each such queue is checked against the consumer's max offset;
     *       if messages exist beyond the queue's next offset, new unmerged messages showed
     *       up, or the lookup fails, nothing is merged. Otherwise its safe merge size is set
     *       to zero.</li>
     * </ul>
     * Only when all checks pass is {@link #mergeMessage(Set)} invoked and the attempt
     * counter reset.
     *
     * @param queuesToMerge the queues of the group
     * @return the merged messages, an empty (mutable) list when nothing was merged, or
     *         {@code null} when {@link #mergeMessage(Set)} rejected the messages
     */
    List<Pair<QueuePair, List<MessageExt>>> tryMergeMessage(Set<QueuePair> queuesToMerge) {
        ++this.tryMergeCount;
        List<Pair<QueuePair, List<MessageExt>>> nothingMerged = new ArrayList<Pair<QueuePair, List<MessageExt>>>();

        List<QueuePair> queuesWithoutMessages = new ArrayList<QueuePair>();
        for (QueuePair queuePair : queuesToMerge) {
            ProcessQueue processQueue = queuePair.getProcessQueue();
            long unmerged = processQueue.unmergedMessageSize();
            if (unmerged > 0L) {
                processQueue.setSafeMergeSize(unmerged);
            } else {
                queuesWithoutMessages.add(queuePair);
            }
        }

        if (queuesWithoutMessages.size() == queuesToMerge.size()) {
            return nothingMerged;
        }
        if (!queuesWithoutMessages.isEmpty() && this.tryMergeCount <= 1) {
            this.interruptWith(InterruptCode.MERGE_LATER);
            return nothingMerged;
        }

        for (QueuePair queuePair : queuesWithoutMessages) {
            try {
                long maxOffset = this.cs.getDefaultMQPushConsumerImpl().maxOffset(queuePair.getMessageQueue(), false);
                ProcessQueue processQueue = queuePair.getProcessQueue();
                long nextOffset = processQueue.getNextOffset();
                long unmerged = processQueue.unmergedMessageSize();
                if (maxOffset > nextOffset || unmerged > 0L) {
                    return nothingMerged;
                }
                processQueue.setSafeMergeSize(0L);
            }
            catch (MQClientException e) {
                LOG.error("tryMergeMessage check maxOffset exception", e);
                return nothingMerged;
            }
        }

        List<Pair<QueuePair, List<MessageExt>>> merged = this.mergeMessage(queuesToMerge);
        this.tryMergeCount = 0;
        return merged;
    }

    /**
     * Merges the messages each queue currently offers into one ordered sequence, grouped
     * into consecutive runs that come from the same message queue.
     *
     * <p>The order is produced by repeatedly taking the earliest message (see
     * {@link #pollEarliestMsg(List)}) among the current head message of every queue.
     * Emission stops as soon as one queue's offered messages are used up, so the result
     * can be a prefix of the full ordering. Every emitted message advances the merge
     * progress counter of its process queue by one.
     *
     * @param queuesToMerge the queues of the group
     * @return the runs in merge order; an empty list if no queue offered messages; or
     *         {@code null} if {@link #isQueueGroupSizeValid(Map)} rejected the messages
     */
    private List<Pair<QueuePair, List<MessageExt>>> mergeMessage(Set<QueuePair> queuesToMerge) {
        List<Pair<QueuePair, List<MessageExt>>> msgPairList = new ArrayList<Pair<QueuePair, List<MessageExt>>>();
        // All three maps share the same key: the queue's generated key with the
        // "HA_ORDER_MQ_OFFSET/" prefix, which is also the property name looked up on messages.
        Map<String, List<MessageExt>> messageMap = new HashMap<String, List<MessageExt>>();
        Map<String, QueuePair> queueMap = new HashMap<String, QueuePair>();
        Map<String, Integer> localProgressMap = new HashMap<String, Integer>();

        for (QueuePair queuePair : queuesToMerge) {
            List<MessageExt> offered = queuePair.getProcessQueue().peekMessagesToMerge();
            if (offered == null || offered.isEmpty()) {
                continue;
            }
            String key = MessageConst.withPrefix("HA_ORDER_MQ_OFFSET/", queuePair.getMessageQueue().generateKey());
            localProgressMap.put(key, 0);
            queueMap.put(key, queuePair);
            messageMap.put(key, offered);
        }

        if (messageMap.isEmpty()) {
            return msgPairList;
        }
        if (!this.isQueueGroupSizeValid(messageMap)) {
            return null;
        }

        // A single contributing queue needs no ordering: take everything it offers.
        if (messageMap.size() == 1) {
            Map.Entry<String, List<MessageExt>> only = messageMap.entrySet().iterator().next();
            QueuePair queuePair = queueMap.get(only.getKey());
            List<MessageExt> msgList = only.getValue();
            queuePair.getProcessQueue().getMergeProgress().addAndGet(msgList.size());
            msgPairList.add(new Pair<QueuePair, List<MessageExt>>(queuePair, msgList));
            return msgPairList;
        }

        // Phase 1: k-way merge. "heads" holds the next unsorted message of each queue.
        List<MessageExtWrapper> sortedMsgList = new ArrayList<MessageExtWrapper>();
        List<MessageExtWrapper> heads = new LinkedList<MessageExtWrapper>();
        for (Map.Entry<String, List<MessageExt>> entry : messageMap.entrySet()) {
            heads.add(new MessageExtWrapper(entry.getKey(), entry.getValue().get(0)));
        }
        while (!heads.isEmpty()) {
            MessageExtWrapper earliest = MergeRequest.pollEarliestMsg(heads);
            sortedMsgList.add(earliest);
            String mqKey = earliest.getKey();
            List<MessageExt> queueMsgs = messageMap.get(mqKey);
            int nextIndex = localProgressMap.get(mqKey) + 1;
            if (nextIndex == queueMsgs.size()) {
                // This queue has no further message to offer as a head.
                continue;
            }
            heads.add(new MessageExtWrapper(mqKey, queueMsgs.get(nextIndex)));
            localProgressMap.put(mqKey, nextIndex);
        }

        // Phase 2 reuses the progress map as a per-queue count of emitted messages.
        for (Map.Entry<String, Integer> entry : localProgressMap.entrySet()) {
            entry.setValue(0);
        }

        Pair<QueuePair, List<MessageExt>> currentRun = null;
        MessageQueue previousMq = null;
        for (MessageExtWrapper wrapper : sortedMsgList) {
            String mqKey = wrapper.getKey();
            QueuePair queuePair = queueMap.get(mqKey);
            MessageQueue messageQueue = queuePair.getMessageQueue();

            // Start a new run on the first message and whenever the source queue changes.
            if (currentRun == null || !messageQueue.equals(previousMq)) {
                if (currentRun != null) {
                    msgPairList.add(currentRun);
                }
                previousMq = messageQueue;
                currentRun = new Pair<QueuePair, List<MessageExt>>(queuePair, new ArrayList<MessageExt>());
            }
            currentRun.getObject2().add(wrapper.getMsgExt());

            int emittedFromQueue = localProgressMap.get(mqKey) + 1;
            queuePair.getProcessQueue().getMergeProgress().incrementAndGet();
            if (emittedFromQueue == messageMap.get(mqKey).size()) {
                // One queue is used up: close the current run and stop emitting.
                msgPairList.add(currentRun);
                break;
            }
            localProgressMap.put(mqKey, emittedFromQueue);
        }
        return msgPairList;
    }

    /**
     * Checks every message's queue-group snapshot property against the current group size.
     *
     * <p>The snapshot is split on {@code ';'}; a message without the property counts as
     * zero entries. The check fails at the first message whose snapshot lists more entries
     * than the queue group currently has message queues.
     *
     * @param msgMap messages to validate, keyed by queue key
     * @return {@code false} if any message's snapshot is larger than the current group
     */
    private boolean isQueueGroupSizeValid(Map<String, List<MessageExt>> msgMap) {
        for (List<MessageExt> msgs : msgMap.values()) {
            for (MessageExt msg : msgs) {
                String queueGroupInfo = MessageAccessor.getQueueGroupSnapshot(msg);
                int snapshotSize = queueGroupInfo == null ? 0 : queueGroupInfo.split(";").length;
                int queueGroupSize = this.queueGroup.getMessageQueueGroup().getMessageQueueList().size();
                if (snapshotSize <= queueGroupSize) {
                    continue;
                }
                LOG.warn("topic: {}, group id: {}, queue group size {} not match msg property {}, maybe it has been changed", this.queueGroup.getTopic(), this.queueGroup.getQueueGroupId(), queueGroupSize, snapshotSize);
                return false;
            }
        }
        return true;
    }

    /**
     * Removes and returns the earliest message of {@code msgsToSort}.
     *
     * <p>A message is a candidate when {@link #compareByOffset} does not rank it after any
     * other message in the list. Among candidates the one with the smallest store timestamp
     * wins; on equal timestamps the one that appears first in the list wins. If no message
     * qualifies as a candidate, a warning is logged and the message with the smallest store
     * timestamp overall is used instead.
     *
     * @param msgsToSort the messages to choose from; the chosen one is removed from it
     * @return the chosen message, or {@code null} if {@code msgsToSort} is empty
     */
    public static MessageExtWrapper pollEarliestMsg(List<MessageExtWrapper> msgsToSort) {
        // Snapshot for constant-time indexed access; callers in this class pass a linked list.
        MessageExtWrapper[] msgs = msgsToSort.toArray(new MessageExtWrapper[0]);
        MessageExtWrapper earliestCandidate = null;
        MessageExtWrapper earliestOverall = null;

        for (int i = 0; i < msgs.length; ++i) {
            MessageExtWrapper thisMsg = msgs[i];
            long storeTimestamp = thisMsg.getMsgExt().getStoreTimestamp();
            if (earliestOverall == null || storeTimestamp < earliestOverall.getMsgExt().getStoreTimestamp()) {
                earliestOverall = thisMsg;
            }

            boolean rankedAfterAnother = false;
            for (int j = 0; j < msgs.length; ++j) {
                if (j != i && MergeRequest.compareByOffset(thisMsg, msgs[j]) > 0) {
                    rankedAfterAnother = true;
                    break;
                }
            }
            if (rankedAfterAnother) {
                continue;
            }
            if (earliestCandidate == null || storeTimestamp < earliestCandidate.getMsgExt().getStoreTimestamp()) {
                earliestCandidate = thisMsg;
            }
        }

        MessageExtWrapper chosen = earliestCandidate;
        if (chosen == null) {
            LOG.warn("[BUG] pollEarliestMsg candidates empty, msgsToSort {}", (Object)msgsToSort);
            chosen = earliestOverall;
        }
        msgsToSort.remove(chosen);
        return chosen;
    }

    /**
     * Ranks two messages from different queues using the offsets each records for the
     * other's queue.
     *
     * <p>NOTE(review): presumably each message carries, under the other queue's key, the
     * offset that queue had reached when the message was produced - confirm against the
     * producer side.
     *
     * @return {@code 1} if the offset {@code o1} records for {@code o2}'s queue is at least
     *         {@code o2}'s queue offset plus one; otherwise {@code -1} if the symmetric
     *         condition holds for {@code o2}; otherwise {@code 0}
     */
    private static int compareByOffset(MessageExtWrapper o1, MessageExtWrapper o2) {
        long otherQueueOffsetSeenByThis = MergeRequest.getOffsetOn(o1, o2);
        long thisQueueOffsetSeenByOther = MergeRequest.getOffsetOn(o2, o1);
        long thisMsgCount = o1.getMsgExt().getQueueOffset() + 1L;
        long otherMsgCount = o2.getMsgExt().getQueueOffset() + 1L;
        if (otherQueueOffsetSeenByThis >= otherMsgCount) {
            return 1;
        }
        if (thisQueueOffsetSeenByOther >= thisMsgCount) {
            return -1;
        }
        return 0;
    }

    /**
     * Reads from {@code o1}'s message the property named by {@code o2}'s key as a long.
     *
     * @return the parsed value, or {@code -1} if the message has no properties, the
     *         property is absent, or its value is not a valid long
     */
    private static long getOffsetOn(MessageExtWrapper o1, MessageExtWrapper o2) {
        Map<String, String> properties = o1.getMsgExt().getProperties();
        if (properties == null) {
            return -1L;
        }
        String recorded = properties.get(o2.getKey());
        if (recorded == null) {
            return -1L;
        }
        try {
            return Long.parseLong(recorded);
        }
        catch (NumberFormatException e) {
            // A malformed value is treated the same as a missing one.
            return -1L;
        }
    }

    public QueueGroup getQueueGroup() {
        return this.queueGroup;
    }

    public void setQueueGroup(QueueGroup queueGroup) {
        this.queueGroup = queueGroup;
    }

    /** Two requests are equal when they are of the same class and their queue groups are equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        MergeRequest that = (MergeRequest)o;
        return Objects.equal(this.queueGroup, that.queueGroup);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(this.queueGroup);
    }

    /** A message paired with the key of the queue it was taken from. */
    public static class MessageExtWrapper {
        private final String key;
        private final MessageExt msgExt;

        /**
         * @param key    key of the queue the message belongs to
         * @param msgExt the wrapped message
         */
        public MessageExtWrapper(String key, MessageExt msgExt) {
            this.key = key;
            this.msgExt = msgExt;
        }

        public String getKey() {
            return this.key;
        }

        public MessageExt getMsgExt() {
            return this.msgExt;
        }

        @Override
        public String toString() {
            // Plain concatenation renders a null payload as "null" instead of throwing.
            return "MessageExtWarpper [key =" + this.key + ", toString()=" + this.msgExt + "]";
        }
    }

    /**
     * Reason recorded when a pass is interrupted. How each code is acted upon is decided
     * by the caller and is not visible in this class.
     */
    public enum InterruptCode {
        /** The process-queue group was dropped, or nothing was merged and all queues are empty. */
        REMOVE_REQUEST,
        /** The process-queue group is not locked or its lock has expired. */
        LOCK_LATER,
        /** A message's queue-group snapshot lists more queues than the current group has. */
        UPDATE_LATER,
        /** Some queues had no unmerged messages on the first merge attempt. */
        MERGE_LATER;

    }
}

