/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.internal.partition.operation;

import com.hazelcast.internal.partition.ChunkSupplier;
import com.hazelcast.internal.partition.NonFragmentedServiceNamespace;
import com.hazelcast.internal.partition.PartitionReplicaVersionManager;
import com.hazelcast.internal.partition.PartitionReplicationEvent;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.internal.partition.impl.PartitionStateManager;
import com.hazelcast.internal.partition.operation.PartitionReplicaSyncRequest;
import com.hazelcast.internal.partition.operation.PartitionReplicaSyncResponse;
import com.hazelcast.internal.partition.operation.UrgentPartitionRunnable;
import com.hazelcast.internal.serialization.impl.SerializationUtil;
import com.hazelcast.internal.services.ServiceNamespace;
import com.hazelcast.internal.util.BiTuple;
import com.hazelcast.internal.util.CollectionUtil;
import com.hazelcast.internal.util.ThreadUtil;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.spi.impl.operationservice.CallStatus;
import com.hazelcast.spi.impl.operationservice.Offload;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.OperationService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;

public final class PartitionReplicaSyncRequestOffloadable
extends PartitionReplicaSyncRequest {
    private final transient ConcurrentMap<BiTuple, long[]> replicaVersions = new ConcurrentHashMap<BiTuple, long[]>();
    private volatile int partitionId;

    public PartitionReplicaSyncRequestOffloadable() {
        this.namespaces = Collections.emptyList();
    }

    public PartitionReplicaSyncRequestOffloadable(Collection<ServiceNamespace> namespaces, int partitionId, int replicaIndex) {
        this.namespaces = Collections.newSetFromMap(new ConcurrentHashMap());
        this.namespaces.addAll(namespaces);
        this.partitionId = partitionId;
        this.setPartitionId(-1);
        this.setReplicaIndex(replicaIndex);
    }

    @Override
    public CallStatus call() throws Exception {
        return new ReplicaSyncRequestOffload();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void sendOperationsForNamespaces(int permits) {
        InternalPartitionServiceImpl partitionService = (InternalPartitionServiceImpl)this.getService();
        try {
            PartitionReplicationEvent event = new PartitionReplicationEvent(this.getCallerAddress(), this.partitionId, this.getReplicaIndex());
            this.readReplicaVersions();
            Iterator iterator = this.namespaces.iterator();
            for (int i = 0; i < permits; ++i) {
                ServiceNamespace namespace = (ServiceNamespace)iterator.next();
                Collection<Object> operations = Collections.emptyList();
                List<ChunkSupplier> chunkSuppliers = Collections.emptyList();
                if (NonFragmentedServiceNamespace.INSTANCE.equals(namespace)) {
                    operations = this.createNonFragmentedReplicationOperations(event);
                } else {
                    Collection<ChunkSupplier> collection = chunkSuppliers = this.isChunkedMigrationEnabled() ? this.collectChunkSuppliers(event, namespace) : chunkSuppliers;
                    if (CollectionUtil.isEmpty(chunkSuppliers)) {
                        operations = this.createFragmentReplicationOperationsOffload(event, namespace);
                    }
                }
                if (!CollectionUtil.isNotEmpty(operations) && !CollectionUtil.isNotEmpty(chunkSuppliers)) continue;
                this.sendOperationsOnPartitionThread(new CopyOnWriteArrayList<Operation>(operations), new CopyOnWriteArrayList<ChunkSupplier>(chunkSuppliers), namespace);
                while (this.hasRemainingChunksToSend(chunkSuppliers)) {
                    this.sendOperationsOnPartitionThread(new CopyOnWriteArrayList<Operation>(operations), new CopyOnWriteArrayList<ChunkSupplier>(chunkSuppliers), namespace);
                }
                iterator.remove();
            }
        }
        finally {
            partitionService.getReplicaManager().releaseReplicaSyncPermits(permits);
        }
    }

    private void readReplicaVersions() {
        InternalPartitionServiceImpl partitionService = (InternalPartitionServiceImpl)this.getService();
        OperationService operationService = this.getNodeEngine().getOperationService();
        PartitionReplicaVersionManager versionManager = partitionService.getPartitionReplicaVersionManager();
        UrgentPartitionRunnable gatherReplicaVersionsRunnable = new UrgentPartitionRunnable(this.partitionId(), () -> {
            for (ServiceNamespace ns : this.namespaces) {
                long[] versions = Arrays.copyOf(versionManager.getPartitionReplicaVersions(this.partitionId(), ns), 6);
                this.replicaVersions.put(BiTuple.of(this.partitionId(), ns), versions);
            }
        });
        operationService.execute(gatherReplicaVersionsRunnable);
        gatherReplicaVersionsRunnable.future.joinInternal();
    }

    @Override
    protected int partitionId() {
        return this.partitionId;
    }

    private void sendOperationsOnPartitionThread(Collection<Operation> operations, Collection<ChunkSupplier> chunkSuppliers, ServiceNamespace ns) {
        if (ThreadUtil.isRunningOnPartitionThread()) {
            this.sendOperations(operations, chunkSuppliers, ns);
        } else {
            UrgentPartitionRunnable partitionRunnable = new UrgentPartitionRunnable(this.partitionId(), () -> this.sendOperations(operations, chunkSuppliers, ns));
            this.getNodeEngine().getOperationService().execute(partitionRunnable);
            partitionRunnable.future.joinInternal();
        }
    }

    @Override
    protected PartitionReplicaSyncResponse createResponse(Collection<Operation> operations, Collection<ChunkSupplier> chunkSuppliers, ServiceNamespace ns) {
        int partitionId = this.partitionId();
        int replicaIndex = this.getReplicaIndex();
        long[] versions = (long[])this.replicaVersions.get(BiTuple.of(partitionId, ns));
        PartitionReplicaSyncResponse syncResponse = new PartitionReplicaSyncResponse(operations, chunkSuppliers, ns, versions, this.isChunkedMigrationEnabled(), this.getMaxTotalChunkedDataInBytes(), this.getLogger(), partitionId);
        syncResponse.setPartitionId(partitionId).setReplicaIndex(replicaIndex);
        return syncResponse;
    }

    @Override
    public int getClassId() {
        return 25;
    }

    @Override
    protected void writeInternal(ObjectDataOutput out) throws IOException {
        SerializationUtil.writeCollection(this.namespaces, out);
        out.writeInt(this.partitionId);
    }

    @Override
    protected void readInternal(ObjectDataInput in) throws IOException {
        this.namespaces = Collections.newSetFromMap(new ConcurrentHashMap());
        this.namespaces.addAll(SerializationUtil.readCollection(in));
        this.partitionId = in.readInt();
    }

    private boolean trySetMigratingFlag() {
        InternalPartitionServiceImpl partitionService = (InternalPartitionServiceImpl)this.getService();
        PartitionStateManager partitionStateManager = partitionService.getPartitionStateManager();
        UrgentPartitionRunnable trySetMigrating = new UrgentPartitionRunnable(this.partitionId(), () -> partitionStateManager.trySetMigratingFlag(this.partitionId()));
        this.getNodeEngine().getOperationService().execute(trySetMigrating);
        return (Boolean)trySetMigrating.future.joinInternal();
    }

    private void clearMigratingFlag() {
        InternalPartitionServiceImpl partitionService = (InternalPartitionServiceImpl)this.getService();
        PartitionStateManager partitionStateManager = partitionService.getPartitionStateManager();
        UrgentPartitionRunnable trySetMigrating = new UrgentPartitionRunnable(this.partitionId(), () -> partitionStateManager.clearMigratingFlag(this.partitionId()));
        this.getNodeEngine().getOperationService().execute(trySetMigrating);
        trySetMigrating.future.joinInternal();
    }

    final class ReplicaSyncRequestOffload
    extends Offload {
        ReplicaSyncRequestOffload() {
            super(PartitionReplicaSyncRequestOffloadable.this);
        }

        @Override
        public void start() {
            try {
                this.nodeEngine.getExecutionService().execute("hz:async", () -> {
                    try {
                        if (!PartitionReplicaSyncRequestOffloadable.this.trySetMigratingFlag()) {
                            PartitionReplicaSyncRequestOffloadable.this.sendRetryResponse();
                        }
                        try {
                            Integer permits = PartitionReplicaSyncRequestOffloadable.this.getPermits();
                            if (permits == null) {
                                return;
                            }
                            PartitionReplicaSyncRequestOffloadable.this.sendOperationsForNamespaces(permits);
                            if (!PartitionReplicaSyncRequestOffloadable.this.namespaces.isEmpty()) {
                                PartitionReplicaSyncRequestOffloadable.this.logNotEnoughPermits();
                                PartitionReplicaSyncRequestOffloadable.this.sendRetryResponse();
                            }
                        }
                        finally {
                            PartitionReplicaSyncRequestOffloadable.this.clearMigratingFlag();
                        }
                    }
                    finally {
                        PartitionReplicaSyncRequestOffloadable.this.sendResponse(null);
                    }
                });
            }
            catch (RejectedExecutionException e) {
                try {
                    PartitionReplicaSyncRequestOffloadable.this.sendRetryResponse();
                }
                finally {
                    PartitionReplicaSyncRequestOffloadable.this.sendResponse(null);
                }
            }
        }
    }
}

