/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.cluster.ClusterState;
import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.impl.MemberImpl;
import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.cluster.impl.MasterNodeChangedException;
import com.hazelcast.internal.cluster.impl.MembersView;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.internal.partition.impl.PartitionServiceState;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.MwCounter;
import com.hazelcast.internal.util.executor.ExecutorType;
import com.hazelcast.internal.util.executor.ManagedExecutorService;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.JetMemberSelector;
import com.hazelcast.jet.JobAlreadyExistsException;
import com.hazelcast.jet.config.DeltaJobConfig;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.JobSuspensionCause;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.impl.AbstractJobProxy;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobAndSqlSummary;
import com.hazelcast.jet.impl.JobClassLoaderService;
import com.hazelcast.jet.impl.JobEventService;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.JobInvocationObserver;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.JobResult;
import com.hazelcast.jet.impl.JobSummary;
import com.hazelcast.jet.impl.LightMasterContext;
import com.hazelcast.jet.impl.MasterContext;
import com.hazelcast.jet.impl.MasterJobContext;
import com.hazelcast.jet.impl.SqlSummary;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.EnteringPassiveClusterStateException;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.metrics.RawJobMetrics;
import com.hazelcast.jet.impl.observer.ObservableImpl;
import com.hazelcast.jet.impl.observer.WrappedThrowable;
import com.hazelcast.jet.impl.operation.GetJobIdsOperation;
import com.hazelcast.jet.impl.pipeline.PipelineImpl;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.ringbuffer.OverflowPolicy;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.security.SecurityContext;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.eventservice.impl.Registration;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.version.Version;
import java.security.Permission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.security.auth.Subject;

