/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.clean;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.utils.MetadataTableUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.MapUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.clean.CleanPlanner;
import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CleanPlanActionExecutor<T, I, K, O>
extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {
    private static final Logger LOG = LoggerFactory.getLogger(CleanPlanActionExecutor.class);
    private final Option<Map<String, String>> extraMetadata;

    public CleanPlanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime, Option<Map<String, String>> extraMetadata) {
        super(context, config, table, instantTime);
        this.extraMetadata = extraMetadata;
    }

    private int getCommitsSinceLastCleaning() {
        int numCommits;
        Option lastCleanInstant = this.table.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant();
        HoodieTimeline commitTimeline = this.table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
        if (lastCleanInstant.isPresent() && !this.table.getActiveTimeline().isEmpty((HoodieInstant)lastCleanInstant.get())) {
            try {
                HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata((byte[])((byte[])this.table.getActiveTimeline().getInstantDetails((HoodieInstant)lastCleanInstant.get()).get()));
                String lastCompletedCommitTimestamp = cleanMetadata.getLastCompletedCommitTimestamp();
                numCommits = commitTimeline.findInstantsAfter(lastCompletedCommitTimestamp).countInstants();
            }
            catch (IOException e) {
                throw new HoodieIOException("Parsing of last clean instant " + lastCleanInstant.get() + " failed", e);
            }
        } else {
            numCommits = commitTimeline.countInstants();
        }
        return numCommits;
    }

    private boolean needsCleaning(CleaningTriggerStrategy strategy) {
        if (strategy == CleaningTriggerStrategy.NUM_COMMITS) {
            int maxInlineCommitsForNextClean;
            int numberOfCommits = this.getCommitsSinceLastCleaning();
            return numberOfCommits >= (maxInlineCommitsForNextClean = this.config.getCleaningMaxCommits());
        }
        throw new HoodieException("Unsupported cleaning trigger strategy: " + (Object)((Object)this.config.getCleaningTriggerStrategy()));
    }

    HoodieCleanerPlan requestClean(HoodieEngineContext context) {
        try {
            CleanPlanner planner = new CleanPlanner(context, this.table, this.config);
            Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
            context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned: " + this.config.getTableName());
            List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
            if (partitionsToClean.isEmpty()) {
                LOG.info("Nothing to clean here. It is already clean");
                return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
            }
            LOG.info("Earliest commit to retain for clean : " + (earliestInstant.isPresent() ? ((HoodieInstant)earliestInstant.get()).getTimestamp() : "null"));
            LOG.info("Total partitions to clean : " + partitionsToClean.size() + ", with policy " + this.config.getCleanerPolicy());
            int cleanerParallelism = Math.min(partitionsToClean.size(), this.config.getCleanerParallelism());
            LOG.info("Using cleanerParallelism: " + cleanerParallelism);
            context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + this.config.getTableName());
            HashMap<String, List> cleanOps = new HashMap<String, List>();
            ArrayList partitionsToDelete = new ArrayList();
            boolean shouldUseBatchLookup = MetadataTableUtils.shouldUseBatchLookup(this.table.getMetaClient().getTableConfig(), this.config);
            for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
                List<String> subPartitionsToClean = partitionsToClean.subList(i, Math.min(i + cleanerParallelism, partitionsToClean.size()));
                if (shouldUseBatchLookup) {
                    LOG.info("Load partitions and files into file system view in advance. Paths: {}", subPartitionsToClean);
                    this.table.getHoodieView().loadPartitions(subPartitionsToClean);
                }
                Map<String, Pair> cleanOpsWithPartitionMeta = context.map(subPartitionsToClean, (SerializableFunction & Serializable)partitionPathToClean -> Pair.of((Object)partitionPathToClean, planner.getDeletePaths((String)partitionPathToClean, earliestInstant)), cleanerParallelism).stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue));
                cleanOps.putAll(cleanOpsWithPartitionMeta.entrySet().stream().filter(e -> !((List)((Pair)e.getValue()).getValue()).isEmpty()).collect(Collectors.toMap(Map.Entry::getKey, e -> CleanerUtils.convertToHoodieCleanFileInfoList((List)((List)((Pair)e.getValue()).getValue())))));
                partitionsToDelete.addAll(cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> (Boolean)((Pair)entry.getValue()).getKey()).map(Map.Entry::getKey).collect(Collectors.toList()));
            }
            return new HoodieCleanerPlan((HoodieActionInstant)earliestInstant.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null), planner.getLastCompletedCommitTimestamp(), this.config.getCleanerPolicy().name(), Collections.emptyMap(), CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete, this.prepareExtraMetadata(planner.getSavepointedTimestamps()));
        }
        catch (IOException e2) {
            throw new HoodieIOException("Failed to schedule clean operation", e2);
        }
    }

    private Map<String, String> prepareExtraMetadata(List<String> savepointedTimestamps) {
        if (savepointedTimestamps.isEmpty()) {
            return Collections.emptyMap();
        }
        return Collections.singletonMap("savepointed_timestamps", savepointedTimestamps.stream().collect(Collectors.joining(",")));
    }

    protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
        HoodieCleanerPlan cleanerPlan = this.requestClean(this.context);
        Option option = Option.empty();
        if (MapUtils.nonEmpty((Map)cleanerPlan.getFilePathsToBeDeletedPerPartition()) && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
            HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, "clean", startCleanTime);
            try {
                this.table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan((HoodieCleanerPlan)cleanerPlan));
                LOG.info("Requesting Cleaning with instant time " + cleanInstant);
            }
            catch (IOException e) {
                LOG.error("Got exception when saving cleaner requested file", (Throwable)e);
                throw new HoodieIOException(e.getMessage(), e);
            }
            option = Option.of((Object)cleanerPlan);
        }
        return option;
    }

    @Override
    public Option<HoodieCleanerPlan> execute() {
        if (!this.needsCleaning(this.config.getCleaningTriggerStrategy())) {
            return Option.empty();
        }
        return this.requestClean(this.instantTime);
    }
}

