package org.jetlinks.sdk.server.commons.cmd;

import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.jetlinks.core.command.AbstractCommand;
import org.jetlinks.core.command.CommandHandler;
import org.jetlinks.core.metadata.FunctionMetadata;
import org.jetlinks.sdk.server.commons.cmd.SubscribeCommand;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/sdk/server/commons/cmd/SubscribeCommand.class */
/**
 * Base class for commands whose response type is a {@link Flux} of {@code T} and
 * which can carry an optional per-element callback.
 *
 * <p>The callback is stored in a {@code transient} field, so it is excluded from
 * Java serialization of the command.
 *
 * @param <T>    element type of the response stream
 * @param <Self> concrete command type, returned by the fluent {@link #withCallback} method
 */
public abstract class SubscribeCommand<T, Self extends SubscribeCommand<T, Self>> extends AbstractCommand<Flux<T>, Self> implements UnboundedResponseCommand<T> {
    /** Optional per-element callback; {@code null} means "use {@code Mono::just}". */
    private transient Function<T, Mono<T>> callback;

    /**
     * Sets the callback applied to each element before it is emitted by a handler
     * built with {@link #createHandler}.
     *
     * @param function the callback; passing {@code null} restores the default
     *                 behaviour of {@link #getCallback()}
     * @return this command, typed as {@code Self}
     */
    public Self withCallback(Function<T, Mono<T>> function) {
        this.callback = function;
        return (Self) castSelf();
    }

    /**
     * Returns the configured callback.
     *
     * @return the callback set through {@link #withCallback}, or {@code Mono::just}
     *         when none has been set; never {@code null}
     */
    public Function<T, Mono<T>> getCallback() {
        return this.callback == null ? Mono::just : this.callback;
    }

    /**
     * Creates a {@link CommandHandler} whose result is a {@link Flux} built with
     * {@link Flux#create}.
     *
     * <p>When the flux's emitter runs, {@code biFunction} is invoked with the command
     * and an "emitter" function. The {@link Disposable} it returns is registered via
     * {@code onDispose} on the sink. The emitter function converts its argument with
     * {@code createResponseData}, passes the result through the command's callback and
     * returns a {@link Mono} that pushes every element it emits into the sink.
     *
     * <p>The returned {@code Mono} is not subscribed here: the code calling the emitter
     * function has to subscribe to it, otherwise nothing reaches the sink.
     *
     * @param supplier   supplies the function metadata passed to {@code CommandHandler.of}
     * @param biFunction starts the underlying subscription; receives the command and the
     *                   emitter function and returns the handle used to dispose it
     * @param supplier2  supplies new command instances, passed to {@code CommandHandler.of}
     * @param <T>        element type of the response stream
     * @param <C>        concrete command type
     * @return the command handler
     */
    @SuppressWarnings("unchecked")
    public static <T, C extends SubscribeCommand<T, C>> CommandHandler<C, Flux<T>> createHandler(Supplier<FunctionMetadata> supplier, BiFunction<C, Function<T, Mono<T>>, Disposable> biFunction, Supplier<C> supplier2) {
        return CommandHandler.of(supplier, (subscribeCommand, commandSupport) -> {
            // Resolve the callback once per command execution rather than once per element.
            Function<T, Mono<T>> callback = subscribeCommand.getCallback();
            return Flux.create(sink -> {
                Function<T, Mono<T>> emitter = data -> {
                    // NOTE(review): the cast covers the case where createResponseData is
                    // declared to return Object; it is a no-op if it already returns T.
                    T payload = (T) subscribeCommand.createResponseData(data);
                    return callback.apply(payload).doOnNext(sink::next);
                };
                // Tie the lifetime of the underlying subscription to the sink.
                sink.onDispose(biFunction.apply(subscribeCommand, emitter));
            });
        }, supplier2);
    }
}
