package io.atomix.core.queue.impl;

import com.google.common.collect.ImmutableList;
import io.atomix.core.queue.AsyncWorkQueue;
import io.atomix.core.queue.Task;
import io.atomix.core.queue.WorkQueue;
import io.atomix.core.queue.WorkQueueStats;
import io.atomix.core.queue.impl.WorkQueueOperations;
import io.atomix.primitive.impl.AbstractAsyncPrimitive;
import io.atomix.primitive.proxy.PrimitiveProxy;
import io.atomix.utils.AbstractAccumulator;
import io.atomix.utils.Accumulator;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Serializer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/core/queue/impl/WorkQueueProxy.class */
public class WorkQueueProxy extends AbstractAsyncPrimitive implements AsyncWorkQueue<byte[]> {
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder().register(KryoNamespaces.BASIC).register(WorkQueueOperations.NAMESPACE).register(WorkQueueEvents.NAMESPACE).build());
    private final Logger log;
    private final ExecutorService executor;
    private final AtomicReference<TaskProcessor> taskProcessor;
    private final Timer timer;
    private final AtomicBoolean isRegistered;

    /* loaded from: input_file:io/atomix/core/queue/impl/WorkQueueProxy$CompletedTaskAccumulator.class */
    private class CompletedTaskAccumulator extends AbstractAccumulator<String> {
        CompletedTaskAccumulator(Timer timer, int i, int i2) {
            super(timer, i, i2, Integer.MAX_VALUE);
        }

        public void processItems(List<String> list) {
            WorkQueueProxy.this.complete(list);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/core/queue/impl/WorkQueueProxy$TaskProcessor.class */
    public class TaskProcessor implements Consumer<Collection<Task<byte[]>>> {
        private final AtomicInteger headRoom;
        private final Consumer<byte[]> backingConsumer;
        private final Executor executor;
        private final Accumulator<String> taskCompleter;

        public TaskProcessor(Consumer<byte[]> consumer, int i, Executor executor, Accumulator<String> accumulator) {
            this.backingConsumer = consumer;
            this.headRoom = new AtomicInteger(i);
            this.executor = executor;
            this.taskCompleter = accumulator;
        }

        public int headRoom() {
            return this.headRoom.get();
        }

        @Override // java.util.function.Consumer
        public void accept(Collection<Task<byte[]>> collection) {
            if (collection == null) {
                return;
            }
            this.headRoom.addAndGet((-1) * collection.size());
            collection.forEach(task -> {
                this.executor.execute(() -> {
                    try {
                        this.backingConsumer.accept(task.payload());
                        this.taskCompleter.add(task.taskId());
                    } catch (Exception e) {
                        WorkQueueProxy.this.log.debug("Task execution failed", e);
                    } finally {
                        this.headRoom.incrementAndGet();
                        WorkQueueProxy.this.resumeWork();
                    }
                });
            });
        }
    }

    public WorkQueueProxy(PrimitiveProxy primitiveProxy) {
        super(primitiveProxy);
        this.log = LoggerFactory.getLogger(getClass());
        this.taskProcessor = new AtomicReference<>();
        this.timer = new Timer("atomix-work-queue-completer");
        this.isRegistered = new AtomicBoolean(false);
        this.executor = Executors.newSingleThreadExecutor(Threads.namedThreads("atomix-work-queue-" + primitiveProxy.name() + "-%d", this.log));
        primitiveProxy.addStateChangeListener(state -> {
            if (state == PrimitiveProxy.State.CONNECTED && this.isRegistered.get()) {
                primitiveProxy.invoke(WorkQueueOperations.REGISTER);
            }
        });
        primitiveProxy.addEventListener(WorkQueueEvents.TASK_AVAILABLE, this::resumeWork);
    }

    public CompletableFuture<Void> destroy() {
        this.executor.shutdown();
        this.timer.cancel();
        return this.proxy.invoke(WorkQueueOperations.CLEAR);
    }

    @Override // io.atomix.core.queue.AsyncWorkQueue
    public CompletableFuture<Void> addMultiple(Collection<byte[]> collection) {
        if (collection.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        PrimitiveProxy primitiveProxy = this.proxy;
        WorkQueueOperations workQueueOperations = WorkQueueOperations.ADD;
        Serializer serializer = SERIALIZER;
        serializer.getClass();
        return primitiveProxy.invoke(workQueueOperations, (v1) -> {
            return r2.encode(v1);
        }, new WorkQueueOperations.Add(collection));
    }

    @Override // io.atomix.core.queue.AsyncWorkQueue
    public CompletableFuture<Collection<Task<byte[]>>> take(int i) {
        if (i <= 0) {
            return CompletableFuture.completedFuture(ImmutableList.of());
        }
        PrimitiveProxy primitiveProxy = this.proxy;
        WorkQueueOperations workQueueOperations = WorkQueueOperations.TAKE;
        Serializer serializer = SERIALIZER;
        serializer.getClass();
        Function function = (v1) -> {
            return r2.encode(v1);
        };
        WorkQueueOperations.Take take = new WorkQueueOperations.Take(i);
        Serializer serializer2 = SERIALIZER;
        serializer2.getClass();
        return primitiveProxy.invoke(workQueueOperations, function, take, serializer2::decode);
    }

    @Override // io.atomix.core.queue.AsyncWorkQueue
    public CompletableFuture<Void> complete(Collection<String> collection) {
        if (collection.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        PrimitiveProxy primitiveProxy = this.proxy;
        WorkQueueOperations workQueueOperations = WorkQueueOperations.COMPLETE;
        Serializer serializer = SERIALIZER;
        serializer.getClass();
        return primitiveProxy.invoke(workQueueOperations, (v1) -> {
            return r2.encode(v1);
        }, new WorkQueueOperations.Complete(collection));
    }

    @Override // io.atomix.core.queue.AsyncWorkQueue
    public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> consumer, int i, Executor executor) {
        this.taskProcessor.set(new TaskProcessor(consumer, i, executor, new CompletedTaskAccumulator(this.timer, 50, 50)));
        return register().thenCompose(r5 -> {
            return take(i);
        }).thenAccept((Consumer<? super U>) this.taskProcessor.get());
    }

    @Override // io.atomix.core.queue.AsyncWorkQueue
    public CompletableFuture<Void> stopProcessing() {
        return unregister();
    }

    @Override // io.atomix.core.queue.AsyncWorkQueue
    public CompletableFuture<WorkQueueStats> stats() {
        PrimitiveProxy primitiveProxy = this.proxy;
        WorkQueueOperations workQueueOperations = WorkQueueOperations.STATS;
        Serializer serializer = SERIALIZER;
        serializer.getClass();
        return primitiveProxy.invoke(workQueueOperations, serializer::decode);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resumeWork() {
        TaskProcessor taskProcessor = this.taskProcessor.get();
        if (taskProcessor == null) {
            return;
        }
        take(taskProcessor.headRoom()).whenCompleteAsync((collection, th) -> {
            taskProcessor.accept((Collection<Task<byte[]>>) collection);
        }, (Executor) this.executor);
    }

    private CompletableFuture<Void> register() {
        return this.proxy.invoke(WorkQueueOperations.REGISTER).thenRun(() -> {
            this.isRegistered.set(true);
        });
    }

    private CompletableFuture<Void> unregister() {
        return this.proxy.invoke(WorkQueueOperations.UNREGISTER).thenRun(() -> {
            this.isRegistered.set(false);
        });
    }

    @Override // io.atomix.core.queue.AsyncWorkQueue
    /* renamed from: sync */
    public WorkQueue<byte[]> mo103sync(Duration duration) {
        return new BlockingWorkQueue(this, duration.toMillis());
    }
}
