package org.apache.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.Security;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/AvroSource.class */
/**
 * Flume source that runs an Avro {@link NettyServer} on a configured address and hands every
 * received {@link AvroFlumeEvent} (single or batched) to this source's channel processor.
 *
 * <p>Configuration keys read by {@link #configure(Context)}:
 * <ul>
 *   <li>{@code port}, {@code bind} - required listen port and bind address;</li>
 *   <li>{@code threads} - size of the worker thread pool; values {@code <= 0} (the default)
 *       select an unbounded cached pool;</li>
 *   <li>{@code compression-type} - {@code "deflate"} (case-insensitive) adds zlib handlers to the
 *       pipeline; defaults to {@code "none"};</li>
 *   <li>{@code ssl} - enables an SSL handler; defaults to {@code false};</li>
 *   <li>{@code keystore}, {@code keystore-password} - required when {@code ssl} is enabled;</li>
 *   <li>{@code keystore-type} - defaults to {@code "JKS"}.</li>
 * </ul>
 */
public class AvroSource extends AbstractSource implements EventDrivenSource, Configurable, AvroSourceProtocol {
    private static final String THREADS = "threads";
    private static final Logger logger = LoggerFactory.getLogger(AvroSource.class);
    private static final String PORT_KEY = "port";
    private static final String BIND_KEY = "bind";
    private static final String COMPRESSION_TYPE = "compression-type";
    private static final String SSL_KEY = "ssl";
    private static final String KEYSTORE_KEY = "keystore";
    private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
    private static final String KEYSTORE_TYPE_KEY = "keystore-type";
    private int port;
    private String bindAddress;
    private String compressionType;
    private String keystore;
    private String keystorePassword;
    private String keystoreType;
    private boolean enableSsl = false;
    private Server server;
    private SourceCounter sourceCounter;
    private int maxThreads;
    private ScheduledExecutorService connectionCountUpdater;

    /**
     * Pipeline factory that optionally installs zlib compression handlers and/or an SSL handler
     * in front of an otherwise empty Netty pipeline.
     */
    public static class SSLCompressionChannelPipelineFactory implements ChannelPipelineFactory {
        private final boolean enableCompression;
        private final boolean enableSsl;
        private final String keystore;
        private final String keystorePassword;
        private final String keystoreType;

        /**
         * @param enableCompression whether to add the zlib encoder/decoder pair
         * @param enableSsl whether to add a server-mode SSL handler
         * @param keystore path of the keystore file; only read when {@code enableSsl} is set
         * @param keystorePassword password used for both the keystore and its keys
         * @param keystoreType keystore type name passed to {@link KeyStore#getInstance(String)}
         */
        public SSLCompressionChannelPipelineFactory(boolean enableCompression, boolean enableSsl, String keystore,
                String keystorePassword, String keystoreType) {
            this.enableCompression = enableCompression;
            this.enableSsl = enableSsl;
            this.keystore = keystore;
            this.keystorePassword = keystorePassword;
            this.keystoreType = keystoreType;
        }

        /**
         * Builds a "TLS" {@link SSLContext} whose key managers come from the configured keystore.
         *
         * @throws Error wrapping any failure; the type is kept as-is so existing callers see the
         *     same throwable as before
         */
        private SSLContext createServerSSLContext() {
            try {
                KeyStore keyStore = AvroSource.loadKeyStore(this.keystore, this.keystoreType, this.keystorePassword);
                KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(getAlgorithm());
                keyManagerFactory.init(keyStore, this.keystorePassword.toCharArray());
                SSLContext sslContext = SSLContext.getInstance("TLS");
                // No custom trust managers or SecureRandom: the platform defaults are used.
                sslContext.init(keyManagerFactory.getKeyManagers(), null, null);
                return sslContext;
            } catch (Exception e) {
                throw new Error("Failed to initialize the server-side SSLContext", e);
            }
        }

        /**
         * Returns the key manager algorithm from the {@code ssl.KeyManagerFactory.algorithm}
         * security property, falling back to {@code "SunX509"} when the property is unset.
         */
        private String getAlgorithm() {
            String algorithm = Security.getProperty("ssl.KeyManagerFactory.algorithm");
            return algorithm != null ? algorithm : "SunX509";
        }

