/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.cp.internal.raft.impl;

import com.hazelcast.config.cp.RaftAlgorithmConfig;
import com.hazelcast.cp.CPGroupId;
import com.hazelcast.cp.CPMember;
import com.hazelcast.cp.exception.LeaderDemotedException;
import com.hazelcast.cp.exception.StaleAppendRequestException;
import com.hazelcast.cp.internal.raft.MembershipChangeMode;
import com.hazelcast.cp.internal.raft.QueryPolicy;
import com.hazelcast.cp.internal.raft.command.DestroyRaftGroupCmd;
import com.hazelcast.cp.internal.raft.command.RaftGroupCmd;
import com.hazelcast.cp.internal.raft.impl.RaftEndpoint;
import com.hazelcast.cp.internal.raft.impl.RaftIntegration;
import com.hazelcast.cp.internal.raft.impl.RaftNode;
import com.hazelcast.cp.internal.raft.impl.RaftNodeStatus;
import com.hazelcast.cp.internal.raft.impl.RaftRole;
import com.hazelcast.cp.internal.raft.impl.command.UpdateRaftGroupMembersCmd;
import com.hazelcast.cp.internal.raft.impl.dto.AppendFailureResponse;
import com.hazelcast.cp.internal.raft.impl.dto.AppendRequest;
import com.hazelcast.cp.internal.raft.impl.dto.AppendSuccessResponse;
import com.hazelcast.cp.internal.raft.impl.dto.InstallSnapshot;
import com.hazelcast.cp.internal.raft.impl.dto.PreVoteRequest;
import com.hazelcast.cp.internal.raft.impl.dto.PreVoteResponse;
import com.hazelcast.cp.internal.raft.impl.dto.TriggerLeaderElection;
import com.hazelcast.cp.internal.raft.impl.dto.VoteRequest;
import com.hazelcast.cp.internal.raft.impl.dto.VoteResponse;
import com.hazelcast.cp.internal.raft.impl.handler.AppendFailureResponseHandlerTask;
import com.hazelcast.cp.internal.raft.impl.handler.AppendRequestHandlerTask;
import com.hazelcast.cp.internal.raft.impl.handler.AppendSuccessResponseHandlerTask;
import com.hazelcast.cp.internal.raft.impl.handler.InstallSnapshotHandlerTask;
import com.hazelcast.cp.internal.raft.impl.handler.PreVoteRequestHandlerTask;
import com.hazelcast.cp.internal.raft.impl.handler.PreVoteResponseHandlerTask;
import com.hazelcast.cp.internal.raft.impl.handler.TriggerLeaderElectionHandlerTask;
import com.hazelcast.cp.internal.raft.impl.handler.VoteRequestHandlerTask;
import com.hazelcast.cp.internal.raft.impl.handler.VoteResponseHandlerTask;
import com.hazelcast.cp.internal.raft.impl.log.LogEntry;
import com.hazelcast.cp.internal.raft.impl.log.RaftLog;
import com.hazelcast.cp.internal.raft.impl.log.SnapshotEntry;
import com.hazelcast.cp.internal.raft.impl.persistence.NopRaftStateStore;
import com.hazelcast.cp.internal.raft.impl.persistence.RaftStateStore;
import com.hazelcast.cp.internal.raft.impl.persistence.RestoredRaftState;
import com.hazelcast.cp.internal.raft.impl.state.FollowerState;
import com.hazelcast.cp.internal.raft.impl.state.LeaderState;
import com.hazelcast.cp.internal.raft.impl.state.QueryState;
import com.hazelcast.cp.internal.raft.impl.state.RaftGroupMembers;
import com.hazelcast.cp.internal.raft.impl.state.RaftState;
import com.hazelcast.cp.internal.raft.impl.task.InitLeadershipTransferTask;
import com.hazelcast.cp.internal.raft.impl.task.MembershipChangeTask;
import com.hazelcast.cp.internal.raft.impl.task.PreVoteTask;
import com.hazelcast.cp.internal.raft.impl.task.QueryTask;
import com.hazelcast.cp.internal.raft.impl.task.RaftNodeStatusAwareTask;
import com.hazelcast.cp.internal.raft.impl.task.ReplicateTask;
import com.hazelcast.cp.internal.raft.impl.util.PostponedResponse;
import com.hazelcast.internal.util.BiTuple;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.RandomPicker;
import com.hazelcast.internal.util.collection.Long2ObjectHashMap;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.InternalCompletableFuture;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

