/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.influxdb.source;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InfluxDBSourceSplitEnumerator
implements SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> {
    private static final Logger log = LoggerFactory.getLogger(InfluxDBSourceSplitEnumerator.class);
    final SourceConfig config;
    private final SourceSplitEnumerator.Context<InfluxDBSourceSplit> context;
    private final Map<Integer, List<InfluxDBSourceSplit>> pendingSplit;
    private final Object stateLock = new Object();
    private volatile boolean shouldEnumerate;

    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, SourceConfig config) {
        this(context, null, config);
    }

    public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> context, InfluxDBSourceState sourceState, SourceConfig config) {
        this.context = context;
        this.config = config;
        this.pendingSplit = new HashMap<Integer, List<InfluxDBSourceSplit>>();
        boolean bl = this.shouldEnumerate = sourceState == null;
        if (sourceState != null) {
            this.shouldEnumerate = sourceState.isShouldEnumerate();
            this.pendingSplit.putAll(sourceState.getPendingSplit());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        Set readers = this.context.registeredReaders();
        if (this.shouldEnumerate) {
            Set<InfluxDBSourceSplit> newSplits = this.getInfluxDBSplit();
            Object object = this.stateLock;
            synchronized (object) {
                this.addPendingSplit(newSplits);
                this.shouldEnumerate = false;
            }
            this.assignSplit(readers);
        }
        log.debug("No more splits to assign. Sending NoMoreSplitsEvent to reader {}.", (Object)readers);
        readers.forEach(arg_0 -> this.context.signalNoMoreSplits(arg_0));
    }

    public void addSplitsBack(List splits, int subtaskId) {
        log.debug("Add back splits {} to InfluxDBSourceSplitEnumerator.", (Object)splits);
        if (!splits.isEmpty()) {
            this.addPendingSplit(splits);
            this.assignSplit(Collections.singletonList(subtaskId));
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplit.size();
    }

    public void registerReader(int subtaskId) {
        log.debug("Register reader {} to InfluxDBSourceSplitEnumerator.", (Object)subtaskId);
        if (!this.pendingSplit.isEmpty()) {
            this.assignSplit(Collections.singletonList(subtaskId));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public InfluxDBSourceState snapshotState(long checkpointId) {
        Object object = this.stateLock;
        synchronized (object) {
            return new InfluxDBSourceState(this.shouldEnumerate, this.pendingSplit);
        }
    }

    private Set<InfluxDBSourceSplit> getInfluxDBSplit() {
        String sql = this.config.getSql();
        HashSet<InfluxDBSourceSplit> influxDBSourceSplits = new HashSet<InfluxDBSourceSplit>();
        if (this.config.getPartitionNum() == 0) {
            influxDBSourceSplits.add(new InfluxDBSourceSplit(SourceConfig.DEFAULT_PARTITIONS, sql));
            return influxDBSourceSplits;
        }
        List<Pair<Long, Long>> rangePairs = InfluxDBSourceSplitEnumerator.genSplitNumRange(this.config.getLowerBound(), this.config.getUpperBound(), this.config.getPartitionNum());
        String[] sqls = sql.split(SourceConfig.SQL_WHERE.key());
        if (sqls.length > 2) {
            throw new InfluxdbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "sql should not contain more than one where");
        }
        int i = 0;
        while (i < rangePairs.size()) {
            String query = " where (" + this.config.getSplitKey() + " >= " + rangePairs.get(i).getLeft() + " and " + this.config.getSplitKey() + " < " + rangePairs.get(i).getRight() + ") ";
            ++i;
            query = sqls[0] + query;
            if (sqls.length > 1) {
                query = query + " and ( " + sqls[1] + " ) ";
            }
            influxDBSourceSplits.add(new InfluxDBSourceSplit(String.valueOf((long)i + System.nanoTime()), query));
        }
        return influxDBSourceSplits;
    }

    public static List<Pair<Long, Long>> genSplitNumRange(long lowerBound, long upperBound, int splitNum) {
        ArrayList<Pair<Long, Long>> rangeList = new ArrayList<Pair<Long, Long>>();
        int numPartitions = splitNum;
        int size = (int)(upperBound - lowerBound) / numPartitions + 1;
        int remainder = (int)((upperBound + 1L - lowerBound) % (long)numPartitions);
        if (upperBound - lowerBound < (long)numPartitions) {
            numPartitions = (int)(upperBound - lowerBound);
        }
        long currentStart = lowerBound;
        int i = 0;
        while (i < numPartitions) {
            rangeList.add((Pair<Long, Long>)Pair.of((Object)currentStart, (Object)(currentStart + (long)size)));
            currentStart += (long)size;
            if (++i + 1 > numPartitions) continue;
            currentStart -= (long)remainder;
        }
        return rangeList;
    }

    private void addPendingSplit(Collection<InfluxDBSourceSplit> splits) {
        int readerCount = this.context.currentParallelism();
        for (InfluxDBSourceSplit split : splits) {
            int ownerReader = InfluxDBSourceSplitEnumerator.getSplitOwner(split.splitId(), readerCount);
            log.info("Assigning {} to {} reader.", (Object)split, (Object)ownerReader);
            this.pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList()).add(split);
        }
    }

    private void assignSplit(Collection<Integer> readers) {
        log.debug("Assign pendingSplits to readers {}", readers);
        for (int reader : readers) {
            List<InfluxDBSourceSplit> assignmentForReader = this.pendingSplit.remove(reader);
            if (assignmentForReader == null || assignmentForReader.isEmpty()) continue;
            log.info("Assign splits {} to reader {}", assignmentForReader, (Object)reader);
            try {
                this.context.assignSplit(reader, assignmentForReader);
            }
            catch (Exception e) {
                log.error("Failed to assign splits {} to reader {}", new Object[]{assignmentForReader, reader, e});
                this.pendingSplit.put(reader, assignmentForReader);
            }
        }
    }

    private static int getSplitOwner(String tp, int numReaders) {
        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    public void open() {
    }

    public void close() {
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    public void handleSplitRequest(int subtaskId) {
        throw new InfluxdbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, String.format("Unsupported handleSplitRequest: %d", subtaskId));
    }
}

