package org.jetlinks.rule.engine.defaults;

import java.util.function.Function;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.task.Input;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/defaults/EventBusInput.class */
/**
 * {@link Input} implementation that receives {@link RuleData} by subscribing to an
 * {@link EventBus} topic.
 *
 * <p>The topic is obtained from {@code RuleConstants.Topics.input(instanceId, nodeId)} and the
 * subscriber id is {@code "rule-engine:" + nodeId}.
 */
public class EventBusInput implements Input {
    protected final String instanceId;
    protected final String nodeId;
    protected final EventBus bus;

    /**
     * Subscription features used by {@link #accept(Function)}.
     *
     * <p>NOTE(review): {@link #accept()} subscribes with {@code Subscription.Feature.values()}
     * instead of this array, so the two overloads may use different feature sets — confirm
     * that this difference is intentional.
     */
    static final Subscription.Feature[] features = {
        Subscription.Feature.local,
        Subscription.Feature.broker,
        Subscription.Feature.shared,
        Subscription.Feature.sharedLocalFirst
    };

    /**
     * Creates an input bound to a single node of a rule instance.
     *
     * @param instanceId value passed as the first argument to {@code RuleConstants.Topics.input}
     * @param nodeId     value passed as the second argument to {@code RuleConstants.Topics.input}
     *                   and appended to the {@code "rule-engine:"} subscriber id
     * @param bus        event bus on which the subscriptions are made
     */
    public EventBusInput(String instanceId, String nodeId, EventBus bus) {
        this.instanceId = instanceId;
        this.nodeId = nodeId;
        this.bus = bus;
    }

    /**
     * Subscribes to this node's input topic using every value of {@link Subscription.Feature}.
     *
     * @return the result of {@code bus.subscribe(subscription, RuleData.class)}
     */
    @Override
    public Flux<RuleData> accept() {
        return this.bus.subscribe(inputSubscription(Subscription.Feature.values()), RuleData.class);
    }

    /**
     * Subscribes to this node's input topic using {@link #features} and passes each payload,
     * decoded as {@link RuleData}, to the given function.
     *
     * @param function handler applied to every decoded payload; the {@code Boolean} it emits is
     *                 discarded, only completion (or error) is propagated to the bus
     * @return the {@link Disposable} returned by the event bus for this subscription
     */
    @Override
    public Disposable accept(Function<RuleData, Mono<Boolean>> function) {
        return this.bus.subscribe(
            inputSubscription(features),
            // No raw Mono cast: apply(...) is already Mono<Boolean>, so then() is a typed Mono<Void>.
            payload -> function.apply(payload.decode(RuleData.class)).then());
    }

    /** Builds the subscription for this node's input topic with the given features. */
    private Subscription inputSubscription(Subscription.Feature[] subscriptionFeatures) {
        return Subscription.of(
            "rule-engine:" + this.nodeId,
            RuleConstants.Topics.input(this.instanceId, this.nodeId),
            subscriptionFeatures);
    }
}
