/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.influxdb.source;

import com.google.auto.service.AutoService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader;
import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * SeaTunnel source plugin that reads rows from InfluxDB using a user-supplied query.
 *
 * <p>During {@link #prepare(Config)} the configured query is executed once with a
 * {@code limit 1} clause so that the position of every schema field inside the query
 * result can be resolved up front. The resulting index list is handed to each reader.
 */
@AutoService(value={SeaTunnelSource.class})
public class InfluxDBSource
implements SeaTunnelSource<SeaTunnelRow, InfluxDBSourceSplit, InfluxDBSourceState>,
SupportParallelism,
SupportColumnProjection {
    private static final Logger log = LoggerFactory.getLogger(InfluxDBSource.class);
    /** Clause added to the user query so the probing query fetches a single row. */
    private static final String QUERY_LIMIT = " limit 1";
    /**
     * Detects a {@code tz(...)} clause. Compiled once and matched case-insensitively so the
     * reported offset always refers to the original (non-lower-cased) SQL text.
     */
    private static final Pattern TZ_FUNCTION_PATTERN = Pattern.compile("tz\\(.*\\)", Pattern.CASE_INSENSITIVE);

    private SeaTunnelRowType typeInfo;
    private SourceConfig sourceConfig;
    private List<Integer> columnsIndexList;

    /** Returns the plugin identifier, {@code "InfluxDB"}. */
    public String getPluginName() {
        return "InfluxDB";
    }

    /**
     * Validates the configuration, loads the source settings and row type, and resolves the
     * column index of every schema field by running a probing query.
     *
     * @param config plugin configuration; must contain the SQL and schema options
     * @throws InfluxdbConnectorException if a required option is missing or any
     *     initialization step fails
     */
    public void prepare(Config config) throws PrepareFailException {
        CheckResult result = CheckConfigUtil.checkAllExists(config, new String[]{SourceConfig.SQL.key(), TableSchemaOptions.SCHEMA.key()});
        if (!result.isSuccess()) {
            throw new InfluxdbConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", this.getPluginName(), PluginType.SOURCE, result.getMsg()));
        }
        try {
            this.sourceConfig = SourceConfig.loadConfig(config);
            this.typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
            // The client is only needed for the one-off probing query; close it afterwards
            // so the underlying connection is not leaked.
            // NOTE(review): assumes getInfluxDB returns a fresh client per call - confirm.
            try (InfluxDB influxdb = InfluxDBClient.getInfluxDB(this.sourceConfig)) {
                this.columnsIndexList = this.initColumnsIndex(influxdb);
            }
        }
        catch (Exception e) {
            throw new InfluxdbConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", this.getPluginName(), PluginType.SOURCE, ExceptionUtils.getMessage(e)));
        }
    }

    /** This source always reports itself as bounded. */
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /** Returns the row type built from the schema option during {@link #prepare(Config)}. */
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.typeInfo;
    }

    /** Creates a reader configured with the source settings, row type and column indexes. */
    public SourceReader createReader(SourceReader.Context readerContext) throws Exception {
        return new InfluxdbSourceReader(this.sourceConfig, readerContext, this.typeInfo, this.columnsIndexList);
    }

    /** Creates a fresh split enumerator for the configured source. */
    public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception {
        return new InfluxDBSourceSplitEnumerator((SourceSplitEnumerator.Context<InfluxDBSourceSplit>)enumeratorContext, this.sourceConfig);
    }

    /** Creates a split enumerator initialised from a previously checkpointed state. */
    public SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> restoreEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> enumeratorContext, InfluxDBSourceState checkpointState) throws Exception {
        return new InfluxDBSourceSplitEnumerator(enumeratorContext, checkpointState, this.sourceConfig);
    }

    /**
     * Runs the configured query limited to one row and maps every schema field name to its
     * position among the columns of the first returned series.
     *
     * @param influxdb client used to execute the probing query; not closed by this method
     * @return one index per schema field, in schema order; {@code -1} for a field that is
     *     not among the returned columns
     * @throws InfluxdbConnectorException if the query fails or yields no series
     */
    private List<Integer> initColumnsIndex(InfluxDB influxdb) {
        String sql = this.sourceConfig.getSql();
        String query = sql + QUERY_LIMIT;
        // When the statement contains a tz(...) clause the limit is inserted in front of it
        // instead of being appended after it. The offset is computed on the original text
        // so that it is valid for the StringBuilder below.
        int start = InfluxDBSource.containTzFunction(sql);
        if (start > 0) {
            StringBuilder tmpSql = new StringBuilder(sql);
            tmpSql.insert(start - 1, QUERY_LIMIT).append(" ");
            query = tmpSql.toString();
        }
        try {
            QueryResult queryResult = influxdb.query(new Query(query, this.sourceConfig.getDatabase()));
            List<QueryResult.Result> results = queryResult.getResults();
            if (results == null || results.isEmpty()) {
                throw new IllegalStateException("Query returned no results: " + query);
            }
            List<QueryResult.Series> seriesList = results.get(0).getSeries();
            if (seriesList == null || seriesList.isEmpty()) {
                throw new IllegalStateException("Query returned no series, cannot resolve column indexes: " + query);
            }
            List<String> columnNames = new ArrayList<>(seriesList.get(0).getColumns());
            String[] schemaFields = this.typeInfo.getFieldNames();
            List<Integer> indexes = Arrays.stream(schemaFields).map(columnNames::indexOf).collect(Collectors.toList());
            for (int i = 0; i < schemaFields.length; i++) {
                if (indexes.get(i) < 0) {
                    log.warn("Schema field '{}' is not among the columns returned by the query: {}", schemaFields[i], columnNames);
                }
            }
            return indexes;
        }
        catch (Exception e) {
            throw new InfluxdbConnectorException(InfluxdbConnectorErrorCode.GET_COLUMN_INDEX_FAILED, "Get column index of query result exception", e);
        }
    }

    /**
     * Locates a {@code tz(...)} clause in the given SQL, ignoring case.
     *
     * @param sql statement to inspect
     * @return offset of the first match within {@code sql}, or {@code -1} if there is none
     */
    private static int containTzFunction(String sql) {
        Matcher matcher = TZ_FUNCTION_PATTERN.matcher(sql);
        return matcher.find() ? matcher.start() : -1;
    }
}

