/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.shaded.org.jgroups.protocols;

import java.util.Objects;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import org.apache.activemq.artemis.shaded.org.jgroups.Address;
import org.apache.activemq.artemis.shaded.org.jgroups.Message;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.Property;
import org.apache.activemq.artemis.shaded.org.jgroups.protocols.BaseBundler;
import org.apache.activemq.artemis.shaded.org.jgroups.protocols.TP;
import org.apache.activemq.artemis.shaded.org.jgroups.util.RingBuffer;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Runner;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Util;

/**
 * Bundler that queues outgoing messages in a {@link RingBuffer} and drains them on a dedicated
 * {@link Runner} thread, marshalling consecutive messages to the same destination into one batch.
 *
 * <p>{@link #send(Message)} only places the message into the ring buffer; the runner thread
 * repeatedly executes {@link #readMessages()}, which waits for messages, sends them via
 * {@link #sendBundledMessages(Message[], int, int)} and then publishes the new read index.
 *
 * <p>The fields {@code capacity}, {@code transport}, {@code output}, {@code max_size} and
 * {@code log} used below are inherited, not declared here.
 */
public class RingBufferBundler
extends BaseBundler {
    protected static final String THREAD_NAME = "RingBufferBundler";

    /*
     * Built-in wait strategies. Each receives two integers, named (it, spins) here, and is handed
     * to RingBuffer.waitForMessages() together with num_spins.
     * NOTE(review): presumably 'it' is the current spin iteration and 'spins' the configured
     * number of spins - confirm against RingBuffer.waitForMessages().
     */

    /** Does nothing. */
    protected static final BiConsumer<Integer, Integer> SPIN = (it, spins) -> {};
    /** Calls {@link Thread#yield()}. */
    protected static final BiConsumer<Integer, Integer> YIELD = (it, spins) -> Thread.yield();
    /** Parks the calling thread via {@code LockSupport.parkNanos(1)}. */
    protected static final BiConsumer<Integer, Integer> PARK = (it, spins) -> LockSupport.parkNanos(1L);
    /** Does nothing while {@code it < spins / 10}, afterwards parks via {@code LockSupport.parkNanos(1)}. */
    protected static final BiConsumer<Integer, Integer> SPIN_PARK = (it, spins) -> {
        if (it >= spins / 10) {
            LockSupport.parkNanos(1L);
        }
    };
    /** Does nothing while {@code it < spins / 10}, afterwards calls {@link Thread#yield()}. */
    protected static final BiConsumer<Integer, Integer> SPIN_YIELD = (it, spins) -> {
        if (it >= spins / 10) {
            Thread.yield();
        }
    };

    /** Buffer holding the queued messages; created in {@link #init(TP)} unless passed to a constructor. */
    protected RingBuffer<Message> rb;
    /** Runner executing {@link #readMessages()}; created in {@link #init(TP)}. */
    protected Runner bundler_thread;
    @Property(description="Number of spins before a real lock is acquired")
    protected int num_spins = 40;
    /** Strategy handed to the ring buffer while waiting for messages; defaults to {@link #SPIN_PARK}. */
    protected BiConsumer<Integer, Integer> wait_strategy = SPIN_PARK;
    /** The function run by {@link #bundler_thread}. */
    protected final Runnable run_function = this::readMessages;

    /** Creates a bundler whose ring buffer is allocated later, in {@link #init(TP)}. */
    public RingBufferBundler() {
    }

    /**
     * Creates a bundler on top of an existing ring buffer and adopts the buffer's capacity.
     *
     * @param rb the ring buffer to use; must not be {@code null}
     * @throws NullPointerException if {@code rb} is {@code null}
     */
    protected RingBufferBundler(RingBuffer<Message> rb) {
        this.rb = Objects.requireNonNull(rb, "rb");
        this.capacity = rb.capacity();
    }

    /**
     * Creates a bundler with a new ring buffer of the requested capacity.
     *
     * @param capacity the requested capacity; must be positive
     * @throws IllegalArgumentException if {@code capacity} is zero or negative
     */
    public RingBufferBundler(int capacity) {
        this(new RingBuffer<Message>(Message.class, RingBufferBundler.assertPositive(capacity, "bundler capacity cannot be " + capacity)));
    }

    /** @return the underlying ring buffer, or {@code null} if it has not been created yet */
    public RingBuffer<Message> buf() {
        return this.rb;
    }

    /** @return the thread of the runner, or {@code null} if {@link #init(TP)} has not been called yet */
    public Thread getThread() {
        Runner runner = this.bundler_thread;
        return runner != null ? runner.getThread() : null;
    }

    /** @return the number of elements reported by the ring buffer, or 0 if no buffer exists yet */
    @Override
    public int size() {
        RingBuffer<Message> buffer = this.rb;
        return buffer != null ? buffer.size() : 0;
    }

    /** @return the number of elements reported by the ring buffer, or 0 if no buffer exists yet */
    @Override
    public int getQueueSize() {
        RingBuffer<Message> buffer = this.rb;
        return buffer != null ? buffer.size() : 0;
    }

    /** @return the configured number of spins */
    public int numSpins() {
        return this.num_spins;
    }

    /**
     * Sets the number of spins.
     *
     * @param n the new value
     * @return this bundler, for chaining
     */
    public RingBufferBundler numSpins(int n) {
        this.num_spins = n;
        return this;
    }

    /** @return the name of the current wait strategy, as produced by {@link #print(BiConsumer)} */
    @Property(description="The wait strategy: spin, yield, park, spin-park, spin-yield", writable=false)
    public String waitStrategy() {
        return RingBufferBundler.print(this.wait_strategy);
    }

    /**
     * Sets the wait strategy by name or by class name.
     *
     * @param st one of the built-in names or a class name; {@code null} or an unloadable class
     *           selects {@link #YIELD}
     * @return this bundler, for chaining
     */
    @Property
    public RingBufferBundler waitStrategy(String st) {
        this.wait_strategy = this.createWaitStrategy(st, YIELD);
        return this;
    }

    /**
     * Initializes the bundler: allocates the ring buffer if none was supplied (adopting the
     * capacity the buffer reports) and creates the runner. The runner is created with a callback
     * that clears the ring buffer.
     *
     * @param transport the transport this bundler belongs to
     * @throws IllegalArgumentException if a buffer has to be created and {@code capacity} is not positive
     */
    @Override
    public void init(TP transport) {
        super.init(transport);
        if (this.rb == null) {
            this.rb = new RingBuffer<Message>(Message.class, RingBufferBundler.assertPositive(this.capacity, "bundler capacity cannot be " + this.capacity));
            // the buffer may report a capacity different from the requested one; use the buffer's value
            this.capacity = this.rb.capacity();
        }
        this.bundler_thread = new Runner(transport.getThreadFactory(), THREAD_NAME, this.run_function, () -> this.rb.clear());
    }

    /** Starts the runner; {@link #init(TP)} must have been called before. */
    @Override
    public void start() {
        this.bundler_thread.start();
    }

    /** Stops the runner; a no-op if {@link #init(TP)} was never called. */
    @Override
    public void stop() {
        Runner runner = this.bundler_thread;
        if (runner != null) {
            runner.stop();
        }
    }

    /**
     * Queues a message by putting it into the ring buffer.
     *
     * @param msg the message to send
     * @throws Exception if the ring buffer rejects the message
     */
    @Override
    public void send(Message msg) throws Exception {
        this.rb.put(msg);
    }

    /**
     * Sends the messages found in {@code available_msgs} slots of {@code buf}, starting at
     * {@code read_index} and wrapping around at the end of the array. Slots that are
     * {@code null} (already sent as part of an earlier batch) are skipped. Send failures are
     * logged at trace level and do not abort the loop.
     *
     * @param buf            the array backing the ring buffer
     * @param read_index     the index of the first slot to read
     * @param available_msgs the number of slots to process; nothing happens if this is not positive
     */
    public void sendBundledMessages(Message[] buf, int read_index, int available_msgs) {
        if (available_msgs <= 0) {
            // without this guard 'end' would be the slot before 'read_index' and the whole ring would be walked
            return;
        }
        byte[] cluster_name = this.transport.cluster_name.chars();
        int end = this.index(read_index + available_msgs - 1);
        int start = read_index;
        while (true) {
            Message msg = buf[start];
            if (msg != null) {
                this.sendBatchStartingAt(msg, buf, start, end, cluster_name);
            }
            if (start == end) {
                break;
            }
            start = this.advance(start);
        }
    }

    /**
     * Marshals {@code first} and all later messages with the same destination in
     * {@code [start .. end]} into {@code output} and hands the result to the transport.
     */
    private void sendBatchStartingAt(Message first, Message[] buf, int start, int end, byte[] cluster_name) {
        Address dest = first.getDest();
        try {
            this.output.position(0);
            Util.writeMessageListHeader(dest, first.getSrc(), cluster_name, 1, this.output, dest == null);
            // the header is written with a count of 1; the 4 bytes before the current position are
            // overwritten below when more than one message ends up in the batch
            int size_pos = this.output.position() - 4;
            int num_msgs = this.marshalMessagesToSameDestination(dest, buf, start, end, this.max_size);
            if (num_msgs == 0) {
                // Nothing was marshalled (the first message did not fit into max_size): sending now
                // would transmit a header announcing one message without any payload. The message
                // is skipped either way, so release the slot and report it instead.
                buf[start] = null;
                this.log.trace("no message marshalled for %s (max bundle size is %d); message dropped", dest == null ? "group" : dest, this.max_size);
                return;
            }
            if (num_msgs > 1) {
                int current_pos = this.output.position();
                this.output.position(size_pos);
                this.output.writeInt(num_msgs);
                this.output.position(current_pos);
            }
            this.transport.doSend(this.output.buffer(), 0, this.output.position(), dest);
            if (this.transport.statsEnabled()) {
                this.transport.getMessageStats().incrNumBatchesSent(num_msgs);
            }
        }
        catch (Exception ex) {
            this.log.trace("failed to send message(s) to %s: %s", dest == null ? "group" : dest, ex.getMessage());
        }
    }

    /**
     * Walks the slots from {@code start_index} to {@code end_index} (inclusive, wrapping around)
     * and writes every message whose destination equals {@code dest} to {@code output}. Each
     * written slot is set to {@code null}. The walk stops early as soon as adding a message
     * (its size plus 2 bytes for the type) would exceed {@code max_bundle_size}.
     *
     * @param dest            the destination to match; may be {@code null}
     * @param buf             the array backing the ring buffer
     * @param start_index     the first slot to examine
     * @param end_index       the last slot to examine
     * @param max_bundle_size the maximum number of bytes to marshal
     * @return the number of messages written
     * @throws Exception if writing a message fails
     */
    protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size) throws Exception {
        int num_msgs = 0;
        int bytes = 0;
        int pos = start_index;
        while (true) {
            Message msg = buf[pos];
            if (msg != null && Objects.equals(dest, msg.getDest())) {
                int size = msg.size() + 2; // 2 bytes for the type written via writeShort()
                if (bytes + size > max_bundle_size) {
                    break;
                }
                bytes += size;
                ++num_msgs;
                buf[pos] = null;
                this.output.writeShort(msg.getType());
                msg.writeToNoAddrs(msg.getSrc(), this.output, this.transport.getId());
            }
            if (pos == end_index) {
                break;
            }
            pos = this.advance(pos);
        }
        return num_msgs;
    }

    /**
     * One iteration of the runner: waits for messages, sends them and publishes the number of
     * slots consumed. Failures never propagate to the runner; they are logged at trace level.
     */
    protected void readMessages() {
        try {
            int available_msgs = this.rb.waitForMessages(this.num_spins, this.wait_strategy);
            int read_index = this.rb.readIndexLockless();
            Message[] buf = this.rb.buf();
            this.sendBundledMessages(buf, read_index, available_msgs);
            this.rb.publishReadIndex(available_msgs);
        }
        catch (Throwable t) {
            // best effort: keep the loop alive, but leave a trace instead of hiding the failure
            if (this.log != null) {
                this.log.trace("%s: failed reading or sending messages: %s", THREAD_NAME, t);
            }
        }
    }

    /** @return the slot following {@code index}, wrapping to 0 at {@code capacity} */
    protected final int advance(int index) {
        int next = index + 1;
        return next == this.capacity ? 0 : next;
    }

    /**
     * Maps {@code idx} into the buffer by masking it with {@code capacity - 1}.
     * NOTE(review): this is only a valid modulo if capacity is a power of two - presumably
     * guaranteed by RingBuffer; confirm.
     */
    protected final int index(int idx) {
        return idx & (this.capacity - 1);
    }

    /**
     * Returns the name of a wait strategy.
     *
     * @param wait_strategy the strategy; may be {@code null}
     * @return {@code null} for {@code null}, the built-in name for a built-in strategy, otherwise
     *         the simple class name of the strategy
     */
    protected static String print(BiConsumer<Integer, Integer> wait_strategy) {
        if (wait_strategy == null) {
            return null;
        }
        if (wait_strategy == SPIN) {
            return "spin";
        }
        if (wait_strategy == YIELD) {
            return "yield";
        }
        if (wait_strategy == PARK) {
            return "park";
        }
        if (wait_strategy == SPIN_PARK) {
            return "spin-park";
        }
        if (wait_strategy == SPIN_YIELD) {
            return "spin-yield";
        }
        return wait_strategy.getClass().getSimpleName();
    }

    /**
     * Resolves a wait strategy from its name. Built-in names are {@code spin}, {@code yield},
     * {@code park}, {@code spin-park}/{@code spin_park} and {@code spin-yield}/{@code spin_yield};
     * any other value is treated as the name of a class with a no-arg constructor.
     *
     * <p>Side effect kept for backward compatibility: when a built-in name matches, the
     * {@link #wait_strategy} field is assigned as well. It is not assigned for {@code null}, for
     * a class name, or on failure.
     *
     * @param st                    the strategy name or class name; may be {@code null}
     * @param default_wait_strategy returned when {@code st} is {@code null} or the class cannot be
     *                              loaded, instantiated or cast
     * @return the resolved strategy or {@code default_wait_strategy}
     */
    protected BiConsumer<Integer, Integer> createWaitStrategy(String st, BiConsumer<Integer, Integer> default_wait_strategy) {
        if (st == null) {
            return default_wait_strategy;
        }
        BiConsumer<Integer, Integer> builtin = RingBufferBundler.builtinWaitStrategy(st);
        if (builtin != null) {
            this.wait_strategy = builtin;
            return builtin;
        }
        try {
            Class<?> clazz = Util.loadClass(st, this.getClass());
            // the generic parameters cannot be checked at runtime; the cast only verifies BiConsumer
            @SuppressWarnings("unchecked")
            BiConsumer<Integer, Integer> custom = (BiConsumer<Integer, Integer>)clazz.getDeclaredConstructor().newInstance();
            return custom;
        }
        catch (Throwable t) {
            this.log.error("failed creating wait_strategy " + st, t);
            return default_wait_strategy;
        }
    }

    /** @return the built-in strategy registered under {@code name}, or {@code null} if there is none */
    private static BiConsumer<Integer, Integer> builtinWaitStrategy(String name) {
        switch (name) {
            case "spin":
                return SPIN;
            case "yield":
                return YIELD;
            case "park":
                return PARK;
            case "spin_park":
            case "spin-park":
                return SPIN_PARK;
            case "spin_yield":
            case "spin-yield":
                return SPIN_YIELD;
            default:
                return null;
        }
    }

    /**
     * Validates that {@code value} is positive.
     *
     * @param value   the value to check
     * @param message the exception message used on failure
     * @return {@code value}
     * @throws IllegalArgumentException if {@code value} is zero or negative
     */
    protected static int assertPositive(int value, String message) {
        if (value <= 0) {
            throw new IllegalArgumentException(message);
        }
        return value;
    }
}

