package io.atomix.core.queue.impl;

import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import io.atomix.core.queue.Task;
import io.atomix.core.queue.WorkQueueStats;
import io.atomix.core.queue.impl.WorkQueueOperations;
import io.atomix.primitive.service.AbstractPrimitiveService;
import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.primitive.session.Session;
import io.atomix.storage.buffer.BufferInput;
import io.atomix.storage.buffer.BufferOutput;
import io.atomix.utils.serializer.KryoNamespace;
import io.atomix.utils.serializer.KryoNamespaces;
import io.atomix.utils.serializer.Serializer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/* loaded from: input_file:io/atomix/core/queue/impl/WorkQueueService.class */
public class WorkQueueService extends AbstractPrimitiveService {
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.builder().register(KryoNamespaces.BASIC).register(WorkQueueOperations.NAMESPACE).register(WorkQueueEvents.NAMESPACE).register(new Class[]{TaskAssignment.class}).register(new Class[]{new HashMap().keySet().getClass()}).register(new Class[]{ArrayDeque.class}).build());
    private final AtomicLong totalCompleted = new AtomicLong(0);
    private Queue<Task<byte[]>> unassignedTasks = Queues.newArrayDeque();
    private Map<String, TaskAssignment> assignments = Maps.newHashMap();
    private Map<Long, Session> registeredWorkers = Maps.newHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/core/queue/impl/WorkQueueService$TaskAssignment.class */
    public static class TaskAssignment {
        private final long sessionId;
        private final Task<byte[]> task;

        public TaskAssignment(long j, Task<byte[]> task) {
            this.sessionId = j;
            this.task = task;
        }

        public long sessionId() {
            return this.sessionId;
        }

        public Task<byte[]> task() {
            return this.task;
        }

        public String toString() {
            return MoreObjects.toStringHelper(getClass()).add("sessionId", this.sessionId).add("task", this.task).toString();
        }
    }

    public void backup(BufferOutput<?> bufferOutput) {
        HashSet newHashSet = Sets.newHashSet(this.registeredWorkers.keySet());
        Serializer serializer = SERIALIZER;
        serializer.getClass();
        bufferOutput.writeObject(newHashSet, (v1) -> {
            return r2.encode(v1);
        });
        Map<String, TaskAssignment> map = this.assignments;
        Serializer serializer2 = SERIALIZER;
        serializer2.getClass();
        bufferOutput.writeObject(map, (v1) -> {
            return r2.encode(v1);
        });
        Queue<Task<byte[]>> queue = this.unassignedTasks;
        Serializer serializer3 = SERIALIZER;
        serializer3.getClass();
        bufferOutput.writeObject(queue, (v1) -> {
            return r2.encode(v1);
        });
        bufferOutput.writeLong(this.totalCompleted.get());
    }

    public void restore(BufferInput<?> bufferInput) {
        this.registeredWorkers = Maps.newHashMap();
        Serializer serializer = SERIALIZER;
        serializer.getClass();
        for (Long l : (Set) bufferInput.readObject(serializer::decode)) {
            this.registeredWorkers.put(l, getSessions().getSession(l.longValue()));
        }
        Serializer serializer2 = SERIALIZER;
        serializer2.getClass();
        this.assignments = (Map) bufferInput.readObject(serializer2::decode);
        Serializer serializer3 = SERIALIZER;
        serializer3.getClass();
        this.unassignedTasks = (Queue) bufferInput.readObject(serializer3::decode);
        this.totalCompleted.set(bufferInput.readLong());
    }

    protected void configure(ServiceExecutor serviceExecutor) {
        WorkQueueOperations workQueueOperations = WorkQueueOperations.STATS;
        Function function = this::stats;
        Serializer serializer = SERIALIZER;
        serializer.getClass();
        serviceExecutor.register(workQueueOperations, function, (v1) -> {
            return r3.encode(v1);
        });
        serviceExecutor.register(WorkQueueOperations.REGISTER, this::register);
        serviceExecutor.register(WorkQueueOperations.UNREGISTER, this::unregister);
        WorkQueueOperations workQueueOperations2 = WorkQueueOperations.ADD;
        Serializer serializer2 = SERIALIZER;
        serializer2.getClass();
        serviceExecutor.register(workQueueOperations2, serializer2::decode, this::add);
        WorkQueueOperations workQueueOperations3 = WorkQueueOperations.TAKE;
        Serializer serializer3 = SERIALIZER;
        serializer3.getClass();
        Function function2 = serializer3::decode;
        Function function3 = this::take;
        Serializer serializer4 = SERIALIZER;
        serializer4.getClass();
        serviceExecutor.register(workQueueOperations3, function2, function3, (v1) -> {
            return r4.encode(v1);
        });
        WorkQueueOperations workQueueOperations4 = WorkQueueOperations.COMPLETE;
        Serializer serializer5 = SERIALIZER;
        serializer5.getClass();
        serviceExecutor.register(workQueueOperations4, serializer5::decode, this::complete);
        serviceExecutor.register(WorkQueueOperations.CLEAR, this::clear);
    }

    protected WorkQueueStats stats(Commit<Void> commit) {
        return WorkQueueStats.builder().withTotalCompleted(this.totalCompleted.get()).withTotalPending(this.unassignedTasks.size()).withTotalInProgress(this.assignments.size()).build();
    }

    protected void clear(Commit<Void> commit) {
        this.unassignedTasks.clear();
        this.assignments.clear();
        this.registeredWorkers.clear();
        this.totalCompleted.set(0L);
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected void register(Commit<Void> commit) {
        this.registeredWorkers.put(commit.session().sessionId().id(), commit.session());
    }

    protected void unregister(Commit<Void> commit) {
        this.registeredWorkers.remove(commit.session().sessionId().id());
    }

    protected void add(Commit<? extends WorkQueueOperations.Add> commit) {
        Collection<byte[]> items = ((WorkQueueOperations.Add) commit.value()).items();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        items.forEach(bArr -> {
            this.unassignedTasks.add(new Task<>(String.format("%d:%d:%d", commit.session().sessionId().id(), Long.valueOf(commit.index()), Integer.valueOf(atomicInteger.getAndIncrement())), bArr));
        });
        this.registeredWorkers.values().forEach(session -> {
            session.publish(WorkQueueEvents.TASK_AVAILABLE);
        });
    }

    protected Collection<Task<byte[]>> take(Commit<? extends WorkQueueOperations.Take> commit) {
        try {
            if (this.unassignedTasks.isEmpty()) {
                return ImmutableList.of();
            }
            long longValue = ((Long) commit.session().sessionId().id()).longValue();
            return (Collection) IntStream.range(0, Math.min(((WorkQueueOperations.Take) commit.value()).maxTasks(), this.unassignedTasks.size())).mapToObj(i -> {
                Task<byte[]> poll = this.unassignedTasks.poll();
                this.assignments.put(poll.taskId(), new TaskAssignment(longValue, poll));
                return poll;
            }).collect(Collectors.toCollection(ArrayList::new));
        } catch (Exception e) {
            getLogger().warn("State machine update failed", e);
            throw Throwables.propagate(e);
        }
    }

    protected void complete(Commit<? extends WorkQueueOperations.Complete> commit) {
        long longValue = ((Long) commit.session().sessionId().id()).longValue();
        try {
            ((WorkQueueOperations.Complete) commit.value()).taskIds().forEach(str -> {
                TaskAssignment taskAssignment = this.assignments.get(str);
                if (taskAssignment == null || taskAssignment.sessionId() != longValue) {
                    return;
                }
                this.assignments.remove(str);
                this.totalCompleted.incrementAndGet();
            });
        } catch (Exception e) {
            getLogger().warn("State machine update failed", e);
            throw Throwables.propagate(e);
        }
    }

    public void onExpire(Session session) {
        evictWorker(((Long) session.sessionId().id()).longValue());
    }

    public void onClose(Session session) {
        evictWorker(((Long) session.sessionId().id()).longValue());
    }

    private void evictWorker(long j) {
        this.registeredWorkers.remove(Long.valueOf(j));
        Iterator<Map.Entry<String, TaskAssignment>> it = this.assignments.entrySet().iterator();
        while (it.hasNext()) {
            TaskAssignment value = it.next().getValue();
            if (value.sessionId() == j) {
                this.unassignedTasks.add(value.task());
                it.remove();
            }
        }
    }
}
