/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.gcp.pubsub;

import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;

public class BlockingGrpcPubSubSubscriber
implements PubSubSubscriber {
    private final String projectSubscriptionName;
    private final ManagedChannel channel;
    private final SubscriberGrpc.SubscriberBlockingStub stub;
    private final int retries;
    private final Duration timeout;
    private final PullRequest pullRequest;

    public BlockingGrpcPubSubSubscriber(String projectSubscriptionName, ManagedChannel channel, SubscriberGrpc.SubscriberBlockingStub stub, PullRequest pullRequest, int retries, Duration timeout) {
        this.projectSubscriptionName = projectSubscriptionName;
        this.channel = channel;
        this.stub = stub;
        this.retries = retries;
        this.timeout = timeout;
        this.pullRequest = pullRequest;
    }

    @Override
    public List<ReceivedMessage> pull() {
        return this.pull(this.retries);
    }

    private List<ReceivedMessage> pull(int retriesRemaining) {
        try {
            return ((SubscriberGrpc.SubscriberBlockingStub)this.stub.withDeadlineAfter(this.timeout.toMillis(), TimeUnit.MILLISECONDS)).pull(this.pullRequest).getReceivedMessagesList();
        }
        catch (StatusRuntimeException e) {
            if (retriesRemaining > 0) {
                return this.pull(retriesRemaining - 1);
            }
            throw e;
        }
    }

    @Override
    public void acknowledge(List<String> acknowledgementIds) {
        if (acknowledgementIds.isEmpty()) {
            return;
        }
        Tuple2<List<String>, List<String>> splittedAckIds = this.splitAckIds(acknowledgementIds);
        while (!((List)splittedAckIds.f0).isEmpty()) {
            AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().setSubscription(this.projectSubscriptionName).addAllAckIds((Iterable)splittedAckIds.f0).build();
            ((SubscriberGrpc.SubscriberBlockingStub)this.stub.withDeadlineAfter(60L, TimeUnit.SECONDS)).acknowledge(acknowledgeRequest);
            splittedAckIds = this.splitAckIds((List)splittedAckIds.f1);
        }
    }

    private Tuple2<List<String>, List<String>> splitAckIds(List<String> ackIds) {
        int maxPayload = 512000;
        int fixedOverheadPerCall = 100;
        int overheadPerId = 3;
        int totalBytes = 100;
        for (int i = 0; i < ackIds.size(); ++i) {
            if ((totalBytes += ackIds.get(i).length() + 3) <= 512000) continue;
            return Tuple2.of(ackIds.subList(0, i), ackIds.subList(i, ackIds.size()));
        }
        return Tuple2.of(ackIds, Collections.emptyList());
    }

    @Override
    public void close() throws Exception {
        this.channel.shutdownNow();
        this.channel.awaitTermination(20L, TimeUnit.SECONDS);
    }
}

