/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.yarn.YarnLocalResourceDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnResourceManagerDriverConfiguration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Utils {
    /** Logger used by the static helpers of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
    /** Key under which the krb5 configuration is registered in a TaskManager container's local resources. */
    public static final String KRB5_FILE_NAME = "krb5.conf";
    /** Key under which the yarn-site configuration is registered in a TaskManager container's local resources. */
    public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
    /** Scheduler class name that makes {@code getUnitResource} use the increment-allocation settings. */
    @VisibleForTesting
    static final String YARN_RM_FAIR_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler";
    /** Second scheduler class name that is treated like the fair scheduler by {@code getUnitResource}. */
    @VisibleForTesting
    static final String YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler";
    /** Preferred configuration key for the memory increment (MB); read first by {@code getUnitResource}. */
    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_MB_KEY = "yarn.resource-types.memory-mb.increment-allocation";
    /** Legacy configuration key for the memory increment (MB); consulted only when the preferred key is unset. */
    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY = "yarn.scheduler.increment-allocation-mb";
    /** Fallback memory increment in MB; same value as the fallback used for the legacy key in {@code getUnitResource}. */
    private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB = 1024;
    /** Preferred configuration key for the vcore increment; read first by {@code getUnitResource}. */
    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY = "yarn.resource-types.vcores.increment-allocation";
    /** Legacy configuration key for the vcore increment; consulted only when the preferred key is unset. */
    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY = "yarn.scheduler.increment-allocation-vcores";
    /** Fallback vcore increment; same value as the fallback used for the legacy key in {@code getUnitResource}. */
    private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES = 1;

    /**
     * Extends the {@code CLASSPATH} entry of the given environment map.
     *
     * <p>The value stored under {@code _FLINK_CLASSPATH} in {@code appMasterEnv} is appended
     * first, followed by every trimmed entry of {@code yarn.application.classpath}. When that key
     * is not configured, {@link YarnConfiguration#DEFAULT_YARN_APPLICATION_CLASSPATH} is used.
     *
     * @param conf Hadoop configuration providing the YARN application classpath
     * @param appMasterEnv environment map, updated in place
     */
    public static void setupYarnClassPath(org.apache.hadoop.conf.Configuration conf, Map<String, String> appMasterEnv) {
        final String classPathVariable = ApplicationConstants.Environment.CLASSPATH.name();
        Utils.addToEnvironment(appMasterEnv, classPathVariable, appMasterEnv.get("_FLINK_CLASSPATH"));

        final String[] yarnClassPathEntries = conf.getStrings("yarn.application.classpath", YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
        for (String entry : yarnClassPathEntries) {
            Utils.addToEnvironment(appMasterEnv, classPathVariable, entry.trim());
        }
    }

    /**
     * Recursively deletes the YARN application files directory.
     *
     * <p>Failures are logged, never thrown: both an unsuccessful delete and an
     * {@link IOException} result in an error log entry only.
     *
     * @param applicationFilesDir directory to delete; if {@code null} or blank nothing is deleted
     */
    public static void deleteApplicationFiles(String applicationFilesDir) {
        if (StringUtils.isNullOrWhitespaceOnly(applicationFilesDir)) {
            LOG.debug("No yarn application files directory set. Therefore, cannot clean up the data.");
            return;
        }

        final Path applicationFilesPath = new Path(applicationFilesDir);
        try {
            final boolean deleted = applicationFilesPath.getFileSystem().delete(applicationFilesPath, true);
            if (!deleted) {
                LOG.error("Deleting yarn application files under {} was unsuccessful.", applicationFilesDir);
            }
        } catch (IOException e) {
            // The trailing throwable has no placeholder, so SLF4J logs it as the exception.
            LOG.error("Could not properly delete yarn application files directory {}.", applicationFilesDir, e);
        }
    }

    /**
     * Builds a YARN {@link LocalResource} record describing a remote file.
     *
     * @param remoteRsrcPath remote location of the resource
     * @param resourceSize size that is recorded for the resource
     * @param resourceModificationTime timestamp that is recorded for the resource
     * @param resourceVisibility visibility that is recorded for the resource
     * @param resourceType type that is recorded for the resource
     * @return the populated record
     */
    static LocalResource registerLocalResource(org.apache.hadoop.fs.Path remoteRsrcPath, long resourceSize, long resourceModificationTime, LocalResourceVisibility resourceVisibility, LocalResourceType resourceType) {
        final LocalResource resource = Records.newRecord(LocalResource.class);
        final URI remoteUri = remoteRsrcPath.toUri();

        resource.setResource(ConverterUtils.getYarnUrlFromURI(remoteUri));
        resource.setType(resourceType);
        resource.setVisibility(resourceVisibility);
        resource.setSize(resourceSize);
        resource.setTimestamp(resourceModificationTime);
        return resource;
    }

    /**
     * Builds a {@link LocalResource} for a remote file, taking size and modification time from
     * the file's status and using {@link LocalResourceVisibility#APPLICATION} visibility.
     *
     * @param fs file system used to look up the file status
     * @param remoteRsrcPath remote location of the resource
     * @param resourceType type that is recorded for the resource
     * @return the populated record
     * @throws IOException if the file status cannot be obtained
     */
    private static LocalResource registerLocalResource(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path remoteRsrcPath, LocalResourceType resourceType) throws IOException {
        final FileStatus status = fs.getFileStatus(remoteRsrcPath);
        final long size = status.getLen();
        final long modificationTime = status.getModificationTime();
        return Utils.registerLocalResource(remoteRsrcPath, size, modificationTime, LocalResourceVisibility.APPLICATION, resourceType);
    }

    /**
     * Collects security tokens and attaches them, serialized, to the given container launch
     * context.
     *
     * <p>Tokens are gathered from three sources: the namenodes of the given paths, HBase (see
     * {@code obtainTokenForHBase}) and the tokens already held by the current user.
     *
     * @param amContainer launch context that receives the serialized tokens
     * @param paths paths whose namenodes are asked for tokens
     * @param conf Hadoop configuration used while obtaining the tokens
     * @throws IOException if obtaining or serializing the tokens fails
     */
    public static void setTokensFor(ContainerLaunchContext amContainer, List<org.apache.hadoop.fs.Path> paths, org.apache.hadoop.conf.Configuration conf) throws IOException {
        Credentials credentials = new Credentials();
        TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new org.apache.hadoop.fs.Path[0]), conf);
        Utils.obtainTokenForHBase(credentials, conf);

        // Iterate with a wildcard-typed Token: iterating a raw Collection yields Object and
        // cannot be assigned to Token without a cast.
        UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
        for (Token<?> token : currUsr.getTokens()) {
            Text id = new Text(token.getIdentifier());
            LOG.info("Adding user token {} with {}", id, token);
            credentials.addToken(id, token);
        }

        try (DataOutputBuffer dob = new DataOutputBuffer()) {
            credentials.writeTokenStorageToStream(dob);
            LOG.debug("Wrote tokens. Credentials buffer length: {}", dob.getLength());
            // Only the first getLength() bytes of the backing array hold valid data.
            ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
            amContainer.setTokens(securityTokens);
        }
    }

    /**
     * Obtains a Kerberos security token for HBase via reflection and adds it to the credentials.
     *
     * <p>Nothing happens when Hadoop security is disabled or when
     * {@code hbase.security.authentication} is not {@code kerberos}. All HBase classes are looked
     * up reflectively; reflection failures are logged at info level and otherwise ignored.
     *
     * @param credentials credentials that receive the HBase token
     * @param conf Hadoop configuration; HBase resources are added to it reflectively
     * @throws IOException if closing the HBase connection used for the token request fails
     */
    private static void obtainTokenForHBase(Credentials credentials, org.apache.hadoop.conf.Configuration conf) throws IOException {
        if (!UserGroupInformation.isSecurityEnabled()) {
            return;
        }
        LOG.info("Attempting to obtain Kerberos security token for HBase");
        try {
            // Reflective equivalent of HBaseConfiguration.addHbaseResources(conf).
            Class.forName("org.apache.hadoop.hbase.HBaseConfiguration").getMethod("addHbaseResources", org.apache.hadoop.conf.Configuration.class).invoke(null, conf);

            String hbaseAuthentication = conf.get("hbase.security.authentication");
            LOG.info("HBase security setting: {}", hbaseAuthentication);
            if (!"kerberos".equals(hbaseAuthentication)) {
                LOG.info("HBase has not been configured to use Kerberos.");
                return;
            }

            Token<?> token;
            try {
                LOG.info("Obtaining Kerberos security token for HBase");
                // Reflective equivalent of TokenUtil.obtainToken(conf).
                token = (Token<?>) Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil").getMethod("obtainToken", org.apache.hadoop.conf.Configuration.class).invoke(null, conf);
            } catch (NoSuchMethodException e) {
                // No obtainToken(Configuration) overload: fall back to obtainToken(Connection).
                // try-with-resources closes the connection even if the token request throws;
                // a null resource is skipped on close.
                try (Closeable connection = (Closeable) Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory").getMethod("createConnection", org.apache.hadoop.conf.Configuration.class).invoke(null, conf)) {
                    Class<?> connectionClass = Class.forName("org.apache.hadoop.hbase.client.Connection");
                    token = (Token<?>) Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil").getMethod("obtainToken", connectionClass).invoke(null, connection);
                }
            }

            if (token == null) {
                LOG.error("No Kerberos security token for HBase available");
                return;
            }
            credentials.addToken(token.getService(), token);
            LOG.info("Added HBase Kerberos security token to credentials.");
        } catch (ClassNotFoundException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            LOG.info("HBase is not available (not packaged with this application): {} : \"{}\".", e.getClass().getSimpleName(), e.getMessage());
        }
    }

    /**
     * Sets or extends an entry of the environment map.
     *
     * <p>If the variable has no value yet, {@code value} is stored as is; otherwise
     * {@code value} is appended to the existing value, separated by {@link File#pathSeparator}.
     * Both key and value are passed through {@code StringInterner.weakIntern} before being stored.
     *
     * @param environment environment map, updated in place
     * @param variable name of the variable to set or extend
     * @param value value to set or append
     */
    public static void addToEnvironment(Map<String, String> environment, String variable, String value) {
        final String existing = environment.get(variable);
        final String updated;
        if (existing == null) {
            updated = value;
        } else {
            updated = existing + File.pathSeparator + value;
        }
        environment.put(StringInterner.weakIntern(variable), StringInterner.weakIntern(updated));
    }

    /**
     * Resolves a keytab path to an absolute path of an existing file.
     *
     * <p>The path is first tried as given and then relative to {@code workingDir}; the first
     * candidate that exists wins.
     *
     * @param workingDir directory against which {@code keytabPath} is resolved as a fallback
     * @param keytabPath keytab path to resolve, may be {@code null}
     * @return the absolute path of the keytab, or {@code null} if {@code keytabPath} is
     *     {@code null} or neither candidate exists
     */
    public static String resolveKeytabPath(String workingDir, String keytabPath) {
        if (keytabPath == null) {
            return null;
        }

        final File[] candidates = {new File(keytabPath), new File(workingDir, keytabPath)};
        for (File candidate : candidates) {
            if (candidate.exists()) {
                final String resolved = candidate.getAbsolutePath();
                LOG.info("Resolved keytab path: {}", resolved);
                return resolved;
            }
        }

        LOG.warn("Could not resolve keytab path with: {}", keytabPath);
        return null;
    }

    /** Private constructor: the class exposes only static members, so instantiation always fails. */
    private Utils() {
        throw new RuntimeException();
    }

    /**
     * Creates the {@link ContainerLaunchContext} used to start a TaskExecutor container.
     *
     * <p>The context consists of:
     *
     * <ul>
     *   <li>local resources: the Flink dist jar, the shipped files and, when configured, the
     *       yarn-site file, the krb5 configuration and the keytab;
     *   <li>the TaskManager shell command produced by {@code BootstrapTools};
     *   <li>the container environment (TaskManager env, classpath, user name, keytab settings);
     *   <li>the security tokens from {@code HADOOP_TOKEN_FILE_LOCATION}, without the AMRM token.
     * </ul>
     *
     * <p>Failures while reading or serializing the security tokens are logged and do not fail the
     * method.
     *
     * @param flinkConfig Flink configuration
     * @param yarnConfig YARN configuration, used for file system access and the YARN classpath
     * @param configuration driver configuration providing the remote paths and classpath
     * @param tmParams TaskManager parameters, including the TaskManager environment
     * @param taskManagerDynamicProperties dynamic properties passed to the shell command
     * @param workingDirectory directory that is checked for {@code logback.xml} and
     *     {@code log4j.properties}
     * @param taskManagerMainClass main class of the TaskManager
     * @param log logger to write progress messages to
     * @return the populated launch context
     * @throws Exception if a required value is missing or a resource cannot be registered
     */
    static ContainerLaunchContext createTaskExecutorContext(Configuration flinkConfig, YarnConfiguration yarnConfig, YarnResourceManagerDriverConfiguration configuration, ContaineredTaskManagerParameters tmParams, String taskManagerDynamicProperties, String workingDirectory, Class<?> taskManagerMainClass, Logger log) throws Exception {
        String remoteFlinkJarPath = Preconditions.checkNotNull(configuration.getFlinkDistJar(), "Environment variable %s not set", "_FLINK_DIST_JAR");
        String shipListString = Preconditions.checkNotNull(configuration.getClientShipFiles(), "Environment variable %s not set", "_CLIENT_SHIP_FILES");

        String remoteKeytabPath = configuration.getRemoteKeytabPath();
        String localKeytabPath = configuration.getLocalKeytabPath();
        String keytabPrincipal = configuration.getKeytabPrinciple();
        String remoteYarnConfPath = configuration.getYarnSiteXMLPath();
        String remoteKrb5Path = configuration.getKrb5Path();
        if (log.isDebugEnabled()) {
            log.debug("TM:remote keytab path obtained {}", remoteKeytabPath);
            log.debug("TM:local keytab path obtained {}", localKeytabPath);
            log.debug("TM:keytab principal obtained {}", keytabPrincipal);
            log.debug("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
            log.debug("TM:remote krb5 path obtained {}", remoteKrb5Path);
        }

        String classPathString = Preconditions.checkNotNull(configuration.getFlinkClasspath(), "Environment variable %s not set", "_FLINK_CLASSPATH");

        // Register the optional remote files.
        LocalResource keytabResource = null;
        if (remoteKeytabPath != null) {
            // NOTE(review): the message mentions the AM container although the resource is added
            // to the TaskManager local resources below; text kept unchanged.
            log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
            keytabResource = Utils.registerRemoteFile(remoteKeytabPath, yarnConfig);
        }

        LocalResource yarnConfResource = null;
        if (remoteYarnConfPath != null) {
            log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
            yarnConfResource = Utils.registerRemoteFile(remoteYarnConfPath, yarnConfig);
        }

        LocalResource krb5ConfResource = null;
        boolean hasKrb5 = false;
        if (remoteKrb5Path != null) {
            log.info("Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
            krb5ConfResource = Utils.registerRemoteFile(remoteKrb5Path, yarnConfig);
            hasKrb5 = true;
        }

        // Assemble the local resources of the container.
        Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
        YarnLocalResourceDescriptor flinkDistLocalResourceDesc = YarnLocalResourceDescriptor.fromString(remoteFlinkJarPath);
        taskManagerLocalResources.put(flinkDistLocalResourceDesc.getResourceKey(), flinkDistLocalResourceDesc.toLocalResource());
        if (yarnConfResource != null) {
            taskManagerLocalResources.put(YARN_SITE_FILE_NAME, yarnConfResource);
        }
        if (krb5ConfResource != null) {
            taskManagerLocalResources.put(KRB5_FILE_NAME, krb5ConfResource);
        }
        if (keytabResource != null) {
            taskManagerLocalResources.put(localKeytabPath, keytabResource);
        }
        for (YarnLocalResourceDescriptor resourceDesc : Utils.decodeYarnLocalResourceDescriptorListFromString(shipListString)) {
            taskManagerLocalResources.put(resourceDesc.getResourceKey(), resourceDesc.toLocalResource());
        }

        // Build the launch command.
        log.info("Creating container launch context for TaskManagers");
        boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
        boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
        String launchCommand = BootstrapTools.getTaskManagerShellCommand(flinkConfig, tmParams, ".", "<LOG_DIR>", hasLogback, hasLog4j, hasKrb5, taskManagerMainClass, taskManagerDynamicProperties);
        if (log.isDebugEnabled()) {
            log.debug("Starting TaskManagers with command: {}", launchCommand);
        } else {
            log.info("Starting TaskManagers");
        }

        ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        ctx.setCommands(Collections.singletonList(launchCommand));
        ctx.setLocalResources(taskManagerLocalResources);

        // Build the container environment.
        Map<String, String> containerEnv = new HashMap<>();
        containerEnv.putAll(tmParams.taskManagerEnv());
        containerEnv.put("_FLINK_CLASSPATH", classPathString);
        Utils.setupYarnClassPath(yarnConfig, containerEnv);
        containerEnv.put("HADOOP_USER_NAME", UserGroupInformation.getCurrentUser().getUserName());
        // Keytab settings are exported only when both the local path and the principal are known;
        // the remote path is added on top of that when it is set.
        if (localKeytabPath != null && keytabPrincipal != null) {
            if (remoteKeytabPath != null) {
                containerEnv.put("_REMOTE_KEYTAB_PATH", remoteKeytabPath);
            }
            containerEnv.put("_LOCAL_KEYTAB_PATH", localKeytabPath);
            containerEnv.put("_KEYTAB_PRINCIPAL", keytabPrincipal);
        }
        ctx.setEnvironment(containerEnv);

        // Forward the security tokens, leaving out the AMRM token.
        String fileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
        if (fileLocation != null) {
            log.debug("Adding security tokens to TaskExecutor's container launch context.");
            try (DataOutputBuffer dob = new DataOutputBuffer()) {
                Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), HadoopUtils.getHadoopConfiguration(flinkConfig));
                Credentials taskManagerCred = new Credentials();
                // Wildcard-typed iteration: a raw Collection would yield Object elements.
                for (Token<?> token : cred.getAllTokens()) {
                    if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
                        continue;
                    }
                    Text id = new Text(token.getIdentifier());
                    taskManagerCred.addToken(id, token);
                }
                taskManagerCred.writeTokenStorageToStream(dob);
                ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
                ctx.setTokens(securityTokens);
            } catch (Throwable t) {
                // Best effort: the container is still launched without tokens.
                log.error("Failed to add Hadoop's security tokens.", t);
            }
        } else {
            log.info("Could not set security tokens because Hadoop's token file location is unknown.");
        }
        return ctx;
    }

    /** Registers the remote file at {@code remotePath} as a {@link LocalResourceType#FILE} local resource. */
    private static LocalResource registerRemoteFile(String remotePath, YarnConfiguration yarnConfig) throws IOException {
        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(remotePath);
        org.apache.hadoop.fs.FileSystem fs = path.getFileSystem(yarnConfig);
        return Utils.registerLocalResource(fs, path, LocalResourceType.FILE);
    }

    /**
     * Tells whether the given path lives on a distributed file system.
     *
     * @param path path to check
     * @return the result of {@code isDistributedFS()} for the path's file system
     * @throws IOException if the file system of the path cannot be obtained
     */
    static boolean isRemotePath(String path) throws IOException {
        final FileSystem fileSystem = new Path(path).getFileSystem();
        return fileSystem.isDistributedFS();
    }

    /**
     * Decodes a {@code ;}-separated list of encoded {@link YarnLocalResourceDescriptor}s.
     *
     * <p>Empty segments are skipped.
     *
     * @param resources the encoded descriptor list
     * @return the decoded descriptors, in the order in which they appear in the string
     * @throws Exception if a segment cannot be decoded
     */
    private static List<YarnLocalResourceDescriptor> decodeYarnLocalResourceDescriptorListFromString(String resources) throws Exception {
        final String[] encodedDescriptors = resources.split(";");
        final List<YarnLocalResourceDescriptor> decoded = new ArrayList<>();
        for (String encoded : encodedDescriptors) {
            if (!encoded.isEmpty()) {
                decoded.add(YarnLocalResourceDescriptor.fromString(encoded));
            }
        }
        return decoded;
    }

    /**
     * Determines the unit resource (memory in MB and vcores) from the YARN configuration.
     *
     * <p>When {@code yarn.resourcemanager.scheduler.class} names one of the two fair scheduler
     * classes, the increment-allocation settings are used: the preferred key first, then the
     * legacy key, then the default. For any other scheduler the minimum-allocation settings are
     * used.
     *
     * @param yarnConfig YARN configuration to read from
     * @return resource built from the determined memory and vcore units
     */
    @VisibleForTesting
    static Resource getUnitResource(YarnConfiguration yarnConfig) {
        final String schedulerClassName = yarnConfig.get("yarn.resourcemanager.scheduler.class");
        final boolean usesFairScheduler = YARN_RM_FAIR_SCHEDULER_CLAZZ.equals(schedulerClassName) || YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ.equals(schedulerClassName);

        if (!usesFairScheduler) {
            final int minMemMB = yarnConfig.getInt("yarn.scheduler.minimum-allocation-mb", 1024);
            final int minVcore = yarnConfig.getInt("yarn.scheduler.minimum-allocation-vcores", 1);
            return Resource.newInstance(minMemMB, minVcore);
        }

        final String configuredMem = yarnConfig.get(YARN_RM_INCREMENT_ALLOCATION_MB_KEY);
        final String configuredVcore = yarnConfig.get(YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY);

        final int incrementMemMB;
        if (configuredMem == null) {
            incrementMemMB = yarnConfig.getInt(YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB);
        } else {
            incrementMemMB = Integer.parseInt(configuredMem);
        }

        final int incrementVcore;
        if (configuredVcore == null) {
            incrementVcore = yarnConfig.getInt(YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES);
        } else {
            incrementVcore = Integer.parseInt(configuredVcore);
        }

        return Resource.newInstance(incrementMemMB, incrementVcore);
    }

    /**
     * Returns the configured provided lib dirs as paths qualified against their file system.
     *
     * @param configuration Flink configuration holding the provided lib dirs
     * @param yarnConfiguration YARN configuration used to obtain each path's file system
     * @return the qualified paths
     * @throws IOException if a file system cannot be accessed
     * @throws FlinkException if one of the paths is not on a distributed file system
     */
    public static List<org.apache.hadoop.fs.Path> getQualifiedRemoteSharedPaths(Configuration configuration, YarnConfiguration yarnConfiguration) throws IOException, FlinkException {
        final FunctionWithException<String, org.apache.hadoop.fs.Path, IOException> qualifyPath = pathStr -> {
            final org.apache.hadoop.fs.Path unqualified = new org.apache.hadoop.fs.Path(pathStr);
            final org.apache.hadoop.fs.FileSystem fileSystem = unqualified.getFileSystem(yarnConfiguration);
            return fileSystem.makeQualified(unqualified);
        };
        return Utils.getRemoteSharedPaths(configuration, qualifyPath);
    }

    /**
     * Decodes the provided lib dirs from the configuration and verifies that all of them are
     * remote.
     *
     * @param configuration Flink configuration holding {@code YarnConfigOptions.PROVIDED_LIB_DIRS}
     * @param strToPathMapper converts each configured string into a Hadoop path
     * @return the decoded paths
     * @throws IOException if the mapper or the remote check fails with an I/O error
     * @throws FlinkException if one of the paths is not on a distributed file system
     */
    private static List<org.apache.hadoop.fs.Path> getRemoteSharedPaths(Configuration configuration, FunctionWithException<String, org.apache.hadoop.fs.Path, IOException> strToPathMapper) throws IOException, FlinkException {
        // Typed list: a raw List would yield Object elements in the loop and an unchecked return.
        List<org.apache.hadoop.fs.Path> providedLibDirs = ConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.PROVIDED_LIB_DIRS, strToPathMapper);
        for (org.apache.hadoop.fs.Path path : providedLibDirs) {
            if (!Utils.isRemotePath(path.toString())) {
                throw new FlinkException("The \"" + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\" should only contain dirs accessible from all worker nodes, while the \"" + path + "\" is local.");
            }
        }
        return providedLibDirs;
    }
}

