/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.ConsumerMetadata;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSourceSplitEnumerator
implements SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> {
    private static final Logger log = LoggerFactory.getLogger(KafkaSourceSplitEnumerator.class);
    private static final String CLIENT_ID_PREFIX = "seatunnel";
    private final Map<TablePath, ConsumerMetadata> tablePathMetadataMap;
    private final SourceSplitEnumerator.Context<KafkaSourceSplit> context;
    private final long discoveryIntervalMillis;
    private final AdminClient adminClient;
    private final KafkaSourceConfig kafkaSourceConfig;
    private final Map<TopicPartition, KafkaSourceSplit> pendingSplit;
    private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
    private ScheduledExecutorService executor;
    private ScheduledFuture<?> scheduledFuture;
    private final Map<String, TablePath> topicMappingTablePathMap = new HashMap<String, TablePath>();
    private boolean isStreamingMode;

    KafkaSourceSplitEnumerator(KafkaSourceConfig kafkaSourceConfig, SourceSplitEnumerator.Context<KafkaSourceSplit> context, KafkaSourceState sourceState, boolean isStreamingMode) {
        this.kafkaSourceConfig = kafkaSourceConfig;
        this.tablePathMetadataMap = kafkaSourceConfig.getMapMetadata();
        this.context = context;
        this.assignedSplit = new HashMap<TopicPartition, KafkaSourceSplit>();
        this.pendingSplit = new HashMap<TopicPartition, KafkaSourceSplit>();
        this.adminClient = this.initAdminClient(this.kafkaSourceConfig.getProperties());
        this.discoveryIntervalMillis = kafkaSourceConfig.getDiscoveryIntervalMillis();
        this.isStreamingMode = isStreamingMode;
    }

    @VisibleForTesting
    public KafkaSourceSplitEnumerator(AdminClient adminClient, Map<TopicPartition, KafkaSourceSplit> pendingSplit, Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
        this.tablePathMetadataMap = new HashMap<TablePath, ConsumerMetadata>();
        this.context = null;
        this.discoveryIntervalMillis = -1L;
        this.adminClient = adminClient;
        this.kafkaSourceConfig = null;
        this.pendingSplit = pendingSplit;
        this.assignedSplit = assignedSplit;
    }

    @VisibleForTesting
    public KafkaSourceSplitEnumerator(AdminClient adminClient, Map<TopicPartition, KafkaSourceSplit> pendingSplit, Map<TopicPartition, KafkaSourceSplit> assignedSplit, boolean isStreamingMode) {
        this(adminClient, pendingSplit, assignedSplit);
        this.isStreamingMode = isStreamingMode;
    }

    public void open() {
        if (this.discoveryIntervalMillis > 0L) {
            this.executor = Executors.newScheduledThreadPool(1, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("kafka-partition-dynamic-discovery");
                return thread;
            });
            this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                try {
                    this.discoverySplits();
                }
                catch (Exception e) {
                    log.error("Dynamic discovery failure:", (Throwable)e);
                }
            }, this.discoveryIntervalMillis, this.discoveryIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    public void run() throws ExecutionException, InterruptedException {
        this.fetchPendingPartitionSplit();
        this.setPartitionStartOffset();
        this.assignSplit();
    }

    private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
        Set<TopicPartition> pendingTopicPartitions = this.pendingSplit.keySet();
        HashMap<TopicPartition, Long> topicPartitionOffsets = new HashMap<TopicPartition, Long>();
        Map tablePathPartitionMap = pendingTopicPartitions.stream().collect(Collectors.groupingBy(tp -> this.topicMappingTablePathMap.get(tp.topic()), Collectors.toSet()));
        for (TablePath tablePath : tablePathPartitionMap.keySet()) {
            ConsumerMetadata metadata = this.tablePathMetadataMap.get(tablePath);
            Set<TopicPartition> topicPartitions = tablePathPartitionMap.get(tablePath);
            switch (metadata.getStartMode()) {
                case EARLIEST: {
                    topicPartitionOffsets.putAll(this.listOffsets(topicPartitions, OffsetSpec.earliest()));
                    break;
                }
                case GROUP_OFFSETS: {
                    topicPartitionOffsets.putAll(this.listConsumerGroupOffsets(topicPartitions));
                    break;
                }
                case LATEST: {
                    topicPartitionOffsets.putAll(this.listOffsets(topicPartitions, OffsetSpec.latest()));
                    break;
                }
                case TIMESTAMP: {
                    topicPartitionOffsets.putAll(this.listOffsets(topicPartitions, OffsetSpec.forTimestamp(metadata.getStartOffsetsTimestamp())));
                    break;
                }
                case SPECIFIC_OFFSETS: {
                    topicPartitionOffsets.putAll(metadata.getSpecificStartOffsets());
                    break;
                }
            }
        }
        topicPartitionOffsets.forEach((key, value) -> {
            if (this.pendingSplit.containsKey(key)) {
                this.pendingSplit.get(key).setStartOffset((long)value);
            }
        });
    }

    public void close() throws IOException {
        if (this.adminClient != null) {
            this.adminClient.close();
        }
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
    }

    public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
        if (!splits.isEmpty()) {
            Map<TopicPartition, ? extends KafkaSourceSplit> nextSplit = this.convertToNextSplit(splits);
            nextSplit.keySet().forEach(this.assignedSplit::remove);
            this.pendingSplit.putAll(nextSplit);
        }
    }

    private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
        try {
            Map<TopicPartition, Long> latestOffsets = this.listOffsets(splits.stream().map(KafkaSourceSplit::getTopicPartition).filter(Objects::nonNull).collect(Collectors.toList()), OffsetSpec.latest());
            splits.forEach(split -> {
                split.setStartOffset(split.getEndOffset() + 1L);
                split.setEndOffset(this.isStreamingMode ? Long.MAX_VALUE : (Long)latestOffsets.get(split.getTopicPartition()));
            });
            return splits.stream().collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
        }
        catch (Exception e) {
            throw new KafkaConnectorException((SeaTunnelErrorCode)KafkaConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplit.size();
    }

    public void handleSplitRequest(int subtaskId) {
    }

    public void registerReader(int subtaskId) {
        if (!this.pendingSplit.isEmpty()) {
            this.assignSplit();
        }
    }

    public KafkaSourceState snapshotState(long checkpointId) throws Exception {
        return new KafkaSourceState(new HashSet<KafkaSourceSplit>(this.assignedSplit.values()));
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    private AdminClient initAdminClient(Properties properties) {
        Properties props = new Properties();
        if (properties != null) {
            props.putAll((Map<?, ?>)properties);
        }
        props.setProperty("bootstrap.servers", this.kafkaSourceConfig.getBootstrap());
        if (properties.get("client.id") != null) {
            props.setProperty("client.id", properties.get("client.id").toString());
        } else {
            props.setProperty("client.id", "seatunnel-enumerator-admin-client-" + this.hashCode());
        }
        return AdminClient.create(props);
    }

    private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, InterruptedException {
        HashSet<String> topics = new HashSet<String>();
        for (TablePath tablePath : this.tablePathMetadataMap.keySet()) {
            ConsumerMetadata metadata = this.tablePathMetadataMap.get(tablePath);
            HashSet<String> currentPathTopics = new HashSet<String>();
            if (metadata.isPattern()) {
                Pattern pattern = Pattern.compile(metadata.getTopic());
                currentPathTopics.addAll(this.adminClient.listTopics().names().get().stream().filter(t -> pattern.matcher((CharSequence)t).matches()).collect(Collectors.toSet()));
            } else {
                currentPathTopics.addAll(Arrays.asList(metadata.getTopic().split(",")));
            }
            currentPathTopics.forEach(topic -> this.topicMappingTablePathMap.put((String)topic, tablePath));
            topics.addAll(currentPathTopics);
        }
        log.info("Discovered topics: {}", topics);
        Collection partitions = this.adminClient.describeTopics(topics).all().get().values().stream().flatMap(t -> t.partitions().stream().map(p -> new TopicPartition(t.name(), p.partition()))).collect(Collectors.toSet());
        Map<TopicPartition, Long> latestOffsets = this.listOffsets(partitions, OffsetSpec.latest());
        return partitions.stream().map(partition -> {
            TablePath tablePath = this.topicMappingTablePathMap.get(partition.topic());
            KafkaSourceSplit split = new KafkaSourceSplit(tablePath, (TopicPartition)partition);
            split.setEndOffset(this.isStreamingMode ? Long.MAX_VALUE : (Long)latestOffsets.get(partition));
            return split;
        }).collect(Collectors.toSet());
    }

    private synchronized void assignSplit() {
        HashMap<Integer, List> readySplit = new HashMap<Integer, List>(16);
        for (int taskID = 0; taskID < this.context.currentParallelism(); ++taskID) {
            readySplit.computeIfAbsent(taskID, id -> new ArrayList());
        }
        this.pendingSplit.forEach((key, value) -> {
            if (!this.assignedSplit.containsKey(key)) {
                ((List)readySplit.get(KafkaSourceSplitEnumerator.getSplitOwner(key, this.context.currentParallelism()))).add(value);
            }
        });
        readySplit.forEach((id, split) -> {
            this.context.assignSplit(id.intValue(), split);
            if (this.discoveryIntervalMillis <= 0L) {
                this.context.signalNoMoreSplits(id.intValue());
            }
        });
        this.assignedSplit.putAll(this.pendingSplit);
        this.pendingSplit.clear();
    }

    private static int getSplitOwner(TopicPartition tp, int numReaders) {
        int startIndex = (tp.topic().hashCode() * 31 & Integer.MAX_VALUE) % numReaders;
        return (startIndex + tp.partition()) % numReaders;
    }

    private Map<TopicPartition, Long> listOffsets(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetSpec> topicPartitionOffsets = partitions.stream().collect(Collectors.toMap(partition -> partition, __ -> offsetSpec));
        return this.adminClient.listOffsets(topicPartitionOffsets).all().thenApply(result -> {
            HashMap offsets = new HashMap();
            result.forEach((tp, offsetsResultInfo) -> {
                if (offsetsResultInfo != null) {
                    offsets.put(tp, offsetsResultInfo.offset());
                }
            });
            return offsets;
        }).get();
    }

    public Map<TopicPartition, Long> listConsumerGroupOffsets(Collection<TopicPartition> partitions) throws ExecutionException, InterruptedException {
        ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions().topicPartitions(new ArrayList<TopicPartition>(partitions));
        return this.adminClient.listConsumerGroupOffsets(this.kafkaSourceConfig.getConsumerGroup(), options).partitionsToOffsetAndMetadata().thenApply(result -> {
            HashMap offsets = new HashMap();
            result.forEach((tp, oam) -> {
                if (oam != null) {
                    offsets.put(tp, oam.offset());
                }
            });
            return offsets;
        }).get();
    }

    private void discoverySplits() throws ExecutionException, InterruptedException {
        this.fetchPendingPartitionSplit();
        this.assignSplit();
    }

    @VisibleForTesting
    public void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
        this.getTopicInfo().forEach(split -> {
            if (!this.assignedSplit.containsKey(split.getTopicPartition()) && !this.pendingSplit.containsKey(split.getTopicPartition())) {
                this.pendingSplit.put(split.getTopicPartition(), (KafkaSourceSplit)split);
            }
        });
    }
}

