/*
 * Decompiled with CFR 0.152.
 */
package dev.miku.r2dbc.mysql;

import dev.miku.r2dbc.mysql.Binding;
import dev.miku.r2dbc.mysql.QueryLogger;
import dev.miku.r2dbc.mysql.cache.PrepareCache;
import dev.miku.r2dbc.mysql.client.FluxExchangeable;
import dev.miku.r2dbc.mysql.message.client.ClientMessage;
import dev.miku.r2dbc.mysql.message.client.PrepareQueryMessage;
import dev.miku.r2dbc.mysql.message.client.PreparedCloseMessage;
import dev.miku.r2dbc.mysql.message.client.PreparedExecuteMessage;
import dev.miku.r2dbc.mysql.message.client.PreparedFetchMessage;
import dev.miku.r2dbc.mysql.message.client.PreparedResetMessage;
import dev.miku.r2dbc.mysql.message.server.CompleteMessage;
import dev.miku.r2dbc.mysql.message.server.EofMessage;
import dev.miku.r2dbc.mysql.message.server.ErrorMessage;
import dev.miku.r2dbc.mysql.message.server.OkMessage;
import dev.miku.r2dbc.mysql.message.server.PreparedOkMessage;
import dev.miku.r2dbc.mysql.message.server.ServerMessage;
import dev.miku.r2dbc.mysql.message.server.ServerStatusMessage;
import dev.miku.r2dbc.mysql.message.server.SyntheticMetadataMessage;
import io.netty.util.ReferenceCountUtil;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

/**
 * Exchange that runs one SQL text through the server-side prepared statement flow.
 *
 * <p>On subscription it either sends a {@link PrepareQueryMessage} (statement not in the
 * {@link PrepareCache}) or a {@link PreparedResetMessage} (cached statement ID found). Server
 * replies are then routed by {@link #accept} according to the current mode:
 * {@code PREPARE_OR_RESET} to {@code EXECUTE} to {@code FETCH}, looping back to {@code EXECUTE}
 * while more {@link Binding}s remain.
 *
 * <p>Outgoing client messages are pushed through {@link #requests}; incoming server messages
 * that should be visible downstream are forwarded to the {@link SynchronousSink}.
 */
final class PrepareExchangeable extends FluxExchangeable<ServerMessage> {
    private static final Logger logger = Loggers.getLogger(PrepareExchangeable.class);

    /** Waiting for the reply of a prepare or a reset request. */
    private static final int PREPARE_OR_RESET = 1;

    /** An execute request was sent; waiting for its metadata / completion. */
    private static final int EXECUTE = 2;

    /** Receiving rows until a done {@link CompleteMessage} arrives. */
    private static final int FETCH = 3;

    /** Server status bit {@code SERVER_STATUS_CURSOR_EXISTS} of the MySQL protocol. */
    private static final int CURSOR_EXISTS = 0x40;

    /** Server status bit {@code SERVER_STATUS_LAST_ROW_SENT} of the MySQL protocol. */
    private static final int LAST_ROW_SENT = 0x80;

    private final AtomicBoolean disposed = new AtomicBoolean();

    /** Outgoing client messages; unicast sink backed by {@code Queues.one()}. */
    private final Sinks.Many<ClientMessage> requests =
        Sinks.many().unicast().onBackpressureBuffer(Queues.<ClientMessage>one().get());

    private final PrepareCache cache;
    private final String sql;
    private final Iterator<Binding> bindings;
    private final int fetchSize;

    private int mode = PREPARE_OR_RESET;

    /** ID of the prepared statement; {@code null} until known from the cache or the server. */
    @Nullable
    private Integer statementId;

    /** Whether {@link #dispose()} must send a close message for {@link #statementId}. */
    private boolean shouldClose;

    /**
     * @param cache     cache mapping SQL text to prepared statement IDs
     * @param sql       the SQL text to prepare and execute
     * @param bindings  parameter bindings, one execution per element
     * @param fetchSize fetch size; a value {@code <= 0} is passed to the execute message as
     *                  {@code true} and a positive value is used for fetch messages
     */
    PrepareExchangeable(PrepareCache cache, String sql, Iterator<Binding> bindings, int fetchSize) {
        this.cache = cache;
        this.sql = sql;
        this.bindings = bindings;
        this.fetchSize = fetchSize;
    }

