/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.gcp.pubsub;

import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CancellationException;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory;
import org.apache.flink.streaming.connectors.gcp.pubsub.DeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeIdsForCheckpoint;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.Acknowledger;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public class PubSubSource<OUT>
extends RichSourceFunction<OUT>
implements ResultTypeQueryable<OUT>,
ParallelSourceFunction<OUT>,
CheckpointListener,
ListCheckpointed<AcknowledgeIdsForCheckpoint<String>> {
    protected final PubSubDeserializationSchema<OUT> deserializationSchema;
    protected final PubSubSubscriberFactory pubSubSubscriberFactory;
    protected final Credentials credentials;
    protected final AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory;
    protected final FlinkConnectorRateLimiter rateLimiter;
    protected final int messagePerSecondRateLimit;
    protected transient AcknowledgeOnCheckpoint<String> acknowledgeOnCheckpoint;
    protected transient PubSubSubscriber subscriber;
    protected volatile transient boolean isRunning;

    PubSubSource(PubSubDeserializationSchema<OUT> deserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory, FlinkConnectorRateLimiter rateLimiter, int messagePerSecondRateLimit) {
        this.deserializationSchema = deserializationSchema;
        this.pubSubSubscriberFactory = pubSubSubscriberFactory;
        this.credentials = credentials;
        this.acknowledgeOnCheckpointFactory = acknowledgeOnCheckpointFactory;
        this.rateLimiter = rateLimiter;
        this.messagePerSecondRateLimit = messagePerSecondRateLimit;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        if (this.hasNoCheckpointingEnabled(this.getRuntimeContext())) {
            throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message.");
        }
        this.getRuntimeContext().getMetricGroup().gauge("PubSubMessagesProcessedNotAcked", this::getOutstandingMessagesToAck);
        this.rateLimiter.setRate((long)(this.messagePerSecondRateLimit * this.getRuntimeContext().getNumberOfParallelSubtasks()));
        this.rateLimiter.open(this.getRuntimeContext());
        this.deserializationSchema.open(RuntimeContextInitializationContextAdapters.deserializationAdapter((RuntimeContext)this.getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
        this.createAndSetPubSubSubscriber();
        this.isRunning = true;
    }

    private boolean hasNoCheckpointingEnabled(RuntimeContext runtimeContext) {
        return !(runtimeContext instanceof StreamingRuntimeContext) || !((StreamingRuntimeContext)runtimeContext).isCheckpointingEnabled();
    }

    public void run(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        PubSubCollector collector = new PubSubCollector(sourceContext);
        while (this.isRunning) {
            try {
                this.processMessage(sourceContext, this.subscriber.pull(), collector);
            }
            catch (InterruptedException | CancellationException e) {
                this.isRunning = false;
            }
        }
    }

    public void close() throws Exception {
        this.subscriber.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processMessage(SourceFunction.SourceContext<OUT> sourceContext, List<ReceivedMessage> messages, PubSubCollector collector) throws Exception {
        this.rateLimiter.acquire(messages.size());
        Object object = sourceContext.getCheckpointLock();
        synchronized (object) {
            for (ReceivedMessage message : messages) {
                this.acknowledgeOnCheckpoint.addAcknowledgeId(message.getAckId());
                PubsubMessage pubsubMessage = message.getMessage();
                this.deserializationSchema.deserialize(pubsubMessage, collector);
                if (!collector.isEndOfStreamSignalled()) continue;
                this.cancel();
                return;
            }
        }
    }

    private Integer getOutstandingMessagesToAck() {
        return this.acknowledgeOnCheckpoint.numberOfOutstandingAcknowledgements();
    }

    public void cancel() {
        this.isRunning = false;
    }

    public TypeInformation<OUT> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    public static DeserializationSchemaBuilder newBuilder() {
        return new DeserializationSchemaBuilder();
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.acknowledgeOnCheckpoint.notifyCheckpointComplete(checkpointId);
    }

    public void notifyCheckpointAborted(long checkpointId) {
    }

    public List<AcknowledgeIdsForCheckpoint<String>> snapshotState(long checkpointId, long timestamp) throws Exception {
        return this.acknowledgeOnCheckpoint.snapshotState(checkpointId, timestamp);
    }

    public void restoreState(List<AcknowledgeIdsForCheckpoint<String>> state) throws Exception {
        this.createAndSetPubSubSubscriber();
        this.acknowledgeOnCheckpoint.restoreState(state);
    }

    private void createAndSetPubSubSubscriber() throws Exception {
        if (this.subscriber == null) {
            this.subscriber = this.pubSubSubscriberFactory.getSubscriber(this.credentials);
        }
        if (this.acknowledgeOnCheckpoint == null) {
            this.acknowledgeOnCheckpoint = this.acknowledgeOnCheckpointFactory.create(this.subscriber);
        }
    }

    static class AcknowledgeOnCheckpointFactory
    implements Serializable {
        AcknowledgeOnCheckpointFactory() {
        }

        AcknowledgeOnCheckpoint<String> create(Acknowledger<String> acknowledger) {
            return new AcknowledgeOnCheckpoint<String>(acknowledger);
        }
    }

    public static interface SubscriptionNameBuilder<OUT> {
        public PubSubSourceBuilder<OUT> withSubscriptionName(String var1);
    }

    public static interface ProjectNameBuilder<OUT> {
        public SubscriptionNameBuilder<OUT> withProjectName(String var1);
    }

    public static class DeserializationSchemaBuilder {
        public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema) {
            return new PubSubSourceBuilder(deserializationSchema);
        }

        public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> deserializationSchema) {
            return new PubSubSourceBuilder(deserializationSchema);
        }
    }

    public static class PubSubSourceBuilder<OUT>
    implements ProjectNameBuilder<OUT>,
    SubscriptionNameBuilder<OUT> {
        private final PubSubDeserializationSchema<OUT> deserializationSchema;
        private String projectName;
        private String subscriptionName;
        private PubSubSubscriberFactory pubSubSubscriberFactory;
        private Credentials credentials;
        private int messagePerSecondRateLimit = 100000;

        private PubSubSourceBuilder(DeserializationSchema<OUT> deserializationSchema) {
            Preconditions.checkNotNull(deserializationSchema);
            this.deserializationSchema = new DeserializationSchemaWrapper<OUT>(deserializationSchema);
        }

        private PubSubSourceBuilder(PubSubDeserializationSchema<OUT> deserializationSchema) {
            Preconditions.checkNotNull(deserializationSchema);
            this.deserializationSchema = deserializationSchema;
        }

        @Override
        public SubscriptionNameBuilder<OUT> withProjectName(String projectName) {
            Preconditions.checkNotNull((Object)projectName);
            this.projectName = projectName;
            return this;
        }

        @Override
        public PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName) {
            Preconditions.checkNotNull((Object)subscriptionName);
            this.subscriptionName = subscriptionName;
            return this;
        }

        public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials) {
            this.credentials = credentials;
            return this;
        }

        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory pubSubSubscriberFactory) {
            this.pubSubSubscriberFactory = pubSubSubscriberFactory;
            return this;
        }

        public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries) {
            this.pubSubSubscriberFactory = new DefaultPubSubSubscriberFactory(ProjectSubscriptionName.format((String)this.projectName, (String)this.subscriptionName), retries, perRequestTimeout, maxMessagesPerPull);
            return this;
        }

        public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit) {
            this.messagePerSecondRateLimit = messagePerSecondRateLimit;
            return this;
        }

        public PubSubSource<OUT> build() throws IOException {
            if (this.credentials == null) {
                this.credentials = SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build().getCredentials();
            }
            if (this.pubSubSubscriberFactory == null) {
                this.pubSubSubscriberFactory = new DefaultPubSubSubscriberFactory(ProjectSubscriptionName.format((String)this.projectName, (String)this.subscriptionName), 3, Duration.ofSeconds(15L), 100);
            }
            return new PubSubSource<OUT>(this.deserializationSchema, this.pubSubSubscriberFactory, this.credentials, new AcknowledgeOnCheckpointFactory(), (FlinkConnectorRateLimiter)new GuavaFlinkConnectorRateLimiter(), this.messagePerSecondRateLimit);
        }
    }

    private class PubSubCollector
    implements Collector<OUT> {
        private final SourceFunction.SourceContext<OUT> ctx;
        private boolean endOfStreamSignalled = false;

        private PubSubCollector(SourceFunction.SourceContext<OUT> ctx) {
            this.ctx = ctx;
        }

        public void collect(OUT record) {
            if (this.endOfStreamSignalled || PubSubSource.this.deserializationSchema.isEndOfStream(record)) {
                this.endOfStreamSignalled = true;
                return;
            }
            this.ctx.collect(record);
        }

        public boolean isEndOfStreamSignalled() {
            return this.endOfStreamSignalled;
        }

        public void close() {
        }
    }
}

