/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.gcp.pubsub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

public class PubSubSink<IN>
extends RichSinkFunction<IN>
implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubSink.class);
    private final AtomicReference<Exception> exceptionAtomicReference = new AtomicReference();
    private final ApiFutureCallback<String> failureHandler = new FailureHandler();
    private final AtomicInteger numPendingFutures = new AtomicInteger(0);
    private final Credentials credentials;
    private final SerializationSchema<IN> serializationSchema;
    private final String projectName;
    private final String topicName;
    private final String hostAndPortForEmulator;
    private transient Publisher publisher;
    private volatile boolean isRunning;
    private transient ManagedChannel managedChannel = null;
    private transient TransportChannel channel = null;

    private PubSubSink(Credentials credentials, SerializationSchema<IN> serializationSchema, String projectName, String topicName, String hostAndPortForEmulator) {
        this.credentials = credentials;
        this.serializationSchema = serializationSchema;
        this.projectName = projectName;
        this.topicName = topicName;
        this.hostAndPortForEmulator = hostAndPortForEmulator;
    }

    public void open(Configuration configuration) throws Exception {
        this.serializationSchema.open(RuntimeContextInitializationContextAdapters.serializationAdapter((RuntimeContext)this.getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
        Publisher.Builder builder = Publisher.newBuilder((TopicName)TopicName.of((String)this.projectName, (String)this.topicName)).setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)this.credentials));
        if (this.hostAndPortForEmulator != null) {
            this.managedChannel = ManagedChannelBuilder.forTarget((String)this.hostAndPortForEmulator).usePlaintext().build();
            this.channel = GrpcTransportChannel.newBuilder().setManagedChannel(this.managedChannel).build();
            builder.setChannelProvider((TransportChannelProvider)FixedTransportChannelProvider.create((TransportChannel)this.channel)).setCredentialsProvider((CredentialsProvider)EmulatorCredentialsProvider.create()).setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(10).setTotalTimeout(Duration.ofSeconds((long)10L)).setInitialRetryDelay(Duration.ofMillis((long)100L)).setRetryDelayMultiplier(1.3).setMaxRetryDelay(Duration.ofSeconds((long)5L)).setInitialRpcTimeout(Duration.ofSeconds((long)5L)).setRpcTimeoutMultiplier(1.0).setMaxRpcTimeout(Duration.ofSeconds((long)10L)).build());
        }
        this.publisher = builder.build();
        this.isRunning = true;
    }

    public void close() throws Exception {
        super.close();
        this.shutdownPublisher();
        this.shutdownTransportChannel();
        this.shutdownManagedChannel();
        this.isRunning = false;
    }

    private void shutdownPublisher() {
        try {
            if (this.publisher != null) {
                this.publisher.shutdown();
            }
        }
        catch (Exception e) {
            LOG.info("Shutting down Publisher failed.", (Throwable)e);
        }
    }

    private void shutdownTransportChannel() {
        if (this.channel == null) {
            return;
        }
        try {
            this.channel.close();
        }
        catch (Exception e) {
            LOG.info("Shutting down TransportChannel failed.", (Throwable)e);
        }
    }

    private void shutdownManagedChannel() {
        if (this.managedChannel == null) {
            return;
        }
        try {
            this.managedChannel.shutdownNow();
            this.managedChannel.awaitTermination(1000L, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            LOG.info("Shutting down ManagedChannel failed.", (Throwable)e);
        }
    }

    public void invoke(IN message, SinkFunction.Context context) {
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFrom((byte[])this.serializationSchema.serialize(message))).build();
        ApiFuture future = this.publisher.publish(pubsubMessage);
        this.numPendingFutures.incrementAndGet();
        ApiFutures.addCallback((ApiFuture)future, this.failureHandler, (Executor)Executors.directExecutor());
    }

    public static SerializationSchemaBuilder newBuilder() {
        return new SerializationSchemaBuilder();
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.publisher.publishAllOutstanding();
        this.waitForFuturesToComplete();
        if (this.exceptionAtomicReference.get() != null) {
            throw this.exceptionAtomicReference.get();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitForFuturesToComplete() {
        AtomicInteger atomicInteger = this.numPendingFutures;
        synchronized (atomicInteger) {
            while (this.isRunning && this.numPendingFutures.get() > 0) {
                try {
                    this.numPendingFutures.wait();
                }
                catch (InterruptedException e) {
                    LOG.info("Interrupted when waiting for futures to complete");
                }
            }
        }
    }

    public void initializeState(FunctionInitializationContext context) {
    }

    private class FailureHandler
    implements ApiFutureCallback<String>,
    Serializable {
        private FailureHandler() {
        }

        public void onFailure(Throwable t) {
            this.ackAndMaybeNotifyNoPendingFutures();
            PubSubSink.this.exceptionAtomicReference.set(new RuntimeException("Failed trying to publish message", t));
        }

        public void onSuccess(String result) {
            this.ackAndMaybeNotifyNoPendingFutures();
            LOG.debug("Successfully published message with id: {}", (Object)result);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void ackAndMaybeNotifyNoPendingFutures() {
            if (PubSubSink.this.numPendingFutures.decrementAndGet() == 0) {
                AtomicInteger atomicInteger = PubSubSink.this.numPendingFutures;
                synchronized (atomicInteger) {
                    PubSubSink.this.numPendingFutures.notify();
                }
            }
        }
    }

    public static interface TopicNameBuilder<IN> {
        public PubSubSinkBuilder<IN> withTopicName(String var1);
    }

    public static interface ProjectNameBuilder<IN> {
        public TopicNameBuilder<IN> withProjectName(String var1);
    }

    public static class SerializationSchemaBuilder {
        public <IN> ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> deserializationSchema) {
            return new PubSubSinkBuilder(deserializationSchema);
        }
    }

    public static class PubSubSinkBuilder<IN>
    implements ProjectNameBuilder<IN>,
    TopicNameBuilder<IN> {
        private SerializationSchema<IN> serializationSchema;
        private String projectName;
        private String topicName;
        private Credentials credentials;
        private String hostAndPort;

        private PubSubSinkBuilder(SerializationSchema<IN> serializationSchema) {
            this.serializationSchema = serializationSchema;
        }

        public PubSubSinkBuilder<IN> withCredentials(Credentials credentials) {
            this.credentials = credentials;
            return this;
        }

        @Override
        public TopicNameBuilder<IN> withProjectName(String projectName) {
            Preconditions.checkNotNull((Object)projectName);
            this.projectName = projectName;
            return this;
        }

        @Override
        public PubSubSinkBuilder<IN> withTopicName(String topicName) {
            Preconditions.checkNotNull((Object)topicName);
            this.topicName = topicName;
            return this;
        }

        public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort) {
            this.hostAndPort = hostAndPort;
            return this;
        }

        public PubSubSink<IN> build() throws IOException {
            if (this.credentials == null) {
                this.credentials = this.hostAndPort == null ? SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build().getCredentials() : EmulatorCredentials.getInstance();
            }
            return new PubSubSink(this.credentials, this.serializationSchema, this.projectName, this.topicName, this.hostAndPort);
        }
    }
}

