/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.client.config.impl;

import com.alibaba.nacos.api.ability.ClientAbilities;
import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.config.remote.request.ClientConfigMetricRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigBatchListenRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigRemoveRequest;
import com.alibaba.nacos.api.config.remote.response.ClientConfigMetricResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeBatchListenResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeNotifyResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.config.common.GroupKey;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
import com.alibaba.nacos.client.config.impl.CacheData;
import com.alibaba.nacos.client.config.impl.ConfigTransportClient;
import com.alibaba.nacos.client.config.impl.Limiter;
import com.alibaba.nacos.client.config.impl.LocalConfigInfoProcessor;
import com.alibaba.nacos.client.config.impl.ServerListManager;
import com.alibaba.nacos.client.config.impl.SpasAdapter;
import com.alibaba.nacos.client.config.utils.ContentUtils;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.utils.AppNameUtils;
import com.alibaba.nacos.client.utils.EnvUtil;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.client.utils.ParamUtil;
import com.alibaba.nacos.client.utils.TenantUtil;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.ConnectionEventListener;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.alibaba.nacos.shaded.com.google.gson.JsonObject;
import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

public class ClientWorker
implements Closeable {
    private static final Logger LOGGER = LogUtils.logger(ClientWorker.class);
    private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference(new HashMap());
    private final ConfigFilterChainManager configFilterChainManager;
    private boolean isHealthServer = true;
    private String uuid = UUID.randomUUID().toString();
    private long timeout;
    private ConfigTransportClient agent;
    private int taskPenaltyTime;
    private boolean enableRemoteSyncConfig = false;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addListeners(String dataId, String group, List<? extends Listener> listeners) {
        CacheData cache;
        group = this.null2defaultGroup(group);
        CacheData cacheData = cache = this.addCacheDataIfAbsent(dataId, group);
        synchronized (cacheData) {
            for (Listener listener : listeners) {
                cache.addListener(listener);
            }
            cache.setSyncWithServer(false);
            this.agent.notifyListenConfig();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
        CacheData cache;
        group = this.null2defaultGroup(group);
        String tenant = this.agent.getTenant();
        CacheData cacheData = cache = this.addCacheDataIfAbsent(dataId, group, tenant);
        synchronized (cacheData) {
            for (Listener listener : listeners) {
                cache.addListener(listener);
            }
            cache.setSyncWithServer(false);
            this.agent.notifyListenConfig();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addTenantListenersWithContent(String dataId, String group, String content, List<? extends Listener> listeners) throws NacosException {
        CacheData cache;
        group = this.null2defaultGroup(group);
        String tenant = this.agent.getTenant();
        CacheData cacheData = cache = this.addCacheDataIfAbsent(dataId, group, tenant);
        synchronized (cacheData) {
            cache.setContent(content);
            for (Listener listener : listeners) {
                cache.addListener(listener);
            }
            cache.setSyncWithServer(false);
            this.agent.notifyListenConfig();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeListener(String dataId, String group, Listener listener) {
        CacheData cache = this.getCache(dataId, group = this.null2defaultGroup(group));
        if (null != cache) {
            CacheData cacheData = cache;
            synchronized (cacheData) {
                cache.removeListener(listener);
                if (cache.getListeners().isEmpty()) {
                    cache.setSyncWithServer(false);
                    this.agent.removeCache(dataId, group);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeTenantListener(String dataId, String group, Listener listener) {
        String tenant;
        CacheData cache = this.getCache(dataId, group = this.null2defaultGroup(group), tenant = this.agent.getTenant());
        if (null != cache) {
            CacheData cacheData = cache;
            synchronized (cacheData) {
                cache.removeListener(listener);
                if (cache.getListeners().isEmpty()) {
                    cache.setSyncWithServer(false);
                    this.agent.removeCache(dataId, group);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeCache(String dataId, String group) {
        String groupKey = GroupKey.getKey(dataId, group);
        AtomicReference<Map<String, CacheData>> atomicReference = this.cacheMap;
        synchronized (atomicReference) {
            HashMap<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
            copy.remove(groupKey);
            this.cacheMap.set(copy);
        }
        LOGGER.info("[{}] [unsubscribe] {}", (Object)this.agent.getName(), (Object)groupKey);
        MetricsMonitor.getListenConfigCountMonitor().set((double)this.cacheMap.get().size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeCache(String dataId, String group, String tenant) {
        String groupKey = GroupKey.getKeyTenant(dataId, group, tenant);
        AtomicReference<Map<String, CacheData>> atomicReference = this.cacheMap;
        synchronized (atomicReference) {
            HashMap<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
            copy.remove(groupKey);
            this.cacheMap.set(copy);
        }
        LOGGER.info("[{}] [unsubscribe] {}", (Object)this.agent.getName(), (Object)groupKey);
        MetricsMonitor.getListenConfigCountMonitor().set((double)this.cacheMap.get().size());
    }

    public boolean removeConfig(String dataId, String group, String tenant, String tag) throws NacosException {
        return this.agent.removeConfig(dataId, group, tenant, tag);
    }

    public boolean publishConfig(String dataId, String group, String tenant, String appName, String tag, String betaIps, String content, String casMd5) throws NacosException {
        return this.agent.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, casMd5);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CacheData addCacheDataIfAbsent(String dataId, String group) {
        CacheData cache = this.getCache(dataId, group);
        if (null != cache) {
            return cache;
        }
        String key = GroupKey.getKey(dataId, group);
        cache = new CacheData(this.configFilterChainManager, this.agent.getName(), dataId, group);
        AtomicReference<Map<String, CacheData>> atomicReference = this.cacheMap;
        synchronized (atomicReference) {
            CacheData cacheFromMap = this.getCache(dataId, group);
            if (null != cacheFromMap) {
                cache = cacheFromMap;
                cache.setInitializing(true);
            } else {
                int taskId = this.cacheMap.get().size() / (int)ParamUtil.getPerTaskConfigSize();
                cache.setTaskId(taskId);
            }
            HashMap<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
            copy.put(key, cache);
            this.cacheMap.set(copy);
        }
        LOGGER.info("[{}] [subscribe] {}", (Object)this.agent.getName(), (Object)key);
        MetricsMonitor.getListenConfigCountMonitor().set((double)this.cacheMap.get().size());
        return cache;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
        CacheData cache = this.getCache(dataId, group, tenant);
        if (null != cache) {
            return cache;
        }
        String key = GroupKey.getKeyTenant(dataId, group, tenant);
        AtomicReference<Map<String, CacheData>> atomicReference = this.cacheMap;
        synchronized (atomicReference) {
            CacheData cacheFromMap = this.getCache(dataId, group, tenant);
            if (null != cacheFromMap) {
                cache = cacheFromMap;
                cache.setInitializing(true);
            } else {
                cache = new CacheData(this.configFilterChainManager, this.agent.getName(), dataId, group, tenant);
                int taskId = this.cacheMap.get().size() / (int)ParamUtil.getPerTaskConfigSize();
                cache.setTaskId(taskId);
                if (this.enableRemoteSyncConfig) {
                    String[] ct = this.getServerConfig(dataId, group, tenant, 3000L, false);
                    cache.setContent(ct[0]);
                }
            }
            HashMap<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
            copy.put(key, cache);
            this.cacheMap.set(copy);
        }
        LOGGER.info("[{}] [subscribe] {}", (Object)this.agent.getName(), (Object)key);
        MetricsMonitor.getListenConfigCountMonitor().set((double)this.cacheMap.get().size());
        return cache;
    }

    public CacheData getCache(String dataId, String group) {
        return this.getCache(dataId, group, TenantUtil.getUserTenantForAcm());
    }

    public CacheData getCache(String dataId, String group, String tenant) {
        if (null == dataId || null == group) {
            throw new IllegalArgumentException();
        }
        return this.cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
    }

    public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout, boolean notify) throws NacosException {
        if (StringUtils.isBlank((String)group)) {
            group = "DEFAULT_GROUP";
        }
        return this.agent.queryConfig(dataId, group, tenant, readTimeout, notify);
    }

    private void checkLocalConfig(String agentName, CacheData cacheData) {
        String dataId = cacheData.dataId;
        String group = cacheData.group;
        String tenant = cacheData.tenant;
        File path = LocalConfigInfoProcessor.getFailoverFile(agentName, dataId, group, tenant);
        if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
            String content = LocalConfigInfoProcessor.getFailover(agentName, dataId, group, tenant);
            String md5 = MD5Utils.md5Hex((String)content, (String)"UTF-8");
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(path.lastModified());
            cacheData.setContent(content);
            LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", new Object[]{agentName, dataId, group, tenant, md5, ContentUtils.truncateContent(content)});
            return;
        }
        if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
            cacheData.setUseLocalConfigInfo(false);
            LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", new Object[]{agentName, dataId, group, tenant});
            return;
        }
        if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
            String content = LocalConfigInfoProcessor.getFailover(agentName, dataId, group, tenant);
            String md5 = MD5Utils.md5Hex((String)content, (String)"UTF-8");
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(path.lastModified());
            cacheData.setContent(content);
            LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", new Object[]{agentName, dataId, group, tenant, md5, ContentUtils.truncateContent(content)});
        }
    }

    private String null2defaultGroup(String group) {
        return null == group ? "DEFAULT_GROUP" : group.trim();
    }

    public ClientWorker(ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager, Properties properties) throws NacosException {
        this.configFilterChainManager = configFilterChainManager;
        this.init(properties);
        this.agent = new ConfigRpcTransportClient(properties, serverListManager);
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker");
                t.setDaemon(true);
                return t;
            }
        });
        this.agent.setExecutor(executorService);
        this.agent.start();
    }

    private void refreshContentAndCheck(String groupKey, boolean notify) {
        if (this.cacheMap.get() != null && this.cacheMap.get().containsKey(groupKey)) {
            CacheData cache = this.cacheMap.get().get(groupKey);
            this.refreshContentAndCheck(cache, notify);
        }
    }

    private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
        try {
            String[] ct = this.getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify);
            cacheData.setContent(ct[0]);
            if (null != ct[1]) {
                cacheData.setType(ct[1]);
            }
            if (notify) {
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", new Object[]{this.agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]});
            }
            cacheData.checkListenerMd5();
        }
        catch (Exception e) {
            LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", new Object[]{cacheData.dataId, cacheData.group, cacheData.tenant, e});
        }
    }

    private void init(Properties properties) {
        this.timeout = Math.max(ConvertUtils.toInt((String)properties.getProperty("configLongPollTimeout"), (int)30000), 10000);
        this.taskPenaltyTime = ConvertUtils.toInt((String)properties.getProperty("configRetryTime"), (int)2000);
        this.enableRemoteSyncConfig = Boolean.parseBoolean(properties.getProperty("enableRemoteSyncConfig"));
    }

    private Map<String, Object> getMetrics(List<ClientConfigMetricRequest.MetricsKey> metricsKeys) {
        HashMap<String, Object> metric = new HashMap<String, Object>(16);
        metric.put("listenConfigSize", String.valueOf(this.cacheMap.get().size()));
        metric.put("clientVersion", VersionUtils.getFullClientVersion());
        metric.put("snapshotDir", LocalConfigInfoProcessor.LOCAL_SNAPSHOT_PATH);
        boolean isFixServer = this.agent.serverListManager.isFixed;
        metric.put("isFixedServer", isFixServer);
        metric.put("addressUrl", this.agent.serverListManager.addressServerUrl);
        metric.put("serverUrls", this.agent.serverListManager.getUrlString());
        Map<ClientConfigMetricRequest.MetricsKey, Object> metricValues = this.getMetricsValue(metricsKeys);
        metric.put("metricValues", metricValues);
        HashMap<String, Object> metrics = new HashMap<String, Object>(1);
        metrics.put(this.uuid, JacksonUtils.toJson(metric));
        return metrics;
    }

    private Map<ClientConfigMetricRequest.MetricsKey, Object> getMetricsValue(List<ClientConfigMetricRequest.MetricsKey> metricsKeys) {
        if (metricsKeys == null) {
            return null;
        }
        HashMap<ClientConfigMetricRequest.MetricsKey, Object> values = new HashMap<ClientConfigMetricRequest.MetricsKey, Object>(16);
        for (ClientConfigMetricRequest.MetricsKey metricsKey : metricsKeys) {
            if ("cacheData".equals(metricsKey.getType())) {
                CacheData cacheData = this.cacheMap.get().get(metricsKey.getKey());
                values.putIfAbsent(metricsKey, cacheData == null ? null : cacheData.getContent() + ":" + cacheData.getMd5());
            }
            if (!"snapshotData".equals(metricsKey.getType())) continue;
            String[] configStr = GroupKey.parseKey(metricsKey.getKey());
            String snapshot = LocalConfigInfoProcessor.getSnapshot(this.agent.getName(), configStr[0], configStr[1], configStr[2]);
            values.putIfAbsent(metricsKey, snapshot == null ? null : snapshot + ":" + MD5Utils.md5Hex((String)snapshot, (String)"UTF-8"));
        }
        return values;
    }

    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        LOGGER.info("{} do shutdown begin", (Object)className);
        ThreadUtils.shutdownThreadPool((ExecutorService)this.agent.executor, (Logger)LOGGER);
        LOGGER.info("{} do shutdown stop", (Object)className);
    }

    public boolean isHealthServer() {
        return this.isHealthServer;
    }

    private void setHealthServer(boolean isHealthServer) {
        this.isHealthServer = isHealthServer;
    }

    public String getAgentName() {
        return this.agent.getName();
    }

    public class ConfigRpcTransportClient
    extends ConfigTransportClient {
        private final BlockingQueue<Object> listenExecutebell;
        private Object bellItem;
        private long lastAllSyncTime;
        private static final long ALL_SYNC_INTERNAL = 300000L;

        public ConfigRpcTransportClient(Properties properties, ServerListManager serverListManager) {
            super(properties, serverListManager);
            this.listenExecutebell = new ArrayBlockingQueue<Object>(1);
            this.bellItem = new Object();
            this.lastAllSyncTime = System.currentTimeMillis();
        }

        private ConnectionType getConnectionType() {
            return ConnectionType.GRPC;
        }

        private Map<String, String> getLabels() {
            HashMap<String, String> labels = new HashMap<String, String>(2, 1.0f);
            labels.put("source", "sdk");
            labels.put("module", "config");
            labels.put("AppName", AppNameUtils.getAppName());
            labels.put("Vipserver-Tag", EnvUtil.getSelfVipserverTag());
            labels.put("Amory-Tag", EnvUtil.getSelfAmorayTag());
            labels.put("Location-Tag", EnvUtil.getSelfLocationTag());
            return labels;
        }

        private void initRpcClientHandler(final RpcClient rpcClientInner) {
            rpcClientInner.registerServerRequestHandler(request -> {
                if (request instanceof ConfigChangeNotifyRequest) {
                    ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest)request;
                    LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}", new Object[]{rpcClientInner.getName(), configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant()});
                    String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
                    CacheData cacheData = (CacheData)((Map)ClientWorker.this.cacheMap.get()).get(groupKey);
                    if (cacheData != null) {
                        cacheData.setSyncWithServer(false);
                        this.notifyListenConfig();
                    }
                    return new ConfigChangeNotifyResponse();
                }
                return null;
            });
            rpcClientInner.registerServerRequestHandler(request -> {
                if (request instanceof ClientConfigMetricRequest) {
                    ClientConfigMetricResponse response = new ClientConfigMetricResponse();
                    response.setMetrics(ClientWorker.this.getMetrics(((ClientConfigMetricRequest)request).getMetricsKeys()));
                    return response;
                }
                return null;
            });
            rpcClientInner.registerConnectionListener(new ConnectionEventListener(){

                public void onConnected() {
                    LOGGER.info("[{}] Connected,notify listen context...", (Object)rpcClientInner.getName());
                    ConfigRpcTransportClient.this.notifyListenConfig();
                }

                public void onDisConnect() {
                    String taskId = (String)rpcClientInner.getLabels().get("taskId");
                    LOGGER.info("[{}] DisConnected,clear listen context...", (Object)rpcClientInner.getName());
                    Collection values = ((Map)ClientWorker.this.cacheMap.get()).values();
                    for (CacheData cacheData : values) {
                        if (StringUtils.isNotBlank((String)taskId)) {
                            if (!Integer.valueOf(taskId).equals(cacheData.getTaskId())) continue;
                            cacheData.setSyncWithServer(false);
                            continue;
                        }
                        cacheData.setSyncWithServer(false);
                    }
                }
            });
            rpcClientInner.serverListFactory(new ServerListFactory(){

                public String genNextServer() {
                    return ConfigRpcTransportClient.this.serverListManager.getNextServerAddr();
                }

                public String getCurrentServer() {
                    return ConfigRpcTransportClient.this.serverListManager.getCurrentServerAddr();
                }

                public List<String> getServerList() {
                    return ((ConfigRpcTransportClient)ConfigRpcTransportClient.this).serverListManager.serverUrls;
                }
            });
        }

        @Override
        public void startInternal() throws NacosException {
            this.executor.schedule(new Runnable(){

                @Override
                public void run() {
                    while (true) {
                        try {
                            while (true) {
                                ConfigRpcTransportClient.this.listenExecutebell.poll(5L, TimeUnit.SECONDS);
                                ConfigRpcTransportClient.this.executeConfigListen();
                            }
                        }
                        catch (Exception e) {
                            LOGGER.error("[ rpc listen execute ] [rpc listen] exception", (Throwable)e);
                            continue;
                        }
                        break;
                    }
                }
            }, 0L, TimeUnit.MILLISECONDS);
        }

        @Override
        public String getName() {
            return "config_rpc_client";
        }

        @Override
        public void notifyListenConfig() {
            this.listenExecutebell.offer(this.bellItem);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void executeConfigListen() {
            RpcClient rpcClient;
            ConfigBatchListenRequest configChangeListenRequest;
            String taskId;
            HashMap<String, LinkedList<CacheData>> listenCachesMap = new HashMap<String, LinkedList<CacheData>>(16);
            HashMap<String, LinkedList<CacheData>> removeListenCachesMap = new HashMap<String, LinkedList<CacheData>>(16);
            long now = System.currentTimeMillis();
            boolean needAllSync = now - this.lastAllSyncTime >= 300000L;
            Iterator iterator = ((Map)ClientWorker.this.cacheMap.get()).values().iterator();
            while (iterator.hasNext()) {
                CacheData cache;
                CacheData cacheData = cache = (CacheData)iterator.next();
                synchronized (cacheData) {
                    List<CacheData> cacheDatas;
                    if (cache.isSyncWithServer()) {
                        cache.checkListenerMd5();
                        if (!needAllSync) {
                            continue;
                        }
                    }
                    if (!CollectionUtils.isEmpty(cache.getListeners())) {
                        if (!cache.isUseLocalConfigInfo()) {
                            cacheDatas = (LinkedList<CacheData>)listenCachesMap.get(String.valueOf(cache.getTaskId()));
                            if (cacheDatas == null) {
                                cacheDatas = new LinkedList<CacheData>();
                                listenCachesMap.put(String.valueOf(cache.getTaskId()), (LinkedList<CacheData>)cacheDatas);
                            }
                            cacheDatas.add(cache);
                        }
                    } else if (CollectionUtils.isEmpty(cache.getListeners()) && !cache.isUseLocalConfigInfo()) {
                        cacheDatas = (List)removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
                        if (cacheDatas == null) {
                            cacheDatas = new LinkedList();
                            removeListenCachesMap.put(String.valueOf(cache.getTaskId()), (LinkedList<CacheData>)cacheDatas);
                        }
                        cacheDatas.add(cache);
                    }
                }
            }
            boolean hasChangedKeys = false;
            if (!listenCachesMap.isEmpty()) {
                for (Map.Entry entry : listenCachesMap.entrySet()) {
                    taskId = (String)entry.getKey();
                    List listenCaches = (List)entry.getValue();
                    configChangeListenRequest = this.buildConfigRequest(listenCaches);
                    configChangeListenRequest.setListen(true);
                    try {
                        rpcClient = this.ensureRpcClient(taskId);
                        ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse)this.requestProxy(rpcClient, (Request)configChangeListenRequest);
                        if (configChangeBatchListenResponse == null || !configChangeBatchListenResponse.isSuccess()) continue;
                        HashSet<String> changeKeys = new HashSet<String>();
                        if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
                            hasChangedKeys = true;
                            for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) {
                                String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant());
                                changeKeys.add(changeKey);
                                boolean isInitializing = ((CacheData)((Map)ClientWorker.this.cacheMap.get()).get(changeKey)).isInitializing();
                                ClientWorker.this.refreshContentAndCheck(changeKey, !isInitializing);
                            }
                        }
                        for (CacheData cacheData : listenCaches) {
                            String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
                            if (!changeKeys.contains(groupKey)) {
                                CacheData cacheData2 = cacheData;
                                synchronized (cacheData2) {
                                    if (!cacheData.getListeners().isEmpty()) {
                                        cacheData.setSyncWithServer(true);
                                        continue;
                                    }
                                }
                            }
                            cacheData.setInitializing(false);
                        }
                    }
                    catch (Exception e) {
                        LOGGER.error("Async listen config change error ", (Throwable)e);
                        try {
                            Thread.sleep(50L);
                        }
                        catch (InterruptedException configChangeBatchListenResponse) {}
                    }
                }
            }
            if (!removeListenCachesMap.isEmpty()) {
                for (Map.Entry entry : removeListenCachesMap.entrySet()) {
                    taskId = (String)entry.getKey();
                    List removeListenCaches = (List)entry.getValue();
                    configChangeListenRequest = this.buildConfigRequest(removeListenCaches);
                    configChangeListenRequest.setListen(false);
                    try {
                        rpcClient = this.ensureRpcClient(taskId);
                        boolean removeSuccess = this.unListenConfigChange(rpcClient, configChangeListenRequest);
                        if (removeSuccess) {
                            Iterator iterator2 = removeListenCaches.iterator();
                            while (iterator2.hasNext()) {
                                CacheData cacheData;
                                CacheData cacheData3 = cacheData = (CacheData)iterator2.next();
                                synchronized (cacheData3) {
                                    if (cacheData.getListeners().isEmpty()) {
                                        ClientWorker.this.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
                                    }
                                }
                            }
                        }
                    }
                    catch (Exception e) {
                        LOGGER.error("async remove listen config change error ", (Throwable)e);
                    }
                    try {
                        Thread.sleep(50L);
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }
            if (needAllSync) {
                this.lastAllSyncTime = now;
            }
            if (hasChangedKeys) {
                this.notifyListenConfig();
            }
        }

        private synchronized RpcClient ensureRpcClient(String taskId) throws NacosException {
            Map<String, String> labels = this.getLabels();
            HashMap<String, String> newLabels = new HashMap<String, String>(labels);
            newLabels.put("taskId", taskId);
            RpcClient rpcClient = RpcClientFactory.createClient((String)("config-" + taskId + "-" + ClientWorker.this.uuid), (ConnectionType)this.getConnectionType(), newLabels);
            if (rpcClient.isWaitInitiated()) {
                this.initRpcClientHandler(rpcClient);
                rpcClient.setTenant(this.getTenant());
                rpcClient.clientAbilities(this.initAbilities());
                rpcClient.start();
            }
            return rpcClient;
        }

        private ClientAbilities initAbilities() {
            ClientAbilities clientAbilities = new ClientAbilities();
            clientAbilities.getRemoteAbility().setSupportRemoteConnection(true);
            clientAbilities.getConfigAbility().setSupportRemoteMetrics(true);
            return clientAbilities;
        }

        private ConfigBatchListenRequest buildConfigRequest(List<CacheData> caches) {
            ConfigBatchListenRequest configChangeListenRequest = new ConfigBatchListenRequest();
            for (CacheData cacheData : caches) {
                configChangeListenRequest.addConfigListenContext(cacheData.group, cacheData.dataId, cacheData.tenant, cacheData.getMd5());
            }
            return configChangeListenRequest;
        }

        @Override
        public void removeCache(String dataId, String group) {
            this.notifyListenConfig();
        }

        private boolean unListenConfigChange(RpcClient rpcClient, ConfigBatchListenRequest configChangeListenRequest) throws NacosException {
            ConfigChangeBatchListenResponse response = (ConfigChangeBatchListenResponse)this.requestProxy(rpcClient, (Request)configChangeListenRequest);
            return response.isSuccess();
        }

        @Override
        public String[] queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify) throws NacosException {
            ConfigQueryRequest request = ConfigQueryRequest.build((String)dataId, (String)group, (String)tenant);
            request.putHeader("notify", String.valueOf(notify));
            ConfigQueryResponse response = (ConfigQueryResponse)this.requestProxy(this.getOneRunningClient(), (Request)request, readTimeouts);
            String[] ct = new String[2];
            if (response.isSuccess()) {
                LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());
                ct[0] = response.getContent();
                ct[1] = StringUtils.isNotBlank((String)response.getContentType()) ? response.getContentType() : ConfigType.TEXT.getType();
                return ct;
            }
            if (response.getErrorCode() == 300) {
                LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, null);
                return ct;
            }
            if (response.getErrorCode() == 400) {
                LOGGER.error("[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, tenant={}", new Object[]{this.getName(), dataId, group, tenant});
                throw new NacosException(409, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
            }
            LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", new Object[]{this.getName(), dataId, group, tenant, response});
            throw new NacosException(response.getErrorCode(), "http error, code=" + response.getErrorCode() + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }

        private Response requestProxy(RpcClient rpcClientInner, Request request) throws NacosException {
            return this.requestProxy(rpcClientInner, request, 3000L);
        }

        private Response requestProxy(RpcClient rpcClientInner, Request request, long timeoutMills) throws NacosException {
            try {
                request.putAllHeader(super.getSecurityHeaders());
                request.putAllHeader(super.getSpasHeaders());
                request.putAllHeader(super.getCommonHeader());
            }
            catch (Exception e) {
                throw new NacosException(-400, (Throwable)e);
            }
            Map<String, String> signHeaders = SpasAdapter.getSignHeaders(this.resourceBuild(request), this.secretKey);
            if (signHeaders != null && !signHeaders.isEmpty()) {
                request.putAllHeader(signHeaders);
            }
            JsonObject asJsonObjectTemp = new Gson().toJsonTree(request).getAsJsonObject();
            asJsonObjectTemp.remove("headers");
            asJsonObjectTemp.remove("requestId");
            boolean limit = Limiter.isLimit(request.getClass() + asJsonObjectTemp.toString());
            if (limit) {
                throw new NacosException(-503, "More than client-side current limit threshold");
            }
            return rpcClientInner.request(request, timeoutMills);
        }

        private String resourceBuild(Request request) {
            if (request instanceof ConfigQueryRequest) {
                String tenant = ((ConfigQueryRequest)request).getTenant();
                String group = ((ConfigQueryRequest)request).getGroup();
                return this.getResource(tenant, group);
            }
            if (request instanceof ConfigPublishRequest) {
                String tenant = ((ConfigPublishRequest)request).getTenant();
                String group = ((ConfigPublishRequest)request).getGroup();
                return this.getResource(tenant, group);
            }
            if (request instanceof ConfigRemoveRequest) {
                String tenant = ((ConfigRemoveRequest)request).getTenant();
                String group = ((ConfigRemoveRequest)request).getGroup();
                return this.getResource(tenant, group);
            }
            return "";
        }

        private String getResource(String tenant, String group) {
            if (StringUtils.isNotBlank((String)tenant) && StringUtils.isNotBlank((String)group)) {
                return tenant + "+" + group;
            }
            if (StringUtils.isNotBlank((String)group)) {
                return group;
            }
            if (StringUtils.isNotBlank((String)tenant)) {
                return tenant;
            }
            return "";
        }

        RpcClient getOneRunningClient() throws NacosException {
            return this.ensureRpcClient("0");
        }

        @Override
        public boolean publishConfig(String dataId, String group, String tenant, String appName, String tag, String betaIps, String content, String casMd5) throws NacosException {
            try {
                ConfigPublishRequest request = new ConfigPublishRequest(dataId, group, tenant, content);
                request.setCasMd5(casMd5);
                request.putAdditionalParam("tag", tag);
                request.putAdditionalParam("appName", appName);
                request.putAdditionalParam("betaIps", betaIps);
                ConfigPublishResponse response = (ConfigPublishResponse)this.requestProxy(this.getOneRunningClient(), (Request)request);
                if (!response.isSuccess()) {
                    LOGGER.warn("[{}] [publish-single] fail, dataId={}, group={}, tenant={}, code={}, msg={}", new Object[]{this.getName(), dataId, group, tenant, response.getErrorCode(), response.getMessage()});
                }
                return response.isSuccess();
            }
            catch (Exception e) {
                LOGGER.warn("[{}] [publish-single] error, dataId={}, group={}, tenant={}, code={}, msg={}", new Object[]{this.getName(), dataId, group, tenant, "unkonw", e.getMessage()});
                return false;
            }
        }

        @Override
        public boolean removeConfig(String dataId, String group, String tenant, String tag) throws NacosException {
            ConfigRemoveRequest request = new ConfigRemoveRequest(dataId, group, tenant, tag);
            ConfigRemoveResponse response = (ConfigRemoveResponse)this.requestProxy(this.getOneRunningClient(), (Request)request);
            return response.isSuccess();
        }
    }
}

