package com.taosdata.jdbc.ws.tmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.common.Consumer;
import com.taosdata.jdbc.enums.WSFunction;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.Deserializer;
import com.taosdata.jdbc.tmq.OffsetCommitCallback;
import com.taosdata.jdbc.tmq.TopicPartition;
import com.taosdata.jdbc.ws.FutureResponse;
import com.taosdata.jdbc.ws.InFlightRequest;
import com.taosdata.jdbc.ws.Transport;
import com.taosdata.jdbc.ws.entity.Code;
import com.taosdata.jdbc.ws.entity.FetchBlockResp;
import com.taosdata.jdbc.ws.entity.Response;
import com.taosdata.jdbc.ws.tmq.entity.CommitResp;
import com.taosdata.jdbc.ws.tmq.entity.ConsumerParam;
import com.taosdata.jdbc.ws.tmq.entity.PollResp;
import com.taosdata.jdbc.ws.tmq.entity.SubscribeResp;
import com.taosdata.jdbc.ws.tmq.entity.TMQRequestFactory;
import java.nio.ByteOrder;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collection;
import java.util.Properties;
import java.util.Set;

/* loaded from: input_file:com/taosdata/jdbc/ws/tmq/WSConsumer.class */
public class WSConsumer<V> implements Consumer<V> {
    private Transport transport;
    private ConsumerParam param;
    private TMQRequestFactory factory;
    private long offset = 0;

    @Override // com.taosdata.jdbc.common.Consumer
    public void create(Properties properties) throws SQLException {
        this.factory = new TMQRequestFactory();
        this.param = new ConsumerParam(properties);
        InFlightRequest inFlightRequest = new InFlightRequest(this.param.getConnectionParam().getRequestTimeout(), this.param.getConnectionParam().getMaxRequest());
        this.transport = new Transport(WSFunction.TMQ, this.param.getConnectionParam(), inFlightRequest);
        this.transport.setTextMessageHandler(str -> {
            JSONObject parseObject = JSON.parseObject(str);
            Response response = (Response) parseObject.toJavaObject(ConsumerAction.of(parseObject.getString("action")).getResponseClazz());
            FutureResponse remove = inFlightRequest.remove(response.getAction(), Long.valueOf(response.getReqId()));
            if (null != remove) {
                remove.getFuture().complete(response);
            }
        });
        this.transport.setBinaryMessageHandler(byteBuffer -> {
            byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
            byteBuffer.position(8);
            long j = byteBuffer.getLong();
            byteBuffer.position(24);
            FutureResponse remove = inFlightRequest.remove(ConsumerAction.FETCH_BLOCK.getAction(), Long.valueOf(j));
            if (null != remove) {
                remove.getFuture().complete(new FetchBlockResp(j, byteBuffer));
            }
        });
        Transport.checkConnection(this.transport, this.param.getConnectionParam().getConnectTimeout());
    }

    @Override // com.taosdata.jdbc.common.Consumer
    public void subscribe(Collection<String> collection) throws SQLException {
        SubscribeResp subscribeResp = (SubscribeResp) this.transport.send(this.factory.generateSubscribe(this.param.getConnectionParam().getUser(), this.param.getConnectionParam().getPassword(), this.param.getConnectionParam().getDatabase(), this.param.getGroupId(), this.param.getClientId(), this.param.getOffsetRest(), (String[]) collection.toArray(new String[0])));
        if (Code.SUCCESS.getCode() != subscribeResp.getCode()) {
            throw new SQLException("subscribe topic error, code: " + subscribeResp.getCode() + ", message: " + subscribeResp.getMessage());
        }
    }

    @Override // com.taosdata.jdbc.common.Consumer
    public void unsubscribe() throws SQLException {
    }

    @Override // com.taosdata.jdbc.common.Consumer
    public Set<String> subscription() throws SQLException {
        throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD);
    }

    @Override // com.taosdata.jdbc.common.Consumer
    public ConsumerRecords<V> poll(Duration duration, Deserializer<V> deserializer) throws SQLException {
        PollResp pollResp = (PollResp) this.transport.send(this.factory.generatePoll(duration.toMillis()));
        if (Code.SUCCESS.getCode() != pollResp.getCode()) {
            throw new SQLException("consumer poll error, code: " + pollResp.getCode() + ", message: " + pollResp.getMessage());
        }
        if (!pollResp.isHaveMessage()) {
            return ConsumerRecords.emptyRecord();
        }
        this.offset = pollResp.getMessageId();
        ConsumerRecords<V> consumerRecords = new ConsumerRecords<>(pollResp.getMessageId());
        WSConsumerResultSet wSConsumerResultSet = new WSConsumerResultSet(this.transport, this.factory, pollResp.getMessageId(), pollResp.getDatabase());
        Throwable th = null;
        while (wSConsumerResultSet.next()) {
            try {
                try {
                    consumerRecords.put(new TopicPartition(pollResp.getTopic(), pollResp.getDatabase(), pollResp.getVgroupId()), deserializer.deserialize(wSConsumerResultSet));
                } finally {
                }
            } catch (Throwable th2) {
                if (wSConsumerResultSet != null) {
                    if (th != null) {
                        try {
                            wSConsumerResultSet.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        wSConsumerResultSet.close();
                    }
                }
                throw th2;
            }
        }
        if (wSConsumerResultSet != null) {
            if (0 != 0) {
                try {
                    wSConsumerResultSet.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                wSConsumerResultSet.close();
            }
        }
        if (this.param.isAutoCommit()) {
            commitSync();
        }
        return consumerRecords;
    }

    @Override // com.taosdata.jdbc.common.Consumer
    public synchronized void commitSync() throws SQLException {
        if (0 != this.offset) {
            CommitResp commitResp = (CommitResp) this.transport.send(this.factory.generateCommit(this.offset));
            if (Code.SUCCESS.getCode() != commitResp.getCode()) {
                throw new SQLException("consumer commit error. code: " + commitResp.getCode() + ", message: " + commitResp.getMessage());
            }
            this.offset = 0L;
        }
    }

    @Override // com.taosdata.jdbc.common.Consumer
    public void close() throws SQLException {
        this.transport.close();
    }

    @Override // com.taosdata.jdbc.common.Consumer
    public void commitAsync(OffsetCommitCallback<V> offsetCommitCallback) {
    }
}
