package org.apache.seatunnel.connectors.doris.sink.committer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
import org.apache.seatunnel.connectors.doris.rest.RestService;
import org.apache.seatunnel.connectors.doris.sink.HttpPutBuilder;
import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
import org.apache.seatunnel.connectors.doris.util.HttpUtil;
import org.apache.seatunnel.connectors.doris.util.ResponseUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.class */
public class DorisCommitter implements SinkCommitter<DorisCommitInfo> {
    private static final Logger log = LoggerFactory.getLogger(DorisCommitter.class);
    private static final String COMMIT_PATTERN = "http://%s/api/%s/_stream_load_2pc";
    private static final int HTTP_TEMPORARY_REDIRECT = 200;
    private final CloseableHttpClient httpClient;
    private final DorisConfig dorisConfig;
    int maxRetry;

    public DorisCommitter(Config config) {
        this(DorisConfig.loadConfig(config), DorisConfig.loadConfig(config).getMaxRetries().intValue(), new HttpUtil().getHttpClient());
    }

    public DorisCommitter(DorisConfig dorisConfig, int i, CloseableHttpClient closeableHttpClient) {
        this.dorisConfig = dorisConfig;
        this.maxRetry = i;
        this.httpClient = closeableHttpClient;
    }

    public List<DorisCommitInfo> commit(List<DorisCommitInfo> list) throws IOException {
        Iterator<DorisCommitInfo> it = list.iterator();
        while (it.hasNext()) {
            commitTransaction(it.next());
        }
        return Collections.emptyList();
    }

    public void abort(List<DorisCommitInfo> list) throws IOException {
        Iterator<DorisCommitInfo> it = list.iterator();
        while (it.hasNext()) {
            abortTransaction(it.next());
        }
    }

    private void commitTransaction(DorisCommitInfo dorisCommitInfo) throws IOException, DorisConnectorException {
        int i = -1;
        String str = null;
        int i2 = 0;
        String hostPort = dorisCommitInfo.getHostPort();
        CloseableHttpResponse closeableHttpResponse = null;
        while (true) {
            int i3 = i2;
            i2++;
            if (i3 > this.maxRetry) {
                break;
            }
            HttpPutBuilder httpPutBuilder = new HttpPutBuilder();
            httpPutBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort, dorisCommitInfo.getDb())).baseAuth(this.dorisConfig.getUsername(), this.dorisConfig.getPassword()).addCommonHeader().addTxnId(dorisCommitInfo.getTxbID()).setEmptyEntity().commit();
            try {
                closeableHttpResponse = this.httpClient.execute((HttpUriRequest) httpPutBuilder.build());
                i = closeableHttpResponse.getStatusLine().getStatusCode();
                str = closeableHttpResponse.getStatusLine().getReasonPhrase();
            } catch (IOException e) {
                log.error("commit transaction failed: ", e);
                hostPort = RestService.getBackend(this.dorisConfig, log);
            }
            if (i == 200) {
                break;
            }
            log.warn("commit failed with {}, reason {}", hostPort, str);
            hostPort = RestService.getBackend(this.dorisConfig, log);
        }
        if (i != 200) {
            throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, str);
        }
        ObjectMapper objectMapper = new ObjectMapper();
        if (closeableHttpResponse == null || closeableHttpResponse.getEntity() == null) {
            return;
        }
        String entityUtils = EntityUtils.toString(closeableHttpResponse.getEntity());
        Map map = (Map) objectMapper.readValue(entityUtils, new TypeReference<HashMap<String, String>>() { // from class: org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitter.1
        });
        if (((String) map.get("status")).equals(LoadStatus.FAIL) && !ResponseUtil.isCommitted((String) map.get("msg"))) {
            throw new DorisConnectorException(DorisConnectorErrorCode.COMMIT_FAILED, entityUtils);
        }
        log.info("load result {}", entityUtils);
    }

    private void abortTransaction(DorisCommitInfo dorisCommitInfo) throws IOException, DorisConnectorException {
        int i = 0;
        String hostPort = dorisCommitInfo.getHostPort();
        CloseableHttpResponse closeableHttpResponse = null;
        do {
            int i2 = i;
            i++;
            if (i2 > this.maxRetry) {
                Map map = (Map) new ObjectMapper().readValue(EntityUtils.toString(closeableHttpResponse.getEntity()), new TypeReference<HashMap<String, String>>() { // from class: org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitter.2
                });
                if (LoadStatus.SUCCESS.equals(map.get("status"))) {
                    return;
                }
                if (ResponseUtil.isCommitted((String) map.get("msg"))) {
                    throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, "try abort committed transaction, do you recover from old savepoint?");
                }
                log.warn("Fail to abort transaction. txnId: {}, error: {}", Long.valueOf(dorisCommitInfo.getTxbID()), map.get("msg"));
                return;
            }
            HttpPutBuilder httpPutBuilder = new HttpPutBuilder();
            httpPutBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort, dorisCommitInfo.getDb())).baseAuth(this.dorisConfig.getUsername(), this.dorisConfig.getPassword()).addCommonHeader().addTxnId(dorisCommitInfo.getTxbID()).setEmptyEntity().abort();
            closeableHttpResponse = this.httpClient.execute((HttpUriRequest) httpPutBuilder.build());
            if (closeableHttpResponse.getStatusLine().getStatusCode() != 200) {
                break;
            }
        } while (closeableHttpResponse.getEntity() != null);
        log.warn("abort transaction response: " + closeableHttpResponse.getStatusLine().toString());
        throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, "Fail to abort transaction " + dorisCommitInfo.getTxbID() + " with url " + String.format(COMMIT_PATTERN, hostPort, dorisCommitInfo.getDb()));
    }
}
