package org.apache.seatunnel.engine.core.parse;

import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.sink.SaveModeExecuteLocation;
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceCheckpoint;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkConfig;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.JobPipelineCheckpointData;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.class */
public class MultipleTableJobConfigParser {
    private static final Logger log = LoggerFactory.getLogger(MultipleTableJobConfigParser.class);
    private final IdGenerator idGenerator;
    private final JobConfig jobConfig;
    private final List<URL> commonPluginJars;
    private final Config seaTunnelJobConfig;
    private final ReadonlyConfig envOptions;
    private final JobConfigParser fallbackParser;
    private final boolean isStartWithSavePoint;
    private final List<JobPipelineCheckpointData> pipelineCheckpoints;

    public MultipleTableJobConfigParser(String str, IdGenerator idGenerator, JobConfig jobConfig) {
        this(str, idGenerator, jobConfig, Collections.emptyList(), false);
    }

    public MultipleTableJobConfigParser(Config config, IdGenerator idGenerator, JobConfig jobConfig) {
        this(config, idGenerator, jobConfig, Collections.emptyList(), false, Collections.emptyList());
    }

    public MultipleTableJobConfigParser(String str, IdGenerator idGenerator, JobConfig jobConfig, List<URL> list, boolean z) {
        this(str, null, idGenerator, jobConfig, list, z, Collections.emptyList());
    }

