/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.common.net.exabus.util;

import com.oracle.coherence.common.base.Blocking;
import com.oracle.coherence.common.base.Collector;
import com.oracle.coherence.common.base.Disposable;
import com.oracle.coherence.common.base.Factory;
import com.oracle.coherence.common.base.Hasher;
import com.oracle.coherence.common.base.Notifier;
import com.oracle.coherence.common.base.Pollable;
import com.oracle.coherence.common.base.SingleWaiterCooperativeNotifier;
import com.oracle.coherence.common.collections.SingleConsumerBlockingQueue;
import com.oracle.coherence.common.internal.Platform;
import com.oracle.coherence.common.internal.net.socketbus.SocketBusDriver;
import com.oracle.coherence.common.internal.util.Histogram;
import com.oracle.coherence.common.internal.util.ScaledHistogram;
import com.oracle.coherence.common.io.BufferManager;
import com.oracle.coherence.common.io.BufferManagers;
import com.oracle.coherence.common.io.BufferSequence;
import com.oracle.coherence.common.io.BufferSequenceInputStream;
import com.oracle.coherence.common.io.BufferSequenceOutputStream;
import com.oracle.coherence.common.io.Buffers;
import com.oracle.coherence.common.io.MultiBufferSequence;
import com.oracle.coherence.common.io.SingleBufferSequence;
import com.oracle.coherence.common.net.SSLSettings;
import com.oracle.coherence.common.net.SSLSocketProvider;
import com.oracle.coherence.common.net.SocketSettings;
import com.oracle.coherence.common.net.exabus.Bus;
import com.oracle.coherence.common.net.exabus.Depot;
import com.oracle.coherence.common.net.exabus.EndPoint;
import com.oracle.coherence.common.net.exabus.Event;
import com.oracle.coherence.common.net.exabus.MemoryBus;
import com.oracle.coherence.common.net.exabus.MessageBus;
import com.oracle.coherence.common.net.exabus.spi.Driver;
import com.oracle.coherence.common.net.exabus.util.QueueingEventCollector;
import com.oracle.coherence.common.net.exabus.util.SimpleDepot;
import com.oracle.coherence.common.net.exabus.util.SimpleEvent;
import com.oracle.coherence.common.util.Bandwidth;
import com.oracle.coherence.common.util.Duration;
import com.oracle.coherence.common.util.MemorySize;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.URL;
import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class MessageBusTest {
    /**
     * Size in bytes of the fixed message header written by getMessage:
     * int id (4) + boolean response flag (1) + long send timestamp (8) + long payload length (8).
     */
    public static final int MSG_HEADER_SIZE = 21;
    /** Buffer manager used to allocate message buffers; selected in main from the -manager argument. */
    protected static BufferManager s_manager;
    /** True when -verbose was supplied on the command line. */
    protected static boolean s_fVerbose;
    /** True when -prompt was supplied on the command line. */
    protected static boolean s_fPrompt;
    /** True when -cached was supplied; main rejects this in combination with a variable -msgSize range. */
    protected static boolean s_fCached;
    /** True when -poll was supplied; main then installs a QueueingEventCollector on each bus. */
    protected static boolean s_fPollingCollector;
    /** Value of -latencyFreq; main defaults it to 100 when the argument is absent. */
    public static int s_nLatencyFreq;
    /** False only when -noReceipts was supplied as "true"; otherwise true. */
    public static boolean s_fReceipts;
    /** Minimum message size in bytes; main raises it to at least the 21 byte header size. */
    public static long s_cbMsgMin;
    /** Maximum message size in bytes; main raises it to at least the 21 byte header size. */
    public static long s_cbMsgMax;
    /** Payload fill unit in bytes used by getMessage (0 skips the payload instead of writing it); set by main as min(message minimum, -chunkSize). */
    public static int s_cbChunk;
    /** Zero-filled array allocated by main with the -chunkSize length; written repeatedly by getMessage to fill the payload. */
    public static byte[] s_abChunk;
    /** Assigned in main from the duration given as the value of -block (when that value is not the literal "true"). Initial value is not visible in this part of the file. */
    protected static long AWAIT_SPIN_NANOS;
    /** True unless -ignoreFlowControl was supplied. */
    public static boolean s_fFlowControl;
    /** True when -relay was supplied with a value other than "false". */
    public static boolean s_fRelay;
    /** True when -block was supplied with a value other than "false". */
    public static boolean s_fBlock;
    /** True when -single was supplied on the command line. */
    public static boolean s_fSingleUseConnection;
    /** Running error count; incremented by getMessage when buffer allocation hits OutOfMemoryError and sampled by main's reporting loop. Initialization is not visible in this part of the file. */
    public static AtomicLong s_cErrors;
    /** Random source used by getMessage to vary the message size. Initialization is not visible in this part of the file. */
    public static Random s_rand;

    /**
     * Create the histogram used to accumulate response latency samples.
     *
     * <p>The histogram is a {@link ScaledHistogram} constructed with the argument
     * 10,000,000 and a formatter that renders each value as a {@link Duration}
     * expressed in microseconds.
     *
     * @return a new, empty latency histogram
     */
    public static Histogram makeLatencyHistogram() {
        ScaledHistogram histogram = new ScaledHistogram(10_000_000);
        return histogram.setFormatter(v -> {
            // label values are interpreted as microseconds
            Duration duration = new Duration((double)v, Duration.Magnitude.MICRO);
            return duration.toString();
        });
    }

    /**
     * Build a test message of a (possibly randomized) size.
     *
     * <p>Layout: a 21 byte header (int id, boolean response flag, long send
     * timestamp, long payload length) followed by the payload. The total size is
     * {@code s_cbMsgMin} plus, when {@code s_cbMsgMax > s_cbMsgMin}, a random
     * offset in {@code [0, s_cbMsgMax - s_cbMsgMin)}; it is never smaller than
     * the header. The payload is zero-filled in units of {@code s_cbChunk}
     * bytes, or merely skipped when {@code s_cbChunk} is 0.
     *
     * <p>If buffer allocation fails with {@link OutOfMemoryError} the failure is
     * logged, counted in {@code s_cErrors} and the allocation is retried until
     * it succeeds.
     *
     * @param nId     the message id written to the header
     * @param fResp   the response flag written to the header
     * @param ldtSent the send timestamp written to the header
     * @return the buffers holding the encoded message
     * @throws IOException if writing to the underlying stream fails
     */
    public static BufferSequence getMessage(int nId, boolean fResp, long ldtSent) throws IOException {
        final long cbHeader = 21L; // same value as MSG_HEADER_SIZE

        long cb = s_cbMsgMin;
        long cbDelta = s_cbMsgMax - s_cbMsgMin;
        if (cbDelta > 0L) {
            // floorMod keeps the offset in [0, cbDelta); plain '%' can go negative
            // and would yield messages smaller than the configured minimum
            cb += Math.floorMod(s_rand.nextLong(), cbDelta);
        }
        cb = Math.max(cb, cbHeader);

        SkipStream out = null;
        do {
            try {
                out = new SkipStream(s_manager, cb);
            }
            catch (OutOfMemoryError e) {
                // best effort: report, count, give other threads a chance to release buffers, retry
                System.err.println(Thread.currentThread().getName() + " handling error: " + String.valueOf(e));
                System.err.println(s_manager);
                s_cErrors.incrementAndGet();
                Thread.yield();
            }
        } while (out == null);

        // the payload length must be known before it is written into the header
        long cbPayload = cb - cbHeader;

        out.writeInt(nId);
        out.writeBoolean(fResp);
        out.writeLong(ldtSent);
        out.writeLong(cbPayload);

        int cbChunk = s_cbChunk;
        switch (cbChunk) {
            default: {
                for (; cbPayload >= (long)cbChunk; cbPayload -= (long)cbChunk) {
                    out.write(s_abChunk);
                }
                // fall through: the remainder is written with progressively smaller units
            }
            case 8: {
                while (cbPayload >= 8L) {
                    out.writeLong(0L);
                    cbPayload -= 8L;
                }
                // fall through
            }
            case 4: {
                while (cbPayload >= 4L) {
                    out.writeInt(0);
                    cbPayload -= 4L;
                }
                // fall through
            }
            case 2: {
                while (cbPayload >= 2L) {
                    out.writeShort(0);
                    cbPayload -= 2L;
                }
                // fall through
            }
            case 1: {
                while (cbPayload >= 1L) {
                    out.write(0);
                    --cbPayload;
                }
                break;
            }
            case 0: {
                // chunking disabled: reserve the payload space without writing it
                out.skip(cbPayload);
            }
        }
        return out.toBufferSequence();
    }

    /**
     * Apply the test's buffer manager and any socket tuning properties to the
     * supplied driver dependencies.
     *
     * <p>Recognized properties (each prefixed with {@code sPrefix}):
     * {@code socket.rxbuffer} and {@code socket.txbuffer} (memory sizes),
     * {@code socket.nodelay} and {@code socket.linger} (booleans). Properties
     * that are absent leave the driver's default option untouched.
     *
     * @param sPrefix the property name prefix
     * @param props   the properties to read
     * @param deps    the dependencies to update
     * @return {@code deps}, after modification
     * @throws Exception declared for callers; see the original signature
     */
    public static SocketBusDriver.DefaultDependencies applyDriverProperties(String sPrefix, Properties props, SocketBusDriver.DefaultDependencies deps) throws Exception {
        final String sKeyRx = sPrefix + "socket.rxbuffer";
        final String sKeyTx = sPrefix + "socket.txbuffer";
        final String sKeyNoDelay = sPrefix + "socket.nodelay";
        final String sKeyLinger = sPrefix + "socket.linger";

        deps.setBufferManager(s_manager);

        // start from the driver defaults and override only what was configured;
        // the option ids are the standard java.net.SocketOptions values
        SocketSettings settings = new SocketSettings(SocketBusDriver.DefaultDependencies.DEFAULT_OPTIONS);
        if (props.containsKey(sKeyRx)) {
            MemorySize cbRx = new MemorySize(props.getProperty(sKeyRx));
            settings.set(java.net.SocketOptions.SO_RCVBUF, (int)cbRx.getByteCount());
        }
        if (props.containsKey(sKeyTx)) {
            MemorySize cbTx = new MemorySize(props.getProperty(sKeyTx));
            settings.set(java.net.SocketOptions.SO_SNDBUF, (int)cbTx.getByteCount());
        }
        if (props.containsKey(sKeyNoDelay)) {
            settings.set(java.net.SocketOptions.TCP_NODELAY, Boolean.parseBoolean(props.getProperty(sKeyNoDelay)));
        }
        if (props.containsKey(sKeyLinger)) {
            // NOTE(review): linger is supplied as a boolean here - confirm SocketSettings expects that
            settings.set(java.net.SocketOptions.SO_LINGER, Boolean.parseBoolean(props.getProperty(sKeyLinger)));
        }

        deps.setSocketOptions(settings);
        return deps;
    }

    /**
     * Build the depot dependencies for the test from a set of properties.
     *
     * <p>When the {@code <prefix>ssl.keystore} property is present an SSL
     * configuration is created from that JKS keystore (password taken from
     * {@code <prefix>ssl.password}, default {@code "password"}); the same
     * keystore is used for key and trust material. {@code <prefix>ssl.clientauth}
     * selects the client authentication mode: {@code wanted}, {@code required}
     * (or {@code true}), anything else meaning none.
     *
     * <p>Every registered {@link SocketBusDriver} is replaced by a copy whose
     * dependencies have been passed through {@code applyDriverProperties}, and
     * an {@code "EchoBus"} driver is registered in addition.
     *
     * @param sPrefix the property name prefix
     * @param props   the properties to read
     * @return the configured depot dependencies
     * @throws Exception if the keystore cannot be read or the SSL context cannot be initialized
     */
    public static SimpleDepot.Dependencies parseDependencies(String sPrefix, Properties props) throws Exception {
        SimpleDepot.DefaultDependencies depsDepot = new SimpleDepot.DefaultDependencies();
        String sSslKeystore = sPrefix + "ssl.keystore";
        if (props.containsKey(sSslKeystore)) {
            String sKeystore = props.getProperty(sSslKeystore);
            String sPassword = props.getProperty(sPrefix + "ssl.password", "password");
            SSLContext sslContext = SSLContext.getInstance("TLS");
            KeyManagerFactory keymanager = KeyManagerFactory.getInstance("SunX509");
            TrustManagerFactory trustmanager = TrustManagerFactory.getInstance("SunX509");
            KeyStore keystore = KeyStore.getInstance("JKS");
            char[] achPassword = sPassword.toCharArray();

            // close the keystore stream once loaded; previously it was leaked
            try (java.io.InputStream in = new URL("file:" + sKeystore).openStream()) {
                keystore.load(in, achPassword);
            }

            keymanager.init(keystore, achPassword);
            trustmanager.init(keystore);
            sslContext.init(keymanager.getKeyManagers(), trustmanager.getTrustManagers(), new SecureRandom());
            SSLSettings sslSettings = new SSLSettings().setSSLContext(sslContext);

            // Locale.ROOT: the default locale must not influence keyword matching
            // (e.g. Turkish lower-casing of 'I' would break "REQUIRED")
            String sClientAuth = props.getProperty(sPrefix + "ssl.clientauth", "none").toLowerCase(java.util.Locale.ROOT);
            switch (sClientAuth) {
                case "wanted": {
                    sslSettings.setClientAuth(SSLSocketProvider.ClientAuthMode.wanted);
                    break;
                }
                case "required":
                case "true": {
                    sslSettings.setClientAuth(SSLSocketProvider.ClientAuthMode.required);
                    break;
                }
                default: {
                    sslSettings.setClientAuth(SSLSocketProvider.ClientAuthMode.none);
                }
            }
            depsDepot.setSSLSettings(sslSettings);
        }

        // work on a copy so the replacement below does not touch the depot's own map
        HashMap<String, Driver> mapDriver = new HashMap<String, Driver>(depsDepot.getDrivers());
        for (Map.Entry<String, Driver> entry : mapDriver.entrySet()) {
            Driver driver = entry.getValue();
            if (!(driver instanceof SocketBusDriver)) continue;
            SocketBusDriver sbDriver = (SocketBusDriver)driver;
            entry.setValue(new SocketBusDriver(MessageBusTest.applyDriverProperties(sPrefix, props, new SocketBusDriver.DefaultDependencies(sbDriver.getDependencies()))));
        }
        mapDriver.put("EchoBus", new EchoBus.EchoDriver());
        depsDepot.setDrivers(mapDriver);
        return depsDepot;
    }

    /**
     * Parse command line arguments into a map keyed by switch name.
     *
     * <p>Each switch (an argument starting with {@code -}) is followed by zero or
     * more values. Multiple values are joined with a single space; a switch
     * without values maps to {@code "true"}. An argument that starts with
     * {@code -} but parses as an integer (e.g. {@code -4}) is treated as a value
     * rather than as a switch. If a switch is repeated the last occurrence wins.
     *
     * @param asArg the raw command line arguments
     * @return a map from switch (including the leading {@code -}) to its value
     * @throws IllegalArgumentException if a value appears where a switch is expected
     */
    public static Map<String, String> parseArgs(String[] asArg) {
        Map<String, String> mapArgs = new HashMap<String, String>();
        int c = asArg.length;
        int i = 0;
        while (i < c) {
            String sKey = asArg[i++];
            if (!sKey.startsWith("-")) {
                throw new IllegalArgumentException("unexpected parameter " + sKey);
            }

            // consume everything up to the next switch as this switch's value
            String sVal = null;
            for (; i < c; ++i) {
                String sNext = asArg[i];
                if (sNext.startsWith("-")) {
                    try {
                        // a negative integer is a value, not a switch
                        Integer.parseInt(sNext);
                    }
                    catch (NumberFormatException ignored) {
                        break;
                    }
                }
                sVal = sVal == null ? sNext : sVal + " " + sNext;
            }
            mapArgs.put(sKey, sVal == null ? "true" : sVal);
        }
        return mapArgs;
    }

    /**
     * Resolve a whitespace separated list of EndPoint names.
     *
     * <p>A name may end in a numeric range written as {@code first..last}, e.g.
     * {@code tmb://host:8000..8003}; the range is expanded inclusively, counting
     * upwards or downwards depending on which bound is larger. The number being
     * ranged over is whatever follows the last {@code '.'} or {@code ':'} in the
     * part preceding {@code ".."}.
     *
     * @param depot the depot used to resolve each name
     * @param sEps  the EndPoint names
     * @return the resolved EndPoints, in the order encountered
     */
    public static List<EndPoint> parseEndPoints(Depot depot, String sEps) {
        ArrayList<EndPoint> listResolved = new ArrayList<EndPoint>();
        for (StringTokenizer tokens = new StringTokenizer(sEps); tokens.hasMoreTokens(); ) {
            String sSpec = tokens.nextToken();
            int ofRange = sSpec.indexOf("..");
            if (ofRange < 0) {
                // plain name, no range to expand
                listResolved.add(depot.resolveEndPoint(sSpec));
            } else {
                String sHead = sSpec.substring(0, ofRange);
                int ofNumber = 1 + Math.max(sHead.lastIndexOf('.'), sHead.lastIndexOf(':'));
                String sBase = sHead.substring(0, ofNumber);
                int nFirst = Integer.parseInt(sHead.substring(ofNumber));
                int nLast = Integer.parseInt(sSpec.substring(ofRange + 2));
                if (nFirst < nLast) {
                    for (int n = nFirst; n <= nLast; ++n) {
                        listResolved.add(depot.resolveEndPoint(sBase + n));
                    }
                } else {
                    // descending (or single element) range
                    for (int n = nFirst; n >= nLast; --n) {
                        listResolved.add(depot.resolveEndPoint(sBase + n));
                    }
                }
            }
        }
        return listResolved;
    }

    /**
     * Print the command line usage of the test, one parameter per line.
     *
     * @param out the stream to print to
     */
    public static void printHelp(PrintStream out) {
        // each option is listed exactly once (-polite used to be printed twice)
        String[] asLine = {
            "MessageBusTest parameters:",
            "\t-bind               list of one or more local EndPoints to create",
            "\t-peer               list of one or more remote EndPoints to send to",
            "\t-rxThreads          number of receive threads per bound EndPoint (negative for reentrant)",
            "\t-txThreads          number of transmit threads per bound EndPoint",
            "\t-msgSize            range of message sizes to send, expressed as min[..max]",
            "\t-chunkSize          defines the number of bytes to process as a single unit, i.e. 1 for byte, 8 for long, 0 to disable",
            "\t-cached             re-use message objects where possible, reducing buffer manager overhead",
            "\t-txRate             target outbound data rate",
            "\t-txMaxBacklog       the maximum backlog the test should produce per tx thread",
            "\t-rxRate             target inbound data rate",
            "\t-flushFreq          number of messages to send before flushing, or 0 for auto",
            "\t-latencyFreq        number of messages to send before sampling latency",
            "\t-noReceipts         specified if receipts should not be used, relies on GC to reclaim messages",
            "\t-manager            buffer manager to utilize (net, direct, heap)",
            "\t-polite             if specified this instance will not start sending until connected to",
            "\t-depotFactory       the fully qualified class name of the Factory to use to obtain the Depot",
            "\t-reportInterval     the report interval",
            "\t-block              if specified a transmit thread will block while awaiting a response, optional value of spin duration",
            "\t-relay              if specified then the process will relay any received messages to one of its peers",
            "\t-ignoreFlowControl  if flow control events are to be ignored, use -txMaxBacklog to prevent OutOfMemory",
            "\t-poll               if specified PollingEventCollector will be utilized",
            "\t-prompt             if specified the user will be prompted before each send",
            "\t-tabular            if specified the output will be in tabular format",
            "\t-warmup             time duration or message count which will be discarded for warmup",
            "\t-single             if specified an outgoing connection will emit just one message, then reconnect",
            "\t-verbose            to enable verbose debugging output",
        };
        for (String sLine : asLine) {
            out.println(sLine);
        }
    }

    /*
     * WARNING - void declaration
     */
    public static void main(String[] asArg) throws Exception {
        int i;
        int nFlushOn;
        long cbMin;
        long cbMax;
        long cbAvg;
        Map<String, String> mapArgs = MessageBusTest.parseArgs(asArg);
        String sEpLocal = mapArgs.remove("-bind");
        String sEpPeer = mapArgs.remove("-peer");
        String sRxThreads = mapArgs.remove("-rxThreads");
        String sTxThreads = mapArgs.remove("-txThreads");
        String sMsgSize = mapArgs.remove("-msgSize");
        String sChunkSize = mapArgs.remove("-chunkSize");
        String sCached = mapArgs.remove("-cached");
        String sFlushFreq = mapArgs.remove("-flushFreq");
        Object sTxRate = mapArgs.remove("-txRate");
        String sTxMaxBacklog = mapArgs.remove("-txMaxBacklog");
        Object sRxRate = mapArgs.remove("-rxRate");
        String sLatFreq = mapArgs.remove("-latencyFreq");
        String sNoReceipt = mapArgs.remove("-noReceipts");
        String sManager = mapArgs.remove("-manager");
        String sVerbose = mapArgs.remove("-verbose");
        String sPolite = mapArgs.remove("-polite");
        String sBlock = mapArgs.remove("-block");
        String sRelay = mapArgs.remove("-relay");
        String sIgnoreFC = mapArgs.remove("-ignoreFlowControl");
        String sTabular = mapArgs.remove("-tabular");
        String sFactoryDepot = mapArgs.remove("-depotFactory");
        Object sReportInterval = mapArgs.remove("-reportInterval");
        String sPollingColl = mapArgs.remove("-poll");
        String sPrompt = mapArgs.remove("-prompt");
        String sWarmup = mapArgs.remove("-warmup");
        String sSingle = mapArgs.remove("-single");
        if (sTxRate != null && Character.isDigit(((String)sTxRate).charAt(((String)sTxRate).length() - 1))) {
            sTxRate = (String)sTxRate + "MBps";
        }
        if (sRxRate != null && Character.isDigit(((String)sRxRate).charAt(((String)sRxRate).length() - 1))) {
            sRxRate = (String)sRxRate + "MBps";
        }
        if (sReportInterval == null) {
            sReportInterval = "5s";
        } else if (Character.isDigit(((String)sReportInterval).charAt(((String)sReportInterval).length() - 1))) {
            sReportInterval = (String)sReportInterval + "s";
        }
        s_fVerbose = sVerbose != null;
        s_fFlowControl = sIgnoreFC == null;
        s_fCached = sCached != null;
        s_fPollingCollector = sPollingColl != null;
        s_fPrompt = sPrompt != null;
        s_fSingleUseConnection = sSingle != null;
        long cReportMillis = new Duration((String)sReportInterval).as(Duration.Magnitude.MILLI);
        boolean fPolite = sPolite != null && sPolite.equals("true");
        boolean fBlock = sBlock != null && !sBlock.equals("false");
        boolean fRelay = sRelay != null && !sRelay.equals("false");
        boolean fTabular = sTabular != null && sTabular.equals("true");
        int cRxThreads = Math.abs(sRxThreads == null ? 1 : Integer.parseInt(sRxThreads));
        int cTxThreads = Math.abs(sTxThreads == null ? (fRelay ? 0 : 1) : Integer.parseInt(sTxThreads));
        boolean fReentrant = sRxThreads != null && sRxThreads.startsWith("-");
        long cbsOut = sTxRate == null ? -1L : new Bandwidth((String)sTxRate).as(Bandwidth.Rate.BYTES);
        long cbsIn = sRxRate == null ? -1L : new Bandwidth((String)sRxRate).as(Bandwidth.Rate.BYTES);
        long cbMaxBacklog = sTxMaxBacklog == null ? -1L : new MemorySize(sTxMaxBacklog).getByteCount();
        long cMsgWarmup = 0L;
        long cMillisWarmup = 0L;
        if (sWarmup != null) {
            try {
                cMsgWarmup = Integer.parseInt(sWarmup);
            }
            catch (Exception e) {
                cMillisWarmup = new Duration(sWarmup).as(Duration.Magnitude.MILLI);
            }
        }
        if (fBlock && !sBlock.equals("true")) {
            AWAIT_SPIN_NANOS = new Duration(sBlock).getNanos();
        }
        s_fBlock = fBlock;
        if (fRelay && cTxThreads != 0) {
            System.err.println("-relay and -txThreads cannot both be specified");
            System.exit(1);
        }
        s_fRelay = fRelay;
        if (sRxThreads == null || cRxThreads == 0) {
            cRxThreads = Platform.getPlatform().getFairShareProcessors() * 17;
            fReentrant = true;
        }
        if (s_fPollingCollector && cRxThreads != 1) {
            System.err.println("\nWARNING: polling collector generally requires -rxThreads 1\n");
        }
        if (fReentrant && cbsIn != -1L) {
            if (sRxThreads == null) {
                fReentrant = false;
                cRxThreads = 1;
            } else {
                System.err.println("inbound throttling (-rxRate) not available with reentrant processing (-rxThreads <= 0)");
                System.exit(1);
            }
        }
        s_nLatencyFreq = sLatFreq == null ? 100 : Integer.parseInt(sLatFreq);
        boolean bl = s_fReceipts = sNoReceipt == null || !sNoReceipt.equals("true");
        if (!s_fReceipts && cbMaxBacklog != -1L) {
            throw new IllegalArgumentException("-txMaxBacklog requires receipts");
        }
        if (sManager == null || sManager.equals("net")) {
            s_manager = BufferManagers.getNetworkDirectManager();
        } else if (sManager.equals("direct")) {
            s_manager = BufferManagers.getDirectManager();
        } else if (sManager.equals("heap")) {
            s_manager = BufferManagers.getHeapManager();
        } else {
            throw new IllegalArgumentException("unknown heap manager: " + sManager);
        }
        if (sMsgSize == null) {
            cbAvg = 4096L;
            cbMax = 4096L;
            cbMin = 4096L;
        } else {
            int ofMsgDelim = sMsgSize.indexOf("..");
            if (ofMsgDelim == -1) {
                cbMax = cbAvg = new MemorySize(sMsgSize).getByteCount();
                cbMin = cbAvg;
            } else {
                if (s_fCached) {
                    System.err.println("-cached does not support variable sized messaging");
                    throw new IllegalArgumentException();
                }
                cbMin = new MemorySize(sMsgSize.substring(0, ofMsgDelim)).getByteCount();
                cbMax = new MemorySize(sMsgSize.substring(ofMsgDelim + 2)).getByteCount();
                if (cbMax < cbMin) {
                    long n = cbMax;
                    cbMax = cbMin;
                    cbMin = n;
                }
                cbAvg = cbMax - (cbMax - cbMin) / 2L;
            }
        }
        if (cbMin < 21L) {
            System.out.println("increasing minimum message size to 21 bytes to satisfy test requirements\n");
            cbMin = 21L;
        }
        if (cbMax < 21L) {
            cbMax = 21L;
        }
        if (cbAvg < 21L) {
            cbAvg = 21L;
        }
        s_cbMsgMin = cbMin;
        s_cbMsgMax = cbMax;
        int cbChunk = 0;
        if (sChunkSize != null) {
            cbChunk = (int)new MemorySize(sChunkSize).getByteCount();
        }
        s_cbChunk = (int)Math.min(cbMin, (long)cbChunk);
        s_abChunk = new byte[cbChunk];
        int n = nFlushOn = sFlushFreq == null ? 0 : Integer.parseInt(sFlushFreq);
        if (!mapArgs.isEmpty()) {
            System.err.println("unknown parameter " + mapArgs.keySet().iterator().next());
            System.err.println();
            MessageBusTest.printHelp(System.err);
            System.exit(1);
        }
        Depot depot = sFactoryDepot == null ? new SimpleDepot(MessageBusTest.parseDependencies("depot.", System.getProperties())) : (Depot)((Factory)Class.forName(sFactoryDepot).newInstance()).create();
        EndPoint[] aPeer = new EndPoint[]{};
        if (sEpPeer != null) {
            aPeer = MessageBusTest.parseEndPoints(depot, sEpPeer).toArray(aPeer);
        }
        ArrayList<Bus> listBus = new ArrayList<Bus>();
        if (sEpLocal == null) {
            if (aPeer.length == 0) {
                listBus.add(depot.createMessageBus(null));
            } else {
                String sPeer = aPeer[0].getCanonicalName();
                if (sPeer.contains(":")) {
                    listBus.add(depot.createMessageBus(depot.resolveEndPoint(sPeer.substring(0, sPeer.indexOf(58)) + "://0.0.0.0:0")));
                }
            }
        } else {
            for (EndPoint epBind : MessageBusTest.parseEndPoints(depot, sEpLocal)) {
                try {
                    listBus.add(depot.createMessageBus(epBind));
                }
                catch (Exception e) {
                    try {
                        listBus.add(depot.createMemoryBus(epBind));
                    }
                    catch (Exception e2) {
                        e2.printStackTrace();
                        throw e;
                    }
                }
            }
        }
        int cBus = listBus.size();
        HashSet<DemultiplexingCollector> setDemuxer = new HashSet<DemultiplexingCollector>();
        EventProcessor[] aProcessor = new EventProcessor[cBus * cRxThreads];
        Transmitter[] aTransmitter = new Transmitter[cBus * cTxThreads];
        long cbsInProc = cbsIn == -1L ? cbsIn : Math.max(1L, cbsIn / (long)aProcessor.length);
        int iProc = 0;
        int iTrans = 0;
        for (Bus bus : listBus) {
            AtomicInteger fBacklogLocal = new AtomicInteger();
            Set<EndPoint> setReady = Collections.newSetFromMap(new ConcurrentHashMap());
            EndPoint boundEp = bus.getLocalEndPoint();
            EventProcessor[] aProc = new EventProcessor[cRxThreads];
            Transmitter[] aTrans = new Transmitter[cTxThreads];
            Set<EndPoint> setPeer = new HashSet();
            EndPoint[] endPointArray = aPeer;
            int n2 = endPointArray.length;
            for (i = 0; i < n2; ++i) {
                EndPoint endPoint = endPointArray[i];
                if (endPoint.equals(boundEp)) continue;
                setPeer.add(endPoint);
            }
            if ((setPeer = Collections.unmodifiableSet(setPeer)).isEmpty()) {
                Transmitter[] aTransNew = new Transmitter[aTransmitter.length - cTxThreads];
                System.arraycopy(aTransmitter, 0, aTransNew, 0, aTransNew.length);
                aTransmitter = aTransNew;
                aTrans = null;
            } else {
                int c = aTrans.length;
                for (int i2 = 0; i2 < c; ++i2) {
                    aTransmitter[iTrans++] = aTrans[i2] = new Transmitter(bus, i2, setReady, nFlushOn, fBacklogLocal, cbMaxBacklog);
                }
            }
            for (int i3 = 0; i3 < cRxThreads; ++i3) {
                aProcessor[iProc++] = aProc[i3] = new EventProcessor(bus, setPeer, setReady, aTrans, cbsInProc, nFlushOn, fBacklogLocal);
            }
            if (s_fPollingCollector) {
                bus.setEventCollector(new QueueingEventCollector());
            } else {
                DemultiplexingCollector collector = new DemultiplexingCollector(bus, aProc);
                setDemuxer.add(collector);
                bus.setEventCollector(collector);
            }
            bus.open();
            if (fPolite) continue;
            for (EndPoint peer : setPeer) {
                bus.connect(peer);
            }
        }
        if (!fReentrant) {
            for (EventProcessor proc : aProcessor) {
                proc.start();
            }
        }
        long cbsOutTrans = cbsOut == -1L ? cbsOut : Math.max(1L, cbsOut / (long)aTransmitter.length);
        for (Transmitter trans : aTransmitter) {
            trans.setTransmitRate(cbsOutTrans);
            trans.start();
        }
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                System.out.println();
            }
        });
        if (fTabular) {
            System.out.println("msg/s in\tbytes/s in\tmsg/s out\tbytes/s out\tavg receipt latency nanos\tmin response latency nanos\tavg response latency nanos\teffective latency nanos\tmax response latency nanos\tin backlog percentage\tin backlog events\tin backlog bytes\tout backlog percentage\tout backlog events\tout backlog bytes\tconnections\terrors");
        }
        long ldtWarmStart = 0L;
        class Stats {
            long ldt;
            long cMsgIn;
            long cMsgOut;
            long cbIn;
            long cbOut;
            long cbInCollected;
            long cReceipts;
            long cReceiptSamples;
            long cReceiptNanos;
            long cResponses;
            long cResponseNanos;
            long cResponseNanosMin = Long.MAX_VALUE;
            long cResponseNanosMax = -1L;
            long cBacklogLocal;
            long cMillisBacklogLocal;
            long cBacklogRemote;
            long cMillisBacklogRemote;
            long cbInPendingLife;
            long cbOutPendingLife;
            long cErrors;
            long cConnections;
            Histogram histLatency = MessageBusTest.makeLatencyHistogram();

            public Stats(long ldt) {
                this.ldt = ldt;
            }
        }
        Stats statsWarm = null;
        Stats statsPrev = null;
        Stats stats = null;
        int iReport = 0;
        while (true) {
            Blocking.sleep(statsWarm == null ? 10L : cReportMillis);
            stats = new Stats(System.currentTimeMillis());
            for (EventProcessor eventProcessor : aProcessor) {
                stats.cReceipts += eventProcessor.getReceiptsIn();
                stats.cReceiptSamples += eventProcessor.getReceiptSamples();
                stats.cReceiptNanos += eventProcessor.getReceiptNanos();
                stats.cResponses += eventProcessor.getResponsesIn();
                stats.cResponseNanos += eventProcessor.getResponseNanos();
                stats.histLatency.addSamples(eventProcessor.getResponseLatencyHistogram());
                stats.cResponseNanosMax = Math.max(eventProcessor.m_cResponseNanosMax, stats.cResponseNanosMax);
                eventProcessor.m_cResponseNanosMax = -1L;
                stats.cResponseNanosMin = Math.min(eventProcessor.m_cResponseNanosMin, stats.cResponseNanosMin);
                eventProcessor.m_cResponseNanosMin = Long.MAX_VALUE;
                stats.cMsgIn += eventProcessor.getMessagesIn();
                stats.cMsgOut += eventProcessor.getMessagesOut();
                stats.cbIn += eventProcessor.getBytesIn();
                stats.cbOut += eventProcessor.getBytesOut();
                stats.cBacklogLocal += eventProcessor.getLocalBacklogEvents();
                stats.cMillisBacklogLocal += eventProcessor.getLocalBacklogMillis();
                stats.cBacklogRemote += eventProcessor.getRemoteBacklogEvents();
                stats.cConnections += eventProcessor.getConnectionCount();
            }
            for (Thread thread : aTransmitter) {
                stats.cMsgOut += ((Transmitter)thread).getMessagesOut();
                stats.cbOut += ((Transmitter)thread).getBytesOut();
                stats.cMillisBacklogRemote += ((Transmitter)thread).getRemoteBacklogMillis();
            }
            if (setDemuxer != null) {
                for (DemultiplexingCollector collector : setDemuxer) {
                    stats.cbInCollected += collector.getReceivedBytes();
                }
            }
            stats.cErrors = s_cErrors.get();
            if (statsWarm == null) {
                if (stats.cConnections > 0L && Math.max(stats.cMsgIn, stats.cMsgOut) > cMsgWarmup) {
                    if (ldtWarmStart == 0L) {
                        ldtWarmStart = stats.ldt;
                    }
                    if (stats.ldt - ldtWarmStart >= cMillisWarmup) {
                        statsPrev = statsWarm = stats;
                    }
                    iReport = 0;
                }
            } else if (stats.cConnections == 0L) {
                statsWarm = null;
                ldtWarmStart = 0L;
            } else {
                for (Stats stats2 : new Stats[]{statsPrev, statsWarm}) {
                    long cDeltaMillis = stats.ldt - stats2.ldt;
                    long cMsgInDelta = stats.cMsgIn - stats2.cMsgIn;
                    long cMsgOutDelta = stats.cMsgOut - stats2.cMsgOut;
                    long cbInDelta = stats.cbIn - stats2.cbIn;
                    long cbOutDelta = stats.cbOut - stats2.cbOut;
                    long cReceiptDelta = stats.cReceiptSamples - stats2.cReceiptSamples;
                    long cReceiptNanosDelta = stats.cReceiptNanos - stats2.cReceiptNanos;
                    long cResponseDelta = stats.cResponses - stats2.cResponses;
                    long cResponseNanosDelta = stats.cResponseNanos - stats2.cResponseNanos;
                    long cBacklogLocalDelta = stats.cBacklogLocal - stats2.cBacklogLocal;
                    long cMillisBacklogLocalDelta = stats.cMillisBacklogLocal - stats2.cMillisBacklogLocal;
                    long cBacklogRemoteDelta = stats.cBacklogRemote - stats2.cBacklogRemote;
                    long cMillisBacklogRemoteDelta = stats.cMillisBacklogRemote - stats2.cMillisBacklogRemote;
                    long cErrorsDelta = stats.cErrors - stats2.cErrors;
                    long cbOutPending = (stats.cMsgOut - stats.cReceipts) * cbAvg;
                    long cbInPending = s_fPollingCollector ? -1L : stats.cbInCollected - stats.cbIn;
                    double dflSeconds = (double)cDeltaMillis / 1000.0;
                    long MSGsIn = Math.round((double)cMsgInDelta / dflSeconds);
                    long MSGsOut = Math.round((double)cMsgOutDelta / dflSeconds);
                    long BLsLocal = Math.round((double)cBacklogLocalDelta / dflSeconds);
                    long BLsRemote = Math.round((double)cBacklogRemoteDelta / dflSeconds);
                    long lPctBacklogLocal = 100L * cMillisBacklogLocalDelta / (cDeltaMillis * (long)cBus);
                    long lPctBacklogRemote = aTransmitter.length == 0 ? -1L : 100L * cMillisBacklogRemoteDelta / (cDeltaMillis * (long)aTransmitter.length);
                    long cResponseNanosDeltaEff = (long)((double)cResponseNanosDelta * (1.0 / (1.0 - (double)lPctBacklogRemote / 100.0)));
                    if (stats2 == statsWarm) {
                        stats.cbOutPendingLife = statsPrev.cbOutPendingLife + cbOutPending;
                        cbOutPending = stats.cbOutPendingLife / (long)(iReport + 1);
                        stats.cbInPendingLife = statsPrev.cbInPendingLife + cbInPending;
                        cbInPending = stats.cbInPendingLife / (long)(iReport + 1);
                        stats.cConnections = Math.max(stats.cConnections, statsPrev.cConnections);
                        stats.cResponseNanosMin = Math.min(stats.cResponseNanosMin, statsPrev.cResponseNanosMin);
                        stats.cResponseNanosMax = Math.max(stats.cResponseNanosMax, statsPrev.cResponseNanosMax);
                    }
                    if (fTabular) {
                        System.out.println(MSGsIn + "\t" + (double)cbInDelta / dflSeconds + "\t" + MSGsOut + "\t" + (double)cbOutDelta / dflSeconds + "\t" + (cReceiptDelta == 0L ? -1L : cReceiptNanosDelta / cReceiptDelta) + "\t" + (cResponseDelta == 0L ? -1L : stats.cResponseNanosMin) + "\t" + (cResponseDelta == 0L ? -1L : cResponseNanosDelta / cResponseDelta) + "\t" + (cResponseDelta == 0L ? -1L : cResponseNanosDeltaEff / cResponseDelta) + "\t" + (cResponseDelta == 0L ? -1L : stats.cResponseNanosMax) + "\t" + lPctBacklogLocal + "\t" + BLsLocal + "\t" + cbInPending + "\t" + lPctBacklogRemote + "\t" + BLsRemote + "\t" + cbOutPending + "\t" + stats.cConnections + "\t" + cErrorsDelta);
                        break;
                    }
                    System.out.println((stats2 == statsPrev ? "now:  " : "life: ") + "throughput(out " + MSGsOut + "msg/s " + String.valueOf(new Bandwidth((double)(8L * cbOutDelta) / dflSeconds, Bandwidth.Rate.BITS)) + ", in " + MSGsIn + "msg/s " + String.valueOf(new Bandwidth((double)(8L * cbInDelta) / dflSeconds, Bandwidth.Rate.BITS)) + "), latency(response" + (String)(cResponseDelta == 0L ? " n/a" : "(avg " + String.valueOf(new Duration(cResponseNanosDelta / cResponseDelta)) + ", effective " + String.valueOf(new Duration(cResponseNanosDeltaEff / cResponseDelta)) + ", min " + String.valueOf(new Duration(stats.cResponseNanosMin)) + ", max " + String.valueOf(new Duration(stats.cResponseNanosMax)) + ")") + ", receipt " + String.valueOf(cReceiptDelta == 0L ? "n/a" : new Duration(cReceiptNanosDelta / cReceiptDelta)) + "), backlog(out " + (String)(lPctBacklogRemote < 0L ? "n/a " : lPctBacklogRemote + "% ") + BLsRemote + "/s " + String.valueOf(s_fReceipts ? new MemorySize(cbOutPending) : "n/a") + ", in " + lPctBacklogLocal + "% " + BLsLocal + "/s " + String.valueOf(cbInPending < 0L ? "n/a" : new MemorySize(cbInPending)) + "), connections " + stats.cConnections + ", errors " + cErrorsDelta);
                    if (!s_fVerbose || stats.histLatency.getSampleCount() <= 0L) continue;
                    System.out.println("\tlatency detail: " + String.valueOf(stats.histLatency.compare(stats2.histLatency)));
                }
                if (s_fVerbose) {
                    void var77_99;
                    for (Bus bus : listBus) {
                        System.out.println("bus:  " + String.valueOf(bus));
                    }
                    System.out.println("mgr:  " + String.valueOf(s_manager));
                    RuntimeMXBean beanRuntime = ManagementFactory.getRuntimeMXBean();
                    System.out.print("jvm:  " + beanRuntime.getSpecVersion() + " " + beanRuntime.getVmVersion() + " ");
                    for (String s : beanRuntime.getInputArguments()) {
                        System.out.print(s + " ");
                    }
                    System.out.print("\ncmd:  ");
                    String[] stringArray = asArg;
                    i = stringArray.length;
                    boolean bl2 = false;
                    while (var77_99 < i) {
                        String s = stringArray[var77_99];
                        System.out.print(s + " ");
                        ++var77_99;
                    }
                    System.out.println();
                    System.out.println("time: " + String.valueOf(new Date()) + "/" + String.valueOf(new Duration(System.currentTimeMillis() - statsWarm.ldt, Duration.Magnitude.MILLI)));
                }
                if (!fTabular) {
                    System.out.println();
                }
                statsPrev = stats;
            }
            ++iReport;
        }
    }

    // Static initializer: assigns the class-load defaults for several static test settings
    // and creates the shared error counter and random source.
    static {
        // NOTE(review): presumably the latency sampling frequency; its use is not visible here -- confirm.
        s_nLatencyFreq = 100;
        // When true, EventProcessor.onMessage attaches a Receipt object to each message it sends.
        s_fReceipts = true;
        // When true, EventProcessor.onEvent acts on BACKLOG_NORMAL / BACKLOG_EXCESSIVE events.
        s_fFlowControl = true;
        // Relay handling in EventProcessor.onMessage is off by default.
        s_fRelay = false;
        // When true, EventProcessor.onMessage signals the owning Transmitter for each response; off by default.
        s_fBlock = false;
        // Global error counter; incremented wherever an error is reported.
        s_cErrors = new AtomicLong();
        // NOTE(review): shared random source; its consumers are outside this section -- confirm usage.
        s_rand = new Random();
    }

    /**
     * A {@link BufferSequenceOutputStream} that can advance its write position
     * without writing any bytes.
     */
    public static class SkipStream
    extends BufferSequenceOutputStream {
        /**
         * Create a stream that obtains its buffers from the given manager.
         *
         * @param manager the buffer manager handed to the superclass
         */
        public SkipStream(BufferManager manager) {
            super(manager);
        }

        /**
         * Create a stream that obtains its buffers from the given manager.
         *
         * @param manager the buffer manager handed to the superclass
         * @param cb      the size value handed to the superclass
         */
        public SkipStream(BufferManager manager, long cb) {
            super(manager, cb);
        }

        /**
         * Move the write position forward by {@code lcb} bytes, leaving the skipped
         * bytes untouched. A non-positive count is a no-op.
         *
         * @param lcb the number of bytes to skip
         *
         * @throws IOException if space cannot be ensured in the underlying stream
         */
        public void skip(long lcb) throws IOException {
            for (long cbLeft = lcb; cbLeft > 0L; ) {
                // ask for room, then step over as much of the returned buffer as is needed
                ByteBuffer bufTarget = ensureSpace(cbLeft);
                int cbAdvance = (int) Math.min(cbLeft, (long) bufTarget.remaining());
                int ofNext = bufTarget.position() + cbAdvance;
                bufTarget.position(ofNext);
                cbLeft -= (long) cbAdvance;
            }
        }
    }

    public static class EchoBus
    implements MessageBus {
        protected EndPoint m_pointSelf;
        protected Collector<Event> m_collector;

        public EchoBus(EndPoint pointSelf) {
            this.m_pointSelf = pointSelf;
        }

        @Override
        public void send(final EndPoint peer, BufferSequence bufseq, final Object receipt) {
            final Collector<Event> collector = this.getEventCollector();
            collector.add(new SimpleEvent(Event.Type.MESSAGE, peer, bufseq){

                @Override
                public Object dispose(boolean fTakeContent) {
                    if (receipt != null) {
                        collector.add(new SimpleEvent(Event.Type.RECEIPT, peer, receipt));
                    }
                    return super.dispose(fTakeContent);
                }
            });
        }

        @Override
        public EndPoint getLocalEndPoint() {
            return this.m_pointSelf;
        }

        @Override
        public void open() {
            this.getEventCollector().add(new SimpleEvent(Event.Type.OPEN, this.getLocalEndPoint()));
            this.flush();
        }

        @Override
        public void close() {
            this.getEventCollector().add(new SimpleEvent(Event.Type.CLOSE, this.getLocalEndPoint()));
            this.flush();
        }

        @Override
        public void connect(EndPoint peer) {
            this.getEventCollector().add(new SimpleEvent(Event.Type.CONNECT, peer));
            this.flush();
        }

        @Override
        public void disconnect(EndPoint peer) {
            this.getEventCollector().add(new SimpleEvent(Event.Type.DISCONNECT, peer));
            this.flush();
        }

        @Override
        public void release(EndPoint peer) {
            this.getEventCollector().add(new SimpleEvent(Event.Type.RELEASE, peer));
            this.flush();
        }

        @Override
        public void flush() {
            this.getEventCollector().flush();
        }

        @Override
        public void setEventCollector(Collector<Event> collector) {
            this.m_collector = collector;
        }

        @Override
        public Collector<Event> getEventCollector() {
            return this.m_collector;
        }

        public static class EchoDriver
        implements Driver {
            protected Depot m_depot;

            @Override
            public void setDepot(Depot depot) {
                this.m_depot = depot;
            }

            @Override
            public Depot getDepot() {
                return this.m_depot;
            }

            @Override
            public EndPoint resolveEndPoint(String sName) {
                if (sName == null || !sName.equals("echo")) {
                    return null;
                }
                return new EndPoint(){

                    @Override
                    public String getCanonicalName() {
                        return "echo";
                    }

                    public String toString() {
                        return this.getCanonicalName();
                    }
                };
            }

            @Override
            public boolean isSupported(EndPoint point) {
                return point != null && point.getCanonicalName().equals("echo");
            }

            @Override
            public Bus createBus(EndPoint pointLocal) {
                if (this.isSupported(pointLocal)) {
                    return new EchoBus(pointLocal);
                }
                throw new IllegalArgumentException("unsupported");
            }
        }
    }

    public static class EventProcessor
    extends Thread
    implements Runnable {
        protected final Bus m_bus;
        protected final MessageBus m_busMsg;
        protected final Object f_syncTxEvents = new TxEventSynchronizer();
        protected final Object f_syncRxEvents = new RxEventSynchronizer();
        protected final AtomicInteger f_fBacklogGlobal;
        protected BlockingQueue<Event> m_queue;
        protected final Set<EndPoint> m_setPeer;
        protected Iterator<EndPoint> m_iterPeerRelay;
        protected static final Map<Integer, RelayResponse> s_mapRelayResponse = new ConcurrentHashMap<Integer, RelayResponse>();
        protected static final AtomicInteger s_atomicRelayId = new AtomicInteger();
        protected final Set<EndPoint> m_setReady;
        protected final long m_cbsIn;
        protected final int m_nFlushOn;
        protected final Transmitter[] m_aTransmitter;
        protected long m_cConnections;
        protected long m_cbIn;
        protected long m_cMsgIn;
        protected long m_cResponseIn;
        protected long m_cResponseNanosMax = -1L;
        protected long m_cResponseNanosMin = Long.MAX_VALUE;
        protected final Histogram f_histLatency = MessageBusTest.makeLatencyHistogram();
        protected long m_cResponseNanos;
        protected long m_cReceiptTimings;
        protected long m_cReceiptsNanos;
        protected long m_cReceiptsIn;
        protected long m_cbOut;
        protected long m_cMsgOut;
        protected long m_cBacklogEventsLocal;
        protected long m_ldtBacklogLocalStart;
        protected long m_cBacklogMillisLocal;
        protected long m_cBacklogEventsRemote;
        protected final byte[] m_abChunk;

        public EventProcessor(Bus bus, Set<EndPoint> setPeer, Set<EndPoint> setReady, Transmitter[] aTransmitter, long cbsIn, int nFlushOn, AtomicInteger fBacklogGlobal) {
            super("EventProcessor(" + String.valueOf(bus.getLocalEndPoint()) + ")");
            this.m_bus = bus;
            this.m_busMsg = bus instanceof MessageBus ? (MessageBus)bus : null;
            this.f_fBacklogGlobal = fBacklogGlobal;
            this.m_setPeer = setPeer;
            this.m_setReady = setReady;
            this.m_cbsIn = cbsIn;
            this.m_nFlushOn = nFlushOn;
            this.m_aTransmitter = aTransmitter;
            this.m_abChunk = new byte[s_cbChunk];
        }

        public Bus getBus() {
            return this.m_bus;
        }

        public MessageBus getMessageBus() {
            return this.m_busMsg;
        }

        public Set<EndPoint> getPeers() {
            return this.m_setPeer;
        }

        public long getBytesIn() {
            return this.m_cbIn;
        }

        public long getBytesOut() {
            return this.m_cbOut;
        }

        public long getMessagesIn() {
            return this.m_cMsgIn;
        }

        public long getMessagesOut() {
            return this.m_cMsgOut;
        }

        public long getReceiptSamples() {
            return this.m_cReceiptTimings;
        }

        public long getReceiptNanos() {
            return this.m_cReceiptsNanos;
        }

        public long getReceiptsIn() {
            return this.m_cReceiptsIn;
        }

        public long getResponsesIn() {
            return this.m_cResponseIn;
        }

        public long getResponseNanos() {
            return this.m_cResponseNanos;
        }

        public Histogram getResponseLatencyHistogram() {
            return this.f_histLatency;
        }

        public long getLocalBacklogEvents() {
            return this.m_cBacklogEventsLocal;
        }

        public long getLocalBacklogMillis() {
            return this.m_cBacklogMillisLocal;
        }

        public long getRemoteBacklogEvents() {
            return this.m_cBacklogEventsRemote;
        }

        public long getConnectionCount() {
            return this.m_cConnections;
        }

        protected boolean onMessage(Event event) throws IOException {
            BufferSequence bufseq = (BufferSequence)event.getContent();
            BufferSequenceInputStream in = new BufferSequenceInputStream(bufseq);
            long cbMessage = bufseq.getLength();
            this.m_cbIn += cbMessage;
            long cMsgIn = this.m_cMsgIn++;
            int nId = in.readInt();
            boolean fResp = in.readBoolean();
            long ldtNanos = in.readLong();
            long cbPayload = in.readLong();
            if (21L + cbPayload != cbMessage) {
                s_cErrors.incrementAndGet();
                System.err.println("unexpecteded message size " + cbMessage + " rather then " + (21L + cbPayload) + ", msg=" + Buffers.toString(bufseq) + " from " + String.valueOf(event));
                throw new IllegalStateException("unexpected message size in " + String.valueOf(event));
            }
            if (s_fVerbose && cMsgIn % 10000L == 0L) {
                Thread thread;
                String sName;
                int of = (sName = (thread = Thread.currentThread()).getName()).lastIndexOf(35);
                thread.setName(sName.substring(0, of == -1 ? sName.length() : of) + "#" + cMsgIn);
            }
            int cbChunk = s_cbChunk;
            switch (cbChunk) {
                default: {
                    while (cbPayload >= (long)cbChunk) {
                        in.readFully(this.m_abChunk);
                        cbPayload -= (long)cbChunk;
                    }
                }
                case 8: {
                    while (cbPayload >= 8L) {
                        in.readLong();
                        cbPayload -= 8L;
                    }
                }
                case 4: {
                    while (cbPayload >= 4L) {
                        in.readInt();
                        cbPayload -= 4L;
                    }
                }
                case 2: {
                    while (cbPayload >= 2L) {
                        in.readShort();
                        cbPayload -= 2L;
                    }
                }
                case 1: {
                    while (cbPayload >= 1L) {
                        in.readByte();
                        --cbPayload;
                    }
                    break;
                }
                case 0: 
            }
            EndPoint epResponse = null;
            boolean fFlush = false;
            if (fResp) {
                if (s_fRelay) {
                    if (s_fBlock) {
                        RelayResponse resp = s_mapRelayResponse.remove(nId);
                        epResponse = resp.peer;
                        nId = resp.nId;
                    }
                } else {
                    if (ldtNanos > 0L) {
                        ++this.m_cResponseIn;
                        long cNanos = System.nanoTime() - ldtNanos;
                        this.m_cResponseNanos += cNanos;
                        this.f_histLatency.addSample((int)(cNanos / 1000L));
                        if (cNanos > this.m_cResponseNanosMax) {
                            this.m_cResponseNanosMax = cNanos;
                        }
                        if (cNanos < this.m_cResponseNanosMin) {
                            this.m_cResponseNanosMin = cNanos;
                        }
                    }
                    Transmitter tx = this.m_aTransmitter[nId & 0xFFFF];
                    if (s_fBlock) {
                        tx.signalResult(nId >>> 16);
                    }
                }
            } else if (s_fRelay && !this.m_setPeer.contains(event.getEndPoint())) {
                Iterator<EndPoint> iterRelay = this.m_iterPeerRelay;
                if (iterRelay == null || !iterRelay.hasNext()) {
                    iterRelay = this.m_iterPeerRelay = this.m_setPeer.iterator();
                }
                EndPoint epRelay = iterRelay.next();
                if (ldtNanos != 0L) {
                    if (s_fBlock) {
                        RelayResponse resp = new RelayResponse(event.getEndPoint(), nId);
                        nId = s_atomicRelayId.incrementAndGet();
                        s_mapRelayResponse.put(nId, resp);
                    } else {
                        epResponse = event.getEndPoint();
                    }
                }
                BufferSequence seqRelay = MessageBusTest.getMessage(nId, false, ldtNanos);
                ++this.m_cMsgOut;
                this.m_cbOut += seqRelay.getLength();
                this.getMessageBus().send(epRelay, seqRelay, s_fReceipts ? new Receipt(0L, seqRelay) : null);
                fFlush = true;
            } else if (ldtNanos != 0L) {
                epResponse = event.getEndPoint();
            }
            if (epResponse != null) {
                if (s_fPrompt) {
                    System.out.println("Press ENTER to send messge to " + String.valueOf(event.getEndPoint()));
                    System.in.read();
                }
                BufferSequence seqresponse = MessageBusTest.getMessage(nId, true, ldtNanos);
                ++this.m_cMsgOut;
                this.m_cbOut += seqresponse.getLength();
                this.getMessageBus().send(epResponse, seqresponse, s_fReceipts ? new Receipt(0L, seqresponse) : null);
                fFlush = true;
            }
            return fFlush;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean onEvent(Event event) {
            try {
                Bus bus = this.m_bus;
                EndPoint bindEp = bus.getLocalEndPoint();
                Set<EndPoint> setPeer = this.m_setPeer;
                Set<EndPoint> setReady = this.m_setReady;
                boolean fFlush = false;
                EndPoint ep = event.getEndPoint();
                switch (event.getType()) {
                    case OPEN: 
                    case CLOSE: {
                        System.err.println(event);
                        break;
                    }
                    case CONNECT: {
                        if (!s_fSingleUseConnection) {
                            System.err.println(String.valueOf(event) + " on " + String.valueOf(bindEp));
                        }
                        ++this.m_cConnections;
                    }
                    case BACKLOG_NORMAL: {
                        boolean fAdded;
                        if (!s_fFlowControl && event.getType() != Event.Type.CONNECT) break;
                        if (ep == null) {
                            AtomicInteger atomicInteger = this.f_fBacklogGlobal;
                            synchronized (atomicInteger) {
                                if (!this.f_fBacklogGlobal.compareAndSet(1, 0)) {
                                    System.err.println("received out of order event " + String.valueOf(event) + " on " + String.valueOf(bindEp));
                                    s_cErrors.incrementAndGet();
                                }
                                this.f_fBacklogGlobal.notifyAll();
                                break;
                            }
                        }
                        if (ep == bindEp) {
                            long ldtStart = this.m_ldtBacklogLocalStart;
                            if (ldtStart == 0L) {
                                System.err.println("received out of order event " + String.valueOf(event) + " on " + String.valueOf(bindEp));
                                s_cErrors.incrementAndGet();
                                break;
                            }
                            this.m_cBacklogMillisLocal += System.currentTimeMillis() - ldtStart;
                            this.m_ldtBacklogLocalStart = 0L;
                            break;
                        }
                        if (!setPeer.contains(ep)) break;
                        if (setReady.isEmpty()) {
                            Set<EndPoint> set = setReady;
                            synchronized (set) {
                                fAdded = setReady.add(ep);
                                setReady.notifyAll();
                            }
                        } else {
                            fAdded = setReady.add(ep);
                        }
                        if (fAdded) break;
                        System.err.println("received out of order event " + String.valueOf(event) + " on " + String.valueOf(bindEp));
                        s_cErrors.incrementAndGet();
                        break;
                    }
                    case BACKLOG_EXCESSIVE: {
                        if (!s_fFlowControl) break;
                        if (ep == null) {
                            ++this.m_cBacklogEventsRemote;
                            if (this.f_fBacklogGlobal.compareAndSet(0, 1)) break;
                            System.err.println("received out of order event " + String.valueOf(event) + " on " + String.valueOf(bindEp));
                            s_cErrors.incrementAndGet();
                            break;
                        }
                        if (bindEp == ep) {
                            ++this.m_cBacklogEventsLocal;
                            if (this.m_ldtBacklogLocalStart != 0L) {
                                System.err.println("received out of order event " + String.valueOf(event) + " on " + String.valueOf(bindEp));
                                s_cErrors.incrementAndGet();
                            }
                            this.m_ldtBacklogLocalStart = System.currentTimeMillis();
                            break;
                        }
                        ++this.m_cBacklogEventsRemote;
                        if (setReady.remove(ep) || !setPeer.contains(ep)) break;
                        System.err.println("received out of order event " + String.valueOf(event) + " on " + String.valueOf(bindEp));
                        s_cErrors.incrementAndGet();
                        break;
                    }
                    case DISCONNECT: {
                        if (!s_fSingleUseConnection) {
                            System.err.println(String.valueOf(event) + " on " + String.valueOf(bindEp));
                            Throwable t = (Throwable)event.getContent();
                            if (t != null && (!(t instanceof IOException) || s_fVerbose)) {
                                t.printStackTrace(System.err);
                            }
                        }
                        bus.release(ep);
                        break;
                    }
                    case RELEASE: {
                        if (!s_fSingleUseConnection) {
                            System.err.println(String.valueOf(event) + " on " + String.valueOf(bindEp));
                        }
                        --this.m_cConnections;
                        Set<EndPoint> t = setReady;
                        synchronized (t) {
                            setReady.remove(ep);
                            setReady.notifyAll();
                            break;
                        }
                    }
                    case MESSAGE: {
                        fFlush = this.onMessage(event);
                        break;
                    }
                    case RECEIPT: {
                        Receipt receipt = (Receipt)event.getContent();
                        long ldtNanosSent = receipt.getTimestampNanos();
                        ++this.m_cReceiptsIn;
                        if (ldtNanosSent != 0L) {
                            long cNanos = (s_fPollingCollector ? System.nanoTime() : ((StampedEvent)event).getTimestampNanos()) - ldtNanosSent;
                            this.m_cReceiptsNanos += cNanos;
                            ++this.m_cReceiptTimings;
                            if (this.m_busMsg == null) {
                                this.f_histLatency.addSample((int)(cNanos / 1000L));
                            }
                        }
                        receipt.dispose();
                        break;
                    }
                    default: {
                        System.err.println(String.valueOf(event) + " on " + String.valueOf(bindEp));
                    }
                }
                event.dispose();
                return fFlush;
            }
            catch (Throwable e) {
                s_cErrors.incrementAndGet();
                System.err.println("fatal error after receiving " + this.m_cMsgIn + " messages");
                e.printStackTrace(System.err);
                throw new IllegalStateException(e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean add(Event event) {
            BlockingQueue<Event> queue = this.m_queue;
            if (queue == null) {
                switch (event.getType()) {
                    case BACKLOG_NORMAL: 
                    case BACKLOG_EXCESSIVE: {
                        if (event.getEndPoint() == this.m_bus.getLocalEndPoint()) {
                            Object object = this.f_syncRxEvents;
                            synchronized (object) {
                                return this.onEvent(event);
                            }
                        }
                    }
                    case RECEIPT: {
                        Object object = this.f_syncTxEvents;
                        synchronized (object) {
                            return this.onEvent(event);
                        }
                    }
                    case MESSAGE: {
                        Object object = this.f_syncRxEvents;
                        synchronized (object) {
                            return this.onEvent(event);
                        }
                    }
                }
                Object object = this.f_syncTxEvents;
                synchronized (object) {
                    Object object2 = this.f_syncRxEvents;
                    synchronized (object2) {
                        return this.onEvent(event);
                    }
                }
            }
            queue.add(event);
            return false;
        }

        @Override
        public void run() {
            Bus bus = this.m_bus;
            final BlockingQueue<Event> queue = this.m_queue;
            int nFlushOn = this.m_nFlushOn;
            int cFlushEvent = 0;
            long cbsInTarget = this.m_cbsIn;
            long cbEval = cbsInTarget / 8L;
            long ldtLast = 0L;
            long cbInLast = 0L;
            long cThrottleMillis = 1L;
            int nThrottle = 1000;
            Pollable<Event> poll = s_fPollingCollector ? (QueueingEventCollector)bus.getEventCollector() : new Pollable<Event>(){

                @Override
                public Event poll() {
                    return (Event)queue.poll();
                }

                @Override
                public Event poll(long timeout, TimeUnit unit) throws InterruptedException {
                    return timeout == Long.MAX_VALUE ? (Event)queue.take() : (Event)queue.poll(timeout, unit);
                }
            };
            try {
                int i = 0;
                while (true) {
                    Event event = poll.poll();
                    while (event == null) {
                        if (cFlushEvent > 0) {
                            cFlushEvent = 0;
                            bus.flush();
                        }
                        SingleWaiterCooperativeNotifier.flush();
                        event = poll.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                    }
                    if (this.onEvent(event) && ++cFlushEvent > nFlushOn) {
                        cFlushEvent = 0;
                        bus.flush();
                    }
                    if (cbsInTarget > 0L) {
                        if (i % nThrottle == 0) {
                            if (cFlushEvent > 0) {
                                cFlushEvent = 0;
                                bus.flush();
                            }
                            Blocking.sleep(cThrottleMillis);
                        }
                        long cbIn = this.m_cbIn;
                        long cbDelta = cbIn - cbInLast;
                        if (cbInLast == 0L && ldtLast == 0L) {
                            ldtLast = System.currentTimeMillis();
                        } else if (cbDelta > cbEval) {
                            long ldtNow = System.currentTimeMillis();
                            long cMillis = Math.max(1L, ldtNow - ldtLast);
                            double cbs = cbDelta * 1000L / cMillis;
                            double dfl = (double)cbsInTarget / cbs;
                            int nThrottleNew = (int)Math.round((double)(nThrottle = Math.max(1, (int)Math.round((double)nThrottle * dfl))) * dfl);
                            if (nThrottleNew == 0) {
                                nThrottleNew = 1;
                                ++cThrottleMillis;
                            } else if (nThrottleNew == nThrottle) {
                                if (dfl > 1.01) {
                                    ++cThrottleMillis;
                                } else if (dfl < 0.09) {
                                    --cThrottleMillis;
                                }
                            }
                            cbInLast = cbIn;
                            ldtLast = ldtNow;
                            i = 0;
                        }
                    }
                    ++i;
                }
            }
            catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }

        /**
         * Installs a new, empty event queue and then delegates to the
         * superclass {@code start()}.
         * <p>
         * The queue is assigned before the superclass is started.
         * NOTE(review): presumably so the queue is already in place when the
         * processing thread begins running - confirm against the superclass.
         */
        @Override
        public void start() {
            final SingleConsumerBlockingQueue<Event> queueFresh = new SingleConsumerBlockingQueue<Event>();
            this.m_queue = queueFresh;
            super.start();
        }

        /**
         * Empty marker type: it declares no fields and no methods of its own.
         * <p>
         * NOTE(review): presumably instances serve as a dedicated monitor/token
         * for transmit-side event handling - confirm against the callers, which
         * are outside this section of the file.
         */
        public static class TxEventSynchronizer {
        }

        /**
         * Empty marker type: it declares no fields and no methods of its own.
         * <p>
         * NOTE(review): presumably instances serve as a dedicated monitor/token
         * for receive-side event handling - confirm against the callers, which
         * are outside this section of the file.
         */
        public static class RxEventSynchronizer {
        }

        /**
         * Plain holder that pairs a peer {@link EndPoint} with an integer id.
         * Both fields are package-visible and mutable; the class adds no
         * behaviour beyond storing the two constructor arguments.
         */
        public static class RelayResponse {
            /** The integer id supplied at construction. */
            int nId;

            /** The peer supplied at construction. */
            EndPoint peer;

            /**
             * Creates a holder for the given peer and id.
             *
             * @param peer the peer to store
             * @param nId  the id to store
             */
            public RelayResponse(EndPoint peer, int nId) {
                this.nId = nId;
                this.peer = peer;
            }
        }
    }

    /**
     * Thread that continuously sends test messages (or performs memory-bus
     * operations) to the peers in a shared "ready" set, optionally throttling
     * itself towards a target byte rate and optionally blocking for a response
     * after each flush.
     */
    public static class Transmitter
    extends Thread {
        /** Result handed over by {@link #signalResult} and consumed by {@link #awaitResult}. */
        private volatile Object m_oResult;

        /** Signaled by {@link #signalResult} once a result has been published. */
        private final Notifier f_notifier = new SingleWaiterCooperativeNotifier();

        /** The bus this transmitter sends on. */
        protected final Bus f_bus;

        /** Pre-built message reused for sends when {@code s_fCached} is set; otherwise null. */
        protected final BufferSequence f_bufSeqCached;

        /** The buffers of {@link #f_bufSeqCached}; null when no message is cached. */
        protected final ByteBuffer[] f_aBufCached;

        /** Shared flag (also used as a monitor): the send loop waits while its value is 1. */
        protected final AtomicInteger f_fBacklogGlobal;

        /** Id of this transmitter, embedded into the ids of generated messages. */
        protected final int f_nId;

        /** Peers to send to (also used as a monitor to wait for peers to appear/disappear). */
        protected final Set<EndPoint> f_setReady;

        /** Target transmit rate in bytes per second; values &lt;= 0 disable throttling. */
        protected long f_cbs;

        /** Flush interval argument supplied at construction. */
        protected final int f_nFlushOn;

        /** Maximum estimated outstanding bytes before the sender blocks; -1 disables the limit. */
        protected final long f_cbTxMaxBacklog;

        /** Estimated outstanding bytes: grows per send, shrinks as receipts are disposed. */
        protected final AtomicLong f_cbTxBacklog = new AtomicLong();

        /** Signaled by receipts when the estimated backlog has drained sufficiently. */
        protected final Notifier f_notifierBacklog = new SingleWaiterCooperativeNotifier();

        /** Number of sends in blocking mode that have not yet been answered. */
        protected final AtomicInteger f_cPendingResponses = new AtomicInteger();

        /** Total bytes accounted as sent. */
        protected long m_cbOut;

        /** Total number of messages/operations sent. */
        protected long m_cMsgOut;

        /** Accumulated milliseconds spent in completed backlog waits. */
        protected long m_cMillisBacklog;

        /** Start time of the backlog wait currently in progress, or 0 when not waiting. */
        protected volatile long m_ldtBacklogStart;

        /** Midpoint of the configured minimum and maximum message sizes. */
        protected final long s_cbMsgAvg = s_cbMsgMin + (s_cbMsgMax - s_cbMsgMin) / 2L;

        /**
         * Creates a transmitter; the thread is named after the bus's local end point.
         *
         * @param bus            the bus to send on
         * @param nId            the id of this transmitter
         * @param setReady       the set of peers to send to
         * @param nFlushOn       the flush interval
         * @param fBacklogGlobal shared flag on which the send loop blocks while it is 1
         * @param cbBacklog      maximum estimated outstanding bytes, or -1 for no limit
         *
         * @throws IOException if the cached message cannot be created
         */
        public Transmitter(Bus bus, int nId, Set<EndPoint> setReady, int nFlushOn, AtomicInteger fBacklogGlobal, long cbBacklog) throws IOException {
            super("Transmitter(" + String.valueOf(bus.getLocalEndPoint()) + ")");
            this.f_bus = bus;
            this.f_bufSeqCached = s_fCached ? MessageBusTest.getMessage(nId, false, 0L) : null;
            this.f_aBufCached = s_fCached ? this.f_bufSeqCached.getBuffers() : null;
            this.f_fBacklogGlobal = fBacklogGlobal;
            this.f_nId = nId;
            this.f_setReady = setReady;
            this.f_nFlushOn = nFlushOn;
            this.f_cbTxMaxBacklog = cbBacklog;
        }

        /**
         * Records the arrival of one response. When the last pending response
         * arrives the result is published and the waiting sender is signaled.
         *
         * @param oResult the result to publish if this was the last pending response
         *
         * @throws IllegalStateException if more responses arrive than were pending
         */
        public void signalResult(Object oResult) {
            long cPending = this.f_cPendingResponses.decrementAndGet();
            if (cPending == 0L) {
                this.m_oResult = oResult;
                this.f_notifier.signal();
            } else if (cPending < 0L) {
                throw new IllegalStateException();
            }
        }

        /**
         * Waits for a result published by {@link #signalResult}, spinning for up
         * to {@code AWAIT_SPIN_NANOS} before blocking on the notifier. The stored
         * result is cleared before returning.
         *
         * @return the published result, never null
         *
         * @throws InterruptedException if interrupted while blocked
         */
        public Object awaitResult() throws InterruptedException {
            Object oResult = this.m_oResult;
            if (oResult == null) {
                if (AWAIT_SPIN_NANOS > 0L) {
                    long ldtEnd = System.nanoTime() + AWAIT_SPIN_NANOS;
                    while (oResult == null && System.nanoTime() < ldtEnd) {
                        oResult = this.m_oResult;
                    }
                }
                while (oResult == null) {
                    this.f_notifier.await();
                    oResult = this.m_oResult;
                }
            }
            this.m_oResult = null;
            return oResult;
        }

        /**
         * Sets the target transmit rate. The rate is sampled once when
         * {@link #run} begins, so later changes do not affect a running thread.
         *
         * @param cbs the target rate in bytes per second
         */
        public void setTransmitRate(long cbs) {
            this.f_cbs = cbs;
        }

        /**
         * @return the total number of bytes accounted as sent
         */
        public long getBytesOut() {
            return this.m_cbOut;
        }

        /**
         * @return the total number of messages/operations sent
         */
        public long getMessagesOut() {
            return this.m_cMsgOut;
        }

        /**
         * Returns the time spent waiting on backlog, including the wait in
         * progress (if any) once at least one byte has been sent.
         *
         * @return the backlog time in milliseconds
         */
        public long getRemoteBacklogMillis() {
            long ldtBacklogStart = this.m_ldtBacklogStart;
            long cMillisBacklog = this.m_cMillisBacklog;
            return ldtBacklogStart == 0L || this.m_cbOut == 0L ? cMillisBacklog : cMillisBacklog + (System.currentTimeMillis() - ldtBacklogStart);
        }

        /**
         * Send loop. Repeatedly picks the next ready peer, sends one message
         * (or performs one memory-bus operation), applies backlog limits,
         * flushing, optional blocking for the response, and rate throttling.
         * The loop never terminates normally; any failure is counted in
         * {@code s_cErrors} and rethrown wrapped in an IllegalStateException.
         */
        @Override
        public void run() {
            try {
                Bus bus = this.f_bus;
                MessageBus busMsg = null;
                MemoryBus busMem = null;
                if (bus instanceof MessageBus) {
                    busMsg = (MessageBus)bus;
                } else if (bus instanceof MemoryBus) {
                    busMem = (MemoryBus)bus;
                }
                Set<EndPoint> setReady = this.f_setReady;
                long cbsTarget = this.f_cbs;
                long cbEval = cbsTarget / 8L;
                boolean fBlock = s_fBlock;
                long nFlushOn = Math.max(fBlock ? 1 : 0, this.f_nFlushOn);
                AtomicInteger cPending = this.f_cPendingResponses;
                long lSeq = 0L;
                long cThrottleMillis = 1L;
                long nThrottle = Math.max(1L, (long)((int)cbEval) / (s_cbMsgMax - (s_cbMsgMax - s_cbMsgMin) / 2L) / 8L);
                long ldtLast = System.currentTimeMillis();
                long cbThis = 0L;
                long cSendThrottle = 0L;
                // NOTE(review): cMsgOut is never modified in this method (m_cMsgOut is
                // incremented instead), so the modulo tests on it below always see 0,
                // i.e. they hold on every iteration - confirm whether it was meant to
                // track m_cMsgOut before changing it, as that alters flush batching.
                long cMsgOut = 0L;
                Iterator<EndPoint> iterPeer = setReady.iterator();
                while (true) {
                    // wait out a global backlog
                    if (this.f_fBacklogGlobal.get() == 1) {
                        long ldtStartWait = this.m_ldtBacklogStart = System.currentTimeMillis();
                        synchronized (this.f_fBacklogGlobal) {
                            while (this.f_fBacklogGlobal.get() == 1) {
                                Blocking.wait(this.f_fBacklogGlobal);
                            }
                        }
                        // the wait is over: clear the in-progress marker (as the peer wait
                        // below does) so getRemoteBacklogMillis() stops adding elapsed time
                        this.m_ldtBacklogStart = 0L;
                        if (cbThis > 0L) {
                            this.m_cMillisBacklog += System.currentTimeMillis() - ldtStartWait;
                        }
                    }

                    // restart the peer iteration when exhausted; block while there are no peers
                    if (!iterPeer.hasNext() && !(iterPeer = setReady.iterator()).hasNext()) {
                        bus.flush();
                        synchronized (setReady) {
                            iterPeer = setReady.iterator();
                            while (!iterPeer.hasNext()) {
                                long ldtStartWait = this.m_ldtBacklogStart = System.currentTimeMillis();
                                Blocking.wait(setReady);
                                this.m_ldtBacklogStart = 0L;
                                if (cbThis > 0L) {
                                    this.m_cMillisBacklog += System.currentTimeMillis() - ldtStartWait;
                                }
                                iterPeer = setReady.iterator();
                            }
                        }
                    }

                    BufferSequence bufseq = null;
                    try {
                        if (fBlock) {
                            cPending.incrementAndGet();
                        }
                        EndPoint peer = iterPeer.next();
                        if (s_fPrompt) {
                            System.out.println("Press ENTER to send messge to " + String.valueOf(peer));
                            System.in.read();
                        }

                        // a real timestamp is only attached to every s_nLatencyFreq'th message
                        long ldtNanos = s_nLatencyFreq != 0 && ++lSeq % (long)s_nLatencyFreq == 0L ? System.nanoTime() : (fBlock ? -1L : 0L);
                        bufseq = this.getMessage(ldtNanos);
                        long cb = bufseq.getLength();

                        // the cached message is shared, so it is never handed to the receipt for disposal
                        BacklogTrackingReceipt receipt = s_fReceipts ? new BacklogTrackingReceipt(Math.max(0L, ldtNanos), bufseq == this.f_bufSeqCached ? null : bufseq) : null;
                        if (busMsg != null) {
                            busMsg.send(peer, bufseq, receipt);
                        } else {
                            if (s_fBlock) {
                                // memory-bus operations produce no response message, so the
                                // receipt's disposal is used to release the blocked sender.
                                // NOTE(review): receipt is null when s_fReceipts is false, in
                                // which case this dereference throws NullPointerException -
                                // confirm blocking memory-bus runs always enable receipts.
                                final Disposable garbage = receipt.m_garbage;
                                receipt.m_garbage = new Disposable(){

                                    @Override
                                    public void dispose() {
                                        Transmitter.this.signalResult(0);
                                        if (garbage != null) {
                                            garbage.dispose();
                                        }
                                    }
                                };
                            }
                            long cbPeer = busMem.getCapacity(peer);
                            long lOffset = Hasher.mod(s_rand.nextLong(), cbPeer - bufseq.getLength());
                            switch ((int)busMem.getCapacity(null)) {
                                case 0: {
                                    if (cbPeer == 0L) {
                                        cb = 8L;
                                        busMem.signal(peer, 1234L, receipt);
                                        break;
                                    }
                                    busMem.write(peer, lOffset, bufseq, receipt);
                                    break;
                                }
                                case 8: {
                                    cb = 16L;
                                    busMem.getAndAdd(peer, 0L, 1L, null, receipt);
                                    break;
                                }
                                default: {
                                    busMem.read(peer, lOffset, bufseq, receipt);
                                }
                            }
                        }
                        ++this.m_cMsgOut;
                        cbThis += cb;
                        this.m_cbOut += cb;

                        // block once the estimated outstanding bytes exceed the configured limit
                        if (this.f_cbTxMaxBacklog != -1L && this.f_cbTxBacklog.addAndGet(this.s_cbMsgAvg) > this.f_cbTxMaxBacklog) {
                            bus.flush();
                            this.f_notifierBacklog.await();
                        }
                        if (nFlushOn != 0L && cMsgOut % nFlushOn == 0L) {
                            bus.flush();
                            if (fBlock) {
                                int nSeq = (Integer)this.awaitResult();
                                int nExp = (int)(this.m_cMsgOut - 1L & 0xFFFFL);
                                if (nSeq != 0 && nExp != nSeq) {
                                    throw new IllegalStateException("unexpected response id " + nSeq + " expected " + nExp);
                                }
                            }
                        }
                        if (s_fSingleUseConnection) {
                            // drop the connection, wait for the peer to leave the ready set, reconnect
                            bus.disconnect(peer);
                            synchronized (setReady) {
                                while (setReady.contains(peer)) {
                                    setReady.wait();
                                }
                            }
                            bus.connect(peer);
                        }
                    }
                    catch (IllegalArgumentException e) {
                        // the send was rejected: undo the pending count and release the
                        // message unless it is the shared cached one
                        if (fBlock) {
                            cPending.decrementAndGet();
                        }
                        if (bufseq != null && bufseq != this.f_bufSeqCached) {
                            bufseq.dispose();
                        }
                    }

                    // rate throttling: sleep every nThrottle sends and periodically re-tune
                    if (cbsTarget > 0L && ++cSendThrottle % nThrottle == 0L) {
                        bus.flush();
                        Blocking.sleep(cThrottleMillis);
                        if (cbThis >= cbEval) {
                            long ldtNow = System.currentTimeMillis();
                            long cMillis = ldtNow - ldtLast;
                            if (cMillis == 0L) {
                                ++nThrottle;
                            } else {
                                double cbs = cbThis * 1000L / cMillis;
                                double dfl = (double)cbsTarget / cbs;
                                int nThrottleNew = (int)Math.round((double)nThrottle * dfl);
                                if (nThrottleNew == 0) {
                                    nThrottleNew = 1;
                                    ++cThrottleMillis;
                                } else if ((long)nThrottleNew == nThrottle) {
                                    if (dfl > 1.01) {
                                        ++cThrottleMillis;
                                    } else if (dfl < 0.09) {
                                        --cThrottleMillis;
                                    }
                                }
                                nThrottle = nThrottleNew;
                            }
                            cSendThrottle = 0L;
                            cbThis = 0L;
                            ldtLast = ldtNow;
                        }
                    }

                    // in verbose mode, append "#<count>" to the thread name
                    if (s_fVerbose && cMsgOut % 10000L == 0L) {
                        Thread thread = Thread.currentThread();
                        String sName = thread.getName();
                        int of = sName.lastIndexOf('#');
                        thread.setName(sName.substring(0, of == -1 ? sName.length() : of) + "#" + cMsgOut);
                    }
                }
            }
            catch (Throwable e) {
                s_cErrors.incrementAndGet();
                System.err.println("fatal error after sending " + this.m_cMsgOut + " messages");
                throw new IllegalStateException(e);
            }
        }

        /**
         * Produces the next message to send.
         * <p>
         * With caching enabled the cached buffers are reused: in blocking mode
         * they are rewound, stamped with {@code ldtSent} at byte offset 5 of the
         * first buffer and wrapped in a new sequence; otherwise the cached
         * sequence itself is returned when no timestamp is requested. In every
         * other case a new message is generated.
         *
         * @param ldtSent the timestamp to embed in the message
         *
         * @return the message to send
         *
         * @throws IOException if a new message cannot be generated
         */
        public BufferSequence getMessage(long ldtSent) throws IOException {
            if (s_fCached) {
                if (s_fBlock) {
                    for (ByteBuffer buf : this.f_aBufCached) {
                        buf.position(0);
                    }
                    this.f_aBufCached[0].putLong(5, ldtSent);
                    return this.f_aBufCached.length == 1 ? new SingleBufferSequence(null, this.f_aBufCached[0]) : new MultiBufferSequence(null, this.f_aBufCached);
                }
                if (ldtSent == 0L) {
                    return this.f_bufSeqCached;
                }
            }
            return MessageBusTest.getMessage((int)(this.m_cMsgOut << 16) | this.f_nId, false, ldtSent);
        }

        /**
         * Receipt that, when disposed, reduces the transmitter's estimated
         * outstanding bytes and wakes the sender if it may be blocked on the
         * backlog limit.
         */
        public class BacklogTrackingReceipt
        extends Receipt {
            /**
             * @param ldtNanos the send timestamp in nanoseconds
             * @param garbage  resource to dispose along with this receipt, or null
             */
            public BacklogTrackingReceipt(long ldtNanos, Disposable garbage) {
                super(ldtNanos, garbage);
            }

            @Override
            public void dispose() {
                super.dispose();
                if (Transmitter.this.f_cbTxMaxBacklog == -1L) {
                    // backlog limiting is disabled
                    return;
                }
                long cbBacklogPre = Transmitter.this.f_cbTxBacklog.getAndAdd(-Transmitter.this.s_cbMsgAvg);
                long cbThreshold = Transmitter.this.f_cbTxMaxBacklog / 3L;
                // signal only while the prior backlog was above a third of the limit and
                // the new value is below the limit
                if (cbBacklogPre > cbThreshold && cbBacklogPre - Transmitter.this.s_cbMsgAvg < Transmitter.this.f_cbTxMaxBacklog) {
                    Transmitter.this.f_notifierBacklog.signal();
                }
            }
        }
    }

    /**
     * Collector that fans incoming events out to a fixed array of
     * {@code EventProcessor}s, choosing the processor from the hash of the
     * event's source end point, and keeps per-slot counters of received
     * message bytes.
     */
    public static class DemultiplexingCollector
    implements Collector<Event> {
        /** The bus flushed from {@link #flush} when a processor requested it. */
        protected final Bus m_bus;

        /** The processors events are dispatched to. */
        protected final EventProcessor[] m_aProcessor;

        /** Received-byte counters, one per processor slot. */
        protected final AtomicLong[] m_acbReceived;

        /** Set when a processor's {@code add} returned true since the last flush. */
        protected boolean m_fFlush;

        /**
         * @param bus        the bus to flush on demand
         * @param aProcessor the processors to dispatch events to
         */
        public DemultiplexingCollector(Bus bus, EventProcessor[] aProcessor) {
            m_bus = bus;
            m_aProcessor = aProcessor;
            m_acbReceived = new AtomicLong[aProcessor.length];
            for (int iSlot = 0; iSlot < m_acbReceived.length; iSlot++) {
                m_acbReceived[iSlot] = new AtomicLong();
            }
        }

        /**
         * Dispatches one event. Receipts carrying a non-zero timestamp are
         * wrapped in a {@link StampedEvent}; message lengths are added to the
         * byte counter of the source's slot.
         *
         * @param event the event to dispatch
         */
        @Override
        public void add(Event event) {
            final EndPoint source = event.getEndPoint();
            final int nHashSrc = source != null ? source.hashCode() : 0;

            Event evtDispatch = event;
            switch (event.getType()) {
                case RECEIPT: {
                    Receipt receipt = (Receipt)event.getContent();
                    if (receipt.getTimestampNanos() != 0L) {
                        evtDispatch = new StampedEvent(event);
                    }
                    break;
                }
                case MESSAGE: {
                    AtomicLong counter = m_acbReceived[Hasher.mod(nHashSrc, m_acbReceived.length)];
                    counter.addAndGet(((BufferSequence)event.getContent()).getLength());
                    break;
                }
            }

            EventProcessor processor = m_aProcessor[Hasher.mod(nHashSrc, m_aProcessor.length)];
            if (processor.add(evtDispatch)) {
                m_fFlush = true;
            }
        }

        /**
         * Flushes the bus if a processor asked for it since the previous
         * flush, then flushes the cooperative notifiers.
         */
        @Override
        public void flush() {
            final boolean fFlushBus = m_fFlush;
            if (fFlushBus) {
                m_fFlush = false;
                m_bus.flush();
            }
            SingleWaiterCooperativeNotifier.flush();
        }

        /**
         * @return the sum of all per-slot received-byte counters
         */
        public long getReceivedBytes() {
            long cbTotal = 0L;
            for (AtomicLong counter : m_acbReceived) {
                cbTotal += counter.get();
            }
            return cbTotal;
        }
    }

    /**
     * Event wrapper that records {@link System#nanoTime()} at the moment it is
     * created and otherwise forwards every call to the wrapped event.
     */
    public static class StampedEvent
    implements Event {
        /** The wrapped event all calls are forwarded to. */
        protected final Event m_evt;

        /** The {@code System.nanoTime()} value captured at construction. */
        protected long m_ldtNanos;

        /**
         * Wraps the given event and stamps it with the current nano time.
         *
         * @param evt the event to wrap
         */
        public StampedEvent(Event evt) {
            m_evt = evt;
            m_ldtNanos = System.nanoTime();
        }

        /** {@inheritDoc} Forwards to the wrapped event. */
        @Override
        public Event.Type getType() {
            return m_evt.getType();
        }

        /** {@inheritDoc} Forwards to the wrapped event. */
        @Override
        public EndPoint getEndPoint() {
            return m_evt.getEndPoint();
        }

        /** {@inheritDoc} Forwards to the wrapped event. */
        @Override
        public Object getContent() {
            return m_evt.getContent();
        }

        /** {@inheritDoc} Forwards to the wrapped event. */
        @Override
        public Object dispose(boolean fTakeContent) {
            return m_evt.dispose(fTakeContent);
        }

        /** {@inheritDoc} Forwards to the wrapped event. */
        @Override
        public void dispose() {
            m_evt.dispose();
        }

        /**
         * @return the nano time captured when this wrapper was created
         */
        public long getTimestampNanos() {
            return m_ldtNanos;
        }
    }

    /**
     * Receipt carrying a nanosecond timestamp and an optional resource that is
     * released when the receipt is disposed.
     */
    public static class Receipt
    implements Disposable {
        /** The timestamp supplied at construction. */
        long m_ldtNanos;

        /** Resource to release on {@link #dispose}; null when there is none or it was already released. */
        Disposable m_garbage;

        /**
         * @param ldtNanos the timestamp in nanoseconds
         * @param garbage  resource to dispose together with this receipt, or null
         */
        public Receipt(long ldtNanos, Disposable garbage) {
            m_ldtNanos = ldtNanos;
            m_garbage = garbage;
        }

        /**
         * @return the timestamp supplied at construction
         */
        public long getTimestampNanos() {
            return m_ldtNanos;
        }

        /**
         * Releases the attached resource, if any. The reference is cleared
         * before the resource is disposed, so a repeated call does nothing.
         */
        @Override
        public void dispose() {
            final Disposable pending = m_garbage;
            if (pending == null) {
                return;
            }
            m_garbage = null;
            pending.dispose();
        }
    }
}

