/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.naming.consistency.ephemeral.distro;

import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.SyncTask;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.NetUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

@Component
@DependsOn(value={"ProtocolManager"})
public class DataSyncer {
    private final DataStore dataStore;
    private final GlobalConfig partitionConfig;
    private final Serializer serializer;
    private final DistroMapper distroMapper;
    private final ServerMemberManager memberManager;
    private Map<String, String> taskMap = new ConcurrentHashMap<String, String>(16);

    public DataSyncer(DataStore dataStore, GlobalConfig partitionConfig, Serializer serializer, DistroMapper distroMapper, ServerMemberManager memberManager) {
        this.dataStore = dataStore;
        this.partitionConfig = partitionConfig;
        this.serializer = serializer;
        this.distroMapper = distroMapper;
        this.memberManager = memberManager;
    }

    @PostConstruct
    public void init() {
        this.startTimedSync();
    }

    public void submit(SyncTask task, long delay) {
        if (task.getRetryCount() == 0) {
            Iterator<String> iterator = task.getKeys().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                if (!StringUtils.isNotBlank((CharSequence)this.taskMap.putIfAbsent(this.buildKey(key, task.getTargetServer()), key))) continue;
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync already in process, key: {}", (Object)key);
                }
                iterator.remove();
            }
        }
        if (task.getKeys().isEmpty()) {
            return;
        }
        GlobalExecutor.submitDataSync(() -> {
            Map<String, Datum> datumMap;
            if (this.getServers() == null || this.getServers().isEmpty()) {
                Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                return;
            }
            List<String> keys = task.getKeys();
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
            }
            if ((datumMap = this.dataStore.batchGet(keys)) == null || datumMap.isEmpty()) {
                for (String key : keys) {
                    this.taskMap.remove(this.buildKey(key, task.getTargetServer()));
                }
                return;
            }
            byte[] data = this.serializer.serialize(datumMap);
            long timestamp = System.currentTimeMillis();
            boolean success = NamingProxy.syncData(data, task.getTargetServer());
            if (!success) {
                SyncTask syncTask = new SyncTask();
                syncTask.setKeys(task.getKeys());
                syncTask.setRetryCount(task.getRetryCount() + 1);
                syncTask.setLastExecuteTime(timestamp);
                syncTask.setTargetServer(task.getTargetServer());
                this.retrySync(syncTask);
            } else {
                for (String key : task.getKeys()) {
                    this.taskMap.remove(this.buildKey(key, task.getTargetServer()));
                }
            }
        }, delay);
    }

    private void retrySync(SyncTask syncTask) {
        Member member = new Member();
        member.setIp(syncTask.getTargetServer().split(":")[0]);
        member.setPort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
        if (!this.getServers().contains(member)) {
            if (syncTask.getKeys() != null) {
                for (String key : syncTask.getKeys()) {
                    this.taskMap.remove(this.buildKey(key, syncTask.getTargetServer()));
                }
            }
            return;
        }
        this.submit(syncTask, this.partitionConfig.getSyncRetryDelay());
    }

    public void startTimedSync() {
        GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
    }

    public Collection<Member> getServers() {
        return this.memberManager.allMembers();
    }

    public String buildKey(String key, String targetServer) {
        return key + "@@@@" + targetServer;
    }

    public class TimedSync
    implements Runnable {
        @Override
        public void run() {
            try {
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("server list is: {}", DataSyncer.this.getServers());
                }
                HashMap<String, String> keyChecksums = new HashMap<String, String>(64);
                for (String key : DataSyncer.this.dataStore.keys()) {
                    Datum datum;
                    if (!DataSyncer.this.distroMapper.responsible(KeyBuilder.getServiceName(key)) || (datum = DataSyncer.this.dataStore.get(key)) == null) continue;
                    keyChecksums.put(key, datum.value.getChecksum());
                }
                if (keyChecksums.isEmpty()) {
                    return;
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
                }
                for (Member member : DataSyncer.this.getServers()) {
                    if (NetUtils.localServer().equals(member.getAddress())) continue;
                    NamingProxy.syncCheckSums(keyChecksums, member.getAddress());
                }
            }
            catch (Exception e) {
                Loggers.DISTRO.error("timed sync task failed.", (Throwable)e);
            }
        }
    }
}

