/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.neo4j.source;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.List;
import java.util.Objects;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceQueryInfo;
import org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Query;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.exceptions.value.LossyCoercion;

public class Neo4jSourceReader
extends AbstractSingleSplitReader<SeaTunnelRow> {
    private final SingleSplitReaderContext context;
    private final Neo4jSourceQueryInfo neo4jSourceQueryInfo;
    private final SeaTunnelRowType rowType;
    private final Driver driver;
    private Session session;

    public Neo4jSourceReader(SingleSplitReaderContext context, Neo4jSourceQueryInfo neo4jSourceQueryInfo, SeaTunnelRowType rowType) {
        this.context = context;
        this.neo4jSourceQueryInfo = neo4jSourceQueryInfo;
        this.driver = neo4jSourceQueryInfo.getDriverBuilder().build();
        this.rowType = rowType;
    }

    public void open() throws Exception {
        this.session = this.driver.session(SessionConfig.forDatabase(this.neo4jSourceQueryInfo.getDriverBuilder().getDatabase()));
    }

    public void close() throws IOException {
        this.session.close();
        this.driver.close();
    }

    @Override
    public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
        Query query = new Query(this.neo4jSourceQueryInfo.getQuery());
        this.session.readTransaction(tx -> {
            Result result = tx.run(query);
            result.stream().forEach(row -> {
                Object[] fields = new Object[this.rowType.getTotalFields()];
                for (int i = 0; i < this.rowType.getTotalFields(); ++i) {
                    String fieldName = this.rowType.getFieldName(i);
                    SeaTunnelDataType fieldType = this.rowType.getFieldType(i);
                    Value value = row.get(fieldName);
                    fields[i] = Neo4jSourceReader.convertType(fieldType, value);
                }
                output.collect((Object)new SeaTunnelRow(fields));
            });
            return null;
        });
        this.context.signalNoMoreElement();
    }

    public static Object convertType(SeaTunnelDataType<?> dataType, Value value) throws Neo4jConnectorException, LossyCoercion {
        Objects.requireNonNull(dataType);
        Objects.requireNonNull(value);
        switch (dataType.getSqlType()) {
            case STRING: {
                return value.asString();
            }
            case BOOLEAN: {
                return value.asBoolean();
            }
            case BIGINT: {
                return value.asLong();
            }
            case DOUBLE: {
                return value.asDouble();
            }
            case NULL: {
                return null;
            }
            case BYTES: {
                return value.asByteArray();
            }
            case DATE: {
                return value.asLocalDate();
            }
            case TIME: {
                return value.asLocalTime();
            }
            case TIMESTAMP: {
                return value.asLocalDateTime();
            }
            case MAP: {
                if (!((MapType)dataType).getKeyType().equals(BasicType.STRING_TYPE)) {
                    throw new Neo4jConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Key Type of MapType must String type");
                }
                SeaTunnelDataType valueType = ((MapType)dataType).getValueType();
                return value.asMap(v -> valueType.getTypeClass().cast(Neo4jSourceReader.convertType(valueType, v)));
            }
            case ARRAY: {
                SeaTunnelDataType elementType = ((ArrayType)dataType).getElementType();
                List<Object> list = value.asList(v -> elementType.getTypeClass().cast(Neo4jSourceReader.convertType(elementType, v)));
                Object array = Array.newInstance(elementType.getTypeClass(), list.size());
                for (int i = 0; i < list.size(); ++i) {
                    Array.set(array, i, list.get(i));
                }
                return array;
            }
            case INT: {
                return value.asInt();
            }
            case FLOAT: {
                return Float.valueOf(value.asFloat());
            }
        }
        throw new Neo4jConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "not supported data type: " + dataType);
    }
}

