package org.apache.seatunnel.engine.server.event;

import com.hazelcast.core.IFunction;
import com.hazelcast.ringbuffer.OverflowPolicy;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.squareup.okhttp.MediaType;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.RequestBody;
import com.squareup.okhttp.Response;
import com.squareup.okhttp.ResponseBody;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.event.EventHandler;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/event/JobEventHttpReportHandler.class */
public class JobEventHttpReportHandler implements EventHandler {
    private static final Logger log = LoggerFactory.getLogger(JobEventHttpReportHandler.class);
    public static final ObjectMapper JSON_MAPPER = new ObjectMapper();
    public static final Duration REPORT_INTERVAL = Duration.ofSeconds(10);
    private final String httpEndpoint;
    private final Map<String, String> httpHeaders;
    private final OkHttpClient httpClient;
    private final MediaType httpMediaType;
    private final Ringbuffer ringbuffer;
    private volatile long committedEventIndex;
    private final ScheduledExecutorService scheduledExecutorService;

    public JobEventHttpReportHandler(String str, Ringbuffer ringbuffer) {
        this(str, REPORT_INTERVAL, ringbuffer);
    }

    public JobEventHttpReportHandler(String str, Map<String, String> map, Ringbuffer ringbuffer) {
        this(str, map, REPORT_INTERVAL, ringbuffer);
    }

    public JobEventHttpReportHandler(String str, Duration duration, Ringbuffer ringbuffer) {
        this(str, Collections.emptyMap(), duration, ringbuffer);
    }

    public JobEventHttpReportHandler(String str, Map<String, String> map, Duration duration, Ringbuffer ringbuffer) {
        this.httpMediaType = MediaType.parse("application/json");
        this.httpEndpoint = str;
        this.httpHeaders = map;
        this.ringbuffer = ringbuffer;
        this.committedEventIndex = ringbuffer.headSequence();
        this.httpClient = createHttpClient();
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("http-report-event-scheduler-%d").build());
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                report();
            } catch (Throwable th) {
                log.error("Failed to report event", th);
            }
        }, 0L, duration.getSeconds(), TimeUnit.SECONDS);
    }

    public void handle(Event event) {
        this.ringbuffer.addAsync(event, OverflowPolicy.OVERWRITE).toCompletableFuture().join();
    }

    @VisibleForTesting
    synchronized void report() throws IOException {
        long headSequence = this.ringbuffer.headSequence();
        if (headSequence > this.committedEventIndex) {
            log.warn("The head sequence {} is greater than the committed event index {}", Long.valueOf(headSequence), Long.valueOf(this.committedEventIndex));
            this.committedEventIndex = headSequence;
        }
        ReadResultSet readResultSet = (ReadResultSet) this.ringbuffer.readManyAsync(this.committedEventIndex, 0, 1000, (IFunction) null).toCompletableFuture().join();
        if (readResultSet.size() <= 0) {
            return;
        }
        Request.Builder post = new Request.Builder().url(this.httpEndpoint).post(RequestBody.create(this.httpMediaType, JSON_MAPPER.writeValueAsString(readResultSet.iterator())));
        Map<String, String> map = this.httpHeaders;
        post.getClass();
        map.forEach(post::header);
        Response execute = this.httpClient.newCall(post.build()).execute();
        ResponseBody body = execute.body();
        Throwable th = null;
        try {
            if (execute.isSuccessful()) {
                this.committedEventIndex += readResultSet.readCount();
            } else {
                log.error("Failed to request http server: {}", execute);
            }
            if (body != null) {
                if (0 == 0) {
                    body.close();
                    return;
                }
                try {
                    body.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (body != null) {
                if (0 != 0) {
                    try {
                        body.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    body.close();
                }
            }
            throw th3;
        }
    }

    public void close() {
        log.info("Close http report handler");
        this.scheduledExecutorService.shutdown();
    }

    private OkHttpClient createHttpClient() {
        OkHttpClient okHttpClient = new OkHttpClient();
        okHttpClient.setConnectTimeout(30L, TimeUnit.SECONDS);
        okHttpClient.setWriteTimeout(10L, TimeUnit.SECONDS);
        return okHttpClient;
    }
}
