/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.collector.TableFunctionCollector;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedCollector;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.types.PlannerTypeUtils;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

public abstract class CommonExecLookupJoin
extends ExecNodeBase<RowData>
implements SingleTransformationTranslator<RowData> {
    public static final String LOOKUP_JOIN_TRANSFORMATION = "lookup-join";
    public static final String FIELD_NAME_JOIN_TYPE = "joinType";
    public static final String FIELD_NAME_JOIN_CONDITION = "joinCondition";
    public static final String FIELD_NAME_TEMPORAL_TABLE = "temporalTable";
    public static final String FIELD_NAME_LOOKUP_KEYS = "lookupKeys";
    public static final String FIELD_NAME_PROJECTION_ON_TEMPORAL_TABLE = "projectionOnTemporalTable";
    public static final String FIELD_NAME_FILTER_ON_TEMPORAL_TABLE = "filterOnTemporalTable";
    @JsonProperty(value="joinType")
    private final FlinkJoinType joinType;
    @JsonProperty(value="lookupKeys")
    private final Map<Integer, LookupJoinUtil.LookupKey> lookupKeys;
    @JsonProperty(value="temporalTable")
    private final TemporalTableSourceSpec temporalTableSourceSpec;
    @JsonProperty(value="projectionOnTemporalTable")
    @Nullable
    private final List<RexNode> projectionOnTemporalTable;
    @JsonProperty(value="filterOnTemporalTable")
    @Nullable
    private final RexNode filterOnTemporalTable;
    @JsonProperty(value="joinCondition")
    @Nullable
    private final RexNode joinCondition;
    private final boolean existCalcOnTemporalTable;
    @Nullable
    private final RelDataType temporalTableOutputType;

    protected CommonExecLookupJoin(int id, ExecNodeContext context, ReadableConfig persistedConfig, FlinkJoinType joinType, @Nullable RexNode joinCondition, TemporalTableSourceSpec temporalTableSourceSpec, Map<Integer, LookupJoinUtil.LookupKey> lookupKeys, @Nullable List<RexNode> projectionOnTemporalTable, @Nullable RexNode filterOnTemporalTable, List<InputProperty> inputProperties, RowType outputType, String description) {
        super(id, context, persistedConfig, inputProperties, (LogicalType)outputType, description);
        Preconditions.checkArgument((inputProperties.size() == 1 ? 1 : 0) != 0);
        this.joinType = (FlinkJoinType)Preconditions.checkNotNull((Object)joinType);
        this.joinCondition = joinCondition;
        this.lookupKeys = Collections.unmodifiableMap((Map)Preconditions.checkNotNull(lookupKeys));
        this.temporalTableSourceSpec = (TemporalTableSourceSpec)Preconditions.checkNotNull((Object)temporalTableSourceSpec);
        this.projectionOnTemporalTable = projectionOnTemporalTable;
        this.filterOnTemporalTable = filterOnTemporalTable;
        if (null != projectionOnTemporalTable) {
            this.existCalcOnTemporalTable = true;
            this.temporalTableOutputType = RexUtil.createStructType(FlinkTypeFactory.INSTANCE(), projectionOnTemporalTable);
        } else {
            this.existCalcOnTemporalTable = false;
            this.temporalTableOutputType = null;
        }
    }

    public TemporalTableSourceSpec getTemporalTableSourceSpec() {
        return this.temporalTableSourceSpec;
    }

    @Override
    public Transformation<RowData> translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
        RelOptTable temporalTable = this.temporalTableSourceSpec.getTemporalTable(planner.getFlinkContext());
        this.validate(temporalTable);
        ExecEdge inputEdge = this.getInputEdges().get(0);
        RowType inputRowType = (RowType)inputEdge.getOutputType();
        RowType tableSourceRowType = FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
        RowType resultRowType = (RowType)this.getOutputType();
        this.validateLookupKeyType(this.lookupKeys, inputRowType, tableSourceRowType);
        boolean isAsyncEnabled = false;
        UserDefinedFunction userDefinedFunction = LookupJoinUtil.getLookupFunction(temporalTable, this.lookupKeys.keySet());
        UserDefinedFunctionHelper.prepareInstance((ReadableConfig)config, (UserDefinedFunction)userDefinedFunction);
        if (userDefinedFunction instanceof AsyncTableFunction) {
            isAsyncEnabled = true;
        }
        boolean isLeftOuterJoin = this.joinType == FlinkJoinType.LEFT;
        StreamOperatorFactory<RowData> operatorFactory = isAsyncEnabled ? this.createAsyncLookupJoin(temporalTable, config, this.lookupKeys, (AsyncTableFunction<Object>)((AsyncTableFunction)userDefinedFunction), planner.getRelBuilder(), inputRowType, tableSourceRowType, resultRowType, isLeftOuterJoin) : this.createSyncLookupJoin(temporalTable, config, this.lookupKeys, (TableFunction)userDefinedFunction, planner.getRelBuilder(), inputRowType, tableSourceRowType, resultRowType, isLeftOuterJoin, planner.getExecEnv().getConfig().isObjectReuseEnabled());
        Transformation<?> inputTransformation = inputEdge.translateToPlan(planner);
        return ExecNodeUtil.createOneInputTransformation(inputTransformation, this.createTransformationMeta(LOOKUP_JOIN_TRANSFORMATION, config), operatorFactory, InternalTypeInfo.of((RowType)resultRowType), inputTransformation.getParallelism());
    }

    protected void validateLookupKeyType(Map<Integer, LookupJoinUtil.LookupKey> lookupKeys, RowType inputRowType, RowType tableSourceRowType) {
        LinkedList imCompatibleConditions = new LinkedList();
        lookupKeys.entrySet().stream().filter(entry -> entry.getValue() instanceof LookupJoinUtil.FieldRefLookupKey).forEach(entry -> {
            LogicalType rightType;
            int rightKey = (Integer)entry.getKey();
            int leftKey = ((LookupJoinUtil.FieldRefLookupKey)entry.getValue()).index;
            LogicalType leftType = inputRowType.getTypeAt(leftKey);
            boolean isCompatible = PlannerTypeUtils.isInteroperable((LogicalType)leftType, (LogicalType)(rightType = tableSourceRowType.getTypeAt(rightKey)));
            if (!isCompatible) {
                String leftName = (String)inputRowType.getFieldNames().get(leftKey);
                String rightName = (String)tableSourceRowType.getFieldNames().get(rightKey);
                imCompatibleConditions.add(String.format("%s[%s]=%s[%s]", leftName, leftType, rightName, rightType));
            }
        });
        if (!imCompatibleConditions.isEmpty()) {
            throw new TableException("Temporal table join requires equivalent condition of the same type, but the condition is " + StringUtils.join(imCompatibleConditions, (String)","));
        }
    }

    private StreamOperatorFactory<RowData> createAsyncLookupJoin(RelOptTable temporalTable, ExecNodeConfig config, Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys, AsyncTableFunction<Object> asyncLookupFunction, RelBuilder relBuilder, RowType inputRowType, RowType tableSourceRowType, RowType resultRowType, boolean isLeftOuterJoin) {
        AsyncLookupJoinRunner asyncFunc;
        int asyncBufferCapacity = (Integer)config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY);
        long asyncTimeout = ((Duration)config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT)).toMillis();
        DataTypeFactory dataTypeFactory = ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
        LookupJoinCodeGenerator.GeneratedTableFunctionWithDataType<AsyncFunction<RowData, Object>> generatedFuncWithType = LookupJoinCodeGenerator.generateAsyncLookupFunction(config.getTableConfig(), dataTypeFactory, (LogicalType)inputRowType, (LogicalType)tableSourceRowType, (LogicalType)resultRowType, allLookupKeys, LookupJoinUtil.getOrderedLookupKeys(allLookupKeys.keySet()), asyncLookupFunction, StringUtils.join(temporalTable.getQualifiedName(), (String)"."));
        RowType rightRowType = Optional.ofNullable(this.temporalTableOutputType).map(FlinkTypeFactory::toLogicalRowType).orElse(tableSourceRowType);
        GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture = LookupJoinCodeGenerator.generateTableAsyncCollector(config.getTableConfig(), "TableFunctionResultFuture", inputRowType, rightRowType, JavaScalaConversionUtil.toScala(Optional.ofNullable(this.joinCondition)));
        DataStructureConverter fetcherConverter = DataStructureConverters.getConverter((DataType)generatedFuncWithType.dataType());
        if (this.existCalcOnTemporalTable) {
            GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc = LookupJoinCodeGenerator.generateCalcMapFunction(config.getTableConfig(), JavaScalaConversionUtil.toScala(this.projectionOnTemporalTable), this.filterOnTemporalTable, this.temporalTableOutputType, tableSourceRowType);
            asyncFunc = new AsyncLookupJoinWithCalcRunner(generatedFuncWithType.tableFunc(), fetcherConverter, generatedCalc, generatedResultFuture, InternalSerializers.create((RowType)rightRowType), isLeftOuterJoin, asyncBufferCapacity);
        } else {
            asyncFunc = new AsyncLookupJoinRunner(generatedFuncWithType.tableFunc(), fetcherConverter, generatedResultFuture, InternalSerializers.create((RowType)rightRowType), isLeftOuterJoin, asyncBufferCapacity);
        }
        return new AsyncWaitOperatorFactory((AsyncFunction)asyncFunc, asyncTimeout, asyncBufferCapacity, AsyncDataStream.OutputMode.ORDERED);
    }

    private StreamOperatorFactory<RowData> createSyncLookupJoin(RelOptTable temporalTable, ExecNodeConfig config, Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys, TableFunction<?> syncLookupFunction, RelBuilder relBuilder, RowType inputRowType, RowType tableSourceRowType, RowType resultRowType, boolean isLeftOuterJoin, boolean isObjectReuseEnabled) {
        LookupJoinRunner processFunc;
        DataTypeFactory dataTypeFactory = ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
        int[] orderedLookupKeys = LookupJoinUtil.getOrderedLookupKeys(allLookupKeys.keySet());
        GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher = LookupJoinCodeGenerator.generateSyncLookupFunction(config.getTableConfig(), dataTypeFactory, (LogicalType)inputRowType, (LogicalType)tableSourceRowType, (LogicalType)resultRowType, allLookupKeys, orderedLookupKeys, syncLookupFunction, StringUtils.join(temporalTable.getQualifiedName(), (String)"."), isObjectReuseEnabled);
        RowType rightRowType = Optional.ofNullable(this.temporalTableOutputType).map(FlinkTypeFactory::toLogicalRowType).orElse(tableSourceRowType);
        CodeGeneratorContext ctx = new CodeGeneratorContext(config.getTableConfig());
        GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector = LookupJoinCodeGenerator.generateCollector(ctx, inputRowType, rightRowType, resultRowType, JavaScalaConversionUtil.toScala(Optional.ofNullable(this.joinCondition)), JavaScalaConversionUtil.toScala(Optional.empty()), true);
        if (this.existCalcOnTemporalTable) {
            GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc = LookupJoinCodeGenerator.generateCalcMapFunction(config.getTableConfig(), JavaScalaConversionUtil.toScala(this.projectionOnTemporalTable), this.filterOnTemporalTable, this.temporalTableOutputType, tableSourceRowType);
            processFunc = new LookupJoinWithCalcRunner(generatedFetcher, generatedCalc, generatedCollector, isLeftOuterJoin, rightRowType.getFieldCount());
        } else {
            processFunc = new LookupJoinRunner(generatedFetcher, generatedCollector, isLeftOuterJoin, rightRowType.getFieldCount());
        }
        return SimpleOperatorFactory.of((StreamOperator)new ProcessOperator((ProcessFunction)processFunc));
    }

    private void validate(RelOptTable temporalTable) {
        this.validateTableSource(temporalTable);
        if (this.lookupKeys.isEmpty()) {
            throw new TableException(String.format("Temporal table join requires an equality condition on fields of %s.", this.getTableSourceDescription(temporalTable)));
        }
        if (this.joinType != FlinkJoinType.LEFT && this.joinType != FlinkJoinType.INNER) {
            throw new TableException(String.format("Temporal table join currently only support INNER JOIN and LEFT JOIN, but was %s JOIN.", this.joinType.toString()));
        }
    }

    private String getTableSourceDescription(RelOptTable temporalTable) {
        if (temporalTable instanceof TableSourceTable) {
            return String.format("table [%s]", ((TableSourceTable)temporalTable).contextResolvedTable().getIdentifier().asSummaryString());
        }
        if (temporalTable instanceof LegacyTableSourceTable) {
            return String.format("table [%s]", ((LegacyTableSourceTable)temporalTable).tableIdentifier().asSummaryString());
        }
        return "";
    }

    private void validateTableSource(RelOptTable temporalTable) {
        if (temporalTable instanceof TableSourceTable) {
            if (!(((TableSourceTable)temporalTable).tableSource() instanceof LookupTableSource)) {
                throw new TableException(String.format("%s must implement LookupTableSource interface if it is used in temporal table join.", this.getTableSourceDescription(temporalTable)));
            }
        } else if (temporalTable instanceof LegacyTableSourceTable) {
            TableSource tableSource = ((LegacyTableSourceTable)temporalTable).tableSource();
            if (!(tableSource instanceof LookupableTableSource)) {
                throw new TableException(String.format("%s must implement LookupableTableSource interface if it is used in temporal table join.", this.getTableSourceDescription(temporalTable)));
            }
            TypeInformation tableSourceProducedType = TypeInfoDataTypeConverter.fromDataTypeToTypeInfo((DataType)tableSource.getProducedDataType());
            if (!(tableSourceProducedType instanceof InternalTypeInfo && tableSourceProducedType.getTypeClass().isAssignableFrom(RowData.class) || tableSourceProducedType instanceof RowTypeInfo)) {
                throw new TableException(String.format("Temporal table join only support Row or RowData type as return type of temporal table. But was %s.", tableSourceProducedType));
            }
        } else {
            throw new TableException(String.format("table [%s] is neither TableSourceTable not LegacyTableSourceTable.", StringUtils.join(temporalTable.getQualifiedName(), (String)".")));
        }
    }
}

