/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.aggregator;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.Lifecycle;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.aggregator.CorrelationStrategy;
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
import org.springframework.integration.aggregator.MessageGroupExpiredEvent;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.aggregator.MessageSequenceComparator;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.aggregator.SequenceSizeReleaseStrategy;
import org.springframework.integration.aggregator.SimpleSequenceSizeReleaseStrategy;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.handler.DiscardingMessageHandler;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageGroup;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.util.UUIDConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public abstract class AbstractCorrelatingMessageHandler
extends AbstractMessageProducingHandler
implements DiscardingMessageHandler,
DisposableBean,
ApplicationEventPublisherAware,
Lifecycle {
    protected final Log logger = LogFactory.getLog(this.getClass());
    private final Comparator<Message<?>> sequenceNumberComparator = new MessageSequenceComparator();
    private final Map<UUID, ScheduledFuture<?>> expireGroupScheduledFutures = new ConcurrentHashMap();
    private final Set<Object> groupIds = ConcurrentHashMap.newKeySet();
    private MessageGroupProcessor outputProcessor;
    private MessageGroupStore messageStore;
    private CorrelationStrategy correlationStrategy;
    private ReleaseStrategy releaseStrategy;
    private boolean releaseStrategySet;
    private MessageChannel discardChannel;
    private String discardChannelName;
    private boolean sendPartialResultOnExpiry = false;
    private boolean sequenceAware = false;
    private LockRegistry lockRegistry = new DefaultLockRegistry();
    private boolean lockRegistrySet = false;
    private long minimumTimeoutForEmptyGroups;
    private boolean releasePartialSequences;
    private Expression groupTimeoutExpression;
    private List<Advice> forceReleaseAdviceChain;
    private MessageGroupProcessor forceReleaseProcessor = new ForceReleaseMessageGroupProcessor();
    private EvaluationContext evaluationContext;
    private ApplicationEventPublisher applicationEventPublisher;
    private boolean expireGroupsUponTimeout = true;
    private boolean popSequence = true;
    private volatile boolean running;

    public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
        Assert.notNull((Object)processor, (String)"'processor' must not be null");
        Assert.notNull((Object)store, (String)"'store' must not be null");
        this.setMessageStore(store);
        this.outputProcessor = processor;
        this.correlationStrategy = correlationStrategy == null ? new HeaderAttributeCorrelationStrategy("correlationId") : correlationStrategy;
        this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
        this.releaseStrategySet = releaseStrategy != null;
        this.sequenceAware = this.releaseStrategy instanceof SequenceSizeReleaseStrategy;
    }

    public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store) {
        this(processor, store, null, null);
    }

    public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor) {
        this(processor, new SimpleMessageStore(0), null, null);
    }

    public void setLockRegistry(LockRegistry lockRegistry) {
        Assert.isTrue((!this.lockRegistrySet ? 1 : 0) != 0, (String)"'this.lockRegistry' can not be reset once its been set");
        Assert.notNull((Object)lockRegistry, (String)"'lockRegistry' must not be null");
        this.lockRegistry = lockRegistry;
        this.lockRegistrySet = true;
    }

    public final void setMessageStore(MessageGroupStore store) {
        this.messageStore = store;
        store.registerMessageGroupExpiryCallback((messageGroupStore, group) -> this.forceReleaseProcessor.processMessageGroup(group));
    }

    public void setCorrelationStrategy(CorrelationStrategy correlationStrategy) {
        Assert.notNull((Object)correlationStrategy, (String)"'correlationStrategy' must not be null");
        this.correlationStrategy = correlationStrategy;
    }

    public void setReleaseStrategy(ReleaseStrategy releaseStrategy) {
        Assert.notNull((Object)releaseStrategy, (String)"'releaseStrategy' must not be null");
        this.releaseStrategy = releaseStrategy;
        this.sequenceAware = this.releaseStrategy instanceof SequenceSizeReleaseStrategy;
        this.releaseStrategySet = true;
    }

    public void setGroupTimeoutExpression(Expression groupTimeoutExpression) {
        this.groupTimeoutExpression = groupTimeoutExpression;
    }

    public void setForceReleaseAdviceChain(List<Advice> forceReleaseAdviceChain) {
        Assert.notNull(forceReleaseAdviceChain, (String)"'forceReleaseAdviceChain' must not be null");
        this.forceReleaseAdviceChain = forceReleaseAdviceChain;
    }

    public void setOutputProcessor(MessageGroupProcessor outputProcessor) {
        Assert.notNull((Object)outputProcessor, (String)"'processor' must not be null");
        this.outputProcessor = outputProcessor;
    }

    public void setDiscardChannel(MessageChannel discardChannel) {
        Assert.notNull((Object)discardChannel, (String)"'discardChannel' cannot be null");
        this.discardChannel = discardChannel;
    }

    public void setDiscardChannelName(String discardChannelName) {
        Assert.hasText((String)discardChannelName, (String)"'discardChannelName' must not be empty");
        this.discardChannelName = discardChannelName;
    }

    public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
        this.sendPartialResultOnExpiry = sendPartialResultOnExpiry;
    }

    public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) {
        this.minimumTimeoutForEmptyGroups = minimumTimeoutForEmptyGroups;
    }

    public void setReleasePartialSequences(boolean releasePartialSequences) {
        if (!this.releaseStrategySet && releasePartialSequences) {
            this.setReleaseStrategy(new SequenceSizeReleaseStrategy());
        }
        this.releasePartialSequences = releasePartialSequences;
    }

    public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
        this.expireGroupsUponTimeout = expireGroupsUponTimeout;
    }

    public void setPopSequence(boolean popSequence) {
        this.popSequence = popSequence;
    }

    @Override
    public void setTaskScheduler(TaskScheduler taskScheduler) {
        super.setTaskScheduler(taskScheduler);
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    @Override
    protected void onInit() throws Exception {
        super.onInit();
        Assert.state((this.discardChannelName == null || this.discardChannel == null ? 1 : 0) != 0, (String)"'discardChannelName' and 'discardChannel' are mutually exclusive.");
        BeanFactory beanFactory = this.getBeanFactory();
        if (beanFactory != null) {
            if (this.outputProcessor instanceof BeanFactoryAware) {
                ((BeanFactoryAware)this.outputProcessor).setBeanFactory(beanFactory);
            }
            if (this.correlationStrategy instanceof BeanFactoryAware) {
                ((BeanFactoryAware)this.correlationStrategy).setBeanFactory(beanFactory);
            }
            if (this.releaseStrategy instanceof BeanFactoryAware) {
                ((BeanFactoryAware)this.releaseStrategy).setBeanFactory(beanFactory);
            }
        }
        if (this.discardChannel == null) {
            this.discardChannel = new NullChannel();
        }
        if (this.releasePartialSequences) {
            Assert.isInstanceOf(SequenceSizeReleaseStrategy.class, (Object)this.releaseStrategy, () -> "Release strategy of type [" + this.releaseStrategy.getClass().getSimpleName() + "] cannot release partial sequences. Use a SequenceSizeReleaseStrategy instead.");
            ((SequenceSizeReleaseStrategy)this.releaseStrategy).setReleasePartialSequences(this.releasePartialSequences);
        }
        if (this.evaluationContext == null) {
            this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.getBeanFactory());
        }
        if (this.sequenceAware) {
            this.logger.warn((Object)"Using a SequenceSizeReleaseStrategy with large groups may not perform well, consider using a SimpleSequenceSizeReleaseStrategy");
        }
        this.lockRegistrySet = true;
        this.forceReleaseProcessor = this.createGroupTimeoutProcessor();
    }

    private MessageGroupProcessor createGroupTimeoutProcessor() {
        ForceReleaseMessageGroupProcessor processor = new ForceReleaseMessageGroupProcessor();
        if (this.groupTimeoutExpression != null && !CollectionUtils.isEmpty(this.forceReleaseAdviceChain)) {
            ProxyFactory proxyFactory = new ProxyFactory((Object)processor);
            this.forceReleaseAdviceChain.forEach(arg_0 -> ((ProxyFactory)proxyFactory).addAdvice(arg_0));
            return (MessageGroupProcessor)proxyFactory.getProxy(this.getApplicationContext().getClassLoader());
        }
        return processor;
    }

    @Override
    public String getComponentType() {
        return "aggregator";
    }

    public MessageGroupStore getMessageStore() {
        return this.messageStore;
    }

    protected Map<UUID, ScheduledFuture<?>> getExpireGroupScheduledFutures() {
        return this.expireGroupScheduledFutures;
    }

    protected MessageGroupProcessor getOutputProcessor() {
        return this.outputProcessor;
    }

    protected CorrelationStrategy getCorrelationStrategy() {
        return this.correlationStrategy;
    }

    protected ReleaseStrategy getReleaseStrategy() {
        return this.releaseStrategy;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public MessageChannel getDiscardChannel() {
        if (this.discardChannelName != null) {
            AbstractCorrelatingMessageHandler abstractCorrelatingMessageHandler = this;
            synchronized (abstractCorrelatingMessageHandler) {
                if (this.discardChannelName != null) {
                    this.discardChannel = (MessageChannel)this.getChannelResolver().resolveDestination(this.discardChannelName);
                    this.discardChannelName = null;
                }
            }
        }
        return this.discardChannel;
    }

    protected String getDiscardChannelName() {
        return this.discardChannelName;
    }

    protected boolean isSendPartialResultOnExpiry() {
        return this.sendPartialResultOnExpiry;
    }

    protected boolean isSequenceAware() {
        return this.sequenceAware;
    }

    protected LockRegistry getLockRegistry() {
        return this.lockRegistry;
    }

    protected boolean isLockRegistrySet() {
        return this.lockRegistrySet;
    }

    protected long getMinimumTimeoutForEmptyGroups() {
        return this.minimumTimeoutForEmptyGroups;
    }

    protected boolean isReleasePartialSequences() {
        return this.releasePartialSequences;
    }

    protected Expression getGroupTimeoutExpression() {
        return this.groupTimeoutExpression;
    }

    protected EvaluationContext getEvaluationContext() {
        return this.evaluationContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void handleMessageInternal(Message<?> message) throws Exception {
        block14: {
            Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
            Assert.state((correlationKey != null ? 1 : 0) != 0, (String)"Null correlation not allowed.  Maybe the CorrelationStrategy is failing?");
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Handling message with correlationKey [" + correlationKey + "]: " + message));
            }
            UUID groupIdUuid = UUIDConverter.getUUID(correlationKey);
            Lock lock = this.lockRegistry.obtain(groupIdUuid.toString());
            lock.lockInterruptibly();
            try {
                boolean canceled;
                ScheduledFuture<?> scheduledFuture = this.expireGroupScheduledFutures.remove(groupIdUuid);
                if (scheduledFuture != null && (canceled = scheduledFuture.cancel(true)) && this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("Cancel 'ScheduledFuture' for MessageGroup with Correlation Key [ " + correlationKey + "]."));
                }
                MessageGroup messageGroup = this.messageStore.getMessageGroup(correlationKey);
                if (this.sequenceAware) {
                    messageGroup = new SequenceAwareMessageGroup(messageGroup);
                }
                if (!messageGroup.isComplete() && messageGroup.canAdd(message)) {
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace((Object)("Adding message to group [ " + messageGroup + "]"));
                    }
                    if (this.releaseStrategy.canRelease(messageGroup = this.store(correlationKey, message))) {
                        Collection<Message<?>> completedMessages = null;
                        try {
                            completedMessages = this.completeGroup(message, correlationKey, messageGroup);
                        }
                        finally {
                            this.afterRelease(messageGroup, completedMessages);
                        }
                        if (!this.isExpireGroupsUponCompletion() && this.minimumTimeoutForEmptyGroups > 0L) {
                            this.removeEmptyGroupAfterTimeout(messageGroup, this.minimumTimeoutForEmptyGroups);
                        }
                        break block14;
                    }
                    this.scheduleGroupToForceComplete(messageGroup);
                    break block14;
                }
                this.discardMessage(message);
            }
            finally {
                lock.unlock();
            }
        }
    }

    protected boolean isExpireGroupsUponCompletion() {
        return false;
    }

    private void removeEmptyGroupAfterTimeout(MessageGroup messageGroup, long timeout) {
        Object groupId = messageGroup.getGroupId();
        UUID groupUuid = UUIDConverter.getUUID(groupId);
        ScheduledFuture scheduledFuture = this.getTaskScheduler().schedule(() -> {
            Lock lock = this.lockRegistry.obtain(groupUuid.toString());
            try {
                lock.lockInterruptibly();
                try {
                    boolean removeGroup;
                    this.expireGroupScheduledFutures.remove(groupUuid);
                    MessageGroup groupNow = this.messageStore.getMessageGroup(groupUuid);
                    boolean bl = removeGroup = groupNow.size() == 0 && groupNow.getLastModified() <= System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups;
                    if (removeGroup) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug((Object)("Removing empty group: " + groupUuid));
                        }
                        this.remove(messageGroup);
                    }
                }
                finally {
                    lock.unlock();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("Thread was interrupted while trying to obtain lock.Rescheduling empty MessageGroup [ " + groupId + "] for removal."));
                }
                this.removeEmptyGroupAfterTimeout(messageGroup, timeout);
            }
        }, new Date(System.currentTimeMillis() + timeout));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Schedule empty MessageGroup [ " + groupId + "] for removal."));
        }
        this.expireGroupScheduledFutures.put(groupUuid, scheduledFuture);
    }

    private void scheduleGroupToForceComplete(MessageGroup messageGroup) {
        Long groupTimeout = this.obtainGroupTimeout(messageGroup);
        if (groupTimeout != null) {
            if (groupTimeout > 0L) {
                Object groupId = messageGroup.getGroupId();
                long timestamp = messageGroup.getTimestamp();
                long lastModified = messageGroup.getLastModified();
                ScheduledFuture scheduledFuture = this.getTaskScheduler().schedule(() -> {
                    try {
                        this.processForceRelease(groupId, timestamp, lastModified);
                    }
                    catch (MessageDeliveryException e) {
                        if (this.logger.isWarnEnabled()) {
                            this.logger.warn((Object)("The MessageGroup [" + groupId + "] is rescheduled by the reason of:"), (Throwable)e);
                        }
                        this.scheduleGroupToForceComplete(groupId);
                    }
                }, new Date(System.currentTimeMillis() + groupTimeout));
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("Schedule MessageGroup [ " + messageGroup + "] to 'forceComplete'."));
                }
                this.expireGroupScheduledFutures.put(UUIDConverter.getUUID(groupId), scheduledFuture);
            } else {
                this.forceReleaseProcessor.processMessageGroup(messageGroup);
            }
        }
    }

    private void scheduleGroupToForceComplete(Object groupId) {
        MessageGroup messageGroup = this.messageStore.getMessageGroup(groupId);
        this.scheduleGroupToForceComplete(messageGroup);
    }

    private void processForceRelease(Object groupId, long timestamp, long lastModified) {
        MessageGroup messageGroup = this.messageStore.getMessageGroup(groupId);
        if (messageGroup.getTimestamp() == timestamp && messageGroup.getLastModified() == lastModified) {
            this.forceReleaseProcessor.processMessageGroup(messageGroup);
        }
    }

    private void discardMessage(Message<?> message) {
        this.messagingTemplate.send(this.getDiscardChannel(), message);
    }

    protected abstract void afterRelease(MessageGroup var1, Collection<Message<?>> var2);

    protected void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout) {
        this.afterRelease(group, completedMessages);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void forceComplete(MessageGroup group) {
        Object correlationKey = group.getGroupId();
        Lock lock = this.lockRegistry.obtain(UUIDConverter.getUUID(correlationKey).toString());
        boolean removeGroup = true;
        try {
            lock.lockInterruptibly();
            try {
                boolean canceled;
                ScheduledFuture<?> scheduledFuture = this.expireGroupScheduledFutures.remove(UUIDConverter.getUUID(correlationKey));
                if (scheduledFuture != null && (canceled = scheduledFuture.cancel(false)) && this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("Cancel 'forceComplete' scheduling for MessageGroup [ " + group + "]."));
                }
                MessageGroup groupNow = group;
                if (!group.isComplete()) {
                    groupNow = this.messageStore.getMessageGroup(correlationKey);
                }
                long lastModifiedNow = groupNow.getLastModified();
                int groupSize = groupNow.size();
                if (!(groupNow.isComplete() && groupSize != 0 || group.getLastModified() != lastModifiedNow || group.getTimestamp() != groupNow.getTimestamp())) {
                    if (groupSize > 0) {
                        if (this.releaseStrategy.canRelease(groupNow)) {
                            this.completeGroup(correlationKey, groupNow);
                        } else {
                            this.expireGroup(correlationKey, groupNow);
                        }
                        if (!this.expireGroupsUponTimeout) {
                            this.afterRelease(groupNow, groupNow.getMessages(), true);
                            removeGroup = false;
                        }
                    } else {
                        boolean bl = removeGroup = lastModifiedNow <= System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups;
                        if (removeGroup && this.logger.isDebugEnabled()) {
                            this.logger.debug((Object)("Removing empty group: " + correlationKey));
                        }
                    }
                } else {
                    removeGroup = false;
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug((Object)("Group expiry candidate (" + correlationKey + ") has changed - it may be reconsidered for a future expiration"));
                    }
                }
            }
            catch (MessageDeliveryException e) {
                removeGroup = false;
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug((Object)("Group expiry candidate (" + correlationKey + ") has been affected by MessageDeliveryException - it may be reconsidered for a future expiration one more time"));
                }
                throw e;
            }
            finally {
                try {
                    if (removeGroup) {
                        this.remove(group);
                    }
                }
                finally {
                    lock.unlock();
                }
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            this.logger.debug((Object)"Thread was interrupted while trying to obtain lock");
        }
    }

    protected void remove(MessageGroup group) {
        Object correlationKey = group.getGroupId();
        this.messageStore.removeMessageGroup(correlationKey);
        this.groupIds.remove(group.getGroupId());
    }

    protected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<?>> partialSequence) {
        Message<?> lastReleasedMessage = Collections.max(partialSequence, this.sequenceNumberComparator);
        return new IntegrationMessageHeaderAccessor(lastReleasedMessage).getSequenceNumber();
    }

    protected MessageGroup store(Object correlationKey, Message<?> message) {
        this.groupIds.add(correlationKey);
        return this.messageStore.addMessageToGroup(correlationKey, message);
    }

    protected void expireGroup(Object correlationKey, MessageGroup group) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info((Object)("Expiring MessageGroup with correlationKey[" + correlationKey + "]"));
        }
        if (this.sendPartialResultOnExpiry) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Prematurely releasing partially complete group with key [" + correlationKey + "] to: " + this.getOutputChannel()));
            }
            this.completeGroup(correlationKey, group);
        } else {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)("Discarding messages of partially complete group with key [" + correlationKey + "] to: " + (this.discardChannelName != null ? this.discardChannelName : this.discardChannel)));
            }
            for (Message<?> message : group.getMessages()) {
                this.discardMessage(message);
            }
        }
        if (this.applicationEventPublisher != null) {
            this.applicationEventPublisher.publishEvent((ApplicationEvent)new MessageGroupExpiredEvent(this, correlationKey, group.size(), new Date(group.getLastModified()), new Date(), !this.sendPartialResultOnExpiry));
        }
    }

    protected void completeGroup(Object correlationKey, MessageGroup group) {
        Message<?> first = null;
        if (group != null) {
            first = group.getOne();
        }
        this.completeGroup(first, correlationKey, group);
    }

    protected Collection<Message<?>> completeGroup(Message<?> message, Object correlationKey, MessageGroup group) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Completing group with correlationKey [" + correlationKey + "]"));
        }
        AbstractIntegrationMessageBuilder result = this.outputProcessor.processMessageGroup(group);
        Collection partialSequence = null;
        if (result instanceof Collection) {
            this.verifyResultCollectionConsistsOfMessages((Collection)((Object)result));
            partialSequence = (Collection)((Object)result);
        }
        if (this.popSequence && partialSequence == null && !(result instanceof Message)) {
            AbstractIntegrationMessageBuilder<Object> messageBuilder = result instanceof AbstractIntegrationMessageBuilder ? (AbstractIntegrationMessageBuilder<Object>)result : this.getMessageBuilderFactory().withPayload(result).copyHeaders((Map<String, ?>)message.getHeaders());
            result = messageBuilder.popSequenceDetails();
        }
        this.sendOutputs(result, message);
        return partialSequence;
    }

    protected void verifyResultCollectionConsistsOfMessages(Collection<?> elements) {
        Class commonElementType = CollectionUtils.findCommonElementType(elements);
        Assert.isAssignable(Message.class, (Class)commonElementType, (String)("The expected collection of Messages contains non-Message element: " + commonElementType));
    }

    protected Long obtainGroupTimeout(MessageGroup group) {
        return this.groupTimeoutExpression != null ? (Long)this.groupTimeoutExpression.getValue(this.evaluationContext, (Object)group, Long.class) : null;
    }

    @Override
    public void destroy() {
        this.expireGroupScheduledFutures.values().forEach(future -> future.cancel(true));
    }

    public void start() {
        if (!this.running) {
            this.running = true;
            if (this.outputProcessor instanceof Lifecycle) {
                ((Lifecycle)this.outputProcessor).start();
            }
            if (this.releaseStrategy instanceof Lifecycle) {
                ((Lifecycle)this.releaseStrategy).start();
            }
        }
    }

    public void stop() {
        if (this.running) {
            this.running = false;
            if (this.outputProcessor instanceof Lifecycle) {
                ((Lifecycle)this.outputProcessor).stop();
            }
            if (this.releaseStrategy instanceof Lifecycle) {
                ((Lifecycle)this.releaseStrategy).stop();
            }
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    private class ForceReleaseMessageGroupProcessor
    implements MessageGroupProcessor {
        ForceReleaseMessageGroupProcessor() {
        }

        @Override
        public Object processMessageGroup(MessageGroup group) {
            if (AbstractCorrelatingMessageHandler.this.groupIds.contains(group.getGroupId())) {
                AbstractCorrelatingMessageHandler.this.forceComplete(group);
            }
            return null;
        }
    }

    protected static class SequenceAwareMessageGroup
    extends SimpleMessageGroup {
        private final SimpleMessageGroup sourceGroup;

        public SequenceAwareMessageGroup(MessageGroup messageGroup) {
            super(messageGroup.getMessages(), null, messageGroup.getGroupId(), messageGroup.getTimestamp(), messageGroup.isComplete(), true);
            this.sourceGroup = messageGroup instanceof SimpleMessageGroup ? (SimpleMessageGroup)messageGroup : null;
        }

        @Override
        public boolean canAdd(Message<?> message) {
            if (this.size() == 0) {
                return true;
            }
            Integer messageSequenceNumber = (Integer)message.getHeaders().get((Object)"sequenceNumber", Integer.class);
            if (messageSequenceNumber != null && messageSequenceNumber > 0) {
                Integer messageSequenceSize = (Integer)message.getHeaders().get((Object)"sequenceSize", Integer.class);
                if (messageSequenceSize == null) {
                    messageSequenceSize = 0;
                }
                return messageSequenceSize.equals(this.getSequenceSize()) && !(this.sourceGroup == null ? this.containsSequenceNumber(this.getMessages(), messageSequenceNumber) : this.sourceGroup.containsSequence(messageSequenceNumber));
            }
            return true;
        }

        private boolean containsSequenceNumber(Collection<Message<?>> messages, Integer messageSequenceNumber) {
            for (Message<?> member : messages) {
                if (!messageSequenceNumber.equals(member.getHeaders().get((Object)"sequenceNumber", Integer.class))) continue;
                return true;
            }
            return false;
        }
    }
}

