package org.camunda.bpm.engine.rest.impl;

import jakarta.servlet.ServletContext;
import jakarta.servlet.ServletContextEvent;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.camunda.bpm.engine.IdentityService;
import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.impl.ProcessEngineImpl;
import org.camunda.bpm.engine.impl.util.ClockUtil;
import org.camunda.bpm.engine.impl.util.SingleConsumerCondition;
import org.camunda.bpm.engine.rest.dto.externaltask.FetchExternalTasksExtendedDto;
import org.camunda.bpm.engine.rest.dto.externaltask.LockedExternalTaskDto;
import org.camunda.bpm.engine.rest.exception.InvalidRequestException;
import org.camunda.bpm.engine.rest.exception.RestException;
import org.camunda.bpm.engine.rest.spi.FetchAndLockHandler;
import org.camunda.bpm.engine.rest.util.EngineUtil;

/* loaded from: input_file:org/camunda/bpm/engine/rest/impl/FetchAndLockHandlerImpl.class */
/**
 * Handler for long-polling "fetch and lock" requests on external tasks.
 *
 * <p>Incoming requests are first tried directly on the calling thread (see
 * {@link #addPendingRequest}). Requests that yield no tasks and carry an asynchronous response
 * timeout are placed on a bounded queue and retried by a dedicated handler thread (see
 * {@link #run()} / {@link #acquire()}) until tasks are found, the request expires, or the handler
 * is shut down.
 *
 * <p>{@link #pendingRequests} and {@link #newRequests} are only mutated from {@link #acquire()},
 * which {@link #run()} calls on the handler thread; the hand-over from request threads happens
 * through {@link #queue}.
 */
public class FetchAndLockHandlerImpl implements Runnable, FetchAndLockHandler {
    private static final Logger LOG = Logger.getLogger(FetchAndLockHandlerImpl.class.getName());

    /** Servlet context init parameter enabling "one pending request per worker id" handling. */
    protected static final String UNIQUE_WORKER_REQUEST_PARAM_NAME = "fetch-and-lock-unique-worker-request";

    /** Servlet context init parameter configuring the capacity of the request queue. */
    protected static final String BLOCKING_QUEUE_CAPACITY_PARAM_NAME = "fetch-and-lock-queue-capacity";

    /** Upper bound (ms) between two acquisition cycles while requests are pending. */
    protected static final long PENDING_REQUEST_FETCH_INTERVAL = 30000;

    /** Initial back-off (ms) of an acquisition cycle before pending requests narrow it down. */
    protected static final long MAX_BACK_OFF_TIME = Long.MAX_VALUE;

    /** Largest asynchronous response timeout (ms) a client may request. */
    protected static final long MAX_REQUEST_TIMEOUT = 1800000;

    /** Queue capacity used when none (or an invalid one) is configured. */
    protected static final int DEFAULT_BLOCKING_QUEUE_CAPACITY = 200;

    /** Hand-over queue between request threads (producers) and the handler thread (consumer). */
    protected BlockingQueue<FetchAndLockRequest> queue;

    /** Requests that are currently waiting for tasks. */
    protected List<FetchAndLockRequest> pendingRequests = new ArrayList<>();

    /** Scratch list the queue is drained into at the start of every acquisition cycle. */
    protected List<FetchAndLockRequest> newRequests = new ArrayList<>();

    protected Thread handlerThread = new Thread(this, getClass().getSimpleName());
    protected volatile boolean isRunning = false;
    protected boolean isUniqueWorkerRequest = false;

    // Must be declared after handlerThread: the condition is bound to that thread.
    protected SingleConsumerCondition condition = new SingleConsumerCondition(this.handlerThread);

