package org.apache.seatunnel.engine.server.service.slot;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.server.TaskExecutionService;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.CPU;
import org.apache.seatunnel.engine.server.resourcemanager.resource.Memory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

/* loaded from: input_file:org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.class */
/**
 * Default {@link SlotService} implementation that tracks the slots of the local node and
 * periodically reports a {@link WorkerProfile} to the master node.
 *
 * <p>Depending on {@link SlotServiceConfig#isDynamicSlot()} the service either hands out slots
 * from a fixed pool created at {@link #init()} time, or creates a new slot per request as long as
 * the unassigned resource of the node is sufficient.
 *
 * <p>{@link #requestSlot}, {@link #releaseSlot} and {@link #getWorkerProfile} are synchronized on
 * this instance; {@link #getSlotContext} is not and relies on the concurrent map only.
 */
public class DefaultSlotService implements SlotService {
    private static final ILogger LOGGER = Logger.getLogger(DefaultSlotService.class);

    /** Period, in milliseconds, between two heartbeats sent to the master node. */
    private static final long DEFAULT_HEARTBEAT_TIMEOUT = 5000;

    private final NodeEngineImpl nodeEngine;
    private AtomicReference<ResourceProfile> unassignedResource;
    private AtomicReference<ResourceProfile> assignedResource;
    private ConcurrentMap<Integer, SlotProfile> assignedSlots;
    private ConcurrentMap<Integer, SlotProfile> unassignedSlots;
    private ScheduledExecutorService scheduledExecutorService;
    private final SlotServiceConfig config;

    /** {@code true} from {@link #init()} until the first {@link #requestSlot} call. */
    private volatile boolean initStatus;

    private final IdGenerator idGenerator = new IdGenerator();
    private final TaskExecutionService taskExecutionService;
    private ConcurrentMap<Integer, SlotContext> contexts;

    /** Random identifier regenerated on every {@link #init()}; stamped on every slot profile. */
    private String slotServiceSequence;

    /**
     * Creates the service. No state is allocated and no heartbeat is started until
     * {@link #init()} is called.
     *
     * @param nodeEngineImpl the Hazelcast node engine of the local member
     * @param taskExecutionService the task execution service handed to every {@link SlotContext}
     * @param slotServiceConfig the slot configuration (dynamic vs. fixed slots, slot count)
     */
    public DefaultSlotService(NodeEngineImpl nodeEngineImpl, TaskExecutionService taskExecutionService, SlotServiceConfig slotServiceConfig) {
        this.nodeEngine = nodeEngineImpl;
        this.config = slotServiceConfig;
        this.taskExecutionService = taskExecutionService;
    }

