/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.elasticsearch.sink;

import java.io.IOException;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory;
import org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
import org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig;
import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
import org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost;
import org.apache.flink.elasticsearch7.shaded.org.apache.http.auth.AuthScope;
import org.apache.flink.elasticsearch7.shaded.org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.DocWriteRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkItemResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.delete.DeleteRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.index.IndexRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.update.UpdateRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClientBuilder;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.rest.RestStatus;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ElasticsearchWriter<IN>
implements SinkWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class);
    private final ElasticsearchEmitter<? super IN> emitter;
    private final MailboxExecutor mailboxExecutor;
    private final boolean flushOnCheckpoint;
    private final BulkProcessor bulkProcessor;
    private final RestHighLevelClient client;
    private final RequestIndexer requestIndexer;
    private final Counter numBytesOutCounter;
    private long pendingActions = 0L;
    private boolean checkpointInProgress = false;
    private volatile long lastSendTime = 0L;
    private volatile long ackTime = Long.MAX_VALUE;
    private volatile boolean closed = false;

    ElasticsearchWriter(List<HttpHost> hosts, ElasticsearchEmitter<? super IN> emitter, boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig, BulkProcessorBuilderFactory bulkProcessorBuilderFactory, NetworkClientConfig networkClientConfig, SinkWriterMetricGroup metricGroup, MailboxExecutor mailboxExecutor) {
        this.emitter = (ElasticsearchEmitter)Preconditions.checkNotNull(emitter);
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.mailboxExecutor = (MailboxExecutor)Preconditions.checkNotNull((Object)mailboxExecutor);
        this.client = new RestHighLevelClient(ElasticsearchWriter.configureRestClientBuilder(RestClient.builder(hosts.toArray(new HttpHost[0])), networkClientConfig));
        this.bulkProcessor = this.createBulkProcessor(bulkProcessorBuilderFactory, bulkProcessorConfig);
        this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
        Preconditions.checkNotNull((Object)metricGroup);
        metricGroup.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTime);
        this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
        try {
            emitter.open();
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Failed to open the ElasticsearchEmitter", (Throwable)e);
        }
    }

    public void write(IN element, SinkWriter.Context context) throws IOException, InterruptedException {
        while (this.checkpointInProgress) {
            this.mailboxExecutor.yield();
        }
        this.emitter.emit(element, context, this.requestIndexer);
    }

    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        this.checkpointInProgress = true;
        while (this.pendingActions != 0L && (this.flushOnCheckpoint || endOfInput)) {
            this.bulkProcessor.flush();
            LOG.info("Waiting for the response of {} pending actions.", (Object)this.pendingActions);
            this.mailboxExecutor.yield();
        }
        this.checkpointInProgress = false;
    }

    @VisibleForTesting
    void blockingFlushAllActions() throws InterruptedException {
        while (this.pendingActions != 0L) {
            this.bulkProcessor.flush();
            LOG.info("Waiting for the response of {} pending actions.", (Object)this.pendingActions);
            this.mailboxExecutor.yield();
        }
    }

    public void close() throws Exception {
        this.closed = true;
        this.emitter.close();
        this.bulkProcessor.close();
        this.client.close();
    }

    private static RestClientBuilder configureRestClientBuilder(RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
        if (networkClientConfig.getConnectionPathPrefix() != null) {
            builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
        }
        if (networkClientConfig.getPassword() != null && networkClientConfig.getUsername() != null) {
            BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(networkClientConfig.getUsername(), networkClientConfig.getPassword()));
            builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        }
        if (networkClientConfig.getConnectionRequestTimeout() != null || networkClientConfig.getConnectionTimeout() != null || networkClientConfig.getSocketTimeout() != null) {
            builder.setRequestConfigCallback(requestConfigBuilder -> {
                if (networkClientConfig.getConnectionRequestTimeout() != null) {
                    requestConfigBuilder.setConnectionRequestTimeout(networkClientConfig.getConnectionRequestTimeout());
                }
                if (networkClientConfig.getConnectionTimeout() != null) {
                    requestConfigBuilder.setConnectTimeout(networkClientConfig.getConnectionTimeout());
                }
                if (networkClientConfig.getSocketTimeout() != null) {
                    requestConfigBuilder.setSocketTimeout(networkClientConfig.getSocketTimeout());
                }
                return requestConfigBuilder;
            });
        }
        return builder;
    }

    private BulkProcessor createBulkProcessor(BulkProcessorBuilderFactory bulkProcessorBuilderFactory, BulkProcessorConfig bulkProcessorConfig) {
        BulkProcessor.Builder builder = (BulkProcessor.Builder)bulkProcessorBuilderFactory.apply(this.client, bulkProcessorConfig, new BulkListener());
        builder.setConcurrentRequests(0);
        return builder.build();
    }

    private void enqueueActionInMailbox(ThrowingRunnable<? extends Exception> action, String actionName) {
        if (this.isClosed()) {
            return;
        }
        this.mailboxExecutor.execute(action, actionName);
    }

    private void extractFailures(BulkRequest request, BulkResponse response) {
        if (!response.hasFailures()) {
            this.pendingActions -= (long)request.numberOfActions();
            return;
        }
        Throwable chainedFailures = null;
        for (int i = 0; i < response.getItems().length; ++i) {
            Exception failure;
            BulkItemResponse itemResponse = response.getItems()[i];
            if (!itemResponse.isFailed() || (failure = itemResponse.getFailure().getCause()) == null) continue;
            RestStatus restStatus = itemResponse.getFailure().getStatus();
            DocWriteRequest<?> actionRequest = request.requests().get(i);
            chainedFailures = ExceptionUtils.firstOrSuppressed((Throwable)ElasticsearchWriter.wrapException(restStatus, failure, actionRequest), chainedFailures);
        }
        if (chainedFailures == null) {
            return;
        }
        throw new FlinkRuntimeException(chainedFailures);
    }

    private static Throwable wrapException(RestStatus restStatus, Throwable rootFailure, DocWriteRequest<?> actionRequest) {
        if (restStatus == null) {
            return new FlinkRuntimeException(String.format("Single action %s of bulk request failed.", actionRequest), rootFailure);
        }
        return new FlinkRuntimeException(String.format("Single action %s of bulk request failed with status %s.", actionRequest, restStatus.getStatus()), rootFailure);
    }

    private boolean isClosed() {
        if (this.closed) {
            LOG.warn("Writer was closed before all records were acknowledged by Elasticsearch.");
        }
        return this.closed;
    }

    private class DefaultRequestIndexer
    implements RequestIndexer {
        private final Counter numRecordsSendCounter;

        public DefaultRequestIndexer(Counter numRecordsSendCounter) {
            this.numRecordsSendCounter = (Counter)Preconditions.checkNotNull((Object)numRecordsSendCounter);
        }

        @Override
        public void add(DeleteRequest ... deleteRequests) {
            for (DeleteRequest deleteRequest : deleteRequests) {
                this.numRecordsSendCounter.inc();
                ElasticsearchWriter.this.pendingActions++;
                ElasticsearchWriter.this.bulkProcessor.add(deleteRequest);
            }
        }

        @Override
        public void add(IndexRequest ... indexRequests) {
            for (IndexRequest indexRequest : indexRequests) {
                this.numRecordsSendCounter.inc();
                ElasticsearchWriter.this.pendingActions++;
                ElasticsearchWriter.this.bulkProcessor.add(indexRequest);
            }
        }

        @Override
        public void add(UpdateRequest ... updateRequests) {
            for (UpdateRequest updateRequest : updateRequests) {
                this.numRecordsSendCounter.inc();
                ElasticsearchWriter.this.pendingActions++;
                ElasticsearchWriter.this.bulkProcessor.add(updateRequest);
            }
        }
    }

    private class BulkListener
    implements BulkProcessor.Listener {
        private BulkListener() {
        }

        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            LOG.info("Sending bulk of {} actions to Elasticsearch.", (Object)request.numberOfActions());
            ElasticsearchWriter.this.lastSendTime = System.currentTimeMillis();
            ElasticsearchWriter.this.numBytesOutCounter.inc(request.estimatedSizeInBytes());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            ElasticsearchWriter.this.ackTime = System.currentTimeMillis();
            ElasticsearchWriter.this.enqueueActionInMailbox((ThrowingRunnable<Exception>)() -> ElasticsearchWriter.this.extractFailures(request, response), "elasticsearchSuccessCallback");
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            ElasticsearchWriter.this.enqueueActionInMailbox((ThrowingRunnable<Exception>)() -> {
                throw new FlinkRuntimeException("Complete bulk has failed.", failure);
            }, "elasticsearchErrorCallback");
        }
    }
}

