/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.message.scheduler;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.linkis.common.utils.JavaLog;
import org.apache.linkis.message.builder.DefaultServiceMethodContext;
import org.apache.linkis.message.builder.MessageJob;
import org.apache.linkis.message.builder.ServiceMethodContext;
import org.apache.linkis.message.exception.MessageWarnException;
import org.apache.linkis.message.parser.ImplicitMethod;
import org.apache.linkis.message.scheduler.MessageExecutor;
import org.apache.linkis.message.scheduler.MethodExecuteWrapper;
import org.apache.linkis.message.utils.MessageUtils;
import org.apache.linkis.protocol.message.RequestProtocol;
import org.apache.linkis.rpc.MessageErrorConstants;
import org.apache.linkis.scheduler.queue.Job;

public abstract class AbstractMessageExecutor
extends JavaLog
implements MessageExecutor {
    private Throwable t;

    private void methodErrorHandle(Throwable t) {
        if (t.getCause() != null) {
            this.t = t;
        } else {
            this.t = t;
            this.logger().debug("unexpected error occur");
        }
    }

    private List<MethodExecuteWrapper> getMinOrderMethodWrapper(Map<String, List<MethodExecuteWrapper>> methodWrappers) {
        ArrayList<MethodExecuteWrapper> minOrderMethodWrapper = new ArrayList<MethodExecuteWrapper>();
        methodWrappers.forEach((k, v) -> v.forEach(m -> {
            if (MessageUtils.orderIsMin(m, v)) {
                minOrderMethodWrapper.add((MethodExecuteWrapper)m);
            }
        }));
        return minOrderMethodWrapper;
    }

    private List<MethodExecuteWrapper> getMinOrderMethodWrapper(List<MethodExecuteWrapper> methodWrappers) {
        return methodWrappers.stream().filter(m -> MessageUtils.orderIsMin(m, methodWrappers)).collect(Collectors.toList());
    }

    private boolean shouldBreak(Map<String, List<MethodExecuteWrapper>> methodWrappers) {
        return methodWrappers.values().stream().allMatch(List::isEmpty);
    }

    private void cleanMethodContextThreadLocal(ServiceMethodContext methodContext) {
        if (methodContext instanceof DefaultServiceMethodContext) {
            ((DefaultServiceMethodContext)methodContext).removeJob();
            ((DefaultServiceMethodContext)methodContext).removeSkips();
        }
    }

    private void setMethodContextThreadLocal(ServiceMethodContext methodContext, MessageJob job) {
        if (methodContext instanceof DefaultServiceMethodContext && job instanceof Job) {
            ((DefaultServiceMethodContext)methodContext).setJob((Job)job);
        }
    }

    @Override
    public void run(MessageJob job) throws InterruptedException {
        Map<String, List<MethodExecuteWrapper>> methodWrappers = job.getMethodExecuteWrappers();
        Integer count = methodWrappers.values().stream().map(List::size).reduce(0, Integer::sum);
        if (count == 1) {
            this.runOneJob(job);
        } else {
            this.runMultipleJob(job);
        }
    }

    private void runMultipleJob(MessageJob job) throws InterruptedException {
        RequestProtocol requestProtocol = job.getRequestProtocol();
        ServiceMethodContext methodContext = job.getMethodContext();
        Map<String, List<MethodExecuteWrapper>> methodWrappers = job.getMethodExecuteWrappers();
        Integer count = methodWrappers.values().stream().map(List::size).reduce(0, Integer::sum);
        LinkedBlockingDeque queue = new LinkedBlockingDeque(16);
        CopyOnWriteArrayList<Future> methodFutures = new CopyOnWriteArrayList<Future>();
        CountDownLatch countDownLatch = new CountDownLatch(count);
        this.getMinOrderMethodWrapper(methodWrappers).forEach(queue::offer);
        try {
            while (!Thread.interrupted() && !this.shouldBreak(methodWrappers)) {
                MethodExecuteWrapper methodWrapper = (MethodExecuteWrapper)queue.poll(10L, TimeUnit.MILLISECONDS);
                if (methodWrapper == null) continue;
                methodWrappers.get(methodWrapper.getChainName()).remove(methodWrapper);
                Future<?> methodFuture = this.getExecutorService().submit(() -> {
                    Object result = null;
                    try {
                        if (!methodWrapper.shouldSkip) {
                            this.setMethodContextThreadLocal(methodContext, job);
                            Method method = methodWrapper.getMethod();
                            Object service = methodWrapper.getService();
                            this.info(String.format("message scheduler executor ===> service: %s,method: %s", service.getClass().getName(), method.getName()));
                            ImplicitMethod implicitMethod = methodWrapper.getImplicitMethod();
                            Object implicit = implicitMethod != null ? implicitMethod.getMethod().invoke(implicitMethod.getImplicitObject(), requestProtocol) : requestProtocol;
                            result = methodWrapper.isHasMethodContext() ? (methodWrapper.isMethodContextOnLeft() ? method.invoke(service, methodContext, implicit) : method.invoke(service, implicit, methodContext)) : method.invoke(service, implicit);
                        }
                    }
                    catch (Throwable t) {
                        this.logger().error(String.format("method %s call failed", methodWrapper.getAlias()), t);
                        methodWrappers.forEach((k, v) -> v.forEach(m -> m.setShouldSkip(true)));
                        this.methodErrorHandle(t);
                    }
                    finally {
                        if (result != null) {
                            methodContext.setResult(result);
                        }
                        this.getMinOrderMethodWrapper((List)methodWrappers.get(methodWrapper.getChainName())).forEach(queue::offer);
                        this.cleanMethodContextThreadLocal(methodContext);
                        countDownLatch.countDown();
                    }
                });
                methodFutures.add(methodFuture);
            }
            countDownLatch.await();
        }
        catch (InterruptedException ie) {
            methodFutures.forEach(f -> f.cancel(true));
            throw ie;
        }
        if (this.t != null) {
            throw new MessageWarnException(MessageErrorConstants.MESSAGE_ERROR(), "method call failed", this.t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runOneJob(MessageJob job) {
        RequestProtocol requestProtocol = job.getRequestProtocol();
        ServiceMethodContext methodContext = job.getMethodContext();
        Map<String, List<MethodExecuteWrapper>> methodWrappers = job.getMethodExecuteWrappers();
        List<MethodExecuteWrapper> methodExecuteWrappers = this.getMinOrderMethodWrapper(methodWrappers);
        if (methodExecuteWrappers.size() == 1) {
            MethodExecuteWrapper methodWrapper = methodExecuteWrappers.get(0);
            Object result = null;
            try {
                if (!methodWrapper.shouldSkip) {
                    this.setMethodContextThreadLocal(methodContext, job);
                    Method method = methodWrapper.getMethod();
                    Object service = methodWrapper.getService();
                    this.logger().info(String.format("message scheduler executor ===> service: %s,method: %s", service.getClass().getName(), method.getName()));
                    ImplicitMethod implicitMethod = methodWrapper.getImplicitMethod();
                    Object implicit = implicitMethod != null ? implicitMethod.getMethod().invoke(implicitMethod.getImplicitObject(), requestProtocol) : requestProtocol;
                    result = methodWrapper.isHasMethodContext() ? (methodWrapper.isMethodContextOnLeft() ? method.invoke(service, methodContext, implicit) : method.invoke(service, implicit, methodContext)) : method.invoke(service, implicit);
                }
            }
            catch (Throwable t) {
                this.logger().error(String.format("method %s call failed", methodWrapper.getAlias()), t);
                methodWrappers.forEach((k, v) -> v.forEach(m -> m.setShouldSkip(true)));
                this.methodErrorHandle(t);
            }
            finally {
                if (result != null) {
                    methodContext.setResult(result);
                }
            }
        }
        if (this.t != null) {
            throw new MessageWarnException(10000, "method call failed", this.t);
        }
    }
}

