/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.heartbeat;

import io.debezium.function.BlockingConsumer;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Threads;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Heartbeat} implementation that hands a heartbeat {@link SourceRecord} to a consumer
 * whenever its interval timer has expired.
 *
 * <p>Each record is written to the configured topic, partition {@code 0}, with a key struct holding
 * the configured key under {@code serverName} and a value struct holding the current wall-clock
 * time in epoch milliseconds under {@code ts_ms}.
 *
 * <p>NOTE(review): the timer reference is {@code volatile}, but the check/emit/reset sequence is
 * not atomic; whether concurrent callers are expected is not visible from this class.
 */
class HeartbeatImpl implements Heartbeat {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class);
    private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);

    /** Default heartbeat interval value; how it is interpreted is decided by the code that reads it. */
    static final int DEFAULT_HEARTBEAT_INTERVAL = 0;

    /** Default prefix for heartbeat topic names. */
    static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat";

    /** Name of the single field in the heartbeat key struct. */
    private static final String SERVER_NAME_KEY = "serverName";

    /** Name of the single field in the heartbeat value struct. */
    private static final String TIMESTAMP_KEY = "ts_ms";

    private static final Schema KEY_SCHEMA = SchemaBuilder.struct()
            .name(schemaNameAdjuster.adjust("io.debezium.connector.common.ServerNameKey"))
            .field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
            .build();

    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name(schemaNameAdjuster.adjust("io.debezium.connector.common.Heartbeat"))
            .field(TIMESTAMP_KEY, Schema.INT64_SCHEMA)
            .build();

    private final String topicName;
    private final Duration heartbeatInterval;
    private final String key;

    /** Timer for the current interval; replaced with a fresh one after every timed heartbeat. */
    private volatile Threads.Timer heartbeatTimeout;

    /**
     * Creates the heartbeat and starts its first interval timer.
     *
     * @param heartbeatInterval duration handed to the timer that gates timed heartbeats
     * @param topicName topic that heartbeat records are addressed to
     * @param key value stored in the {@code serverName} field of every record key
     */
    HeartbeatImpl(Duration heartbeatInterval, String topicName, String key) {
        this.heartbeatInterval = heartbeatInterval;
        this.topicName = topicName;
        this.key = key;
        // The interval must be assigned before this call, because the timer is built from it.
        this.heartbeatTimeout = this.resetHeartbeat();
    }

    /**
     * Emits a heartbeat if the interval timer has expired, then restarts the timer.
     * Does nothing while the timer is still running.
     *
     * @param partition source partition attached to the record
     * @param offset source offset attached to the record
     * @param consumer receiver of the generated record
     * @throws InterruptedException if the consumer is interrupted while accepting the record
     */
    @Override
    public void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        if (!this.heartbeatTimeout.expired()) {
            return;
        }
        this.forcedBeat(partition, offset, consumer);
        this.heartbeatTimeout = this.resetHeartbeat();
    }

    /**
     * Same as the map-based overload, except that the offset is obtained from
     * {@code offsetProducer} only once the interval timer has expired.
     *
     * @param partition source partition attached to the record
     * @param offsetProducer supplier of the source offset, invoked only when a heartbeat is due
     * @param consumer receiver of the generated record
     * @throws InterruptedException if the consumer is interrupted while accepting the record
     */
    @Override
    public void heartbeat(Map<String, ?> partition, Heartbeat.OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        if (!this.heartbeatTimeout.expired()) {
            return;
        }
        this.forcedBeat(partition, offsetProducer.offset(), consumer);
        this.heartbeatTimeout = this.resetHeartbeat();
    }

    /**
     * Emits a heartbeat without consulting or resetting the interval timer.
     * No record is produced when {@code offset} is {@code null} or empty.
     *
     * @param partition source partition attached to the record
     * @param offset source offset attached to the record
     * @param consumer receiver of the generated record
     * @throws InterruptedException if the consumer is interrupted while accepting the record
     */
    @Override
    public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        // Logged before the offset check, so the message also appears when no record is sent.
        LOGGER.debug("Generating heartbeat event");
        boolean offsetAvailable = offset != null && !offset.isEmpty();
        if (offsetAvailable) {
            consumer.accept(this.heartbeatRecord(partition, offset));
        }
    }

    /**
     * @return always {@code true} for this implementation
     */
    @Override
    public boolean isEnabled() {
        return true;
    }

    /** Builds the record key: a {@code KEY_SCHEMA} struct carrying the given server name. */
    private Struct serverNameKey(String serverName) {
        Struct keyStruct = new Struct(KEY_SCHEMA);
        keyStruct.put(SERVER_NAME_KEY, serverName);
        return keyStruct;
    }

    /** Builds the record value: a {@code VALUE_SCHEMA} struct carrying the current epoch milliseconds. */
    private Struct messageValue() {
        long nowMillis = Instant.now().toEpochMilli();
        Struct valueStruct = new Struct(VALUE_SCHEMA);
        valueStruct.put(TIMESTAMP_KEY, nowMillis);
        return valueStruct;
    }

    /** Assembles the heartbeat record for this instance's topic, always on partition {@code 0}. */
    private SourceRecord heartbeatRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        Integer topicPartition = 0;
        Struct keyStruct = this.serverNameKey(this.key);
        Struct valueStruct = this.messageValue();
        return new SourceRecord(sourcePartition, sourceOffset, this.topicName, topicPartition, KEY_SCHEMA, keyStruct, VALUE_SCHEMA, valueStruct);
    }

    /** Creates a new timer over the configured heartbeat interval using the system clock. */
    private Threads.Timer resetHeartbeat() {
        return Threads.timer(Clock.SYSTEM, this.heartbeatInterval);
    }
}

