/*
 * Decompiled with CFR 0.152.
 */
package dev.miku.r2dbc.mysql;

import dev.miku.r2dbc.mysql.Binding;
import dev.miku.r2dbc.mysql.CommitRollbackState;
import dev.miku.r2dbc.mysql.ConnectionContext;
import dev.miku.r2dbc.mysql.ConnectionState;
import dev.miku.r2dbc.mysql.LoginExchangeable;
import dev.miku.r2dbc.mysql.MultiQueryExchangeable;
import dev.miku.r2dbc.mysql.PrepareExchangeable;
import dev.miku.r2dbc.mysql.Query;
import dev.miku.r2dbc.mysql.QueryLogger;
import dev.miku.r2dbc.mysql.StartTransactionState;
import dev.miku.r2dbc.mysql.TextQueryExchangeable;
import dev.miku.r2dbc.mysql.TransactionBatchExchangeable;
import dev.miku.r2dbc.mysql.TransactionMultiExchangeable;
import dev.miku.r2dbc.mysql.cache.PrepareCache;
import dev.miku.r2dbc.mysql.client.Client;
import dev.miku.r2dbc.mysql.constant.SslMode;
import dev.miku.r2dbc.mysql.message.client.TextQueryMessage;
import dev.miku.r2dbc.mysql.message.server.CompleteMessage;
import dev.miku.r2dbc.mysql.message.server.ErrorMessage;
import dev.miku.r2dbc.mysql.message.server.ServerMessage;
import dev.miku.r2dbc.mysql.util.InternalArrays;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.spi.TransactionDefinition;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

/**
 * Static helpers that turn SQL statements, logins and transaction boundaries into
 * {@link Client#exchange} calls and shape the resulting {@link ServerMessage} streams.
 * <p>
 * The class is not instantiable; every entry point is a static method.
 */
final class QueryFlow {
    static final Logger logger = Loggers.getLogger(QueryFlow.class);

    /**
     * Window boundary used with {@code windowUntil}: a window is closed by each
     * {@link CompleteMessage}.
     */
    private static final Predicate<ServerMessage> RESULT_DONE = message -> message instanceof CompleteMessage;

    /**
     * Per-message handler for executions whose results are discarded: an
     * {@link ErrorMessage} is rethrown as its exception, and any other message that is
     * {@link ReferenceCounted} is released.
     */
    private static final Consumer<ServerMessage> EXECUTE_VOID = message -> {
        if (message instanceof ErrorMessage) {
            throw ((ErrorMessage) message).toException();
        } else if (message instanceof ReferenceCounted) {
            ReferenceCountUtil.safeRelease(message);
        }
    };

    /**
     * Executes a statement through {@link PrepareExchangeable}, once per binding.
     *
     * @param client    the client used for the exchange
     * @param sql       the statement text handed to the exchangeable
     * @param bindings  the bindings to execute; when empty, nothing is exchanged
     * @param fetchSize the fetch size handed to the exchangeable
     * @param cache     the prepare cache handed to the exchangeable
     * @return a deferred flux of message windows, each ended by a {@link CompleteMessage};
     *         empty when {@code bindings} is empty
     */
    static Flux<Flux<ServerMessage>> execute(Client client, String sql, List<Binding> bindings, int fetchSize, PrepareCache cache) {
        return Flux.defer(() -> {
            if (bindings.isEmpty()) {
                return Flux.empty();
            }
            // The iterator is created per subscription because the whole body is deferred.
            return client.exchange(new PrepareExchangeable(cache, sql, bindings.iterator(), fetchSize))
                .windowUntil(RESULT_DONE);
        });
    }

    /**
     * Executes a parsed query through {@link TextQueryExchangeable}, once per binding.
     *
     * @param client   the client used for the exchange
     * @param query    the parsed query handed to the exchangeable
     * @param bindings the bindings to execute; when empty, nothing is exchanged
     * @return a deferred flux of message windows, each ended by a {@link CompleteMessage};
     *         empty when {@code bindings} is empty
     */
    static Flux<Flux<ServerMessage>> execute(Client client, Query query, List<Binding> bindings) {
        return Flux.defer(() -> {
            if (bindings.isEmpty()) {
                return Flux.empty();
            }
            return client.exchange(new TextQueryExchangeable(query, bindings.iterator()))
                .windowUntil(RESULT_DONE);
        });
    }

    /**
     * Executes a single SQL text as a text query.
     *
     * @param client the client used for the exchange
     * @param sql    the SQL text to send
     * @return a deferred flux of message windows, each ended by a {@link CompleteMessage}
     */
    static Flux<Flux<ServerMessage>> execute(Client client, String sql) {
        return Flux.defer(() -> execute0(client, sql).windowUntil(RESULT_DONE));
    }

    /**
     * Executes a list of SQL texts. A single statement takes the plain text-query path;
     * two or more go through {@link MultiQueryExchangeable}.
     *
     * @param client     the client used for the exchange
     * @param statements the SQL texts; when empty, nothing is exchanged
     * @return a deferred flux of message windows, each ended by a {@link CompleteMessage};
     *         empty when {@code statements} is empty
     */
    static Flux<Flux<ServerMessage>> execute(Client client, List<String> statements) {
        return Flux.defer(() -> {
            switch (statements.size()) {
                case 0:
                    return Flux.empty();
                case 1:
                    return execute0(client, statements.get(0)).windowUntil(RESULT_DONE);
                default:
                    return client.exchange(new MultiQueryExchangeable(statements.iterator()))
                        .windowUntil(RESULT_DONE);
            }
        });
    }