public final class RaftNodeImpl
implements RaftNode {
    // Named constants. NOTE(review): the method bodies visible in this file use the
    // equivalent literals (1000, 500L, 0.1f) instead of these names, presumably because
    // the compiler/decompiler inlined them.
    private static final int LEADER_ELECTION_TIMEOUT_RANGE = 1000;
    private static final long RAFT_NODE_INIT_DELAY_MILLIS = 500L;
    private static final float RATIO_TO_KEEP_LOGS_AFTER_SNAPSHOT = 0.1f;
    // Identifier of the CP group this node belongs to; returned by getGroupId().
    private final CPGroupId groupId;
    // Logger obtained through getLogger(RaftNode.class) in the constructors.
    private final ILogger logger;
    // Raft state holder, created (newRaftState) or restored (restoreRaftState) in the constructors.
    private final RaftState state;
    // Integration point used for task execution/scheduling, message sending, logging and future creation.
    private final RaftIntegration raftIntegration;
    // Futures keyed by log entry index; see registerFuture / completeFuture / invalidateFutures*.
    private final Long2ObjectHashMap<InternalCompletableFuture> futures = new Long2ObjectHashMap();
    // The following values are copied from RaftAlgorithmConfig in the constructors.
    private final long heartbeatPeriodInMillis;
    private final int leaderElectionTimeout;
    private final int maxUncommittedEntryCount;
    private final int appendRequestMaxEntryCount;
    private final int commitIndexAdvanceCountToSnapshot;
    private final int maxMissedLeaderHeartbeatCount;
    private final long appendRequestBackoffTimeoutInMillis;
    // Derived in the constructors as (int) (commitIndexAdvanceCountToSnapshot * 0.1f).
    private final int maxNumberOfLogsToKeepAfterSnapshot;
    // Always an AppendRequestBackoffResetTask instance.
    private final Runnable appendRequestBackoffResetTask;
    // A FlushTask instance, or null when the state store is a NopRaftStateStore.
    private final Runnable flushTask;
    // Last value stored by updateLastAppendEntriesTimestamp() (Clock.currentTimeMillis()).
    private long lastAppendEntriesTimestamp;
    // NOTE(review): not touched in the visible part of the file; presumably tracks whether
    // appendRequestBackoffResetTask is currently scheduled - confirm against the rest of the class.
    private boolean appendRequestBackoffResetTaskScheduled;
    // Set to true in the constructors when the state store is a NopRaftStateStore (no flush task exists).
    private boolean flushTaskSubmitted;
    // Lifecycle status; volatile, starts as INITIAL and is changed via setStatus / groupDestroyed.
    private volatile RaftNodeStatus status = RaftNodeStatus.INITIAL;

    /**
     * Builds a fresh Raft node for {@code groupId} with the given initial member list.
     * Every argument is validated with {@code Preconditions.checkNotNull} before use.
     *
     * @param groupId             id of the CP group
     * @param localMember         endpoint of this node
     * @param members             initial members of the group
     * @param stateStore          store handed to the Raft state; a {@link NopRaftStateStore} disables the flush task
     * @param raftAlgorithmConfig source of the algorithm tunables copied into this node
     * @param raftIntegration     integration used for logging, scheduling and messaging
     */
    private RaftNodeImpl(CPGroupId groupId, RaftEndpoint localMember, Collection<RaftEndpoint> members, RaftStateStore stateStore, RaftAlgorithmConfig raftAlgorithmConfig, RaftIntegration raftIntegration) {
        Preconditions.checkNotNull(groupId);
        Preconditions.checkNotNull(localMember);
        Preconditions.checkNotNull(members);
        Preconditions.checkNotNull(stateStore);
        Preconditions.checkNotNull(raftAlgorithmConfig);
        Preconditions.checkNotNull(raftIntegration);

        this.groupId = groupId;
        this.raftIntegration = raftIntegration;

        // Tunables copied from the algorithm configuration.
        this.maxUncommittedEntryCount = raftAlgorithmConfig.getUncommittedEntryCountToRejectNewAppends();
        this.appendRequestMaxEntryCount = raftAlgorithmConfig.getAppendRequestMaxEntryCount();
        this.commitIndexAdvanceCountToSnapshot = raftAlgorithmConfig.getCommitIndexAdvanceCountToSnapshot();
        this.leaderElectionTimeout = (int) raftAlgorithmConfig.getLeaderElectionTimeoutInMillis();
        this.heartbeatPeriodInMillis = raftAlgorithmConfig.getLeaderHeartbeatPeriodInMillis();
        this.maxMissedLeaderHeartbeatCount = raftAlgorithmConfig.getMaxMissedLeaderHeartbeatCount();
        // 0.1f is the same value as RATIO_TO_KEEP_LOGS_AFTER_SNAPSHOT.
        this.maxNumberOfLogsToKeepAfterSnapshot = (int) (this.commitIndexAdvanceCountToSnapshot * 0.1f);
        this.appendRequestBackoffTimeoutInMillis = raftAlgorithmConfig.getAppendRequestBackoffTimeoutInMillis();

        // Log capacity = snapshot threshold + allowed uncommitted entries + entries retained after a snapshot.
        this.state = RaftState.newRaftState(groupId, localMember, members,
                this.commitIndexAdvanceCountToSnapshot + this.maxUncommittedEntryCount + this.maxNumberOfLogsToKeepAfterSnapshot,
                stateStore);
        // The logger name is derived from the state, so it can only be created once the state exists.
        this.logger = this.getLogger(RaftNode.class);
        this.appendRequestBackoffResetTask = new AppendRequestBackoffResetTask();

        boolean usesNopStore = stateStore instanceof NopRaftStateStore;
        this.flushTask = usesNopStore ? null : new FlushTask();
        if (usesNopStore) {
            this.flushTaskSubmitted = true;
        }
    }

    /**
     * Builds a Raft node whose state is rebuilt from {@code restoredState} through
     * {@code RaftState.restoreRaftState}.
     *
     * <p>All five arguments are validated up front with {@code Preconditions.checkNotNull}, so a
     * missing {@code restoredState} or {@code config} is reported before any field is assigned.
     *
     * @param groupId         id of the CP group
     * @param restoredState   state handed to {@code RaftState.restoreRaftState}
     * @param stateStore      store handed to the Raft state; a {@link NopRaftStateStore} disables the flush task
     * @param config          source of the algorithm tunables copied into this node
     * @param raftIntegration integration used for logging, scheduling and messaging
     */
    private RaftNodeImpl(CPGroupId groupId, RestoredRaftState restoredState, RaftStateStore stateStore, RaftAlgorithmConfig config, RaftIntegration raftIntegration) {
        Preconditions.checkNotNull(groupId);
        Preconditions.checkNotNull(restoredState);
        Preconditions.checkNotNull(stateStore);
        Preconditions.checkNotNull(config);
        Preconditions.checkNotNull(raftIntegration);
        this.groupId = groupId;
        this.raftIntegration = raftIntegration;
        this.maxUncommittedEntryCount = config.getUncommittedEntryCountToRejectNewAppends();
        this.appendRequestMaxEntryCount = config.getAppendRequestMaxEntryCount();
        this.commitIndexAdvanceCountToSnapshot = config.getCommitIndexAdvanceCountToSnapshot();
        this.leaderElectionTimeout = (int)config.getLeaderElectionTimeoutInMillis();
        this.heartbeatPeriodInMillis = config.getLeaderHeartbeatPeriodInMillis();
        this.maxMissedLeaderHeartbeatCount = config.getMaxMissedLeaderHeartbeatCount();
        // 0.1f is the same value as RATIO_TO_KEEP_LOGS_AFTER_SNAPSHOT.
        this.maxNumberOfLogsToKeepAfterSnapshot = (int)((float)this.commitIndexAdvanceCountToSnapshot * 0.1f);
        this.appendRequestBackoffTimeoutInMillis = config.getAppendRequestBackoffTimeoutInMillis();
        // Log capacity = snapshot threshold + allowed uncommitted entries + entries retained after a snapshot.
        int logCapacity = this.commitIndexAdvanceCountToSnapshot + this.maxUncommittedEntryCount + this.maxNumberOfLogsToKeepAfterSnapshot;
        this.state = RaftState.restoreRaftState(groupId, restoredState, logCapacity, stateStore);
        // The logger name is derived from the state, so it can only be created once the state exists.
        this.logger = this.getLogger(RaftNode.class);
        this.appendRequestBackoffResetTask = new AppendRequestBackoffResetTask();
        if (stateStore instanceof NopRaftStateStore) {
            // Nothing to flush with a no-op store: no flush task, and the flag is set to true.
            this.flushTask = null;
            this.flushTaskSubmitted = true;
        } else {
            this.flushTask = new FlushTask();
        }
    }

    /**
     * Creates a new Raft node backed by {@code NopRaftStateStore.INSTANCE}.
     *
     * @param groupId     id of the CP group
     * @param localMember endpoint of this node; checked for null
     * @param members     initial members of the group; checked for null
     * @param config      algorithm configuration
     * @param integration integration used by the node
     * @return the newly constructed node
     */
    public static RaftNodeImpl newRaftNode(CPGroupId groupId, RaftEndpoint localMember, Collection<RaftEndpoint> members, RaftAlgorithmConfig config, RaftIntegration integration) {
        RaftEndpoint checkedLocalMember = Preconditions.checkNotNull(localMember);
        Collection<RaftEndpoint> checkedMembers = Preconditions.checkNotNull(members);
        return new RaftNodeImpl(groupId, checkedLocalMember, checkedMembers, NopRaftStateStore.INSTANCE, config, integration);
    }

    /**
     * Creates a new Raft node that uses the supplied state store.
     *
     * @param groupId        id of the CP group
     * @param localMember    endpoint of this node; checked for null
     * @param members        initial members of the group; checked for null
     * @param config         algorithm configuration
     * @param integration    integration used by the node
     * @param raftStateStore state store passed to the node
     * @return the newly constructed node
     */
    public static RaftNodeImpl newRaftNode(CPGroupId groupId, RaftEndpoint localMember, Collection<RaftEndpoint> members, RaftAlgorithmConfig config, RaftIntegration integration, RaftStateStore raftStateStore) {
        RaftEndpoint checkedLocalMember = Preconditions.checkNotNull(localMember);
        Collection<RaftEndpoint> checkedMembers = Preconditions.checkNotNull(members);
        return new RaftNodeImpl(groupId, checkedLocalMember, checkedMembers, raftStateStore, config, integration);
    }

    /**
     * Restores a Raft node from {@code restoredState}, backed by {@code NopRaftStateStore.INSTANCE}.
     *
     * @param groupId       id of the CP group
     * @param restoredState state to restore the node from
     * @param config        algorithm configuration
     * @param integration   integration used by the node
     * @return the restored node
     */
    public static RaftNodeImpl restoreRaftNode(CPGroupId groupId, RestoredRaftState restoredState, RaftAlgorithmConfig config, RaftIntegration integration) {
        RaftNodeImpl restoredNode = new RaftNodeImpl(groupId, restoredState, NopRaftStateStore.INSTANCE, config, integration);
        return restoredNode;
    }

    /**
     * Restores a Raft node from {@code restoredState} using the supplied state store.
     *
     * @param groupId        id of the CP group
     * @param restoredState  state to restore the node from
     * @param config         algorithm configuration
     * @param integration    integration used by the node
     * @param raftStateStore state store passed to the node
     * @return the restored node
     */
    public static RaftNodeImpl restoreRaftNode(CPGroupId groupId, RestoredRaftState restoredState, RaftAlgorithmConfig config, RaftIntegration integration, RaftStateStore raftStateStore) {
        RaftNodeImpl restoredNode = new RaftNodeImpl(groupId, restoredState, raftStateStore, config, integration);
        return restoredNode;
    }

    /**
     * Returns a logger from the integration whose name is {@code <class name>(<state name>)}.
     *
     * @param clazz class whose name prefixes the logger name
     * @return the logger supplied by the integration
     */
    public ILogger getLogger(Class clazz) {
        String stateName = state.name();
        String loggerName = clazz.getName() + "(" + stateName + ")";
        return raftIntegration.getLogger(loggerName);
    }

    /** Returns the id of the CP group this node was created for. */
    @Override
    public CPGroupId getGroupId() {
        return groupId;
    }

    /** Returns the local endpoint as recorded in the Raft state. */
    @Override
    public RaftEndpoint getLocalMember() {
        return state.localEndpoint();
    }

    /** Returns the leader endpoint currently recorded in the Raft state. */
    @Override
    public RaftEndpoint getLeader() {
        return state.leader();
    }

    /** Returns the current lifecycle status (a single read of the volatile field). */
    @Override
    public RaftNodeStatus getStatus() {
        return status;
    }

    /** Returns the initial member list held by the Raft state. */
    @Override
    public Collection<RaftEndpoint> getInitialMembers() {
        return state.initialMembers();
    }

    /** Returns the members of the committed group member list held by the Raft state. */
    @Override
    public Collection<RaftEndpoint> getCommittedMembers() {
        return state.committedGroupMembers().members();
    }

    /** Returns the members of the last (most recently applied) group member list held by the Raft state. */
    @Override
    public Collection<RaftEndpoint> getAppliedMembers() {
        return state.lastGroupMembers().members();
    }

    /**
     * Asynchronously moves this node to {@code TERMINATED}.
     *
     * <p>If the node is already terminated or stepped down, the returned future is completed
     * immediately with {@code null}. Otherwise a task is submitted via {@link #execute(Runnable)};
     * the future is completed with {@code null} when that task finishes without an exception, or
     * exceptionally with the caught {@code Throwable} otherwise.
     *
     * @return a future that completes once the termination attempt has finished
     */
    @Override
    public InternalCompletableFuture forceSetTerminatedStatus() {
        InternalCompletableFuture resultFuture = this.raftIntegration.newCompletableFuture();
        // Fast path on the calling thread: nothing to do.
        if (this.isTerminatedOrSteppedDown()) {
            if (this.logger.isFineEnabled()) {
                this.logger.fine("Already stepped down or terminated, not setting `TERMINATED` status.");
            }
            resultFuture.complete(null);
            return resultFuture;
        }
        this.execute(() -> {
            Throwable failure = null;
            try {
                // Re-check inside the task: the status may have changed since the check above.
                if (this.isTerminatedOrSteppedDown()) {
                    return;
                }
                // A node that never left INITIAL has no futures or leader state to clean up.
                if (this.status == RaftNodeStatus.INITIAL) {
                    this.setStatus(RaftNodeStatus.TERMINATED);
                    return;
                }
                // Fail every future registered for an index above the commit index.
                this.invalidateFuturesFrom(this.state.commitIndex() + 1L);
                LeaderState leaderState = this.state.leaderState();
                if (leaderState != null) {
                    // Fail the futures of the operations held by the leader's query state.
                    for (BiTuple<Object, InternalCompletableFuture> t : leaderState.queryState().operations()) {
                        ((InternalCompletableFuture)t.element2).completeExceptionally(new LeaderDemotedException(this.state.localEndpoint(), null));
                    }
                }
                this.state.completeLeadershipTransfer(new LeaderDemotedException(this.state.localEndpoint(), null));
                this.setStatus(RaftNodeStatus.TERMINATED);
            }
            catch (Throwable t) {
                failure = t;
                this.logger.severe("Failure during force-termination", t);
                // Still try to reach TERMINATED unless a terminal status was already set.
                if (this.status != RaftNodeStatus.TERMINATED && this.status != RaftNodeStatus.STEPPED_DOWN) {
                    this.setStatus(RaftNodeStatus.TERMINATED);
                }
            }
            finally {
                // Runs on every exit path of the task, including the early returns above.
                if (failure == null) {
                    resultFuture.complete(null);
                } else {
                    resultFuture.completeExceptionally(failure);
                }
            }
        });
        return resultFuture;
    }

    /**
     * Starts this Raft node.
     *
     * <p>Does nothing (apart from a warning) when the node is already {@code TERMINATED}. When the
     * integration is not ready yet, the start is rescheduled after 500 ms (the same value as
     * {@code RAFT_NODE_INIT_DELAY_MILLIS}). Otherwise a task is submitted that initializes the
     * restored state and the Raft state, runs a {@link PreVoteTask}, schedules leader failure
     * detection and finally switches the status from {@code INITIAL} to {@code ACTIVE}.
     *
     * @throws IllegalStateException if the status is neither {@code INITIAL} nor {@code TERMINATED}
     */
    public void start() {
        if (this.status == RaftNodeStatus.TERMINATED) {
            this.logger.warning("Not starting since already terminated...");
            return;
        }
        if (this.status != RaftNodeStatus.INITIAL) {
            throw new IllegalStateException("Cannot start RaftNode when " + this.status);
        }
        if (!this.raftIntegration.isReady()) {
            // Integration not ready yet: retry later instead of blocking.
            this.raftIntegration.schedule(this::start, 500L, TimeUnit.MILLISECONDS);
            return;
        }
        // Guarded like the other fine-level logs in this class, so the message (which
        // stringifies the member list) is only built when fine logging is enabled.
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Starting Raft node: " + this.state.localEndpoint() + " for " + this.groupId + " with " + this.state.memberCount() + " members: " + this.state.members());
        }
        this.execute(() -> {
            // Same checks again inside the task: the status may have changed before it runs.
            if (this.status == RaftNodeStatus.TERMINATED) {
                this.logger.warning("Not starting since already terminated...");
                return;
            }
            if (this.status != RaftNodeStatus.INITIAL) {
                throw new IllegalStateException("Cannot start RaftNode when " + this.status);
            }
            this.initRestoredState();
            try {
                this.state.init();
            }
            catch (IOException e) {
                this.logger.severe("Raft node start failed!", e);
                this.setStatus(RaftNodeStatus.TERMINATED);
                return;
            }
            new PreVoteTask(this, 0).run();
            this.scheduleLeaderFailureDetection();
            // Only switch to ACTIVE if nothing above (e.g. restoring state) moved the status away from INITIAL.
            if (this.status == RaftNodeStatus.INITIAL) {
                this.setStatus(RaftNodeStatus.ACTIVE);
            }
        });
    }

    /** Closes the Raft state store; an {@link IOException} is logged at severe level and not rethrown. */
    private void closeStateStore() {
        try {
            state.stateStore().close();
        } catch (IOException closeFailure) {
            logger.severe(closeFailure);
        }
    }

    /** Submits a {@link PreVoteRequestHandlerTask} for {@code request} via {@link #execute(Runnable)}. */
    @Override
    public void handlePreVoteRequest(PreVoteRequest request) {
        Runnable handler = new PreVoteRequestHandlerTask(this, request);
        execute(handler);
    }

    /** Submits a {@link PreVoteResponseHandlerTask} for {@code response} via {@link #execute(Runnable)}. */
    @Override
    public void handlePreVoteResponse(PreVoteResponse response) {
        Runnable handler = new PreVoteResponseHandlerTask(this, response);
        execute(handler);
    }

    /** Submits a {@link VoteRequestHandlerTask} for {@code request} via {@link #execute(Runnable)}. */
    @Override
    public void handleVoteRequest(VoteRequest request) {
        Runnable handler = new VoteRequestHandlerTask(this, request);
        execute(handler);
    }

    /** Submits a {@link VoteResponseHandlerTask} for {@code response} via {@link #execute(Runnable)}. */
    @Override
    public void handleVoteResponse(VoteResponse response) {
        Runnable handler = new VoteResponseHandlerTask(this, response);
        execute(handler);
    }

    /** Submits an {@link AppendRequestHandlerTask} for {@code request} via {@link #execute(Runnable)}. */
    @Override
    public void handleAppendRequest(AppendRequest request) {
        Runnable handler = new AppendRequestHandlerTask(this, request);
        execute(handler);
    }

    /** Submits an {@link AppendSuccessResponseHandlerTask} for {@code response} via {@link #execute(Runnable)}. */
    @Override
    public void handleAppendResponse(AppendSuccessResponse response) {
        Runnable handler = new AppendSuccessResponseHandlerTask(this, response);
        execute(handler);
    }

    /** Submits an {@link AppendFailureResponseHandlerTask} for {@code response} via {@link #execute(Runnable)}. */
    @Override
    public void handleAppendResponse(AppendFailureResponse response) {
        Runnable handler = new AppendFailureResponseHandlerTask(this, response);
        execute(handler);
    }

    /** Submits an {@link InstallSnapshotHandlerTask} for {@code request} via {@link #execute(Runnable)}. */
    @Override
    public void handleInstallSnapshot(InstallSnapshot request) {
        Runnable handler = new InstallSnapshotHandlerTask(this, request);
        execute(handler);
    }

    /** Submits a {@link TriggerLeaderElectionHandlerTask} for {@code request} via {@link #execute(Runnable)}. */
    @Override
    public void handleTriggerLeaderElection(TriggerLeaderElection request) {
        Runnable handler = new TriggerLeaderElectionHandlerTask(this, request);
        execute(handler);
    }

    /**
     * Submits a {@link ReplicateTask} for {@code operation}.
     *
     * @param operation operation handed to the task
     * @return future created by the integration and passed to the task
     */
    @Override
    public InternalCompletableFuture replicate(Object operation) {
        InternalCompletableFuture future = raftIntegration.newCompletableFuture();
        Runnable task = new ReplicateTask(this, operation, future);
        execute(task);
        return future;
    }

    /**
     * Submits a {@link MembershipChangeTask} for the given member and mode.
     *
     * @param member endpoint the change applies to
     * @param mode   kind of membership change
     * @return future created by the integration and passed to the task
     */
    @Override
    public InternalCompletableFuture replicateMembershipChange(RaftEndpoint member, MembershipChangeMode mode) {
        InternalCompletableFuture future = raftIntegration.newCompletableFuture();
        Runnable task = new MembershipChangeTask(this, future, member, mode);
        execute(task);
        return future;
    }

    /**
     * Submits a {@link MembershipChangeTask} for the given member and mode, also passing
     * {@code groupMembersCommitIndex} to the task.
     *
     * @param member                  endpoint the change applies to
     * @param mode                    kind of membership change
     * @param groupMembersCommitIndex commit index forwarded to the task
     * @return future created by the integration and passed to the task
     */
    @Override
    public InternalCompletableFuture replicateMembershipChange(RaftEndpoint member, MembershipChangeMode mode, long groupMembersCommitIndex) {
        InternalCompletableFuture future = raftIntegration.newCompletableFuture();
        Runnable task = new MembershipChangeTask(this, future, member, mode, groupMembersCommitIndex);
        raftIntegration.execute(task);
        return future;
    }

    /**
     * Submits a {@link QueryTask} for {@code operation} with the given policy.
     *
     * @param operation   operation handed to the task
     * @param queryPolicy policy handed to the task
     * @return future created by the integration and passed to the task
     */
    @Override
    public InternalCompletableFuture query(Object operation, QueryPolicy queryPolicy) {
        InternalCompletableFuture future = raftIntegration.newCompletableFuture();
        Runnable task = new QueryTask(this, operation, queryPolicy, future);
        raftIntegration.execute(task);
        return future;
    }

    /**
     * Submits an {@link InitLeadershipTransferTask} targeting {@code endpoint}.
     *
     * @param endpoint endpoint handed to the task
     * @return future created by the integration and passed to the task
     */
    @Override
    public InternalCompletableFuture transferLeadership(RaftEndpoint endpoint) {
        InternalCompletableFuture future = raftIntegration.newCompletableFuture();
        Runnable task = new InitLeadershipTransferTask(this, endpoint, future);
        raftIntegration.execute(task);
        return future;
    }

    /** Returns {@code true} when the status is {@code TERMINATED} or {@code STEPPED_DOWN}. */
    @Override
    public boolean isTerminatedOrSteppedDown() {
        if (status == RaftNodeStatus.TERMINATED) {
            return true;
        }
        return status == RaftNodeStatus.STEPPED_DOWN;
    }

    /**
     * Changes the lifecycle status of this node.
     *
     * <p>A call with the current status is a no-op. On a real change the new status is logged
     * (INFO for {@code ACTIVE}, WARNING otherwise) and reported to the integration. The state
     * store is closed first when the node moves to {@code TERMINATED} or {@code STEPPED_DOWN}
     * from any status other than {@code INITIAL}.
     *
     * @param newStatus status to switch to
     * @throws IllegalStateException if the node is already {@code TERMINATED} or {@code STEPPED_DOWN}
     */
    public void setStatus(RaftNodeStatus newStatus) {
        if (status == RaftNodeStatus.TERMINATED || status == RaftNodeStatus.STEPPED_DOWN) {
            throw new IllegalStateException("Cannot set status: " + newStatus + " since already " + status);
        }
        RaftNodeStatus previous = status;
        if (previous == newStatus) {
            return;
        }

        Level logLevel;
        if (newStatus == RaftNodeStatus.ACTIVE) {
            logLevel = Level.INFO;
        } else {
            logLevel = Level.WARNING;
            boolean terminal = newStatus == RaftNodeStatus.TERMINATED || newStatus == RaftNodeStatus.STEPPED_DOWN;
            if (terminal && previous != RaftNodeStatus.INITIAL) {
                closeStateStore();
            }
        }

        status = newStatus;
        logger.log(logLevel, "Status is set to: " + newStatus);
        raftIntegration.onNodeStatusChange(newStatus);
    }

    /**
     * Handles destruction of the group: unless already {@code TERMINATED}, closes the state store
     * and sets the status to {@code TERMINATED} directly; then always notifies the integration.
     */
    private void groupDestroyed() {
        boolean alreadyTerminated = status == RaftNodeStatus.TERMINATED;
        if (!alreadyTerminated) {
            closeStateStore();
            status = RaftNodeStatus.TERMINATED;
            logger.warning("Status is set to: " + RaftNodeStatus.TERMINATED + " on group destroyed");
        }
        raftIntegration.onGroupDestroyed(groupId);
    }

    /**
     * Picks a randomized leader election timeout via {@code RandomPicker.getInt}, with the
     * configured timeout as the first bound and the configured timeout plus 1000 (the same value
     * as {@code LEADER_ELECTION_TIMEOUT_RANGE}) as the second.
     *
     * @return the picked timeout
     */
    public long getLeaderElectionTimeoutInMillis() {
        int lowerBound = leaderElectionTimeout;
        int upperBound = lowerBound + 1000;
        return RandomPicker.getInt(lowerBound, upperBound);
    }

    /**
     * Decides whether {@code operation} may be appended as a new entry right now.
     *
     * <p>Rejected when the node is terminated/stepped down, when the number of entries above the
     * commit index has reached {@code maxUncommittedEntryCount}, or when the status is
     * {@code TERMINATING}. While the member list is being updated, only non-{@link RaftGroupCmd}
     * operations are accepted, and only if the local member is known to the last group members.
     * An {@link UpdateRaftGroupMembersCmd} requires the last committed entry to carry the current
     * term; any other operation requires that no leadership transfer state is set.
     *
     * @param operation operation to be replicated
     * @return {@code true} if the operation can be replicated
     */
    public boolean canReplicateNewEntry(Object operation) {
        if (isTerminatedOrSteppedDown()) {
            return false;
        }

        RaftLog raftLog = state.log();
        long lastIndex = raftLog.lastLogOrSnapshotIndex();
        long committed = state.commitIndex();
        long uncommittedCount = lastIndex - committed;
        if (uncommittedCount >= maxUncommittedEntryCount) {
            return false;
        }

        if (status == RaftNodeStatus.TERMINATING) {
            return false;
        }
        if (status == RaftNodeStatus.UPDATING_GROUP_MEMBER_LIST) {
            boolean localIsMember = state.lastGroupMembers().isKnownMember(getLocalMember());
            return localIsMember && !(operation instanceof RaftGroupCmd);
        }

        if (!(operation instanceof UpdateRaftGroupMembersCmd)) {
            return state.leadershipTransferState() == null;
        }

        // The last committed entry is either the snapshot itself or still present in the log.
        LogEntry lastCommitted;
        if (committed == raftLog.snapshotIndex()) {
            lastCommitted = raftLog.snapshot();
        } else {
            lastCommitted = raftLog.getLogEntry(committed);
        }
        assert (lastCommitted != null);
        return lastCommitted.term() == state.term();
    }

    /**
     * Decides whether a linearizable query may be accepted right now.
     *
     * <p>Requires that the node is not terminated/stepped down, that the last committed entry
     * carries the current term, and that the leader's query state holds fewer than
     * {@code maxUncommittedEntryCount} queries.
     * NOTE(review): dereferences {@code state.leaderState()} without a null check - presumably
     * only called while this node is the leader; confirm against callers.
     *
     * @return {@code true} if a linearizable query can be accepted
     */
    public boolean canQueryLinearizable() {
        if (isTerminatedOrSteppedDown()) {
            return false;
        }

        long committed = state.commitIndex();
        RaftLog raftLog = state.log();
        // The last committed entry is either the snapshot itself or still present in the log.
        LogEntry lastCommitted;
        if (committed == raftLog.snapshotIndex()) {
            lastCommitted = raftLog.snapshot();
        } else {
            lastCommitted = raftLog.getLogEntry(committed);
        }
        assert (lastCommitted != null);

        boolean committedInCurrentTerm = lastCommitted.term() == state.term();
        if (!committedInCurrentTerm) {
            return false;
        }

        QueryState pendingQueries = state.leaderState().queryState();
        return pendingQueries.queryCount() < maxUncommittedEntryCount;
    }

    /** Returns the integration's answer to {@code isLinearizableReadOptimizationEnabled()}. */
    public boolean isLinearizableReadOptimizationEnabled() {
        boolean enabled = raftIntegration.isLinearizableReadOptimizationEnabled();
        return enabled;
    }

    /** Schedules a new {@code LeaderFailureDetectionTask} after a freshly picked election timeout. */
    private void scheduleLeaderFailureDetection() {
        Runnable detectionTask = new LeaderFailureDetectionTask();
        long delayMillis = getLeaderElectionTimeoutInMillis();
        schedule(detectionTask, delayMillis);
    }

    /** Schedules a new {@code HeartbeatTask} after {@code heartbeatPeriodInMillis}. */
    private void scheduleHeartbeat() {
        Runnable heartbeat = new HeartbeatTask();
        schedule(heartbeat, heartbeatPeriodInMillis);
    }

    /**
     * Hands a pre-vote request to the integration for delivery.
     *
     * @param request message to send
     * @param target  endpoint to send it to
     */
    public void send(PreVoteRequest request, RaftEndpoint target) {
        raftIntegration.send(request, target);
    }

    /**
     * Hands a pre-vote response to the integration for delivery.
     *
     * @param response message to send
     * @param target   endpoint to send it to
     */
    public void send(PreVoteResponse response, RaftEndpoint target) {
        raftIntegration.send(response, target);
    }

    /**
     * Hands a vote request to the integration for delivery.
     *
     * @param request message to send
     * @param target  endpoint to send it to
     */
    public void send(VoteRequest request, RaftEndpoint target) {
        raftIntegration.send(request, target);
    }

    /**
     * Hands a vote response to the integration for delivery.
     *
     * @param response message to send
     * @param target   endpoint to send it to
     */
    public void send(VoteResponse response, RaftEndpoint target) {
        raftIntegration.send(response, target);
    }

    /**
     * Hands an append-success response to the integration for delivery.
     *
     * @param response message to send
     * @param target   endpoint to send it to
     */
    public void send(AppendSuccessResponse response, RaftEndpoint target) {
        raftIntegration.send(response, target);
    }

    /**
     * Hands an append-failure response to the integration for delivery.
     *
     * @param response message to send
     * @param target   endpoint to send it to
     */
    public void send(AppendFailureResponse response, RaftEndpoint target) {
        raftIntegration.send(response, target);
    }

    /**
     * Hands a trigger-leader-election request to the integration for delivery.
     *
     * @param request message to send
     * @param target  endpoint to send it to
     */
    public void send(TriggerLeaderElection request, RaftEndpoint target) {
        raftIntegration.send(request, target);
    }

    /**
     * Calls {@link #sendAppendRequest(RaftEndpoint)} for every remote member and then records the
     * current time as the last append-entries timestamp.
     */
    public void broadcastAppendRequest() {
        for (RaftEndpoint remote : state.remoteMembers()) {
            sendAppendRequest(remote);
        }
        updateLastAppendEntriesTimestamp();
    }

    /**
     * Sends either an {@link InstallSnapshot} or an {@link AppendRequest} to {@code follower},
     * based on the follower's next index and the content of the local Raft log.
     *
     * <p>Nothing is sent when the follower is not reachable or when its append request backoff
     * is still set.
     * NOTE(review): dereferences {@code state.leaderState()} without a null check - presumably
     * only called while this node is the leader; confirm against callers.
     *
     * @param follower endpoint to send the request to
     */
    public void sendAppendRequest(RaftEndpoint follower) {
        LogEntry[] entries;
        if (!this.raftIntegration.isReachable(follower)) {
            return;
        }
        RaftLog raftLog = this.state.log();
        LeaderState leaderState = this.state.leaderState();
        FollowerState followerState = leaderState.getFollowerState(follower);
        // A previous request is still subject to backoff; do not send another one yet.
        if (followerState.isAppendRequestBackoffSet()) {
            return;
        }
        long nextIndex = followerState.nextIndex();
        // The next entry is covered by the snapshot and the log no longer holds it (or its
        // predecessor): send the snapshot instead of log entries.
        if (nextIndex <= raftLog.snapshotIndex() && (!raftLog.containsLogEntry(nextIndex) || nextIndex > 1L && !raftLog.containsLogEntry(nextIndex - 1L))) {
            // The value returned by setAppendRequestBackoff() is passed as the last constructor argument.
            InstallSnapshot installSnapshot = new InstallSnapshot(this.state.localEndpoint(), this.state.term(), raftLog.snapshot(), leaderState.queryRound(), followerState.setAppendRequestBackoff());
            if (this.logger.isFineEnabled()) {
                this.logger.fine("Sending " + installSnapshot + " to " + follower + " since next index: " + nextIndex + " <= snapshot index: " + raftLog.snapshotIndex());
            }
            this.raftIntegration.send(installSnapshot, follower);
            followerState.setMaxAppendRequestBackoff();
            this.scheduleAppendAckResetTask();
            return;
        }
        int prevEntryTerm = 0;
        long prevEntryIndex = 0L;
        boolean shouldBackoff = true;
        if (nextIndex > 1L) {
            LogEntry prevEntry;
            prevEntryIndex = nextIndex - 1L;
            // The previous entry is either the snapshot itself or still present in the log.
            LogEntry logEntry = prevEntry = raftLog.snapshotIndex() == prevEntryIndex ? raftLog.snapshot() : raftLog.getLogEntry(prevEntryIndex);
            assert (prevEntry != null) : "Prev entry index: " + prevEntryIndex + ", snapshot: " + raftLog.snapshotIndex();
            prevEntryTerm = prevEntry.term();
            long matchIndex = followerState.matchIndex();
            if (matchIndex == 0L) {
                // Match index not known yet: send no entries, but keep backoff enabled.
                entries = new LogEntry[]{};
            } else if (nextIndex <= raftLog.lastLogOrSnapshotIndex()) {
                // Send a batch starting at nextIndex; the end index is capped by the last log index.
                long end = Math.min(nextIndex + (long)this.appendRequestMaxEntryCount, raftLog.lastLogOrSnapshotIndex());
                entries = raftLog.getEntriesBetween(nextIndex, end);
            } else {
                // nextIndex is beyond the last log index: empty request, no backoff.
                entries = new LogEntry[]{};
                shouldBackoff = false;
            }
        } else if (nextIndex == 1L && raftLog.lastLogOrSnapshotIndex() > 0L) {
            // Follower starts from the first index and the log is not empty: send the first batch.
            long end = Math.min(nextIndex + (long)this.appendRequestMaxEntryCount, raftLog.lastLogOrSnapshotIndex());
            entries = raftLog.getEntriesBetween(nextIndex, end);
        } else {
            // Nothing to send: empty request, no backoff.
            entries = new LogEntry[]{};
            shouldBackoff = false;
        }
        if (shouldBackoff) {
            followerState.setAppendRequestBackoff();
        }
        AppendRequest request = new AppendRequest(this.getLocalMember(), this.state.term(), prevEntryTerm, prevEntryIndex, this.state.commitIndex(), entries, leaderState.queryRound(), followerState.flowControlSequenceNumber());
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Sending " + request + " to " + follower + " with next index: " + nextIndex);
        }
        this.raftIntegration.send(request, follower);
        // If the request carries an entry beyond the leader's flushed log index, submit the flush task.
        if (entries.length > 0 && entries[entries.length - 1].index() > leaderState.flushedLogIndex()) {
            this.submitFlushTask();
        }
        if (shouldBackoff) {
            this.scheduleAppendAckResetTask();
        }
    }

    /**
     * Applies every log entry between the last applied index (exclusive) and the commit index
     * (inclusive), updating the last applied index after each entry. Afterwards, if the role is
     * {@code LEADER} or {@code FOLLOWER}, checks whether a snapshot should be taken.
     *
     * @throws AssertionError if a committed entry cannot be found in the log
     */
    public void applyLogEntries() {
        long commitIndex = state.commitIndex();
        long lastApplied = state.lastApplied();
        if (commitIndex == lastApplied) {
            // Nothing new has been committed.
            return;
        }
        assert (commitIndex > lastApplied) : "commit index: " + commitIndex + " cannot be smaller than last applied: " + lastApplied;

        RaftLog raftLog = state.log();
        long idx = state.lastApplied() + 1L;
        while (idx <= commitIndex) {
            LogEntry entry = raftLog.getLogEntry(idx);
            if (entry == null) {
                String msg = "Failed to get log entry at index: " + idx;
                logger.severe(msg);
                throw new AssertionError((Object) msg);
            }
            applyLogEntry(entry);
            state.lastApplied(idx);
            idx++;
        }

        assert (status != RaftNodeStatus.TERMINATED || commitIndex == raftLog.lastLogOrSnapshotIndex()) : "commit index: " + commitIndex + " must be equal to " + raftLog.lastLogOrSnapshotIndex() + " on termination.";

        boolean leaderOrFollower = state.role() == RaftRole.LEADER || state.role() == RaftRole.FOLLOWER;
        if (leaderOrFollower) {
            takeSnapshotIfCommitIndexAdvanced();
        }
    }

    /**
     * Applies a single committed log entry and completes the future registered for its index.
     *
     * <p>{@link RaftGroupCmd} operations are handled here (group destruction, member list
     * update); any other operation is run through the integration. No future is completed when
     * the response is {@code PostponedResponse.INSTANCE}.
     *
     * @param entry committed entry to apply
     */
    private void applyLogEntry(LogEntry entry) {
        if (this.logger.isFineEnabled()) {
            this.logger.fine("Processing " + entry);
        }
        Object response = null;
        Object operation = entry.operation();
        if (operation instanceof RaftGroupCmd) {
            if (operation instanceof DestroyRaftGroupCmd) {
                // The response stays null for a destroy command.
                this.groupDestroyed();
            } else if (operation instanceof UpdateRaftGroupMembersCmd) {
                // The member list from this entry has not been applied to the state yet: apply it first.
                if (this.state.lastGroupMembers().index() < entry.index()) {
                    this.setStatus(RaftNodeStatus.UPDATING_GROUP_MEMBER_LIST);
                    UpdateRaftGroupMembersCmd op = (UpdateRaftGroupMembersCmd)operation;
                    this.updateGroupMembers(entry.index(), op.getMembers());
                }
                assert (this.status == RaftNodeStatus.UPDATING_GROUP_MEMBER_LIST) : "STATUS: " + this.status;
                assert (this.state.lastGroupMembers().index() == entry.index());
                this.state.commitGroupMembers();
                UpdateRaftGroupMembersCmd cmd = (UpdateRaftGroupMembersCmd)operation;
                if (cmd.getMember().equals(this.state.localEndpoint()) && cmd.getMode() == MembershipChangeMode.REMOVE) {
                    // This node itself was removed: step down and fail futures registered below this entry's index.
                    this.setStatus(RaftNodeStatus.STEPPED_DOWN);
                    this.invalidateFuturesUntil(entry.index() - 1L, new LeaderDemotedException(this.state.localEndpoint(), null));
                } else {
                    this.setStatus(RaftNodeStatus.ACTIVE);
                }
                // The response of a membership change is the index of its entry.
                response = entry.index();
            } else {
                // Unknown group command: the exception becomes the (exceptional) response.
                response = new IllegalArgumentException("Invalid command: " + operation);
            }
        } else {
            response = this.raftIntegration.runOperation(operation, entry.index());
        }
        // A postponed response means the future is not completed here.
        if (response == PostponedResponse.INSTANCE) {
            return;
        }
        this.completeFuture(entry.index(), response);
    }

    /** Stores {@code Clock.currentTimeMillis()} as the last append-entries timestamp. */
    public void updateLastAppendEntriesTimestamp() {
        long now = Clock.currentTimeMillis();
        lastAppendEntriesTimestamp = now;
    }

    /** Returns the timestamp last stored by {@link #updateLastAppendEntriesTimestamp()}. */
    public long lastAppendEntriesTimestamp() {
        return lastAppendEntriesTimestamp;
    }

    /** Returns the Raft state object owned by this node. */
    public RaftState state() {
        return state;
    }

    /**
     * Runs {@code operation} through the integration at the current commit index and completes
     * {@code resultFuture} with whatever the integration returns.
     *
     * @param operation    operation to run
     * @param resultFuture future to complete with the result
     */
    public void runQuery(Object operation, InternalCompletableFuture resultFuture) {
        long commitIndex = state.commitIndex();
        Object queryResult = raftIntegration.runOperation(operation, commitIndex);
        resultFuture.complete(queryResult);
    }

    /**
     * Hands {@code task} to the integration for execution.
     *
     * @param task task to execute
     */
    public void execute(Runnable task) {
        raftIntegration.execute(task);
    }

    /**
     * Schedules {@code task} through the integration after {@code delayInMillis} milliseconds.
     * The task is silently dropped when the node is terminated or stepped down.
     *
     * @param task          task to schedule
     * @param delayInMillis delay in milliseconds
     */
    public void schedule(Runnable task, long delayInMillis) {
        if (!isTerminatedOrSteppedDown()) {
            raftIntegration.schedule(task, delayInMillis, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Registers {@code future} under {@code entryIndex}. With assertions enabled, fails if a
     * future was already registered for that index.
     *
     * @param entryIndex log entry index the future belongs to
     * @param future     future to register
     */
    public void registerFuture(long entryIndex, InternalCompletableFuture future) {
        InternalCompletableFuture previous = futures.put(entryIndex, future);
        assert (previous == null) : "Future object is already registered for entry index: " + entryIndex;
    }

    /**
     * Removes the future registered under {@code entryIndex}, if any, and completes it:
     * exceptionally when {@code response} is a {@link Throwable}, normally otherwise.
     *
     * @param entryIndex log entry index whose future should be completed
     * @param response   result, or a {@code Throwable} describing the failure
     */
    public void completeFuture(long entryIndex, Object response) {
        InternalCompletableFuture registered = futures.remove(entryIndex);
        if (registered == null) {
            return;
        }
        if (response instanceof Throwable) {
            registered.completeExceptionally((Throwable) response);
        } else {
            registered.complete(response);
        }
    }

    /**
     * Fails and removes every registered future whose index is {@code >= entryIndex}, using a
     * {@link LeaderDemotedException} built from the local endpoint and the current leader.
     * Logs a warning with the count when at least one future was invalidated.
     *
     * @param entryIndex lowest index (inclusive) to invalidate
     */
    public void invalidateFuturesFrom(long entryIndex) {
        int invalidated = 0;
        for (Iterator<Map.Entry<Long, InternalCompletableFuture>> it = futures.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<Long, InternalCompletableFuture> registered = it.next();
            long registeredIndex = registered.getKey();
            if (registeredIndex >= entryIndex) {
                registered.getValue().completeExceptionally(new LeaderDemotedException(state.localEndpoint(), state.leader()));
                it.remove();
                invalidated++;
            }
        }
        if (invalidated > 0) {
            logger.warning("Invalidated " + invalidated + " futures from log index: " + entryIndex);
        }
    }

    /**
     * Fails and removes every registered future whose log index is less than
     * or equal to {@code entryIndex}, completing all of them with the same
     * {@code response}. A warning is logged when at least one future was
     * invalidated.
     *
     * @param entryIndex the highest log index (inclusive) to invalidate
     * @param response   the failure passed to each invalidated future
     */
    private void invalidateFuturesUntil(long entryIndex, Throwable response) {
        int invalidated = 0;
        for (Iterator<Map.Entry<Long, InternalCompletableFuture>> it = futures.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<Long, InternalCompletableFuture> registered = it.next();
            long registeredIndex = registered.getKey();
            if (registeredIndex <= entryIndex) {
                registered.getValue().completeExceptionally(response);
                it.remove();
                invalidated++;
            }
        }
        if (invalidated > 0) {
            logger.warning("Invalidated " + invalidated + " futures until log index: " + entryIndex);
        }
    }

    /**
     * Takes a snapshot at the current commit index once the commit index is
     * at least {@code commitIndexAdvanceCountToSnapshot} entries ahead of the
     * last snapshot index, then installs it into the log and truncates older
     * entries.
     *
     * <p>Nothing happens when {@code isTerminatedOrSteppedDown()} is true, and
     * the method only logs and returns when the integration hands back a
     * {@link Throwable} instead of a snapshot.
     */
    private void takeSnapshotIfCommitIndexAdvanced() {
        long commitIndex = state.commitIndex();
        boolean advancedEnough = commitIndex - state.log().snapshotIndex() >= commitIndexAdvanceCountToSnapshot;
        if (!advancedEnough || isTerminatedOrSteppedDown()) {
            return;
        }

        RaftLog log = state.log();
        Object snapshot = raftIntegration.takeSnapshot(commitIndex);
        if (snapshot instanceof Throwable) {
            logger.severe("Could not take snapshot at commit index: " + commitIndex, (Throwable) snapshot);
            return;
        }

        int snapshotTerm = log.getLogEntry(commitIndex).term();
        RaftGroupMembers members = state.committedGroupMembers();
        SnapshotEntry snapshotEntry = new SnapshotEntry(snapshotTerm, commitIndex, snapshot, members.index(), members.members());

        // Default: keep the last maxNumberOfLogsToKeepAfterSnapshot entries.
        long retentionFloor = commitIndex - maxNumberOfLogsToKeepAfterSnapshot;
        long highestLogIndexToTruncate = retentionFloor;

        LeaderState leaderState = state.leaderState();
        if (leaderState != null) {
            long[] matchIndices = leaderState.matchIndices();
            // The last slot of the array is excluded from the "unknown match index" check.
            boolean allMatchIndicesKnown = Arrays.stream(matchIndices, 0, matchIndices.length - 1)
                    .noneMatch(i -> i == 0L);
            if (allMatchIndicesKnown) {
                // Smallest match index inside the retention window, minus one;
                // when there is none, everything up to the commit index may go.
                highestLogIndexToTruncate = Arrays.stream(matchIndices)
                        .filter(i -> i < commitIndex)
                        .filter(i -> i > retentionFloor)
                        .map(i -> i - 1L)
                        .sorted()
                        .findFirst()
                        .orElse(commitIndex);
            }
        }

        int truncatedEntryCount = log.setSnapshot(snapshotEntry, highestLogIndexToTruncate);
        if (logger.isFineEnabled()) {
            logger.fine(snapshotEntry + " is taken, " + truncatedEntryCount + " entries are truncated.");
        }
    }

    /**
     * Installs the given snapshot into the local log and state.
     *
     * <p>A snapshot older than the current commit index is ignored and
     * {@code false} is returned. A snapshot at exactly the commit index is
     * ignored as well, but reported as success. Otherwise the commit index is
     * moved to the snapshot index, the log is replaced and flushed, the
     * snapshot is restored through the integration, status is set to
     * {@code ACTIVE}, group members are restored, and futures up to the
     * snapshot index are failed with {@link StaleAppendRequestException}.
     *
     * @param snapshot the snapshot to install
     * @return {@code false} only when the snapshot is stale; {@code true} otherwise
     */
    public boolean installSnapshot(SnapshotEntry snapshot) {
        long commitIndex = state.commitIndex();
        boolean stale = commitIndex > snapshot.index();
        if (stale) {
            logger.info("Ignored stale " + snapshot + ", commit index at: " + commitIndex);
            return false;
        }
        boolean alreadyThere = commitIndex == snapshot.index();
        if (alreadyThere) {
            logger.info("Ignored " + snapshot + " since commit index is same.");
            return true;
        }

        state.commitIndex(snapshot.index());
        RaftLog raftLog = state.log();
        int truncatedCount = raftLog.setSnapshot(snapshot);
        raftLog.flush();
        if (truncatedCount > 0) {
            logger.info(truncatedCount + " entries are truncated to install " + snapshot);
        }

        raftIntegration.restoreSnapshot(snapshot.operation(), snapshot.index());

        // Status is reset before the member list is restored from the snapshot.
        setStatus(RaftNodeStatus.ACTIVE);
        state.restoreGroupMembers(snapshot.groupMembersLogIndex(), snapshot.groupMembers());
        printMemberState();

        state.lastApplied(snapshot.index());
        invalidateFuturesUntil(snapshot.index(), new StaleAppendRequestException(state.leader()));

        logger.info(snapshot + " is installed.");
        return true;
    }

    /**
     * Re-initialises the node from the snapshot currently held by the log.
     * When {@code SnapshotEntry.isNonInitial} accepts the snapshot, the member
     * state is printed and the snapshot is restored through the integration.
     * Restored Raft group commands are processed afterwards in every case.
     */
    private void initRestoredState() {
        SnapshotEntry snapshot = state.log().snapshot();
        if (SnapshotEntry.isNonInitial(snapshot)) {
            printMemberState();
            raftIntegration.restoreSnapshot(snapshot.operation(), snapshot.index());
            // Full snapshot details are only included when fine logging is on;
            // both variants are emitted at info level.
            String message = logger.isFineEnabled()
                    ? snapshot + " is restored."
                    : "Snapshot is restored at commitIndex=" + snapshot.index();
            logger.info(message);
        }
        applyRestoredRaftGroupCommands(snapshot);
    }

    /**
     * Scans the log entries after the given snapshot (or from index 1 when
     * there is no snapshot) for {@link RaftGroupCmd} operations.
     *
     * <p>The second-to-last group command found, if any, becomes the commit
     * index and log entries are applied. The last group command found is not
     * committed here. It only updates the node: an
     * {@link UpdateRaftGroupMembersCmd} switches the status to
     * {@code UPDATING_GROUP_MEMBER_LIST} and updates the members, and a
     * {@link DestroyRaftGroupCmd} switches the status to {@code TERMINATING}.
     *
     * @param snapshot the restored snapshot, may be {@code null}
     * @throws IllegalStateException if the last group command is of neither kind
     */
    private void applyRestoredRaftGroupCommands(SnapshotEntry snapshot) {
        RaftLog log = state.log();
        LogEntry previousGroupCmdEntry = null;
        LogEntry latestGroupCmdEntry = null;

        long firstIndex = snapshot != null ? snapshot.index() + 1L : 1L;
        for (long index = firstIndex; index <= log.lastLogOrSnapshotIndex(); index++) {
            LogEntry entry = log.getLogEntry(index);
            assert entry != null : "index: " + index;
            if (entry.operation() instanceof RaftGroupCmd) {
                // Shift the window: the former latest command becomes the previous one.
                previousGroupCmdEntry = latestGroupCmdEntry;
                latestGroupCmdEntry = entry;
            }
        }

        if (previousGroupCmdEntry != null) {
            state.commitIndex(previousGroupCmdEntry.index());
            applyLogEntries();
        }

        if (latestGroupCmdEntry == null) {
            return;
        }
        if (latestGroupCmdEntry.operation() instanceof UpdateRaftGroupMembersCmd) {
            setStatus(RaftNodeStatus.UPDATING_GROUP_MEMBER_LIST);
            Collection<RaftEndpoint> members = ((UpdateRaftGroupMembersCmd) latestGroupCmdEntry.operation()).getMembers();
            updateGroupMembers(latestGroupCmdEntry.index(), members);
        } else if (latestGroupCmdEntry.operation() instanceof DestroyRaftGroupCmd) {
            setStatus(RaftNodeStatus.TERMINATING);
        } else {
            throw new IllegalStateException("Invalid group command for restore: " + latestGroupCmdEntry);
        }
    }

    /**
     * Logs, at info level, a multi-line summary of the CP group: group name
     * and id, member count, term, members log index, and one line per member.
     * The local member is tagged with its role and {@code " this"}; a remote
     * member equal to the known leader is tagged {@code LEADER}.
     */
    public void printMemberState() {
        CPGroupId groupId = state.groupId();
        StringBuilder sb = new StringBuilder("\n\nCP Group Members {");
        sb.append("groupId: ").append(groupId.getName());
        sb.append("(").append(groupId.getId()).append(")");
        sb.append(", size:").append(state.memberCount());
        sb.append(", term:").append(state.term());
        sb.append(", logIndex:").append(state.membersLogIndex());
        sb.append("} [");

        for (RaftEndpoint member : state.members()) {
            CPMember cpMember = raftIntegration.getCPMember(member);
            // Fall back to the raw endpoint when no CP member is resolved.
            sb.append("\n\t").append(cpMember != null ? cpMember : member);
            if (state.localEndpoint().equals(member)) {
                sb.append(" - ").append((Object) state.role()).append(" this");
            } else if (member.equals(state.leader())) {
                sb.append(" - ").append((Object) RaftRole.LEADER);
            }
        }

        sb.append("\n]\n");
        logger.info(sb.toString());
    }

    /**
     * Updates the group members in the state and then logs the member state.
     *
     * @param logIndex the log index associated with the new member list
     * @param members  the new group members
     */
    public void updateGroupMembers(long logIndex, Collection<RaftEndpoint> members) {
        state.updateGroupMembers(logIndex, members);
        printMemberState();
    }

    /**
     * Resets the group members in the state and then logs the member state.
     */
    public void resetGroupMembers() {
        state.resetGroupMembers();
        printMemberState();
    }

    /**
     * Schedules the append-request backoff reset task after
     * {@code appendRequestBackoffTimeoutInMillis}, unless the
     * {@code appendRequestBackoffResetTaskScheduled} flag is already set.
     */
    private void scheduleAppendAckResetTask() {
        if (!appendRequestBackoffResetTaskScheduled) {
            // Raise the flag before scheduling so that repeated calls are no-ops.
            appendRequestBackoffResetTaskScheduled = true;
            schedule(appendRequestBackoffResetTask, appendRequestBackoffTimeoutInMillis);
        }
    }

    /**
     * Submits the flush task to the Raft integration, unless the
     * {@code flushTaskSubmitted} flag is already set.
     */
    private void submitFlushTask() {
        if (!flushTaskSubmitted) {
            // Raise the flag before submitting so that repeated calls are no-ops.
            flushTaskSubmitted = true;
            raftIntegration.submit(flushTask);
        }
    }

    /**
     * Switches this node to the follower role for the given term.
     *
     * <p>If a leader state is present, every query operation it holds is
     * failed with a {@link LeaderDemotedException} first, created with a
     * {@code null} leader argument. The member state is logged afterwards.
     *
     * @param term the term passed on to {@code state.toFollower}
     */
    public void toFollower(int term) {
        LeaderState currentLeaderState = state.leaderState();
        if (currentLeaderState != null) {
            Collection<BiTuple<Object, InternalCompletableFuture>> pendingQueries = currentLeaderState.queryState().operations();
            for (BiTuple<Object, InternalCompletableFuture> query : pendingQueries) {
                InternalCompletableFuture queryFuture = (InternalCompletableFuture) query.element2;
                queryFuture.completeExceptionally(new LeaderDemotedException(state.localEndpoint(), null));
            }
        }
        state.toFollower(term);
        printMemberState();
    }

    /**
     * Records the given endpoint as the leader in the state and then logs the
     * member state.
     *
     * @param endpoint the endpoint to set as leader; callers in this file also pass {@code null}
     */
    public void leader(RaftEndpoint endpoint) {
        state.leader(endpoint);
        printMemberState();
    }

    /**
     * Switches this node to the leader role. In order, it updates the state,
     * appends the post-election entry (if the integration supplies one), logs
     * the member state, broadcasts an append request and schedules the
     * heartbeat.
     */
    public void toLeader() {
        state.toLeader();
        appendEntryAfterLeaderElection();
        printMemberState();
        broadcastAppendRequest();
        scheduleHeartbeat();
    }

    /**
     * Computes the quorum match index from the match indices reported by the
     * leader state.
     *
     * <p>When the local endpoint is a known member, the last slot of the array
     * is filled with the leader's own index: the last log or snapshot index
     * when there is no flush task, otherwise the flushed log index. When it is
     * not a known member, that last slot is dropped. The array is then sorted
     * and the element at position {@code (length - 1) / 2} is returned.
     *
     * @return the quorum match index
     */
    private long findQuorumMatchIndex() {
        LeaderState leaderState = state.leaderState();
        long[] indices = leaderState.matchIndices();

        if (state.isKnownMember(state.localEndpoint())) {
            long leaderIndex;
            if (flushTask == null) {
                leaderIndex = state.log().lastLogOrSnapshotIndex();
            } else {
                leaderIndex = leaderState.flushedLogIndex();
            }
            indices[indices.length - 1] = leaderIndex;
        } else {
            // The local endpoint does not take part: drop the slot reserved for it.
            indices = Arrays.copyOf(indices, indices.length - 1);
        }

        Arrays.sort(indices);
        int quorumPosition = (indices.length - 1) / 2;
        long quorumMatchIndex = indices[quorumPosition];
        if (logger.isFineEnabled()) {
            logger.fine("Quorum match index: " + quorumMatchIndex + ", indices: " + Arrays.toString(indices));
        }
        return quorumMatchIndex;
    }

    /**
     * Tries to move the commit index forward. Starting at the quorum match
     * index and walking down towards the current commit index, the first log
     * entry whose term equals the current term is committed.
     *
     * @return {@code true} if entries were committed, {@code false} if no
     *         entry from the current term was found above the commit index
     */
    public boolean tryAdvanceCommitIndex() {
        long commitIndex = state.commitIndex();
        RaftLog raftLog = state.log();
        long candidateIndex = findQuorumMatchIndex();
        while (candidateIndex > commitIndex) {
            LogEntry entry = raftLog.getLogEntry(candidateIndex);
            if (entry.term() == state.term()) {
                commitEntries(candidateIndex);
                return true;
            } else if (logger.isFineEnabled()) {
                logger.fine("Cannot commit " + entry + " since an entry from the current term: " + state.term() + " is needed.");
            }
            candidateIndex--;
        }
        return false;
    }

    /**
     * Sets the commit index, applies log entries, runs pending queries and
     * finally broadcasts an append request.
     *
     * <p>The order of applying and querying depends on the node status read
     * right after the commit index is set: when {@code ACTIVE}, entries are
     * applied before queries run; otherwise queries run first.
     *
     * @param commitIndex the new commit index
     */
    private void commitEntries(long commitIndex) {
        if (logger.isFineEnabled()) {
            logger.fine("Setting commit index: " + commitIndex);
        }
        state.commitIndex(commitIndex);

        // Captured once; later status changes must not alter the chosen order.
        boolean applyBeforeQueries = status == RaftNodeStatus.ACTIVE;
        if (applyBeforeQueries) {
            applyLogEntries();
        }
        tryRunQueries();
        if (!applyBeforeQueries) {
            applyLogEntries();
        }

        broadcastAppendRequest();
    }

    /**
     * Runs the queries held by the leader's query state once a majority has
     * acknowledged the current commit index, then resets the query state.
     *
     * @return {@code true} if there are queries but the majority
     *         acknowledgement is still missing; {@code false} if there were
     *         no queries or they have just been run
     */
    public boolean tryRunQueries() {
        QueryState queryState = state.leaderState().queryState();
        if (queryState.queryCount() == 0) {
            return false;
        }

        long commitIndex = state.commitIndex();
        boolean majorityAcked = queryState.isMajorityAcked(commitIndex, state.majority());
        if (!majorityAcked) {
            return true;
        }

        Collection<BiTuple<Object, InternalCompletableFuture>> pendingQueries = queryState.operations();
        if (logger.isFineEnabled()) {
            logger.fine("Running " + pendingQueries.size() + " queries at commit index: " + commitIndex + ", query round: " + queryState.queryRound());
        }
        for (BiTuple<Object, InternalCompletableFuture> query : pendingQueries) {
            runQuery(query.element1, (InternalCompletableFuture) query.element2);
        }
        queryState.reset();
        return false;
    }

    /**
     * Asks the integration for an entry to append on leader election. When
     * one is provided, it is appended to the log with the current term, at
     * the index right after the last log or snapshot index.
     */
    private void appendEntryAfterLeaderElection() {
        Object entry = raftIntegration.getAppendedEntryOnLeaderElection();
        if (entry == null) {
            return;
        }
        RaftLog log = state.log();
        long nextIndex = log.lastLogOrSnapshotIndex() + 1L;
        log.appendEntries(new LogEntry(state.term(), nextIndex, entry));
    }

    /**
     * Tells whether the given timestamp is older than the missed-heartbeat
     * threshold, which is {@code maxMissedLeaderHeartbeatCount} multiplied by
     * {@code heartbeatPeriodInMillis}.
     *
     * @param timestamp the reference time, compared against {@code Clock.currentTimeMillis()}
     * @return {@code true} if {@code timestamp + threshold} lies before the current time
     */
    public boolean isHeartbeatTimedOut(long timestamp) {
        long missedHeartbeatThreshold = (long) maxMissedLeaderHeartbeatCount * heartbeatPeriodInMillis;
        long deadline = timestamp + missedHeartbeatThreshold;
        return deadline < Clock.currentTimeMillis();
    }

    /**
     * Tells whether the heartbeat has timed out, measured from
     * {@code lastAppendEntriesTimestamp}.
     *
     * @return the result of {@link #isHeartbeatTimedOut(long)} for that timestamp
     */
    public boolean isHeartbeatTimedOut() {
        return isHeartbeatTimedOut(lastAppendEntriesTimestamp);
    }

    /**
     * Tells whether a leader is known, is reported reachable by the
     * integration, and its heartbeat has not timed out.
     *
     * @return {@code true} only if all three conditions hold
     */
    public boolean isLeaderAvailable() {
        RaftEndpoint leader = state.leader();
        if (leader == null) {
            return false;
        }
        if (!raftIntegration.isReachable(leader)) {
            return false;
        }
        return !isHeartbeatTimedOut();
    }

    /**
     * Returns a short description made of the group id, the node status and
     * the local endpoint.
     */
    @Override
    public String toString() {
        return "RaftNode{groupId=" + groupId
                + ", status=" + status
                + ", localEndpoint=" + state.localEndpoint()
                + "}";
    }

    /**
     * Task that clears the "backoff reset task scheduled" flag and walks the
     * leader's follower states. For every follower with the append-request
     * backoff set, it completes one backoff round. When that call returns
     * {@code true} an append request is sent to the follower; otherwise this
     * task is scheduled again.
     */
    private class AppendRequestBackoffResetTask extends RaftNodeStatusAwareTask {
        AppendRequestBackoffResetTask() {
            super(RaftNodeImpl.this);
        }

        @Override
        protected void innerRun() {
            RaftNodeImpl node = RaftNodeImpl.this;
            node.appendRequestBackoffResetTaskScheduled = false;

            LeaderState leaderState = node.state.leaderState();
            if (leaderState == null) {
                // No leader state is present, so there is nothing to walk.
                return;
            }

            Map<RaftEndpoint, FollowerState> followerStates = leaderState.getFollowerStates();
            for (Map.Entry<RaftEndpoint, FollowerState> follower : followerStates.entrySet()) {
                FollowerState followerState = follower.getValue();
                if (followerState.isAppendRequestBackoffSet()) {
                    if (followerState.completeAppendRequestBackoffRound()) {
                        node.sendAppendRequest(follower.getKey());
                    } else {
                        node.scheduleAppendAckResetTask();
                    }
                }
            }
        }
    }

    /**
     * Task that clears the "flush task submitted" flag and flushes the log.
     * When a leader state is present, it also records the last log or
     * snapshot index as the flushed log index and tries to advance the commit
     * index.
     */
    private class FlushTask extends RaftNodeStatusAwareTask {
        FlushTask() {
            super(RaftNodeImpl.this);
        }

        @Override
        protected void innerRun() {
            RaftNodeImpl node = RaftNodeImpl.this;
            node.flushTaskSubmitted = false;

            RaftLog log = node.state.log();
            log.flush();

            LeaderState leaderState = node.state.leaderState();
            if (leaderState == null) {
                return;
            }
            leaderState.flushedLogIndex(log.lastLogOrSnapshotIndex());
            node.tryAdvanceCommitIndex();
        }
    }

    /**
     * Periodic task run on non-leader nodes to decide whether a new election
     * round should be started. It always reschedules itself through
     * {@code scheduleLeaderFailureDetection()}, whatever the outcome.
     *
     * <p>A pre-vote is started when this node is a FOLLOWER with no known
     * leader, or when the known leader is unreachable, has timed-out
     * heartbeats, or is no longer a committed group member. In the last three
     * cases the leader is reset to {@code null} first.
     */
    private class LeaderFailureDetectionTask extends RaftNodeStatusAwareTask {
        LeaderFailureDetectionTask() {
            super(RaftNodeImpl.this);
        }

        @Override
        protected void innerRun() {
            RaftNodeImpl node = RaftNodeImpl.this;
            try {
                if (node.state.role() == RaftRole.LEADER) {
                    return;
                }

                RaftEndpoint leader = node.state.leader();
                if (leader == null) {
                    if (node.state.role() == RaftRole.FOLLOWER) {
                        this.logger.warning("We are FOLLOWER and there is no current leader. Will start new election round...");
                        runPreVoteTask();
                    }
                    return;
                }

                if (!node.raftIntegration.isReachable(leader)) {
                    this.logger.warning("Current leader " + leader + " is not reachable. Will start new election round...");
                    resetLeaderAndStartElection();
                    return;
                }
                if (node.isHeartbeatTimedOut()) {
                    this.logger.warning("Current leader " + leader + "'s heartbeats are timed-out. Will start new election round...");
                    resetLeaderAndStartElection();
                    return;
                }
                if (!node.state.committedGroupMembers().isKnownMember(leader)) {
                    this.logger.warning("Current leader " + leader + " is not member anymore. Will start new election round...");
                    resetLeaderAndStartElection();
                }
            } finally {
                // Runs on every exit path, including the early returns above.
                node.scheduleLeaderFailureDetection();
            }
        }

        /** Clears the known leader and then starts a pre-vote round. */
        final void resetLeaderAndStartElection() {
            RaftNodeImpl.this.leader(null);
            runPreVoteTask();
        }

        /** Runs a {@code PreVoteTask} for the current term unless a pre-candidate state already exists. */
        private void runPreVoteTask() {
            RaftNodeImpl node = RaftNodeImpl.this;
            if (node.state.preCandidateState() == null) {
                new PreVoteTask(node, node.state.term()).run();
            }
        }
    }

    /**
     * Periodic task that only acts while this node is the LEADER.
     *
     * <p>If the majority append-request ack timestamp has timed out, the node
     * demotes itself to follower in its current term and fails all futures up
     * to the last log or snapshot index with
     * {@link StaleAppendRequestException}. The heartbeat is not rescheduled
     * in that case. Otherwise an append request is broadcast when the last
     * append-entries timestamp is older than one heartbeat period, and the
     * heartbeat is scheduled again.
     */
    private class HeartbeatTask extends RaftNodeStatusAwareTask {
        HeartbeatTask() {
            super(RaftNodeImpl.this);
        }

        @Override
        protected void innerRun() {
            RaftNodeImpl node = RaftNodeImpl.this;
            if (node.state.role() != RaftRole.LEADER) {
                return;
            }

            long majorityAckTimestamp = node.state.leaderState().majorityAppendRequestAckTimestamp(node.state.majority());
            if (node.isHeartbeatTimedOut(majorityAckTimestamp)) {
                this.logger.warning("Demoting to " + RaftRole.FOLLOWER + " since not received acks from majority recently...");
                node.toFollower(node.state.term());
                node.invalidateFuturesUntil(node.state.log().lastLogOrSnapshotIndex(), new StaleAppendRequestException(null));
                return;
            }

            boolean appendIsOverdue = node.lastAppendEntriesTimestamp < Clock.currentTimeMillis() - node.heartbeatPeriodInMillis;
            if (appendIsOverdue) {
                node.broadcastAppendRequest();
            }
            node.scheduleHeartbeat();
        }
    }
}