    public MultipleTableJobConfigParser(String str, List<String> list, IdGenerator idGenerator, JobConfig jobConfig, List<URL> list2, boolean z, List<JobPipelineCheckpointData> list3) {
        this.idGenerator = idGenerator;
        this.jobConfig = jobConfig;
        this.commonPluginJars = list2;
        this.isStartWithSavePoint = z;
        this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(str, new String[0]), list);
        this.envOptions = ReadonlyConfig.fromConfig(this.seaTunnelJobConfig.getConfig("env"));
        this.fallbackParser = new JobConfigParser(idGenerator, list2, this, z);
        this.pipelineCheckpoints = list3;
    }

    public MultipleTableJobConfigParser(Config config, IdGenerator idGenerator, JobConfig jobConfig, List<URL> list, boolean z, List<JobPipelineCheckpointData> list2) {
        this.idGenerator = idGenerator;
        this.jobConfig = jobConfig;
        this.commonPluginJars = list;
        this.isStartWithSavePoint = z;
        this.seaTunnelJobConfig = config;
        this.envOptions = ReadonlyConfig.fromConfig(config.getConfig("env"));
        this.fallbackParser = new JobConfigParser(idGenerator, list, this, z);
        this.pipelineCheckpoints = list2;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoaderService) {
        fillJobConfigAndCommonJars();
        List configList = TypesafeConfigUtils.getConfigList(this.seaTunnelJobConfig, "source", Collections.emptyList());
        List configList2 = TypesafeConfigUtils.getConfigList(this.seaTunnelJobConfig, "transform", Collections.emptyList());
        List configList3 = TypesafeConfigUtils.getConfigList(this.seaTunnelJobConfig, "sink", Collections.emptyList());
        List<URL> connectorJarList = getConnectorJarList(configList, PluginType.SOURCE);
        List<URL> connectorJarList2 = getConnectorJarList(configList2, PluginType.TRANSFORM);
        List<URL> connectorJarList3 = getConnectorJarList(configList3, PluginType.SINK);
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        List list = (List) Stream.of((Object[]) new List[]{connectorJarList, connectorJarList2}).flatMap((v0) -> {
            return v0.stream();
        }).distinct().collect(Collectors.toList());
        ClassLoader classLoader = getClassLoader(classLoaderService, contextClassLoader, list);
        ClassLoader classLoader2 = getClassLoader(classLoaderService, contextClassLoader, connectorJarList3);
        try {
            Thread.currentThread().setContextClassLoader(classLoader);
            ConfigParserUtil.checkGraph(configList, configList2, configList3);
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            log.info("start generating all sources.");
            if (this.isStartWithSavePoint && this.pipelineCheckpoints != null && !this.pipelineCheckpoints.isEmpty()) {
                Preconditions.checkState(configList.size() == this.pipelineCheckpoints.size(), "The number of source configurations and pipeline checkpoints must be equal.");
            }
            for (int i = 0; i < configList.size(); i++) {
                Tuple2<String, List<Tuple2<CatalogTable, Action>>> parseSource = parseSource(i, (Config) configList.get(i), classLoader);
                linkedHashMap.put(parseSource._1(), parseSource._2());
            }
            log.info("start generating all transforms.");
            parseTransforms(configList2, classLoader, linkedHashMap);
            Thread.currentThread().setContextClassLoader(classLoader2);
            log.info("start generating all sinks.");
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < configList3.size(); i2++) {
                arrayList.addAll(parseSink(i2, (Config) configList3.get(i2), classLoader2, linkedHashMap));
            }
            ImmutablePair<List<Action>, Set<URL>> immutablePair = new ImmutablePair<>(arrayList, getUsedFactoryUrls(arrayList));
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            if (classLoaderService != null) {
                classLoaderService.releaseClassLoader(Long.parseLong(this.jobConfig.getJobContext().getJobId()), list);
                classLoaderService.releaseClassLoader(Long.parseLong(this.jobConfig.getJobContext().getJobId()), connectorJarList3);
            }
            return immutablePair;
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            if (classLoaderService != null) {
                classLoaderService.releaseClassLoader(Long.parseLong(this.jobConfig.getJobContext().getJobId()), list);
                classLoaderService.releaseClassLoader(Long.parseLong(this.jobConfig.getJobContext().getJobId()), connectorJarList3);
            }
            throw th;
        }
    }

    private ClassLoader getClassLoader(ClassLoaderService classLoaderService, ClassLoader classLoader, List<URL> list) {
        return classLoaderService == null ? new SeaTunnelChildFirstClassLoader(list, classLoader) : classLoaderService.getClassLoader(Long.parseLong(this.jobConfig.getJobContext().getJobId()), list);
    }

    public Set<URL> getUsedFactoryUrls(List<Action> list) {
        HashSet hashSet = new HashSet();
        fillUsedFactoryUrls(list, hashSet);
        return hashSet;
    }

    private List<URL> getConnectorJarList(List<? extends Config> list, PluginType pluginType) {
        List list2 = (List) list.stream().map(ConfigParserUtil::getFactoryId).map(str -> {
            return PluginIdentifier.of("seatunnel", pluginType.getType(), str);
        }).collect(Collectors.toList());
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(list2));
        arrayList.addAll(this.commonPluginJars);
        return arrayList;
    }

    private void fillUsedFactoryUrls(List<Action> list, Set<URL> set) {
        list.forEach(action -> {
            set.addAll(action.getJarUrls());
            if (action.getUpstream().isEmpty()) {
                return;
            }
            fillUsedFactoryUrls(action.getUpstream(), set);
        });
    }

    private void fillJobConfigAndCommonJars() {
        this.jobConfig.getJobContext().setJobMode((JobMode) this.envOptions.get(EnvCommonOptions.JOB_MODE));
        if (StringUtils.isEmpty(this.jobConfig.getName()) || this.jobConfig.getName().equals("SeaTunnel") || this.jobConfig.getName().equals(EnvCommonOptions.JOB_NAME.defaultValue())) {
            this.jobConfig.setName((String) this.envOptions.get(EnvCommonOptions.JOB_NAME));
        }
        this.jobConfig.getEnvOptions().putAll(this.envOptions.getSourceMap());
        this.commonPluginJars.addAll(new ArrayList((Collection) Common.getThirdPartyJars(this.jobConfig.getEnvOptions().getOrDefault(EnvCommonOptions.JARS.key(), "").toString()).stream().map((v0) -> {
            return v0.toUri();
        }).map(uri -> {
            try {
                return uri.toURL();
            } catch (MalformedURLException e) {
                throw new SeaTunnelEngineException("the uri of jar illegal:" + uri, e);
            }
        }).collect(Collectors.toList())));
        log.info("add common jar in plugins :{}", this.commonPluginJars);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static <T extends Factory> boolean isFallback(ClassLoader classLoader, Class<T> cls, String str, Consumer<T> consumer) {
        Optional discoverOptionalFactory = FactoryUtil.discoverOptionalFactory(classLoader, cls, str);
        if (!discoverOptionalFactory.isPresent()) {
            return true;
        }
        try {
            consumer.accept(discoverOptionalFactory.get());
            return false;
        } catch (Exception e) {
            if ((e instanceof UnsupportedOperationException) && "The Factory has not been implemented and the deprecated Plugin will be used.".equals(e.getMessage())) {
                log.warn("The Factory has not been implemented and the deprecated Plugin will be used.");
                return true;
            }
            log.debug(ExceptionUtils.getMessage(e));
            return false;
        }
    }

    private int getParallelism(ReadonlyConfig readonlyConfig) {
        return Math.max(1, ((Integer) readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(this.envOptions.get(CommonOptions.PARALLELISM))).intValue());
    }

    public Tuple2<String, List<Tuple2<CatalogTable, Action>>> parseSource(int i, Config config, ClassLoader classLoader) {
        ReadonlyConfig fromConfig = ReadonlyConfig.fromConfig(config);
        String factoryId = ConfigParserUtil.getFactoryId(fromConfig);
        String str = (String) fromConfig.getOptional(CommonOptions.PLUGIN_OUTPUT).orElse("default-identifier");
        int parallelism = getParallelism(fromConfig);
        if (isFallback(classLoader, TableSourceFactory.class, factoryId, tableSourceFactory -> {
            tableSourceFactory.createSource((TableSourceFactoryContext) null);
        })) {
            return new Tuple2<>(str, Collections.singletonList(this.fallbackParser.parseSource(config, this.jobConfig, str, parallelism)));
        }
        Tuple2 createAndPrepareSource = (!this.isStartWithSavePoint || this.pipelineCheckpoints == null || this.pipelineCheckpoints.isEmpty()) ? FactoryUtil.createAndPrepareSource(fromConfig, classLoader, factoryId) : FactoryUtil.restoreAndPrepareSource(fromConfig, classLoader, factoryId, getSourceCheckpoint(i, factoryId));
        HashSet hashSet = new HashSet();
        hashSet.addAll(getSourcePluginJarPaths(config));
        ArrayList arrayList = new ArrayList();
        long nextId = this.idGenerator.getNextId();
        String createSourceActionName = JobConfigParser.createSourceActionName(i, factoryId);
        SeaTunnelSource seaTunnelSource = (SeaTunnelSource) createAndPrepareSource._1();
        seaTunnelSource.setJobContext(this.jobConfig.getJobContext());
        PluginUtil.ensureJobModeMatch(this.jobConfig.getJobContext(), seaTunnelSource);
        SourceAction sourceAction = new SourceAction(nextId, createSourceActionName, (SeaTunnelSource) createAndPrepareSource._1(), hashSet, new HashSet());
        sourceAction.setParallelism(parallelism);
        Iterator it = ((List) createAndPrepareSource._2()).iterator();
        while (it.hasNext()) {
            arrayList.add(new Tuple2((CatalogTable) it.next(), sourceAction));
        }
        return new Tuple2<>(str, arrayList);
    }

    public void parseTransforms(List<? extends Config> list, ClassLoader classLoader, LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> linkedHashMap) {
        if (CollectionUtils.isEmpty(list) || list.isEmpty()) {
            return;
        }
        LinkedList linkedList = new LinkedList(list);
        int i = 0;
        while (!linkedList.isEmpty()) {
            int i2 = i;
            i++;
            parseTransform(i2, linkedList, classLoader, linkedHashMap);
        }
    }

    private void parseTransform(int i, Queue<Config> queue, ClassLoader classLoader, LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> linkedHashMap) {
        Config poll = queue.poll();
        ReadonlyConfig fromConfig = ReadonlyConfig.fromConfig(poll);
        String factoryId = ConfigParserUtil.getFactoryId(fromConfig);
        HashSet hashSet = new HashSet();
        hashSet.addAll(getTransformPluginJarPaths(poll));
        Stream<String> stream = ConfigParserUtil.getInputIds(fromConfig).stream();
        linkedHashMap.getClass();
        List list = (List) stream.map((v1) -> {
            return r1.get(v1);
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            if (!queue.isEmpty()) {
                queue.offer(poll);
                return;
            }
            list = (List) findLast(linkedHashMap);
        }
        String str = (String) fromConfig.getOptional(CommonOptions.PLUGIN_OUTPUT).orElse("default-identifier");
        Set set = (Set) list.stream().map((v0) -> {
            return v0._2();
        }).collect(Collectors.toCollection(LinkedHashSet::new));
        LinkedHashSet linkedHashSet = (LinkedHashSet) list.stream().map((v0) -> {
            return v0._1();
        }).collect(Collectors.toCollection(LinkedHashSet::new));
        checkProducedTypeEquals(set);
        int intValue = ((Integer) fromConfig.getOptional(CommonOptions.PARALLELISM).orElse(Integer.valueOf(((Action) ((Tuple2) list.get(0))._2()).getParallelism()))).intValue();
        SeaTunnelTransform createAndPrepareMultiTableTransform = FactoryUtil.createAndPrepareMultiTableTransform(new ArrayList(linkedHashSet), fromConfig, classLoader, factoryId);
        createAndPrepareMultiTableTransform.setJobContext(this.jobConfig.getJobContext());
        TransformAction transformAction = new TransformAction(this.idGenerator.getNextId(), JobConfigParser.createTransformActionName(i, factoryId), new ArrayList(set), createAndPrepareMultiTableTransform, hashSet, new HashSet());
        transformAction.setParallelism(intValue);
        ArrayList arrayList = new ArrayList();
        Iterator it = createAndPrepareMultiTableTransform.getProducedCatalogTables().iterator();
        while (it.hasNext()) {
            arrayList.add(new Tuple2<>((CatalogTable) it.next(), transformAction));
        }
        linkedHashMap.put(str, arrayList);
    }

    public static SeaTunnelDataType<?> getProducedType(Action action) {
        if (action instanceof SourceAction) {
            try {
                return ((CatalogTable) ((SourceAction) action).getSource().getProducedCatalogTables().get(0)).getSeaTunnelRowType();
            } catch (UnsupportedOperationException e) {
                return ((SourceAction) action).getSource().getProducedType();
            }
        }
        if (action instanceof TransformAction) {
            return ((TransformAction) action).getTransform().getProducedCatalogTable().getSeaTunnelRowType();
        }
        throw new UnsupportedOperationException();
    }

    public static void checkProducedTypeEquals(Set<Action> set) {
        SeaTunnelDataType<?> producedType = getProducedType((Action) new ArrayList(set).get(0));
        Iterator<Action> it = set.iterator();
        while (it.hasNext()) {
            if (!producedType.equals(getProducedType(it.next()))) {
                throw new JobDefineCheckException("Transform/Sink don't support processing data with two different structures.");
            }
        }
    }

    @Deprecated
    private static <T> T findLast(LinkedHashMap<?, T> linkedHashMap) {
        int size = linkedHashMap.size();
        int i = 1;
        for (T t : linkedHashMap.values()) {
            if (i == size) {
                return t;
            }
            i++;
        }
        return null;
    }

    public List<SinkAction<?, ?, ?, ?>> parseSink(int i, Config config, ClassLoader classLoader, LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> linkedHashMap) {
        ReadonlyConfig fromConfig = ReadonlyConfig.fromConfig(config);
        String factoryId = ConfigParserUtil.getFactoryId(fromConfig);
        Stream<String> stream = ConfigParserUtil.getInputIds(fromConfig).stream();
        linkedHashMap.getClass();
        List<List<Tuple2<CatalogTable, Action>>> list = (List) stream.map((v1) -> {
            return r1.get(v1);
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            list = Collections.singletonList(findLast(linkedHashMap));
        } else if (list.size() > 1) {
            Iterator<List<Tuple2<CatalogTable, Action>>> it = list.iterator();
            while (it.hasNext()) {
                if (it.next().size() > 1) {
                    throw new JobDefineCheckException("Sink don't support simultaneous writing of data from multi-table source and other sources.");
                }
            }
        }
        if (isFallback(classLoader, TableSinkFactory.class, factoryId, tableSinkFactory -> {
            tableSinkFactory.createSink((TableSinkFactoryContext) null);
        })) {
            return this.fallbackParser.parseSinks(i, list, config, this.jobConfig);
        }
        HashSet hashSet = new HashSet();
        hashSet.addAll(getSinkPluginJarPaths(config));
        ArrayList arrayList = new ArrayList();
        if (list.size() > 1) {
            Set<Action> set = (Set) list.stream().flatMap((v0) -> {
                return v0.stream();
            }).map((v0) -> {
                return v0._2();
            }).collect(Collectors.toCollection(LinkedHashSet::new));
            checkProducedTypeEquals(set);
            Tuple2<CatalogTable, Action> tuple2 = list.get(0).get(0);
            arrayList.add(createSinkAction((CatalogTable) tuple2._1(), set, fromConfig, classLoader, hashSet, new HashSet(), factoryId, ((Action) tuple2._2()).getParallelism(), i));
            return arrayList;
        }
        for (Tuple2<CatalogTable, Action> tuple22 : list.get(0)) {
            arrayList.add(createSinkAction((CatalogTable) tuple22._1(), Collections.singleton(tuple22._2()), fromConfig, classLoader, hashSet, new HashSet(), factoryId, ((Action) tuple22._2()).getParallelism(), i));
        }
        return (List) tryGenerateMultiTableSink(arrayList, fromConfig, classLoader, factoryId, i).map((v0) -> {
            return Collections.singletonList(v0);
        }).orElse(arrayList);
    }

    private Optional<SinkAction<?, ?, ?, ?>> tryGenerateMultiTableSink(List<SinkAction<?, ?, ?, ?>> list, ReadonlyConfig readonlyConfig, ClassLoader classLoader, String str, int i) {
        if (list.stream().anyMatch(sinkAction -> {
            return !(sinkAction.getSink() instanceof SupportMultiTableSink);
        })) {
            log.info("Unsupported multi table sink api, rollback to sink template");
            return Optional.empty();
        }
        HashMap hashMap = new HashMap();
        Set set = (Set) list.stream().flatMap(sinkAction2 -> {
            return sinkAction2.getJarUrls().stream();
        }).collect(Collectors.toSet());
        list.forEach(sinkAction3 -> {
            hashMap.put(sinkAction3.getConfig().getMultipleRowTableId(), sinkAction3.getSink());
        });
        SeaTunnelSink createMultiTableSink = FactoryUtil.createMultiTableSink(hashMap, readonlyConfig, classLoader);
        SinkAction sinkAction4 = new SinkAction(this.idGenerator.getNextId(), JobConfigParser.createSinkActionName(i, str, "MultiTableSink"), list.get(0).getUpstream(), createMultiTableSink, set, new HashSet());
        sinkAction4.setParallelism(list.get(0).getParallelism());
        return Optional.of(sinkAction4);
    }

    private SinkAction<?, ?, ?, ?> createSinkAction(CatalogTable catalogTable, Set<Action> set, ReadonlyConfig readonlyConfig, ClassLoader classLoader, Set<URL> set2, Set<ConnectorJarIdentifier> set3, String str, int i, int i2) {
        SeaTunnelSink<?, ?, ?, ?> createAndPrepareSink = FactoryUtil.createAndPrepareSink(catalogTable, readonlyConfig, classLoader, str);
        createAndPrepareSink.setJobContext(this.jobConfig.getJobContext());
        SinkConfig sinkConfig = new SinkConfig(catalogTable.getTableId().toTablePath().toString());
        SinkAction<?, ?, ?, ?> sinkAction = new SinkAction<>(this.idGenerator.getNextId(), JobConfigParser.createSinkActionName(i2, str, sinkConfig.getMultipleRowTableId()), new ArrayList(set), createAndPrepareSink, set2, set3, sinkConfig);
        if (!this.isStartWithSavePoint) {
            handleSaveMode(createAndPrepareSink);
        }
        sinkAction.setParallelism(i);
        return sinkAction;
    }

    public void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> seaTunnelSink) {
        if (SupportSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
            SupportSaveMode supportSaveMode = (SupportSaveMode) seaTunnelSink;
            if (((SaveModeExecuteLocation) this.envOptions.get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)).equals(SaveModeExecuteLocation.CLIENT)) {
                log.warn("SaveMode execute location on CLIENT is deprecated, please use CLUSTER instead.");
                Optional saveModeHandler = supportSaveMode.getSaveModeHandler();
                if (saveModeHandler.isPresent()) {
                    try {
                        SaveModeHandler saveModeHandler2 = (SaveModeHandler) saveModeHandler.get();
                        Throwable th = null;
                        try {
                            saveModeHandler2.open();
                            new SaveModeExecuteWrapper(saveModeHandler2).execute();
                            if (saveModeHandler2 != null) {
                                if (0 != 0) {
                                    try {
                                        saveModeHandler2.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    saveModeHandler2.close();
                                }
                            }
                        } finally {
                        }
                    } catch (Exception e) {
                        throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED, e);
                    }
                }
            }
        }
    }

    private List<URL> getSourcePluginJarPaths(Config config) {
        return new SeaTunnelSourcePluginDiscovery().getPluginJarPaths(Lists.newArrayList(new PluginIdentifier[]{PluginIdentifier.of("seatunnel", "source", config.getString("plugin_name"))}));
    }

    private List<URL> getTransformPluginJarPaths(Config config) {
        return new SeaTunnelTransformPluginDiscovery().getPluginJarPaths(Lists.newArrayList(new PluginIdentifier[]{PluginIdentifier.of("seatunnel", "transform", config.getString("plugin_name"))}));
    }

    private List<URL> getSinkPluginJarPaths(Config config) {
        return new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(Lists.newArrayList(new PluginIdentifier[]{PluginIdentifier.of("seatunnel", "sink", config.getString("plugin_name"))}));
    }

    private ChangeStreamTableSourceCheckpoint getSourceCheckpoint(int i, String str) {
        String createSourceActionName = JobConfigParser.createSourceActionName(i, str);
        JobPipelineCheckpointData jobPipelineCheckpointData = this.pipelineCheckpoints.get(i);
        Preconditions.checkArgument(jobPipelineCheckpointData.getPipelineId() == i + 1, String.format("The pipeline id in the checkpoint data is %d, but the config index is %d.", Integer.valueOf(jobPipelineCheckpointData.getPipelineId()), Integer.valueOf(i + 1)));
        List list = (List) jobPipelineCheckpointData.getTaskStates().entrySet().stream().filter(entry -> {
            return ((String) entry.getKey()).contains(createSourceActionName);
        }).map(entry2 -> {
            return (JobPipelineCheckpointData.ActionState) entry2.getValue();
        }).collect(Collectors.toList());
        Preconditions.checkArgument(list.size() == 1, String.format("The source action name %s is not found in the checkpoint keys %s.", createSourceActionName, jobPipelineCheckpointData.getTaskStates().keySet()));
        return new ChangeStreamTableSourceCheckpoint(((JobPipelineCheckpointData.ActionState) list.get(0)).getCoordinatorState().get(0), (List) ((JobPipelineCheckpointData.ActionState) list.get(0)).getSubtaskState().stream().flatMap(actionSubtaskState -> {
            return actionSubtaskState == null ? Stream.of(Collections.emptyList()) : Stream.of(actionSubtaskState.getState());
        }).collect(Collectors.toList()));
    }
}
