/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.cdc.debezium.internal;

import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumOffset;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.alibaba.ververica.cdc.debezium.internal.ErrorReporter;
import io.debezium.connector.SnapshotRecord;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.MemoryUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Debezium {@link DebeziumEngine.ChangeConsumer} that deserializes every received
 * {@link SourceRecord} with the configured {@link DebeziumDeserializationSchema} and emits the
 * resulting elements through the Flink {@link SourceFunction.SourceContext}.
 *
 * <p>The consumer remembers the source partition/offset of the last handled record in a
 * {@link DebeziumOffset}; that offset is what {@link #snapshotCurrentState()} serializes.
 *
 * <p>While the consumer is in the database snapshot phase it acquires the checkpoint lock once
 * (via {@code MemoryUtils.UNSAFE.monitorEnter}) and keeps it across records and batches until the
 * first non-snapshot record arrives, at which point the lock is released and records are emitted
 * under a regular {@code synchronized} block instead.
 *
 * @param <T> type of the elements produced by the deserialization schema
 */
@Internal
public class DebeziumChangeConsumer<T>
implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);

    /** Source-offset key whose value is normalized to {@code Long} before committing. */
    public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";

    /** Source-offset key whose value is normalized to {@code Long} before committing. */
    public static final String LAST_COMMIT_LSN_KEY = "lsn_commit";

    private final SourceFunction.SourceContext<T> sourceContext;

    /** Lock obtained from the source context; guards emission and offset updates. */
    private final Object checkpointLock;

    private final DebeziumDeserializationSchema<T> deserialization;

    /** Buffers the elements produced by deserializing a single record. */
    private final DebeziumCollector debeziumCollector;

    private final ErrorReporter errorReporter;

    /** Partition/offset of the last handled record (data or heartbeat). */
    private final DebeziumOffset debeziumOffset;

    private final DebeziumOffsetSerializer stateSerializer;

    /** Records whose topic starts with this prefix are treated as heartbeats. */
    private final String heartbeatTopicPrefix;

    private boolean isInDbSnapshotPhase;

    /** Set once the checkpoint lock has been acquired for the snapshot phase; never reset. */
    private boolean lockHold = false;

    /** {@code source.ts_ms} of the most recent record that carried one. */
    private volatile long messageTimestamp = 0L;

    /** Wall-clock time at which the most recent batch started being handled. */
    private volatile long processTime = 0L;

    private volatile long fetchDelay = 0L;
    private volatile long emitDelay = 0L;

    /** Committer handed over with the most recent batch; {@code null} until the first batch. */
    private DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> currentCommitter;

    /**
     * Creates a consumer bound to the given source context.
     *
     * @param sourceContext context used to emit elements and to obtain the checkpoint lock
     * @param deserialization schema converting a {@link SourceRecord} into elements
     * @param isInDbSnapshotPhase whether the consumer starts in the database snapshot phase
     * @param errorReporter receives any exception raised while handling a batch
     * @param heartbeatTopicPrefix topic prefix identifying heartbeat records
     */
    public DebeziumChangeConsumer(SourceFunction.SourceContext<T> sourceContext, DebeziumDeserializationSchema<T> deserialization, boolean isInDbSnapshotPhase, ErrorReporter errorReporter, String heartbeatTopicPrefix) {
        this.sourceContext = sourceContext;
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.deserialization = deserialization;
        this.isInDbSnapshotPhase = isInDbSnapshotPhase;
        this.heartbeatTopicPrefix = heartbeatTopicPrefix;
        this.debeziumCollector = new DebeziumCollector();
        this.errorReporter = errorReporter;
        this.debeziumOffset = new DebeziumOffset();
        this.stateSerializer = DebeziumOffsetSerializer.INSTANCE;
    }

    /**
     * Handles one batch of change events.
     *
     * <p>Heartbeat records only advance the tracked offset (under the checkpoint lock) and are
     * not emitted. All other records are deserialized and emitted. Any exception raised while
     * processing is logged and forwarded to the {@link ErrorReporter} instead of being rethrown.
     *
     * @param changeEvents events of the batch, processed in order
     * @param committer committer remembered for a later {@link #commitOffset(DebeziumOffset)}
     */
    @Override
    public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws InterruptedException {
        this.currentCommitter = committer;
        this.processTime = System.currentTimeMillis();
        try {
            for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
                SourceRecord record = event.value();
                this.updateMessageTimestamp(record);
                this.fetchDelay = this.processTime - this.messageTimestamp;

                if (this.isHeartbeatEvent(record)) {
                    // Heartbeats carry no data: keep the offset moving, then drop the record.
                    synchronized (this.checkpointLock) {
                        this.debeziumOffset.setSourcePartition(record.sourcePartition());
                        this.debeziumOffset.setSourceOffset(record.sourceOffset());
                    }
                    continue;
                }

                this.deserialization.deserialize(record, this.debeziumCollector);

                if (this.isInDbSnapshotPhase) {
                    if (!this.lockHold) {
                        // Acquired once and intentionally held across records and batches.
                        MemoryUtils.UNSAFE.monitorEnter(this.checkpointLock);
                        this.lockHold = true;
                        LOG.info("Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
                    }
                    if (!this.isSnapshotRecord(record)) {
                        // First non-snapshot record: leave the snapshot phase for good.
                        MemoryUtils.UNSAFE.monitorExit(this.checkpointLock);
                        this.isInDbSnapshotPhase = false;
                        LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
                    }
                }

                // Emits the buffered elements and updates the tracked offset together.
                this.emitRecordsUnderCheckpointLock(this.debeziumCollector.records, record.sourcePartition(), record.sourceOffset());
            }
        }
        catch (Exception e) {
            LOG.error("Error happens when consuming change messages.", e);
            this.errorReporter.reportError(e);
        }
    }

    /**
     * Updates {@link #messageTimestamp} from the record's {@code source.ts_ms} field, if present.
     *
     * <p>Records without a value or value schema, without a {@code source} struct, or without a
     * non-null {@code ts_ms} leave the previous timestamp untouched.
     */
    private void updateMessageTimestamp(SourceRecord record) {
        Schema schema = record.valueSchema();
        Struct value = (Struct)record.value();
        if (schema == null || value == null) {
            // Nothing to read a timestamp from; mirrors the null guard in isSnapshotRecord.
            return;
        }
        if (schema.field("source") == null) {
            return;
        }
        Struct source = value.getStruct("source");
        if (source == null || source.schema().field("ts_ms") == null) {
            return;
        }
        Long tsMs = source.getInt64("ts_ms");
        if (tsMs != null) {
            this.messageTimestamp = tsMs;
        }
    }

    /** Returns whether the record's topic starts with the configured heartbeat prefix. */
    private boolean isHeartbeatEvent(SourceRecord record) {
        String topic = record.topic();
        return topic != null && topic.startsWith(this.heartbeatTopicPrefix);
    }

    /**
     * Returns whether the record's {@code source} struct resolves to {@link SnapshotRecord#TRUE}.
     * A record without a value is not a snapshot record.
     */
    private boolean isSnapshotRecord(SourceRecord record) {
        Struct value = (Struct)record.value();
        if (value == null) {
            return false;
        }
        Struct source = value.getStruct("source");
        return SnapshotRecord.TRUE == SnapshotRecord.fromSource(source);
    }

    /**
     * Emits the buffered elements while the checkpoint lock is held.
     *
     * <p>During the snapshot phase no additional locking is done here because
     * {@link #handleBatch} has already acquired the lock; otherwise the emission is wrapped in
     * a {@code synchronized} block on the checkpoint lock.
     */
    private void emitRecordsUnderCheckpointLock(Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) throws InterruptedException {
        if (this.isInDbSnapshotPhase) {
            this.emitRecords(records, sourcePartition, sourceOffset);
            return;
        }
        synchronized (this.checkpointLock) {
            this.emitRecords(records, sourcePartition, sourceOffset);
        }
    }

    /**
     * Drains {@code records} into the source context and then stores the given partition/offset
     * as the latest tracked offset. The emit delay is measured against a single timestamp taken
     * before draining.
     */
    private void emitRecords(Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        long currentTimestamp = System.currentTimeMillis();
        T record;
        while ((record = records.poll()) != null) {
            this.emitDelay = currentTimestamp - this.messageTimestamp;
            this.sourceContext.collect(record);
        }
        this.debeziumOffset.setSourcePartition(sourcePartition);
        this.debeziumOffset.setSourceOffset(sourceOffset);
    }

    /**
     * Serializes the currently tracked offset.
     *
     * <p>Must be called while holding the checkpoint lock (checked by an assertion).
     *
     * @return the serialized offset, or {@code null} if no partition/offset has been recorded yet
     * @throws Exception if serialization fails
     */
    public byte[] snapshotCurrentState() throws Exception {
        assert (Thread.holdsLock(this.checkpointLock));
        if (this.debeziumOffset.sourceOffset == null || this.debeziumOffset.sourcePartition == null) {
            return null;
        }
        return this.stateSerializer.serialize(this.debeziumOffset);
    }

    /**
     * Commits the given offset through the committer of the most recent batch.
     *
     * <p>A dummy {@link SourceRecord} is built solely to carry the partition/offset. If no batch
     * has been received yet there is no committer and the call only logs.
     *
     * @param offset offset to commit; its source-offset map is normalized in place
     */
    public void commitOffset(DebeziumOffset offset) throws InterruptedException {
        if (this.currentCommitter == null) {
            LOG.info("commitOffset() called on Debezium ChangeConsumer which doesn't receive records yet.");
            return;
        }
        // NOTE(review): DebeziumOffset is not visible here; the explicit cast keeps this call
        // well-typed whether the field is declared Map<String, ?> or Map<String, Object>.
        @SuppressWarnings("unchecked")
        Map<String, Object> sourceOffset = (Map<String, Object>)offset.sourceOffset;
        // Only the partition and offset of this record are meaningful.
        SourceRecord recordWrapper = new SourceRecord(offset.sourcePartition, this.adjustSourceOffset(sourceOffset), "DUMMY", Schema.BOOLEAN_SCHEMA, true);
        EmbeddedEngineChangeEvent<SourceRecord, SourceRecord> changeEvent = new EmbeddedEngineChangeEvent<SourceRecord, SourceRecord>(null, recordWrapper, recordWrapper);
        this.currentCommitter.markProcessed(changeEvent);
        this.currentCommitter.markBatchFinished();
    }

    /** Returns the difference between batch processing time and the last message timestamp. */
    public long getFetchDelay() {
        return this.fetchDelay;
    }

    /** Returns the difference between the last emit time and the last message timestamp. */
    public long getEmitDelay() {
        return this.emitDelay;
    }

    /** Returns the milliseconds elapsed since the last batch started being handled. */
    public long getIdleTime() {
        return System.currentTimeMillis() - this.processTime;
    }

    /**
     * Rewrites the {@code lsn_proc} and {@code lsn_commit} entries of the given map as
     * {@code Long} values and returns the same (mutated) map.
     *
     * <p>NOTE(review): presumably needed because restored offsets may hold these values in a
     * different numeric type than the connector expects - confirm against the serializer.
     */
    private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
        this.normalizeToLong(sourceOffset, LAST_COMPLETELY_PROCESSED_LSN_KEY);
        this.normalizeToLong(sourceOffset, LAST_COMMIT_LSN_KEY);
        return sourceOffset;
    }

    /** Replaces the value stored under {@code key}, if any, by its {@code Long} representation. */
    private void normalizeToLong(Map<String, Object> sourceOffset, String key) {
        Object value = sourceOffset.get(key);
        if (value != null) {
            // A key mapped to null is left untouched instead of failing on toString().
            sourceOffset.put(key, Long.parseLong(value.toString()));
        }
    }

    /** Collector that queues deserialized elements until they are emitted. */
    private class DebeziumCollector
    implements Collector<T> {
        private final Queue<T> records = new ArrayDeque<T>();

        private DebeziumCollector() {
        }

        @Override
        public void collect(T record) {
            this.records.add(record);
        }

        @Override
        public void close() {
        }
    }
}

