/*
 * Decompiled with CFR 0.152.
 */
package com.github.loki4j.logback;

import ch.qos.logback.core.spi.ContextAwareBase;
import ch.qos.logback.core.spi.LifeCycle;
import com.github.loki4j.common.Batcher;
import com.github.loki4j.common.BinaryBatch;
import com.github.loki4j.common.ByteBufferQueue;
import com.github.loki4j.common.LogRecord;
import com.github.loki4j.common.LogRecordBatch;
import com.github.loki4j.common.LogRecordStream;
import com.github.loki4j.common.LokiResponse;
import com.github.loki4j.common.LokiThreadFactory;
import com.github.loki4j.common.Writer;
import com.github.loki4j.logback.HttpSender;
import com.github.loki4j.logback.LoggerMetrics;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

public final class DefaultPipeline
extends ContextAwareBase
implements LifeCycle {
    private final long PARK_NS = TimeUnit.MILLISECONDS.toNanos(1L);
    private final ConcurrentLinkedQueue<LogRecord> buffer = new ConcurrentLinkedQueue();
    private final ByteBufferQueue senderQueue;
    private final Batcher batcher;
    private Optional<Comparator<LogRecord>> recordComparator;
    private final Writer writer;
    private final HttpSender sender;
    private final LoggerMetrics metrics;
    private final boolean drainOnStop;
    private volatile boolean started = false;
    private AtomicBoolean acceptNewEvents = new AtomicBoolean(true);
    private AtomicBoolean drainRequested = new AtomicBoolean(false);
    private AtomicLong lastSendTimeMs = new AtomicLong(System.currentTimeMillis());
    private AtomicLong unsentEvents = new AtomicLong(0L);
    private ScheduledExecutorService scheduler;
    private ExecutorService encoderThreadPool;
    private ExecutorService senderThreadPool;
    private ScheduledFuture<?> drainScheduledFuture;
    private boolean traceEnabled = false;

    public DefaultPipeline(Batcher batcher, Optional<Comparator<LogRecord>> recordComparator, Writer writer, ByteBufferQueue senderQueue, HttpSender sender, LoggerMetrics metrics, boolean drainOnStop) {
        this.batcher = batcher;
        this.recordComparator = recordComparator;
        this.writer = writer;
        this.senderQueue = senderQueue;
        this.sender = sender;
        this.drainOnStop = drainOnStop;
        this.metrics = metrics;
    }

    public void start() {
        this.addInfo("Pipeline is starting...");
        this.started = true;
        this.senderThreadPool = Executors.newFixedThreadPool(1, new LokiThreadFactory("loki4j-sender"));
        this.senderThreadPool.execute(() -> this.runSendLoop());
        this.encoderThreadPool = Executors.newFixedThreadPool(1, new LokiThreadFactory("loki4j-encoder"));
        this.encoderThreadPool.execute(() -> this.runEncodeLoop());
        this.scheduler = Executors.newScheduledThreadPool(1, new LokiThreadFactory("loki4j-scheduler"));
        this.drainScheduledFuture = this.scheduler.scheduleAtFixedRate(() -> this.drain(), 100L, 100L, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        this.drainScheduledFuture.cancel(false);
        if (this.drainOnStop) {
            this.addInfo("Pipeline is draining...");
            this.waitSendQueueLessThan(this.batcher.getCapacity(), Long.MAX_VALUE);
            this.lastSendTimeMs.set(0L);
            this.drain();
            this.waitSendQueueIsEmpty(Long.MAX_VALUE);
            this.addInfo("Drain completed");
        }
        this.started = false;
        this.scheduler.shutdown();
        this.encoderThreadPool.shutdown();
        this.senderThreadPool.shutdown();
    }

    public boolean append(long timestamp, int nanos, Supplier<LogRecordStream> stream, Supplier<String> message) {
        long startedNs = System.nanoTime();
        boolean accepted = false;
        if (this.acceptNewEvents.get()) {
            LogRecord record = LogRecord.create(timestamp, nanos, stream.get(), message.get());
            if (this.batcher.validateLogRecordSize(record)) {
                this.buffer.offer(record);
                this.unsentEvents.incrementAndGet();
                accepted = true;
            } else {
                this.addWarn("Dropping the record that exceeds max batch size: " + record.toString());
            }
        }
        if (this.metrics != null) {
            this.metrics.eventAppended(startedNs, !accepted);
        }
        return accepted;
    }

    private void drain() {
        this.drainRequested.set(true);
        this.trace("drain planned", new Object[0]);
    }

    private void runEncodeLoop() {
        LogRecordBatch batch = new LogRecordBatch(this.batcher.getCapacity());
        while (this.started) {
            try {
                this.encodeStep(batch);
            }
            catch (InterruptedException e) {
                this.stop();
            }
        }
    }

    private void runSendLoop() {
        while (this.started) {
            try {
                this.sendStep();
            }
            catch (InterruptedException e) {
                this.stop();
            }
        }
    }

    private void encodeStep(LogRecordBatch batch) throws InterruptedException {
        while (this.started && this.buffer.isEmpty() && !this.drainRequested.get()) {
            LockSupport.parkNanos((Object)this, this.PARK_NS);
        }
        if (!this.started) {
            return;
        }
        this.trace("check encode actions", new Object[0]);
        LogRecord record = this.buffer.peek();
        while (record != null && batch.isEmpty()) {
            this.batcher.checkSizeBeforeAdd(record, batch);
            if (batch.isEmpty()) {
                this.batcher.add((LogRecord)this.buffer.remove(), batch);
            }
            if (!batch.isEmpty()) continue;
            record = this.buffer.peek();
        }
        if (batch.isEmpty() && this.drainRequested.get()) {
            this.batcher.drain(this.lastSendTimeMs.get(), batch);
        }
        this.drainRequested.set(false);
        if (batch.isEmpty()) {
            return;
        }
        this.writeBatch(batch, this.writer);
        while (this.started && !this.senderQueue.offer(batch.batchId(), batch.size(), this.writer.size(), b -> this.writer.toByteBuffer((ByteBuffer)b))) {
            this.acceptNewEvents.set(false);
            LockSupport.parkNanos((Object)this, this.PARK_NS);
        }
        batch.clear();
        this.acceptNewEvents.set(true);
    }

    private void writeBatch(LogRecordBatch batch, Writer writer) {
        long startedNs = System.nanoTime();
        this.recordComparator.ifPresent(cmp -> batch.sort((Comparator<LogRecord>)cmp));
        writer.serializeBatch(batch);
        this.addInfo(String.format(">>> Batch %s converted to %,d bytes", batch, writer.size()));
        if (this.metrics != null) {
            this.metrics.batchEncoded(startedNs, writer.size());
        }
    }

    private void sendStep() throws InterruptedException {
        BinaryBatch batch = this.senderQueue.borrowBuffer();
        while (this.started && batch == null) {
            LockSupport.parkNanos((Object)this, this.PARK_NS);
            batch = this.senderQueue.borrowBuffer();
        }
        if (!this.started) {
            return;
        }
        try {
            this.sendBatch(batch);
            this.lastSendTimeMs.set(System.currentTimeMillis());
            this.trace("sent items: %s", batch.sizeItems);
        }
        finally {
            this.unsentEvents.addAndGet(-batch.sizeItems);
            this.senderQueue.returnBuffer(batch);
        }
    }

    private LokiResponse sendBatch(BinaryBatch batch) {
        long startedNs = System.nanoTime();
        LokiResponse r = null;
        Exception e = null;
        try {
            r = this.sender.send(batch.data);
        }
        catch (Exception re) {
            e = re;
        }
        if (e != null) {
            this.addError(String.format("Error while sending Batch %s to Loki (%s)", batch, this.sender.getUrl()), e);
        } else if (r.status < 200 || r.status > 299) {
            this.addError(String.format("Loki responded with non-success status %s on batch %s. Error: %s", r.status, batch, r.body));
        } else {
            this.addInfo(String.format("<<< Batch %s: Loki responded with status %s", batch, r.status));
        }
        if (this.metrics != null) {
            this.metrics.batchSent(startedNs, batch.sizeBytes, e != null || r.status > 299);
        }
        return r;
    }

    void waitSendQueueIsEmpty(long timeoutMs) {
        this.waitSendQueueLessThan(1, timeoutMs);
    }

    void waitSendQueueLessThan(int size, long timeoutMs) {
        long elapsedNs;
        long timeoutNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        for (elapsedNs = 0L; this.started && this.unsentEvents.get() >= (long)size && elapsedNs < timeoutNs; elapsedNs += this.PARK_NS) {
            LockSupport.parkNanos(this.PARK_NS);
        }
        this.trace("wait send queue: started=%s, buffer(%s)>=%s, %s ms %s elapsed", this.started, this.unsentEvents.get(), size, timeoutMs, elapsedNs < timeoutNs ? "not" : "");
        if (elapsedNs >= timeoutNs) {
            throw new RuntimeException("Not completed within timeout " + timeoutMs + " ms");
        }
    }

    private void trace(String input, Object ... args) {
        if (this.traceEnabled) {
            this.addInfo(String.format(input, args));
        }
    }

    public boolean isStarted() {
        return this.started;
    }

    public void setTraceEnabled(boolean traceEnabled) {
        this.traceEnabled = traceEnabled;
    }
}

