/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.config.server.service.notify;

import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.service.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.service.ServerListService;
import com.alibaba.nacos.config.server.service.notify.NotifyTask;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.config.server.utils.RunningConfigUtils;
import com.alibaba.nacos.config.server.utils.event.EventDispatcher;
import com.alibaba.nacos.core.utils.SystemUtils;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class AsyncNotifyService
extends EventDispatcher.AbstractEventListener {
    /**
     * Shared scheduled pool (100 core threads, created by {@link NotifyThreadFactory}) that runs
     * notification tasks and their delayed retries.
     */
    private static final Executor EXECUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory());

    private static final Logger log = LoggerFactory.getLogger(AsyncNotifyService.class);

    /** Base retry delay; combined with the fail count in {@code getDelayTime} and scheduled in milliseconds. */
    private static final int MIN_RETRY_INTERVAL = 500;

    /** Factor applied to the squared fail count when computing the retry delay. */
    private static final int INCREASE_STEPS = 1000;

    /** Once a task's fail count exceeds this value it is no longer incremented. */
    private static final int MAX_COUNT = 6;

    /** Request settings whose connect and socket timeouts are read from {@link PropertyUtil}. */
    private final RequestConfig requestConfig = RequestConfig.custom()
        .setConnectTimeout(PropertyUtil.getNotifyConnectTimeout())
        .setSocketTimeout(PropertyUtil.getNotifySocketTimeout())
        .build();

    /** Async HTTP client built with {@link #requestConfig} as its default request configuration. */
    private final CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom()
        .setDefaultRequestConfig(this.requestConfig)
        .build();

    /** Source of the server list (and health information) consulted when notifying. */
    private final ServerListService serverListService;

    /**
     * Declares the event types this listener subscribes to.
     *
     * @return a freshly created list whose only element is {@link ConfigDataChangeEvent}
     */
    @Override
    public List<Class<? extends EventDispatcher.Event>> interest() {
        List<Class<? extends EventDispatcher.Event>> interested =
            new ArrayList<Class<? extends EventDispatcher.Event>>();
        interested.add(ConfigDataChangeEvent.class);
        return interested;
    }

    /**
     * Handles a dispatched event. For a {@link ConfigDataChangeEvent}, one {@link NotifySingleTask}
     * is created per entry of the current server list and the whole batch is handed to the shared
     * executor as a single {@link AsyncTask}. Any other event type is ignored.
     *
     * @param event the dispatched event
     */
    @Override
    public void onEvent(EventDispatcher.Event event) {
        if (!(event instanceof ConfigDataChangeEvent)) {
            return;
        }

        ConfigDataChangeEvent change = (ConfigDataChangeEvent) event;
        long lastModified = change.lastModifiedTs;
        String changedDataId = change.dataId;
        String changedGroup = change.group;
        String changedTenant = change.tenant;
        String changedTag = change.tag;

        List<String> servers = this.serverListService.getServerList();
        Queue<NotifySingleTask> pending = new LinkedList<NotifySingleTask>();
        for (int idx = 0; idx < servers.size(); idx++) {
            String server = servers.get(idx);
            pending.add(new NotifySingleTask(
                changedDataId, changedGroup, changedTenant, changedTag, lastModified, server, change.isBeta));
        }

        EXECUTOR.execute(new AsyncTask(this.httpclient, pending));
    }

    /**
     * Creates the service and starts the async HTTP client so it can accept requests.
     *
     * @param serverListService provider of the server list used when notifying
     */
    @Autowired
    public AsyncNotifyService(ServerListService serverListService) {
        this.serverListService = serverListService;
        httpclient.start();
    }

    /**
     * Exposes the executor on which notification tasks are run.
     *
     * @return the shared, class-wide executor
     */
    public Executor getExecutor() {
        return AsyncNotifyService.EXECUTOR;
    }

    /**
     * Schedules a single task to be (re)sent after the delay computed by
     * {@link #getDelayTime(NotifySingleTask)}, which also updates the task's fail count.
     *
     * @param task the task to run again later
     */
    private void asyncTaskExecute(NotifySingleTask task) {
        Queue<NotifySingleTask> single = new LinkedList<NotifySingleTask>();
        single.add(task);

        long delayMillis = getDelayTime(task);

        // EXECUTOR is created via Executors.newScheduledThreadPool, hence the downcast.
        ScheduledThreadPoolExecutor scheduler = (ScheduledThreadPoolExecutor) EXECUTOR;
        scheduler.schedule(new AsyncTask(this.httpclient, single), delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Computes the delay before the next attempt of {@code task} as
     * {@code MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS}, using the fail count
     * the task had on entry.
     *
     * <p>Side effect: while the fail count is {@code <= MAX_COUNT} it is incremented by one, so
     * the counter (and therefore the delay) stops growing once it has passed {@code MAX_COUNT}.
     *
     * @param task the task whose fail count is read and possibly incremented
     * @return the delay for the upcoming attempt
     */
    private static int getDelayTime(NotifySingleTask task) {
        final int failures = task.getFailCount();
        final int backoff = failures * failures * INCREASE_STEPS;
        if (failures <= MAX_COUNT) {
            task.setFailCount(failures + 1);
        }
        return MIN_RETRY_INTERVAL + backoff;
    }

    /**
     * Thread factory producing daemon threads that all carry the same fixed name.
     */
    static class NotifyThreadFactory
    implements ThreadFactory {
        /** Name given to every thread created by this factory. */
        private static final String THREAD_NAME = "com.alibaba.nacos.AsyncNotifyServiceThread";

        /**
         * Wraps the runnable in a new, not yet started daemon thread.
         *
         * @param r the work the thread will run
         * @return the new daemon thread
         */
        @Override
        public Thread newThread(Runnable r) {
            Thread worker = new Thread(r, THREAD_NAME);
            worker.setDaemon(true);
            return worker;
        }
    }

    /**
     * A notification addressed to one target server. On construction it pre-builds the
     * {@code dataChange} URL for that target; it also carries its own fail counter, which the
     * retry logic reads and updates.
     */
    static class NotifySingleTask
    extends NotifyTask {
        private static final String URL_PATTERN = "http://{0}{1}/v1/cs/communication/dataChange?dataId={2}&group={3}";
        private static final String URL_PATTERN_TENANT = "http://{0}{1}/v1/cs/communication/dataChange?dataId={2}&group={3}&tenant={4}";
        /** Charset name used when URL-encoding query parameter values. */
        private static final String ENCODING = "UTF-8";

        private final String target;
        /** Fully built notification URL for {@link #target}. */
        public String url;
        private final boolean isBeta;
        private int failCount;

        /**
         * Creates a non-beta task without a tag.
         *
         * @param dataId       changed data id
         * @param group        changed group
         * @param tenant       changed tenant; may be blank
         * @param lastModified modification timestamp of the change
         * @param target       address of the server to notify
         */
        public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target) {
            this(dataId, group, tenant, lastModified, target, false);
        }

        /**
         * Creates a task without a tag.
         *
         * @param dataId       changed data id
         * @param group        changed group
         * @param tenant       changed tenant; may be blank
         * @param lastModified modification timestamp of the change
         * @param target       address of the server to notify
         * @param isBeta       whether the change is a beta change
         */
        public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target, boolean isBeta) {
            this(dataId, group, tenant, null, lastModified, target, isBeta);
        }

        /**
         * Creates a task and builds its URL. All query parameter values ({@code dataId},
         * {@code group}, and when present {@code tenant} and {@code tag}) are URL-encoded so that
         * reserved characters in any of them cannot alter the structure of the query string.
         *
         * @param dataId       changed data id
         * @param group        changed group
         * @param tenant       changed tenant; the tenant parameter is omitted when blank
         * @param tag          changed tag; the tag parameter is omitted when null or empty
         * @param lastModified modification timestamp of the change
         * @param target       address of the server to notify
         * @param isBeta       whether the change is a beta change
         */
        public NotifySingleTask(String dataId, String group, String tenant, String tag, long lastModified, String target, boolean isBeta) {
            super(dataId, group, tenant, lastModified);
            this.target = target;
            this.isBeta = isBeta;

            String contextPath = RunningConfigUtils.getContextPath();
            String encodedDataId = encode(dataId);
            String encodedGroup = encode(group);

            String built;
            if (StringUtils.isBlank((CharSequence)tenant)) {
                built = MessageFormat.format(URL_PATTERN, target, contextPath, encodedDataId, encodedGroup);
            } else {
                built = MessageFormat.format(URL_PATTERN_TENANT, target, contextPath, encodedDataId, encodedGroup, encode(tenant));
            }
            if (StringUtils.isNotEmpty((CharSequence)tag)) {
                built = built + "&tag=" + encode(tag);
            }
            this.url = built;
            this.failCount = 0;
        }

        /**
         * URL-encodes a query parameter value.
         *
         * @param value the raw value
         * @return the encoded value, or the raw value if the encoding is reported as unsupported
         */
        private static String encode(String value) {
            try {
                return URLEncoder.encode(value, ENCODING);
            }
            catch (UnsupportedEncodingException e) {
                // Best effort: log and fall back to the raw value rather than failing the task.
                log.error("URLEncoder encode error", (Throwable)e);
                return value;
            }
        }

        @Override
        public void setFailCount(int count) {
            this.failCount = count;
        }

        @Override
        public int getFailCount() {
            return this.failCount;
        }

        /**
         * @return the address of the server this task notifies
         */
        public String getTargetIP() {
            return this.target;
        }
    }

    class AsyncNotifyCallBack
    implements FutureCallback<HttpResponse> {
        private NotifySingleTask task;
        private CloseableHttpAsyncClient httpClient;

        public AsyncNotifyCallBack(CloseableHttpAsyncClient httpClient, NotifySingleTask task) {
            this.task = task;
            this.httpClient = httpClient;
        }

        public void completed(HttpResponse response) {
            long delayed = System.currentTimeMillis() - this.task.getLastModified();
            if (response.getStatusLine().getStatusCode() == 200) {
                ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), SystemUtils.LOCAL_IP, "ok", delayed, this.task.target);
            } else {
                log.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), response.getStatusLine().getStatusCode()});
                ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), SystemUtils.LOCAL_IP, "error", delayed, this.task.target);
                AsyncNotifyService.this.asyncTaskExecute(this.task);
                LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
                MetricsMonitor.getConfigNotifyException().increment();
            }
            HttpClientUtils.closeQuietly((HttpResponse)response);
        }

        public void failed(Exception ex) {
            long delayed = System.currentTimeMillis() - this.task.getLastModified();
            log.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), ex.toString()});
            ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), SystemUtils.LOCAL_IP, "exception", delayed, this.task.target);
            AsyncNotifyService.this.asyncTaskExecute(this.task);
            LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
            MetricsMonitor.getConfigNotifyException().increment();
        }

        public void cancelled() {
            LogUtil.notifyLog.error("[notify-exception] target:{} dataId:{} group:{} ts:{} method:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), "CANCELED"});
            AsyncNotifyService.this.asyncTaskExecute(this.task);
            LogUtil.notifyLog.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.target, this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
            MetricsMonitor.getConfigNotifyException().increment();
        }
    }

    /**
     * Runnable that drains a queue of {@link NotifySingleTask}s and sends one asynchronous HTTP
     * GET per task. Tasks whose target is no longer in the server list are dropped; tasks whose
     * target is currently listed as unhealthy are traced and rescheduled instead of being sent.
     */
    class AsyncTask
    implements Runnable {
        private Queue<NotifySingleTask> queue;
        private CloseableHttpAsyncClient httpclient;

        /**
         * @param httpclient client used to send the requests
         * @param queue      tasks to process; it is emptied as the task runs
         */
        public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) {
            this.httpclient = httpclient;
            this.queue = queue;
        }

        @Override
        public void run() {
            executeAsyncInvoke();
        }

        /**
         * Polls tasks until the queue is empty, deciding for each one whether to drop it,
         * reschedule it, or send it.
         */
        private void executeAsyncInvoke() {
            while (!queue.isEmpty()) {
                NotifySingleTask next = queue.poll();
                String ip = next.getTargetIP();
                if (AsyncNotifyService.this.serverListService.getServerList().contains(ip)) {
                    if (isMarkedUnhealthy(ip)) {
                        ConfigTraceService.logNotifyEvent(next.getDataId(), next.getGroup(), next.getTenant(), null, next.getLastModified(), SystemUtils.LOCAL_IP, "unhealth", 0L, next.target);
                        AsyncNotifyService.this.asyncTaskExecute(next);
                    } else {
                        send(next);
                    }
                }
            }
        }

        /**
         * @param ip the target address to check
         * @return true when health checking is enabled and the address is in the unhealthy list
         */
        private boolean isMarkedUnhealthy(String ip) {
            return AsyncNotifyService.this.serverListService.isHealthCheck()
                && ServerListService.getServerListUnhealth().contains(ip);
        }

        /**
         * Builds the GET request for the task and submits it with an {@link AsyncNotifyCallBack}.
         *
         * @param task the task to send
         */
        private void send(NotifySingleTask task) {
            HttpGet get = new HttpGet(task.url);
            get.setHeader("lastModified", String.valueOf(task.getLastModified()));
            get.setHeader("opHandleIp", SystemUtils.LOCAL_IP);
            if (task.isBeta) {
                get.setHeader("isBeta", "true");
            }
            httpclient.execute(get, new AsyncNotifyCallBack(httpclient, task));
        }
    }
}

