package com.dtflys.forest.http;

import cn.hutool.core.collection.CollectionUtil;
import com.dtflys.forest.annotation.SSEDataMessage;
import com.dtflys.forest.annotation.SSEEventMessage;
import com.dtflys.forest.annotation.SSEIdMessage;
import com.dtflys.forest.annotation.SSEMessage;
import com.dtflys.forest.annotation.SSERetryMessage;
import com.dtflys.forest.converter.ForestConverter;
import com.dtflys.forest.exceptions.ForestRuntimeException;
import com.dtflys.forest.interceptor.Interceptor;
import com.dtflys.forest.interceptor.SSEInterceptor;
import com.dtflys.forest.reflection.MethodLifeCycleHandler;
import com.dtflys.forest.sse.EventSource;
import com.dtflys.forest.sse.ForestSSEListener;
import com.dtflys.forest.sse.SSEMessageConsumer;
import com.dtflys.forest.sse.SSEMessageMethod;
import com.dtflys.forest.sse.SSEMessageResult;
import com.dtflys.forest.sse.SSEState;
import com.dtflys.forest.sse.SSEStringMessageConsumer;
import com.dtflys.forest.utils.ForestDataType;
import com.dtflys.forest.utils.ReflectUtils;
import com.dtflys.forest.utils.StringUtils;
import com.dtflys.forest.utils.TypeReference;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:com/dtflys/forest/http/ForestSSE.class */
public class ForestSSE implements ForestSSEListener<ForestSSE> {
    private ForestRequest<InputStream> request;
    private Consumer<EventSource> onOpenConsumer;
    private Consumer<EventSource> onCloseConsumer;
    private volatile CompletableFuture<? extends ForestSSE> completableFuture;
    private volatile SSEState state = SSEState.INITIALIZED;
    private Map<String, List<SSEStringMessageConsumer>> consumerMap = new ConcurrentHashMap();

    public static ForestSSE fromRequest(ForestRequest forestRequest) {
        ForestSSE forestSSE = new ForestSSE();
        forestSSE.init(forestRequest);
        return forestSSE;
    }

    public static <T extends ForestSSE> T fromClass(ForestRequest forestRequest, Class<T> cls) {
        T t = (T) forestRequest.getConfiguration().getForestObject(cls, false);
        t.init(forestRequest);
        return t;
    }

    protected ForestSSE() {
    }

    void init(ForestRequest forestRequest) {
        if (this.request == null) {
            this.request = forestRequest;
            this.request.isSSE = true;
            this.request.setLifeCycleHandler(new MethodLifeCycleHandler<InputStream>(InputStream.class, InputStream.class) { // from class: com.dtflys.forest.http.ForestSSE.1
            });
            Method[] methods = ReflectUtils.getMethods(getClass());
            for (Interceptor interceptor : forestRequest.getInterceptorChain().getInterceptors()) {
                if (interceptor instanceof SSEInterceptor) {
                    registerMethodArray(interceptor, ReflectUtils.getMethods(interceptor.getClass()));
                }
            }
            registerMethodArray(this, methods);
        }
    }

    private void registerMethodArray(Object obj, Method[] methodArr) {
        for (Method method : methodArr) {
            for (Annotation annotation : method.getAnnotations()) {
                if (annotation instanceof SSEMessage) {
                    registerMessageMethod(obj, method, annotation, null);
                } else if (annotation instanceof SSEDataMessage) {
                    registerMessageMethod(obj, method, annotation, "data");
                } else if (annotation instanceof SSEEventMessage) {
                    registerMessageMethod(obj, method, annotation, "event");
                } else if (annotation instanceof SSEIdMessage) {
                    registerMessageMethod(obj, method, annotation, "id");
                } else if (annotation instanceof SSERetryMessage) {
                    registerMessageMethod(obj, method, annotation, "retry");
                }
            }
        }
    }

