/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server.resourcemanager;

import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceRequestHandler;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ReleaseSlotOperation;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourceOperation;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.SyncWorkerProfileOperation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link ResourceManager} implementation that keeps an in-memory registry of worker
 * profiles keyed by member address and talks to workers through Hazelcast operations.
 *
 * <p>The registry is a {@link ConcurrentMap}; it is updated by {@link #init()},
 * {@link #heartbeat(WorkerProfile)} and {@link #memberRemoved(MembershipServiceEvent)}.
 */
public abstract class AbstractResourceManager implements ResourceManager {
    private static final Logger log = LoggerFactory.getLogger(AbstractResourceManager.class);

    /** Sleep time, in milliseconds, between two checks for a registered worker. */
    private static final long DEFAULT_WORKER_CHECK_INTERVAL = 500L;

    /** Latest known profile of every registered worker, keyed by the worker's address. */
    protected final ConcurrentMap<Address, WorkerProfile> registerWorker = new ConcurrentHashMap<Address, WorkerProfile>();
    private final NodeEngine nodeEngine;
    private final ExecutionMode mode;
    private final EngineConfig engineConfig;

    /** Cleared by {@link #close()} so that {@link #waitingWorkerRegister()} stops polling. */
    private volatile boolean isRunning = true;

    /**
     * Creates a resource manager bound to the given node engine.
     *
     * @param nodeEngine   node engine used to look up cluster members and send operations
     * @param engineConfig engine configuration; its execution mode is read once here
     */
    public AbstractResourceManager(NodeEngine nodeEngine, EngineConfig engineConfig) {
        this.nodeEngine = nodeEngine;
        this.engineConfig = engineConfig;
        this.mode = engineConfig.getMode();
    }

    /** Initializes the manager by collecting the profiles of the current cluster members. */
    @Override
    public void init() {
        log.info("Init ResourceManager");
        this.initWorker();
    }

    /**
     * Sends a {@link SyncWorkerProfileOperation} to every current cluster member, registers each
     * non-null profile that comes back, and blocks until all requests have completed.
     */
    private void initWorker() {
        log.info("initWorker... ");
        List<Address> aliveNode = this.nodeEngine.getClusterService().getMembers().stream()
                .map(Member::getAddress)
                .collect(Collectors.toList());
        log.info("init live nodes: {}", aliveNode);
        List<CompletableFuture<Void>> futures = aliveNode.stream()
                .map(node -> this.sendToMember(new SyncWorkerProfileOperation(), node).thenAccept(p -> {
                    // A member that answers with null is simply not registered.
                    if (p != null) {
                        WorkerProfile workerProfile = (WorkerProfile) p;
                        this.registerWorker.put(node, workerProfile);
                        log.info("received new worker register: {}", workerProfile.getAddress());
                    }
                }))
                .collect(Collectors.toList());
        futures.forEach(CompletableFuture::join);
        log.info("registerWorker: {}", this.registerWorker);
    }

    /**
     * Requests a single slot; delegates to {@link #applyResources(long, List, Map)} with a
     * one-element list and exposes the first returned slot.
     *
     * @param jobId           id of the job the slot is requested for
     * @param resourceProfile resources required by the slot
     * @param tagFilter       worker attribute filter; {@code null} or empty means no filtering
     * @return a future completed with the slot, or exceptionally with the request's error
     * @throws NoEnoughResourceException if no registered worker matches {@code tagFilter}
     */
    @Override
    public CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile, Map<String, String> tagFilter) throws NoEnoughResourceException {
        CompletableFuture<SlotProfile> completableFuture = new CompletableFuture<>();
        this.applyResources(jobId, Collections.singletonList(resourceProfile), tagFilter).whenComplete((profile, error) -> {
            if (error != null) {
                completableFuture.completeExceptionally(error);
            } else {
                completableFuture.complete(profile.get(0));
            }
        });
        return completableFuture;
    }

    /**
     * In {@link ExecutionMode#LOCAL} mode, polls until at least one worker is registered or the
     * manager has been closed. Does nothing in other modes.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting thread
     *                          is interrupted; the interrupt flag is restored before throwing
     */
    private void waitingWorkerRegister() {
        if (this.mode != ExecutionMode.LOCAL) {
            return;
        }
        try {
            while (this.registerWorker.isEmpty() && this.isRunning) {
                log.info("waiting current worker register to resource manager...");
                Thread.sleep(DEFAULT_WORKER_CHECK_INTERVAL);
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Removes the departed member's address from the worker registry.
     *
     * @param event membership event describing the removed member
     */
    @Override
    public void memberRemoved(MembershipServiceEvent event) {
        Address address = event.getMember().getAddress();
        log.warn("Node heartbeat timeout, disconnected for resource manager. Node Address: {}", address);
        this.registerWorker.remove(address);
    }

    /**
     * Requests one slot per entry of {@code resourceProfile} from the workers that match
     * {@code tagFilter}, using a {@link ResourceRequestHandler}.
     *
     * @param jobId           id of the job the slots are requested for
     * @param resourceProfile resources required by each requested slot
     * @param tagFilter       worker attribute filter; {@code null} or empty means no filtering
     * @return the future produced by the request handler
     * @throws NoEnoughResourceException if no registered worker matches {@code tagFilter}
     */
    @Override
    public CompletableFuture<List<SlotProfile>> applyResources(long jobId, List<ResourceProfile> resourceProfile, Map<String, String> tagFilter) throws NoEnoughResourceException {
        this.waitingWorkerRegister();
        ConcurrentMap<Address, WorkerProfile> matchedWorker = this.filterWorkerByTag(tagFilter);
        if (matchedWorker.isEmpty()) {
            log.error("No matched worker with tag filter {}.", tagFilter);
            throw new NoEnoughResourceException();
        }
        return new ResourceRequestHandler(jobId, resourceProfile, matchedWorker, this).request(tagFilter);
    }

    /**
     * Tells whether this manager supports dynamic workers.
     *
     * @return {@code false} in this base class
     */
    protected boolean supportDynamicWorker() {
        return false;
    }

    /**
     * Hook for finding new workers; this base implementation always throws.
     *
     * @param resourceProfiles resources the new workers would have to provide
     * @param tagFilter        worker attribute filter of the pending request
     * @throws UnsupportedOperationException always, in this base class
     */
    protected void findNewWorker(List<ResourceProfile> resourceProfiles, Map<String, String> tagFilter) {
        throw new UnsupportedOperationException("Unsupported operation to find new worker in " + this.getClass().getName());
    }

    /** Marks the manager as stopped, which ends any wait for worker registration. */
    @Override
    public void close() {
        this.isRunning = false;
    }

    /**
     * Sends {@code operation} to the member at {@code address}.
     *
     * @param operation operation to send
     * @param address   address of the target member
     * @param <E>       type of the operation's response
     * @return a future wrapping the invocation started by
     *         {@link NodeEngineUtil#sendOperationToMemberNode}
     */
    protected <E> CompletableFuture<E> sendToMember(Operation operation, Address address) {
        return new CompletableFuture<>(NodeEngineUtil.sendOperationToMemberNode(this.nodeEngine, operation, address));
    }

    /**
     * Releases every slot in {@code profiles} via {@link #releaseResource(long, SlotProfile)}.
     *
     * @param jobId    id of the job that owns the slots
     * @param profiles slots to release
     * @return a future completed with {@code null} once all releases are done, or exceptionally
     *         with the error reported by the combined release future
     */
    @Override
    public CompletableFuture<Void> releaseResources(long jobId, List<SlotProfile> profiles) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (SlotProfile profile : profiles) {
            futures.add(this.releaseResource(jobId, profile));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
            if (e != null) {
                completableFuture.completeExceptionally(e);
            } else {
                completableFuture.complete(null);
            }
        });
        return completableFuture;
    }

    /**
     * Releases one slot. If the slot's worker is still a cluster member, a
     * {@link ReleaseSlotOperation} is sent to it and the returned worker profile is fed into
     * {@link #heartbeat(WorkerProfile)}; otherwise nothing is sent.
     *
     * @param jobId   id of the job that owns the slot
     * @param profile slot to release
     * @return a future for the release; already completed when the worker is no longer a member
     */
    @Override
    public CompletableFuture<Void> releaseResource(long jobId, SlotProfile profile) {
        if (this.nodeEngine.getClusterService().getMember(profile.getWorker()) == null) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<WorkerProfile> future = this.sendToMember(new ReleaseSlotOperation(jobId, profile), profile.getWorker());
        return future.thenAccept(this::heartbeat);
    }

    /**
     * Checks whether {@code profile} is currently listed among the assigned slots of its
     * registered worker, comparing both slot id and sequence.
     *
     * @param profile slot to check
     * @return {@code true} if the worker is registered and reports a matching assigned slot
     */
    @Override
    public boolean slotActiveCheck(SlotProfile profile) {
        // Single lookup: the worker may be removed concurrently, so a containsKey/get pair
        // could observe the key and then read null.
        WorkerProfile worker = this.registerWorker.get(profile.getWorker());
        boolean active = worker != null
                && Arrays.stream(worker.getAssignedSlots())
                        .anyMatch(s -> s.getSlotID() == profile.getSlotID() && s.getSequence().equals(profile.getSequence()));
        if (active) {
            log.info("received slot active check success, profile: {}", profile);
        } else {
            log.info("received slot active check failed, profile: {}", profile);
        }
        return active;
    }

    /**
     * Records the latest profile of a worker. A worker that is not yet registered is first sent
     * a {@link ResetResourceOperation}, and this method blocks until that operation completes.
     *
     * @param workerProfile profile reported by the worker
     */
    @Override
    public void heartbeat(WorkerProfile workerProfile) {
        Address address = workerProfile.getAddress();
        if (!this.registerWorker.containsKey(address)) {
            log.info("received new worker register: {}", address);
            this.sendToMember(new ResetResourceOperation(), address).join();
        } else {
            log.debug("received worker heartbeat from: {}", address);
        }
        this.registerWorker.put(address, workerProfile);
    }

    /**
     * Lists the unassigned slots of all workers matching {@code tags}.
     *
     * @param tags worker attribute filter; {@code null} or empty means all workers
     * @return unassigned slots of the matching workers
     */
    @Override
    public List<SlotProfile> getUnassignedSlots(Map<String, String> tags) {
        return this.filterWorkerByTag(tags).values().stream()
                .flatMap(workerProfile -> Arrays.stream(workerProfile.getUnassignedSlots()))
                .collect(Collectors.toList());
    }

    /**
     * Lists the assigned slots of all workers matching {@code tags}.
     *
     * @param tags worker attribute filter; {@code null} or empty means all workers
     * @return assigned slots of the matching workers
     */
    @Override
    public List<SlotProfile> getAssignedSlots(Map<String, String> tags) {
        return this.filterWorkerByTag(tags).values().stream()
                .flatMap(workerProfile -> Arrays.stream(workerProfile.getAssignedSlots()))
                .collect(Collectors.toList());
    }

    /**
     * Counts the registered workers matching {@code tags}.
     *
     * @param tags worker attribute filter; {@code null} or empty means all workers
     * @return number of matching workers
     */
    @Override
    public int workerCount(Map<String, String> tags) {
        return this.filterWorkerByTag(tags).size();
    }

    /**
     * Selects the registered workers whose attributes contain every entry of {@code tagFilter}.
     *
     * @param tagFilter required attributes; {@code null} or empty selects every worker
     * @return the live registry itself when no filter is given, otherwise a new map holding
     *         only the matching workers
     */
    private ConcurrentMap<Address, WorkerProfile> filterWorkerByTag(Map<String, String> tagFilter) {
        if (tagFilter == null || tagFilter.isEmpty()) {
            return this.registerWorker;
        }
        return this.registerWorker.entrySet().stream()
                .filter(e -> matchesAllTags(e.getValue().getAttributes(), tagFilter))
                .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Tells whether {@code workerAttr} contains every key of {@code tagFilter} with an equal
     * value. A worker without attributes never matches.
     */
    private static boolean matchesAllTags(Map<String, String> workerAttr, Map<String, String> tagFilter) {
        if (workerAttr == null || workerAttr.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, String> required : tagFilter.entrySet()) {
            String key = required.getKey();
            if (!workerAttr.containsKey(key)) {
                return false;
            }
            String actual = workerAttr.get(key);
            String expected = required.getValue();
            // Null-safe comparison: an attribute stored with a null value must not throw.
            boolean same = actual == null ? expected == null : actual.equals(expected);
            if (!same) {
                return false;
            }
        }
        return true;
    }
}

