/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.aws.s3;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.s3.S3RequestUtil;
import org.apache.iceberg.aws.s3.S3URI;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Predicates;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.utils.BinaryUtils;

class S3OutputStream
extends PositionOutputStream {
    private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class);
    private static final String digestAlgorithm = "MD5";
    private static volatile ExecutorService executorService;
    private final StackTraceElement[] createStack;
    private final S3Client s3;
    private final S3URI location;
    private final AwsProperties awsProperties;
    private final Set<Tag> writeTags;
    private CountingOutputStream stream;
    private final List<FileAndDigest> stagingFiles = Lists.newArrayList();
    private final File stagingDirectory;
    private File currentStagingFile;
    private String multipartUploadId;
    private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
    private final int multiPartSize;
    private final int multiPartThresholdSize;
    private final boolean isChecksumEnabled;
    private final MessageDigest completeMessageDigest;
    private MessageDigest currentPartMessageDigest;
    private final Counter writeBytes;
    private final Counter writeOperations;
    private long pos = 0L;
    private boolean closed = false;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties, MetricsContext metrics) throws IOException {
        if (executorService == null) {
            Class<S3OutputStream> clazz = S3OutputStream.class;
            // MONITORENTER : org.apache.iceberg.aws.s3.S3OutputStream.class
            if (executorService == null) {
                executorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor)((ThreadPoolExecutor)Executors.newFixedThreadPool(awsProperties.s3FileIoMultipartUploadThreads(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("iceberg-s3fileio-upload-%d").build())));
            }
            // MONITOREXIT : clazz
        }
        this.s3 = s3;
        this.location = location;
        this.awsProperties = awsProperties;
        this.writeTags = awsProperties.s3WriteTags();
        this.createStack = Thread.currentThread().getStackTrace();
        this.multiPartSize = awsProperties.s3FileIoMultiPartSize();
        this.multiPartThresholdSize = (int)((double)this.multiPartSize * awsProperties.s3FileIOMultipartThresholdFactor());
        this.stagingDirectory = new File(awsProperties.s3fileIoStagingDirectory());
        this.isChecksumEnabled = awsProperties.isS3ChecksumEnabled();
        try {
            this.completeMessageDigest = this.isChecksumEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
        }
        catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("Failed to create message digest needed for s3 checksum checks", e);
        }
        this.writeBytes = metrics.counter("write.bytes", MetricsContext.Unit.BYTES);
        this.writeOperations = metrics.counter("write.operations");
        this.newStream();
    }

    public long getPos() {
        return this.pos;
    }

    public void flush() throws IOException {
        this.stream.flush();
    }

    public void write(int b) throws IOException {
        if (this.stream.getCount() >= (long)this.multiPartSize) {
            this.newStream();
            this.uploadParts();
        }
        this.stream.write(b);
        ++this.pos;
        this.writeBytes.increment();
        this.writeOperations.increment();
        if (this.multipartUploadId == null && this.pos >= (long)this.multiPartThresholdSize) {
            this.initializeMultiPartUpload();
            this.uploadParts();
        }
    }

    public void write(byte[] b, int off, int len) throws IOException {
        int remaining = len;
        int relativeOffset = off;
        while (this.stream.getCount() + (long)remaining > (long)this.multiPartSize) {
            int writeSize = this.multiPartSize - (int)this.stream.getCount();
            this.stream.write(b, relativeOffset, writeSize);
            remaining -= writeSize;
            relativeOffset += writeSize;
            this.newStream();
            this.uploadParts();
        }
        this.stream.write(b, relativeOffset, remaining);
        this.pos += (long)len;
        this.writeBytes.increment(len);
        this.writeOperations.increment();
        if (this.multipartUploadId == null && this.pos >= (long)this.multiPartThresholdSize) {
            this.initializeMultiPartUpload();
            this.uploadParts();
        }
    }

    private void newStream() throws IOException {
        if (this.stream != null) {
            this.stream.close();
        }
        this.createStagingDirectoryIfNotExists();
        this.currentStagingFile = File.createTempFile("s3fileio-", ".tmp", this.stagingDirectory);
        this.currentStagingFile.deleteOnExit();
        try {
            this.currentPartMessageDigest = this.isChecksumEnabled ? MessageDigest.getInstance(digestAlgorithm) : null;
        }
        catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("Failed to create message digest needed for s3 checksum checks.", e);
        }
        this.stagingFiles.add(new FileAndDigest(this.currentStagingFile, this.currentPartMessageDigest));
        if (this.isChecksumEnabled) {
            DigestOutputStream digestOutputStream = this.multipartUploadId != null ? new DigestOutputStream(new BufferedOutputStream(new FileOutputStream(this.currentStagingFile)), this.currentPartMessageDigest) : new DigestOutputStream(new DigestOutputStream(new BufferedOutputStream(new FileOutputStream(this.currentStagingFile)), this.currentPartMessageDigest), this.completeMessageDigest);
            this.stream = new CountingOutputStream((OutputStream)digestOutputStream);
        } else {
            this.stream = new CountingOutputStream((OutputStream)new BufferedOutputStream(new FileOutputStream(this.currentStagingFile)));
        }
    }

    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        super.close();
        this.closed = true;
        try {
            this.stream.close();
            this.completeUploads();
        }
        finally {
            this.cleanUpStagingFiles();
        }
    }

    private void initializeMultiPartUpload() {
        CreateMultipartUploadRequest.Builder requestBuilder = CreateMultipartUploadRequest.builder().bucket(this.location.bucket()).key(this.location.key());
        if (this.writeTags != null && !this.writeTags.isEmpty()) {
            requestBuilder.tagging((Tagging)Tagging.builder().tagSet(this.writeTags).build());
        }
        S3RequestUtil.configureEncryption(this.awsProperties, requestBuilder);
        S3RequestUtil.configurePermission(this.awsProperties, requestBuilder);
        this.multipartUploadId = this.s3.createMultipartUpload((CreateMultipartUploadRequest)requestBuilder.build()).uploadId();
    }

    private void uploadParts() {
        if (this.multipartUploadId == null) {
            return;
        }
        this.stagingFiles.stream().filter(f -> this.closed || !f.file().equals(this.currentStagingFile)).filter((Predicate<FileAndDigest>)Predicates.not(f -> this.multiPartMap.containsKey(f.file()))).forEach(fileAndDigest -> {
            File f = fileAndDigest.file();
            UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder().bucket(this.location.bucket()).key(this.location.key()).uploadId(this.multipartUploadId).partNumber(Integer.valueOf(this.stagingFiles.indexOf(fileAndDigest) + 1)).contentLength(Long.valueOf(f.length()));
            if (fileAndDigest.hasDigest()) {
                requestBuilder.contentMD5(BinaryUtils.toBase64((byte[])fileAndDigest.digest()));
            }
            S3RequestUtil.configureEncryption(this.awsProperties, requestBuilder);
            UploadPartRequest uploadRequest = (UploadPartRequest)requestBuilder.build();
            CompletionStage future = CompletableFuture.supplyAsync(() -> {
                UploadPartResponse response = this.s3.uploadPart(uploadRequest, RequestBody.fromFile((File)f));
                return (CompletedPart)CompletedPart.builder().eTag(response.eTag()).partNumber(uploadRequest.partNumber()).build();
            }, executorService).whenComplete((result, thrown) -> {
                try {
                    Files.deleteIfExists(f.toPath());
                }
                catch (IOException e) {
                    LOG.warn("Failed to delete staging file: {}", (Object)f, (Object)e);
                }
                if (thrown != null) {
                    LOG.error("Failed to upload part: {}", (Object)uploadRequest, thrown);
                }
            });
            this.multiPartMap.put(f, (CompletableFuture<CompletedPart>)future);
        });
    }

    private void completeMultiPartUpload() {
        List completedParts;
        Preconditions.checkState((boolean)this.closed, (Object)("Complete upload called on open stream: " + this.location));
        try {
            completedParts = this.multiPartMap.values().stream().map(CompletableFuture::join).sorted(Comparator.comparing(CompletedPart::partNumber)).collect(Collectors.toList());
        }
        catch (CompletionException ce) {
            this.multiPartMap.values().forEach(c -> c.cancel(true));
            this.abortUpload();
            throw ce;
        }
        CompleteMultipartUploadRequest request = (CompleteMultipartUploadRequest)CompleteMultipartUploadRequest.builder().bucket(this.location.bucket()).key(this.location.key()).uploadId(this.multipartUploadId).multipartUpload((CompletedMultipartUpload)CompletedMultipartUpload.builder().parts(completedParts).build()).build();
        Tasks.foreach((Object[])new CompleteMultipartUploadRequest[]{request}).noRetry().onFailure((r, thrown) -> {
            LOG.error("Failed to complete multipart upload request: {}", r, (Object)thrown);
            this.abortUpload();
        }).throwFailureWhenFinished().run(arg_0 -> ((S3Client)this.s3).completeMultipartUpload(arg_0));
    }

    private void abortUpload() {
        if (this.multipartUploadId != null) {
            try {
                this.s3.abortMultipartUpload((AbortMultipartUploadRequest)AbortMultipartUploadRequest.builder().bucket(this.location.bucket()).key(this.location.key()).uploadId(this.multipartUploadId).build());
            }
            finally {
                this.cleanUpStagingFiles();
            }
        }
    }

    private void cleanUpStagingFiles() {
        Tasks.foreach(this.stagingFiles.stream().map(FileAndDigest::file)).suppressFailureWhenFinished().onFailure((file, thrown) -> LOG.warn("Failed to delete staging file: {}", file, (Object)thrown)).run(File::delete);
    }

    private void completeUploads() {
        if (this.multipartUploadId == null) {
            long contentLength = this.stagingFiles.stream().map(FileAndDigest::file).mapToLong(File::length).sum();
            ContentStreamProvider contentProvider = () -> new BufferedInputStream(this.stagingFiles.stream().map(FileAndDigest::file).map(S3OutputStream::uncheckedInputStream).reduce(SequenceInputStream::new).orElseGet(() -> new ByteArrayInputStream(new byte[0])));
            PutObjectRequest.Builder requestBuilder = PutObjectRequest.builder().bucket(this.location.bucket()).key(this.location.key());
            if (this.writeTags != null && !this.writeTags.isEmpty()) {
                requestBuilder.tagging((Tagging)Tagging.builder().tagSet(this.writeTags).build());
            }
            if (this.isChecksumEnabled) {
                requestBuilder.contentMD5(BinaryUtils.toBase64((byte[])this.completeMessageDigest.digest()));
            }
            S3RequestUtil.configureEncryption(this.awsProperties, requestBuilder);
            S3RequestUtil.configurePermission(this.awsProperties, requestBuilder);
            this.s3.putObject((PutObjectRequest)requestBuilder.build(), RequestBody.fromContentProvider((ContentStreamProvider)contentProvider, (long)contentLength, (String)"application/octet-stream"));
        } else {
            this.uploadParts();
            this.completeMultiPartUpload();
        }
    }

    private static InputStream uncheckedInputStream(File file) {
        try {
            return new FileInputStream(file);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void createStagingDirectoryIfNotExists() throws IOException, SecurityException {
        if (!this.stagingDirectory.exists()) {
            LOG.info("Staging directory does not exist, trying to create one: {}", (Object)this.stagingDirectory.getAbsolutePath());
            boolean createdStagingDirectory = this.stagingDirectory.mkdirs();
            if (createdStagingDirectory) {
                LOG.info("Successfully created staging directory: {}", (Object)this.stagingDirectory.getAbsolutePath());
            } else if (this.stagingDirectory.exists()) {
                LOG.info("Successfully created staging directory by another process: {}", (Object)this.stagingDirectory.getAbsolutePath());
            } else {
                throw new IOException("Failed to create staging directory due to some unknown reason: " + this.stagingDirectory.getAbsolutePath());
            }
        }
    }

    protected void finalize() throws Throwable {
        super.finalize();
        if (!this.closed) {
            this.close();
            String trace = Joiner.on((String)"\n\t").join((Object[])Arrays.copyOfRange(this.createStack, 1, this.createStack.length));
            LOG.warn("Unclosed output stream created by:\n\t{}", (Object)trace);
        }
    }

    private static class FileAndDigest {
        private final File file;
        private final MessageDigest digest;

        FileAndDigest(File file, MessageDigest digest) {
            this.file = file;
            this.digest = digest;
        }

        File file() {
            return this.file;
        }

        byte[] digest() {
            return this.digest.digest();
        }

        public boolean hasDigest() {
            return this.digest != null;
        }
    }
}

