package org.zbus.mq;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import org.zbus.broker.Broker;
import org.zbus.kit.log.Logger;
import org.zbus.mq.Protocol;
import org.zbus.net.http.Message;

/* loaded from: input_file:org/zbus/mq/Consumer.class */
/**
 * Consumer side of an MQ: issues {@code Protocol.Consume} requests for the
 * {@code mq} configured on the parent {@link MqAdmin} and hands the received
 * messages either back to the caller ({@link #take()}, {@link #recv(int)}) or
 * to a {@link ConsumerHandler} on a background thread ({@link #start()}).
 *
 * <p>Two broker clients are created on first use: one for consume/admin
 * requests and one for {@link #routeMessage(Message)}. Both are handed back to
 * the broker in {@link #stop()}.
 */
public class Consumer extends MqAdmin implements Closeable {
    private static final Logger log = Logger.getLogger((Class<?>) Consumer.class);

    /**
     * Default timeout handed to the client for each consume request.
     * NOTE(review): presumably milliseconds (5 minutes) - confirm against
     * {@code Message.MessageInvoker.invokeSync}.
     */
    private static final int DEFAULT_CONSUME_TIMEOUT = 300000;

    /**
     * Timeout used for requests issued through {@link #invokeSync(Message)}.
     * NOTE(review): presumably milliseconds - confirm.
     */
    private static final int ADMIN_INVOKE_TIMEOUT = 10000;

    /** Client used for consume/admin requests; volatile so the check-then-lock initialisation publishes it safely. */
    private volatile Message.MessageInvoker client;
    private String topic;
    private int consumeTimeout = DEFAULT_CONSUME_TIMEOUT;
    /** Client used by {@link #routeMessage(Message)}; volatile for the same reason as {@link #client}. */
    private volatile Message.MessageInvoker replyClient;
    private volatile Thread consumerThread;
    private volatile ConsumerHandler consumerHandler;
    private volatile ConsumerExceptionHandler consumerExceptionHandler;

    /** Body of the background consumer thread; shared by every thread created in {@link #start()}. */
    private final Runnable consumerTask = new Runnable() {
        @Override
        public void run() {
            Consumer.this.consumeLoop();
        }
    };

    /** Callback notified when the background consume loop hits an exception. */
    public interface ConsumerExceptionHandler {
        /**
         * @param exc      the exception raised while consuming
         * @param consumer the consumer whose loop raised it
         */
        void onException(Exception exc, Consumer consumer);
    }

    /** Callback invoked by the background consume loop for every message taken. */
    public interface ConsumerHandler {
        /**
         * @param message  the message that was taken
         * @param consumer the consumer that took it
         * @throws IOException logged by the consume loop, which then continues
         */
        void handle(Message message, Consumer consumer) throws IOException;
    }

    /**
     * Creates a consumer; all arguments are forwarded to the {@link MqAdmin}
     * constructor.
     */
    public Consumer(Broker broker, String str, Protocol.MqMode... mqModeArr) {
        super(broker, str, mqModeArr);
    }

    /**
     * Creates a consumer from a configuration object and adopts the topic it
     * carries.
     */
    public Consumer(MqConfig mqConfig) {
        super(mqConfig);
        this.topic = mqConfig.getTopic();
    }

