/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import java.io.File;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.VariablesSubstitute;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link WriteStrategy} that holds the bookkeeping shared by file writers: the current
 * transaction id and directory, the file currently being written per partition directory, the
 * files that must be moved on commit, and the partition directories touched so far.
 *
 * <p>This class does not write any row data itself; it only counts rows, derives paths and file
 * names, and snapshots its bookkeeping for commit/checkpoint.
 */
public abstract class AbstractWriteStrategy implements WriteStrategy {
    /** Directory key used for rows when no partition fields are configured. */
    private static final String NON_PARTITION = "NON_PARTITION";

    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    protected final FileSinkConfig fileSinkConfig;
    protected final CompressFormat compressFormat;
    protected final List<Integer> sinkColumnsIndexInRow;
    protected String jobId;
    protected int subTaskIndex;
    protected HadoopConf hadoopConf;
    protected HadoopFileSystemProxy hadoopFileSystemProxy;
    protected String transactionId;
    protected String uuidPrefix;
    protected String transactionDirectory;
    /** Files recorded for the current transaction; re-created by {@link #beginTransaction}. */
    protected LinkedHashMap<String, String> needMoveFiles;
    /** Partition directory key -> path of the file currently being written for it. */
    protected LinkedHashMap<String, String> beingWrittenFile = new LinkedHashMap<>();
    /** Partition directory -> partition values seen in the current transaction. */
    private LinkedHashMap<String, List<String>> partitionDirAndValuesMap;
    protected SeaTunnelRowType seaTunnelRowType;
    protected Long checkpointId = 0L;
    protected int partId = 0;
    protected int batchSize;
    protected int currentBatchSize = 0;

