/*
 * Decompiled with CFR 0.152.
 */
package io.choerodon.asgard.saga.consumer;

import io.choerodon.asgard.common.AbstractAsgardConsumer;
import io.choerodon.asgard.common.ApplicationContextHelper;
import io.choerodon.asgard.common.InstanceResultUtils;
import io.choerodon.asgard.common.UpdateStatusDTO;
import io.choerodon.asgard.saga.SagaDefinition;
import io.choerodon.asgard.saga.SagaProperties;
import io.choerodon.asgard.saga.annotation.SagaTask;
import io.choerodon.asgard.saga.consumer.SagaTaskInvokeBean;
import io.choerodon.asgard.saga.dto.PollSagaTaskInstanceDTO;
import io.choerodon.asgard.saga.dto.SagaTaskInstanceDTO;
import io.choerodon.asgard.saga.feign.SagaConsumerClient;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;

public class SagaConsumer
extends AbstractAsgardConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SagaConsumer.class);
    static final Map<String, SagaTaskInvokeBean> invokeBeanMap = new HashMap<String, SagaTaskInvokeBean>();
    private SagaConsumerClient consumerClient;
    private PollSagaTaskInstanceDTO pollDTO;
    private SagaProperties properties;

    public SagaConsumer(String service, String instance, PlatformTransactionManager transactionManager, Executor executor, ScheduledExecutorService scheduledExecutorService, ApplicationContextHelper contextHelper, long pollIntervalMs) {
        super(service, instance, transactionManager, executor, scheduledExecutorService, contextHelper, pollIntervalMs);
    }

    public void setConsumerClient(SagaConsumerClient consumerClient) {
        this.consumerClient = consumerClient;
    }

    public void setProperties(SagaProperties properties) {
        this.properties = properties;
    }

    @Override
    public void scheduleRunning(String instance) {
        this.consumerClient.pollBatch(this.getPollDTO()).forEach(t -> {
            LOGGER.trace("SagaConsumer polled sagaTaskInstances: {}", t);
            this.runningTasks.add(t.getId());
            ((CompletableFuture)CompletableFuture.supplyAsync(() -> this.invoke((SagaTaskInstanceDTO)t), this.executor).exceptionally(ex -> {
                LOGGER.warn("@SagaTask method code: {}, id: {} supplyAsync failed", new Object[]{t.getTaskCode(), t.getId(), ex});
                return null;
            })).thenAccept(i -> LOGGER.trace("@SagaTask method code: {}, id: {} supplyAsync completed", (Object)t.getTaskCode(), (Object)t.getId()));
        });
    }

    private PollSagaTaskInstanceDTO getPollDTO() {
        if (this.pollDTO == null) {
            this.pollDTO = new PollSagaTaskInstanceDTO(this.instance, this.service, this.properties.getConsumer().getMaxPollSize(), this.runningTasks);
        }
        return this.pollDTO;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private SagaTaskInstanceDTO invoke(SagaTaskInstanceDTO data) {
        SagaTaskInvokeBean invokeBean = invokeBeanMap.get(data.getSagaCode() + data.getTaskCode());
        SagaTask sagaTask = invokeBean.sagaTask;
        PlatformTransactionManager platformTransactionManager = this.getSagaTaskTransactionManager(sagaTask.transactionManager());
        TransactionStatus status = this.createTransactionStatus(this.transactionManager, sagaTask.transactionIsolation().value());
        this.beforeInvoke(data.getUserDetails());
        try {
            invokeBean.method.setAccessible(true);
            Object result = invokeBean.method.invoke(invokeBean.object, data.getInput());
            this.consumerClient.updateStatus(data.getId(), UpdateStatusDTO.UpdateStatusDTOBuilder.newInstance().withStatus(SagaDefinition.TaskInstanceStatus.COMPLETED.name()).withOutput(InstanceResultUtils.resultToJson(result, this.objectMapper)).withId(data.getId()).withObjectVersionNumber(data.getObjectVersionNumber()).build());
            this.runningTasks.remove(data.getId());
            platformTransactionManager.commit(status);
        }
        catch (Exception e) {
            LOGGER.info("@SagaTask method code: {}, id: {} invoke error", new Object[]{data.getTaskCode(), data.getId(), InstanceResultUtils.getLoggerException(e)});
            String errorMsg = InstanceResultUtils.getErrorInfoFromException(e);
            this.invokeError(platformTransactionManager, status, data, errorMsg);
        }
        finally {
            this.afterInvoke();
        }
        return data;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void invokeError(PlatformTransactionManager platformTransactionManager, TransactionStatus status, SagaTaskInstanceDTO data, String errorMsg) {
        try {
            platformTransactionManager.rollback(status);
        }
        catch (Exception e) {
            LOGGER.warn("@SagaTask method code: {}, id: {} transaction rollback error", new Object[]{data.getTaskCode(), data.getId(), e});
        }
        finally {
            try {
                this.consumerClient.updateStatus(data.getId(), UpdateStatusDTO.UpdateStatusDTOBuilder.newInstance().withStatus(SagaDefinition.TaskInstanceStatus.FAILED.name()).withExceptionMessage(errorMsg).withId(data.getId()).withObjectVersionNumber(data.getObjectVersionNumber()).build());
                this.runningTasks.remove(data.getId());
            }
            catch (Exception ex) {
                CompletableFuture.supplyAsync(() -> this.retryUpdateStatusFailed(data.getId(), errorMsg), this.executor);
            }
        }
    }

    private Long retryUpdateStatusFailed(Long id, String errorMsg) {
        while (true) {
            try {
                Thread.sleep(1000L);
                SagaTaskInstanceDTO dto = this.consumerClient.queryStatus(id);
                if (dto == null) {
                    this.runningTasks.remove(id);
                    LOGGER.error("@SagaTask method id: {} queryStatus failed", (Object)id);
                    return id;
                }
                this.consumerClient.updateStatus(id, UpdateStatusDTO.UpdateStatusDTOBuilder.newInstance().withStatus(SagaDefinition.TaskInstanceStatus.FAILED.name()).withExceptionMessage(errorMsg).withId(id).withObjectVersionNumber(dto.getObjectVersionNumber()).build());
                this.runningTasks.remove(id);
            }
            catch (InterruptedException e) {
                this.runningTasks.remove(id);
                Thread.currentThread().interrupt();
                LOGGER.error("@SagaTask method id: {} retry to updateStatus failed, thread is Interrupted", (Object)id, (Object)e);
                continue;
            }
            catch (Exception e) {
                LOGGER.debug("@SagaTask method id: {} auto retry to updateStatus failed", (Object)id, (Object)e);
                continue;
            }
            break;
        }
        return id;
    }
}

