package org.zbus.broker;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.zbus.broker.Broker;
import org.zbus.kit.log.Logger;
import org.zbus.mq.server.MqAdaptor;
import org.zbus.mq.server.MqServer;
import org.zbus.mq.server.MqServerConfig;
import org.zbus.net.Sync;
import org.zbus.net.core.IoBuffer;
import org.zbus.net.core.Session;
import org.zbus.net.http.Message;

/* loaded from: input_file:org/zbus/broker/JvmBroker.class */
public class JvmBroker extends Session implements Broker {
    private static final Logger log = Logger.getLogger((Class<?>) JvmBroker.class);
    protected MqServer mqServer;
    protected MqAdaptor adaptor;
    protected final Sync<Message, Message> sync;
    protected int readTimeout;
    protected boolean ownMqServer;

    public JvmBroker() {
        this(new MqServerConfig());
    }

    public JvmBroker(MqServerConfig mqServerConfig) {
        this(new MqServer(mqServerConfig));
        this.ownMqServer = true;
    }

    public JvmBroker(MqServer mqServer) {
        super(null, null, null);
        this.sync = new Sync<>();
        this.readTimeout = 3000;
        this.ownMqServer = false;
        this.mqServer = mqServer;
        this.adaptor = this.mqServer.getDefaultMqAdaptor();
        setStatus(Session.SessionStatus.CONNECTED);
        try {
            this.adaptor.onSessionAccepted(this);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    @Override // org.zbus.net.core.Session
    public String getLocalAddress() {
        return "LocalBroker-Local-" + id();
    }

    @Override // org.zbus.net.core.Session
    public String getRemoteAddress() {
        return "LocalBroker-Remote-" + id();
    }

    @Override // org.zbus.net.core.Session
    public void write(IoBuffer ioBuffer) throws IOException {
        throw new IllegalArgumentException("IoBuffer not support in LocalBroker");
    }

    @Override // org.zbus.net.core.Session
    public void write(Object obj) throws IOException {
        if (!(obj instanceof Message)) {
            throw new IllegalArgumentException("Message type required");
        }
        Message message = (Message) obj;
        Sync.Ticket<Message, Message> removeTicket = this.sync.removeTicket(message.getId());
        if (removeTicket != null) {
            removeTicket.notifyResponse(message);
        } else {
            log.debug("!!!!!!!!!!!!!!!!!!!!!!!!!!Drop,%s", message);
        }
    }

    @Override // org.zbus.net.Invoker
    public Message invokeSync(Message message, int i) throws IOException, InterruptedException {
        Sync.Ticket<Message, Message> ticket = null;
        try {
            ticket = this.sync.createTicket(message, i);
            invokeAsync(message, (Sync.ResultCallback<Message>) null);
            if (!ticket.await(i, TimeUnit.MILLISECONDS)) {
                if (ticket != null) {
                    this.sync.removeTicket(ticket.getId());
                }
                return null;
            }
            Message response = ticket.response();
            if (ticket != null) {
                this.sync.removeTicket(ticket.getId());
            }
            return response;
        } catch (Throwable th) {
            if (ticket != null) {
                this.sync.removeTicket(ticket.getId());
            }
            throw th;
        }
    }

    @Override // org.zbus.net.Invoker
    public void invokeAsync(Message message, Sync.ResultCallback<Message> resultCallback) throws IOException {
        Sync.Ticket<Message, Message> ticket = null;
        if (resultCallback != null) {
            ticket = this.sync.createTicket(message, this.readTimeout, resultCallback);
        } else if (message.getId() == null) {
            message.setId(Sync.Ticket.nextId());
        }
        try {
            this.adaptor.onMessage(message, this);
        } catch (IOException e) {
            if (ticket != null) {
                this.sync.removeTicket(ticket.getId());
            }
            throw e;
        }
    }

    @Override // org.zbus.net.core.Session, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        setStatus(Session.SessionStatus.CLOSED);
        this.adaptor.onSessionToDestroy(this);
        if (this.ownMqServer) {
            this.mqServer.close();
        }
    }

    @Override // org.zbus.broker.Broker
    public Message.MessageInvoker getClient(Broker.BrokerHint brokerHint) throws IOException {
        return this;
    }

    @Override // org.zbus.broker.Broker
    public void closeClient(Message.MessageInvoker messageInvoker) throws IOException {
    }
}