    /**
     * Captures the sink configuration and the values derived from it (sink column indexes, batch
     * size and compress format).
     *
     * @param fileSinkConfig sink configuration used by this strategy
     */
    public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) {
        this.fileSinkConfig = fileSinkConfig;
        this.sinkColumnsIndexInRow = fileSinkConfig.getSinkColumnsIndexInRow();
        this.batchSize = fileSinkConfig.getBatchSize();
        this.compressFormat = fileSinkConfig.getCompressFormat();
    }

    /**
     * Stores the job context and creates the {@link HadoopFileSystemProxy} for {@code conf}.
     *
     * @param conf hadoop configuration holder
     * @param jobId id of the running job
     * @param uuidPrefix prefix used in transaction ids and directories
     * @param subTaskIndex index of this writer's sub task
     */
    @Override
    public void init(HadoopConf conf, String jobId, String uuidPrefix, int subTaskIndex) {
        this.hadoopConf = conf;
        this.hadoopFileSystemProxy = new HadoopFileSystemProxy(conf);
        this.jobId = jobId;
        this.subTaskIndex = subTaskIndex;
        this.uuidPrefix = uuidPrefix;
    }

    /**
     * Counts the row towards the current batch and starts a new file part once the batch is
     * full. The row itself is not written here.
     *
     * @param seaTunnelRow the row being written (not inspected by this implementation)
     */
    @Override
    public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException {
        if (currentBatchSize >= batchSize) {
            newFilePart();
            currentBatchSize = 0;
        }
        currentBatchSize++;
    }

    /**
     * Advances the part id and forgets the files currently being written, so the next call to
     * {@link #getOrCreateFilePathBeingWritten} generates a new file path.
     */
    public synchronized void newFilePart() {
        partId++;
        beingWrittenFile.clear();
        log.debug("new file part: {}", partId);
    }

    /**
     * Builds a row type that contains only the columns selected by {@code sinkColumnsIndex}, in
     * the order given by that list.
     *
     * @param seaTunnelRowType the full row type
     * @param sinkColumnsIndex indexes into the full row type's fields
     * @return a new row type with the selected field names and types
     */
    protected SeaTunnelRowType buildSchemaWithRowType(
            SeaTunnelRowType seaTunnelRowType, List<Integer> sinkColumnsIndex) {
        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
        String[] fieldNames = seaTunnelRowType.getFieldNames();
        List<String> selectedNames = new ArrayList<>(sinkColumnsIndex.size());
        List<SeaTunnelDataType<?>> selectedTypes = new ArrayList<>(sinkColumnsIndex.size());
        for (Integer index : sinkColumnsIndex) {
            selectedNames.add(fieldNames[index]);
            selectedTypes.add(fieldTypes[index]);
        }
        return new SeaTunnelRowType(
                selectedNames.toArray(new String[0]),
                selectedTypes.toArray(new SeaTunnelDataType<?>[0]));
    }

    /**
     * Converts {@code hadoopConf} to a Hadoop {@link Configuration} and applies extra options.
     *
     * @param hadoopConf the configuration holder to convert
     * @return the resulting configuration
     */
    @Override
    public Configuration getConfiguration(HadoopConf hadoopConf) {
        Configuration configuration = hadoopConf.toConfiguration();
        // NOTE(review): extra options come from the field set in init(), not from the parameter;
        // this throws NullPointerException if init() has not been called. Confirm this is intended.
        this.hadoopConf.setExtraOptionsForConfiguration(configuration);
        return configuration;
    }

    /**
     * Takes the row type of {@code catalogTable} as this strategy's row type.
     *
     * @param catalogTable table whose row type is used
     */
    @Override
    public void setCatalogTable(CatalogTable catalogTable) {
        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
    }

    /**
     * Computes the partition directory of a row together with its partition values.
     *
     * <p>Without configured partition fields the result maps {@code "NON_PARTITION"} to {@code
     * null}. Without a partition directory expression the directory is {@code
     * field1=value1/field2=value2/...}; otherwise the expression is substituted with the
     * variables {@code k0, v0, k1, v1, ...} (field name / field value per partition field).
     *
     * @param seaTunnelRow the row to derive the partition from
     * @return a single-entry map: partition directory -> partition values
     * @throws NullPointerException if a partition field value in the row is {@code null}
     */
    @Override
    public LinkedHashMap<String, List<String>> generatorPartitionDir(SeaTunnelRow seaTunnelRow) {
        List<Integer> partitionFieldsIndexInRow = fileSinkConfig.getPartitionFieldsIndexInRow();
        LinkedHashMap<String, List<String>> result = new LinkedHashMap<>(1);
        if (CollectionUtils.isEmpty(partitionFieldsIndexInRow)) {
            result.put(NON_PARTITION, null);
            return result;
        }

        List<String> partitionFieldList = fileSinkConfig.getPartitionFieldList();
        String partitionDirExpression = fileSinkConfig.getPartitionDirExpression();
        Object[] fields = seaTunnelRow.getFields();
        int partitionCount = partitionFieldsIndexInRow.size();
        List<String> vals = new ArrayList<>(partitionCount);

        String partitionDir;
        if (StringUtils.isBlank(partitionDirExpression)) {
            StringBuilder dirBuilder = new StringBuilder();
            for (int i = 0; i < partitionCount; i++) {
                Object value = fields[partitionFieldsIndexInRow.get(i)];
                if (i > 0) {
                    dirBuilder.append("/");
                }
                dirBuilder.append(partitionFieldList.get(i)).append("=").append(value);
                vals.add(value.toString());
            }
            partitionDir = dirBuilder.toString();
        } else {
            Map<String, String> valueMap = new HashMap<>(partitionFieldList.size() * 2);
            for (int i = 0; i < partitionCount; i++) {
                String value = fields[partitionFieldsIndexInRow.get(i)].toString();
                valueMap.put("k" + i, partitionFieldList.get(i));
                valueMap.put("v" + i, value);
                vals.add(value);
            }
            partitionDir = VariablesSubstitute.substitute(partitionDirExpression, valueMap);
        }
        result.put(partitionDir, vals);
        return result;
    }

    /**
     * Generates the file name for the current part.
     *
     * <p>The suffix is the compress codec followed by the file format suffix. Without a file name
     * expression the name is {@code transactionId + suffix}. Otherwise the expression is
     * substituted with {@code uuid}, {@code now}, the configured time format pattern (as a key)
     * and {@code transactionId}, followed by {@code "_" + partId} and the suffix.
     *
     * @param transactionId transaction id to embed in the name
     * @return the generated file name
     */
    @Override
    public String generateFileName(String transactionId) {
        String fileNameExpression = fileSinkConfig.getFileNameExpression();
        FileFormat fileFormat = fileSinkConfig.getFileFormat();
        String suffix = compressFormat.getCompressCodec() + fileFormat.getSuffix();
        if (StringUtils.isBlank(fileNameExpression)) {
            // NOTE(review): this name does not include partId, so successive parts within one
            // transaction get the same name — confirm this is intended.
            return transactionId + suffix;
        }
        String timeFormat = fileSinkConfig.getFileNameTimeFormat();
        String formattedDate = DateTimeFormatter.ofPattern(timeFormat).format(ZonedDateTime.now());
        Map<String, String> valuesMap = new HashMap<>();
        valuesMap.put("uuid", UUID.randomUUID().toString());
        valuesMap.put("now", formattedDate);
        valuesMap.put(timeFormat, formattedDate);
        valuesMap.put("transactionId", transactionId);
        String baseName = VariablesSubstitute.substitute(fileNameExpression, valuesMap);
        return baseName + "_" + partId + suffix;
    }

    /**
     * Finishes and closes the open files, then returns commit info built from copies of the
     * files-to-move map and the partition map.
     *
     * @return the commit info for the current transaction (always present)
     */
    @Override
    public Optional<FileCommitInfo> prepareCommit() {
        this.finishAndCloseFile();
        LinkedHashMap<String, String> commitMap = new LinkedHashMap<>(needMoveFiles);
        return Optional.of(
                new FileCommitInfo(commitMap, copyPartitionDirAndValues(), transactionDirectory));
    }

    /** Aborts the current transaction by deleting its transaction directory. */
    @Override
    public void abortPrepare() {
        abortPrepare(this.transactionId);
    }

    /**
     * Aborts the given transaction by deleting its transaction directory.
     *
     * @param transactionId id of the transaction to abort
     * @throws FileConnectorException if deleting the transaction directory fails
     */
    @Override
    public void abortPrepare(String transactionId) {
        try {
            hadoopFileSystemProxy.deleteFile(getTransactionDir(transactionId));
        } catch (IOException e) {
            throw new FileConnectorException(
                    (SeaTunnelErrorCode) CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED,
                    "Abort transaction " + transactionId + " error, delete transaction directory failed",
                    e);
        }
    }

    /**
     * Starts a new transaction for {@code checkpointId}: derives the transaction id and
     * directory and resets the files-to-move and partition maps.
     *
     * @param checkpointId checkpoint the transaction belongs to
     */
    @Override
    public void beginTransaction(Long checkpointId) {
        this.checkpointId = checkpointId;
        this.transactionId = getTransactionId(checkpointId);
        this.transactionDirectory = getTransactionDir(this.transactionId);
        this.needMoveFiles = new LinkedHashMap<>();
        this.partitionDirAndValuesMap = new LinkedHashMap<>();
    }

    /** Builds the transaction id {@code T_<jobId>_<uuidPrefix>_<subTaskIndex>_<checkpointId>}. */
    private String getTransactionId(Long checkpointId) {
        return "T_" + jobId + "_" + uuidPrefix + "_" + subTaskIndex + "_" + checkpointId;
    }

    /**
     * Snapshots the current transaction as a single {@link FileSinkState}, then clears the files
     * being written and begins the transaction for {@code checkpointId + 1}.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return a list holding the state of the transaction that was current before this call
     */
    @Override
    public List<FileSinkState> snapshotState(long checkpointId) {
        FileSinkState state =
                new FileSinkState(
                        this.transactionId,
                        this.uuidPrefix,
                        this.checkpointId,
                        new LinkedHashMap<>(this.needMoveFiles),
                        copyPartitionDirAndValues(),
                        getTransactionDir(this.transactionId));
        List<FileSinkState> fileState = Lists.newArrayList(state);
        beingWrittenFile.clear();
        beginTransaction(checkpointId + 1L);
        return fileState;
    }

    /** Returns a copy of the partition map whose value lists are copied as well. */
    private LinkedHashMap<String, List<String>> copyPartitionDirAndValues() {
        LinkedHashMap<String, List<String>> copy = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : partitionDirAndValuesMap.entrySet()) {
            copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        return copy;
    }

    /** Returns {@code <transaction dir prefix>/<transactionId>} using {@link File#separator}. */
    private String getTransactionDir(@NonNull String transactionId) {
        if (transactionId == null) {
            throw new NullPointerException("transactionId is marked non-null but is null");
        }
        String transactionDirectoryPrefix =
                getTransactionDirPrefix(fileSinkConfig.getTmpPath(), jobId, uuidPrefix);
        return String.join(File.separator, transactionDirectoryPrefix, transactionId);
    }

    /**
     * Joins {@code tmpPath}, {@code "seatunnel"}, {@code jobId} and {@code uuidPrefix} with
     * {@link File#separator}.
     *
     * @param tmpPath temporary root path
     * @param jobId id of the job
     * @param uuidPrefix uuid prefix of the sink
     * @return the directory prefix under which transaction directories are placed
     */
    public static String getTransactionDirPrefix(String tmpPath, String jobId, String uuidPrefix) {
        return String.join(File.separator, tmpPath, "seatunnel", jobId, uuidPrefix);
    }

    /**
     * Returns the path of the file currently being written for the row's partition directory,
     * generating and registering a new path when none exists yet. Newly seen partition
     * directories (other than {@code "NON_PARTITION"}) are recorded in the partition map.
     *
     * @param seaTunnelRow the row about to be written
     * @return path of the file the row should be written to
     */
    public String getOrCreateFilePathBeingWritten(@NonNull SeaTunnelRow seaTunnelRow) {
        if (seaTunnelRow == null) {
            throw new NullPointerException("seaTunnelRow is marked non-null but is null");
        }
        LinkedHashMap<String, List<String>> dataPartitionDirAndValuesMap =
                generatorPartitionDir(seaTunnelRow);
        // generatorPartitionDir always returns exactly one entry.
        String beingWrittenFileKey = dataPartitionDirAndValuesMap.keySet().iterator().next();
        String existingPath = beingWrittenFile.get(beingWrittenFileKey);
        if (existingPath != null) {
            return existingPath;
        }
        String newBeingWrittenFilePath =
                String.join(
                        File.separator,
                        transactionDirectory,
                        beingWrittenFileKey,
                        generateFileName(this.transactionId));
        beingWrittenFile.put(beingWrittenFileKey, newBeingWrittenFilePath);
        if (!NON_PARTITION.equals(beingWrittenFileKey)) {
            partitionDirAndValuesMap.putAll(dataPartitionDirAndValuesMap);
        }
        return newBeingWrittenFilePath;
    }

    /**
     * Maps a path inside the transaction directory to its final location: the transaction
     * directory is replaced by the configured sink path and the {@code "NON_PARTITION"} path
     * segment is removed.
     *
     * @param seaTunnelFilePath path of a file under the transaction directory
     * @return the corresponding path under the sink path
     */
    public String getTargetLocation(@NonNull String seaTunnelFilePath) {
        if (seaTunnelFilePath == null) {
            throw new NullPointerException("seaTunnelFilePath is marked non-null but is null");
        }
        // Literal replacement: the directory is a path, not a regex, so characters such as
        // '.', '+', '(' or '[' in it must not be interpreted as pattern syntax.
        String targetPath = seaTunnelFilePath.replace(transactionDirectory, fileSinkConfig.getPath());
        return targetPath.replace(NON_PARTITION + File.separator, "");
    }

    /** Returns the checkpoint id of the current transaction. */
    @Override
    public long getCheckpointId() {
        return this.checkpointId;
    }

    /** Returns the sink configuration passed to the constructor. */
    @Override
    public FileSinkConfig getFileSinkConfig() {
        return this.fileSinkConfig;
    }

    /** Returns the file system proxy created in {@link #init}, or {@code null} before that. */
    @Override
    public HadoopFileSystemProxy getHadoopFileSystemProxy() {
        return this.hadoopFileSystemProxy;
    }

    /**
     * Closes the file system proxy if one was created. Closing is best-effort: failures are
     * logged and not propagated.
     */
    @Override
    public void close() throws IOException {
        try {
            if (hadoopFileSystemProxy != null) {
                hadoopFileSystemProxy.close();
            }
        } catch (Exception e) {
            // Keep close() best-effort, but do not hide the failure from operators.
            log.warn("Failed to close hadoop file system proxy", e);
        }
    }
}