    private void registerMessageMethod(Object obj, Method method, Annotation annotation, String str) {
        Map<String, Object> attributesFromAnnotation = ReflectUtils.getAttributesFromAnnotation(annotation);
        String valueOf = String.valueOf(attributesFromAnnotation.getOrDefault("valueRegex", ""));
        String valueOf2 = String.valueOf(attributesFromAnnotation.getOrDefault("valuePrefix", ""));
        String valueOf3 = String.valueOf(attributesFromAnnotation.getOrDefault("valuePostfix", ""));
        String valueOf4 = str != null ? str : String.valueOf(attributesFromAnnotation.getOrDefault("name", ""));
        SSEMessageMethod sSEMessageMethod = new SSEMessageMethod(this, obj, method);
        if (StringUtils.isEmpty(valueOf) && StringUtils.isEmpty(valueOf2) && StringUtils.isEmpty(valueOf3)) {
            addConsumer(valueOf4, (eventSource, str2, str3) -> {
                sSEMessageMethod.invoke(eventSource);
            });
        } else {
            addConsumerMatches(valueOf4, valueOf, valueOf2, valueOf3, (eventSource2, str4, str5) -> {
                sSEMessageMethod.invoke(eventSource2);
            });
        }
    }

    public ForestSSE addConsumer(String str, SSEStringMessageConsumer sSEStringMessageConsumer) {
        this.consumerMap.computeIfAbsent(str, str2 -> {
            return new CopyOnWriteArrayList();
        }).add(sSEStringMessageConsumer);
        return this;
    }

    public <T> ForestSSE addConsumer(String str, Class<T> cls, SSEMessageConsumer<T> sSEMessageConsumer) {
        this.consumerMap.computeIfAbsent(str, str2 -> {
            return new CopyOnWriteArrayList();
        }).add((eventSource, str3, str4) -> {
            if (cls.isAssignableFrom(CharSequence.class)) {
                sSEMessageConsumer.onMessage(eventSource, str3, String.valueOf(str4));
            } else {
                sSEMessageConsumer.onMessage(eventSource, str3, eventSource.value(cls));
            }
        });
        return this;
    }

    public <T> ForestSSE addConsumer(String str, TypeReference<T> typeReference, SSEMessageConsumer<T> sSEMessageConsumer) {
        this.consumerMap.computeIfAbsent(str, str2 -> {
            return new CopyOnWriteArrayList();
        }).add((eventSource, str3, str4) -> {
            sSEMessageConsumer.onMessage(eventSource, str3, this.request.getConfiguration().getConverter(ForestDataType.AUTO).convertToJavaObject((ForestConverter) str4, (Type) typeReference));
        });
        return this;
    }

    public ForestSSE addConsumer(String str, final Function<EventSource, Boolean> function, final SSEStringMessageConsumer sSEStringMessageConsumer) {
        this.consumerMap.computeIfAbsent(str, str2 -> {
            return new CopyOnWriteArrayList();
        }).add(new SSEStringMessageConsumer() { // from class: com.dtflys.forest.http.ForestSSE.2
            @Override // com.dtflys.forest.sse.SSEMessageConsumer
            public void onMessage(EventSource eventSource, String str3, String str4) {
                sSEStringMessageConsumer.onMessage(eventSource, str3, str4);
            }

            @Override // com.dtflys.forest.sse.SSEMessageConsumer
            public boolean matches(EventSource eventSource) {
                return ((Boolean) function.apply(eventSource)).booleanValue();
            }
        });
        return this;
    }

    public ForestSSE addConsumerMatchesPrefix(String str, String str2, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumer(str, eventSource -> {
            return Boolean.valueOf(eventSource.value().startsWith(str2));
        }, sSEStringMessageConsumer);
    }

    public ForestSSE addConsumerMatchesPostfix(String str, String str2, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumer(str, eventSource -> {
            return Boolean.valueOf(eventSource.value().endsWith(str2));
        }, sSEStringMessageConsumer);
    }

    public ForestSSE addConsumerMatches(String str, String str2, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumer(str, eventSource -> {
            return Boolean.valueOf(eventSource.value().matches(str2));
        }, sSEStringMessageConsumer);
    }

    public ForestSSE addConsumerMatches(String str, String str2, String str3, String str4, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumer(str, eventSource -> {
            String value = eventSource.value();
            if (StringUtils.isNotEmpty(str2) && !value.matches(str2)) {
                return false;
            }
            if (!StringUtils.isNotEmpty(str3) || value.startsWith(str3)) {
                return !StringUtils.isNotEmpty(str4) || value.endsWith(str4);
            }
            return false;
        }, sSEStringMessageConsumer);
    }