    /**
     * Runs on the consumer thread: keeps consuming until {@link #consumeOnce()}
     * asks to stop. An {@link IOException} is reported to the exception handler
     * (or logged when none is set) and the loop carries on.
     */
    private void consumeLoop() {
        while (true) {
            try {
                if (!consumeOnce()) {
                    return;
                }
            } catch (IOException e) {
                // Snapshot the volatile field so a concurrent onException(null) cannot cause an NPE.
                ConsumerExceptionHandler exceptionHandler = this.consumerExceptionHandler;
                if (exceptionHandler != null) {
                    exceptionHandler.onException(e, this);
                } else {
                    log.error(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Takes one message and dispatches it to the registered handler.
     *
     * @return {@code true} to keep looping; {@code false} when the loop must end
     *         (interrupted, or an {@link MqException} was given to the exception handler)
     * @throws IOException from {@link #take()} or {@link #close()}
     */
    private boolean consumeOnce() throws IOException {
        try {
            Message taken = take();
            // Snapshot the volatile field so a concurrent onMessage(null) cannot cause an NPE.
            ConsumerHandler handler = this.consumerHandler;
            if (handler == null) {
                log.warn("Missing consumerHandler, call onMessage first");
                return true;
            }
            try {
                handler.handle(taken, this);
            } catch (IOException e) {
                // A failing handler must not terminate the loop.
                log.error(e.getMessage(), e);
            }
            return true;
        } catch (MqException e) {
            ConsumerExceptionHandler exceptionHandler = this.consumerExceptionHandler;
            if (exceptionHandler == null) {
                throw e;
            }
            exceptionHandler.onException(e, this);
            return false;
        } catch (InterruptedException e) {
            // Interruption is the stop signal: release resources and leave the loop.
            close();
            return false;
        }
    }

    /** Builds the hint used to ask the broker for a client for this consumer's {@code mq}. */
    private Broker.BrokerHint brokerHint() {
        Broker.BrokerHint hint = new Broker.BrokerHint();
        hint.setEntry(this.mq);
        return hint;
    }

    /** Obtains the consume/admin client from the broker on first use. */
    private void ensureClient() throws IOException {
        if (this.client == null) {
            synchronized (this) {
                if (this.client == null) {
                    this.client = this.broker.getClient(brokerHint());
                }
            }
        }
    }

    /** Alias of {@link #take(int)}. */
    public Message recv(int i) throws IOException, InterruptedException {
        return take(i);
    }

    /**
     * Sends a single consume request and returns the reply.
     *
     * <p>On a 404 reply the MQ is created via {@code createMQ()} and the request
     * is retried; any other non-200 reply raises an {@link MqException} carrying
     * the reply body. An {@link IOException} while talking to the broker is
     * logged, the client is replaced, and {@code null} is returned.
     *
     * @param i timeout passed to the client's {@code invokeSync}
     * @return the consumed message, or {@code null} when the client returned no
     *         reply or an I/O error occurred
     * @throws IOException          if the client cannot be obtained
     * @throws InterruptedException if the call was aborted by a
     *                              {@link ClosedByInterruptException}
     */
    public Message take(int i) throws IOException, InterruptedException {
        ensureClient();

        Message request = new Message();
        request.setCmd(Protocol.Consume);
        request.setMq(this.mq);
        request.setHead("token", this.accessToken);
        if (Protocol.MqMode.isEnabled(this.mode, Protocol.MqMode.PubSub) && this.topic != null) {
            request.setTopic(this.topic);
        }

        try {
            Message reply = this.client.invokeSync(request, i);
            if (reply == null) {
                return null;
            }
            // Move the raw id from its header back into the message id.
            reply.setId(reply.getRawId());
            reply.removeHead(Message.RAWID);
            if (reply.isStatus200()) {
                return reply;
            }
            if (reply.isStatus404() && createMQ()) {
                return take(i);
            }
            throw new MqException(reply.getBodyString());
        } catch (ClosedByInterruptException e) {
            // InterruptedException has no cause-taking constructor; attach the cause explicitly.
            InterruptedException interrupted = new InterruptedException(e.getMessage());
            interrupted.initCause(e);
            throw interrupted;
        } catch (IOException e2) {
            log.error(e2.getMessage(), e2);
            try {
                // Replace the failed client so the next attempt starts fresh.
                this.broker.closeClient(this.client);
                this.client = this.broker.getClient(brokerHint());
            } catch (IOException e3) {
                log.error(e3.getMessage(), e3);
            }
            return null;
        }
    }

    /**
     * Repeats {@link #take(int)} with the configured consume timeout until a
     * non-null message is returned.
     */
    public Message take() throws InterruptedException, IOException {
        Message taken;
        do {
            taken = take(this.consumeTimeout);
        } while (taken == null);
        return taken;
    }

    /** Sends a request through the consume/admin client with a fixed timeout. */
    @Override // org.zbus.mq.MqAdmin
    protected Message invokeSync(Message message) throws IOException, InterruptedException {
        ensureClient();
        return this.client.invokeSync(message, ADMIN_INVOKE_TIMEOUT);
    }

    /** Obtains the reply client from the broker on first use. */
    private void ensureReplyClient() throws IOException {
        if (this.replyClient == null) {
            synchronized (this) {
                if (this.replyClient == null) {
                    this.replyClient = this.broker.getClient(brokerHint());
                }
            }
        }
    }

    /**
     * Marks the message as a {@code Protocol.Route} command with ack disabled
     * and sends it asynchronously, without a callback, through the reply client.
     *
     * @param message the message to route; its cmd and ack fields are overwritten
     * @throws IOException if the reply client cannot be obtained or the send fails
     */
    public void routeMessage(Message message) throws IOException {
        ensureReplyClient();
        message.setCmd(Protocol.Route);
        message.setAck(false);
        this.replyClient.invokeAsync(message, null);
    }

    public String getTopic() {
        return this.topic;
    }

    /**
     * Sets the topic attached to consume requests.
     *
     * @throws IllegalStateException if this consumer's mode does not have PubSub enabled
     */
    public void setTopic(String str) {
        if (!Protocol.MqMode.isEnabled(this.mode, Protocol.MqMode.PubSub)) {
            throw new IllegalStateException("topic require PubSub mode");
        }
        this.topic = str;
    }

    /** Registers the handler invoked by the background consume loop. */
    public void onMessage(ConsumerHandler consumerHandler) {
        this.consumerHandler = consumerHandler;
    }

    /** Registers the handler notified when the background consume loop fails. */
    public void onException(ConsumerExceptionHandler consumerExceptionHandler) {
        this.consumerExceptionHandler = consumerExceptionHandler;
    }

    /** Equivalent to {@link #stop()}. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        stop();
    }

    /**
     * Interrupts the background consumer thread (if any) and hands both clients
     * back to the broker. Failures while closing a client are logged, not thrown.
     */
    public synchronized void stop() {
        Thread worker = this.consumerThread;
        if (worker != null) {
            worker.interrupt();
            this.consumerThread = null;
        }
        closeClientQuietly(this.client);
        closeClientQuietly(this.replyClient);
    }

    /** Returns a client to the broker, logging instead of propagating an {@link IOException}. */
    private void closeClientQuietly(Message.MessageInvoker invoker) {
        if (invoker == null) {
            return;
        }
        try {
            this.broker.closeClient(invoker);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /** Registers {@code consumerHandler} and starts the background consumer thread. */
    public synchronized void start(ConsumerHandler consumerHandler) {
        onMessage(consumerHandler);
        start();
    }

    /**
     * Starts the background consumer thread unless one is already running.
     *
     * <p>A worker that has already terminated (for example after an
     * {@link MqException} was passed to the exception handler) is replaced by a
     * new thread: a {@link Thread} can only be started once, so re-starting the
     * old instance would throw {@link IllegalThreadStateException}.
     */
    public synchronized void start() {
        Thread existing = this.consumerThread;
        if (existing != null && existing.isAlive()) {
            return;
        }
        Thread worker = new Thread(this.consumerTask);
        worker.setName("ConsumerThread");
        this.consumerThread = worker;
        worker.start();
    }

    /** Sets the timeout used by {@link #take()} for each consume request. */
    public void setConsumeTimeout(int i) {
        this.consumeTimeout = i;
    }
}
