/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTrigger;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchDeduplicateFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@JsonIgnoreProperties(ignoreUnknown=true)
public class StreamExecDeduplicate
extends ExecNodeBase<RowData>
implements StreamExecNode<RowData>,
SingleTransformationTranslator<RowData> {
    public static final String FIELD_NAME_UNIQUE_KEYS = "uniqueKeys";
    public static final String FIELD_NAME_IS_ROWTIME = "isRowtime";
    public static final String FIELD_NAME_KEEP_LAST_ROW = "keepLastRow";
    public static final String FIELD_NAME_GENERATE_UPDATE_BEFORE = "generateUpdateBefore";
    @Experimental
    public static final ConfigOption<Boolean> TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE = ConfigOptions.key((String)"table.exec.insert-and-updateafter-sensitive").booleanType().defaultValue((Object)true).withDescription("Set whether the job (especially the sinks) is sensitive to INSERT messages and UPDATE_AFTER messages. If false, Flink may send UPDATE_AFTER instead of INSERT for the first row at some times (e.g. deduplication for last row). If true, Flink will guarantee to send INSERT for the first row, but there will be additional overhead.Default is true.");
    @JsonProperty(value="uniqueKeys")
    private final int[] uniqueKeys;
    @JsonProperty(value="isRowtime")
    private final boolean isRowtime;
    @JsonProperty(value="keepLastRow")
    private final boolean keepLastRow;
    @JsonProperty(value="generateUpdateBefore")
    private final boolean generateUpdateBefore;

    public StreamExecDeduplicate(int[] uniqueKeys, boolean isRowtime, boolean keepLastRow, boolean generateUpdateBefore, InputProperty inputProperty, RowType outputType, String description) {
        this(uniqueKeys, isRowtime, keepLastRow, generateUpdateBefore, StreamExecDeduplicate.getNewNodeId(), Collections.singletonList(inputProperty), outputType, description);
    }

    @JsonCreator
    public StreamExecDeduplicate(@JsonProperty(value="uniqueKeys") int[] uniqueKeys, @JsonProperty(value="isRowtime") boolean isRowtime, @JsonProperty(value="keepLastRow") boolean keepLastRow, @JsonProperty(value="generateUpdateBefore") boolean generateUpdateBefore, @JsonProperty(value="id") int id, @JsonProperty(value="inputProperties") List<InputProperty> inputProperties, @JsonProperty(value="outputType") RowType outputType, @JsonProperty(value="description") String description) {
        super(id, inputProperties, (LogicalType)outputType, description);
        Preconditions.checkArgument((inputProperties.size() == 1 ? 1 : 0) != 0);
        this.uniqueKeys = (int[])Preconditions.checkNotNull((Object)uniqueKeys);
        this.isRowtime = isRowtime;
        this.keepLastRow = keepLastRow;
        this.generateUpdateBefore = generateUpdateBefore;
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        ExecEdge inputEdge = this.getInputEdges().get(0);
        Transformation<?> inputTransform = inputEdge.translateToPlan(planner);
        RowType inputRowType = (RowType)inputEdge.getOutputType();
        InternalTypeInfo rowTypeInfo = (InternalTypeInfo)inputTransform.getOutputType();
        TypeSerializer rowSerializer = rowTypeInfo.createSerializer(planner.getExecEnv().getConfig());
        OneInputStreamOperator<RowData, RowData> operator = this.isRowtime ? new RowtimeDeduplicateOperatorTranslator(planner.getTableConfig(), (InternalTypeInfo<RowData>)rowTypeInfo, (TypeSerializer<RowData>)rowSerializer, inputRowType, this.keepLastRow, this.generateUpdateBefore).createDeduplicateOperator() : new ProcTimeDeduplicateOperatorTranslator(planner.getTableConfig(), (InternalTypeInfo<RowData>)rowTypeInfo, (TypeSerializer<RowData>)rowSerializer, inputRowType, this.keepLastRow, this.generateUpdateBefore).createDeduplicateOperator();
        OneInputTransformation transform = new OneInputTransformation(inputTransform, this.getDescription(), operator, (TypeInformation)rowTypeInfo, inputTransform.getParallelism());
        RowDataKeySelector selector = KeySelectorUtil.getRowDataSelector(this.uniqueKeys, (InternalTypeInfo<RowData>)rowTypeInfo);
        transform.setStateKeySelector((KeySelector)selector);
        transform.setStateKeyType((TypeInformation)selector.getProducedType());
        return transform;
    }

    private static class ProcTimeDeduplicateOperatorTranslator
    extends DeduplicateOperatorTranslator {
        private final GeneratedRecordEqualiser generatedEqualiser;

        protected ProcTimeDeduplicateOperatorTranslator(TableConfig tableConfig, InternalTypeInfo<RowData> rowTypeInfo, TypeSerializer<RowData> typeSerializer, RowType inputRowType, boolean keepLastRow, boolean generateUpdateBefore) {
            super(tableConfig, rowTypeInfo, typeSerializer, keepLastRow, generateUpdateBefore);
            this.generatedEqualiser = new EqualiserCodeGenerator(inputRowType).generateRecordEqualiser("DeduplicateRowEqualiser");
        }

        @Override
        OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
            if (this.isMiniBatchEnabled()) {
                CountBundleTrigger trigger = new CountBundleTrigger(this.getMiniBatchSize());
                if (this.keepLastRow) {
                    ProcTimeMiniBatchDeduplicateKeepLastRowFunction processFunction = new ProcTimeMiniBatchDeduplicateKeepLastRowFunction(this.rowTypeInfo, this.typeSerializer, this.getMinRetentionTime(), this.generateUpdateBefore, this.generateInsert(), true, this.generatedEqualiser);
                    return new KeyedMapBundleOperator((MapBundleFunction)processFunction, (BundleTrigger)trigger);
                }
                ProcTimeMiniBatchDeduplicateKeepFirstRowFunction processFunction = new ProcTimeMiniBatchDeduplicateKeepFirstRowFunction(this.typeSerializer, this.getMinRetentionTime());
                return new KeyedMapBundleOperator((MapBundleFunction)processFunction, (BundleTrigger)trigger);
            }
            if (this.keepLastRow) {
                ProcTimeDeduplicateKeepLastRowFunction processFunction = new ProcTimeDeduplicateKeepLastRowFunction(this.rowTypeInfo, this.getMinRetentionTime(), this.generateUpdateBefore, this.generateInsert(), true, this.generatedEqualiser);
                return new KeyedProcessOperator((KeyedProcessFunction)processFunction);
            }
            ProcTimeDeduplicateKeepFirstRowFunction processFunction = new ProcTimeDeduplicateKeepFirstRowFunction(this.getMinRetentionTime());
            return new KeyedProcessOperator((KeyedProcessFunction)processFunction);
        }
    }

    private static class RowtimeDeduplicateOperatorTranslator
    extends DeduplicateOperatorTranslator {
        private final RowType inputRowType;

        protected RowtimeDeduplicateOperatorTranslator(TableConfig tableConfig, InternalTypeInfo<RowData> rowTypeInfo, TypeSerializer<RowData> typeSerializer, RowType inputRowType, boolean keepLastRow, boolean generateUpdateBefore) {
            super(tableConfig, rowTypeInfo, typeSerializer, keepLastRow, generateUpdateBefore);
            this.inputRowType = inputRowType;
        }

        @Override
        OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
            int rowtimeIndex = -1;
            for (int i = 0; i < this.inputRowType.getFieldCount(); ++i) {
                if (!TypeCheckUtils.isRowTime((LogicalType)this.inputRowType.getTypeAt(i))) continue;
                rowtimeIndex = i;
                break;
            }
            Preconditions.checkArgument((rowtimeIndex >= 0 ? 1 : 0) != 0);
            if (this.isMiniBatchEnabled()) {
                CountBundleTrigger trigger = new CountBundleTrigger(this.getMiniBatchSize());
                RowTimeMiniBatchDeduplicateFunction processFunction = new RowTimeMiniBatchDeduplicateFunction(this.rowTypeInfo, this.typeSerializer, this.getMinRetentionTime(), rowtimeIndex, this.generateUpdateBefore, this.generateInsert(), this.keepLastRow);
                return new KeyedMapBundleOperator((MapBundleFunction)processFunction, (BundleTrigger)trigger);
            }
            RowTimeDeduplicateFunction processFunction = new RowTimeDeduplicateFunction(this.rowTypeInfo, this.getMinRetentionTime(), rowtimeIndex, this.generateUpdateBefore, this.generateInsert(), this.keepLastRow);
            return new KeyedProcessOperator((KeyedProcessFunction)processFunction);
        }
    }

    private static abstract class DeduplicateOperatorTranslator {
        private final TableConfig tableConfig;
        protected final InternalTypeInfo<RowData> rowTypeInfo;
        protected final TypeSerializer<RowData> typeSerializer;
        protected final boolean keepLastRow;
        protected final boolean generateUpdateBefore;

        protected DeduplicateOperatorTranslator(TableConfig tableConfig, InternalTypeInfo<RowData> rowTypeInfo, TypeSerializer<RowData> typeSerializer, boolean keepLastRow, boolean generateUpdateBefore) {
            this.tableConfig = tableConfig;
            this.rowTypeInfo = rowTypeInfo;
            this.typeSerializer = typeSerializer;
            this.keepLastRow = keepLastRow;
            this.generateUpdateBefore = generateUpdateBefore;
        }

        protected boolean generateInsert() {
            return this.tableConfig.getConfiguration().getBoolean(TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE);
        }

        protected boolean isMiniBatchEnabled() {
            return this.tableConfig.getConfiguration().getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
        }

        protected long getMinRetentionTime() {
            return this.tableConfig.getMinIdleStateRetentionTime();
        }

        protected long getMiniBatchSize() {
            if (this.isMiniBatchEnabled()) {
                long size = this.tableConfig.getConfiguration().getLong(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE);
                Preconditions.checkArgument((size > 0L ? 1 : 0) != 0, (Object)(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE.key() + " should be greater than 0."));
                return size;
            }
            return -1L;
        }

        abstract OneInputStreamOperator<RowData, RowData> createDeduplicateOperator();
    }
}