    public ForestSSE setOnOpen(Consumer<EventSource> consumer) {
        this.onOpenConsumer = consumer;
        return this;
    }

    public ForestSSE setOnClose(Consumer<EventSource> consumer) {
        this.onCloseConsumer = consumer;
        return this;
    }

    public ForestSSE addOnData(SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumer("data", sSEStringMessageConsumer);
    }

    public <T> ForestSSE addOnData(Class<T> cls, SSEMessageConsumer<T> sSEMessageConsumer) {
        return addConsumer("data", cls, sSEMessageConsumer);
    }

    public ForestSSE addOnDataMatchesPrefix(String str, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumerMatchesPrefix("data", str, sSEStringMessageConsumer);
    }

    public ForestSSE addOnDataMatchesPostfix(String str, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumerMatchesPostfix("data", str, sSEStringMessageConsumer);
    }

    public ForestSSE addOnEvent(SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumer("event", sSEStringMessageConsumer);
    }

    public <T> ForestSSE addOnEvent(Class<T> cls, SSEMessageConsumer<T> sSEMessageConsumer) {
        return addConsumer("event", cls, sSEMessageConsumer);
    }

    public ForestSSE addOnEventMatchesPrefix(String str, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumerMatchesPrefix("event", str, sSEStringMessageConsumer);
    }

    public ForestSSE addOnEventMatchesPostfix(String str, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumerMatchesPostfix("event", str, sSEStringMessageConsumer);
    }

    public ForestSSE addOnId(SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumer("id", sSEStringMessageConsumer);
    }

    public <T> ForestSSE addOnId(Class<T> cls, SSEMessageConsumer<T> sSEMessageConsumer) {
        return addConsumer("id", cls, sSEMessageConsumer);
    }

    public ForestSSE addOnIdMatchesPrefix(String str, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumerMatchesPrefix("id", str, sSEStringMessageConsumer);
    }

    public ForestSSE addOnIdMatchesPostfix(String str, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumerMatchesPostfix("id", str, sSEStringMessageConsumer);
    }

    public ForestSSE addOnRetry(SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumer("retry", sSEStringMessageConsumer);
    }

    public <T> ForestSSE addOnRetry(Class<T> cls, SSEMessageConsumer<T> sSEMessageConsumer) {
        return addConsumer("retry", cls, sSEMessageConsumer);
    }

    public ForestSSE addOnRetryMatchesPrefix(String str, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumerMatchesPrefix("retry", str, sSEStringMessageConsumer);
    }

    public ForestSSE addOnRetryMatchesPostfix(String str, SSEStringMessageConsumer sSEStringMessageConsumer) {
        return addConsumerMatchesPostfix("retry", str, sSEStringMessageConsumer);
    }

    private void doOnOpen(EventSource eventSource) {
        for (Interceptor interceptor : eventSource.request().getInterceptorChain().getInterceptors()) {
            if (interceptor instanceof SSEInterceptor) {
                ((SSEInterceptor) interceptor).onSSEOpen(eventSource);
            }
        }
        onOpen(eventSource);
        if (this.onOpenConsumer != null) {
            this.onOpenConsumer.accept(eventSource);
        }
    }

    protected void onOpen(EventSource eventSource) {
    }

    private void doOnClose(EventSource eventSource) {
        ForestRequest request = eventSource.request();
        ForestResponse response = eventSource.response();
        try {
            for (Interceptor interceptor : request.getInterceptorChain().getInterceptors()) {
                if (interceptor instanceof SSEInterceptor) {
                    ((SSEInterceptor) interceptor).onSSEClose(eventSource);
                }
            }
            if (this.onCloseConsumer != null) {
                this.onCloseConsumer.accept(eventSource);
            }
            onClose(eventSource);
            response.close();
            this.state = SSEState.CLOSED;
        } catch (Throwable th) {
            response.close();
            this.state = SSEState.CLOSED;
            throw th;
        }
    }

    protected void onClose(EventSource eventSource) {
    }