    /**
     * (Re)creates all bookkeeping state, builds the fixed slot pool when dynamic slots are
     * disabled, and schedules the periodic heartbeat to the master node.
     */
    @Override
    public void init() {
        this.initStatus = true;
        this.slotServiceSequence = UUID.randomUUID().toString();
        this.contexts = new ConcurrentHashMap<>();
        this.assignedSlots = new ConcurrentHashMap<>();
        this.unassignedSlots = new ConcurrentHashMap<>();
        this.unassignedResource = new AtomicReference<>(new ResourceProfile());
        this.assignedResource = new AtomicReference<>(new ResourceProfile());
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
                runnable -> new Thread(
                        runnable,
                        String.format("hz.%s.seaTunnel.slotService.thread", this.nodeEngine.getHazelcastInstance().getName())));
        if (!this.config.isDynamicSlot()) {
            initFixedSlots();
        }
        this.unassignedResource.set(getNodeResource());
        this.scheduledExecutorService.scheduleAtFixedRate(
                this::sendHeartbeat, 0L, DEFAULT_HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /**
     * Sends one heartbeat carrying the current worker profile to the master node and waits for
     * the invocation to complete. Failures are logged and never propagated, because an exception
     * escaping a {@code scheduleAtFixedRate} task would cancel all subsequent executions.
     */
    private void sendHeartbeat() {
        try {
            LOGGER.fine("start send heartbeat to resource manager, this address: " + this.nodeEngine.getClusterService().getThisAddress());
            sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join();
        } catch (Exception e) {
            // Keep the cause in the log so that repeated heartbeat failures can be diagnosed.
            LOGGER.warning("failed send heartbeat to resource manager, will retry later. this address: " + this.nodeEngine.getClusterService().getThisAddress(), e);
        }
    }

    /**
     * Closes and re-initializes the service unless it is still in its freshly initialized state
     * (that is, no slot has been requested since the last {@link #init()}).
     */
    @Override
    public void reset() {
        if (this.initStatus) {
            return;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have completed the reset already.
            if (!this.initStatus) {
                close();
                init();
            }
        }
    }

    /**
     * Tries to assign a slot satisfying {@code resourceProfile} to the given job.
     *
     * @param j the id of the job requesting the slot
     * @param resourceProfile the resources the slot must provide
     * @return the current worker profile together with the assigned slot, or with {@code null}
     *     as the slot when no matching slot is available
     */
    @Override
    public synchronized SlotAndWorkerProfile requestSlot(long j, ResourceProfile resourceProfile) {
        this.initStatus = false;
        SlotProfile slot = selectBestMatchSlot(resourceProfile);
        if (slot != null) {
            int slotId = slot.getSlotID();
            slot.assign(j);
            this.assignedResource.accumulateAndGet(slot.getResourceProfile(), ResourceProfile::merge);
            this.unassignedResource.accumulateAndGet(slot.getResourceProfile(), ResourceProfile::subtract);
            this.unassignedSlots.remove(slotId);
            this.assignedSlots.put(slotId, slot);
            this.contexts.computeIfAbsent(slotId, id -> new SlotContext(slot.getSlotID(), this.taskExecutionService));
        }
        LOGGER.fine(String.format("received slot request, jobID: %d, resource profile: %s, return: %s", j, resourceProfile, slot));
        return new SlotAndWorkerProfile(getWorkerProfile(), slot);
    }

    /**
     * Returns the context registered for the slot with the id of {@code slotProfile}.
     *
     * @param slotProfile the slot whose context is requested
     * @return the context of the slot, never {@code null}
     * @throws WrongTargetSlotException if no context is registered for the slot id
     */
    @Override
    public SlotContext getSlotContext(SlotProfile slotProfile) {
        // Single lookup: a containsKey/get pair could observe a concurrent releaseSlot() in
        // between and hand a null context back to the caller.
        SlotContext context = this.contexts.get(slotProfile.getSlotID());
        if (context == null) {
            throw new WrongTargetSlotException("Unknown slot in slot service, slot profile: " + slotProfile);
        }
        return context;
    }

    /**
     * Releases a slot previously assigned to the given job and returns its resources to the
     * unassigned pool. With fixed slots the profile is put back into the unassigned slot map.
     *
     * @param j the id of the job that owns the slot
     * @param slotProfile the slot to release
     * @throws WrongTargetSlotException if the slot is not currently assigned, if its sequence
     *     differs from the one recorded for the assigned slot, or if the assigned slot is owned
     *     by a different job
     */
    @Override
    public synchronized void releaseSlot(long j, SlotProfile slotProfile) {
        LOGGER.info(String.format("received slot release request, jobID: %d, slot: %s", j, slotProfile));
        int slotId = slotProfile.getSlotID();
        SlotProfile assigned = this.assignedSlots.get(slotId);
        if (assigned == null) {
            throw new WrongTargetSlotException("Not exist this slot in slot service, slot profile: " + slotProfile);
        }
        if (!assigned.getSequence().equals(slotProfile.getSequence())) {
            throw new WrongTargetSlotException("Wrong slot sequence in profile, slot profile: " + slotProfile);
        }
        if (assigned.getOwnerJobID() != j) {
            throw new WrongTargetSlotException(String.format("The profile %s not belong with job %d", assigned, j));
        }
        this.assignedResource.accumulateAndGet(slotProfile.getResourceProfile(), ResourceProfile::subtract);
        this.unassignedResource.accumulateAndGet(slotProfile.getResourceProfile(), ResourceProfile::merge);
        slotProfile.unassigned();
        if (!this.config.isDynamicSlot()) {
            this.unassignedSlots.put(slotId, slotProfile);
        }
        this.assignedSlots.remove(slotId);
        this.contexts.remove(slotId);
    }

    /** Stops the heartbeat executor, if one has been created. */
    @Override
    public void close() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    /**
     * Picks the slot to hand out for {@code resourceProfile}.
     *
     * <p>With fixed slots, the smallest unassigned slot that is large enough is chosen (ordered
     * by heap memory, then by CPU cores). With dynamic slots, a new slot with exactly the
     * requested profile is created when the unassigned node resource is sufficient.
     *
     * @return the selected slot, or {@code null} when the request cannot be satisfied
     */
    private SlotProfile selectBestMatchSlot(ResourceProfile resourceProfile) {
        if (!this.config.isDynamicSlot()) {
            // An empty pool simply yields an empty stream and therefore null.
            return this.unassignedSlots.values().stream()
                    .filter(candidate -> candidate.getResourceProfile().enoughThan(resourceProfile))
                    .min((left, right) -> {
                        // compare() helpers instead of subtraction, which can overflow.
                        int byHeap = Long.compare(
                                left.getResourceProfile().getHeapMemory().getBytes(),
                                right.getResourceProfile().getHeapMemory().getBytes());
                        if (byHeap != 0) {
                            return byHeap;
                        }
                        return Integer.compare(
                                left.getResourceProfile().getCpu().getCore(),
                                right.getResourceProfile().getCpu().getCore());
                    })
                    .orElse(null);
        }
        if (this.unassignedResource.get().enoughThan(resourceProfile)) {
            return new SlotProfile(this.nodeEngine.getThisAddress(), (int) this.idGenerator.getNextId(), resourceProfile, this.slotServiceSequence);
        }
        return null;
    }

    /**
     * Fills the unassigned slot map with {@code config.getSlotNum()} slots, each given an equal
     * share of the JVM's maximum memory and zero CPU cores.
     */
    private void initFixedSlots() {
        long maxMemory = Runtime.getRuntime().maxMemory();
        int slotNum = this.config.getSlotNum();
        for (int i = 0; i < slotNum; i++) {
            ResourceProfile slotResource = new ResourceProfile(CPU.of(0), Memory.of(maxMemory / slotNum));
            this.unassignedSlots.put(i, new SlotProfile(this.nodeEngine.getThisAddress(), i, slotResource, this.slotServiceSequence));
        }
    }

    /**
     * Builds a snapshot of the local worker: node resource, assigned and unassigned slots,
     * unassigned resource, member attributes and the dynamic-slot flag.
     *
     * @return a newly created worker profile
     */
    @Override
    public synchronized WorkerProfile getWorkerProfile() {
        WorkerProfile workerProfile = new WorkerProfile(this.nodeEngine.getThisAddress());
        workerProfile.setProfile(getNodeResource());
        workerProfile.setAssignedSlots(this.assignedSlots.values().toArray(new SlotProfile[0]));
        workerProfile.setUnassignedSlots(this.unassignedSlots.values().toArray(new SlotProfile[0]));
        workerProfile.setUnassignedResource(this.unassignedResource.get());
        workerProfile.setAttributes(this.nodeEngine.getLocalMember().getAttributes());
        workerProfile.setDynamicSlot(this.config.isDynamicSlot());
        return workerProfile;
    }

    /** Returns the total node resource: zero CPU cores and the JVM's maximum memory. */
    private ResourceProfile getNodeResource() {
        return new ResourceProfile(CPU.of(0), Memory.of(Runtime.getRuntime().maxMemory()));
    }

    /**
     * Sends {@code operation} to the master node of the cluster.
     *
     * @param operation the operation to invoke on the master node
     * @param <E> the result type of the invocation
     * @return the future of the invocation
     */
    public <E> InvocationFuture<E> sendToMaster(Operation operation) {
        return NodeEngineUtil.sendOperationToMasterNode(this.nodeEngine, operation);
    }
}
