/*
 * Decompiled with CFR 0.152.
 */
package com.taosdata.jdbc.ws;

import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.common.Column;
import com.taosdata.jdbc.common.ColumnInfo;
import com.taosdata.jdbc.common.DataLengthCfg;
import com.taosdata.jdbc.common.SerializeBlock;
import com.taosdata.jdbc.common.TableInfo;
import com.taosdata.jdbc.enums.FieldBindType;
import com.taosdata.jdbc.rs.ConnectionParam;
import com.taosdata.jdbc.utils.ReqId;
import com.taosdata.jdbc.utils.SyncObj;
import com.taosdata.jdbc.utils.Utils;
import com.taosdata.jdbc.ws.AbsWSPreparedStatement;
import com.taosdata.jdbc.ws.Transport;
import com.taosdata.jdbc.ws.WSRetryableStmt;
import com.taosdata.jdbc.ws.entity.Code;
import com.taosdata.jdbc.ws.entity.CommonResp;
import com.taosdata.jdbc.ws.stmt2.entity.EWBackendThreadInfo;
import com.taosdata.jdbc.ws.stmt2.entity.EWRawBlock;
import com.taosdata.jdbc.ws.stmt2.entity.Field;
import com.taosdata.jdbc.ws.stmt2.entity.PstmtConInfo;
import com.taosdata.jdbc.ws.stmt2.entity.Stmt2PrepareResp;
import com.taosdata.jdbc.ws.stmt2.entity.StmtInfo;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WSEWPreparedStatement
extends AbsWSPreparedStatement {
    /** Logger of the outer statement class; also used by the nested serialization task. */
    private static final Logger log = LoggerFactory.getLogger(WSEWPreparedStatement.class);
    /** Value of {@code param.isCopyData()}; when set, {@code copyMap} clones byte[] payloads before queueing. */
    private final boolean copyData;
    /** Value of {@code param.getBackendWriteThreadNum()}; number of workers and of per-worker queues. */
    private final int writeThreadNum;
    /** Fixed-size pool that runs one {@link WorkerThread} per backend writer. */
    private final ThreadPoolExecutor writerThreads;
    /** Per-worker queue holders; {@code addBatch} picks one by hashing the bound table name. */
    private final ArrayList<EWBackendThreadInfo> backendThreadInfoList;
    /** Incremented by {@code addBatch} for each queued row and decremented by workers per handled block. */
    private final AtomicInteger remainingUnprocessedRows = new AtomicInteger(0);
    /** Shared with every worker; read and reset to zero by {@code executeUpdate}. */
    private final AtomicInteger batchInsertedRows = new AtomicInteger(0);
    /** Bumped by {@code waitWriteCompleted}; workers signal {@code syncObj} when they observe a new value. */
    private final AtomicInteger flushIn = new AtomicInteger(0);
    /** The worker runnables submitted to {@code writerThreads}, kept to collect errors and release stmts. */
    private final List<WorkerThread> workerThreadList;
    /** Wait/signal object between the flushing caller and the workers. */
    private final SyncObj syncObj = new SyncObj();
    /** Shared pool (from {@code Utils.getForkJoinPool()}) on which serialization tasks are submitted. */
    private static final ForkJoinPool serializeExecutor = Utils.getForkJoinPool();
    /** Number of {@code addBatch} calls since the last {@code executeBatch}. */
    private int addBatchCounts = 0;

    /**
     * Builds the statement, its per-writer queues and one {@link WorkerThread} per backend writer,
     * initialises every worker's stmt and finally submits the workers to the writer pool.
     *
     * <p>If construction fails while the workers are being initialised, the stmts created so far
     * are released and the (still idle) writer pool is shut down before the failure propagates.
     *
     * @param transport   transport handed to the superclass and to every worker
     * @param param       connection parameters (thread count, cache/batch sizes, retry times, ...)
     * @param database    database name handed to the superclass and to every worker
     * @param connection  owning connection
     * @param sql         the SQL text of the prepared statement
     * @param instanceId  instance id handed to the superclass and to every worker
     * @param prepareResp prepare response used to build each worker's {@link StmtInfo}
     * @throws SQLException if the superclass constructor or worker initialisation fails
     */
    public WSEWPreparedStatement(Transport transport, ConnectionParam param, String database, AbstractConnection connection, String sql, Long instanceId, Stmt2PrepareResp prepareResp) throws SQLException {
        super(transport, param, database, connection, sql, instanceId, prepareResp);
        this.copyData = param.isCopyData();
        this.writeThreadNum = param.getBackendWriteThreadNum();
        this.writerThreads = (ThreadPoolExecutor)Executors.newFixedThreadPool(this.writeThreadNum);
        this.backendThreadInfoList = new ArrayList<>(this.writeThreadNum);
        for (int i = 0; i < this.writeThreadNum; ++i) {
            this.backendThreadInfoList.add(new EWBackendThreadInfo(param.getCacheSizeByRow(), param.getCacheSizeByRow() / param.getBatchSizeByRow()));
        }
        this.workerThreadList = new ArrayList<>(this.writeThreadNum);
        boolean initialized = false;
        try {
            for (int i = 0; i < this.writeThreadNum; ++i) {
                WorkerThread workerThread = new WorkerThread(this.backendThreadInfoList.get(i), new StmtInfo(prepareResp, sql), new PstmtConInfo(transport, param, database, connection, instanceId), this.closed, this.remainingUnprocessedRows, this.batchInsertedRows, this.flushIn, this.syncObj);
                this.workerThreadList.add(workerThread);
                workerThread.initStmt(param.getRetryTimes());
            }
            initialized = true;
        }
        finally {
            // A success flag (rather than a catch) cleans up on any failure type without
            // changing which exception the caller sees.
            if (!initialized) {
                this.releaseAfterFailedInit();
            }
        }
        for (WorkerThread workerThread : this.workerThreadList) {
            this.writerThreads.submit(workerThread);
        }
    }

    /**
     * Best-effort cleanup after a failed construction: releases every worker stmt created so far
     * and shuts down the writer pool. Release failures are logged, never thrown, so they cannot
     * mask the original initialisation error.
     */
    private void releaseAfterFailedInit() {
        for (WorkerThread workerThread : this.workerThreadList) {
            try {
                // NOTE(review): the last worker in the list may not have finished initStmt;
                // releaseStmt is assumed to tolerate that - confirm against WSRetryableStmt.
                workerThread.releaseStmt();
            }
            catch (Exception e) {
                log.warn("Failed to release stmt after initialization failure", e);
            }
        }
        this.writerThreads.shutdownNow();
    }

    /**
     * Takes a snapshot of the currently bound columns so the caller may rebind parameters while
     * the row waits in a write queue.
     *
     * <p>Columns are shared by reference, except that byte[] payloads are cloned into a fresh
     * {@link Column} when {@code copyData} is enabled.
     *
     * @param originalMap bound columns keyed by parameter index
     * @return a new map with the same keys
     */
    private Map<Integer, Column> copyMap(Map<Integer, Column> originalMap) {
        HashMap<Integer, Column> snapshot = new HashMap<>();
        for (Map.Entry<Integer, Column> entry : originalMap.entrySet()) {
            Column column = entry.getValue();
            if (this.copyData && column.getData() instanceof byte[]) {
                byte[] source = (byte[])column.getData();
                column = new Column(Arrays.copyOf(source, source.length), column.getType(), column.getIndex());
            }
            snapshot.put(entry.getKey(), column);
        }
        return snapshot;
    }

    /**
     * Flushes all queued rows through {@link #executeUpdate()}.
     *
     * @return the negation of {@code stmtInfo.isInsert()}; since non-insert statements are
     *         rejected up front, this is {@code false} on the normal path
     * @throws SQLException if the statement is closed, is not an insert, or the flush fails
     */
    @Override
    public boolean execute() throws SQLException {
        if (this.isClosed()) {
            throw TSDBError.createSQLException(8964);
        }
        boolean insertStatement = this.stmtInfo.isInsert();
        if (!insertStatement) {
            throw TSDBError.createSQLException(8963, "Only support insert.");
        }
        this.executeUpdate();
        boolean producesResultSet = !this.stmtInfo.isInsert();
        return producesResultSet;
    }

    /**
     * Queries are not supported by this statement type.
     *
     * @throws SQLException always, with error code 8963 and the message "Only support insert."
     */
    @Override
    public ResultSet executeQuery() throws SQLException {
        throw TSDBError.createSQLException(8963, "Only support insert.");
    }

    /**
     * Waits until every queued row has been handled by the workers and reports the outcome.
     *
     * <p>The pending error of every worker is fetched and cleared; the first one found is
     * reported. The inserted-row counter is reset to zero in all cases.
     *
     * @return number of rows inserted since the previous call
     * @throws SQLException if the statement is closed, or (vendor code 9104) if any worker
     *                      recorded an error; the worker's exception is attached as the cause
     */
    @Override
    public int executeUpdate() throws SQLException {
        if (this.isClosed()) {
            throw TSDBError.createSQLException(8964);
        }
        this.waitWriteCompleted();
        Exception firstError = null;
        for (WorkerThread workerThread : this.workerThreadList) {
            // Always call getAndClearLastError so no stale error leaks into the next flush.
            Exception workerError = workerThread.getAndClearLastError();
            if (firstError == null) {
                firstError = workerError;
            }
        }
        int totalRowsInserted = this.batchInsertedRows.getAndSet(0);
        if (firstError != null) {
            // Chain the original exception so its stack trace and error code stay available.
            throw new SQLException("InsertedRows: " + totalRowsInserted + ", ErrorInfo: " + firstError.getMessage(), "", 9104, firstError);
        }
        return totalRowsInserted;
    }

    /**
     * Requests a flush by bumping {@code flushIn} and blocks until
     * {@code remainingUnprocessedRows} reaches zero.
     *
     * <p>The wait is deliberately not abandoned on interruption, but the interrupt is no longer
     * lost: it is remembered and the thread's interrupt status is restored before returning.
     */
    private void waitWriteCompleted() {
        this.flushIn.incrementAndGet();
        boolean interrupted = false;
        try {
            while (this.remainingUnprocessedRows.get() != 0) {
                try {
                    this.syncObj.await();
                }
                catch (InterruptedException e) {
                    // Keep waiting for the flush; re-assert the interrupt once we are done.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Verifies that every variable-length value fits into its target field.
     *
     * <p>Columns whose type has an entry in {@link DataLengthCfg}, and the table-name binding,
     * are skipped. For the rest, byte[] and String payloads are measured in bytes and compared
     * with {@code field.getBytes()}; other payload types are not checked.
     *
     * @param map bound columns keyed by 1-based parameter index
     * @throws SQLException (code 8963) when a value is longer than its field; the message
     *                      carries the 0-based field position
     */
    private void checkDataLength(Map<Integer, Column> map) throws SQLException {
        for (int fieldPos = 0; fieldPos < this.stmtInfo.getFields().size(); ++fieldPos) {
            Field field = this.stmtInfo.getFields().get(fieldPos);
            Column column = map.get(fieldPos + 1);
            boolean hasConfiguredLength = DataLengthCfg.getDataLength(column.getType()) != null;
            if (hasConfiguredLength || field.getBindType() == FieldBindType.TAOS_FIELD_TBNAME.getValue()) {
                continue;
            }
            Object value = column.getData();
            int actualLength;
            if (value instanceof byte[]) {
                actualLength = ((byte[])value).length;
            } else if (value instanceof String) {
                actualLength = ((String)value).getBytes().length;
            } else {
                continue;
            }
            if (actualLength > field.getBytes()) {
                throw TSDBError.createSQLException(8963, "data length is too long, column index " + fieldPos);
            }
        }
    }

    /**
     * Starts a progressive serialization task for whatever is currently queued, capped at
     * {@code batchSize} rows.
     *
     * <p>Nothing is started when the write queue is empty, the serial queue has no free slot,
     * or another serialization task already holds the {@code serializeRunning} flag.
     *
     * @param ewBackendThreadInfo queues and running flag of one backend writer
     * @param stmtInfo            statement metadata passed to the task
     * @param batchSize           upper bound on the rows per serialized block
     */
    private static void triggerSerializeProgressive(EWBackendThreadInfo ewBackendThreadInfo, StmtInfo stmtInfo, int batchSize) {
        if (ewBackendThreadInfo.getWriteQueue().isEmpty()) {
            return;
        }
        if (ewBackendThreadInfo.getSerialQueue().remainingCapacity() <= 0) {
            return;
        }
        // The CAS is checked last so the flag is only claimed when a task is really submitted.
        if (!ewBackendThreadInfo.getSerializeRunning().compareAndSet(false, true)) {
            return;
        }
        int rowsPerBlock = Math.min(ewBackendThreadInfo.getWriteQueue().size(), batchSize);
        serializeExecutor.submit(new WSEWSerializationTask(ewBackendThreadInfo, rowsPerBlock, stmtInfo, true));
    }

    /**
     * Starts a full-batch serialization task once at least {@code batchSize} rows are queued.
     *
     * <p>Nothing is started when fewer rows are waiting, the serial queue has no free slot,
     * or another serialization task already holds the {@code serializeRunning} flag.
     *
     * @param ewBackendThreadInfo queues and running flag of one backend writer
     * @param stmtInfo            statement metadata passed to the task
     * @param batchSize           number of rows per serialized block
     */
    private static void triggerSerializeIfNeeded(EWBackendThreadInfo ewBackendThreadInfo, StmtInfo stmtInfo, int batchSize) {
        if (ewBackendThreadInfo.getWriteQueue().size() < batchSize) {
            return;
        }
        if (ewBackendThreadInfo.getSerialQueue().remainingCapacity() <= 0) {
            return;
        }
        // The CAS is checked last so the flag is only claimed when a task is really submitted.
        if (!ewBackendThreadInfo.getSerializeRunning().compareAndSet(false, true)) {
            return;
        }
        serializeExecutor.submit(new WSEWSerializationTask(ewBackendThreadInfo, batchSize, stmtInfo, false));
    }

    /**
     * Queues the currently bound row for asynchronous writing.
     *
     * <p>The row is snapshotted, optionally length-checked, and routed to one backend writer
     * chosen from the hash of the bound table name, so rows of one table always reach the same
     * writer. The call blocks while that writer's queue is full.
     *
     * @throws SQLException (code 8963) if not every parameter is bound, the table name is neither
     *                      a String nor a byte[], or a strict length check fails; also if the
     *                      thread is interrupted while waiting for queue space
     */
    @Override
    public void addBatch() throws SQLException {
        if (this.colOrderedMap.size() != this.stmtInfo.getFields().size()) {
            throw TSDBError.createSQLException(8963, "Only support standard jdbc bind api.");
        }
        Map<Integer, Column> map = this.copyMap(this.colOrderedMap);
        if (this.param.isStrictCheck()) {
            this.checkDataLength(map);
        }
        Object tbname = map.get(this.stmtInfo.getToBeBindTableNameIndex() + 1).getData();
        int hashCode;
        if (tbname instanceof String) {
            hashCode = tbname.hashCode();
        } else if (tbname instanceof byte[]) {
            hashCode = Arrays.hashCode((byte[])tbname);
        } else {
            throw TSDBError.createSQLException(8963, "error type tbname.");
        }
        // Take the remainder first, then the absolute value: negating Integer.MIN_VALUE leaves it
        // negative and would give a negative index. Every other hash maps to the same writer as
        // the previous "negate, then modulo" form.
        int index = Math.abs(hashCode % this.writeThreadNum);
        EWBackendThreadInfo ewBackendThreadInfo = this.backendThreadInfoList.get(index);
        try {
            ewBackendThreadInfo.getWriteQueue().put(map);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SQLException("Interrupted while adding batch", e);
        }
        WSEWPreparedStatement.triggerSerializeIfNeeded(ewBackendThreadInfo, this.stmtInfo, this.param.getBatchSizeByRow());
        this.remainingUnprocessedRows.incrementAndGet();
        ++this.addBatchCounts;
    }

    /**
     * Reports the rows added since the previous call without waiting for them to be written.
     *
     * @return one entry per {@code addBatch} call since the last {@code executeBatch}, each
     *         set to {@code -2}
     */
    @Override
    public int[] executeBatch() throws SQLException {
        int[] results = new int[this.addBatchCounts];
        Arrays.fill(results, -2);
        this.addBatchCounts = 0;
        return results;
    }

    /**
     * Flushes pending rows, closes the statement, waits for the workers to finish and releases
     * their stmts.
     *
     * <p>Calling this on an already closed statement returns immediately, without waiting on a
     * flush that no worker may be left to complete. An interrupt received while waiting does not
     * abort the shutdown; the interrupt status is restored before returning.
     *
     * @throws SQLException if closing the superclass or releasing a worker stmt fails
     */
    @Override
    public void close() throws SQLException {
        if (this.isClosed()) {
            return;
        }
        this.waitWriteCompleted();
        super.close();
        boolean interrupted = false;
        try {
            // shutdown() lets the already submitted workers run to completion, and
            // awaitTermination blocks properly instead of polling getActiveCount().
            this.writerThreads.shutdown();
            boolean terminated = false;
            while (!terminated) {
                try {
                    terminated = this.writerThreads.awaitTermination(1L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    // The exception cleared the flag, so the next wait blocks again
                    // rather than spinning; the interrupt is re-asserted below.
                    interrupted = true;
                }
            }
            for (WorkerThread workerThread : this.workerThreadList) {
                workerThread.releaseStmt();
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Result-set metadata is not available for this statement type.
     *
     * @throws SQLException always, with error code 8962 and the message
     *                      "Fast write mode only support insert."
     */
    @Override
    public ResultSetMetaData getMetaData() throws SQLException {
        throw TSDBError.createSQLException(8962, "Fast write mode only support insert.");
    }

    /**
     * Column-wise batch binding is not supported by this statement type.
     *
     * @throws SQLException always, with error code 8962
     */
    @Override
    public void columnDataAddBatch() throws SQLException {
        throw TSDBError.createSQLException(8962);
    }

    /**
     * Column-wise batch execution is not supported by this statement type.
     *
     * @throws SQLException always, with error code 8962
     */
    @Override
    public void columnDataExecuteBatch() throws SQLException {
        throw TSDBError.createSQLException(8962);
    }

    /**
     * Column-wise batch closing is not supported by this statement type.
     *
     * @throws SQLException always, with error code 8962
     */
    @Override
    public void columnDataCloseBatch() throws SQLException {
        throw TSDBError.createSQLException(8962);
    }

    static class WorkerThread
    extends WSRetryableStmt
    implements Runnable {
        private static final Logger log = LoggerFactory.getLogger(WorkerThread.class);
        private final EWBackendThreadInfo backendThreadInfo;
        private final AtomicBoolean isClosed;
        private final AtomicInteger remainingUnprocessedRows;
        private final AtomicInteger flushIn;
        private final SyncObj syncObj;

        public WorkerThread(EWBackendThreadInfo backendThreadInfo, StmtInfo stmtInfo, PstmtConInfo conInfo, AtomicBoolean isClosed, AtomicInteger remainingUnprocessedRows, AtomicInteger batchInsertedRows, AtomicInteger flushIn, SyncObj syncObj) {
            super(conInfo.getConnection(), conInfo.getParam(), conInfo.getDatabase(), conInfo.getTransport(), conInfo.getInstanceId(), stmtInfo, batchInsertedRows);
            this.backendThreadInfo = backendThreadInfo;
            this.stmtInfo = stmtInfo;
            this.isClosed = isClosed;
            this.remainingUnprocessedRows = remainingUnprocessedRows;
            this.flushIn = flushIn;
            this.syncObj = syncObj;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            int flushInLocal = 0;
            ArrayBlockingQueue<Map<Integer, Column>> writeQueue = this.backendThreadInfo.getWriteQueue();
            ArrayBlockingQueue<EWRawBlock> serialQueue = this.backendThreadInfo.getSerialQueue();
            AtomicBoolean serializeRunning = this.backendThreadInfo.getSerializeRunning();
            while (!this.isClosed.get() || !serialQueue.isEmpty() || !writeQueue.isEmpty() || serializeRunning.get()) {
                int rowCount = 0;
                try {
                    EWRawBlock ewRawBlock;
                    if (this.flushIn.get() != flushInLocal) {
                        flushInLocal = this.flushIn.get();
                        this.syncObj.signal();
                    }
                    if ((ewRawBlock = serialQueue.poll(50L, TimeUnit.MILLISECONDS)) == null) {
                        WSEWPreparedStatement.triggerSerializeProgressive(this.backendThreadInfo, this.stmtInfo, this.param.getBatchSizeByRow());
                        continue;
                    }
                    rowCount = ewRawBlock.getRowCount();
                    this.lastError = ewRawBlock.getLastError();
                    if (this.lastError != null) continue;
                    this.writeBlockWithRetry(ewRawBlock.getByteBuf());
                    WSEWPreparedStatement.triggerSerializeIfNeeded(this.backendThreadInfo, this.stmtInfo, this.param.getBatchSizeByRow());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                catch (SQLException e) {
                    this.lastError = e;
                    log.error("Error in write data to server, stmt id: {}rows: {}, code: {}, msg: {}", new Object[]{this.stmtInfo.getStmtId(), rowCount, e.getErrorCode(), e.getMessage()});
                }
                finally {
                    if (rowCount <= 0) continue;
                    this.remainingUnprocessedRows.addAndGet(-rowCount);
                }
            }
            this.syncObj.signal();
        }

        public Exception getAndClearLastError() {
            SQLException tmp = this.lastError;
            this.lastError = null;
            return tmp;
        }
    }

    /**
     * Fork-join task that moves rows from a writer's write queue into serialized blocks on its
     * serial queue, {@code batchSize} rows at a time.
     *
     * <p>Only one task per writer is expected to run at a time: callers claim the
     * {@code serializeRunning} flag before submitting, and {@link #compute()} always clears it
     * on exit.
     */
    static class WSEWSerializationTask
    extends RecursiveAction {
        final transient ArrayBlockingQueue<Map<Integer, Column>> writeQueue;
        final transient ArrayBlockingQueue<EWRawBlock> serialQueue;
        final int batchSize;
        /** Table currently being filled; rows of other tables are parked in {@code tableInfoMap}. */
        private transient TableInfo tableInfo = TableInfo.getEmptyTableInfo();
        private final HashMap<ByteBuffer, TableInfo> tableInfoMap = new HashMap<>();
        private final transient StmtInfo stmtInfo;
        private final AtomicBoolean running;
        // NOTE(review): stored but never read by compute(); the decompiled source only contained a
        // no-op 'continue' guarded by this flag. Confirm whether progressive tasks were meant to
        // stop after a single batch.
        private final boolean isProgressive;

        /**
         * @param ewBackendThreadInfo source of the write queue, serial queue and running flag
         * @param batchSize           rows per serialized block
         * @param stmtInfo            statement metadata used for binding and serialization
         * @param isProgressive       whether the task was started for a partial batch
         */
        public WSEWSerializationTask(EWBackendThreadInfo ewBackendThreadInfo, int batchSize, StmtInfo stmtInfo, boolean isProgressive) {
            this.writeQueue = ewBackendThreadInfo.getWriteQueue();
            this.serialQueue = ewBackendThreadInfo.getSerialQueue();
            this.running = ewBackendThreadInfo.getSerializeRunning();
            this.batchSize = batchSize;
            this.stmtInfo = stmtInfo;
            this.isProgressive = isProgressive;
        }

        /**
         * Binds one row into the table info matching its table name.
         *
         * <p>The first row of a table binds all fields; later rows of a table that is already
         * known only append their column data.
         *
         * @param colOrderedMap bound columns keyed by parameter index
         * @throws SQLException (code 8963) if the table name is neither a String nor a byte[],
         *                      or if binding fails
         */
        public void processOneRow(Map<Integer, Column> colOrderedMap) throws SQLException {
            if (this.isTableInfoEmpty()) {
                AbsWSPreparedStatement.bindAllToTableInfo(this.stmtInfo.getFields(), colOrderedMap, this.tableInfo);
                return;
            }
            Object tbname = colOrderedMap.get(this.stmtInfo.getToBeBindTableNameIndex() + 1).getData();
            ByteBuffer rowTableName;
            if (tbname instanceof String) {
                rowTableName = ByteBuffer.wrap(((String)tbname).getBytes());
            } else if (tbname instanceof byte[]) {
                rowTableName = ByteBuffer.wrap((byte[])tbname);
            } else {
                throw TSDBError.createSQLException(8963, "table name must be string or binary");
            }
            if (this.tableInfo.getTableName().equals(rowTableName)) {
                this.bindColToTableInfo(this.tableInfo, colOrderedMap);
                return;
            }
            TableInfo known = this.tableInfoMap.get(rowTableName);
            if (known != null) {
                this.bindColToTableInfo(known, colOrderedMap);
                return;
            }
            // New table: park the current one and start a fresh table info for this row.
            this.tableInfoMap.put(this.tableInfo.getTableName(), this.tableInfo);
            this.tableInfo = TableInfo.getEmptyTableInfo();
            AbsWSPreparedStatement.bindAllToTableInfo(this.stmtInfo.getFields(), colOrderedMap, this.tableInfo);
        }

        /** @return true when the current table info has no name, no tags and no column data. */
        private boolean isTableInfoEmpty() {
            return this.tableInfo.getTableName().capacity() == 0 && this.tableInfo.getTagInfo().isEmpty() && this.tableInfo.getDataList().isEmpty();
        }

        /** Appends the row's value for every data column already present in {@code tableInfo}. */
        private void bindColToTableInfo(TableInfo tableInfo, Map<Integer, Column> colOrderedMap) {
            for (ColumnInfo columnInfo : tableInfo.getDataList()) {
                columnInfo.add(colOrderedMap.get(columnInfo.getIndex()).getData());
            }
        }

        /**
         * Hands a block to the writer.
         *
         * <p>A non-blocking {@code offer} is tried first so that a pending interrupt cannot make
         * the block vanish while the queue still has room; only a full queue falls back to the
         * blocking {@code put}.
         */
        private void putEWRawBlock(EWRawBlock ewRawBlock) {
            if (this.serialQueue.offer(ewRawBlock)) {
                return;
            }
            try {
                this.serialQueue.put(ewRawBlock);
            }
            catch (InterruptedException ignored) {
                // NOTE(review): the block is dropped here, so its rows are never subtracted
                // from the outstanding-row counter; confirm whether that is acceptable.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Serializes full batches while enough rows are queued and the serial queue has room,
         * stopping after the first failed or interrupted batch. The running flag is cleared in a
         * {@code finally} so an unexpected exception cannot block all later serialization.
         */
        @Override
        protected void compute() {
            try {
                while (this.writeQueue.size() >= this.batchSize && this.serialQueue.remainingCapacity() > 0) {
                    if (!this.serializeOneBatch()) {
                        break;
                    }
                }
            }
            finally {
                this.running.set(false);
            }
        }

        /**
         * Dequeues up to {@code batchSize} rows, binds them and enqueues one block carrying either
         * the serialized data or the error, together with the number of rows actually dequeued.
         *
         * @return {@code true} if the batch succeeded and the caller may continue with the next
         */
        private boolean serializeOneBatch() {
            SQLException bindError = null;
            boolean interrupted = false;
            int taken = 0;
            while (taken < this.batchSize) {
                Map<Integer, Column> row;
                try {
                    row = this.writeQueue.take();
                }
                catch (InterruptedException e) {
                    // Stop collecting: with the flag set every further take() would fail too.
                    Thread.currentThread().interrupt();
                    interrupted = true;
                    break;
                }
                // Count every dequeued row, failed or not, so the writer subtracts exactly
                // what left the write queue.
                ++taken;
                try {
                    this.processOneRow(row);
                }
                catch (SQLException e) {
                    bindError = e;
                }
                catch (RuntimeException e) {
                    // Report instead of propagating, otherwise the dequeued rows would never be
                    // accounted for and a flush would wait forever.
                    bindError = new SQLException("Error in bind data to table info, stmt id: " + this.stmtInfo.getStmtId(), e);
                }
            }
            try {
                if (taken == 0) {
                    return false;
                }
                if (bindError != null) {
                    this.putEWRawBlock(new EWRawBlock(null, taken, bindError));
                    return false;
                }
                if (!this.isTableInfoEmpty()) {
                    this.tableInfoMap.put(this.tableInfo.getTableName(), this.tableInfo);
                }
                try {
                    long reqId = ReqId.getReqID();
                    ByteBuf rawBlock = SerializeBlock.getStmt2BindBlock(this.tableInfoMap, this.stmtInfo, reqId);
                    this.putEWRawBlock(new EWRawBlock(rawBlock, taken, null));
                    log.trace("buffer allocated: {}", (Object)Integer.toHexString(System.identityHashCode(rawBlock)));
                }
                catch (Exception e) {
                    SQLException failure = e instanceof SQLException
                            ? (SQLException)e
                            : new SQLException("Error in serialize data to block, stmt id: " + this.stmtInfo.getStmtId(), e);
                    this.putEWRawBlock(new EWRawBlock(null, taken, failure));
                    log.error("Error in serialize data to block, stmt id: {}", (Object)this.stmtInfo.getStmtId(), (Object)e);
                    return false;
                }
                return !interrupted;
            }
            finally {
                // Start the next batch from a clean slate regardless of the outcome.
                this.tableInfo = TableInfo.getEmptyTableInfo();
                this.tableInfoMap.clear();
            }
        }
    }
}

