/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.cdc.debezium;

import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumOffset;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.alibaba.ververica.cdc.debezium.internal.FlinkDatabaseHistory;
import com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.relational.history.HistoryRecord;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class DebeziumSourceFunction<T>
extends RichSourceFunction<T>
implements CheckpointedFunction,
CheckpointListener,
ResultTypeQueryable<T> {
    private static final long serialVersionUID = -5808108641062931623L;
    protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class);
    public static final String OFFSETS_STATE_NAME = "offset-states";
    public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states";
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    private final DebeziumDeserializationSchema<T> deserializer;
    private final Properties properties;
    @Nullable
    private final DebeziumOffset specificOffset;
    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
    private ExecutorService executor;
    private DebeziumEngine<?> engine;
    private volatile transient Throwable error;
    private volatile boolean running = true;
    private volatile boolean debeziumStarted = false;
    private volatile transient DebeziumChangeConsumer<T> debeziumConsumer;
    private volatile transient String restoredOffsetState;
    private transient ListState<byte[]> offsetState;
    private transient ListState<String> historyRecordsState;
    private transient String engineInstanceName;

    public DebeziumSourceFunction(DebeziumDeserializationSchema<T> deserializer, Properties properties, @Nullable DebeziumOffset specificOffset) {
        this.deserializer = deserializer;
        this.properties = properties;
        this.specificOffset = specificOffset;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        OperatorStateStore stateStore = context.getOperatorStateStore();
        this.offsetState = stateStore.getUnionListState(new ListStateDescriptor(OFFSETS_STATE_NAME, (TypeInformation)PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
        this.historyRecordsState = stateStore.getUnionListState(new ListStateDescriptor(HISTORY_RECORDS_STATE_NAME, (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO));
        if (context.isRestored()) {
            this.restoreOffsetState();
            this.restoreHistoryRecordsState();
        } else if (this.specificOffset != null) {
            byte[] serializedOffset = DebeziumOffsetSerializer.INSTANCE.serialize(this.specificOffset);
            this.restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
            LOG.info("Consumer subtask {} starts to read from specified offset {}.", (Object)this.getRuntimeContext().getIndexOfThisSubtask(), (Object)this.restoredOffsetState);
        } else {
            LOG.info("Consumer subtask {} has no restore state.", (Object)this.getRuntimeContext().getIndexOfThisSubtask());
        }
    }

    private void restoreOffsetState() throws Exception {
        for (byte[] serializedOffset : (Iterable)this.offsetState.get()) {
            if (this.restoredOffsetState == null) {
                this.restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8);
                continue;
            }
            throw new RuntimeException("Debezium Source only support single task, however, this is restored from multiple tasks.");
        }
        LOG.info("Consumer subtask {} restored offset state: {}.", (Object)this.getRuntimeContext().getIndexOfThisSubtask(), (Object)this.restoredOffsetState);
    }

    private void restoreHistoryRecordsState() throws Exception {
        DocumentReader reader = DocumentReader.defaultReader();
        ConcurrentLinkedQueue<HistoryRecord> historyRecords = new ConcurrentLinkedQueue<HistoryRecord>();
        int recordsCount = 0;
        boolean firstEntry = true;
        for (String record : (Iterable)this.historyRecordsState.get()) {
            if (firstEntry) {
                this.engineInstanceName = record;
                firstEntry = false;
                continue;
            }
            historyRecords.add(new HistoryRecord(reader.read(record)));
            ++recordsCount;
        }
        if (this.engineInstanceName != null) {
            FlinkDatabaseHistory.registerHistoryRecords(this.engineInstanceName, historyRecords);
        }
        LOG.info("Consumer subtask {} restored history records state: {} with {} records.", new Object[]{this.getRuntimeContext().getIndexOfThisSubtask(), this.engineInstanceName, recordsCount});
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            this.snapshotOffsetState(functionSnapshotContext.getCheckpointId());
            this.snapshotHistoryRecordsState();
        }
    }

    private void snapshotOffsetState(long checkpointId) throws Exception {
        this.offsetState.clear();
        DebeziumChangeConsumer<T> consumer = this.debeziumConsumer;
        byte[] serializedOffset = null;
        if (consumer == null) {
            if (this.restoredOffsetState != null) {
                serializedOffset = this.restoredOffsetState.getBytes(StandardCharsets.UTF_8);
            }
        } else {
            byte[] currentState = consumer.snapshotCurrentState();
            serializedOffset = currentState == null && this.restoredOffsetState != null ? this.restoredOffsetState.getBytes(StandardCharsets.UTF_8) : currentState;
        }
        if (serializedOffset != null) {
            this.offsetState.add((Object)serializedOffset);
            this.pendingOffsetsToCommit.put((Object)checkpointId, (Object)serializedOffset);
            while (this.pendingOffsetsToCommit.size() > 100) {
                this.pendingOffsetsToCommit.remove(0);
            }
        }
    }

    private void snapshotHistoryRecordsState() throws Exception {
        this.historyRecordsState.clear();
        if (this.engineInstanceName != null) {
            this.historyRecordsState.add((Object)this.engineInstanceName);
            ConcurrentLinkedQueue<HistoryRecord> historyRecords = FlinkDatabaseHistory.getRegisteredHistoryRecord(this.engineInstanceName);
            if (historyRecords != null) {
                DocumentWriter writer = DocumentWriter.defaultWriter();
                for (HistoryRecord record : historyRecords) {
                    this.historyRecordsState.add((Object)writer.write(record.document()));
                }
            }
        }
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        this.properties.setProperty("name", "engine");
        this.properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
        if (this.restoredOffsetState != null) {
            this.properties.setProperty("offset.storage.flink.state.value", this.restoredOffsetState);
        }
        this.properties.setProperty("key.converter.schemas.enable", "false");
        this.properties.setProperty("value.converter.schemas.enable", "false");
        this.properties.setProperty("include.schema.changes", "false");
        this.properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));
        this.properties.setProperty("tombstones.on.delete", "false");
        this.properties.setProperty("database.history", FlinkDatabaseHistory.class.getCanonicalName());
        if (this.engineInstanceName == null) {
            this.engineInstanceName = UUID.randomUUID().toString();
            FlinkDatabaseHistory.registerEmptyHistoryRecord(this.engineInstanceName);
        }
        this.properties.setProperty("database.history.instance.name", this.engineInstanceName);
        String dbzHeartbeatPrefix = this.properties.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());
        this.debeziumConsumer = new DebeziumChangeConsumer<T>(sourceContext, this.deserializer, this.restoredOffsetState == null, this::reportError, dbzHeartbeatPrefix);
        this.engine = DebeziumEngine.create(Connect.class).using(this.properties).notifying(this.debeziumConsumer).using(OffsetCommitPolicy.always()).using((success, message, error) -> {
            if (!success && error != null) {
                this.reportError(error);
            }
        }).build();
        if (!this.running) {
            return;
        }
        this.executor.execute(this.engine);
        this.debeziumStarted = true;
        MetricGroup metricGroup = this.getRuntimeContext().getMetricGroup();
        metricGroup.gauge("currentFetchEventTimeLag", () -> this.debeziumConsumer.getFetchDelay());
        metricGroup.gauge("currentEmitEventTimeLag", () -> this.debeziumConsumer.getEmitDelay());
        metricGroup.gauge("sourceIdleTime", () -> this.debeziumConsumer.getIdleTime());
        try {
            while (this.running && !this.executor.awaitTermination(5L, TimeUnit.SECONDS)) {
                if (this.error == null) continue;
                this.running = false;
                this.shutdownEngine();
                ExceptionUtils.rethrow((Throwable)this.error);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!this.running) {
            LOG.debug("notifyCheckpointComplete() called on closed source");
            return;
        }
        DebeziumChangeConsumer<T> consumer = this.debeziumConsumer;
        if (consumer == null) {
            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
            return;
        }
        try {
            int posInMap = this.pendingOffsetsToCommit.indexOf((Object)checkpointId);
            if (posInMap == -1) {
                LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}", (Object)this.getRuntimeContext().getIndexOfThisSubtask(), (Object)checkpointId);
                return;
            }
            byte[] serializedOffsets = (byte[])this.pendingOffsetsToCommit.remove(posInMap);
            for (int i = 0; i < posInMap; ++i) {
                this.pendingOffsetsToCommit.remove(0);
            }
            if (serializedOffsets == null || serializedOffsets.length == 0) {
                LOG.debug("Consumer subtask {} has empty checkpoint state.", (Object)this.getRuntimeContext().getIndexOfThisSubtask());
                return;
            }
            DebeziumOffset offset = DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
            consumer.commitOffset(offset);
        }
        catch (Exception e) {
            LOG.warn("Ignore error when committing offset to database.", (Throwable)e);
        }
    }

    public void cancel() {
        this.running = false;
        this.shutdownEngine();
    }

    public void close() throws Exception {
        this.cancel();
        if (this.executor != null) {
            this.executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        }
        super.close();
    }

    private void reportError(Throwable error) {
        LOG.error("Reporting error:", error);
        this.error = error;
    }

    private void shutdownEngine() {
        try {
            if (this.engine != null) {
                this.engine.close();
            }
        }
        catch (IOException e) {
            ExceptionUtils.rethrow((Throwable)e);
        }
        finally {
            if (this.executor != null) {
                this.executor.shutdown();
            }
            this.debeziumStarted = false;
        }
    }

    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    @VisibleForTesting
    public LinkedMap getPendingOffsetsToCommit() {
        return this.pendingOffsetsToCommit;
    }

    @VisibleForTesting
    public boolean getDebeziumStarted() {
        return this.debeziumStarted;
    }
}

