package org.apache.flink.table.filesystem.stream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.util.StringUtils;

/* loaded from: input_file:org/apache/flink/table/filesystem/stream/ProcTimeCommitTrigger.class */
/**
 * Partition commit trigger driven by processing time.
 *
 * <p>Every partition is remembered together with the processing time at which it was first
 * added. A partition becomes committable once the configured commit delay
 * ({@link FileSystemOptions#SINK_PARTITION_COMMIT_DELAY}) has elapsed since that time, or
 * immediately when the delay is zero. The pending partitions are written to operator list state
 * on every snapshot so that they survive a restore.
 */
public class ProcTimeCommitTrigger implements PartitionCommitTrigger {

    /** Descriptor of the list state holding the "partition -> first-seen time" maps. */
    private static final ListStateDescriptor<Map<String, Long>> PENDING_PARTITIONS_STATE_DESC =
            new ListStateDescriptor<>(
                    "pending-partitions-with-time",
                    new MapSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE));

    private final ListState<Map<String, Long>> pendingPartitionsState;

    /** Partition -> processing time (ms) at which the partition was first added. */
    private final Map<String, Long> pendingPartitions = new HashMap<>();

    /** Commit delay in milliseconds; zero means "commit as soon as possible". */
    private final long commitDelay;

    private final ProcessingTimeService procTimeService;

    /**
     * Creates the trigger and, if requested, reloads the pending partitions from state.
     *
     * @param isRestored whether the pending partitions should be reloaded from {@code stateStore}
     * @param stateStore operator state store providing the pending-partitions list state
     * @param conf configuration from which the commit delay is read
     * @param procTimeService source of the current processing time
     * @throws Exception if the state cannot be obtained or read
     */
    public ProcTimeCommitTrigger(
            boolean isRestored,
            OperatorStateStore stateStore,
            Configuration conf,
            ProcessingTimeService procTimeService)
            throws Exception {
        this.pendingPartitionsState = stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
        if (isRestored) {
            // The list state may hold zero maps or more than one map for this subtask
            // (NOTE(review): presumably after the job was rescaled - list state entries are
            // redistributed across subtasks). Merge everything instead of blindly reading the
            // first element, which throws on an empty state and drops the remaining maps.
            Iterable<Map<String, Long>> restoredMaps = this.pendingPartitionsState.get();
            if (restoredMaps != null) {
                for (Map<String, Long> restored : restoredMaps) {
                    if (restored == null) {
                        continue;
                    }
                    for (Map.Entry<String, Long> entry : restored.entrySet()) {
                        // Keep the earliest time so a partition is never delayed longer than
                        // intended when it shows up in several restored maps.
                        this.pendingPartitions.merge(entry.getKey(), entry.getValue(), Math::min);
                    }
                }
            }
        }
        this.procTimeService = procTimeService;
        this.commitDelay = conf.get(FileSystemOptions.SINK_PARTITION_COMMIT_DELAY).toMillis();
    }

    /**
     * Registers a partition as pending, stamped with the current processing time. Null or blank
     * partitions are ignored, and a partition that is already pending keeps its original time.
     *
     * @param partition the partition to register
     */
    @Override // org.apache.flink.table.filesystem.stream.PartitionCommitTrigger
    public void addPartition(String partition) {
        if (StringUtils.isNullOrWhitespaceOnly(partition)) {
            return;
        }
        this.pendingPartitions.putIfAbsent(
                partition, this.procTimeService.getCurrentProcessingTime());
    }

    /**
     * Returns the partitions whose commit delay has elapsed and removes them from the pending
     * set. With a zero delay every pending partition is returned.
     *
     * @param checkpointId id of the checkpoint; not used by this trigger
     * @return the partitions that can be committed now, possibly empty
     */
    @Override // org.apache.flink.table.filesystem.stream.PartitionCommitTrigger
    public List<String> committablePartitions(long checkpointId) {
        List<String> committable = new ArrayList<>();
        long now = this.procTimeService.getCurrentProcessingTime();
        Iterator<Map.Entry<String, Long>> it = this.pendingPartitions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            long creationTime = entry.getValue();
            if (this.commitDelay == 0 || now > creationTime + this.commitDelay) {
                committable.add(entry.getKey());
                // Remove through the iterator to stay safe while iterating the map.
                it.remove();
            }
        }
        return committable;
    }

    /**
     * Replaces the content of the list state with a copy of the current pending partitions.
     *
     * @param checkpointId id of the checkpoint; not used by this trigger
     * @param watermark current watermark; not used by this trigger
     * @throws Exception if the state cannot be updated
     */
    @Override // org.apache.flink.table.filesystem.stream.PartitionCommitTrigger
    public void snapshotState(long checkpointId, long watermark) throws Exception {
        this.pendingPartitionsState.clear();
        // Store a copy so later changes to the live map do not affect the stored value.
        this.pendingPartitionsState.add(new HashMap<>(this.pendingPartitions));
    }

    /**
     * Returns every partition that is still pending, regardless of the commit delay, and clears
     * the pending set.
     *
     * @return all remaining pending partitions, possibly empty
     */
    @Override // org.apache.flink.table.filesystem.stream.PartitionCommitTrigger
    public List<String> endInput() {
        List<String> remaining = new ArrayList<>(this.pendingPartitions.keySet());
        this.pendingPartitions.clear();
        return remaining;
    }
}
