/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.file.source.split;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileSourceSplitEnumerator
implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileSourceSplitEnumerator.class);
    private final SourceSplitEnumerator.Context<FileSourceSplit> context;
    private final Set<FileSourceSplit> allSplit = new TreeSet<FileSourceSplit>(Comparator.comparing(FileSourceSplit::splitId));
    private Set<FileSourceSplit> assignedSplit;
    private final List<String> filePaths;
    private final AtomicInteger assignCount = new AtomicInteger(0);

    public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths) {
        this.context = context;
        this.filePaths = filePaths;
        this.assignedSplit = new HashSet<FileSourceSplit>();
    }

    public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths, FileSourceState sourceState) {
        this(context, filePaths);
        this.assignedSplit = sourceState.getAssignedSplit();
    }

    public void open() {
        this.allSplit.addAll(this.discoverySplits());
    }

    public void run() {
        for (int i = 0; i < this.context.currentParallelism(); ++i) {
            LOGGER.info("Assigned splits to reader [{}]", (Object)i);
            this.assignSplit(i);
        }
    }

    private Set<FileSourceSplit> discoverySplits() {
        HashSet<FileSourceSplit> fileSourceSplits = new HashSet<FileSourceSplit>();
        this.filePaths.forEach(k -> fileSourceSplits.add(new FileSourceSplit((String)k)));
        return fileSourceSplits;
    }

    public void close() throws IOException {
    }

    public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
        if (!splits.isEmpty()) {
            this.allSplit.addAll(splits);
            this.assignSplit(subtaskId);
        }
    }

    private void assignSplit(int taskId) {
        ArrayList<FileSourceSplit> currentTaskSplits = new ArrayList<FileSourceSplit>();
        if (this.context.currentParallelism() == 1) {
            currentTaskSplits.addAll(this.allSplit);
        } else {
            this.assignCount.set(0);
            for (FileSourceSplit fileSourceSplit : this.allSplit) {
                int splitOwner = FileSourceSplitEnumerator.getSplitOwner(this.assignCount.getAndIncrement(), this.context.currentParallelism());
                if (splitOwner != taskId) continue;
                currentTaskSplits.add(fileSourceSplit);
            }
        }
        this.context.assignSplit(taskId, currentTaskSplits);
        this.assignedSplit.addAll(currentTaskSplits);
        LOGGER.info("SubTask {} is assigned to [{}]", (Object)taskId, (Object)currentTaskSplits.stream().map(FileSourceSplit::splitId).collect(Collectors.joining(",")));
        this.context.signalNoMoreSplits(taskId);
    }

    private static int getSplitOwner(int assignCount, int numReaders) {
        return assignCount % numReaders;
    }

    public int currentUnassignedSplitSize() {
        return this.allSplit.size() - this.assignedSplit.size();
    }

    public void registerReader(int subtaskId) {
    }

    public FileSourceState snapshotState(long checkpointId) {
        return new FileSourceState(this.assignedSplit);
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    public void handleSplitRequest(int subtaskId) {
    }
}