    /**
     * Handler thread main loop: runs acquisition cycles until {@link #isRunning} is cleared, then
     * rejects every request that is still pending.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (this.isRunning) {
            try {
                acquire();
            } catch (Exception e) {
                // Whatever happens, do not leave the loop - but make the failure visible
                // instead of swallowing it silently.
                LOG.log(Level.WARNING, "Unexpected error while processing fetch-and-lock requests", e);
            }
        }
        rejectPendingRequests();
    }

    /**
     * Performs one acquisition cycle: takes over newly queued requests, retries fetch-and-lock for
     * every pending request, answers those that got tasks, expired or failed, and finally
     * suspends until the next cycle is due.
     */
    protected void acquire() {
        LOG.log(Level.FINEST, "Acquire start");

        this.queue.drainTo(this.newRequests);
        if (!this.newRequests.isEmpty()) {
            if (this.isUniqueWorkerRequest) {
                removeDuplicates();
            }
            this.pendingRequests.addAll(this.newRequests);
            this.newRequests.clear();
        }

        LOG.log(Level.FINEST, "Number of pending requests {0}", this.pendingRequests.size());

        long backoffTime = MAX_BACK_OFF_TIME;
        Iterator<FetchAndLockRequest> iterator = this.pendingRequests.iterator();
        while (iterator.hasNext()) {
            FetchAndLockRequest pendingRequest = iterator.next();

            LOG.log(Level.FINEST, "Fetching tasks for request {0}", pendingRequest);
            FetchAndLockResult result = tryFetchAndLock(pendingRequest);
            LOG.log(Level.FINEST, "Fetch and lock result: {0}", result);

            if (!result.wasSuccessful()) {
                AsyncResponse asyncResponse = pendingRequest.getAsyncResponse();
                Throwable throwable = result.getThrowable();
                asyncResponse.resume(throwable);
                LOG.log(Level.FINEST, "Resume and remove request with error", throwable);
                iterator.remove();
                continue;
            }

            List<LockedExternalTaskDto> lockedTasks = result.getTasks();
            if (!lockedTasks.isEmpty() || isExpired(pendingRequest)) {
                // Either there is something to deliver or the client's timeout elapsed
                // (in which case the empty list is the answer).
                pendingRequest.getAsyncResponse().resume(lockedTasks);
                LOG.log(Level.FINEST, "resume and remove request with {0}", lockedTasks);
                iterator.remove();
            } else {
                long msUntilTimeout = pendingRequest.getTimeoutTimestamp() - ClockUtil.getCurrentTime().getTime();
                backoffTime = Math.min(backoffTime, msUntilTimeout);
            }
        }

        long waitTime = Math.max(0L, backoffTime);
        if (this.pendingRequests.isEmpty()) {
            suspend(waitTime);
        } else {
            // With pending requests, wake up at least every PENDING_REQUEST_FETCH_INTERVAL ms
            // so they are retried periodically and timed-out requests get answered.
            suspend(Math.min(PENDING_REQUEST_FETCH_INTERVAL, waitTime));
        }
    }

    /**
     * Cancels and removes every pending request whose worker id equals the worker id of one of
     * the newly arrived requests.
     */
    protected void removeDuplicates() {
        for (FetchAndLockRequest newRequest : this.newRequests) {
            Iterator<FetchAndLockRequest> iterator = this.pendingRequests.iterator();
            while (iterator.hasNext()) {
                FetchAndLockRequest pendingRequest = iterator.next();
                boolean sameWorker = pendingRequest.getDto().getWorkerId().equals(newRequest.getDto().getWorkerId());
                if (sameWorker) {
                    pendingRequest.getAsyncResponse().cancel();
                    iterator.remove();
                }
            }
        }
    }

    /**
     * Starts the handler thread and registers {@link #condition} as a consumer of
     * {@code ProcessEngineImpl.EXT_TASK_CONDITIONS}. Does nothing if the handler is already
     * running.
     */
    @Override // org.camunda.bpm.engine.rest.spi.FetchAndLockHandler
    public void start() {
        if (this.isRunning) {
            return;
        }
        // The queue has to exist before the handler thread starts: acquire() dereferences it
        // immediately, so creating it afterwards would race with the first acquisition cycle.
        if (this.queue == null) {
            initializeQueue(DEFAULT_BLOCKING_QUEUE_CAPACITY);
        }
        this.isRunning = true;
        this.handlerThread.start();
        ProcessEngineImpl.EXT_TASK_CONDITIONS.addConsumer(this.condition);
    }