public class JobCoordinationService
implements DynamicMetricsProvider {
    private static final String COORDINATOR_EXECUTOR_NAME = "jet:coordinator";
    private static final long RETRY_DELAY_IN_MILLIS = TimeUnit.SECONDS.toMillis(2L);
    private static final ThreadLocal<Boolean> IS_JOB_COORDINATOR_THREAD = ThreadLocal.withInitial(() -> false);
    private static final int COORDINATOR_THREADS_POOL_SIZE = 4;
    private static final int MIN_JOB_SCAN_PERIOD_MILLIS = 100;
    private static final Object UNINITIALIZED_LIGHT_JOB_MARKER = new Object();
    final List<JobInvocationObserver> jobInvocationObservers = new ArrayList<JobInvocationObserver>();
    private final NodeEngineImpl nodeEngine;
    private final JetServiceBackend jetServiceBackend;
    private final JetConfig config;
    private final PipelineImpl.Context pipelineToDagContext;
    private final ILogger logger;
    private final JobRepository jobRepository;
    private final ConcurrentMap<Long, MasterContext> masterContexts = new ConcurrentHashMap<Long, MasterContext>();
    private final ConcurrentMap<Long, Object> lightMasterContexts = new ConcurrentHashMap<Long, Object>();
    private final ConcurrentMap<UUID, CompletableFuture<Void>> membersShuttingDown = new ConcurrentHashMap<UUID, CompletableFuture<Void>>();
    private final ConcurrentMap<Long, ScheduledFuture<?>> scheduledJobTimeouts = new ConcurrentHashMap();
    private final Map<UUID, Long> removedMembers = new ConcurrentHashMap<UUID, Long>();
    private final Object lock = new Object();
    private volatile boolean isClusterEnteringPassiveState;
    private volatile boolean jobsScanned;
    private final AtomicInteger scaleUpScheduledCount = new AtomicInteger();
    @Probe(name="jobs.submitted")
    private final Counter jobSubmitted = MwCounter.newMwCounter();
    @Probe(name="jobs.completedSuccessfully")
    private final Counter jobCompletedSuccessfully = MwCounter.newMwCounter();
    @Probe(name="jobs.completedWithFailure")
    private final Counter jobCompletedWithFailure = MwCounter.newMwCounter();
    private long maxJobScanPeriodInMillis;

    JobCoordinationService(NodeEngineImpl nodeEngine, JetServiceBackend jetServiceBackend, JetConfig config, JobRepository jobRepository) {
        this.nodeEngine = nodeEngine;
        this.jetServiceBackend = jetServiceBackend;
        this.config = config;
        this.pipelineToDagContext = this.config::getCooperativeThreadCount;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.jobRepository = jobRepository;
        ExecutionService executionService = nodeEngine.getExecutionService();
        executionService.register(COORDINATOR_EXECUTOR_NAME, 4, Integer.MAX_VALUE, ExecutorType.CACHED);
        MetricsRegistry registry = nodeEngine.getMetricsRegistry();
        MetricDescriptor descriptor = registry.newMetricDescriptor().withTag("module", "jet");
        registry.registerStaticMetrics(descriptor, this);
    }

    public JobRepository jobRepository() {
        return this.jobRepository;
    }

    public void startScanningForJobs() {
        ExecutionService executionService = this.nodeEngine.getExecutionService();
        HazelcastProperties properties = this.nodeEngine.getProperties();
        this.maxJobScanPeriodInMillis = properties.getMillis(ClusterProperty.JOB_SCAN_PERIOD);
        try {
            executionService.schedule(COORDINATOR_EXECUTOR_NAME, this::scanJobs, 0L, TimeUnit.MILLISECONDS);
            this.logger.info("Jet started scanning for jobs");
        }
        catch (RejectedExecutionException ex) {
            this.logger.info("Scan jobs task is rejected on the execution service since the executor service has shutdown", ex);
        }
    }

    public CompletableFuture<Void> submitJob(long jobId, Data serializedJobDefinition, JobConfig jobConfig, Subject subject) {
        CompletableFuture<Void> res = new CompletableFuture<Void>();
        this.submitToCoordinatorThread(() -> {
            MasterContext masterContext;
            try {
                MasterContext prev;
                boolean hasDuplicateJobName;
                void var11_13;
                DAG dag;
                this.assertIsMaster("Cannot submit job " + com.hazelcast.jet.Util.idToString(jobId) + " to non-master node");
                this.checkOperationalState();
                JobResult jobResult = this.jobRepository.getJobResult(jobId);
                if (jobResult != null) {
                    this.logger.fine("Not starting job %s since already completed with result: %s", com.hazelcast.jet.Util.idToString(jobId), jobResult);
                    return;
                }
                if (!this.config.isResourceUploadEnabled() && !jobConfig.getResourceConfigs().isEmpty()) {
                    throw new JetException(Util.JET_RESOURCE_UPLOAD_DISABLED_MESSAGE);
                }
                Object jobDefinition = this.deserializeJobDefinition(jobId, jobConfig, serializedJobDefinition);
                if (jobDefinition instanceof PipelineImpl) {
                    PipelineImpl pipeline = (PipelineImpl)jobDefinition;
                    dag = pipeline.toDag(this.pipelineToDagContext);
                    dag.setMemberSelector(pipeline.memberSelector());
                    Object b = this.nodeEngine.getSerializationService().toData(dag);
                } else {
                    dag = (DAG)jobDefinition;
                    Data data = serializedJobDefinition;
                }
                this.validateJob(dag, subject);
                Set<String> ownedObservables = JobCoordinationService.ownedObservables(dag);
                JobRecord jobRecord = new JobRecord(this.nodeEngine.getClusterService().getClusterVersion(), jobId, (Data)var11_13, this.dagToJson(dag), jobConfig, ownedObservables, subject);
                int quorumSize = jobConfig.isSplitBrainProtectionEnabled() ? this.getQuorumSize() : 0;
                JobExecutionRecord jobExecutionRecord = new JobExecutionRecord(jobId, quorumSize);
                masterContext = this.createMasterContext(jobRecord, jobExecutionRecord);
                masterContext.setMemberSelector(dag.memberSelector());
                Object object = this.lock;
                // MONITORENTER : object
                this.assertIsMaster("Cannot submit job " + com.hazelcast.jet.Util.idToString(jobId) + " to non-master node");
                this.checkOperationalState();
                boolean bl = hasDuplicateJobName = jobConfig.getName() != null && this.hasActiveJobWithName(jobConfig.getName());
                if (!hasDuplicateJobName && (prev = this.masterContexts.putIfAbsent(jobId, masterContext)) != null) {
                    this.logger.fine("Joining to already existing masterContext %s", prev.jobIdString());
                    // MONITOREXIT : object
                    return;
                }
                // MONITOREXIT : object
                if (hasDuplicateJobName) {
                    this.jobRepository.deleteJob(jobId, !jobConfig.getResourceConfigs().isEmpty());
                    throw new JobAlreadyExistsException("Another active job with equal name (" + jobConfig.getName() + ") exists: " + com.hazelcast.jet.Util.idToString(jobId));
                }
                if (this.completeMasterContextIfJobAlreadyCompleted(masterContext)) {
                    return;
                }
                this.onJobSubmitted(dag);
                this.jobRepository.putNewJobRecord(jobRecord);
                this.logger.info("Starting job " + com.hazelcast.jet.Util.idToString(masterContext.jobId()) + " based on submit request");
            }
            catch (Throwable e) {
                this.jetServiceBackend.getJobClassLoaderService().tryRemoveClassloadersForJob(jobId, JobClassLoaderService.JobPhase.COORDINATOR);
                res.completeExceptionally(e);
                throw e;
            }
            finally {
                res.complete(null);
            }
            masterContext.jobContext().tryStartJob();
        });
        return res;
    }

    public CompletableFuture<Void> submitLightJob(long jobId, Object jobDefinition, Data serializedJobDefinition, JobConfig jobConfig, Subject subject) {
        DAG dag;
        if (jobDefinition == null) {
            jobDefinition = this.nodeEngine.getSerializationService().toObject(serializedJobDefinition);
        }
        if (jobDefinition instanceof PipelineImpl) {
            PipelineImpl pipeline = (PipelineImpl)jobDefinition;
            dag = pipeline.toDag(this.pipelineToDagContext);
            dag.setMemberSelector(pipeline.memberSelector());
        } else {
            dag = (DAG)jobDefinition;
        }
        this.validateJob(dag, subject);
        Object oldContext = this.lightMasterContexts.putIfAbsent(jobId, UNINITIALIZED_LIGHT_JOB_MARKER);
        if (oldContext != null) {
            throw new JetException("duplicate jobId " + com.hazelcast.jet.Util.idToString(jobId));
        }
        return LightMasterContext.createContext(this.nodeEngine, this, dag, jobId, jobConfig, subject).thenComposeAsync(mc -> {
            Object oldCtx = this.lightMasterContexts.put(jobId, mc);
            assert (oldCtx == UNINITIALIZED_LIGHT_JOB_MARKER);
            this.scheduleJobTimeout(jobId, jobConfig.getTimeoutMillis());
            return mc.getCompletionFuture().whenComplete((r, t) -> {
                Object removed = this.lightMasterContexts.remove(jobId);
                assert (removed instanceof LightMasterContext) : "LMC not found: " + String.valueOf(removed);
                this.unscheduleJobTimeout(jobId);
            });
        }, (Executor)this.coordinationExecutor());
    }

    private void validateJob(DAG dag, Subject subject) {
        SecurityContext securityContext = this.nodeEngine.getNode().securityContext;
        if (securityContext != null && subject != null) {
            for (Vertex vertex : dag) {
                Permission requiredPermission = vertex.getMetaSupplier().getRequiredPermission();
                if (requiredPermission == null) continue;
                securityContext.checkPermission(subject, requiredPermission);
            }
        }
        if (dag.memberSelector() != null) {
            this.validateIsolatedJob(dag.memberSelector());
        }
    }

    protected void validateIsolatedJob(@Nonnull JetMemberSelector memberSelector) {
        throw new UnsupportedOperationException("The Isolated Jobs feature is only available in Hazelcast Enterprise Edition.");
    }

    protected void onJobSubmitted(DAG dag) {
        this.jobSubmitted.inc();
    }

    public long getJobSubmittedCount() {
        return this.jobSubmitted.get();
    }

    public JobConfig getLightJobConfig(long jobId) {
        Object mc = this.lightMasterContexts.get(jobId);
        if (mc == null || mc == UNINITIALIZED_LIGHT_JOB_MARKER) {
            throw new JobNotFoundException(jobId);
        }
        return ((LightMasterContext)mc).getJobConfig();
    }

    private static Set<String> ownedObservables(DAG dag) {
        return dag.vertices().stream().map(vertex -> vertex.getMetaSupplier().getTags().get(ObservableImpl.OWNED_OBSERVABLE)).filter(Objects::nonNull).collect(Collectors.toSet());
    }

    MasterContext createMasterContext(JobRecord jobRecord, JobExecutionRecord jobExecutionRecord) {
        return new MasterContext(this.nodeEngine, this, jobRecord, jobExecutionRecord);
    }

    private boolean hasActiveJobWithName(@Nonnull String jobName) {
        if (!this.jobsScanned) {
            throw new RetryableHazelcastException("Cannot submit job with name '" + jobName + "' before the master node initializes job coordination service's state");
        }
        return this.masterContexts.values().stream().anyMatch(ctx -> jobName.equals(ctx.jobConfig().getName()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> prepareForPassiveClusterState() {
        this.assertIsMaster("Cannot prepare for passive cluster state on a non-master node");
        Object object = this.lock;
        synchronized (object) {
            this.isClusterEnteringPassiveState = true;
        }
        return this.submitToCoordinatorThread(() -> {
            CompletableFuture[] futures = (CompletableFuture[])this.masterContexts.values().stream().map(mc -> mc.jobContext().gracefullyTerminateOrCancel()).toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(futures);
        }).thenCompose(Function.identity());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void clusterChangeDone() {
        Object object = this.lock;
        synchronized (object) {
            this.isClusterEnteringPassiveState = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void reset() {
        ArrayList contexts;
        assert (!this.isMaster()) : "this member is a master";
        Object object = this.lock;
        synchronized (object) {
            contexts = new ArrayList(this.masterContexts.values());
            this.masterContexts.clear();
            this.jobsScanned = false;
        }
        contexts.forEach(ctx -> ctx.jobContext().setFinalResult(new CancellationException()));
    }

    public CompletableFuture<Void> joinSubmittedJob(long jobId) {
        this.checkOperationalState();
        CompletableFuture<CompletableFuture> future = this.callWithJob(jobId, mc -> mc.jobContext().jobCompletionFuture().handle((r, t) -> {
            if (t == null) {
                return null;
            }
            if (t instanceof CancellationException || t instanceof JetException) {
                throw ExceptionUtil.sneakyThrow(t);
            }
            throw new JetException(ExceptionUtil.toString(t));
        }), JobResult::asCompletableFuture, jobRecord -> {
            JobExecutionRecord jobExecutionRecord = this.ensureExecutionRecord(jobId, this.jobRepository.getJobExecutionRecord(jobId));
            return this.startJobIfNotStartedOrCompleted((JobRecord)jobRecord, jobExecutionRecord, "join request from client");
        }, null);
        return future.thenCompose(Function.identity());
    }

    public CompletableFuture<Void> joinLightJob(long jobId) {
        Object mc = this.lightMasterContexts.get(jobId);
        if (mc == null || mc == UNINITIALIZED_LIGHT_JOB_MARKER) {
            throw new JobNotFoundException(jobId);
        }
        return ((LightMasterContext)mc).getCompletionFuture();
    }

    public CompletableFuture<Void> terminateJob(long jobId, TerminationMode terminationMode, boolean userInitiated) {
        return this.runWithJob(jobId, masterContext -> {
            JobStatus jobStatus = masterContext.jobStatus();
            if (jobStatus != JobStatus.RUNNING && terminationMode != TerminationMode.CANCEL_FORCEFUL) {
                throw new IllegalStateException("Cannot " + String.valueOf((Object)terminationMode) + ", job status is " + String.valueOf((Object)jobStatus) + ", should be " + String.valueOf((Object)JobStatus.RUNNING));
            }
            String terminationResult = masterContext.jobContext().requestTermination(terminationMode, false, userInitiated).f1();
            if (terminationResult != null) {
                throw new IllegalStateException("Cannot " + String.valueOf((Object)terminationMode) + ": " + terminationResult);
            }
        }, jobResult -> {
            if (terminationMode != TerminationMode.CANCEL_FORCEFUL) {
                throw new IllegalStateException("Cannot " + String.valueOf((Object)terminationMode) + " job " + com.hazelcast.jet.Util.idToString(jobId) + " because it already has a result: " + String.valueOf(jobResult));
            }
            this.logger.fine("Ignoring cancellation of a completed job %s", com.hazelcast.jet.Util.idToString(jobId));
        }, jobRecord -> {
            throw new RetryableHazelcastException("No MasterContext found for job " + com.hazelcast.jet.Util.idToString(jobId) + " for " + String.valueOf((Object)terminationMode));
        });
    }

    public void terminateLightJob(long jobId, boolean userInitiated) {
        Object mc = this.lightMasterContexts.get(jobId);
        if (mc == null || mc == UNINITIALIZED_LIGHT_JOB_MARKER) {
            throw new JobNotFoundException(jobId);
        }
        ((LightMasterContext)mc).requestTermination(userInitiated);
    }

    public CompletableFuture<GetJobIdsOperation.GetJobIdsResult> getJobIdById(long jobId) {
        return this.submitToCoordinatorThread(() -> {
            Object lmc = this.lightMasterContexts.get(jobId);
            if (lmc != null && lmc != UNINITIALIZED_LIGHT_JOB_MARKER) {
                return new GetJobIdsOperation.GetJobIdsResult(jobId, true);
            }
            if (this.isMaster()) {
                try {
                    this.callWithJob(jobId, mc -> null, jobResult -> null, jobRecord -> null, null).get();
                }
                catch (ExecutionException e) {
                    if (e.getCause() instanceof JobNotFoundException) {
                        return GetJobIdsOperation.GetJobIdsResult.EMPTY;
                    }
                    throw e;
                }
                return new GetJobIdsOperation.GetJobIdsResult(jobId, false);
            }
            return GetJobIdsOperation.GetJobIdsResult.EMPTY;
        });
    }

    public CompletableFuture<GetJobIdsOperation.GetJobIdsResult> getLightJobIds() {
        return this.submitToCoordinatorThread(() -> {
            ArrayList<Tuple2<Long, Boolean>> result = new ArrayList<Tuple2<Long, Boolean>>();
            for (Object ctx : this.lightMasterContexts.values()) {
                if (ctx == UNINITIALIZED_LIGHT_JOB_MARKER) continue;
                result.add(Tuple2.tuple2(((LightMasterContext)ctx).getJobId(), true));
            }
            return new GetJobIdsOperation.GetJobIdsResult(result);
        });
    }

    public CompletableFuture<GetJobIdsOperation.GetJobIdsResult> getAllJobsId() {
        this.assertIsMaster("This method is allowed to run only on the master node!");
        return this.submitToCoordinatorThread(() -> {
            ArrayList<Tuple2<Long, Boolean>> result = new ArrayList<Tuple2<Long, Boolean>>();
            for (Object ctx : this.lightMasterContexts.values()) {
                if (ctx == UNINITIALIZED_LIGHT_JOB_MARKER) continue;
                result.add(Tuple2.tuple2(((LightMasterContext)ctx).getJobId(), true));
            }
            for (Long jobId : this.jobRepository.getAllJobIds()) {
                result.add(Tuple2.tuple2(jobId, false));
            }
            return new GetJobIdsOperation.GetJobIdsResult(result);
        });
    }

    public CompletableFuture<GetJobIdsOperation.GetJobIdsResult> getNormalJobIdsByName(@Nonnull String name) {
        return this.submitToCoordinatorThread(() -> {
            ArrayList<Tuple2<Long, Boolean>> result = new ArrayList<Tuple2<Long, Boolean>>();
            HashMap<Long, Long> jobs = new HashMap<Long, Long>();
            for (MasterContext ctx : this.masterContexts.values()) {
                if (!name.equals(ctx.jobConfig().getName())) continue;
                jobs.put(ctx.jobId(), Long.MAX_VALUE);
            }
            for (JobResult jobResult : this.jobRepository.getJobResults(name)) {
                jobs.put(jobResult.getJobId(), jobResult.getCreationTime());
            }
            jobs.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getValue).thenComparing(Map.Entry::getKey).reversed()).forEach(entry -> result.add(Tuple2.tuple2((Long)entry.getKey(), false)));
            return new GetJobIdsOperation.GetJobIdsResult(result);
        });
    }

    public CompletableFuture<JobStatus> getJobStatus(long jobId) {
        return this.callWithJob(jobId, JobCoordinationService::determineJobStatusFromMasterContext, JobResult::getJobStatus, jobRecord -> JobStatus.NOT_RUNNING, jobExecutionRecord -> jobExecutionRecord.isSuspended() ? JobStatus.SUSPENDED : JobStatus.NOT_RUNNING);
    }

    private static JobStatus determineJobStatusFromMasterContext(MasterContext mc) {
        Optional<MasterJobContext.TerminationRequest> maybeTerminationRequest = mc.jobContext().getTerminationRequest();
        JobStatus jobStatus = mc.jobStatus();
        return jobStatus == JobStatus.RUNNING && maybeTerminationRequest.isPresent() ? JobStatus.COMPLETING : jobStatus;
    }

    private static boolean determineIsJobUserCancelledFromMasterContext(MasterContext mc) {
        boolean userInitiatedTermination = mc.jobContext().isUserInitiatedTermination();
        JobStatus jobStatus = mc.jobStatus();
        switch (jobStatus) {
            case COMPLETED: {
                return false;
            }
            case FAILED: {
                return userInitiatedTermination;
            }
        }
        throw new IllegalStateException("Job not finished");
    }

    public CompletableFuture<JobSuspensionCause> getJobSuspensionCause(long jobId) {
        FunctionEx<JobExecutionRecord, JobSuspensionCause> jobExecutionRecordHandler = jobExecutionRecord -> {
            JobSuspensionCause cause = jobExecutionRecord.getSuspensionCause();
            if (cause == null) {
                throw new IllegalStateException("Job not suspended");
            }
            return cause;
        };
        return this.callWithJob(jobId, mc -> {
            JobExecutionRecord jobExecutionRecord = mc.jobExecutionRecord();
            return (JobSuspensionCause)jobExecutionRecordHandler.apply(jobExecutionRecord);
        }, jobResult -> {
            throw new IllegalStateException("Job not suspended");
        }, jobRecord -> {
            throw new IllegalStateException("Job not suspended");
        }, jobExecutionRecordHandler);
    }

    public CompletableFuture<Boolean> isJobUserCancelled(long jobId) {
        return this.callWithJob(jobId, JobCoordinationService::determineIsJobUserCancelledFromMasterContext, JobResult::isUserCancelled, jobRecord -> {
            throw new IllegalStateException("Job not finished");
        }, null);
    }

    @Override
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        try {
            descriptor.withTag("module", "jet");
            this.masterContexts.forEach((id, ctx) -> ctx.provideDynamicMetrics(descriptor.copy(), context));
        }
        catch (Throwable t) {
            this.logger.warning("Dynamic metric collection failed", t);
            throw t;
        }
    }

    public CompletableFuture<List<RawJobMetrics>> getJobMetrics(long jobId) {
        CompletableFuture<List<RawJobMetrics>> cf = new CompletableFuture<List<RawJobMetrics>>();
        this.runWithJob(jobId, mc -> mc.jobContext().collectMetrics(cf), jobResult -> {
            List<RawJobMetrics> metrics = this.jobRepository.getJobMetrics(jobId);
            cf.complete(metrics != null ? metrics : Collections.emptyList());
        }, jobRecord -> cf.complete(Collections.emptyList()));
        return cf;
    }

    public CompletableFuture<Long> getJobSubmissionTime(long jobId, boolean isLightJob) {
        if (isLightJob) {
            Object mc2 = this.lightMasterContexts.get(jobId);
            if (mc2 == null || mc2 == UNINITIALIZED_LIGHT_JOB_MARKER) {
                throw new JobNotFoundException(jobId);
            }
            return CompletableFuture.completedFuture(((LightMasterContext)mc2).getStartTime());
        }
        return this.callWithJob(jobId, mc -> mc.jobRecord().getCreationTime(), JobResult::getCreationTime, JobRecord::getCreationTime, null);
    }

    public CompletableFuture<Void> resumeJob(long jobId) {
        return this.runWithJob(jobId, masterContext -> masterContext.jobContext().resumeJob(), jobResult -> {
            throw new IllegalStateException("Job already completed");
        }, jobRecord -> {
            throw new RetryableHazelcastException("Job " + com.hazelcast.jet.Util.idToString(jobId) + " not yet discovered");
        });
    }

    @Deprecated(since="5.3", forRemoval=true)
    public CompletableFuture<List<JobSummary>> getJobSummaryList() {
        return this.getJobAndSqlSummaryList().thenApply(jobAndSqlSummaries -> jobAndSqlSummaries.stream().map(this::toJobSummary).collect(Collectors.toList()));
    }

    private JobSummary toJobSummary(JobAndSqlSummary jobAndSqlSummary) {
        return new JobSummary(jobAndSqlSummary.isLightJob(), jobAndSqlSummary.getJobId(), jobAndSqlSummary.getExecutionId(), jobAndSqlSummary.getNameOrId(), jobAndSqlSummary.getStatus(), jobAndSqlSummary.getSubmissionTime(), jobAndSqlSummary.getCompletionTime(), jobAndSqlSummary.getFailureText());
    }

    public CompletableFuture<List<JobAndSqlSummary>> getJobAndSqlSummaryList() {
        return this.submitToCoordinatorThread(() -> {
            HashMap jobs = new HashMap();
            if (this.isMaster()) {
                this.jobRepository.getJobRecords().stream().map(this::getJobAndSqlSummary).forEach(s -> jobs.put(s.getJobId(), s));
                this.jobRepository.getJobResults().stream().map(r -> {
                    JobExecutionRecord executionRecord = this.jobRepository.getJobExecutionRecord(r.getJobId());
                    return new JobAndSqlSummary(false, r.getJobId(), 0L, r.getJobNameOrId(), r.getJobStatus(), r.getCreationTime(), r.getCompletionTime(), r.getFailureText(), JobCoordinationService.getSqlSummary(r.getJobConfig()), executionRecord == null || executionRecord.getSuspensionCause() == null ? null : executionRecord.getSuspensionCause().description(), r.isUserCancelled());
                }).forEach(s -> jobs.put(s.getJobId(), s));
            }
            this.lightMasterContexts.values().stream().filter(lmc -> lmc != UNINITIALIZED_LIGHT_JOB_MARKER).map(LightMasterContext.class::cast).map(this::getJobAndSqlSummary).forEach(s -> jobs.put(s.getJobId(), s));
            return jobs.values().stream().sorted(Comparator.comparing(JobAndSqlSummary::getSubmissionTime).reversed()).collect(Collectors.toList());
        });
    }

    private JobAndSqlSummary getJobAndSqlSummary(LightMasterContext lmc) {
        SqlSummary sqlSummary = JobCoordinationService.getSqlSummary(lmc.getJobConfig());
        return new JobAndSqlSummary(true, lmc.getJobId(), lmc.getJobId(), com.hazelcast.jet.Util.idToString(lmc.getJobId()), JobStatus.RUNNING, lmc.getStartTime(), 0L, null, sqlSummary, null, false);
    }

    @Nullable
    private static SqlSummary getSqlSummary(JobConfig jobConfig) {
        String query = (String)jobConfig.getArgument("__sql.queryText");
        Object unbounded = jobConfig.getArgument("__sql.queryUnbounded");
        SqlSummary sqlSummary = query != null && unbounded != null ? new SqlSummary(query, Boolean.TRUE.equals(unbounded)) : null;
        return sqlSummary;
    }

    public CompletableFuture<JobConfig> updateJobConfig(long jobId, @Nonnull DeltaJobConfig deltaConfig) {
        return this.callWithJob(jobId, masterContext -> masterContext.updateJobConfig(deltaConfig), jobResult -> {
            throw new IllegalStateException("Job not suspended, but " + String.valueOf((Object)jobResult.getJobStatus()));
        }, jobRecord -> {
            throw new IllegalStateException("Job not suspended");
        }, null);
    }

    public CompletableFuture<UUID> addJobStatusListener(long jobId, boolean isLightJob, Registration registration) {
        if (isLightJob) {
            Object mc = this.lightMasterContexts.get(jobId);
            if (mc == null || mc == UNINITIALIZED_LIGHT_JOB_MARKER) {
                throw new JobNotFoundException(jobId);
            }
            return CompletableFuture.completedFuture(((LightMasterContext)mc).addStatusListener(registration));
        }
        return this.callWithJob(jobId, masterContext -> masterContext.addStatusListener(registration), jobResult -> {
            throw AbstractJobProxy.cannotAddStatusListener(jobResult.getJobStatus());
        }, jobRecord -> {
            JobEventService jobEventService = (JobEventService)this.nodeEngine.getService("hz:impl:jobEventService");
            return jobEventService.handleAllRegistrations(jobId, registration).getId();
        }, null);
    }

    @Nonnull
    public CompletableFuture<Void> addShuttingDownMember(UUID uuid) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        CompletableFuture<Void> oldFuture = this.membersShuttingDown.putIfAbsent(uuid, future);
        if (oldFuture != null) {
            return oldFuture;
        }
        if (this.removedMembers.containsKey(uuid)) {
            this.logger.fine("NotifyMemberShutdownOperation received for a member that was already removed from the cluster: %s", uuid);
            return CompletableFuture.completedFuture(null);
        }
        this.logger.fine("Added a shutting-down member: %s", uuid);
        CompletableFuture[] futures = (CompletableFuture[])this.masterContexts.values().stream().map(mc -> mc.jobContext().onParticipantGracefulShutdown(uuid)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).whenComplete((BiConsumer)ExceptionUtil.withTryCatch(this.logger, (r, e) -> future.complete(null)));
        return future;
    }

    public Map<Long, MasterContext> getMasterContexts() {
        return new HashMap<Long, MasterContext>(this.masterContexts);
    }

    public Map<Long, Object> getLightMasterContexts() {
        return new HashMap<Long, Object>(this.lightMasterContexts);
    }

    public MasterContext getMasterContext(long jobId) {
        return (MasterContext)this.masterContexts.get(jobId);
    }

    public void registerInvocationObserver(JobInvocationObserver observer) {
        this.jobInvocationObservers.add(observer);
    }

    public void unregisterInvocationObserver(JobInvocationObserver observer) {
        this.jobInvocationObservers.remove(observer);
    }

    JetServiceBackend getJetServiceBackend() {
        return this.jetServiceBackend;
    }

    boolean shouldStartJobs() {
        if (!this.isMaster() || !this.nodeEngine.isRunning()) {
            return false;
        }
        ClusterState clusterState = this.nodeEngine.getClusterService().getClusterState();
        if (this.isClusterEnteringPassiveState || clusterState == ClusterState.PASSIVE || clusterState == ClusterState.IN_TRANSITION) {
            this.logger.fine("Not starting jobs because cluster is in passive state or in transition.");
            return false;
        }
        if (!this.membersShuttingDown.isEmpty()) {
            this.logger.fine("Not starting jobs because members are shutting down: %s", this.membersShuttingDown.keySet());
            return false;
        }
        Version clusterVersion = this.nodeEngine.getClusterService().getClusterVersion();
        for (Member m : this.nodeEngine.getClusterService().getMembers()) {
            if (clusterVersion.equals(m.getVersion().asVersion())) continue;
            this.logger.fine("Not starting non-light jobs because rolling upgrade is in progress");
            return false;
        }
        PartitionServiceState state = this.getInternalPartitionService().getPartitionReplicaStateChecker().getPartitionServiceState();
        if (state != PartitionServiceState.SAFE) {
            this.logger.fine("Not starting jobs because partition replication is not in safe state, but in %s", (Object)state);
            return false;
        }
        if (!this.getInternalPartitionService().getPartitionStateManager().isInitialized()) {
            this.logger.fine("Not starting jobs because partitions are not yet initialized.");
            return false;
        }
        if (this.nodeEngine.getNode().isClusterStateManagementAutomatic() && !this.nodeEngine.getNode().isManagedClusterStable()) {
            this.logger.fine("Not starting jobs because cluster is running in managed context and is not yet stable. Current cluster topology intent: %s, expected cluster size: %d, current: %d.", (Object)this.nodeEngine.getNode().getClusterTopologyIntent(), this.nodeEngine.getNode().currentSpecifiedReplicaCount(), this.nodeEngine.getClusterService().getSize());
            return false;
        }
        return true;
    }

    private CompletableFuture<Void> runWithJob(long jobId, @Nonnull Consumer<MasterContext> masterContextHandler, @Nonnull Consumer<JobResult> jobResultHandler, @Nonnull Consumer<JobRecord> jobRecordHandler) {
        return this.callWithJob(jobId, this.toNullFunction(masterContextHandler), this.toNullFunction(jobResultHandler), this.toNullFunction(jobRecordHandler), null);
    }

    @Nonnull
    private <T, R> Function<T, R> toNullFunction(@Nonnull Consumer<T> consumer) {
        return val -> {
            consumer.accept(val);
            return null;
        };
    }

    private <T> CompletableFuture<T> callWithJob(long jobId, @Nonnull Function<MasterContext, T> masterContextHandler, @Nonnull Function<JobResult, T> jobResultHandler, @Nonnull Function<JobRecord, T> jobRecordHandler, @Nullable Function<JobExecutionRecord, T> jobExecutionRecordHandler) {
        this.assertIsMaster("Cannot do this task on non-master. jobId=" + com.hazelcast.jet.Util.idToString(jobId));
        return this.submitToCoordinatorThread(() -> {
            JobExecutionRecord jobExRecord;
            MasterContext mc = (MasterContext)this.masterContexts.get(jobId);
            if (mc != null) {
                return masterContextHandler.apply(mc);
            }
            JobResult jobResult = this.jobRepository.getJobResult(jobId);
            if (jobResult != null) {
                return jobResultHandler.apply(jobResult);
            }
            if (jobExecutionRecordHandler != null && (jobExRecord = this.jobRepository.getJobExecutionRecord(jobId)) != null) {
                return jobExecutionRecordHandler.apply(jobExRecord);
            }
            JobRecord jobRecord = this.jobRepository.getJobRecord(jobId);
            if (jobRecord != null) {
                return jobRecordHandler.apply(jobRecord);
            }
            jobResult = this.jobRepository.getJobResult(jobId);
            if (jobResult != null) {
                return jobResultHandler.apply(jobResult);
            }
            throw new JobNotFoundException(jobId);
        });
    }

    MembersView membersView(JetMemberSelector memberSelector) {
        return Util.getMembersView(this.nodeEngine);
    }

    void onMemberAdded(MemberImpl addedMember) {
        this.membersShuttingDown.remove(addedMember.getUuid());
        this.removedMembers.remove(addedMember.getUuid());
        this.scheduleScaleUp(this.config.getScaleUpDelayMillis());
        if (addedMember.isLiteMember()) {
            return;
        }
        this.updateQuorumValues();
    }

    void onMemberRemoved(UUID uuid) {
        if (this.membersShuttingDown.remove(uuid) != null) {
            this.logger.fine("Removed a shutting-down member: %s, now shuttingDownMembers=%s", uuid, this.membersShuttingDown.keySet());
        } else {
            this.removedMembers.put(uuid, System.nanoTime());
        }
        long removeThreshold = System.nanoTime() - TimeUnit.HOURS.toNanos(1L);
        this.removedMembers.entrySet().removeIf(en -> (Long)en.getValue() < removeThreshold);
    }

    boolean isQuorumPresent(int quorumSize) {
        return this.getDataMemberCount() >= quorumSize;
    }

    @CheckReturnValue
    CompletableFuture<Void> completeJob(MasterContext masterContext, Throwable error, long completionTime, boolean userCancelled) {
        return this.submitToCoordinatorThread(() -> {
            List<RawJobMetrics> jobMetrics = masterContext.jobConfig().isStoreMetricsAfterJobCompletion() ? masterContext.jobContext().persistentMetrics() : null;
            this.jobRepository.completeJob(masterContext, jobMetrics, error, completionTime, userCancelled);
            if (this.removeMasterContext(masterContext)) {
                this.completeObservables(masterContext.jobRecord().getOwnedObservables(), error);
                this.logger.fine(masterContext.jobIdString() + " is completed");
                (error == null ? this.jobCompletedSuccessfully : this.jobCompletedWithFailure).inc();
            } else {
                MasterContext existing = (MasterContext)this.masterContexts.get(masterContext.jobId());
                if (existing != null) {
                    this.logger.severe("Different master context found to complete " + masterContext.jobIdString() + ", master context execution " + com.hazelcast.jet.Util.idToString(existing.executionId()));
                } else {
                    this.logger.severe("No master context found to complete " + masterContext.jobIdString());
                }
            }
            this.unscheduleJobTimeout(masterContext.jobId());
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean removeMasterContext(MasterContext masterContext) {
        Object object = this.lock;
        synchronized (object) {
            return this.masterContexts.remove(masterContext.jobId(), masterContext);
        }
    }

    void scheduleRestart(long jobId) {
        MasterContext masterContext = (MasterContext)this.masterContexts.get(jobId);
        if (masterContext == null) {
            this.logger.severe("Master context for job " + com.hazelcast.jet.Util.idToString(jobId) + " not found to schedule restart");
            return;
        }
        this.logger.fine("Scheduling restart on master for job %s", masterContext.jobName());
        this.nodeEngine.getExecutionService().schedule(COORDINATOR_EXECUTOR_NAME, () -> this.restartJob(jobId), RETRY_DELAY_IN_MILLIS, TimeUnit.MILLISECONDS);
    }

    void scheduleSnapshot(MasterContext mc, long executionId) {
        long snapshotInterval = mc.jobConfig().getSnapshotIntervalMillis();
        ExecutionService executionService = this.nodeEngine.getExecutionService();
        if (this.logger.isFineEnabled()) {
            this.logger.fine(mc.jobIdString() + " snapshot is scheduled in " + snapshotInterval + "ms");
        }
        executionService.schedule(COORDINATOR_EXECUTOR_NAME, () -> mc.snapshotContext().startScheduledSnapshot(executionId), snapshotInterval, TimeUnit.MILLISECONDS);
    }

    void restartJob(long jobId) {
        MasterContext masterContext = (MasterContext)this.masterContexts.get(jobId);
        if (masterContext == null) {
            this.logger.severe("Master context for job " + com.hazelcast.jet.Util.idToString(jobId) + " not found to restart");
            return;
        }
        masterContext.jobContext().tryStartJob();
    }

    private void checkOperationalState() {
        if (this.isClusterEnteringPassiveState) {
            throw new EnteringPassiveClusterStateException();
        }
    }

    private void scheduleScaleUp(long delay) {
        int counter = this.scaleUpScheduledCount.incrementAndGet();
        this.nodeEngine.getExecutionService().schedule(() -> this.scaleJobsUpNow(counter), delay, TimeUnit.MILLISECONDS);
    }

    private void scaleJobsUpNow(int counter) {
        if (this.scaleUpScheduledCount.get() != counter) {
            return;
        }
        if (!this.shouldStartJobs()) {
            this.scheduleScaleUp(RETRY_DELAY_IN_MILLIS);
            return;
        }
        this.submitToCoordinatorThread(() -> {
            boolean allSucceeded = true;
            int dataMembersCount = this.nodeEngine.getClusterService().getMembers(MemberSelectors.DATA_MEMBER_SELECTOR).size();
            int partitionCount = this.nodeEngine.getPartitionService().getPartitionCount();
            int dataMembersWithPartitionsCount = Math.min(dataMembersCount, partitionCount);
            for (MasterContext mc : this.masterContexts.values()) {
                allSucceeded &= mc.maybeScaleUp(dataMembersWithPartitionsCount);
            }
            if (!allSucceeded) {
                this.scheduleScaleUp(RETRY_DELAY_IN_MILLIS);
            }
        });
    }

    private void updateQuorumValues() {
        if (!this.shouldCheckQuorumValues()) {
            return;
        }
        this.submitToCoordinatorThread(() -> {
            try {
                int currentQuorumSize = this.getQuorumSize();
                for (JobRecord jobRecord : this.jobRepository.getJobRecords()) {
                    try {
                        if (!jobRecord.getConfig().isSplitBrainProtectionEnabled()) continue;
                        MasterContext masterContext = (MasterContext)this.masterContexts.get(jobRecord.getJobId());
                        if (masterContext == null) {
                            this.jobRepository.updateJobQuorumSizeIfSmaller(jobRecord.getJobId(), currentQuorumSize);
                            masterContext = (MasterContext)this.masterContexts.get(jobRecord.getJobId());
                        }
                        if (masterContext == null) continue;
                        masterContext.updateQuorumSize(currentQuorumSize);
                    }
                    catch (Exception e) {
                        this.logger.severe("Quorum of job " + com.hazelcast.jet.Util.idToString(jobRecord.getJobId()) + " could not be updated to " + currentQuorumSize, e);
                    }
                }
            }
            catch (Exception e) {
                this.logger.severe("update quorum values task failed", e);
            }
        });
    }

    private boolean shouldCheckQuorumValues() {
        return this.isMaster() && this.nodeEngine.isRunning() && this.getInternalPartitionService().getPartitionStateManager().isInitialized();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Object deserializeJobDefinition(long jobId, JobConfig jobConfig, Data jobDefinitionData) {
        JobClassLoaderService jobClassLoaderService = this.jetServiceBackend.getJobClassLoaderService();
        ClassLoader classLoader = jobClassLoaderService.getOrCreateClassLoader(jobConfig, jobId, JobClassLoaderService.JobPhase.COORDINATOR);
        try {
            jobClassLoaderService.prepareProcessorClassLoaders(jobId);
            Object t = CustomClassLoadedObject.deserializeWithCustomClassLoader(this.nodeEngine().getSerializationService(), classLoader, jobDefinitionData);
            return t;
        }
        finally {
            jobClassLoaderService.clearProcessorClassLoaders();
        }
    }

    private String dagToJson(DAG dag) {
        int coopThreadCount = this.config.getCooperativeThreadCount();
        return dag.toJson(coopThreadCount).toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Void> startJobIfNotStartedOrCompleted(@Nonnull JobRecord jobRecord, @Nonnull JobExecutionRecord jobExecutionRecord, String reason) {
        MasterContext oldMasterContext;
        MasterContext masterContext;
        long jobId = jobRecord.getJobId();
        Object object = this.lock;
        synchronized (object) {
            JobResult jobResult = this.jobRepository.getJobResult(jobId);
            if (jobResult != null) {
                this.logger.fine("Not starting job %s, already has result: %s", com.hazelcast.jet.Util.idToString(jobId), jobResult);
                return jobResult.asCompletableFuture();
            }
            this.checkOperationalState();
            masterContext = this.createMasterContext(jobRecord, jobExecutionRecord);
            oldMasterContext = this.masterContexts.putIfAbsent(jobId, masterContext);
        }
        if (oldMasterContext != null) {
            return oldMasterContext.jobContext().jobCompletionFuture();
        }
        assert (this.jobRepository.getJobResult(jobId) == null) : "jobResult should not exist at this point";
        if (this.finalizeJobIfAutoScalingOff(masterContext)) {
            return masterContext.jobContext().jobCompletionFuture();
        }
        if (jobExecutionRecord.isSuspended()) {
            this.logger.finest("MasterContext for suspended %s is created", masterContext.jobIdString());
        } else {
            this.logger.info("Starting job " + com.hazelcast.jet.Util.idToString(jobId) + ": " + reason);
            masterContext.jobContext().tryStartJob();
        }
        return masterContext.jobContext().jobCompletionFuture();
    }

    private boolean completeMasterContextIfJobAlreadyCompleted(MasterContext masterContext) {
        long jobId = masterContext.jobId();
        JobResult jobResult = this.jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            this.logger.fine("Completing master context for %s since already completed with result: %s", masterContext.jobIdString(), jobResult);
            masterContext.jobContext().setFinalResult(jobResult.getFailureAsThrowable());
            return this.removeMasterContext(masterContext);
        }
        return this.finalizeJobIfAutoScalingOff(masterContext);
    }

    private boolean finalizeJobIfAutoScalingOff(MasterContext masterContext) {
        if (!masterContext.jobConfig().isAutoScaling() && masterContext.jobExecutionRecord().executed()) {
            this.logger.info("Suspending or failing " + masterContext.jobIdString() + " since auto-restart is disabled and the job has been executed before");
            masterContext.jobContext().finalizeExecution(new TopologyChangedException());
            return true;
        }
        return false;
    }

    int getQuorumSize() {
        return this.getDataMemberCount() / 2 + 1;
    }

    private int getDataMemberCount() {
        ClusterService clusterService = this.nodeEngine.getClusterService();
        return clusterService.getMembers(MemberSelectors.DATA_MEMBER_SELECTOR).size();
    }

    private JobAndSqlSummary getJobAndSqlSummary(JobRecord record) {
        boolean userCancelled;
        JobStatus status;
        String suspensionCause;
        MasterContext ctx = (MasterContext)this.masterContexts.get(record.getJobId());
        long execId = ctx == null ? 0L : ctx.executionId();
        JobExecutionRecord executionRecord = this.jobRepository.getJobExecutionRecord(record.getJobId());
        String string = suspensionCause = executionRecord != null && executionRecord.getSuspensionCause() != null ? executionRecord.getSuspensionCause().description() : null;
        if (ctx == null) {
            status = executionRecord != null && executionRecord.isSuspended() ? JobStatus.SUSPENDED : JobStatus.NOT_RUNNING;
            userCancelled = false;
        } else {
            Optional<MasterJobContext.TerminationRequest> maybeTerminationRequest = ctx.jobContext().getTerminationRequest();
            JobStatus jobStatus = ctx.jobStatus();
            status = jobStatus == JobStatus.RUNNING && maybeTerminationRequest.isPresent() ? JobStatus.COMPLETING : jobStatus;
            userCancelled = status == JobStatus.FAILED && maybeTerminationRequest.map(MasterJobContext.TerminationRequest::isUserInitiated).orElse(false) != false;
        }
        return new JobAndSqlSummary(false, record.getJobId(), execId, record.getJobNameOrId(), status, record.getCreationTime(), 0L, null, JobCoordinationService.getSqlSummary(record.getConfig()), suspensionCause, userCancelled);
    }

    private InternalPartitionServiceImpl getInternalPartitionService() {
        Node node = this.nodeEngine.getNode();
        return (InternalPartitionServiceImpl)node.getPartitionService();
    }

    private void scanJobs() {
        long scanStart = System.currentTimeMillis();
        long nextScanDelay = this.maxJobScanPeriodInMillis;
        try {
            if (this.isMaster()) {
                if (this.shouldStartJobs()) {
                    this.doScanJobs();
                } else {
                    nextScanDelay = 100L;
                }
            }
        }
        catch (HazelcastInstanceNotActiveException hazelcastInstanceNotActiveException) {
        }
        catch (Throwable e) {
            this.logger.severe("Scanning jobs failed", e);
        }
        long scanTime = System.currentTimeMillis() - scanStart;
        nextScanDelay = Math.max(0L, nextScanDelay - scanTime);
        ExecutionService executionService = this.nodeEngine.getExecutionService();
        executionService.schedule(this::scanJobs, nextScanDelay, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doScanJobs() {
        Collection<JobRecord> jobs = this.jobRepository.getJobRecords();
        for (JobRecord jobRecord : jobs) {
            JobExecutionRecord jobExecutionRecord = this.ensureExecutionRecord(jobRecord.getJobId(), this.jobRepository.getJobExecutionRecord(jobRecord.getJobId()));
            this.startJobIfNotStartedOrCompleted(jobRecord, jobExecutionRecord, "discovered by scanning of JobRecords");
        }
        this.jobRepository.cleanup(this.nodeEngine);
        if (!this.jobsScanned) {
            Object object = this.lock;
            synchronized (object) {
                this.jobsScanned = true;
            }
        }
    }

    private JobExecutionRecord ensureExecutionRecord(long jobId, JobExecutionRecord record) {
        return record != null ? record : new JobExecutionRecord(jobId, this.getQuorumSize());
    }

    void assertIsMaster(String error) {
        if (!this.isMaster()) {
            throw new MasterNodeChangedException(error + ". Master address: " + String.valueOf(this.nodeEngine.getClusterService().getMasterAddress()));
        }
    }

    private boolean isMaster() {
        return this.nodeEngine.getClusterService().isMaster();
    }

    NodeEngineImpl nodeEngine() {
        return this.nodeEngine;
    }

    ManagedExecutorService coordinationExecutor() {
        return this.nodeEngine.getExecutionService().getExecutor(COORDINATOR_EXECUTOR_NAME);
    }

    CompletableFuture<Void> submitToCoordinatorThread(Runnable action) {
        return this.submitToCoordinatorThread(() -> {
            action.run();
            return null;
        });
    }

    <T> CompletableFuture<T> submitToCoordinatorThread(Callable<T> action) {
        if (IS_JOB_COORDINATOR_THREAD.get().booleanValue()) {
            try {
                return CompletableFuture.completedFuture(action.call());
            }
            catch (Throwable e) {
                this.logger.warning(null, e);
                return Util.exceptionallyCompletedFuture(e);
            }
        }
        Future<Object> future = this.nodeEngine.getExecutionService().submit(COORDINATOR_EXECUTOR_NAME, () -> {
            assert (!IS_JOB_COORDINATOR_THREAD.get().booleanValue()) : "flag already raised";
            IS_JOB_COORDINATOR_THREAD.set(true);
            try {
                Object v = action.call();
                return v;
            }
            catch (Throwable e) {
                this.logger.warning(null, e);
                throw e;
            }
            finally {
                IS_JOB_COORDINATOR_THREAD.set(false);
            }
        });
        return this.nodeEngine.getExecutionService().asCompletableFuture(future);
    }

    void assertOnCoordinatorThread() {
        assert (IS_JOB_COORDINATOR_THREAD.get().booleanValue()) : "not on coordinator thread";
    }

    private void completeObservables(Set<String> observables, Throwable error) {
        for (String observable : observables) {
            try {
                String ringbufferName = ObservableImpl.ringbufferName(observable);
                Ringbuffer<Object> ringbuffer = this.nodeEngine.getHazelcastInstance().getRingbuffer(ringbufferName);
                Object completion = error == null ? DoneItem.DONE_ITEM : WrappedThrowable.of(error);
                ringbuffer.addAsync(completion, OverflowPolicy.OVERWRITE);
            }
            catch (Exception e) {
                this.logger.severe("Failed to complete observable '" + observable + "': " + String.valueOf(e), e);
            }
        }
    }

    public long[] findUnknownExecutions(long[] executionIds) {
        return Arrays.stream(executionIds).filter(key -> {
            LightMasterContext lightMasterContext;
            Object lmc = this.lightMasterContexts.get(key);
            return lmc == null || lmc instanceof LightMasterContext && (lightMasterContext = (LightMasterContext)lmc).isCancelled();
        }).toArray();
    }

    void scheduleJobTimeout(long jobId, long timeout) {
        if (timeout <= 0L) {
            return;
        }
        this.scheduledJobTimeouts.computeIfAbsent(jobId, id -> this.scheduleJobTimeoutTask((long)id, timeout));
    }

    private void unscheduleJobTimeout(long jobId) {
        ScheduledFuture timeoutFuture = (ScheduledFuture)this.scheduledJobTimeouts.remove(jobId);
        if (timeoutFuture != null) {
            timeoutFuture.cancel(true);
        }
    }

    private ScheduledFuture<?> scheduleJobTimeoutTask(long jobId, long timeout) {
        return this.nodeEngine().getExecutionService().schedule(() -> {
            MasterContext mc = (MasterContext)this.masterContexts.get(jobId);
            LightMasterContext lightMc = (LightMasterContext)this.lightMasterContexts.get(jobId);
            try {
                if (mc != null && this.isMaster() && !mc.jobStatus().isTerminal()) {
                    this.terminateJob(jobId, TerminationMode.CANCEL_FORCEFUL, false);
                } else if (lightMc != null && !lightMc.isCancelled()) {
                    lightMc.requestTermination(false);
                }
            }
            finally {
                this.scheduledJobTimeouts.remove(jobId);
            }
        }, timeout, TimeUnit.MILLISECONDS);
    }

    boolean isMemberShuttingDown(UUID uuid) {
        return this.membersShuttingDown.containsKey(uuid);
    }
}