    /**
     * Subscribes {@code actual} to the request stream and emits the first request: a reset
     * for a cached statement, or a prepare (which is also logged via {@link QueryLogger})
     * when the cache has no entry for the SQL.
     */
    @Override
    public void subscribe(CoreSubscriber<? super ClientMessage> actual) {
        this.requests.asFlux().subscribe(actual);

        Integer cachedId = this.cache.getIfPresent(this.sql);
        if (cachedId == null) {
            logger.debug("Prepare cache mismatch, try to preparing");
            // Not cached (yet): the statement is ours to close unless putToCache succeeds later.
            this.shouldClose = true;
            QueryLogger.log(this.sql);
            Sinks.EmitResult result = this.requests.tryEmitNext(new PrepareQueryMessage(this.sql));
            if (result != Sinks.EmitResult.OK) {
                logger.error("Fail to emit prepare query message due to {}", result);
            }
        } else {
            logger.debug("Prepare cache matched statement {} when getting", cachedId);
            // The cache holds this statement, so it must not be closed on dispose.
            this.shouldClose = false;
            this.statementId = cachedId;
            Sinks.EmitResult result = this.requests.tryEmitNext(new PreparedResetMessage(cachedId));
            if (result != Sinks.EmitResult.OK) {
                logger.error("Fail to emit reset statement message due to {}", result);
            }
        }
    }

    /**
     * Handles one server message. An {@link ErrorMessage} is forwarded (tagged with the SQL via
     * {@code offendedBy}) and completes the sink regardless of mode; anything else is
     * dispatched to the handler of the current mode.
     */
    @Override
    public void accept(ServerMessage message, SynchronousSink<ServerMessage> sink) {
        if (message instanceof ErrorMessage) {
            sink.next(((ErrorMessage) message).offendedBy(this.sql));
            sink.complete();
            return;
        }

        switch (this.mode) {
            case PREPARE_OR_RESET:
                onPrepareOrResetReply(message, sink);
                break;
            case EXECUTE:
                onExecuteReply(message, sink);
                break;
            default:
                onFetchReply(message, sink);
                break;
        }
    }

    /**
     * Sends a close message for the statement if this exchange still owns it, completes the
     * request stream and clears every binding that was not consumed. Only the first call has
     * an effect.
     */
    @Override
    public void dispose() {
        if (!this.disposed.compareAndSet(false, true)) {
            return;
        }

        Integer id = this.statementId;
        if (this.shouldClose && id != null) {
            logger.debug("Closing statement {} after used", id);
            Sinks.EmitResult result = this.requests.tryEmitNext(new PreparedCloseMessage(id));
            if (result != Sinks.EmitResult.OK) {
                logger.error("Fail to close statement {} due to {}", id, result);
            }
        }

        this.requests.tryEmitComplete();

        while (this.bindings.hasNext()) {
            this.bindings.next().clear();
        }
    }

    @Override
    public boolean isDisposed() {
        return this.disposed.get();
    }

    /** Handles a non-error reply while in {@code PREPARE_OR_RESET} mode. */
    private void onPrepareOrResetReply(ServerMessage message, SynchronousSink<ServerMessage> sink) {
        if (message instanceof OkMessage) {
            // Reply to the reset request: the cached statement can be executed right away.
            Integer id = this.statementId;
            if (id == null) {
                // NOTE(review): nothing is signalled to the sink on this path - confirm intended.
                logger.error("Reset succeed but statement ID was null");
                return;
            }
            doNextExecute(id, sink);
        } else if (message instanceof PreparedOkMessage) {
            PreparedOkMessage ok = (PreparedOkMessage) message;
            int id = ok.getStatementId();
            int columns = ok.getTotalColumns();
            int parameters = ok.getTotalParameters();

            this.statementId = id;

            // Execute immediately only when columns <= -parameters; otherwise wait for the
            // completed SyntheticMetadataMessage handled by the next branch.
            if (columns <= -parameters) {
                putToCache(id);
                doNextExecute(id, sink);
            }
        } else if (message instanceof SyntheticMetadataMessage
            && ((SyntheticMetadataMessage) message).isCompleted()) {
            Integer id = this.statementId;
            if (id == null) {
                // NOTE(review): nothing is signalled to the sink on this path - confirm intended.
                logger.error("Prepared OK message not found");
                return;
            }
            putToCache(id);
            doNextExecute(id, sink);
        } else {
            // Not forwarded downstream in this mode, so release it here.
            ReferenceCountUtil.safeRelease(message);
        }
    }

    /** Handles a non-error reply while in {@code EXECUTE} mode. */
    private void onExecuteReply(ServerMessage message, SynchronousSink<ServerMessage> sink) {
        if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
            onCompleteMessage((CompleteMessage) message, sink);
            return;
        }

        if (message instanceof SyntheticMetadataMessage) {
            EofMessage eof = ((SyntheticMetadataMessage) message).getEof();
            boolean cursorExists = eof instanceof ServerStatusMessage
                && (((ServerStatusMessage) eof).getServerStatuses() & CURSOR_EXISTS) != 0;

            if (cursorExists) {
                // Request rows first; the metadata is forwarded only if that request was emitted
                // (doNextFetch terminates the sink itself on failure).
                if (doNextFetch(sink)) {
                    sink.next(message);
                }
                return;
            }

            // No cursor flag on the EOF: switch to FETCH mode without sending a fetch request.
            setMode(FETCH);
            sink.next(message);
            return;
        }