    /**
     * Stops the handler: unregisters the condition, clears {@link #isRunning}, wakes the handler
     * thread and then waits for it to terminate.
     */
    @Override // org.camunda.bpm.engine.rest.spi.FetchAndLockHandler
    public void shutdown() {
        try {
            ProcessEngineImpl.EXT_TASK_CONDITIONS.removeConsumer(this.condition);
        } finally {
            // Stop and wake the handler thread even if unregistering failed.
            this.isRunning = false;
            this.condition.signal();
        }

        // Join only AFTER the stop flag is set and the thread has been signalled; joining
        // first would wait forever because the loop in run() only exits once isRunning is false.
        try {
            this.handlerThread.join();
        } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "Shutting down the handler thread failed", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Suspends acquisition for the given time; non-positive values return immediately.
     *
     * @param millis time to suspend in milliseconds
     */
    protected void suspend(long millis) {
        if (millis > 0) {
            suspendAcquisition(millis);
        }
    }

    /**
     * Awaits {@link #condition} for the given time, but only if the queue is empty and the
     * handler is still running.
     *
     * @param millis value passed to {@code condition.await}
     */
    protected void suspendAcquisition(long millis) {
        try {
            if (this.queue.isEmpty() && this.isRunning) {
                LOG.log(Level.FINEST, "Suspend acquisition for {0}ms", millis);
                this.condition.await(millis);
                LOG.log(Level.FINEST, "Acquisition woke up");
            }
        } finally {
            if (this.handlerThread.isInterrupted()) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Offers the request to the queue and signals the condition. If the queue is full the request
     * is answered with a "too many requests" error instead.
     *
     * @param request the request to defer
     */
    protected void addRequest(FetchAndLockRequest request) {
        boolean accepted = this.queue.offer(request);
        if (!accepted) {
            errorTooManyRequests(request.getAsyncResponse());
        }
        this.condition.signal();
    }

    /**
     * Executes fetch-and-lock for the request with the request's authentication set on the
     * engine's identity service. The authentication is always cleared again afterwards.
     *
     * @param request the request to execute
     * @return a successful result holding the locked tasks, or a failed result wrapping the
     *     {@link Exception} that was thrown
     */
    protected FetchAndLockResult tryFetchAndLock(FetchAndLockRequest request) {
        IdentityService identityService = null;
        try {
            ProcessEngine processEngine = getProcessEngine(request);
            identityService = processEngine.getIdentityService();
            identityService.setAuthentication(request.getAuthentication());
            return FetchAndLockResult.successful(executeFetchAndLock(request.getDto(), processEngine));
        } catch (Exception e) {
            return FetchAndLockResult.failed(e);
        } finally {
            if (identityService != null) {
                identityService.clearAuthentication();
            }
        }
    }

    /**
     * Builds the fetch query from the DTO, executes it and converts the result to DTOs.
     *
     * @param fetchingDto the fetch-and-lock payload
     * @param processEngine the engine to run the query against
     * @return the locked external tasks as DTOs
     */
    protected List<LockedExternalTaskDto> executeFetchAndLock(FetchExternalTasksExtendedDto fetchingDto, ProcessEngine processEngine) {
        return LockedExternalTaskDto.fromLockedExternalTasks(fetchingDto.buildQuery(processEngine).execute());
    }

    /**
     * Resumes the response with an error stating that the server is handling too many requests.
     *
     * @param asyncResponse the response to resume
     */
    protected void errorTooManyRequests(AsyncResponse asyncResponse) {
        String errorMessage = "At the moment the server has to handle too many requests at the same time. Please try again later.";
        asyncResponse.resume(new InvalidRequestException(Response.Status.INTERNAL_SERVER_ERROR, errorMessage));
    }

    /** Resumes every pending request with a "rejected due to shutdown" error. */
    protected void rejectPendingRequests() {
        for (FetchAndLockRequest pendingRequest : this.pendingRequests) {
            AsyncResponse asyncResponse = pendingRequest.getAsyncResponse();
            asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, "Request rejected due to shutdown of application server."));
        }
    }

    /**
     * Looks up the process engine by the name stored in the request.
     *
     * @param request the request naming the engine
     * @return the result of {@code EngineUtil.lookupProcessEngine}
     */
    protected ProcessEngine getProcessEngine(FetchAndLockRequest request) {
        return EngineUtil.lookupProcessEngine(request.getProcessEngineName());
    }

    /**
     * @param request the request to check
     * @return {@code true} if the request's timeout timestamp is not after the current
     *     {@link ClockUtil} time
     */
    protected boolean isExpired(FetchAndLockRequest request) {
        long now = ClockUtil.getCurrentTime().getTime();
        return request.getTimeoutTimestamp() <= now;
    }

    /**
     * Entry point for a new fetch-and-lock request. The request is tried once on the calling
     * thread; it is answered right away on error, when tasks were locked, or when no asynchronous
     * response timeout was given. Otherwise it is deferred to the handler thread.
     *
     * @param fetchingDto the fetch-and-lock payload
     * @param asyncResponse the response to resume once a result is available
     * @param processEngine the engine the request targets
     */
    @Override // org.camunda.bpm.engine.rest.spi.FetchAndLockHandler
    public void addPendingRequest(FetchExternalTasksExtendedDto fetchingDto, AsyncResponse asyncResponse, ProcessEngine processEngine) {
        Long asyncResponseTimeout = fetchingDto.getAsyncResponseTimeout();
        if (asyncResponseTimeout != null && asyncResponseTimeout > MAX_REQUEST_TIMEOUT) {
            asyncResponse.resume(new InvalidRequestException(Response.Status.BAD_REQUEST,
                    "The asynchronous response timeout cannot be set to a value greater than " + MAX_REQUEST_TIMEOUT + " milliseconds"));
            return;
        }

        FetchAndLockRequest incomingRequest = new FetchAndLockRequest()
                .setProcessEngineName(processEngine.getName())
                .setAsyncResponse(asyncResponse)
                .setAuthentication(processEngine.getIdentityService().getCurrentAuthentication())
                .setDto(fetchingDto);
        LOG.log(Level.FINEST, "New request: {0}", incomingRequest);

        FetchAndLockResult result = tryFetchAndLock(incomingRequest);
        LOG.log(Level.FINEST, "Fetch and lock result: {0}", result);

        if (!result.wasSuccessful()) {
            Throwable throwable = result.getThrowable();
            asyncResponse.resume(throwable);
            LOG.log(Level.FINEST, "Resuming request with error", throwable);
            return;
        }

        List<LockedExternalTaskDto> lockedTasks = result.getTasks();
        boolean respondImmediately = !lockedTasks.isEmpty() || fetchingDto.getAsyncResponseTimeout() == null;
        if (respondImmediately) {
            asyncResponse.resume(lockedTasks);
            LOG.log(Level.FINEST, "Resuming request with {0}", lockedTasks);
        } else {
            addRequest(incomingRequest);
            LOG.log(Level.FINEST, "Deferred request");
        }
    }

    /**
     * Reads the handler configuration from the servlet context init parameters (if a context is
     * available) and creates the request queue.
     *
     * @param servletContextEvent the context event; may be {@code null}
     */
    @Override // org.camunda.bpm.engine.rest.spi.FetchAndLockHandler
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        int queueCapacity = DEFAULT_BLOCKING_QUEUE_CAPACITY;
        ServletContext servletContext = servletContextEvent != null ? servletContextEvent.getServletContext() : null;
        if (servletContext != null) {
            parseUniqueWorkerRequestParam(servletContext.getInitParameter(UNIQUE_WORKER_REQUEST_PARAM_NAME));
            queueCapacity = parseBlockingQueueCapacityParam(servletContext.getInitParameter(BLOCKING_QUEUE_CAPACITY_PARAM_NAME));
        }
        initializeQueue(queueCapacity);
    }

