package org.apache.seatunnel.engine.server;

import com.hazelcast.spi.impl.NodeEngineImpl;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.server.event.JobEventReportOperation;
import org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateBlockingQueue;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/EventService.class */
public class EventService {
    private static final Logger log = LoggerFactory.getLogger(EventService.class);
    private final BlockingQueue<Event> eventBuffer = new ArrayBlockingQueue(TaskGroupWithIntermediateBlockingQueue.QUEUE_SIZE);
    private ExecutorService eventForwardService;
    private final NodeEngineImpl nodeEngine;

    public EventService(NodeEngineImpl nodeEngineImpl) {
        initEventForwardService();
        this.nodeEngine = nodeEngineImpl;
    }

    private void initEventForwardService() {
        this.eventForwardService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
        this.eventForwardService.submit(() -> {
            ArrayList arrayList = new ArrayList();
            RetryUtils.RetryMaterial retryMaterial = new RetryUtils.RetryMaterial(2, true, exc -> {
                return true;
            });
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    arrayList.clear();
                    arrayList.add(this.eventBuffer.take());
                    this.eventBuffer.drainTo(arrayList, 500);
                    JobEventReportOperation jobEventReportOperation = new JobEventReportOperation(arrayList);
                    RetryUtils.retryWithException(() -> {
                        return NodeEngineUtil.sendOperationToMasterNode(this.nodeEngine, jobEventReportOperation).join();
                    }, retryMaterial);
                    log.debug("Event forward success, events " + arrayList.size());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.info("Event forward thread interrupted");
                } catch (Throwable th) {
                    log.warn("Event forward failed, discard events " + arrayList.size(), th);
                }
            }
        });
    }

    public void reportEvent(Event event) {
        while (!this.eventBuffer.offer(event)) {
            this.eventBuffer.poll();
            log.warn("Event buffer is full, discard the oldest event");
        }
    }

    public void shutdownNow() {
        if (this.eventForwardService != null) {
            this.eventForwardService.shutdownNow();
        }
    }
}
