/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.changefeed.Bootstrapper;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseStore;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.LeaseVersion;
import com.azure.cosmos.implementation.changefeed.epkversion.PartitionSynchronizer;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

class BootstrapperImpl
implements Bootstrapper {
    private final Logger logger = LoggerFactory.getLogger(BootstrapperImpl.class);
    private final PartitionSynchronizer synchronizer;
    private final LeaseStore leaseStore;
    private final LeaseStoreManager epkRangeVersionLeaseStoreManager;
    private final LeaseStoreManager pkRangeVersionLeaseStoreManager;
    private final ChangeFeedProcessorOptions changeFeedProcessorOptions;
    private final ChangeFeedMode changeFeedModeToStart;
    private final Duration lockTime;
    private final Duration sleepTime;
    private volatile boolean isInitialized;
    private volatile boolean isLockAcquired;

    public BootstrapperImpl(PartitionSynchronizer synchronizer, LeaseStore leaseStore, Duration lockTime, Duration sleepTime, LeaseStoreManager epkRangeVersionLeaseStoreManager, LeaseStoreManager pkRangeVersionLeaseStoreManager, ChangeFeedProcessorOptions changeFeedProcessorOptions, ChangeFeedMode changeFeedModeToStart) {
        Preconditions.checkNotNull(synchronizer, "Argument 'synchronizer' can not be null");
        Preconditions.checkNotNull(leaseStore, "Argument 'leaseStore' can not be null");
        Preconditions.checkNotNull(epkRangeVersionLeaseStoreManager, "Argument 'epkRangeVersionLeaseStoreManager' can not be null");
        Preconditions.checkNotNull(pkRangeVersionLeaseStoreManager, "Argument 'pkRangeVersionLeaseStoreManager' can not be null");
        Preconditions.checkNotNull(changeFeedProcessorOptions, "Argument 'changeFeedProcessorOptions' can not be null");
        Preconditions.checkNotNull(changeFeedModeToStart, "Argument 'changeFeedModeToStart' can not be null");
        Preconditions.checkArgument(lockTime != null && this.isPositive(lockTime), "lockTime should be non-null and positive");
        Preconditions.checkArgument(sleepTime != null && this.isPositive(sleepTime), "sleepTime should be non-null and positive");
        this.synchronizer = synchronizer;
        this.leaseStore = leaseStore;
        this.epkRangeVersionLeaseStoreManager = epkRangeVersionLeaseStoreManager;
        this.pkRangeVersionLeaseStoreManager = pkRangeVersionLeaseStoreManager;
        this.changeFeedProcessorOptions = changeFeedProcessorOptions;
        this.changeFeedModeToStart = changeFeedModeToStart;
        this.lockTime = lockTime;
        this.sleepTime = sleepTime;
        this.isInitialized = false;
    }

    private boolean isPositive(Duration duration) {
        return !duration.isNegative() && !duration.isZero();
    }

    @Override
    public Mono<Void> initialize() {
        this.isInitialized = false;
        return Mono.just((Object)this).flatMap(value -> this.leaseStore.isInitialized()).flatMap(initialized -> {
            this.isInitialized = initialized;
            Mono<Void> previousOperation = Mono.empty();
            if (initialized.booleanValue()) {
                if (!this.changeFeedProcessorOptions.isLeaseVerificationEnabledOnRestart()) {
                    return this.validateLeaseCFModeInteroperabilityForEpkRangeBasedLease();
                }
                previousOperation = this.validateLeaseCFModeInteroperabilityForEpkRangeBasedLease();
            }
            this.logger.info("Acquire initialization lock");
            return previousOperation.then(this.leaseStore.acquireInitializationLock(this.lockTime).flatMap(lockAcquired -> {
                this.isLockAcquired = lockAcquired;
                if (!this.isLockAcquired) {
                    this.logger.info("Another instance is initializing the store");
                    return Mono.just((Object)this.isLockAcquired).delayElement(this.sleepTime, CosmosSchedulers.COSMOS_PARALLEL);
                }
                return this.synchronizer.createMissingLeases().then(!this.isInitialized ? this.leaseStore.markInitialized().flatMap(initSucceeded -> Mono.just((Object)lockAcquired)) : Mono.just((Object)lockAcquired));
            }).onErrorResume(throwable -> {
                this.logger.warn("Unexpected exception caught while initializing the lock", throwable);
                return Mono.just((Object)this.isLockAcquired);
            }).flatMap(lockAcquired -> {
                if (this.isLockAcquired) {
                    return this.leaseStore.releaseInitializationLock();
                }
                return Mono.just((Object)lockAcquired);
            }));
        }).repeat(() -> !this.isInitialized).then();
    }

    private Mono<Void> validateLeaseCFModeInteroperabilityForEpkRangeBasedLease() {
        return this.pkRangeVersionLeaseStoreManager.getTopLeases(1).next().flatMap(lease -> {
            if (lease.getVersion() == LeaseVersion.PARTITION_KEY_BASED_LEASE) {
                String errorMessage = String.format("ChangeFeedProcessor#handleAllVersionsAndDeletes cannot be invoked whenChangeFeedProcessor#handleChanges was also started forlease prefix : %s", this.changeFeedProcessorOptions.getLeasePrefix());
                return Mono.error((Throwable)new IllegalStateException(errorMessage));
            }
            return Mono.empty();
        }).switchIfEmpty(this.epkRangeVersionLeaseStoreManager.getTopLeases(1).next()).flatMap(epkRangeVersionLease -> Mono.just((Object)((Lease)epkRangeVersionLease))).flatMap(lease -> {
            ChangeFeedState changeFeedState;
            if (lease.getVersion() == LeaseVersion.EPK_RANGE_BASED_LEASE && !Strings.isNullOrEmpty(lease.getId()) && !Strings.isNullOrEmpty(lease.getContinuationToken()) && (changeFeedState = ChangeFeedState.fromString(lease.getContinuationToken())).getMode() != this.changeFeedModeToStart) {
                String errorMessage = String.format("ChangeFeedProcessor#handleAllVersionsAndDeletes cannot be invoked when ChangeFeedProcessor#handleLatestVersionChanges were also started for lease prefix : %s", this.changeFeedProcessorOptions.getLeasePrefix());
                return Mono.error((Throwable)new IllegalStateException(errorMessage));
            }
            return Mono.empty();
        });
    }
}