        /**
         * Creates a new pipeline. Handlers are inserted with {@code addFirst}, so when everything
         * is enabled the resulting order is: ssl, inflater, deflater.
         */
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            if (this.enableCompression) {
                pipeline.addFirst("deflater", new ZlibEncoder(6));
                pipeline.addFirst("inflater", new ZlibDecoder());
            }
            if (this.enableSsl) {
                SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
                sslEngine.setUseClientMode(false);
                pipeline.addFirst(AvroSource.SSL_KEY, new SslHandler(sslEngine));
            }
            return pipeline;
        }
    }

    /**
     * Opens the keystore file, loads it into a {@link KeyStore} of the given type and always
     * closes the file stream afterwards (the stream was previously leaked).
     *
     * @param path path of the keystore file
     * @param type keystore type name
     * @param password keystore password; must not be null
     * @return the loaded keystore
     * @throws IOException if the file cannot be opened, read or closed
     * @throws GeneralSecurityException if the type is unsupported or the content cannot be loaded
     */
    private static KeyStore loadKeyStore(String path, String type, String password)
            throws IOException, GeneralSecurityException {
        KeyStore keyStore = KeyStore.getInstance(type);
        FileInputStream in = new FileInputStream(path);
        // try/finally instead of try-with-resources: nothing in this file shows Java 7+ is available.
        try {
            keyStore.load(in, password.toCharArray());
        } finally {
            in.close();
        }
        return keyStore;
    }

    /**
     * Reads the source settings from {@code context} and, when SSL is enabled, verifies up front
     * that the keystore can be loaded.
     *
     * @throws FlumeException if SSL is enabled and the keystore cannot be loaded
     * @throws NullPointerException if SSL is enabled and keystore or keystore-password is missing
     */
    @Override
    public void configure(Context context) {
        Configurables.ensureRequiredNonNull(context, PORT_KEY, BIND_KEY);
        this.port = context.getInteger(PORT_KEY);
        this.bindAddress = context.getString(BIND_KEY);
        this.compressionType = context.getString(COMPRESSION_TYPE, "none");
        try {
            this.maxThreads = context.getInteger(THREADS, 0);
        } catch (NumberFormatException e) {
            // The message previously had no placeholder, so the offending value was never logged.
            logger.warn("AVRO source's \"threads\" property must specify an integer value. Got: {}",
                    context.getString(THREADS));
            // Fall back to the default instead of keeping a value from an earlier configure() call.
            this.maxThreads = 0;
        }
        this.enableSsl = context.getBoolean(SSL_KEY, false);
        this.keystore = context.getString(KEYSTORE_KEY);
        this.keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
        this.keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");
        if (this.enableSsl) {
            Preconditions.checkNotNull(this.keystore, "keystore must be specified when SSL is enabled");
            Preconditions.checkNotNull(this.keystorePassword, "keystore-password must be specified when SSL is enabled");
            try {
                // Loaded only to validate the configuration; the result is discarded.
                loadKeyStore(this.keystore, this.keystoreType, this.keystorePassword);
            } catch (Exception e) {
                throw new FlumeException("Avro source configured with invalid keystore: " + this.keystore, e);
            }
        }
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(getName());
        }
    }

    /**
     * Creates and starts the Netty server, starts the counters, and schedules a task that
     * publishes the server's active connection count to the source counter immediately and then
     * with a 60 second delay between runs.
     */
    @Override
    public void start() {
        logger.info("Starting {}...", this);
        // Keep a NettyServer-typed reference: the field is declared as the Server interface,
        // and getNumActiveConnections() is called on the concrete type below.
        final NettyServer nettyServer = new NettyServer(
                new SpecificResponder(AvroSourceProtocol.class, this),
                new InetSocketAddress(this.bindAddress, this.port),
                initSocketChannelFactory(),
                initChannelPipelineFactory(),
                (ExecutionHandler) null);
        this.server = nettyServer;
        this.connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
        this.server.start();
        this.sourceCounter.start();
        super.start();
        this.connectionCountUpdater.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                AvroSource.this.sourceCounter.setOpenConnectionCount(nettyServer.getNumActiveConnections());
            }
        }, 0L, 60L, TimeUnit.SECONDS);
        logger.info("Avro source {} started.", getName());
    }

    /**
     * Builds the server socket channel factory. The boss pool is always a cached pool; the worker
     * pool is a cached pool when {@code maxThreads <= 0} and a fixed pool of that size otherwise.
     */
    private NioServerSocketChannelFactory initSocketChannelFactory() {
        if (this.maxThreads <= 0) {
            return new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        }
        return new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                Executors.newFixedThreadPool(this.maxThreads));
    }

    /**
     * Chooses the pipeline factory: the SSL/compression-aware one when either feature is enabled,
     * otherwise a factory that returns an empty pipeline.
     */
    private ChannelPipelineFactory initChannelPipelineFactory() {
        boolean enableCompression = "deflate".equalsIgnoreCase(this.compressionType);
        if (enableCompression || this.enableSsl) {
            return new SSLCompressionChannelPipelineFactory(enableCompression, this.enableSsl, this.keystore,
                    this.keystorePassword, this.keystoreType);
        }
        return new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline();
            }
        };
    }

    /**
     * Closes the server and waits for it, stops the counters, and shuts down the connection-count
     * updater, waiting until it has terminated.
     *
     * <p>If the thread is interrupted while waiting for the server, the interruption is logged
     * and shutdown continues. If it is interrupted while waiting for the updater, a
     * {@link RuntimeException} wrapping the {@link InterruptedException} is thrown. In both cases
     * the thread's interrupt status is restored before this method returns or throws.
     */
    @Override
    public void stop() {
        logger.info("Avro source {} stopping: {}", getName(), this);
        boolean interrupted = false;
        try {
            // server/connectionCountUpdater are null when start() failed before assigning them.
            if (this.server != null) {
                this.server.close();
                try {
                    this.server.join();
                } catch (InterruptedException e) {
                    interrupted = true;
                    logger.info("Avro source " + getName() + ": Interrupted while waiting for Avro server to stop. Exiting. Exception follows.", e);
                }
            }
            this.sourceCounter.stop();
            if (this.connectionCountUpdater != null) {
                this.connectionCountUpdater.shutdown();
                while (!this.connectionCountUpdater.isTerminated()) {
                    try {
                        // Wakes up as soon as the executor terminates instead of sleeping blindly.
                        this.connectionCountUpdater.awaitTermination(100L, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e2) {
                        interrupted = true;
                        logger.error("Interrupted while waiting for connection count executor to terminate", e2);
                        throw Throwables.propagate(e2);
                    }
                }
            }
            super.stop();
            logger.info("Avro source {} stopped. Metrics: {}", getName(), this.sourceCounter);
        } finally {
            // Restored only at the end so the waits above are not cut short by our own flag.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns a short description containing the source name, bind address and port. */
    @Override
    public String toString() {
        return "Avro source " + getName() + ": { bindAddress: " + this.bindAddress + ", port: " + this.port + " }";
    }

    /**
     * Copies a {@code CharSequence} map into a new {@code String} map by calling
     * {@code toString()} on every key and value.
     */
    private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> map) {
        Map<String, String> result = new HashMap<String, String>(Math.max(16, map.size() * 2));
        for (Map.Entry<CharSequence, CharSequence> entry : map.entrySet()) {
            result.put(entry.getKey().toString(), entry.getValue().toString());
        }
        return result;
    }

    /**
     * Handles a single Avro event: converts it to a Flume event and passes it to the channel
     * processor, updating the received/accepted counters.
     *
     * @return {@link Status#OK} on success, {@link Status#FAILED} if the channel processor throws
     *     a {@link ChannelException}
     */
    public Status append(AvroFlumeEvent avroFlumeEvent) {
        logger.debug("Avro source {}: Received avro event: {}", getName(), avroFlumeEvent);
        this.sourceCounter.incrementAppendReceivedCount();
        this.sourceCounter.incrementEventReceivedCount();
        try {
            getChannelProcessor().processEvent(
                    EventBuilder.withBody(avroFlumeEvent.getBody().array(), toStringMap(avroFlumeEvent.getHeaders())));
        } catch (ChannelException e) {
            logger.warn("Avro source " + getName() + ": Unable to process event. Exception follows.", e);
            return Status.FAILED;
        }
        this.sourceCounter.incrementAppendAcceptedCount();
        this.sourceCounter.incrementEventAcceptedCount();
        return Status.OK;
    }

    /**
     * Handles a batch of Avro events: converts each to a Flume event and passes the whole batch
     * to the channel processor, updating the received/accepted counters.
     *
     * @return {@link Status#OK} on success, {@link Status#FAILED} if processing throws anything
     *     other than an {@link Error}
     * @throws Error if processing throws an {@code Error}; it is logged and rethrown
     */
    public Status appendBatch(List<AvroFlumeEvent> list) {
        logger.debug("Avro source {}: Received avro event batch of {} events.", getName(), list.size());
        this.sourceCounter.incrementAppendBatchReceivedCount();
        this.sourceCounter.addToEventReceivedCount(list.size());
        // Left raw as in the original: the element type produced by EventBuilder is not imported here.
        ArrayList events = new ArrayList(list.size());
        for (AvroFlumeEvent avroFlumeEvent : list) {
            events.add(EventBuilder.withBody(avroFlumeEvent.getBody().array(), toStringMap(avroFlumeEvent.getHeaders())));
        }
        try {
            getChannelProcessor().processEventBatch(events);
            this.sourceCounter.incrementAppendBatchAcceptedCount();
            this.sourceCounter.addToEventAcceptedCount(list.size());
            return Status.OK;
        } catch (Throwable th) {
            logger.error("Avro source " + getName() + ": Unable to process event batch. Exception follows.", th);
            if (th instanceof Error) {
                throw ((Error) th);
            }
            return Status.FAILED;
        }
    }
}
