package org.zbus.mq.server;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.zbus.kit.log.Logger;
import org.zbus.mq.Protocol;
import org.zbus.mq.disk.MessageQueue;
import org.zbus.net.core.Session;
import org.zbus.net.http.Message;

/* loaded from: input_file:org/zbus/mq/server/PubSub.class */
/**
 * Publish/subscribe flavour of {@link AbstractMQ}.
 *
 * <p>Consumers register a pull request through {@link #consume(Message, Session)}; the
 * request is tracked in a {@link PullSession} keyed by {@code session.id()}. On
 * {@link #dispatch()}, every message polled from {@code msgQ} is offered (as a copy produced
 * by {@code Message.copyWithoutBody}) to the per-session queue of each active session whose
 * {@code isTopicMatched} accepts the message topic. Afterwards each session that still has an
 * outstanding pull request is answered with at most one message from its own queue.
 */
public class PubSub extends AbstractMQ {
    private static final Logger log = Logger.getLogger(PubSub.class);

    /** Pull state of every registered consumer, keyed by {@code Session.id()}. */
    protected final ConcurrentMap<String, PullSession> pullMap;

    /**
     * Creates the queue; both arguments are passed straight to the {@link AbstractMQ}
     * constructor.
     *
     * @param str          first superclass constructor argument
     *                     (presumably the queue name - confirm against AbstractMQ)
     * @param messageQueue second superclass constructor argument (the backing queue)
     */
    public PubSub(String str, MessageQueue messageQueue) {
        super(str, messageQueue);
        this.pullMap = new ConcurrentHashMap<>();
    }

    /**
     * Records {@code message} as the outstanding pull request of {@code session} and then
     * runs {@link #dispatch()}.
     *
     * @param message the pull request; its id is later copied onto the reply
     * @param session the consumer connection issuing the request
     * @throws IOException if {@link #dispatch()} fails
     */
    @Override // org.zbus.mq.server.AbstractMQ
    public void consume(Message message, Session session) throws IOException {
        String sessionId = session.id();
        PullSession registered = this.pullMap.get(sessionId);
        if (registered == null) {
            // putIfAbsent returns the entry that won the race, or null when ours was inserted.
            registered = this.pullMap.putIfAbsent(sessionId, new PullSession(session, message));
        }
        if (registered != null) {
            // An entry already existed (possibly inserted concurrently between get and
            // putIfAbsent): refresh it so that this pull request is not silently dropped.
            registered.setPullMessage(message);
        }
        dispatch();
    }

    /**
     * Drains {@code msgQ} into the matching per-session queues, then answers outstanding
     * pull requests. Sessions that are no longer active are removed from {@link #pullMap}
     * along the way.
     *
     * @throws IOException propagated from the calls made while draining {@code msgQ}
     */
    @Override // org.zbus.mq.server.AbstractMQ
    void dispatch() throws IOException {
        // Phase 1: fan every pending message out to the sessions subscribed to its topic.
        Message published;
        while ((published = (Message) this.msgQ.poll()) != null) {
            this.lastUpdateTime = System.currentTimeMillis();
            String topic = published.getTopic();
            Iterator<PullSession> subscribers = this.pullMap.values().iterator();
            while (subscribers.hasNext()) {
                PullSession subscriber = subscribers.next();
                if (isStale(subscriber)) {
                    subscribers.remove();
                } else if (subscriber.isTopicMatched(topic)) {
                    Message copy = Message.copyWithoutBody(published);
                    copy.setResponseStatus(200);
                    subscriber.getMsgQ().offer(copy);
                }
            }
        }

        // Phase 2: reply to every session that is currently waiting on a pull.
        Iterator<PullSession> waiting = this.pullMap.values().iterator();
        while (waiting.hasNext()) {
            PullSession pullSession = waiting.next();
            if (isStale(pullSession)) {
                waiting.remove();
            } else {
                answerPendingPull(pullSession);
            }
        }
    }

    /** Tells whether a pull session is missing or its underlying session is inactive. */
    private static boolean isStale(PullSession pullSession) {
        return pullSession == null || !pullSession.getSession().isActive();
    }

    /**
     * Writes at most one queued message to the session, provided it has an outstanding pull
     * request. The reply carries the pull request's id, the message's own id is kept as the
     * raw id, and the pull request is cleared before the write. An {@link IOException} raised
     * inside the locked section is logged and not rethrown.
     */
    private void answerPendingPull(PullSession pullSession) {
        // Acquire outside the try so that unlock() only ever runs when the lock is held.
        pullSession.lock.lock();
        try {
            Message pullRequest = pullSession.pullMessage;
            if (pullRequest == null) {
                return;
            }
            Message reply = pullSession.getMsgQ().poll();
            if (reply == null) {
                return;
            }
            reply.setResponseStatus(200);
            reply.setRawId(reply.getId());
            reply.setId(pullRequest.getId());
            pullSession.pullMessage = null;
            pullSession.getSession().write(reply);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        } finally {
            pullSession.lock.unlock();
        }
    }

    /**
     * Removes the first pull session bound to exactly this {@code session} instance
     * (identity comparison), if any.
     *
     * @param session the session whose registration should be dropped
     */
    @Override // org.zbus.mq.server.AbstractMQ
    public void cleanSession(Session session) {
        Iterator<PullSession> it = this.pullMap.values().iterator();
        while (it.hasNext()) {
            if (it.next().session == session) {
                it.remove();
                return;
            }
        }
    }

    /** Removes every pull session whose underlying session is no longer active. */
    @Override // org.zbus.mq.server.AbstractMQ
    public void cleanSession() {
        Iterator<PullSession> it = this.pullMap.values().iterator();
        while (it.hasNext()) {
            if (!it.next().session.isActive()) {
                it.remove();
            }
        }
    }

    /**
     * Builds a fresh {@link Protocol.MqInfo} from this queue's current name, mode, creator,
     * last update time, backlog size and one consumer-info entry per registered pull session.
     *
     * @return the newly populated info object
     */
    @Override // org.zbus.mq.server.AbstractMQ
    public Protocol.MqInfo getMqInfo() {
        Protocol.MqInfo mqInfo = new Protocol.MqInfo();
        mqInfo.name = this.name;
        mqInfo.lastUpdateTime = this.lastUpdateTime;
        mqInfo.creator = getCreator();
        mqInfo.mode = this.mode;
        mqInfo.unconsumedMsgCount = this.msgQ.size();
        mqInfo.consumerInfoList = new ArrayList<>();
        for (PullSession pullSession : this.pullMap.values()) {
            mqInfo.consumerInfoList.add(pullSession.getConsumerInfo());
        }
        mqInfo.consumerCount = mqInfo.consumerInfoList.size();
        return mqInfo;
    }

    /**
     * Calls {@code asyncClose()} on the session of every registered pull session. A failure
     * on one session is logged as a warning and does not stop the remaining sessions from
     * being closed. Entries are not removed from {@link #pullMap} here.
     *
     * @throws IOException declared by the overridden contract; per-session failures are
     *                     caught and logged rather than propagated
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        for (PullSession pullSession : this.pullMap.values()) {
            try {
                pullSession.session.asyncClose();
            } catch (IOException e) {
                log.warn(e.getMessage(), e);
            }
        }
    }
}
