package dev.miku.r2dbc.mysql;

import dev.miku.r2dbc.mysql.client.FluxExchangeable;
import dev.miku.r2dbc.mysql.message.client.ClientMessage;
import dev.miku.r2dbc.mysql.message.server.CompleteMessage;
import dev.miku.r2dbc.mysql.message.server.ErrorMessage;
import dev.miku.r2dbc.mysql.message.server.ServerMessage;
import java.util.Queue;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

/* compiled from: QueryFlow.java */
/* loaded from: input_file:dev/miku/r2dbc/mysql/BaseFluxExchangeable.class */
abstract class BaseFluxExchangeable extends FluxExchangeable<ServerMessage> {
    protected final Sinks.Many<ClientMessage> requests = Sinks.many().unicast().onBackpressureBuffer((Queue) Queues.one().get());

    public final void subscribe(CoreSubscriber<? super ClientMessage> coreSubscriber) {
        this.requests.asFlux().subscribe(coreSubscriber);
        tryNextOrComplete(null);
    }

    @Override // java.util.function.BiConsumer
    public final void accept(ServerMessage serverMessage, SynchronousSink<ServerMessage> synchronousSink) {
        if (serverMessage instanceof ErrorMessage) {
            synchronousSink.next(((ErrorMessage) serverMessage).offendedBy(offendingSql()));
            synchronousSink.complete();
            return;
        }
        synchronousSink.next(serverMessage);
        if ((serverMessage instanceof CompleteMessage) && ((CompleteMessage) serverMessage).isDone()) {
            tryNextOrComplete(synchronousSink);
        }
    }

    protected abstract void tryNextOrComplete(@Nullable SynchronousSink<ServerMessage> synchronousSink);

    protected abstract String offendingSql();
}
