/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.context.ExecutionContext;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.aggregate.RecordCounter;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.types.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

/**
 * Bundle function for a group aggregation that is evaluated in mini-batches.
 *
 * <p>{@link #addInput} buffers a copy of every incoming row in a per-key list. {@link
 * #finishBundle} then walks the buffered keys, folds each key's rows into the accumulator kept
 * in {@code accState}, and emits the key joined with the aggregate value whenever the result
 * for that key appears for the first time, changes, or is removed.
 */
public class MiniBatchGroupAggFunction
extends MapBundleFunction<BaseRow, List<BaseRow>, BaseRow, BaseRow> {
    private static final long serialVersionUID = 7455939331036508477L;

    /** Generated code for the aggregate handler; instantiated in {@link #open}. */
    private final GeneratedAggsHandleFunction genAggsHandler;
    /** Generated code for the equaliser comparing aggregate values; instantiated in {@link #open}. */
    private final GeneratedRecordEqualiser genRecordEqualiser;
    /** Field types of the accumulator row stored in state. */
    private final LogicalType[] accTypes;
    /** Row type of the input; used to build the serializer that copies buffered rows. */
    private final RowType inputType;
    /** Decides whether an accumulator's record count has dropped to zero. */
    private final RecordCounter recordCounter;
    /** Whether an update emits the previous value as a retraction before the new value. */
    private final boolean generateRetraction;

    // Transient runtime members: all of them are (re)created in open().
    private transient JoinedRow resultRow;
    private transient TypeSerializer<BaseRow> inputRowSerializer;
    private transient AggsHandleFunction function;
    private transient RecordEqualiser equaliser;
    private transient ValueState<BaseRow> accState;

    /**
     * Creates the function.
     *
     * @param genAggsHandler generated aggregate handler
     * @param genRecordEqualiser generated equaliser used to compare the previous and the new
     *     aggregate value
     * @param accTypes field types of the accumulator row
     * @param inputType row type of the input
     * @param indexOfCountStar handed to {@link RecordCounter#of}; presumably the position of the
     *     record-count field inside the accumulator (confirm against {@code RecordCounter})
     * @param generateRetraction whether an update emits the previous value with a retract header
     *     before the new value
     */
    public MiniBatchGroupAggFunction(GeneratedAggsHandleFunction genAggsHandler, GeneratedRecordEqualiser genRecordEqualiser, LogicalType[] accTypes, RowType inputType, int indexOfCountStar, boolean generateRetraction) {
        this.genAggsHandler = genAggsHandler;
        this.genRecordEqualiser = genRecordEqualiser;
        this.recordCounter = RecordCounter.of(indexOfCountStar);
        this.accTypes = accTypes;
        this.inputType = inputType;
        this.generateRetraction = generateRetraction;
    }

    /**
     * Instantiates the generated handler and equaliser, registers the accumulator state and
     * creates the input-row serializer and the reusable output row.
     *
     * @param ctx execution context providing the runtime context
     * @throws Exception if the parent or the aggregate handler fails to open
     */
    @Override
    public void open(ExecutionContext ctx) throws Exception {
        super.open(ctx);
        ClassLoader userCodeClassLoader = ctx.getRuntimeContext().getUserCodeClassLoader();

        this.function = this.genAggsHandler.newInstance(userCodeClassLoader);
        this.function.open(new PerKeyStateDataViewStore(ctx.getRuntimeContext()));
        this.equaliser = this.genRecordEqualiser.newInstance(userCodeClassLoader);

        BaseRowTypeInfo accTypeInfo = new BaseRowTypeInfo(this.accTypes);
        ValueStateDescriptor<BaseRow> accDesc = new ValueStateDescriptor<>("accState", accTypeInfo);
        this.accState = ctx.getRuntimeContext().getState(accDesc);

        // The factory's return type carries no usable element type, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        TypeSerializer<BaseRow> rowSerializer =
                (TypeSerializer<BaseRow>) InternalSerializers.create(this.inputType, ctx.getRuntimeContext().getExecutionConfig());
        this.inputRowSerializer = rowSerializer;

        this.resultRow = new JoinedRow();
    }

    /**
     * Appends a copy of {@code input} to the buffer of its key.
     *
     * @param value rows buffered so far for the key, or {@code null} if there are none yet
     * @param input incoming row
     * @return the buffer containing the copied row; a new list when {@code value} was {@code null}
     * @throws Exception declared by the overridden method
     */
    @Override
    public List<BaseRow> addInput(@Nullable List<BaseRow> value, BaseRow input) throws Exception {
        List<BaseRow> bufferedRows = value != null ? value : new ArrayList<BaseRow>();
        // Buffer a copy made by the input serializer rather than the incoming instance itself.
        bufferedRows.add(this.inputRowSerializer.copy(input));
        return bufferedRows;
    }

    /**
     * Applies the buffered rows of every key to that key's accumulator and emits the outcome.
     *
     * <p>Per key:
     * <ul>
     *   <li>If the record count is non-zero, the accumulator is written back to state. For a key
     *       without prior state the new value is emitted. Otherwise, output is produced only
     *       when the value changed: the previous value with a retract header (if
     *       {@code generateRetraction} is set), followed by the new value.</li>
     *   <li>If the record count is zero, the previous value is emitted with a retract header
     *       (unless the key had no prior state), then the state is cleared and the handler's
     *       {@code cleanup()} is invoked.</li>
     * </ul>
     *
     * <p>The same {@code resultRow} instance is reused for every emitted record.
     *
     * @param buffer buffered rows grouped by key
     * @param out collector receiving the result rows
     * @throws Exception if state access or the aggregate handler fails
     */
    @Override
    public void finishBundle(Map<BaseRow, List<BaseRow>> buffer, Collector<BaseRow> out) throws Exception {
        for (Map.Entry<BaseRow, List<BaseRow>> entry : buffer.entrySet()) {
            BaseRow currentKey = entry.getKey();
            List<BaseRow> inputRows = entry.getValue();

            // Select the key first; the accumulator state read below is presumably scoped to it.
            this.ctx.setCurrentKey(currentKey);

            BaseRow acc = this.accState.value();
            boolean firstRow = acc == null;
            if (firstRow) {
                acc = this.function.createAccumulators();
            }
            this.function.setAccumulators(acc);

            // Capture the aggregate value before this bundle's rows are applied.
            BaseRow prevAggValue = this.function.getValue();
            for (BaseRow input : inputRows) {
                if (BaseRowUtil.isAccumulateMsg(input)) {
                    this.function.accumulate(input);
                } else {
                    this.function.retract(input);
                }
            }
            BaseRow newAggValue = this.function.getValue();
            acc = this.function.getAccumulators();

            if (this.recordCounter.recordCountIsZero(acc)) {
                // No records remain for this key: withdraw the old result (if one existed) and
                // drop everything stored for the key.
                if (!firstRow) {
                    emit(out, currentKey, prevAggValue, BaseRowUtil.RETRACT_MSG);
                }
                this.accState.clear();
                this.function.cleanup();
            } else {
                this.accState.update(acc);
                if (firstRow) {
                    emit(out, currentKey, newAggValue, BaseRowUtil.ACCUMULATE_MSG);
                } else if (!this.equaliser.equalsWithoutHeader(prevAggValue, newAggValue)) {
                    if (this.generateRetraction) {
                        emit(out, currentKey, prevAggValue, BaseRowUtil.RETRACT_MSG);
                    }
                    emit(out, currentKey, newAggValue, BaseRowUtil.ACCUMULATE_MSG);
                }
                // An unchanged value produces no output.
            }
        }
    }

    /** Points the reusable result row at {@code key} and {@code aggValue}, sets its header and collects it. */
    private void emit(Collector<BaseRow> out, BaseRow key, BaseRow aggValue, byte header) {
        this.resultRow.replace(key, aggValue).setHeader(header);
        out.collect(this.resultRow);
    }

    /**
     * Closes the aggregate handler if {@link #open} created one.
     *
     * @throws Exception if the handler fails to close
     */
    @Override
    public void close() throws Exception {
        if (this.function != null) {
            this.function.close();
        }
    }
}

