/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.reader.fetcher;

import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SingleThreadFetcherManager} for Kafka partition splits that can additionally hand an
 * offset commit request over to a split fetcher.
 *
 * <p>The commit is not performed by the caller of {@link #commitOffsets}: it is wrapped in a
 * {@link SplitFetcherTask} and enqueued on the fetcher, which passes it on to the fetcher's
 * {@link KafkaPartitionSplitReader}.
 *
 * @param <T> the type of the value carried in the first field of the emitted {@link Tuple3}.
 */
public class KafkaSourceFetcherManager<T>
extends SingleThreadFetcherManager<Tuple3<T, Long, Long>, KafkaPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceFetcherManager.class);

    /**
     * Creates a fetcher manager; both arguments are forwarded unchanged to the superclass.
     *
     * @param elementsQueue the queue handed to the base fetcher manager.
     * @param splitReaderSupplier the supplier of split readers handed to the base fetcher manager.
     */
    public KafkaSourceFetcherManager(FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue, Supplier<SplitReader<Tuple3<T, Long, Long>, KafkaPartitionSplit>> splitReaderSupplier) {
        super(elementsQueue, splitReaderSupplier);
    }

    /**
     * Enqueues a task on a split fetcher that commits the given offsets.
     *
     * <p>An empty map is a no-op: nothing is enqueued and the callback is not invoked by this
     * method. If no fetcher is registered under key {@code 0}, a new one is created, the commit
     * task is enqueued on it, and only then is the fetcher started.
     *
     * @param offsetsToCommit the offsets to commit, keyed by topic partition; must not be null.
     * @param callback the callback passed through to the split reader together with the offsets.
     */
    public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, OffsetCommitCallback callback) {
        LOG.debug("Committing offsets {}", offsetsToCommit);
        if (offsetsToCommit.isEmpty()) {
            return;
        }
        // `fetchers` is inherited from the base manager. Key 0 is presumably the id of the single
        // fetcher this manager runs -- NOTE(review): confirm against SingleThreadFetcherManager.
        SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher = this.fetchers.get(0);
        if (splitFetcher != null) {
            // A fetcher is already registered: reuse it for the commit.
            this.enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
        } else {
            // No fetcher registered: create one, queue the commit first, then start it.
            splitFetcher = this.createSplitFetcher();
            this.enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
            this.startFetcher(splitFetcher);
        }
    }

    /**
     * Wraps the offset commit in a {@link SplitFetcherTask} and enqueues it on the given fetcher.
     *
     * @param splitFetcher the fetcher that receives the task; its split reader must be a
     *     {@link KafkaPartitionSplitReader}, otherwise the cast below fails with a
     *     {@link ClassCastException}.
     * @param offsetsToCommit the offsets forwarded to the split reader.
     * @param callback the callback forwarded to the split reader.
     */
    private void enqueueOffsetsCommitTask(SplitFetcher<Tuple3<T, Long, Long>, KafkaPartitionSplit> splitFetcher, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, OffsetCommitCallback callback) {
        final KafkaPartitionSplitReader<T> kafkaReader = (KafkaPartitionSplitReader<T>) splitFetcher.getSplitReader();
        splitFetcher.enqueueTask(new SplitFetcherTask() {

            @Override
            public boolean run() throws IOException {
                kafkaReader.notifyCheckpointComplete(offsetsToCommit, callback);
                // Always reports true; presumably this marks the task as finished so it is not
                // run again -- NOTE(review): confirm against the SplitFetcherTask contract.
                return true;
            }

            @Override
            public void wakeUp() {
                // Intentionally empty: this task defines no wake-up action.
            }
        });
    }
}

