/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaInternalProducer<K, V>
extends KafkaProducer<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaInternalProducer.class);
    private static final String TRANSACTION_MANAGER_STATE_ENUM = "org.apache.kafka.clients.producer.internals.TransactionManager$State";
    private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
    private String transactionalId;

    public KafkaInternalProducer(Properties properties, String transactionId) {
        super(properties);
        this.transactionalId = transactionId;
    }

    @Override
    public void initTransactions() {
        this.setTransactionalId(this.transactionalId);
        super.initTransactions();
    }

    @Override
    public void beginTransaction() throws ProducerFencedException {
        if (log.isDebugEnabled()) {
            log.debug("KafkaInternalProducer.beginTransaction. " + this.transactionalId);
        }
        super.beginTransaction();
    }

    @Override
    public void commitTransaction() throws ProducerFencedException {
        if (log.isDebugEnabled()) {
            log.debug("KafkaInternalProducer.commitTransaction." + this.transactionalId);
        }
        super.commitTransaction();
    }

    @Override
    public void abortTransaction() throws ProducerFencedException {
        super.abortTransaction();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setTransactionalId(String transactionalId) {
        if (log.isDebugEnabled()) {
            log.debug("KafkaInternalProducer.abortTransaction. Target transactionalId=" + transactionalId);
        }
        if (!transactionalId.equals(this.transactionalId)) {
            Object transactionManager;
            if (log.isDebugEnabled()) {
                log.debug("KafkaInternalProducer.abortTransaction. Current transactionalId={} not match target transactionalId={}", (Object)this.transactionalId, (Object)transactionalId);
            }
            Object object = transactionManager = this.getTransactionManager();
            synchronized (object) {
                ReflectionUtils.setField((Object)transactionManager, (String)"transactionalId", (Object)transactionalId);
                ReflectionUtils.setField((Object)transactionManager, (String)"currentState", KafkaInternalProducer.getTransactionManagerState("UNINITIALIZED"));
                this.transactionalId = transactionalId;
            }
        }
    }

    public short getEpoch() {
        Object transactionManager = this.getTransactionManager();
        Optional producerIdAndEpoch = ReflectionUtils.getField((Object)transactionManager, (String)PRODUCER_ID_AND_EPOCH_FIELD_NAME);
        return (Short)ReflectionUtils.getField(producerIdAndEpoch.get(), (String)"epoch").get();
    }

    public long getProducerId() {
        Object transactionManager = this.getTransactionManager();
        Object producerIdAndEpoch = ReflectionUtils.getField((Object)transactionManager, (String)PRODUCER_ID_AND_EPOCH_FIELD_NAME).get();
        return (Long)ReflectionUtils.getField(producerIdAndEpoch, (String)"producerId").get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void resumeTransaction(long producerId, short epoch, boolean txnStarted) {
        Object transactionManager;
        log.info("Attempting to resume transaction {} with producerId {} and epoch {}", new Object[]{this.transactionalId, producerId, epoch});
        Object object = transactionManager = this.getTransactionManager();
        synchronized (object) {
            Object topicPartitionBookkeeper = ReflectionUtils.getField((Object)transactionManager, transactionManager.getClass(), (String)"topicPartitionBookkeeper").get();
            KafkaInternalProducer.transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
            ReflectionUtils.invoke(topicPartitionBookkeeper, (String)"reset", (Object[])new Object[0]);
            ReflectionUtils.setField((Object)transactionManager, (String)PRODUCER_ID_AND_EPOCH_FIELD_NAME, (Object)KafkaInternalProducer.createProducerIdAndEpoch(producerId, epoch));
            KafkaInternalProducer.transitionTransactionManagerStateTo(transactionManager, "READY");
            KafkaInternalProducer.transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
            ReflectionUtils.setField((Object)transactionManager, (String)"transactionStarted", (Object)txnStarted);
        }
    }

    public boolean isTxnStarted() {
        Object transactionManager = this.getTransactionManager();
        return (Boolean)ReflectionUtils.getField((Object)transactionManager, (String)"transactionStarted").get();
    }

    private static Object createProducerIdAndEpoch(long producerId, short epoch) {
        try {
            Field field = TransactionManager.class.getDeclaredField(PRODUCER_ID_AND_EPOCH_FIELD_NAME);
            Class<?> clazz = field.getType();
            Constructor<?> constructor = clazz.getDeclaredConstructor(Long.TYPE, Short.TYPE);
            constructor.setAccessible(true);
            return constructor.newInstance(producerId, epoch);
        }
        catch (IllegalAccessException | InstantiationException | NoSuchFieldException | NoSuchMethodException | InvocationTargetException e) {
            throw new KafkaConnectorException(KafkaConnectorErrorCode.VERSION_INCOMPATIBLE, "Incompatible KafkaProducer version", e);
        }
    }

    private Object getTransactionManager() {
        Optional transactionManagerOptional = ReflectionUtils.getField((Object)this, KafkaProducer.class, (String)"transactionManager");
        if (!transactionManagerOptional.isPresent()) {
            throw new KafkaConnectorException((SeaTunnelErrorCode)KafkaConnectorErrorCode.GET_TRANSACTIONMANAGER_FAILED, "Can't get transactionManager in KafkaProducer");
        }
        return transactionManagerOptional.get();
    }

    private static void transitionTransactionManagerStateTo(Object transactionManager, String state) {
        ReflectionUtils.invoke((Object)transactionManager, (String)"transitionTo", (Object[])new Object[]{KafkaInternalProducer.getTransactionManagerState(state)});
    }

    private static Enum<?> getTransactionManagerState(String enumName) {
        try {
            Class<?> cl = Class.forName(TRANSACTION_MANAGER_STATE_ENUM);
            return Enum.valueOf(cl, enumName);
        }
        catch (ClassNotFoundException e) {
            throw new KafkaConnectorException(KafkaConnectorErrorCode.VERSION_INCOMPATIBLE, "Incompatible KafkaProducer version", e);
        }
    }
}

