/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.config.server.remote;

import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest;
import com.alibaba.nacos.api.remote.AbstractPushCallBack;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.api.remote.request.ServerRequest;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent;
import com.alibaba.nacos.config.server.remote.ConfigChangeListenContext;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.GroupKey;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RpcPushService;
import com.alibaba.nacos.core.remote.control.TpsMonitorManager;
import com.alibaba.nacos.core.remote.control.TpsMonitorPoint;
import com.alibaba.nacos.core.utils.Loggers;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Subscriber for {@link LocalDataChangeEvent} that sends a {@link ConfigChangeNotifyRequest}
 * to every connection listening on the changed group key.
 *
 * <p>The instance registers itself with {@link NotifyCenter} in its constructor. Each
 * notification is wrapped in an {@link RpcPushTask} and scheduled on the executor returned by
 * {@code ConfigExecutor.getClientConfigNotifierServiceExecutor()}; a task that fails (or is
 * refused by the TPS check) is scheduled again until its retry limit is reached, at which
 * point the connection is unregistered.
 */
@Component(value="rpcConfigChangeNotifier")
public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {

    /** Name of the monitor point applied before each push attempt. */
    private static final String POINT_CONFIG_PUSH = "CONFIG_PUSH_COUNT";

    /** Name of the monitor point applied when a push callback reports success. */
    private static final String POINT_CONFIG_PUSH_SUCCESS = "CONFIG_PUSH_SUCCESS";

    /** Name of the monitor point applied when a push callback reports failure. */
    private static final String POINT_CONFIG_PUSH_FAIL = "CONFIG_PUSH_FAIL";

    /** Value passed as {@code maxRetryTimes} to every {@link RpcPushTask} created here. */
    private static final int MAX_PUSH_RETRY_TIMES = 50;

    @Autowired
    private TpsMonitorManager tpsMonitorManager;

    @Autowired
    ConfigChangeListenContext configChangeListenContext;

    @Autowired
    private RpcPushService rpcPushService;

    @Autowired
    private ConnectionManager connectionManager;

    /**
     * Creates the notifier and registers it as a subscriber with {@link NotifyCenter}.
     */
    public RpcConfigChangeNotifier() {
        NotifyCenter.registerSubscriber(this);
    }

    /**
     * Registers the three push-related monitor points with the TPS monitor manager.
     */
    @PostConstruct
    private void registerTpsPoint() {
        tpsMonitorManager.registerTpsControlPoint(new TpsMonitorPoint(POINT_CONFIG_PUSH));
        tpsMonitorManager.registerTpsControlPoint(new TpsMonitorPoint(POINT_CONFIG_PUSH_SUCCESS));
        tpsMonitorManager.registerTpsControlPoint(new TpsMonitorPoint(POINT_CONFIG_PUSH_FAIL));
    }

    /**
     * Schedules a change notification for every eligible listener of {@code groupKey}.
     *
     * <p>A listener is skipped when it has no live connection, when {@code isBeta} is set with
     * a non-null {@code betaIps} that does not contain the client IP, or when {@code tag} is
     * non-blank and differs from the connection's tag. Nothing is logged when the group key
     * has no listeners.
     *
     * @param groupKey key used to look up the listening connection ids
     * @param dataId   data id placed in the notify request
     * @param group    group placed in the notify request
     * @param tenant   tenant placed in the notify request
     * @param isBeta   whether the {@code betaIps} filter should be applied
     * @param betaIps  client IPs allowed to receive the notification when {@code isBeta}; may be null
     * @param tag      tag a connection must carry to be notified; blank disables the filter
     */
    public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta, List<String> betaIps, String tag) {
        Set<String> listeners = configChangeListenContext.getListeners(groupKey);
        if (CollectionUtils.isEmpty(listeners)) {
            return;
        }

        int pushed = 0;
        for (String connectionId : listeners) {
            Connection connection = connectionManager.getConnection(connectionId);
            if (connection == null) {
                continue;
            }

            String clientIp = connection.getMetaInfo().getClientIp();
            String clientTag = connection.getMetaInfo().getTag();
            if (isFilteredOut(isBeta, betaIps, clientIp, tag, clientTag)) {
                continue;
            }

            ConfigChangeNotifyRequest request = ConfigChangeNotifyRequest.build(dataId, group, tenant);
            String appName = connection.getMetaInfo().getAppName();
            push(new RpcPushTask(request, MAX_PUSH_RETRY_TIMES, connectionId, clientIp, appName));
            pushed++;
        }
        Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", pushed, groupKey);
    }

    /**
     * Tells whether a client must be skipped because of the beta-IP or tag filter.
     */
    private static boolean isFilteredOut(boolean isBeta, List<String> betaIps, String clientIp, String tag, String clientTag) {
        if (isBeta && betaIps != null && !betaIps.contains(clientIp)) {
            return true;
        }
        return StringUtils.isNotBlank(tag) && !tag.equals(clientTag);
    }

    /**
     * Splits the event's group key into data id, group and tenant and forwards the change to
     * {@link #configDataChanged}. The tenant defaults to the empty string when the parsed key
     * has fewer than three parts.
     *
     * @param event the local data change to propagate
     */
    public void onEvent(LocalDataChangeEvent event) {
        String groupKey = event.groupKey;
        String[] keyParts = GroupKey.parseKey(groupKey);

        String tenant = "";
        if (keyParts.length > 2) {
            tenant = keyParts[2];
        }
        configDataChanged(groupKey, keyParts[0], keyParts[1], tenant, event.isBeta, event.betaIps, event.tag);
    }

    /**
     * @return the event class this subscriber handles, {@link LocalDataChangeEvent}
     */
    public Class<? extends Event> subscribeType() {
        return LocalDataChangeEvent.class;
    }

    /**
     * Schedules {@code retryTask}, or gives up on it.
     *
     * <p>When the task has used up its retries the connection is unregistered. Otherwise the
     * task is scheduled with a delay of {@code tryTimes * 2} seconds, but only while the
     * connection is still known to the connection manager; if it is not, the task is dropped.
     *
     * @param retryTask the push task to schedule
     */
    private void push(RpcPushTask retryTask) {
        if (retryTask.isOverTimes()) {
            ConfigChangeNotifyRequest request = retryTask.notifyRequest;
            Loggers.REMOTE_PUSH.warn("push callback retry fail over times .dataId={},group={},tenant={},clientId={},will unregister client.",
                    request.getDataId(), request.getGroup(), request.getTenant(), retryTask.connectionId);
            connectionManager.unregister(retryTask.connectionId);
        } else if (connectionManager.getConnection(retryTask.connectionId) != null) {
            // tryTimes is 0 on the first submission, so the first attempt runs without delay.
            ConfigExecutor.getClientConfigNotifierServiceExecutor()
                    .schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS);
        }
    }

    /**
     * One notification to one connection, together with its retry bookkeeping.
     */
    class RpcPushTask implements Runnable {

        ConfigChangeNotifyRequest notifyRequest;

        /** Retry limit; a value of zero or less means {@link #isOverTimes()} never returns true. */
        int maxRetryTimes = -1;

        /** Number of times {@link #run()} has started for this task. */
        int tryTimes = 0;

        String connectionId;

        String clientIp;

        String appName;

        /**
         * @param notifyRequest request to send to the client
         * @param maxRetryTimes retry limit checked by {@link #isOverTimes()}
         * @param connectionId  id of the target connection
         * @param clientIp      client IP passed to the TPS monitor calls
         * @param appName       application name taken from the connection meta info
         */
        public RpcPushTask(ConfigChangeNotifyRequest notifyRequest, int maxRetryTimes, String connectionId, String clientIp, String appName) {
            this.notifyRequest = notifyRequest;
            this.maxRetryTimes = maxRetryTimes;
            this.connectionId = connectionId;
            this.clientIp = clientIp;
            this.appName = appName;
        }

        /**
         * @return true when a positive retry limit is set and has been reached
         */
        public boolean isOverTimes() {
            return maxRetryTimes > 0 && tryTimes >= maxRetryTimes;
        }

        /**
         * Counts the attempt, then either pushes the request with a callback or, when
         * {@code applyTpsForClientIp} returns false for the push point, hands the task back
         * to {@code push} for rescheduling. A failed push is handed back the same way.
         */
        @Override
        public void run() {
            tryTimes++;
            boolean allowed = tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp);
            if (allowed) {
                // NOTE(review): 3000L is presumably the callback timeout in milliseconds - confirm
                // against AbstractPushCallBack.
                rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L){

                    public void onSuccess() {
                        RpcConfigChangeNotifier.this.tpsMonitorManager.applyTpsForClientIp(RpcConfigChangeNotifier.POINT_CONFIG_PUSH_SUCCESS, RpcPushTask.this.connectionId, RpcPushTask.this.clientIp);
                    }

                    public void onFail(Throwable e) {
                        RpcConfigChangeNotifier.this.tpsMonitorManager.applyTpsForClientIp(RpcConfigChangeNotifier.POINT_CONFIG_PUSH_FAIL, RpcPushTask.this.connectionId, RpcPushTask.this.clientIp);
                        Loggers.REMOTE_PUSH.warn("Push fail", e);
                        RpcConfigChangeNotifier.this.push(RpcPushTask.this);
                    }
                }, ConfigExecutor.getClientConfigNotifierServiceExecutor());
            } else {
                push(this);
            }
        }
    }
}

