/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.jet.config.DeltaJobConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.impl.AbstractJobProxy;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobCoordinationService;
import com.hazelcast.jet.impl.JobEventService;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.MasterJobContext;
import com.hazelcast.jet.impl.MasterSnapshotContext;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.InternalCompletableFuture;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.eventservice.impl.Registration;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class MasterContext
implements DynamicMetricsProvider {
    /**
     * Placeholder recorded in the per-member response map when an invocation
     * completes with neither a result nor a throwable. It is translated back
     * to {@code null} before the collected responses are handed to the
     * completion callback.
     */
    private static final Object NULL_OBJECT = new Object() {
        @Override
        public String toString() {
            return "NULL_OBJECT";
        }
    };
    // Acquired through lock() and released through unlock(); lock() asserts
    // that the calling thread does not already hold it.
    private final ReentrantLock lock = new ReentrantLock();
    private final NodeEngineImpl nodeEngine;
    // Looked up from the node engine in the constructor; used to publish
    // status-change events and to manage status listeners.
    private final JobEventService jobEventService;
    private final JobCoordinationService coordinationService;
    private final ILogger logger;
    // Taken from the job record in the constructor.
    private final long jobId;
    // The job's name or id, as reported by JobRecord.getJobNameOrId().
    private final String jobName;
    // Obtained from the coordination service in the constructor.
    private final JobRepository jobRepository;
    private final JobRecord jobRecord;
    private final JobExecutionRecord jobExecutionRecord;
    // Starts as SUSPENDED or NOT_RUNNING (see constructor); changed through setJobStatus().
    private volatile JobStatus jobStatus;
    // Changed through setExecutionId().
    private volatile long executionId;
    // Replaced wholesale through setExecutionPlanMap().
    private volatile Map<MemberInfo, ExecutionPlan> executionPlanMap;
    // One future per participant address; rebuilt by resetStartOperationResponses().
    private volatile ConcurrentMap<Address, CompletableFuture<Object>> startOperationResponses;
    private final MasterJobContext jobContext;
    private final MasterSnapshotContext snapshotContext;

    /**
     * Builds the master-side context for the job described by the given records.
     * <p>
     * The initial status is {@link JobStatus#SUSPENDED} when the execution
     * record reports the job as suspended, otherwise
     * {@link JobStatus#NOT_RUNNING}. The job context and snapshot context are
     * created last, because both receive a reference to this instance.
     *
     * @param nodeEngine          node engine used to look up services and loggers
     * @param coordinationService coordination service; also supplies the job repository
     * @param jobRecord           record supplying the job id, name and configuration
     * @param jobExecutionRecord  execution record supplying the suspended flag
     */
    MasterContext(NodeEngineImpl nodeEngine, JobCoordinationService coordinationService, @Nonnull JobRecord jobRecord, @Nonnull JobExecutionRecord jobExecutionRecord) {
        this.nodeEngine = nodeEngine;
        this.jobEventService = (JobEventService) nodeEngine.getService("hz:impl:jobEventService");
        this.coordinationService = coordinationService;
        this.jobRepository = coordinationService.jobRepository();
        this.logger = nodeEngine.getLogger(getClass());
        this.jobRecord = jobRecord;
        this.jobExecutionRecord = jobExecutionRecord;
        this.jobId = jobRecord.getJobId();
        this.jobName = jobRecord.getJobNameOrId();
        if (jobExecutionRecord.isSuspended()) {
            this.jobStatus = JobStatus.SUSPENDED;
        } else {
            this.jobStatus = JobStatus.NOT_RUNNING;
        }
        ILogger jobContextLogger = nodeEngine.getLogger(MasterJobContext.class);
        this.jobContext = new MasterJobContext(this, jobContextLogger);
        this.snapshotContext = createMasterSnapshotContext(nodeEngine);
    }

    /**
     * Creates the snapshot context bound to this master context. It is called
     * from the constructor.
     *
     * @param nodeEngine node engine used to obtain the snapshot context's logger
     * @return a new {@link MasterSnapshotContext}
     */
    MasterSnapshotContext createMasterSnapshotContext(NodeEngineImpl nodeEngine) {
        ILogger snapshotLogger = nodeEngine.getLogger(MasterSnapshotContext.class);
        return new MasterSnapshotContext(this, snapshotLogger);
    }

    /**
     * Acquires this context's lock. With assertions enabled, fails if the
     * calling thread already holds it.
     */
    void lock() {
        assertLockNotHeld();
        lock.lock();
    }

    /** Releases this context's lock. */
    void unlock() {
        lock.unlock();
    }

    /** Asserts that the calling thread currently holds this context's lock. */
    void assertLockHeld() {
        assert lock.isHeldByCurrentThread() : "the lock should be held at this place";
    }

    /** Asserts that the calling thread does not hold this context's lock. */
    private void assertLockNotHeld() {
        assert !lock.isHeldByCurrentThread() : "the lock should not be held at this place";
    }

    /** @return the id of the job, as read from the job record at construction */
    public long jobId() {
        return jobId;
    }

    /** @return the most recently set execution id */
    public long executionId() {
        return executionId;
    }

    /**
     * Replaces the stored execution id.
     *
     * @param newExecutionId the execution id to store
     */
    public void setExecutionId(long newExecutionId) {
        executionId = newExecutionId;
    }

    /** @return the current job status */
    public JobStatus jobStatus() {
        return jobStatus;
    }

    /**
     * Switches the job to a new status and notifies interested parties.
     * <p>
     * In order: stores the new status, passes it to the job context via
     * {@code setJobMetrics}, publishes an event carrying the old and new
     * status, and, if the new status is terminal, removes all event
     * listeners registered for this job.
     *
     * @param jobStatus     the status to switch to
     * @param description   description forwarded with the published event; may be {@code null}
     * @param userRequested flag forwarded to the job context and the published event
     */
    void setJobStatus(JobStatus jobStatus, String description, boolean userRequested) {
        final JobStatus previousStatus = this.jobStatus;
        this.jobStatus = jobStatus;
        jobContext.setJobMetrics(jobStatus, userRequested);
        jobEventService.publishEvent(jobId, previousStatus, jobStatus, description, userRequested);
        if (!jobStatus.isTerminal()) {
            return;
        }
        jobEventService.removeAllEventListeners(jobId);
    }

    /**
     * Switches the job to a new status with no description and with the
     * user-requested flag cleared.
     *
     * @param jobStatus the status to switch to
     */
    void setJobStatus(JobStatus jobStatus) {
        setJobStatus(jobStatus, null, false);
    }

    /** @return the configuration held by the job record */
    public JobConfig jobConfig() {
        return jobRecord.getConfig();
    }

    /**
     * Applies a configuration delta to the job's configuration while holding
     * this context's lock.
     * <p>
     * The updated job record is written through the job repository. If the
     * delta toggled split-brain protection, the quorum size is updated as
     * well: to the coordination service's quorum size when protection is now
     * enabled, or to {@code 0} when it is now disabled.
     *
     * @param deltaConfig the changes to apply
     * @return the job configuration after the delta has been applied
     * @throws IllegalStateException if the job is not suspendable, or if its
     *         status is neither {@code SUSPENDED} nor
     *         {@code SUSPENDED_EXPORTING_SNAPSHOT}
     */
    public JobConfig updateJobConfig(DeltaJobConfig deltaConfig) {
        lock();
        try {
            if (!Util.isJobSuspendable(jobConfig())) {
                throw new IllegalStateException("The job " + jobName + " is not suspendable, can't perform `updateJobConfig()`");
            }
            if (!(jobStatus == JobStatus.SUSPENDED || jobStatus == JobStatus.SUSPENDED_EXPORTING_SNAPSHOT)) {
                throw new IllegalStateException("Job not suspended, but " + jobStatus);
            }
            boolean protectionEnabledBefore = jobConfig().isSplitBrainProtectionEnabled();
            deltaConfig.applyTo(jobConfig());
            jobRepository.updateJobRecord(jobRecord);
            if (jobConfig().isSplitBrainProtectionEnabled() != protectionEnabledBefore) {
                int targetQuorumSize = 0;
                if (jobConfig().isSplitBrainProtectionEnabled()) {
                    targetQuorumSize = coordinationService.getQuorumSize();
                }
                updateQuorumSize(targetQuorumSize);
            }
            return jobConfig();
        } finally {
            unlock();
        }
    }

    /**
     * Registers a status listener for this job while holding this context's lock.
     *
     * @param registration the listener registration to hand to the job event service
     * @return the id of the registration returned by the job event service
     * @throws RuntimeException the exception produced by
     *         {@code AbstractJobProxy.cannotAddStatusListener} when the job
     *         is already in a terminal status
     */
    public UUID addStatusListener(Registration registration) {
        lock();
        try {
            if (jobStatus.isTerminal()) {
                throw AbstractJobProxy.cannotAddStatusListener(jobStatus);
            }
            return jobEventService.handleAllRegistrations(jobId, registration).getId();
        } finally {
            unlock();
        }
    }

    /**
     * @return {@code true} only when metrics are enabled both in the job
     *         configuration and in the node's metrics configuration
     */
    boolean metricsEnabled() {
        if (!jobConfig().isMetricsEnabled()) {
            return false;
        }
        return nodeEngine.getConfig().getMetricsConfig().isEnabled();
    }

    /**
     * Reports the job's status as a metric named {@code status}, tagged with
     * the job id string and the job name. Does nothing when metrics are
     * disabled.
     *
     * @param descriptor descriptor to tag and collect under
     * @param context    collection context receiving the metric
     */
    @Override
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        if (metricsEnabled()) {
            String idTag = com.hazelcast.jet.Util.idToString(jobId);
            descriptor.withTag("job", idTag).withTag("jobName", jobName);
            context.collect(descriptor, "status", ProbeLevel.INFO, ProbeUnit.ENUM, jobStatus.getId());
        }
    }

    /** @return the job record this context was created with */
    public JobRecord jobRecord() {
        return jobRecord;
    }

    /** @return the job context created by the constructor */
    public MasterJobContext jobContext() {
        return jobContext;
    }

    /** @return the snapshot context created by the constructor */
    MasterSnapshotContext snapshotContext() {
        return snapshotContext;
    }

    /** @return the job execution record this context was created with */
    public JobExecutionRecord jobExecutionRecord() {
        return jobExecutionRecord;
    }

    /** @return the job's name or id, as read from the job record at construction */
    String jobName() {
        return jobName;
    }

    /**
     * @return the string produced by {@code Util.jobNameAndExecutionId} for
     *         this job's name and the current execution id
     */
    String jobIdString() {
        long currentExecutionId = executionId;
        return Util.jobNameAndExecutionId(jobName, currentExecutionId);
    }

    /** @return the Jet service backend exposed by the coordination service */
    public JetServiceBackend getJetServiceBackend() {
        return coordinationService.getJetServiceBackend();
    }

    /** @return the node engine this context was created with */
    public NodeEngineImpl nodeEngine() {
        return nodeEngine;
    }

    /** @return the job repository obtained from the coordination service at construction */
    public JobRepository jobRepository() {
        return jobRepository;
    }

    /** @return the coordination service this context was created with */
    JobCoordinationService coordinationService() {
        return coordinationService;
    }

    /**
     * @return the execution plans keyed by member, as last set through
     *         {@code setExecutionPlanMap}; {@code null} if none has been set
     */
    Map<MemberInfo, ExecutionPlan> executionPlanMap() {
        return executionPlanMap;
    }

    /** @return {@code true} when the configured job timeout is greater than zero milliseconds */
    boolean hasTimeout() {
        long timeoutMillis = jobConfig().getTimeoutMillis();
        return timeoutMillis > 0L;
    }

    /**
     * Computes how much of the configured timeout is left, measured from the
     * job record's creation time.
     *
     * @param currentTimeMillis the time to measure against, in milliseconds
     * @return configured timeout minus the time elapsed since creation; zero
     *         or negative once the timeout has been used up
     */
    long remainingTime(long currentTimeMillis) {
        long createdAt = jobRecord().getCreationTime();
        long timeoutMillis = jobConfig().getTimeoutMillis();
        return timeoutMillis - (currentTimeMillis - createdAt);
    }

    /**
     * @return the per-address response futures created by the last call to
     *         {@code resetStartOperationResponses}; {@code null} if it has
     *         not been called yet
     */
    ConcurrentMap<Address, CompletableFuture<Object>> startOperationResponses() {
        return startOperationResponses;
    }

    /**
     * Replaces the start-operation response map with a fresh one that holds a
     * new, incomplete future for the address of every member in the current
     * execution plan map.
     * <p>
     * Requires the execution plan map to have been set beforehand.
     */
    void resetStartOperationResponses() {
        // Explicit type argument instead of a raw CompletableFuture, so the
        // collector is inferred without an unchecked conversion.
        this.startOperationResponses = executionPlanMap().keySet().stream()
                .collect(Collectors.toConcurrentMap(MemberInfo::getAddress, mi -> new CompletableFuture<Object>()));
    }

    /**
     * Replaces the stored execution plans.
     *
     * @param executionPlans execution plans keyed by member; stored by reference
     */
    void setExecutionPlanMap(Map<MemberInfo, ExecutionPlan> executionPlans) {
        executionPlanMap = executionPlans;
    }

    /**
     * Updates the quorum size in the job execution record, writes the record
     * out and logs the change. Must run on the coordinator thread.
     * <p>
     * A positive {@code newQuorumSize} is passed to
     * {@code setLargerQuorumSize}; any other value resets the quorum size.
     *
     * @param newQuorumSize the requested quorum size; values {@code <= 0} reset it
     */
    void updateQuorumSize(int newQuorumSize) {
        coordinationService().assertOnCoordinatorThread();
        // NOTE(review): presumably both record methods return the previous quorum size,
        // since the log message below reports this value as "from" - confirm.
        final int quorumSize;
        if (newQuorumSize > 0) {
            quorumSize = jobExecutionRecord.setLargerQuorumSize(newQuorumSize);
        } else {
            quorumSize = jobExecutionRecord.resetQuorumSize();
        }
        writeJobExecutionRecord(false);
        int reportedNewSize = newQuorumSize > 0 ? Math.max(quorumSize, newQuorumSize) : 0;
        logger.info("Quorum size of job " + jobIdString() + " is updated from " + quorumSize + " to " + reportedNewSize);
    }

    /**
     * Writes the job execution record through the job repository on a
     * best-effort basis: a {@link RuntimeException} from the write is logged
     * as a warning and not rethrown. Must run on the coordinator thread.
     *
     * @param canCreate forwarded unchanged to the repository write
     */
    void writeJobExecutionRecord(boolean canCreate) {
        coordinationService.assertOnCoordinatorThread();
        try {
            JobRepository repository = coordinationService.jobRepository();
            long recordJobId = jobRecord.getJobId();
            repository.writeJobExecutionRecord(recordJobId, jobExecutionRecord, canCreate);
        } catch (RuntimeException e) {
            logger.warning("Failed to update JobExecutionRecord", e);
        }
    }

    /**
     * Writes the job execution record through the job repository, repeating
     * the write until the repository reports success. Exceptions from the
     * write are not caught here. Must run on the coordinator thread.
     *
     * @param canCreate forwarded unchanged to the repository write
     */
    void writeJobExecutionRecordSafe(boolean canCreate) {
        coordinationService.assertOnCoordinatorThread();
        for (;;) {
            boolean written = coordinationService.jobRepository()
                    .writeJobExecutionRecord(jobRecord.getJobId(), jobExecutionRecord, canCreate);
            if (written) {
                return;
            }
            logger.info("Repeating JobExecutionRecord update to be safe");
        }
    }

    /**
     * Invokes an operation on every member present in the current execution
     * plan map.
     *
     * @param operationCtor           builds the operation to send from a member's execution plan
     * @param completionCallback      if non-null, receives all collected responses once every member has answered
     * @param individualCallback      if non-null, receives each member's address and its result or peeled failure
     * @param retryOnTimeoutException whether to re-invoke a member whose invocation failed with
     *                                {@code OperationTimeoutException}
     */
    void invokeOnParticipants(Function<ExecutionPlan, Operation> operationCtor, @Nullable Consumer<Collection<Map.Entry<MemberInfo, Object>>> completionCallback, @Nullable BiConsumer<Address, Object> individualCallback, boolean retryOnTimeoutException) {
        Map<MemberInfo, ExecutionPlan> allPlans = executionPlanMap;
        invokeOnParticipants(allPlans, operationCtor, completionCallback, individualCallback, retryOnTimeoutException);
    }

    /**
     * Invokes an operation on the subset of execution-plan members whose
     * address is contained in {@code participants}. Addresses with no entry
     * in the execution plan map are ignored.
     *
     * @param participants            addresses of the members to invoke on
     * @param operationCtor           builds the operation to send from a member's execution plan
     * @param completionCallback      if non-null, receives all collected responses once every selected member has answered
     * @param individualCallback      if non-null, receives each member's address and its result or peeled failure
     * @param retryOnTimeoutException whether to re-invoke a member whose invocation failed with
     *                                {@code OperationTimeoutException}
     */
    void invokeOnParticipants(List<Address> participants, Function<ExecutionPlan, Operation> operationCtor, @Nullable Consumer<Collection<Map.Entry<MemberInfo, Object>>> completionCallback, @Nullable BiConsumer<Address, Object> individualCallback, boolean retryOnTimeoutException) {
        Map<MemberInfo, ExecutionPlan> selectedPlans = executionPlanMap.entrySet().stream()
                .filter(planEntry -> participants.contains(planEntry.getKey().getAddress()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        invokeOnParticipants(selectedPlans, operationCtor, completionCallback, individualCallback, retryOnTimeoutException);
    }

    /**
     * Fans an operation out to every member of the given plan map. All
     * invocations share one response map and one countdown, initialised to
     * the number of members.
     * <p>
     * When the plan map is empty no invocation is made, so the completion
     * callback is never called.
     *
     * @param executionPlanMap        members to invoke on, with their execution plans
     * @param operationCtor           builds the operation to send from a member's execution plan
     * @param completionCallback      if non-null, receives all collected responses once the countdown reaches zero
     * @param individualCallback      if non-null, receives each member's address and its result or peeled failure
     * @param retryOnTimeoutException whether to re-invoke a member whose invocation failed with
     *                                {@code OperationTimeoutException}
     */
    private void invokeOnParticipants(Map<MemberInfo, ExecutionPlan> executionPlanMap, Function<ExecutionPlan, Operation> operationCtor, @Nullable Consumer<Collection<Map.Entry<MemberInfo, Object>>> completionCallback, @Nullable BiConsumer<Address, Object> individualCallback, boolean retryOnTimeoutException) {
        ConcurrentMap<MemberInfo, Object> sharedResponses = new ConcurrentHashMap<>();
        AtomicInteger pending = new AtomicInteger(executionPlanMap.size());
        for (Map.Entry<MemberInfo, ExecutionPlan> planEntry : executionPlanMap.entrySet()) {
            // The supplier is kept lazy because a retry asks it for a fresh operation.
            Supplier<Operation> operationFactory = () -> operationCtor.apply(planEntry.getValue());
            invokeOnParticipant(planEntry.getKey(), operationFactory, completionCallback, individualCallback, retryOnTimeoutException, sharedResponses, pending);
        }
    }

    /**
     * Invokes one operation on one member and processes its outcome
     * asynchronously.
     * <p>
     * When the invocation completes:
     * <ul>
     *   <li>if {@code retryOnTimeoutException} is set and the failure is an
     *       {@link OperationTimeoutException}, a warning is logged and the
     *       invocation is repeated with a fresh operation from the supplier;
     *       nothing is recorded for this attempt;</li>
     *   <li>otherwise {@code individualCallback} (if non-null) receives the
     *       member's address and either the result or the peeled failure;</li>
     *   <li>the response is stored in {@code collectedResponses}: the result
     *       if non-null, else the peeled failure, else {@link #NULL_OBJECT};</li>
     *   <li>when {@code remainingCount} reaches zero, {@code completionCallback}
     *       (if non-null) receives all entries, with {@link #NULL_OBJECT}
     *       translated back to {@code null}.</li>
     * </ul>
     *
     * @param memberInfo              the member to invoke on
     * @param operationSupplier       produces the operation; called once per attempt
     * @param completionCallback      optional callback for the full set of responses
     * @param individualCallback      optional callback for this member's response
     * @param retryOnTimeoutException whether to retry on {@link OperationTimeoutException}
     * @param collectedResponses      response map shared by all members of this fan-out
     * @param remainingCount          number of members that have not yet recorded a response
     */
    private void invokeOnParticipant(MemberInfo memberInfo, Supplier<Operation> operationSupplier, @Nullable Consumer<Collection<Map.Entry<MemberInfo, Object>>> completionCallback, @Nullable BiConsumer<Address, Object> individualCallback, boolean retryOnTimeoutException, ConcurrentMap<MemberInfo, Object> collectedResponses, AtomicInteger remainingCount) {
        Operation operation = operationSupplier.get();
        InvocationFuture<Object> future = this.nodeEngine.getOperationService()
                .createInvocationBuilder("hz:impl:jetService", operation, memberInfo.getAddress())
                .invoke();
        future.whenCompleteAsync(com.hazelcast.internal.util.ExceptionUtil.withTryCatch(this.logger, (r, throwable) -> {
            // Assigned on every path. The sentinel stands in for "no result and no failure"
            // because the shared map is created as a ConcurrentHashMap, which rejects null values.
            Object response = r != null ? r : throwable != null ? ExceptionUtil.peel(throwable) : NULL_OBJECT;
            if (retryOnTimeoutException && throwable instanceof OperationTimeoutException) {
                this.logger.warning("Retrying " + operation.getClass().getName() + " that failed with " + OperationTimeoutException.class.getSimpleName() + " in " + this.jobIdString());
                this.invokeOnParticipant(memberInfo, operationSupplier, completionCallback, individualCallback, retryOnTimeoutException, collectedResponses, remainingCount);
                return;
            }
            if (individualCallback != null) {
                individualCallback.accept(memberInfo.getAddress(), throwable != null ? ExceptionUtil.peel(throwable) : r);
            }
            Object oldResponse = collectedResponses.put(memberInfo, response);
            assert (oldResponse == null) : "Duplicate response for " + memberInfo.getAddress() + ". Old=" + oldResponse + ", new=" + response;
            if (remainingCount.decrementAndGet() == 0 && completionCallback != null) {
                completionCallback.accept(collectedResponses.entrySet().stream()
                        .map(e -> e.getValue() == NULL_OBJECT ? com.hazelcast.jet.Util.entry(e.getKey(), null) : e)
                        .collect(Collectors.toList()));
            }
        }));
    }
}