    /**
     * Sets {@link #isUniqueWorkerRequest} from the parameter value.
     *
     * @param uniqueWorkerRequestParam raw parameter value; {@code null} means {@code false}
     */
    protected void parseUniqueWorkerRequestParam(String uniqueWorkerRequestParam) {
        // Boolean.parseBoolean(null) is false as well, but keep the null case explicit.
        this.isUniqueWorkerRequest = uniqueWorkerRequestParam != null && Boolean.parseBoolean(uniqueWorkerRequestParam);
    }

    /**
     * Replaces {@link #queue} with a new bounded queue.
     *
     * @param capacity capacity of the new queue
     */
    protected void initializeQueue(int capacity) {
        LOG.log(Level.FINEST, "Initializing queue with capacity [{0}]", capacity);
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /**
     * Parses the queue capacity parameter.
     *
     * @param queueCapacityParam raw parameter value; may be {@code null}
     * @return the parsed capacity, or {@link #DEFAULT_BLOCKING_QUEUE_CAPACITY} if the value is
     *     {@code null}, not an integer, or not greater than zero
     */
    private static int parseBlockingQueueCapacityParam(String queueCapacityParam) {
        if (queueCapacityParam == null) {
            return DEFAULT_BLOCKING_QUEUE_CAPACITY;
        }
        try {
            int capacity = Integer.parseInt(queueCapacityParam);
            if (capacity <= 0) {
                // Reuse the NumberFormatException path so both failure modes share the fallback.
                throw new NumberFormatException("Parameter " + BLOCKING_QUEUE_CAPACITY_PARAM_NAME + " has to be greater than zero");
            }
            return capacity;
        } catch (NumberFormatException e) {
            LOG.log(Level.WARNING, "Invalid blocking queue capacity parameter: [" + queueCapacityParam + "], falling back to default value", e);
            return DEFAULT_BLOCKING_QUEUE_CAPACITY;
        }
    }

    /**
     * @return the live (not copied) list of pending requests
     */
    public List<FetchAndLockRequest> getPendingRequests() {
        return this.pendingRequests;
    }
}
