package org.apache.seatunnel.engine.server.resourcemanager;

import com.hazelcast.cluster.Address;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.SyncWorkerProfileOperation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.class */
public abstract class AbstractResourceManager implements ResourceManager {
    private static final Logger log = LoggerFactory.getLogger(AbstractResourceManager.class);
    private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500;
    private final NodeEngine nodeEngine;
    private final ExecutionMode mode = ExecutionMode.LOCAL;
    private volatile boolean isRunning = true;
    protected final ConcurrentMap<Address, WorkerProfile> registerWorker = new ConcurrentHashMap();

    public AbstractResourceManager(NodeEngine nodeEngine) {
        this.nodeEngine = nodeEngine;
    }

    @Override // org.apache.seatunnel.engine.server.resourcemanager.ResourceManager
    public void init() {
        log.info("Init ResourceManager");
        initWorker();
    }

    private void initWorker() {
        log.info("initWorker... ");
        List list = (List) this.nodeEngine.getClusterService().getMembers().stream().map((v0) -> {
            return v0.getAddress();
        }).collect(Collectors.toList());
        log.info("initWorker live nodes: " + list);
        ((List) list.stream().map(address -> {
            return sendToMember(new SyncWorkerProfileOperation(), address).thenAccept(obj -> {
                this.registerWorker.put(address, (WorkerProfile) obj);
            });
        }).collect(Collectors.toList())).forEach((v0) -> {
            v0.join();
        });
        log.info("registerWorker: " + this.registerWorker);
    }

    @Override // org.apache.seatunnel.engine.server.resourcemanager.ResourceManager
    public CompletableFuture<SlotProfile> applyResource(long j, ResourceProfile resourceProfile) throws NoEnoughResourceException {
        CompletableFuture<SlotProfile> completableFuture = new CompletableFuture<>();
        applyResources(j, Collections.singletonList(resourceProfile)).whenComplete((list, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
            } else {
                completableFuture.complete(list.get(0));
            }
        });
        return completableFuture;
    }

    private void waitingWorkerRegister() {
        if (ExecutionMode.LOCAL.equals(this.mode)) {
            while (this.registerWorker.isEmpty() && this.isRunning) {
                try {
                    log.info("waiting current worker register to resource manager...");
                    Thread.sleep(DEFAULT_WORKER_CHECK_INTERVAL);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    @Override // org.apache.seatunnel.engine.server.resourcemanager.ResourceManager
    public void memberRemoved(MembershipServiceEvent membershipServiceEvent) {
        log.warn("Node heartbeat timeout, disconnected for resource manager. Node Address: " + membershipServiceEvent.getMember().getAddress());
        this.registerWorker.remove(membershipServiceEvent.getMember().getAddress());
    }

    @Override // org.apache.seatunnel.engine.server.resourcemanager.ResourceManager
    public CompletableFuture<List<SlotProfile>> applyResources(long j, List<ResourceProfile> list) throws NoEnoughResourceException {
        waitingWorkerRegister();
        return new ResourceRequestHandler(j, list, this.registerWorker, this).request();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean supportDynamicWorker() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void findNewWorker(List<ResourceProfile> list) {
        throw new UnsupportedOperationException("Unsupported operation to find new worker in " + getClass().getName());
    }

    @Override // org.apache.seatunnel.engine.server.resourcemanager.ResourceManager
    public void close() {
        this.isRunning = false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <E> CompletableFuture<E> sendToMember(Operation operation, Address address) {
        return NodeEngineUtil.sendOperationToMemberNode(this.nodeEngine, operation, address);
    }

    @Override // org.apache.seatunnel.engine.server.resourcemanager.ResourceManager
    public CompletableFuture<Void> releaseResources(long j, List<SlotProfile> list) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        ArrayList arrayList = new ArrayList();
        Iterator<SlotProfile> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(releaseResource(j, it.next()));
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).whenComplete((r4, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
            } else {
                completableFuture.complete(null);
            }
        });
        return completableFuture;
    }

    @Override // org.apache.seatunnel.engine.server.resourcemanager.ResourceManager
    public CompletableFuture<Void> releaseResource(long j, SlotProfile slotProfile) {
        return this.nodeEngine.getClusterService().getMember(slotProfile.getWorker()) != null ? sendToMember(new ReleaseSlotOperation(j, slotProfile), slotProfile.getWorker()) : CompletableFuture.completedFuture(null);
    }

    @Override // org.apache.seatunnel.engine.server.resourcemanager.ResourceManager
    public boolean slotActiveCheck(SlotProfile slotProfile) {
        boolean z = false;
        if (this.registerWorker.containsKey(slotProfile.getWorker())) {
            z = Arrays.stream(this.registerWorker.get(slotProfile.getWorker()).getAssignedSlots()).anyMatch(slotProfile2 -> {
                return slotProfile2.getSlotID() == slotProfile.getSlotID() && slotProfile2.getSequence().equals(slotProfile.getSequence());
            });
        }
        if (z) {
            log.info("received slot active check success, profile: " + slotProfile);
        } else {
            log.info("received slot active check failed, profile: " + slotProfile);
        }
        return z;
    }

    @Override // org.apache.seatunnel.engine.server.resourcemanager.ResourceManager
    public void heartbeat(WorkerProfile workerProfile) {
        if (this.registerWorker.containsKey(workerProfile.getAddress())) {
            log.debug("received worker heartbeat from: " + workerProfile.getAddress());
        } else {
            log.debug("received new worker register: " + workerProfile.getAddress());
            sendToMember(new ResetResourceOperation(), workerProfile.getAddress()).join();
        }
        this.registerWorker.put(workerProfile.getAddress(), workerProfile);
    }
}