    /**
     * Runs the login exchange on the given client.
     *
     * @param client   the client to log in with
     * @param sslMode  the SSL mode handed to {@link LoginExchangeable}
     * @param database the database handed to {@link LoginExchangeable}
     * @param user     the user handed to {@link LoginExchangeable}
     * @param password the password handed to {@link LoginExchangeable}, may be {@code null}
     * @param context  the connection context handed to {@link LoginExchangeable}
     * @return a mono that emits {@code client} once the login exchange completes; if the
     *         exchange fails, the client is force-closed and the original error is emitted
     */
    static Mono<Client> login(Client client, SslMode sslMode, String database, String user, @Nullable CharSequence password, ConnectionContext context) {
        return client.exchange(new LoginExchangeable(client, sslMode, database, user, password, context))
            // Close first, then surface the original failure to the subscriber.
            .onErrorResume(e -> client.forceClose().then(Mono.error(e)))
            .then(Mono.just(client));
    }

    /**
     * Executes a single SQL text and discards its results.
     *
     * @param client the client used for the exchange
     * @param sql    the SQL text to send
     * @return a deferred mono that completes when the message stream completes, or fails
     *         with the exception of the first {@link ErrorMessage} received
     */
    static Mono<Void> executeVoid(Client client, String sql) {
        return Mono.defer(() -> execute0(client, sql).doOnNext(EXECUTE_VOID).then());
    }

    /**
     * Executes several SQL texts and discards their results. A single statement is
     * delegated to {@link #executeVoid(Client, String)}; two or more go through
     * {@link MultiQueryExchangeable}.
     *
     * @param client     the client used for the exchange
     * @param statements the SQL texts; when empty, nothing is exchanged
     * @return a mono that completes when the message stream completes, or fails with the
     *         exception of the first {@link ErrorMessage} received
     */
    static Mono<Void> executeVoid(Client client, String... statements) {
        switch (statements.length) {
            case 0:
                return Mono.empty();
            case 1:
                return executeVoid(client, statements[0]);
            default:
                // NOTE(review): unlike the List<String> overload, the iterator here is created
                // when this method is called rather than per subscription - confirm callers
                // never subscribe to the returned Mono more than once.
                return client.exchange(new MultiQueryExchangeable(InternalArrays.asIterator(statements)))
                    .doOnNext(EXECUTE_VOID)
                    .then();
        }
    }

    /**
     * Starts a transaction described by {@code definition}.
     *
     * @param client         the client used for the exchange
     * @param state          the connection state handed to {@link StartTransactionState#of}
     * @param batchSupported whether the batch exchange may be used for a non-simple state
     * @param definition     the transaction definition handed to {@link StartTransactionState#of}
     * @return a mono that completes when the chosen exchange completes
     */
    static Mono<Void> beginTransaction(Client client, ConnectionState state, boolean batchSupported, TransactionDefinition definition) {
        StartTransactionState startState = StartTransactionState.of(state, definition);

        // The multi exchange is only used for a non-simple state when batching is unsupported.
        if (batchSupported || startState.isSimple()) {
            return client.exchange(new TransactionBatchExchangeable(startState)).then();
        }
        return client.exchange(new TransactionMultiExchangeable(startState)).then();
    }

    /**
     * Finishes a transaction with a commit or a rollback.
     *
     * @param client          the client used for the exchange
     * @param state           the connection state handed to {@link CommitRollbackState#of}
     * @param commit          the commit flag handed to {@link CommitRollbackState#of}
     * @param lockWaitTimeout the lock wait timeout handed to {@link CommitRollbackState#of}
     * @param batchSupported  whether the batch exchange may be used for a non-simple state
     * @return a mono that completes when the chosen exchange completes
     */
    static Mono<Void> doneTransaction(Client client, ConnectionState state, boolean commit, long lockWaitTimeout, boolean batchSupported) {
        CommitRollbackState commitState = CommitRollbackState.of(state, commit, lockWaitTimeout);

        // The multi exchange is only used for a non-simple state when batching is unsupported.
        if (batchSupported || commitState.isSimple()) {
            return client.exchange(new TransactionBatchExchangeable(commitState)).then();
        }
        return client.exchange(new TransactionMultiExchangeable(commitState)).then();
    }

    /**
     * Sends {@code sql} as a {@link TextQueryMessage} and relays the server messages.
     * <p>
     * An {@link ErrorMessage} is relayed as the result of {@code offendedBy(sql)} and
     * ends the stream. Any other message is relayed as-is, and the stream ends after a
     * {@link CompleteMessage} whose {@code isDone()} is true. The SQL is passed to
     * {@link QueryLogger#log} on each subscription.
     *
     * @param client the client used for the exchange
     * @param sql    the SQL text to send
     * @return the relayed server messages
     */
    private static Flux<ServerMessage> execute0(Client client, String sql) {
        return client.<ServerMessage>exchange(new TextQueryMessage(sql), (message, sink) -> {
            if (message instanceof ErrorMessage) {
                sink.next(((ErrorMessage) message).offendedBy(sql));
                sink.complete();
            } else {
                sink.next(message);
                if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
                    sink.complete();
                }
            }
        }).doOnSubscribe(ignored -> QueryLogger.log(sql));
    }

    private QueryFlow() {
    }
}

