/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connectors.hive;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HadoopFileSystemFactory;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.connectors.hive.HiveRowDataPartitionComputer;
import org.apache.flink.connectors.hive.HiveRowPartitionComputer;
import org.apache.flink.connectors.hive.HiveTableMetaStoreFactory;
import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.connectors.hive.util.JobConfUtils;
import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
import org.apache.flink.connectors.hive.write.HiveWriterFactory;
import org.apache.flink.core.fs.Path;
import org.apache.flink.hive.shaded.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.orc.OrcSplitReaderUtil;
import org.apache.flink.orc.writer.ThreadLocalClassLoaderConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.filesystem.FileSystemFactory;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.filesystem.FileSystemTableSink;
import org.apache.flink.table.filesystem.OutputFormatFactory;
import org.apache.flink.table.filesystem.PartitionComputer;
import org.apache.flink.table.filesystem.RowDataPartitionComputer;
import org.apache.flink.table.filesystem.TableMetaStoreFactory;
import org.apache.flink.table.filesystem.stream.StreamingSink;
import org.apache.flink.table.filesystem.stream.compact.CompactOperator;
import org.apache.flink.table.filesystem.stream.compact.CompactReader;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.orc.TypeDescription;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DynamicTableSink} that writes {@link RowData} records into a Hive table.
 *
 * <p>Bounded input is written through a {@link FileSystemOutputFormat} that stages files in a
 * {@code .staging_<millis>} directory under the table location. Unbounded input is written through
 * a {@link StreamingFileSink} bucket builder (optionally followed by compaction) and handed to
 * {@code StreamingSink.sink} for partition commit.
 *
 * <p>Static partitions, overwrite mode and dynamic partition grouping are configured through
 * {@link #applyStaticPartition(Map)}, {@link #applyOverwrite(boolean)} and
 * {@link #requiresPartitionGrouping(boolean)} before the runtime provider is requested.
 */
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);

    private final ReadableConfig flinkConf;
    private final JobConf jobConf;
    private final CatalogTable catalogTable;
    private final ObjectIdentifier identifier;
    /** Physical schema of the catalog table. */
    private final TableSchema tableSchema;
    /** Hive version read from the job configuration; never null. */
    private final String hiveVersion;
    private final HiveShim hiveShim;
    /** Static partition values, keyed by partition column in partition-key order. */
    private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();
    private boolean overwrite = false;
    private boolean dynamicGrouping = false;
    /** Explicit sink parallelism; when null the parallelism of the input stream is used. */
    @Nullable
    private final Integer configuredParallelism;

    /**
     * Creates a sink for the given Hive table.
     *
     * @param flinkConf Flink configuration consulted for the Hive fallback-writer/reader options
     * @param jobConf Hadoop job configuration; must define the Hive version key
     * @param identifier identifier of the target table
     * @param table catalog table describing schema, partition keys and options
     * @param configuredParallelism explicit sink parallelism, or null to inherit from the input
     * @throws NullPointerException if the Hive version is not defined in {@code jobConf}
     */
    public HiveTableSink(ReadableConfig flinkConf, JobConf jobConf, ObjectIdentifier identifier, CatalogTable table, @Nullable Integer configuredParallelism) {
        this.flinkConf = flinkConf;
        this.jobConf = jobConf;
        this.identifier = identifier;
        this.catalogTable = table;
        this.hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()), "Hive version is not defined");
        this.hiveShim = HiveShimLoader.loadHiveShim(this.hiveVersion);
        this.tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
        this.configuredParallelism = configuredParallelism;
    }

    /**
     * Returns a runtime provider that forwards the incoming stream to {@link #consume}.
     *
     * @param context planner context used to create the row converter and to query boundedness
     * @return provider building the batch or streaming sink topology
     */
    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        DynamicTableSink.DataStructureConverter converter = context.createDataStructureConverter(this.tableSchema.toRowDataType());
        // NOTE(review): the lambda needs a functional sub-interface of SinkRuntimeProvider as its
        // target type; the decompiler appears to have dropped that cast - confirm against the build.
        return dataStream -> this.consume((DataStream<RowData>)dataStream, context.isBounded(), converter);
    }

    /**
     * Builds the sink topology for the given stream.
     *
     * <p>Looks the table up in the Hive metastore, derives the writer factory and the part-file
     * naming from its storage descriptor, then delegates to the batch or the streaming sink.
     *
     * @param dataStream records to write
     * @param isBounded whether the input is bounded (batch sink) or unbounded (streaming sink)
     * @param converter converter from internal {@link RowData} to external {@link Row}
     * @return the created sink
     * @throws IllegalStateException if overwrite is requested for an unbounded input
     * @throws CatalogException if the metastore lookup fails
     * @throws FlinkRuntimeException on I/O failure while preparing the sink
     * @throws FlinkHiveException if the Hive output format cannot be loaded or instantiated
     */
    private DataStreamSink<?> consume(DataStream<RowData> dataStream, boolean isBounded, DynamicTableSink.DataStructureConverter converter) {
        HiveTableUtil.checkAcidTable(this.catalogTable, this.identifier.toObjectPath());
        try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(HiveConfUtils.create(this.jobConf), this.hiveVersion)) {
            Table table = client.getTable(this.identifier.getDatabaseName(), this.identifier.getObjectName());
            StorageDescriptor sd = table.getSd();

            Class<?> hiveOutputFormatClz = this.hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
            boolean isCompressed = this.jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
            HiveWriterFactory writerFactory = new HiveWriterFactory(
                    this.jobConf,
                    hiveOutputFormatClz,
                    sd.getSerdeInfo(),
                    this.tableSchema,
                    this.getPartitionKeyArray(),
                    HiveReflectionUtils.getTableMetadata(this.hiveShim, table),
                    this.hiveShim,
                    isCompressed);

            String extension = Utilities.getFileExtension(this.jobConf, isCompressed, (HiveOutputFormat<?, ?>)hiveOutputFormatClz.newInstance());
            // A random UUID in the prefix keeps part files of different jobs/attempts apart.
            OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder = OutputFileConfig.builder()
                    .withPartPrefix("part-" + UUID.randomUUID().toString())
                    .withPartSuffix(extension == null ? "" : extension);

            int parallelism = this.configuredParallelism != null
                    ? this.configuredParallelism
                    : dataStream.getParallelism();

            if (isBounded) {
                return this.createBatchSink(dataStream, converter, sd, writerFactory, fileNamingBuilder.build(), parallelism);
            }
            if (this.overwrite) {
                throw new IllegalStateException("Streaming mode not support overwrite.");
            }
            // Fetched separately from the instance given to the writer factory so that the two
            // consumers never share one Properties object.
            Properties tableProps = HiveReflectionUtils.getTableMetadata(this.hiveShim, table);
            return this.createStreamSink(dataStream, sd, tableProps, writerFactory, fileNamingBuilder, parallelism);
        }
        catch (TException e) {
            throw new CatalogException("Failed to query Hive metaStore", e);
        }
        catch (IOException e) {
            throw new FlinkRuntimeException("Failed to create staging dir", e);
        }
        catch (ClassNotFoundException e) {
            throw new FlinkHiveException("Failed to get output format class", e);
        }
        catch (IllegalAccessException | InstantiationException e) {
            throw new FlinkHiveException("Failed to instantiate output format instance", e);
        }
    }

    /**
     * Creates the sink used for bounded input: rows are converted to {@link Row} and written with
     * a {@link FileSystemOutputFormat} that stages its output below the table location.
     *
     * @throws IOException if the staging directory cannot be created
     */
    private DataStreamSink<Row> createBatchSink(DataStream<RowData> dataStream, DynamicTableSink.DataStructureConverter converter, StorageDescriptor sd, HiveWriterFactory recordWriterFactory, OutputFileConfig fileNaming, int parallelism) throws IOException {
        FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
        builder.setPartitionComputer(new HiveRowPartitionComputer(
                this.hiveShim,
                JobConfUtils.getDefaultPartitionName(this.jobConf),
                this.tableSchema.getFieldNames(),
                this.tableSchema.getFieldDataTypes(),
                this.getPartitionKeyArray()));
        builder.setDynamicGrouped(this.dynamicGrouping);
        builder.setPartitionColumns(this.getPartitionKeyArray());
        builder.setFileSystemFactory(this.fsFactory());
        builder.setFormatFactory(new HiveOutputFormatFactory(recordWriterFactory));
        builder.setMetaStoreFactory(this.msFactory());
        builder.setOverwrite(this.overwrite);
        builder.setStaticPartitions(this.staticPartitionSpec);
        builder.setTempPath(new Path(this.toStagingDir(sd.getLocation(), this.jobConf)));
        builder.setOutputFileConfig(fileNaming);
        return dataStream
                .map((MapFunction<RowData, Row>)value -> (Row)converter.toExternal(value))
                .writeUsingOutputFormat(builder.build())
                .setParallelism(parallelism);
    }

    /**
     * Creates the sink used for unbounded input.
     *
     * <p>The table options are copied into a Flink {@link Configuration} that supplies the rolling
     * policy, compaction and partition-commit settings. The MapReduce record writer is used when
     * the fallback option is set or when no native bulk writer exists for the table's SerDe;
     * otherwise the native Parquet/ORC bulk writer is used.
     *
     * @throws FlinkHiveException if the table is partitioned and no commit policy is configured
     */
    private DataStreamSink<?> createStreamSink(DataStream<RowData> dataStream, StorageDescriptor sd, Properties tableProps, HiveWriterFactory recordWriterFactory, OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder, int parallelism) {
        Configuration conf = new Configuration();
        this.catalogTable.getOptions().forEach(conf::setString);

        String commitPolicies = conf.getString(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND);
        if (!this.getPartitionKeys().isEmpty() && StringUtils.isNullOrWhitespaceOnly(commitPolicies)) {
            throw new FlinkHiveException(String.format("Streaming write to partitioned hive table %s without providing a commit policy. Make sure to set a proper value for %s", this.identifier, FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key()));
        }

        HiveRowDataPartitionComputer partComputer = new HiveRowDataPartitionComputer(
                this.hiveShim,
                JobConfUtils.getDefaultPartitionName(this.jobConf),
                this.tableSchema.getFieldNames(),
                this.tableSchema.getFieldDataTypes(),
                this.getPartitionKeyArray());
        FileSystemTableSink.TableBucketAssigner assigner = new FileSystemTableSink.TableBucketAssigner(partComputer);
        HiveRollingPolicy rollingPolicy = new HiveRollingPolicy(
                conf.get(FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                conf.get(FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());

        boolean autoCompaction = conf.getBoolean(FileSystemOptions.AUTO_COMPACTION);
        if (autoCompaction) {
            // With compaction enabled the writer's part prefix is rewritten by the compact operator's helper.
            fileNamingBuilder.withPartPrefix(CompactOperator.convertToUncompacted(fileNamingBuilder.build().getPartPrefix()));
        }
        OutputFileConfig outputFileConfig = fileNamingBuilder.build();
        Path path = new Path(sd.getLocation());

        // Declared with the type returned by bucketsBuilderForMRWriter so both branches can assign to it.
        StreamingFileSink.BucketsBuilder<RowData, String, ? extends StreamingFileSink.BucketsBuilder<RowData, ?, ?>> builder;
        if (this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
            builder = this.bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
            LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
        } else {
            Optional<BulkWriter.Factory<RowData>> bulkFactory = this.createBulkWriterFactory(this.getPartitionKeyArray(), sd);
            if (bulkFactory.isPresent()) {
                builder = StreamingFileSink
                        .forBulkFormat(path, new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
                        .withBucketAssigner(assigner)
                        .withRollingPolicy(rollingPolicy)
                        .withOutputFileConfig(outputFileConfig);
                LOG.info("Hive streaming sink: Use native parquet&orc writer.");
            } else {
                builder = this.bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
                LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
            }
        }

        long bucketCheckInterval = conf.get(FileSystemOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();

        // Kept raw: the element type produced by StreamingSink is not among this file's imports.
        DataStream writerStream;
        if (autoCompaction) {
            // The compaction target size falls back to the rolling file size when not set explicitly.
            long compactionSize = conf.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
                    .orElse(conf.get(FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE))
                    .getBytes();
            writerStream = StreamingSink.compactionWriter(dataStream, bucketCheckInterval, builder, this.fsFactory(), path, this.createCompactReaderFactory(sd, tableProps), compactionSize, parallelism);
        } else {
            writerStream = StreamingSink.writer(dataStream, bucketCheckInterval, builder, parallelism);
        }
        return StreamingSink.sink(writerStream, path, this.identifier, this.getPartitionKeys(), this.msFactory(), this.fsFactory(), conf);
    }

    /** Creates the reader factory that the compaction operator uses to read back written files. */
    private CompactReader.Factory<RowData> createCompactReaderFactory(StorageDescriptor sd, Properties properties) {
        return new HiveCompactReaderFactory(
                sd,
                properties,
                this.jobConf,
                this.catalogTable,
                this.hiveVersion,
                (RowType)this.tableSchema.toRowDataType().getLogicalType(),
                this.flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
    }

    /** Creates a metastore factory bound to this sink's database and table name. */
    private HiveTableMetaStoreFactory msFactory() {
        return new HiveTableMetaStoreFactory(this.jobConf, this.hiveVersion, this.identifier.getDatabaseName(), this.identifier.getObjectName());
    }

    /** Creates a file system factory backed by this sink's Hadoop job configuration. */
    private HadoopFileSystemFactory fsFactory() {
        return new HadoopFileSystemFactory(this.jobConf);
    }

    /**
     * Creates a buckets builder that writes through Hive's MapReduce record writer, rooted at the
     * table location.
     */
    private StreamingFileSink.BucketsBuilder<RowData, String, ? extends StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilderForMRWriter(HiveWriterFactory recordWriterFactory, StorageDescriptor sd, FileSystemTableSink.TableBucketAssigner assigner, HiveRollingPolicy rollingPolicy, OutputFileConfig outputFileConfig) {
        HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
        return new HadoopPathBasedBulkFormatBuilder<>(new org.apache.hadoop.fs.Path(sd.getLocation()), hadoopBulkFactory, this.jobConf, assigner)
                .withRollingPolicy(rollingPolicy)
                .withOutputFileConfig(outputFileConfig);
    }

    /**
     * Creates a native bulk writer factory when the table's SerDe is Parquet or ORC.
     *
     * <p>The written row type consists of the leading {@code fieldCount - partitionColumns.length}
     * schema fields (this presumes partition columns are the trailing fields of the schema).
     *
     * @param partitionColumns partition column names; only the count is used
     * @param sd storage descriptor providing the SerDe library and its parameters
     * @return the factory, or empty if the SerDe is neither Parquet nor ORC
     */
    private Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(String[] partitionColumns, StorageDescriptor sd) {
        // Locale.ROOT keeps the match independent of the JVM's default locale.
        String serLib = sd.getSerdeInfo().getSerializationLib().toLowerCase(Locale.ROOT);

        int formatFieldCount = this.tableSchema.getFieldCount() - partitionColumns.length;
        String[] allNames = this.tableSchema.getFieldNames();
        DataType[] allTypes = this.tableSchema.getFieldDataTypes();
        String[] formatNames = new String[formatFieldCount];
        LogicalType[] formatTypes = new LogicalType[formatFieldCount];
        for (int i = 0; i < formatFieldCount; ++i) {
            formatNames[i] = allNames[i];
            formatTypes[i] = allTypes[i].getLogicalType();
        }
        RowType formatType = RowType.of(formatTypes, formatNames);

        if (serLib.contains("parquet")) {
            org.apache.hadoop.conf.Configuration formatConf = new org.apache.hadoop.conf.Configuration(this.jobConf);
            sd.getSerdeInfo().getParameters().forEach(formatConf::set);
            return Optional.of(ParquetRowDataBuilder.createWriterFactory(formatType, formatConf, this.hiveVersion.startsWith("3.")));
        }
        if (serLib.contains("orc")) {
            ThreadLocalClassLoaderConfiguration formatConf = new ThreadLocalClassLoaderConfiguration(this.jobConf);
            sd.getSerdeInfo().getParameters().forEach(formatConf::set);
            TypeDescription typeDescription = OrcSplitReaderUtil.logicalTypeToOrcType(formatType);
            return Optional.of(this.hiveShim.createOrcBulkWriterFactory(formatConf, typeDescription.toString(), formatTypes));
        }
        return Optional.empty();
    }

    /**
     * Records whether the planner groups records by dynamic partition and echoes the flag back.
     *
     * @param supportsGrouping whether the planner can group by partition
     * @return {@code supportsGrouping}
     */
    public boolean requiresPartitionGrouping(boolean supportsGrouping) {
        this.dynamicGrouping = supportsGrouping;
        return supportsGrouping;
    }

    /**
     * Ensures a {@code .staging_<currentTimeMillis>} directory exists under {@code finalDir} and
     * registers it for deletion when the file system is closed.
     *
     * @param finalDir table location
     * @param conf Hadoop configuration used to resolve the file system
     * @return the staging directory path as a string
     * @throws IOException if the file system cannot be accessed
     * @throws IllegalStateException if the directory neither exists nor can be created
     */
    private String toStagingDir(String finalDir, org.apache.hadoop.conf.Configuration conf) throws IOException {
        String base = finalDir.endsWith("/") ? finalDir : finalDir + "/";
        String stagingDir = base + ".staging_" + System.currentTimeMillis();

        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(stagingDir);
        FileSystem fs = path.getFileSystem(conf);
        Preconditions.checkState(fs.exists(path) || fs.mkdirs(path), "Failed to create staging dir " + path);
        fs.deleteOnExit(path);
        return stagingDir;
    }

    /** Returns the partition keys declared on the catalog table. */
    private List<String> getPartitionKeys() {
        return this.catalogTable.getPartitionKeys();
    }

    /** Returns the partition keys as a freshly allocated array. */
    private String[] getPartitionKeyArray() {
        return this.getPartitionKeys().toArray(new String[0]);
    }

    /**
     * Replaces the static partition spec with the entries of {@code partition} whose keys are
     * partition columns of the table, ordered by the table's partition-key order.
     *
     * @param partition static partition values keyed by column name
     */
    public void applyStaticPartition(Map<String, String> partition) {
        // Build locally and publish once so a failure mid-way never leaves a half-filled spec.
        LinkedHashMap<String, String> spec = new LinkedHashMap<>();
        for (String partitionCol : this.getPartitionKeys()) {
            if (partition.containsKey(partitionCol)) {
                spec.put(partitionCol, partition.get(partitionCol));
            }
        }
        this.staticPartitionSpec = spec;
    }

    /** Sets whether existing data is overwritten; only honored for bounded input. */
    public void applyOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    /** This sink accepts insert-only changelogs regardless of the requested mode. */
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }

    /**
     * Creates a copy carrying over the applied abilities. The static partition spec is copied so
     * that the original and the copy never share a mutable map.
     */
    public DynamicTableSink copy() {
        HiveTableSink sink = new HiveTableSink(this.flinkConf, this.jobConf, this.identifier, this.catalogTable, this.configuredParallelism);
        sink.staticPartitionSpec = new LinkedHashMap<>(this.staticPartitionSpec);
        sink.overwrite = this.overwrite;
        sink.dynamicGrouping = this.dynamicGrouping;
        return sink;
    }

    public String asSummaryString() {
        return "HiveSink";
    }

    /**
     * Rolling policy for the streaming sink: always rolls on checkpoint, never on an individual
     * event, and on processing time once a part file is at least {@code rollingTimeInterval}
     * milliseconds old or larger than {@code rollingFileSize} bytes.
     */
    private static class HiveRollingPolicy
    extends CheckpointRollingPolicy<RowData, String> {
        private final long rollingFileSize;
        private final long rollingTimeInterval;

        /**
         * @param rollingFileSize size threshold in bytes; must be positive
         * @param rollingTimeInterval age threshold in milliseconds; must be positive
         * @throws IllegalArgumentException if either argument is not positive
         */
        private HiveRollingPolicy(long rollingFileSize, long rollingTimeInterval) {
            Preconditions.checkArgument(rollingFileSize > 0L);
            Preconditions.checkArgument(rollingTimeInterval > 0L);
            this.rollingFileSize = rollingFileSize;
            this.rollingTimeInterval = rollingTimeInterval;
        }

        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
            return true;
        }

        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) {
            return false;
        }

        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
            try {
                boolean tooOld = currentTime - partFileState.getCreationTime() >= this.rollingTimeInterval;
                return tooOld || partFileState.getSize() > this.rollingFileSize;
            }
            catch (IOException e) {
                // The overridden signature here declares no checked exception, so wrap it.
                throw new UncheckedIOException(e);
            }
        }
    }
}

