/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.NonNull;
import org.apache.iceberg.Table;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergSplitEnumeratorState;
import org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractSplitEnumerator
implements SourceSplitEnumerator<IcebergFileScanTaskSplit, IcebergSplitEnumeratorState> {
    private static final Logger log = LoggerFactory.getLogger(AbstractSplitEnumerator.class);
    protected final SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context;
    protected final SourceConfig sourceConfig;
    protected final Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits;
    protected IcebergTableLoader icebergTableLoader;
    private volatile boolean isOpen = false;
    private CatalogTable catalogTable;

    public AbstractSplitEnumerator(@NonNull SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context, @NonNull SourceConfig sourceConfig, @NonNull Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits, CatalogTable catalogTable) {
        if (context == null) {
            throw new NullPointerException("context is marked non-null but is null");
        }
        if (sourceConfig == null) {
            throw new NullPointerException("sourceConfig is marked non-null but is null");
        }
        if (pendingSplits == null) {
            throw new NullPointerException("pendingSplits is marked non-null but is null");
        }
        this.context = context;
        this.sourceConfig = sourceConfig;
        this.pendingSplits = new HashMap<Integer, List<IcebergFileScanTaskSplit>>(pendingSplits);
        this.catalogTable = catalogTable;
    }

    public void open() {
        this.icebergTableLoader = IcebergTableLoader.create(this.sourceConfig, this.catalogTable);
        this.icebergTableLoader.open();
        this.isOpen = true;
    }

    public void run() {
        this.refreshPendingSplits();
        this.assignPendingSplits(this.context.registeredReaders());
    }

    public void close() throws IOException {
        this.icebergTableLoader.close();
        this.isOpen = false;
    }

    public void addSplitsBack(List<IcebergFileScanTaskSplit> splits, int subtaskId) {
        this.addPendingSplits(splits);
        if (this.context.registeredReaders().contains(subtaskId)) {
            this.assignPendingSplits(Collections.singleton(subtaskId));
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplits.size();
    }

    public void registerReader(int subtaskId) {
        log.debug("Adding reader {} to IcebergSourceEnumerator.", (Object)subtaskId);
        this.assignPendingSplits(Collections.singleton(subtaskId));
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    protected void refreshPendingSplits() {
        List<IcebergFileScanTaskSplit> newSplits = this.loadNewSplits(this.icebergTableLoader.loadTable());
        this.addPendingSplits(newSplits);
    }

    protected abstract List<IcebergFileScanTaskSplit> loadNewSplits(Table var1);

    private void addPendingSplits(Collection<IcebergFileScanTaskSplit> newSplits) {
        int numReaders = this.context.currentParallelism();
        for (IcebergFileScanTaskSplit newSplit : newSplits) {
            int ownerReader = (newSplit.splitId().hashCode() & Integer.MAX_VALUE) % numReaders;
            this.pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList()).add(newSplit);
            log.info("Assigning {} to {} reader.", (Object)newSplit, (Object)ownerReader);
        }
    }

    protected void assignPendingSplits(Set<Integer> pendingReaders) {
        for (int pendingReader : pendingReaders) {
            List<IcebergFileScanTaskSplit> pendingAssignmentForReader = this.pendingSplits.remove(pendingReader);
            if (pendingAssignmentForReader == null || pendingAssignmentForReader.isEmpty()) continue;
            log.info("Assign splits {} to reader {}", pendingAssignmentForReader, (Object)pendingReader);
            try {
                this.context.assignSplit(pendingReader, pendingAssignmentForReader);
            }
            catch (Exception e) {
                log.error("Failed to assign splits {} to reader {}", new Object[]{pendingAssignmentForReader, pendingReader, e});
                this.pendingSplits.put(pendingReader, pendingAssignmentForReader);
            }
        }
    }

    public boolean isOpen() {
        return this.isOpen;
    }
}

