/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader.fetcher;

import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarFetcherManagerBase;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetcher manager that forwards per-partition acknowledgement cursors to the split readers
 * owned by its fetchers.
 *
 * <p>Every split reader reached through {@link #acknowledgeMessages(Map)} is cast to
 * {@link PulsarOrderedPartitionSplitReader}; a reader of any other type fails with a
 * {@link ClassCastException}.
 *
 * @param <T> the payload type carried by {@link PulsarMessage}
 */
@Internal
public class PulsarOrderedFetcherManager<T> extends PulsarFetcherManagerBase<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarOrderedFetcherManager.class);

    /**
     * Creates the manager by delegating both arguments to the base class constructor.
     *
     * @param elementsQueue the queue handed to the base fetcher manager
     * @param splitReaderSupplier the supplier of split readers handed to the base fetcher manager
     */
    public PulsarOrderedFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<T>>> elementsQueue,
            Supplier<SplitReader<PulsarMessage<T>, PulsarPartitionSplit>> splitReaderSupplier) {
        super(elementsQueue, splitReaderSupplier);
    }

    /**
     * Notifies the split reader responsible for each partition about the message id to
     * acknowledge.
     *
     * <p>The fetcher for a partition is looked up (or created) using {@code partition.toString()}
     * as the key.
     *
     * @param cursorsToCommit message id to acknowledge, keyed by topic partition
     */
    public void acknowledgeMessages(Map<TopicPartition, MessageId> cursorsToCommit) {
        LOG.debug("Acknowledge messages {}", cursorsToCommit);
        cursorsToCommit.forEach(
                (topicPartition, cursor) ->
                        triggerAcknowledge(
                                getOrCreateFetcher(topicPartition.toString()),
                                topicPartition,
                                cursor));
    }

    /**
     * Hands the cursor to the fetcher's split reader and then starts the fetcher.
     *
     * @param splitFetcher the fetcher whose split reader receives the notification
     * @param partition the partition the cursor belongs to
     * @param messageId the message id passed to the reader's {@code notifyCheckpointComplete}
     */
    private void triggerAcknowledge(
            SplitFetcher<PulsarMessage<T>, PulsarPartitionSplit> splitFetcher,
            TopicPartition partition,
            MessageId messageId) {
        // The reader type is not enforced by the fetcher's generics, hence the explicit cast.
        ((PulsarOrderedPartitionSplitReader) splitFetcher.getSplitReader())
                .notifyCheckpointComplete(partition, messageId);
        // NOTE(review): presumably this wakes the fetcher so the reader processes the
        // acknowledgement — confirm against PulsarFetcherManagerBase#startFetcher.
        startFetcher(splitFetcher);
    }
}

