/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.jdbc.table;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;

@Internal
public class JdbcDynamicTableFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    public static final String IDENTIFIER = "jdbc";
    public static final ConfigOption<String> URL = ConfigOptions.key((String)"url").stringType().noDefaultValue().withDescription("the jdbc database url.");
    public static final ConfigOption<String> TABLE_NAME = ConfigOptions.key((String)"table-name").stringType().noDefaultValue().withDescription("the jdbc table name.");
    public static final ConfigOption<String> USERNAME = ConfigOptions.key((String)"username").stringType().noDefaultValue().withDescription("the jdbc user name.");
    public static final ConfigOption<String> PASSWORD = ConfigOptions.key((String)"password").stringType().noDefaultValue().withDescription("the jdbc password.");
    private static final ConfigOption<String> DRIVER = ConfigOptions.key((String)"driver").stringType().noDefaultValue().withDescription("the class name of the JDBC driver to use to connect to this URL. If not set, it will automatically be derived from the URL.");
    private static final ConfigOption<String> SCAN_PARTITION_COLUMN = ConfigOptions.key((String)"scan.partition.column").stringType().noDefaultValue().withDescription("the column name used for partitioning the input.");
    private static final ConfigOption<Integer> SCAN_PARTITION_NUM = ConfigOptions.key((String)"scan.partition.num").intType().noDefaultValue().withDescription("the number of partitions.");
    private static final ConfigOption<Long> SCAN_PARTITION_LOWER_BOUND = ConfigOptions.key((String)"scan.partition.lower-bound").longType().noDefaultValue().withDescription("the smallest value of the first partition.");
    private static final ConfigOption<Long> SCAN_PARTITION_UPPER_BOUND = ConfigOptions.key((String)"scan.partition.upper-bound").longType().noDefaultValue().withDescription("the largest value of the last partition.");
    private static final ConfigOption<Integer> SCAN_FETCH_SIZE = ConfigOptions.key((String)"scan.fetch-size").intType().defaultValue((Object)0).withDescription("gives the reader a hint as to the number of rows that should be fetched, from the database when reading per round trip. If the value specified is zero, then the hint is ignored. The default value is zero.");
    private static final ConfigOption<Boolean> SCAN_AUTO_COMMIT = ConfigOptions.key((String)"scan.auto-commit").booleanType().defaultValue((Object)true).withDescription("sets whether the driver is in auto-commit mode. The default value is true, per the JDBC spec.");
    private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key((String)"lookup.cache.max-rows").longType().defaultValue((Object)-1L).withDescription("the max number of rows of lookup cache, over this value, the oldest rows will be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is specified. Cache is not enabled as default.");
    private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions.key((String)"lookup.cache.ttl").durationType().defaultValue((Object)Duration.ofSeconds(10L)).withDescription("the cache time to live.");
    private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key((String)"lookup.max-retries").intType().defaultValue((Object)3).withDescription("the max retry times if lookup database failed.");
    private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions.key((String)"sink.buffer-flush.max-rows").intType().defaultValue((Object)100).withDescription("the flush max size (includes all append, upsert and delete records), over this number of records, will flush data. The default value is 100.");
    private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key((String)"sink.buffer-flush.interval").durationType().defaultValue((Object)Duration.ofSeconds(1L)).withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The default value is 1s.");
    private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key((String)"sink.max-retries").intType().defaultValue((Object)3).withDescription("the max retry times if writing records to database failed.");

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        ReadableConfig config = helper.getOptions();
        helper.validate();
        this.validateConfigOptions(config);
        JdbcOptions jdbcOptions = this.getJdbcOptions(config);
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema((TableSchema)context.getCatalogTable().getSchema());
        return new JdbcDynamicTableSink(jdbcOptions, this.getJdbcExecutionOptions(config), this.getJdbcDmlOptions(jdbcOptions, physicalSchema), physicalSchema);
    }

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        ReadableConfig config = helper.getOptions();
        helper.validate();
        this.validateConfigOptions(config);
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema((TableSchema)context.getCatalogTable().getSchema());
        return new JdbcDynamicTableSource(this.getJdbcOptions(helper.getOptions()), this.getJdbcReadOptions(helper.getOptions()), this.getJdbcLookupOptions(helper.getOptions()), physicalSchema);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        String url = (String)readableConfig.get(URL);
        JdbcOptions.Builder builder = JdbcOptions.builder().setDBUrl(url).setTableName((String)readableConfig.get(TABLE_NAME)).setDialect(JdbcDialects.get(url).get());
        readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }

    private JdbcReadOptions getJdbcReadOptions(ReadableConfig readableConfig) {
        Optional partitionColumnName = readableConfig.getOptional(SCAN_PARTITION_COLUMN);
        JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
        if (partitionColumnName.isPresent()) {
            builder.setPartitionColumnName((String)partitionColumnName.get());
            builder.setPartitionLowerBound((Long)readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
            builder.setPartitionUpperBound((Long)readableConfig.get(SCAN_PARTITION_UPPER_BOUND));
            builder.setNumPartitions((Integer)readableConfig.get(SCAN_PARTITION_NUM));
        }
        readableConfig.getOptional(SCAN_FETCH_SIZE).ifPresent(builder::setFetchSize);
        builder.setAutoCommit((Boolean)readableConfig.get(SCAN_AUTO_COMMIT));
        return builder.build();
    }

    private JdbcLookupOptions getJdbcLookupOptions(ReadableConfig readableConfig) {
        return new JdbcLookupOptions((Long)readableConfig.get(LOOKUP_CACHE_MAX_ROWS), ((Duration)readableConfig.get(LOOKUP_CACHE_TTL)).toMillis(), (Integer)readableConfig.get(LOOKUP_MAX_RETRIES));
    }

    private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
        JdbcExecutionOptions.Builder builder = new JdbcExecutionOptions.Builder();
        builder.withBatchSize((Integer)config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
        builder.withBatchIntervalMs(((Duration)config.get(SINK_BUFFER_FLUSH_INTERVAL)).toMillis());
        builder.withMaxRetries((Integer)config.get(SINK_MAX_RETRIES));
        return builder.build();
    }

    private JdbcDmlOptions getJdbcDmlOptions(JdbcOptions jdbcOptions, TableSchema schema) {
        String[] keyFields = schema.getPrimaryKey().map(pk -> pk.getColumns().toArray(new String[0])).orElse(null);
        return JdbcDmlOptions.builder().withTableName(jdbcOptions.getTableName()).withDialect(jdbcOptions.getDialect()).withFieldNames(schema.getFieldNames()).withKeyFields(keyFields).build();
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet requiredOptions = new HashSet();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        return requiredOptions;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet optionalOptions = new HashSet();
        optionalOptions.add(DRIVER);
        optionalOptions.add(USERNAME);
        optionalOptions.add(PASSWORD);
        optionalOptions.add(SCAN_PARTITION_COLUMN);
        optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
        optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
        optionalOptions.add(SCAN_PARTITION_NUM);
        optionalOptions.add(SCAN_FETCH_SIZE);
        optionalOptions.add(SCAN_AUTO_COMMIT);
        optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
        optionalOptions.add(LOOKUP_CACHE_TTL);
        optionalOptions.add(LOOKUP_MAX_RETRIES);
        optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
        optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
        optionalOptions.add(SINK_MAX_RETRIES);
        return optionalOptions;
    }

    private void validateConfigOptions(ReadableConfig config) {
        long upperBound;
        long lowerBound;
        String jdbcUrl = (String)config.get(URL);
        Optional<JdbcDialect> dialect = JdbcDialects.get(jdbcUrl);
        Preconditions.checkState((boolean)dialect.isPresent(), (Object)("Cannot handle such jdbc url: " + jdbcUrl));
        this.checkAllOrNone(config, new ConfigOption[]{USERNAME, PASSWORD});
        this.checkAllOrNone(config, new ConfigOption[]{SCAN_PARTITION_COLUMN, SCAN_PARTITION_NUM, SCAN_PARTITION_LOWER_BOUND, SCAN_PARTITION_UPPER_BOUND});
        if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent() && config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent() && (lowerBound = ((Long)config.get(SCAN_PARTITION_LOWER_BOUND)).longValue()) > (upperBound = ((Long)config.get(SCAN_PARTITION_UPPER_BOUND)).longValue())) {
            throw new IllegalArgumentException(String.format("'%s'='%s' must not be larger than '%s'='%s'.", SCAN_PARTITION_LOWER_BOUND.key(), lowerBound, SCAN_PARTITION_UPPER_BOUND.key(), upperBound));
        }
        this.checkAllOrNone(config, new ConfigOption[]{LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});
        if ((Integer)config.get(LOOKUP_MAX_RETRIES) < 0) {
            throw new IllegalArgumentException(String.format("The value of '%s' option shouldn't be negative, but is %s.", LOOKUP_MAX_RETRIES.key(), config.get(LOOKUP_MAX_RETRIES)));
        }
        if ((Integer)config.get(SINK_MAX_RETRIES) < 0) {
            throw new IllegalArgumentException(String.format("The value of '%s' option shouldn't be negative, but is %s.", SINK_MAX_RETRIES.key(), config.get(SINK_MAX_RETRIES)));
        }
    }

    private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
        int presentCount = 0;
        for (ConfigOption<?> configOption : configOptions) {
            if (!config.getOptional(configOption).isPresent()) continue;
            ++presentCount;
        }
        CharSequence[] propertyNames = (String[])Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
        Preconditions.checkArgument((configOptions.length == presentCount || presentCount == 0 ? 1 : 0) != 0, (Object)("Either all or none of the following options should be provided:\n" + String.join((CharSequence)"\n", propertyNames)));
    }
}