        sink.next(message);
    }

    /** Handles a non-error reply in any other mode (i.e. {@code FETCH}). */
    private void onFetchReply(ServerMessage message, SynchronousSink<ServerMessage> sink) {
        if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
            onCompleteMessage((CompleteMessage) message, sink);
        } else {
            sink.next(message);
        }
    }

    /**
     * Tries to register the statement in the cache. If the cache evicts another statement, a
     * close message is emitted for the evicted ID. {@link #shouldClose} ends up {@code true}
     * when the put did not succeed (including when it threw).
     */
    private void putToCache(Integer statementId) {
        boolean putSucceed;
        try {
            putSucceed = this.cache.putIfAbsent(this.sql, statementId, evictId -> {
                logger.debug("Prepare cache evicts statement {} when putting", evictId);
                Sinks.EmitResult result = this.requests.tryEmitNext(new PreparedCloseMessage(evictId));
                if (result != Sinks.EmitResult.OK) {
                    // Report the evicted ID: that is the statement whose close failed.
                    logger.error("Fail to close evicted statement {} due to {}", evictId, result);
                }
            });
        } catch (Throwable e) {
            // Best effort: a cache failure must not break the query, the statement is simply
            // treated as uncached and closed on dispose.
            logger.error("Put statement {} to cache failed", statementId, e);
            putSucceed = false;
        }

        this.shouldClose = !putSucceed;
        logger.debug("Prepare cache put statement {} is {}", statementId, putSucceed ? "succeed" : "fails");
    }

    /**
     * Switches to {@code EXECUTE} mode and emits the execute message built from the next
     * binding. If the message cannot be emitted it is disposed and the sink is completed.
     */
    private void doNextExecute(int statementId, SynchronousSink<ServerMessage> sink) {
        setMode(EXECUTE);

        PreparedExecuteMessage message = this.bindings.next().toExecuteMessage(statementId, this.fetchSize <= 0);
        Sinks.EmitResult result = this.requests.tryEmitNext(message);
        if (result != Sinks.EmitResult.OK) {
            logger.error("Fail to execute {} due to {}", statementId, result);
            message.dispose();
            sink.complete();
        }
    }

    /**
     * Switches to {@code FETCH} mode and emits a fetch message for {@link #fetchSize} rows.
     *
     * @return {@code true} if the fetch message was emitted; {@code false} if the sink has been
     *         terminated instead (error when the statement ID is unknown, completion when the
     *         emission failed)
     */
    private boolean doNextFetch(SynchronousSink<ServerMessage> sink) {
        Integer id = this.statementId;
        if (id == null) {
            sink.error(new IllegalStateException("Statement ID must not be null when fetching"));
            return false;
        }

        setMode(FETCH);

        Sinks.EmitResult result = this.requests.tryEmitNext(new PreparedFetchMessage(id, this.fetchSize));
        if (result == Sinks.EmitResult.OK) {
            return true;
        }

        logger.error("Fail to fetch {} due to {}", id, result);
        sink.complete();
        return false;
    }

    /** Records the new mode; only ever called with {@code EXECUTE} or {@code FETCH}. */
    private void setMode(int mode) {
        logger.debug("Mode is changed to {}", mode == EXECUTE ? "EXECUTE" : "FETCH");
        this.mode = mode;
    }

    /**
     * Handles a done {@link CompleteMessage}: requests more rows while the cursor is open and
     * the last row has not been sent, otherwise forwards the message and either executes the
     * next binding or completes the sink.
     */
    private void onCompleteMessage(CompleteMessage message, SynchronousSink<ServerMessage> sink) {
        if (this.requests.scanOrDefault(Scannable.Attr.TERMINATED, Boolean.FALSE)) {
            logger.error("Unexpected terminated on requests");
            sink.next(message);
            sink.complete();
            return;
        }

        if (message instanceof ServerStatusMessage) {
            short statuses = ((ServerStatusMessage) message).getServerStatuses();
            if ((statuses & CURSOR_EXISTS) != 0 && (statuses & LAST_ROW_SENT) == 0) {
                // More rows remain on the cursor; this intermediate message is not forwarded.
                doNextFetch(sink);
                return;
            }
        }

        sink.next(message);

        if (!this.bindings.hasNext()) {
            sink.complete();
            return;
        }

        Integer id = this.statementId;
        if (id == null) {
            sink.error(new IllegalStateException("Statement ID must not be null when executing"));
            return;
        }
        doNextExecute(id, sink);
    }
}