    @Override // com.dtflys.forest.sse.SSEMessageConsumer
    public void onMessage(EventSource eventSource, String str, String str2) {
        List<SSEStringMessageConsumer> list = this.consumerMap.get(str);
        if (CollectionUtil.isEmpty(list)) {
            return;
        }
        for (SSEStringMessageConsumer sSEStringMessageConsumer : list) {
            if (sSEStringMessageConsumer.matches(eventSource)) {
                sSEStringMessageConsumer.onMessage(eventSource, str, str2);
                if (this.state != SSEState.LISTENING || SSEMessageResult.CLOSE.equals(eventSource.messageResult())) {
                    return;
                }
            }
        }
    }

    @Override // com.dtflys.forest.sse.ForestSSEListener
    public ForestRequest getRequest() {
        return this.request;
    }

    private EventSource parseEventSource(ForestResponse forestResponse, String str) {
        String[] split = str.split("\\:", 2);
        if (split.length == 1) {
            return new EventSource(this, "", this.request, forestResponse, str, str);
        }
        return new EventSource(this, split[0].trim(), this.request, forestResponse, str, StringUtils.trimBegin(split[1]));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.dtflys.forest.sse.ForestSSEListener
    public <R extends ForestSSE> R listen() {
        ForestResponse forestResponse;
        boolean isAsync = this.request.isAsync();
        this.state = SSEState.REQUESTING;
        if (isAsync) {
            try {
                forestResponse = (ForestResponse) this.request.executeAsCompletableFuture(new TypeReference<ForestResponse<InputStream>>() { // from class: com.dtflys.forest.http.ForestSSE.3
                }).get();
            } catch (InterruptedException | ExecutionException e) {
                throw new ForestRuntimeException(e);
            }
        } else {
            forestResponse = (ForestResponse) this.request.execute(new TypeReference<ForestResponse<InputStream>>() { // from class: com.dtflys.forest.http.ForestSSE.4
            });
        }
        if (forestResponse != null && !forestResponse.isError()) {
            this.state = SSEState.LISTENING;
            EventSource eventSource = new EventSource(this, "open", this.request, forestResponse);
            doOnOpen(eventSource);
            if (SSEState.LISTENING != this.state || SSEMessageResult.CLOSE.equals(eventSource.messageResult())) {
                doOnClose(new EventSource(this, "close", this.request, forestResponse));
                return this;
            }
            try {
                String str = (String) Optional.ofNullable(forestResponse.getCharset()).orElse("UTF-8");
                ForestResponse forestResponse2 = forestResponse;
                forestResponse.openStream((inputStream, forestResponse3) -> {
                    try {
                        try {
                            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, str));
                            while (true) {
                                String readLine = bufferedReader.readLine();
                                if (readLine == null) {
                                    break;
                                }
                                if (!StringUtils.isEmpty(readLine)) {
                                    EventSource parseEventSource = parseEventSource(forestResponse2, readLine);
                                    onMessage(parseEventSource, parseEventSource.name(), parseEventSource.value());
                                    if (SSEState.LISTENING != this.state) {
                                        break;
                                    }
                                }
                            }
                            doOnClose(new EventSource(this, "close", this.request, forestResponse2));
                        } catch (IOException e2) {
                            throw new ForestRuntimeException(e2);
                        }
                    } catch (Throwable th) {
                        doOnClose(new EventSource(this, "close", this.request, forestResponse2));
                        throw th;
                    }
                });
                return this;
            } catch (Exception e2) {
                throw new ForestRuntimeException(e2);
            }
        }
        return this;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.dtflys.forest.sse.ForestSSEListener
    public <R extends ForestSSE> R asyncListen() {
        this.completableFuture = CompletableFuture.supplyAsync(this::listen);
        return this;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.dtflys.forest.sse.ForestSSEListener
    public <R extends ForestSSE> R asyncListen(ExecutorService executorService) {
        this.completableFuture = CompletableFuture.supplyAsync(this::listen, executorService);
        return this;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.dtflys.forest.sse.ForestSSEListener
    public <R extends ForestSSE> R await() {
        if (this.completableFuture != null) {
            this.completableFuture.join();
        }
        return this;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.dtflys.forest.sse.ForestSSEListener
    public <R extends ForestSSE> R close() {
        this.state = SSEState.CLOSING;
        return this;
    }
}
