/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.operation;

import com.hazelcast.cluster.Address;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobClassLoaderService;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.operation.AsyncJobOperation;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.version.Version;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

/**
 * Operation that carries an execution plan for a job execution and, when run,
 * hands it to the local job execution service.
 *
 * <p>The plan travels in one of two forms, selected by {@code isLightJob}:
 * <ul>
 *   <li>normal job: the plan is held as serialized {@link Data} and is
 *       deserialized with the job's class loader just before
 *       {@code initExecution} is called;</li>
 *   <li>light job: the plan is held as an {@link ExecutionPlan} instance and is
 *       passed straight to {@code runLightJob}.</li>
 * </ul>
 *
 * <p>Instances are created through {@link #forNormalJob} and
 * {@link #forLightJob}; the public no-arg constructor leaves all fields at
 * their defaults until {@link #readInternal} populates them.
 */
public class InitExecutionOperation
extends AsyncJobOperation {
    private long executionId;
    private int coordinatorMemberListVersion;
    private Version coordinatorVersion;
    private Set<MemberInfo> participants;
    /** Either a {@link Data} (normal job) or an {@link ExecutionPlan} (light job). */
    private Object plan;
    private boolean isLightJob;

    /**
     * Creates an empty operation whose fields are filled in by
     * {@link #readInternal}.
     */
    public InitExecutionOperation() {
    }

    /**
     * Shared constructor behind the two static factories.
     *
     * @param plan a {@link Data} when {@code isLightJob} is {@code false}, an
     *             {@link ExecutionPlan} when it is {@code true}; this pairing
     *             is checked with assertions only
     */
    private InitExecutionOperation(long jobId, long executionId, int coordinatorMemberListVersion, Version coordinatorVersion, Set<MemberInfo> participants, @Nonnull Object plan, boolean isLightJob) {
        super(jobId);
        this.executionId = executionId;
        this.coordinatorMemberListVersion = coordinatorMemberListVersion;
        this.coordinatorVersion = coordinatorVersion;
        this.participants = participants;
        assert (isLightJob || plan instanceof Data) : "Serialized plan is expected for normal job";
        assert (!isLightJob || plan instanceof ExecutionPlan) : "Non-serialized plan is expected for light job";
        this.plan = plan;
        this.isLightJob = isLightJob;
    }

    /**
     * Creates an operation for a normal (non-light) job carrying an already
     * serialized plan.
     *
     * @param serializedPlan the execution plan in serialized form
     * @return a new operation with {@code isLightJob == false}
     */
    public static InitExecutionOperation forNormalJob(long jobId, long executionId, int coordinatorMemberListVersion, Version coordinatorVersion, Set<MemberInfo> participants, @Nonnull Data serializedPlan) {
        return new InitExecutionOperation(jobId, executionId, coordinatorMemberListVersion, coordinatorVersion, participants, serializedPlan, false);
    }

    /**
     * Creates an operation for a light job carrying the plan object itself.
     *
     * @param plan the execution plan instance
     * @return a new operation with {@code isLightJob == true}
     */
    public static InitExecutionOperation forLightJob(long jobId, long executionId, int coordinatorMemberListVersion, Version coordinatorVersion, Set<MemberInfo> participants, @Nonnull ExecutionPlan plan) {
        return new InitExecutionOperation(jobId, executionId, coordinatorMemberListVersion, coordinatorVersion, participants, plan, true);
    }

    /**
     * Verifies that the local member's version equals the coordinator version
     * carried by this operation, then dispatches the plan to the job execution
     * service: {@code runLightJob} for light jobs, {@code initExecution} (with
     * the deserialized plan) otherwise.
     *
     * @return the future returned by the job execution service
     * @throws JetException if the local member version differs from
     *                      {@code coordinatorVersion}
     */
    @Override
    protected CompletableFuture<?> doRun() {
        ILogger logger = getLogger();
        if (!getNodeEngine().getLocalMember().getVersion().asVersion().equals(coordinatorVersion)) {
            throw new JetException("Mismatch between coordinator and participant version");
        }
        JetServiceBackend service = getJetServiceBackend();
        // Read the caller address once and reuse it for logging and dispatch.
        Address caller = getCallerAddress();
        logger.fine("Initializing execution plan for %s from %s", Util.jobIdAndExecutionId(jobId(), executionId), caller);
        if (isLightJob) {
            return service.getJobExecutionService().runLightJob(jobId(), executionId, caller, coordinatorMemberListVersion, participants, (ExecutionPlan)plan);
        }
        return service.getJobExecutionService().initExecution(jobId(), executionId, caller, coordinatorMemberListVersion, participants, deserializedPlan());
    }

    /**
     * @return the constant {@code 5}; presumably this operation's id within
     *         its serialization factory (the factory is not visible here)
     */
    @Override
    public int getClassId() {
        return 5;
    }

    /**
     * Writes, after the superclass state: execution id, light-job flag,
     * coordinator member list version, coordinator version, participant count
     * followed by each participant, and finally the plan. The plan is written
     * with {@code writeObject} for light jobs and with
     * {@link IOUtil#writeData} otherwise.
     */
    @Override
    protected void writeInternal(ObjectDataOutput out) throws IOException {
        super.writeInternal(out);
        out.writeLong(executionId);
        out.writeBoolean(isLightJob);
        out.writeInt(coordinatorMemberListVersion);
        out.writeObject(coordinatorVersion);
        out.writeInt(participants.size());
        for (MemberInfo participant : participants) {
            out.writeObject(participant);
        }
        if (isLightJob) {
            out.writeObject(plan);
        } else {
            IOUtil.writeData(out, (Data)plan);
        }
    }

    /**
     * Reads the fields in exactly the order {@link #writeInternal} wrote them.
     * The light-job flag is read before the plan because it decides whether
     * the plan is read as an object or as {@link Data}.
     */
    @Override
    protected void readInternal(ObjectDataInput in) throws IOException {
        super.readInternal(in);
        executionId = in.readLong();
        isLightJob = in.readBoolean();
        coordinatorMemberListVersion = in.readInt();
        coordinatorVersion = (Version)in.readObject();
        int count = in.readInt();
        participants = new HashSet<>();
        for (int i = 0; i < count; ++i) {
            participants.add((MemberInfo)in.readObject());
        }
        plan = isLightJob ? in.readObject() : IOUtil.readData(in);
    }

    /**
     * Deserializes the {@link Data} plan of a normal job using the job's
     * class loader for the {@code EXECUTION} phase.
     *
     * <p>The processor class loaders are prepared before deserialization and
     * are cleared in a {@code finally} block, so they are cleared whether or
     * not deserialization succeeds.
     *
     * @return the deserialized execution plan
     */
    private ExecutionPlan deserializedPlan() {
        assert (!isLightJob);
        JetServiceBackend service = getJetServiceBackend();
        JobConfig jobConfig = service.getJobConfig(jobId(), isLightJob);
        JobClassLoaderService jobClassloaderService = service.getJobClassLoaderService();
        ClassLoader cl = jobClassloaderService.getOrCreateClassLoader(jobConfig, jobId(), JobClassLoaderService.JobPhase.EXECUTION);
        try {
            jobClassloaderService.prepareProcessorClassLoaders(jobId());
            return (ExecutionPlan)CustomClassLoadedObject.deserializeWithCustomClassLoader(getNodeEngine().getSerializationService(), cl, (Data)plan);
        }
        finally {
            jobClassloaderService.clearProcessorClassLoaders();
        }
    }
}

