/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.common.utils.TemporaryClassLoaderContext;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsAddition;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsChange;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.ConsumerMetadata;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SplitReader} that reads raw {@code byte[]} key/value records from the Kafka partitions
 * handed to it as {@link KafkaSourceSplit}s.
 *
 * <p>Partitions are manually assigned to a single {@link KafkaConsumer}. Each partition may have a
 * stopping offset; once the last polled record of a partition reaches that offset the matching
 * split is reported as finished and the partition is removed from the consumer's assignment.
 * Partitions whose current position is already at or past their stopping offset when they are
 * added are treated as "empty" and reported as finished by the next {@link #fetch()}.
 */
public class KafkaPartitionSplitReader
        implements SplitReader<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaPartitionSplitReader.class);
    private static final String CLIENT_ID_PREFIX = "seatunnel";

    private final KafkaSourceConfig kafkaSourceConfig;
    private final KafkaConsumer<byte[], byte[]> consumer;
    /** Exclusive stopping offset per partition; a missing entry means "never stop". */
    private final Map<TopicPartition, Long> stoppingOffsets;
    /** Value of the user supplied {@code group.id} property; not read anywhere in this class. */
    private final String groupId;
    /** Ids of splits found to be empty that still have to be reported as finished. */
    private final Set<String> emptySplits = new HashSet<>();
    /** Maximum time, in milliseconds, a single {@link #fetch()} blocks in {@code poll}. */
    private final long pollTimeout;

    /**
     * Creates the reader and its underlying Kafka consumer.
     *
     * @param kafkaSourceConfig source configuration providing consumer properties, bootstrap
     *     servers, consumer group, poll timeout and per-table metadata
     * @param context reader context; only the subtask index is used, to build the client id
     */
    public KafkaPartitionSplitReader(
            KafkaSourceConfig kafkaSourceConfig, SourceReader.Context context) {
        this.kafkaSourceConfig = kafkaSourceConfig;
        this.consumer = initConsumer(kafkaSourceConfig, context.getIndexOfSubtask());
        this.stoppingOffsets = new HashMap<>();
        this.groupId = kafkaSourceConfig.getProperties().getProperty("group.id");
        this.pollTimeout = kafkaSourceConfig.getPollTimeout();
    }

    /**
     * Polls the consumer once and wraps the result, marking every split that reached its stopping
     * offset (or was detected as empty earlier) as finished.
     *
     * @return the polled records grouped by split, possibly empty
     * @throws IOException declared by the interface; not thrown by this implementation directly
     */
    @Override
    public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
        ConsumerRecords<byte[], byte[]> consumerRecords;
        try {
            consumerRecords = consumer.poll(Duration.ofMillis(pollTimeout));
        } catch (WakeupException | IllegalStateException e) {
            // Deliberately not propagated: a wake-up interrupts the poll on purpose, and
            // KafkaConsumer#poll raises IllegalStateException when no partition is assigned
            // (e.g. after all splits finished). Either way there is nothing to read, so hand
            // back an empty batch that still carries any pending empty splits.
            KafkaPartitionSplitRecords emptyBatch =
                    new KafkaPartitionSplitRecords(ConsumerRecords.empty());
            markEmptySplitsAsFinished(emptyBatch);
            return emptyBatch;
        }

        KafkaPartitionSplitRecords recordsBySplits =
                new KafkaPartitionSplitRecords(consumerRecords);
        List<TopicPartition> finishedPartitions = new ArrayList<>();
        for (TopicPartition tp : consumerRecords.partitions()) {
            long stoppingOffset = getStoppingOffset(tp);
            List<ConsumerRecord<byte[], byte[]>> recordsFromPartition =
                    consumerRecords.records(tp);
            if (recordsFromPartition.isEmpty()) {
                continue;
            }
            ConsumerRecord<byte[], byte[]> lastRecord =
                    recordsFromPartition.get(recordsFromPartition.size() - 1);
            // The stopping offset is exclusive, so the split is done once the last record
            // polled sits at (stoppingOffset - 1) or beyond.
            if (lastRecord.offset() >= stoppingOffset - 1L) {
                recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
                finishSplitAtRecord(
                        tp, stoppingOffset, lastRecord.offset(), finishedPartitions, recordsBySplits);
            }
        }

        markEmptySplitsAsFinished(recordsBySplits);
        if (!finishedPartitions.isEmpty()) {
            unassignPartitions(finishedPartitions);
        }
        return recordsBySplits;
    }

    /**
     * Records that {@code tp} is finished, both in the list of partitions to unassign and in the
     * batch returned to the caller.
     */
    private void finishSplitAtRecord(
            TopicPartition tp,
            long stoppingOffset,
            long currentOffset,
            List<TopicPartition> finishedPartitions,
            KafkaPartitionSplitRecords recordsBySplits) {
        LOG.debug(
                "{} has reached stopping offset {}, current offset is {}",
                tp,
                stoppingOffset,
                currentOffset);
        finishedPartitions.add(tp);
        recordsBySplits.addFinishedSplit(tp.toString());
    }

    /** Moves all pending empty split ids into the batch's finished set and clears the backlog. */
    private void markEmptySplitsAsFinished(KafkaPartitionSplitRecords recordsBySplits) {
        if (!emptySplits.isEmpty()) {
            recordsBySplits.finishedSplits.addAll(emptySplits);
            emptySplits.clear();
        }
    }

    /**
     * Adds the partitions of the given splits to the consumer's assignment, seeks them to their
     * starting offsets, resolves their stopping offsets and drops partitions that turn out to be
     * empty.
     *
     * @param splitsChange the change to apply; only {@link SplitsAddition} is supported
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}
     */
    @Override
    public void handleSplitsChanges(SplitsChange<KafkaSourceSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "The SplitChange type of %s is not supported.",
                            splitsChange.getClass()));
        }

        List<TopicPartition> newPartitionAssignments = new ArrayList<>();
        Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets = new HashMap<>();
        List<TopicPartition> partitionsStartingFromEarliest = new ArrayList<>();
        List<TopicPartition> partitionsStartingFromLatest = new ArrayList<>();
        List<TopicPartition> partitionsStoppingAtLatest = new ArrayList<>();

        for (KafkaSourceSplit split : splitsChange.splits()) {
            newPartitionAssignments.add(split.getTopicPartition());
            parseStartingOffsets(
                    split,
                    partitionsStartingFromEarliest,
                    partitionsStartingFromLatest,
                    partitionsStartingFromSpecifiedOffsets);
            parseStoppingOffsets(split, partitionsStoppingAtLatest);
        }

        // assign() replaces the whole assignment, so the current partitions must be carried over.
        newPartitionAssignments.addAll(consumer.assignment());
        consumer.assign(newPartitionAssignments);

        seekToStartingOffsets(
                partitionsStartingFromEarliest,
                partitionsStartingFromLatest,
                partitionsStartingFromSpecifiedOffsets);
        acquireAndSetStoppingOffsets(partitionsStoppingAtLatest);
        removeEmptySplits();
        maybeLogSplitChangesHandlingResult(splitsChange);
    }

    /**
     * When debug logging is enabled, logs the start position and stopping offset of every newly
     * added split that is still assigned.
     */
    private void maybeLogSplitChangesHandlingResult(SplitsChange<KafkaSourceSplit> splitsChange) {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        StringJoiner splitsInfo = new StringJoiner(",");
        Set<TopicPartition> assignment = consumer.assignment();
        for (KafkaSourceSplit split : splitsChange.splits()) {
            TopicPartition tp = split.getTopicPartition();
            if (!assignment.contains(tp)) {
                // Already removed as an empty split; asking for its position would fail.
                continue;
            }
            long startingOffset =
                    retryOnWakeup(() -> consumer.position(tp), "logging starting position");
            long stoppingOffset = getStoppingOffset(tp);
            splitsInfo.add(
                    String.format("[%s, start:%d, stop: %d]", tp, startingOffset, stoppingOffset));
        }
        LOG.debug("SplitsChange handling result: {}", splitsInfo);
    }

    /**
     * Unassigns every partition whose current position is already at or past its stopping offset
     * and remembers its split id so that the next {@link #fetch()} reports it as finished.
     */
    private void removeEmptySplits() {
        List<TopicPartition> emptyPartitions = new ArrayList<>();
        for (TopicPartition tp : consumer.assignment()) {
            long position =
                    retryOnWakeup(
                            () -> consumer.position(tp),
                            "getting starting offset to check if split is empty");
            if (position >= getStoppingOffset(tp)) {
                emptyPartitions.add(tp);
            }
        }
        if (!emptyPartitions.isEmpty()) {
            LOG.debug(
                    "These assigning splits are empty and will be marked as finished in later fetch: {}",
                    emptyPartitions);
            emptySplits.addAll(
                    emptyPartitions.stream()
                            .map(TopicPartition::toString)
                            .collect(Collectors.toSet()));
            unassignPartitions(emptyPartitions);
        }
    }

    /** Re-assigns the consumer to its current partitions minus {@code partitionsToUnassign}. */
    private void unassignPartitions(Collection<TopicPartition> partitionsToUnassign) {
        Set<TopicPartition> newAssignment = new HashSet<>(consumer.assignment());
        newAssignment.removeAll(partitionsToUnassign);
        consumer.assign(newAssignment);
    }

    /** Looks up the current end offsets of the given partitions and stores them as stopping offsets. */
    private void acquireAndSetStoppingOffsets(List<TopicPartition> partitionsStoppingAtLatest) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitionsStoppingAtLatest);
        stoppingOffsets.putAll(endOffsets);
    }

    /**
     * Positions the consumer on the requested starting offsets. Partitions in none of the three
     * groups are left untouched.
     */
    private void seekToStartingOffsets(
            List<TopicPartition> partitionsStartingFromEarliest,
            List<TopicPartition> partitionsStartingFromLatest,
            Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
        if (!partitionsStartingFromEarliest.isEmpty()) {
            LOG.trace("Seeking starting offsets to beginning: {}", partitionsStartingFromEarliest);
            consumer.seekToBeginning(partitionsStartingFromEarliest);
        }
        if (!partitionsStartingFromLatest.isEmpty()) {
            LOG.trace("Seeking starting offsets to end: {}", partitionsStartingFromLatest);
            consumer.seekToEnd(partitionsStartingFromLatest);
        }
        if (!partitionsStartingFromSpecifiedOffsets.isEmpty()) {
            LOG.trace(
                    "Seeking starting offsets to specified offsets: {}",
                    partitionsStartingFromSpecifiedOffsets);
            partitionsStartingFromSpecifiedOffsets.forEach(consumer::seek);
        }
    }

    /**
     * Registers the split's stopping offset: a non-negative end offset is used as is, a negative
     * one defers to the partition's end offset, resolved later via {@code partitionsStoppingAtLatest}.
     */
    private void parseStoppingOffsets(
            KafkaSourceSplit split, List<TopicPartition> partitionsStoppingAtLatest) {
        TopicPartition tp = split.getTopicPartition();
        if (split.getEndOffset() >= 0L) {
            stoppingOffsets.put(tp, split.getEndOffset());
        } else {
            partitionsStoppingAtLatest.add(tp);
        }
    }

    /** Returns the stopping offset of {@code tp}, or {@link Long#MAX_VALUE} if none is set. */
    private long getStoppingOffset(TopicPartition tp) {
        return stoppingOffsets.getOrDefault(tp, Long.MAX_VALUE);
    }

    /**
     * Sorts the split's partition into the matching "start from" group according to the start
     * mode configured for the split's table. {@link StartMode#GROUP_OFFSETS} adds it to no group;
     * every mode other than earliest/latest/group-offsets uses the split's own start offset.
     *
     * @throws NullPointerException if no metadata is registered for the split's table path
     */
    private void parseStartingOffsets(
            KafkaSourceSplit split,
            List<TopicPartition> partitionsStartingFromEarliest,
            List<TopicPartition> partitionsStartingFromLatest,
            Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
        TopicPartition tp = split.getTopicPartition();
        ConsumerMetadata metadata = kafkaSourceConfig.getMapMetadata().get(split.getTablePath());
        // Fail with a diagnosable message instead of a bare NPE on the dereference below.
        Preconditions.checkNotNull(
                metadata,
                "No consumer metadata is registered for table path %s",
                split.getTablePath());

        StartMode startMode = metadata.getStartMode();
        if (startMode == StartMode.EARLIEST) {
            partitionsStartingFromEarliest.add(tp);
        } else if (startMode == StartMode.LATEST) {
            partitionsStartingFromLatest.add(tp);
        } else if (startMode != StartMode.GROUP_OFFSETS) {
            partitionsStartingFromSpecifiedOffsets.put(tp, split.getStartOffset());
        }
    }

    /** Wakes up the consumer, aborting a blocking call such as the poll in {@link #fetch()}. */
    @Override
    public void wakeUp() {
        consumer.wakeup();
    }

    /** Closes the underlying Kafka consumer. */
    @Override
    public void close() throws Exception {
        consumer.close();
    }

    /**
     * Asynchronously commits the given offsets to Kafka.
     *
     * @param offsetsToCommit offsets to commit, per partition
     * @param offsetCommitCallback callback invoked by the consumer when the commit completes
     */
    public void notifyCheckpointComplete(
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
            OffsetCommitCallback offsetCommitCallback) {
        consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
    }

    /**
     * Builds the Kafka consumer from the user supplied properties plus the settings this reader
     * enforces (group id, bootstrap servers, per-subtask client id, byte-array deserializers).
     *
     * @param kafkaSourceConfig configuration to derive the consumer properties from
     * @param subtaskId index appended to the client id so each subtask gets a distinct one
     * @return a new, unassigned consumer
     */
    private KafkaConsumer<byte[], byte[]> initConsumer(
            KafkaSourceConfig kafkaSourceConfig, int subtaskId) {
        // The consumer is created with the config class's class loader as the temporary context
        // class loader; it is restored when the try block exits.
        try (TemporaryClassLoaderContext ignored =
                TemporaryClassLoaderContext.of(kafkaSourceConfig.getClass().getClassLoader())) {
            Properties props = new Properties();
            kafkaSourceConfig
                    .getProperties()
                    .forEach(
                            (key, value) ->
                                    props.setProperty(String.valueOf(key), String.valueOf(value)));
            props.setProperty("group.id", kafkaSourceConfig.getConsumerGroup());
            props.setProperty("bootstrap.servers", kafkaSourceConfig.getBootstrap());

            // Always suffix the client id with the subtask index; fall back to a default base
            // when the user did not configure one.
            Object configuredClientId = kafkaSourceConfig.getProperties().get("client.id");
            String clientIdBase =
                    configuredClientId == null
                            ? CLIENT_ID_PREFIX + "-consumer"
                            : configuredClientId.toString();
            props.setProperty("client.id", clientIdBase + "-" + subtaskId);

            props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
            props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
            // NOTE(review): auto-commit follows the commit-on-checkpoint flag here while offsets
            // are also committed explicitly in notifyCheckpointComplete - confirm this is intended.
            props.setProperty(
                    "enable.auto.commit",
                    String.valueOf(kafkaSourceConfig.isCommitOnCheckpoint()));
            props.setProperty("allow.auto.create.topics", "false");
            return new KafkaConsumer<>(props);
        }
    }

    /**
     * Runs {@code consumerCall}, retrying it exactly once if it is interrupted by a wake-up. A
     * {@link WakeupException} thrown by the retry propagates to the caller.
     *
     * @param consumerCall the consumer invocation to run
     * @param description short text describing the call, used in the log message
     * @return the value produced by {@code consumerCall}
     */
    private <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
        try {
            return consumerCall.get();
        } catch (WakeupException we) {
            LOG.info(
                    "Caught WakeupException while executing Kafka consumer call for {}. Will retry the consumer call.",
                    description);
            return consumerCall.get();
        }
    }

    /**
     * One polled batch exposed split by split. Records at or beyond a split's stopping offset are
     * not handed out.
     */
    private static class KafkaPartitionSplitRecords
            implements RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> {
        private final Set<String> finishedSplits = new HashSet<>();
        private final Map<TopicPartition, Long> stoppingOffsets = new HashMap<>();
        private final ConsumerRecords<byte[], byte[]> consumerRecords;
        private final Iterator<TopicPartition> splitIterator;
        private Iterator<ConsumerRecord<byte[], byte[]>> recordIterator;
        private TopicPartition currentTopicPartition;
        private Long currentSplitStoppingOffset;

        private KafkaPartitionSplitRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
            this.consumerRecords = consumerRecords;
            this.splitIterator = consumerRecords.partitions().iterator();
        }

        /** Sets the exclusive offset at which iteration over {@code topicPartition} stops. */
        private void setPartitionStoppingOffset(TopicPartition topicPartition, long stoppingOffset) {
            stoppingOffsets.put(topicPartition, stoppingOffset);
        }

        /** Marks the split with the given id as finished. */
        private void addFinishedSplit(String splitId) {
            finishedSplits.add(splitId);
        }

        /**
         * Advances to the next partition of the batch.
         *
         * @return the split id (the partition's string form), or {@code null} when the batch is
         *     exhausted
         */
        @Override
        @Nullable
        public String nextSplit() {
            if (!splitIterator.hasNext()) {
                currentTopicPartition = null;
                recordIterator = null;
                currentSplitStoppingOffset = null;
                return null;
            }
            currentTopicPartition = splitIterator.next();
            recordIterator = consumerRecords.records(currentTopicPartition).iterator();
            currentSplitStoppingOffset =
                    stoppingOffsets.getOrDefault(currentTopicPartition, Long.MAX_VALUE);
            return currentTopicPartition.toString();
        }

        /**
         * Returns the next record of the current split.
         *
         * @return the next record, or {@code null} when the split has no more records or the next
         *     record is at or beyond the split's stopping offset
         * @throws NullPointerException if no split is currently selected
         */
        @Override
        @Nullable
        public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
            Preconditions.checkNotNull(
                    currentTopicPartition,
                    "Make sure nextSplit() did not return null before iterate over the records split.");
            if (recordIterator.hasNext()) {
                ConsumerRecord<byte[], byte[]> next = recordIterator.next();
                if (next.offset() < currentSplitStoppingOffset) {
                    return next;
                }
            }
            return null;
        }

        /** Returns the ids of the splits that finished with this batch. */
        @Override
        public Set<String> finishedSplits() {
            return finishedSplits;
        }
    }
}

