/*
 * Decompiled with CFR 0.152.
 */
package com.oracle.coherence.common.internal.net.socketbus;

import com.oracle.coherence.common.base.Disposable;
import com.oracle.coherence.common.internal.net.socketbus.AbstractSocketBus;
import com.oracle.coherence.common.internal.net.socketbus.BufferedSocketBus;
import com.oracle.coherence.common.internal.net.socketbus.MultiBufferMessageEvent;
import com.oracle.coherence.common.internal.net.socketbus.SharedBuffer;
import com.oracle.coherence.common.internal.net.socketbus.SingleBufferMessageEvent;
import com.oracle.coherence.common.internal.net.socketbus.SocketBusDriver;
import com.oracle.coherence.common.internal.util.HeapDump;
import com.oracle.coherence.common.io.BufferManager;
import com.oracle.coherence.common.io.BufferSequence;
import com.oracle.coherence.common.io.BufferSequenceInputStream;
import com.oracle.coherence.common.io.Buffers;
import com.oracle.coherence.common.net.exabus.EndPoint;
import com.oracle.coherence.common.net.exabus.Event;
import com.oracle.coherence.common.net.exabus.MessageBus;
import com.oracle.coherence.common.net.exabus.util.SimpleEvent;
import com.oracle.coherence.common.net.exabus.util.UrlEndPoint;
import com.oracle.coherence.common.util.MemorySize;
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.zip.CRC32;

public class SocketMessageBus
extends BufferedSocketBus
implements MessageBus {
    protected final AtomicBoolean f_fBacklogLocal = new AtomicBoolean();
    protected final AtomicLong m_cbEventQueue = new AtomicLong();

    public SocketMessageBus(SocketBusDriver driver, UrlEndPoint pointLocal) throws IOException {
        super(driver, pointLocal);
    }

    @Override
    protected String getProtocolName() {
        return this.getSocketDriver().getDependencies().getMessageBusProtocol();
    }

    @Override
    protected int getProtocolIdentifier() {
        return 1522655233;
    }

    @Override
    public void send(EndPoint peer, BufferSequence bufseq, Object receipt) {
        this.send(peer, bufseq, receipt, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void send(EndPoint peer, BufferSequence bufseq, Object receipt, boolean fSocketWrite) {
        if (bufseq == null) {
            throw new NullPointerException("Null BufferSequence for message:  " + String.valueOf(receipt));
        }
        MessageConnection conn = (MessageConnection)this.ensureConnection(peer);
        if (conn.getProtocolVersion() < 0 && conn.deferSend(bufseq, receipt)) {
            return;
        }
        AtomicInteger cWriters = conn.m_cWritersWaiting;
        cWriters.getAndIncrement();
        MessageConnection messageConnection = conn;
        synchronized (messageConnection) {
            cWriters.getAndDecrement();
            conn.ensureValid().evaluateAutoFlush(conn.isFlushInProgress(), conn.isFlushRequired(), conn.send(bufseq, receipt), fSocketWrite);
        }
    }

    @Override
    protected AbstractSocketBus.Connection makeConnection(UrlEndPoint peer) {
        return new MessageConnection(peer);
    }

    protected void wakeupConnections() {
        for (AbstractSocketBus.Connection conn : this.getConnections()) {
            try {
                conn.wakeup();
            }
            catch (IOException e) {
                conn.onException(e);
            }
        }
    }

    protected void issueLocalBacklog() {
        if (this.f_fBacklogLocal.compareAndSet(false, true)) {
            this.addEvent(new TaskEvent(new SimpleEvent(Event.Type.BACKLOG_EXCESSIVE, this.getLocalEndPoint()), new Runnable(){

                @Override
                public void run() {
                    SocketMessageBus.this.emitEvent(new SimpleEvent(Event.Type.BACKLOG_NORMAL, SocketMessageBus.this.getLocalEndPoint()));
                    SocketMessageBus.this.m_cbEventQueue.set(0L);
                    SocketMessageBus.this.f_fBacklogLocal.set(false);
                    SocketMessageBus.this.wakeupConnections();
                }
            }));
        }
    }

    public class MessageConnection
    extends BufferedSocketBus.BufferedConnection {
        protected ReadBatch m_readBatch;
        protected long m_cbReadThreshold;
        protected ByteBuffer m_bufferMsgHdr;
        protected long m_cMsgUserIn;
        protected long m_cMsgUserOut;
        protected long m_cReceiptsNull;
        protected LinkedList<Pair<BufferSequence, Object>> m_queuePreNegotiate;

        public MessageConnection(UrlEndPoint peer) {
            super(peer);
            this.m_queuePreNegotiate = null;
        }

        protected long getReadThrottleThreshold() {
            long cb = this.m_cbReadThreshold;
            if (cb <= 0L) {
                try {
                    this.m_cbReadThreshold = cb = (long)(this.getReceiveBufferSize() * 8);
                }
                catch (SocketException socketException) {
                    // empty catch block
                }
                if (cb <= 0L) {
                    cb = 65536L;
                }
            }
            return cb;
        }

        @Override
        protected int processReads(boolean fReady) throws IOException {
            if (SocketMessageBus.this.f_fBacklogLocal.get() && SocketMessageBus.this.m_cbEventQueue.get() > this.getReadThrottleThreshold()) {
                return 0;
            }
            if (fReady) {
                ReadBatch batch = this.m_readBatch;
                if (batch == null) {
                    batch = this.m_readBatch = new ReadBatch();
                    batch.m_cbRequired = this.getMessageHeaderSize();
                }
                batch.read();
                if (batch.m_fHeader && batch.m_cbReadable == 0L) {
                    batch.dispose();
                    this.m_readBatch = null;
                    return 1;
                }
                return 1 | OP_EAGER;
            }
            return 1;
        }

        @Override
        public void dispose() {
            ByteBuffer bufferHdr;
            ReadBatch batch = this.m_readBatch;
            if (batch != null) {
                this.m_readBatch = null;
                batch.dispose();
            }
            if ((bufferHdr = this.m_bufferMsgHdr) != null) {
                SocketMessageBus.this.getSocketDriver().getDependencies().getBufferManager().release(bufferHdr);
                this.m_bufferMsgHdr = null;
            }
            super.dispose();
        }

        public long send(BufferSequence bufseq, Object receipt) {
            ByteBuffer bufferMsgHdr;
            long cbMsg = bufseq.getLength();
            int nProt = this.getProtocolVersion();
            int cbHeader = this.getMessageHeaderSize();
            if (cbMsg < 0L || nProt < 5 && cbMsg > Integer.MAX_VALUE) {
                throw new UnsupportedOperationException("unsupported message size " + cbMsg);
            }
            BufferedSocketBus.BufferedConnection.WriteBatch batch = this.m_batchWriteUnflushed;
            if (batch == null) {
                batch = this.m_batchWriteUnflushed = new BufferedSocketBus.BufferedConnection.WriteBatch();
            }
            if (this.m_bufferMsgHdr == null) {
                this.m_bufferMsgHdr = SocketMessageBus.this.getSocketDriver().getDependencies().getBufferManager().acquire(cbHeader * 1024);
                int cbCap = this.m_bufferMsgHdr.capacity();
                this.m_bufferMsgHdr.limit(cbCap - cbCap % cbHeader);
            }
            if ((bufferMsgHdr = this.m_bufferMsgHdr).remaining() > cbHeader) {
                ByteBuffer buffHeader = bufferMsgHdr.slice();
                buffHeader.limit(buffHeader.position() + cbHeader);
                bufferMsgHdr.position(bufferMsgHdr.position() + cbHeader);
                batch.append(buffHeader, false, bufseq, receipt);
            } else {
                batch.append(bufferMsgHdr, true, bufseq, receipt);
                this.m_bufferMsgHdr = null;
            }
            ++this.m_cMsgUserOut;
            if (receipt == null) {
                ++this.m_cReceiptsNull;
            }
            return batch.getLength();
        }

        @Override
        protected void populateMessageHeader(ByteBuffer bufHead, ByteBuffer[] aBuffer, int of, int cBuffers, long cbBuffer) {
            int nProt = this.getProtocolVersion();
            int nPos = bufHead.position();
            if (nProt > 4) {
                CRC32 crc32 = this.f_crcTx;
                int lCrcBody = 0;
                int nLimit = bufHead.limit();
                bufHead.putLong(nPos, cbBuffer);
                nPos += 8;
                if (crc32 != null) {
                    crc32.reset();
                    lCrcBody = Buffers.updateCrc(crc32, aBuffer, of, cbBuffer);
                    lCrcBody = lCrcBody == 0 ? 1 : lCrcBody;
                }
                bufHead.putInt(nPos, lCrcBody);
                nPos += 4;
                int lCrcHeader = 0;
                if (crc32 != null) {
                    crc32.reset();
                    bufHead.limit(nPos);
                    lCrcHeader = Buffers.updateCrc(crc32, bufHead);
                    lCrcHeader = lCrcHeader == 0 ? 1 : lCrcHeader;
                    bufHead.limit(nLimit);
                }
                bufHead.putInt(nPos, lCrcHeader);
            } else {
                bufHead.putInt(nPos, (int)cbBuffer);
            }
        }

        public int getMessageHeaderSize() {
            int nProt = this.getProtocolVersion();
            if (nProt > 4) {
                return 16;
            }
            if (nProt >= 0) {
                return 4;
            }
            throw new IllegalStateException("connection is not ready!");
        }

        @Override
        protected int getReceiptSize() {
            int nProt = this.getProtocolVersion();
            return nProt > 4 ? 25 : 13;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void setProtocolVersion(int nProt) {
            MessageConnection messageConnection = this;
            synchronized (messageConnection) {
                super.setProtocolVersion(nProt);
                LinkedList<Pair<BufferSequence, Object>> queue = this.m_queuePreNegotiate;
                if (queue != null) {
                    if (this.m_fBacklog) {
                        this.m_fBacklog = false;
                        SocketMessageBus.this.emitEvent(new SimpleEvent(Event.Type.BACKLOG_NORMAL, this.getPeer()));
                    }
                    Pair<BufferSequence, Object> pair = null;
                    try {
                        while ((pair = queue.poll()) != null) {
                            this.send(pair.getKey(), pair.getValue());
                        }
                        this.flush();
                        this.m_queuePreNegotiate = null;
                    }
                    catch (Throwable e) {
                        queue.addFirst(pair);
                        this.scheduleDisconnect(e);
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void drainReceipts() {
            MessageConnection messageConnection = this;
            synchronized (messageConnection) {
                LinkedList<Pair<BufferSequence, Object>> queue = this.m_queuePreNegotiate;
                this.m_queuePreNegotiate = null;
                if (queue != null) {
                    for (Pair pair : queue) {
                        Object oReceipt = pair.getValue();
                        if (oReceipt == null) continue;
                        SocketMessageBus.this.addEvent(new SimpleEvent(Event.Type.RECEIPT, this.getPeer(), oReceipt));
                    }
                    if (this.m_fBacklog) {
                        this.m_fBacklog = false;
                        SocketMessageBus.this.emitEvent(new SimpleEvent(Event.Type.BACKLOG_NORMAL, this.getPeer()));
                    }
                }
                super.drainReceipts();
            }
        }

        @Override
        public String toString() {
            ReadBatch batch = this.m_readBatch;
            long cReceiptsEmitted = this.m_cReceiptsEmitted;
            long cReceiptsNull = this.m_cReceiptsNull;
            return super.toString() + ", bufferedIn(" + String.valueOf(batch == null ? "" : batch) + "), msgs(in=" + this.m_cMsgUserIn + ", out=" + this.m_cReceiptsEmitted + (String)(cReceiptsNull == 0L ? "" : "[" + (cReceiptsEmitted + cReceiptsNull) + "]") + "/" + this.m_cMsgUserOut + ")";
        }

        public void onMessageDispose(BufferSequence bufseq) {
            long cbCur;
            AtomicLong atomicCb = SocketMessageBus.this.m_cbEventQueue;
            long cbSeq = bufseq.getLength();
            while (!atomicCb.compareAndSet(cbCur = atomicCb.get(), Math.max(0L, cbCur - cbSeq))) {
            }
        }

        @Override
        public void onMigration() {
            super.onMigration();
            ReadBatch batchRead = this.m_readBatch;
            if (batchRead != null) {
                SocketMessageBus.this.getLogger().log(Level.FINER, "{0} discarding partial message from {1} consisting of {2} out of {3} bytes on {4}", new Object[]{SocketMessageBus.this.getLocalEndPoint(), this.getPeer(), batchRead.m_cbReadable, batchRead.m_cbRequired, this});
                batchRead.dispose();
                this.m_readBatch = null;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected boolean deferSend(BufferSequence bufSeq, Object receipt) {
            MessageConnection messageConnection = this;
            synchronized (messageConnection) {
                this.ensureValid();
                if (this.getProtocolVersion() < 0) {
                    LinkedList<Pair<BufferSequence, Object>> queue = this.m_queuePreNegotiate;
                    if (queue == null) {
                        queue = this.m_queuePreNegotiate = new LinkedList();
                        this.invoke(() -> {
                            MessageConnection messageConnection = this;
                            synchronized (messageConnection) {
                                if (!this.m_fBacklog && this.getProtocolVersion() < 0) {
                                    this.m_fBacklog = true;
                                    SocketMessageBus.this.emitEvent(new SimpleEvent(Event.Type.BACKLOG_EXCESSIVE, this.getPeer()));
                                }
                            }
                        });
                    }
                    queue.add(new Pair<BufferSequence, Object>(bufSeq, receipt));
                    if (this.m_state == AbstractSocketBus.ConnectionState.DEFUNCT) {
                        this.invoke(this::drainReceipts);
                    }
                    return true;
                }
            }
            return false;
        }

        public class ReadBatch
        extends AtomicReference<ByteBuffer>
        implements Disposable,
        SharedBuffer.Disposer {
            protected ByteBuffer[] m_aBuffer = new ByteBuffer[2];
            protected int m_ofWritable;
            protected int m_cBufferWritable;
            protected long m_cbWritable;
            protected int m_ofReadable;
            protected long m_cbReadable;
            protected long m_cbRequired;
            protected long m_lCrcBodyNext;
            protected boolean m_fHeader = true;
            protected SharedBuffer m_bufferShared;

            public ByteBuffer[] ensureCapacity(long cb) {
                BufferManager manager = SocketMessageBus.this.getSocketDriver().getDependencies().getBufferManager();
                Object[] aBuffer = this.m_aBuffer;
                int ofReadable = this.m_ofReadable;
                int ofWritable = this.m_ofWritable;
                long cbWritable = this.m_cbWritable;
                int cBufferWritable = this.m_cBufferWritable;
                int cBuffer = aBuffer.length;
                long cbAlloc = cb - cbWritable;
                long cbMin = Math.max(MessageConnection.this.getPacketSize(), 16384);
                while (cbAlloc > 0L) {
                    int ofEnd = ofWritable + cBufferWritable;
                    if (ofEnd < cBuffer && aBuffer[ofEnd] == null) {
                        ofAlloc = ofEnd;
                    } else if (ofEnd + 1 < cBuffer) {
                        ofAlloc = ofEnd + 1;
                    } else if (ofReadable > 0) {
                        ofAlloc = ofEnd - ofReadable;
                        System.arraycopy(aBuffer, ofReadable, aBuffer, 0, ofAlloc);
                        Arrays.fill(aBuffer, ofAlloc, ofAlloc + ofReadable, null);
                        ofWritable -= ofReadable;
                        ofEnd -= ofReadable;
                        ofReadable = 0;
                    } else {
                        ByteBuffer[] aBufferNew = new ByteBuffer[cBuffer * 2];
                        System.arraycopy(aBuffer, 0, aBufferNew, 0, cBuffer);
                        aBuffer = aBufferNew;
                        ofAlloc = cBuffer;
                        cBuffer = aBuffer.length;
                    }
                    long cbStop = Math.max(cbMin, cbWritable + cbAlloc + (long)MessageConnection.this.getMessageHeaderSize());
                    for (int i = ofAlloc; i < cBuffer && cbWritable < cbStop; ++i) {
                        ByteBuffer buff = this.getAndSet(null);
                        if (buff == null) {
                            buff = manager.acquirePref((int)Math.min(Integer.MAX_VALUE, cbStop - cbWritable));
                        }
                        buff.clear().mark();
                        int cbBuff = buff.remaining();
                        cbAlloc -= Math.min((long)cbBuff, cbAlloc);
                        cbWritable += (long)cbBuff;
                        ++cBufferWritable;
                        aBuffer[i] = buff;
                    }
                }
                this.m_aBuffer = aBuffer;
                this.m_ofReadable = ofReadable;
                this.m_ofWritable = ofWritable;
                this.m_cbWritable = cbWritable;
                this.m_cBufferWritable = cBufferWritable;
                return aBuffer;
            }

            @Override
            public void dispose() {
                int of = this.m_ofReadable;
                SharedBuffer buffShared = this.m_bufferShared;
                ByteBuffer[] aBuffer = this.m_aBuffer;
                ByteBuffer buffer0 = aBuffer[of];
                if (buffShared != null && buffShared.get() == buffer0) {
                    buffShared.dispose();
                    ++of;
                }
                BufferManager manager = SocketMessageBus.this.getSocketDriver().getDependencies().getBufferManager();
                int c = this.m_ofWritable + this.m_cBufferWritable;
                while (of < c) {
                    manager.release(aBuffer[of]);
                    ++of;
                }
                ByteBuffer buf = this.getAndSet(Buffers.getEmptyBuffer());
                if (buf != null) {
                    manager.release(buf);
                }
            }

            @Override
            public void dispose(ByteBuffer buffer) {
                if (!this.compareAndSet(null, buffer)) {
                    SocketMessageBus.this.getSocketDriver().getDependencies().getBufferManager().release(buffer);
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void read() throws IOException {
                int cBuffer;
                int of;
                long cbAlloc = Math.abs(this.m_cbRequired) - this.m_cbWritable;
                ByteBuffer[] aBuffer = cbAlloc > 0L ? this.ensureCapacity(cbAlloc) : this.m_aBuffer;
                long cb = MessageConnection.this.read(aBuffer, of = this.m_ofWritable, cBuffer = this.m_cBufferWritable);
                if (cb > 0L) {
                    while (cBuffer > 0 && !aBuffer[of].hasRemaining()) {
                        aBuffer[of].reset();
                        ++of;
                        --cBuffer;
                    }
                    this.m_ofWritable = of;
                    this.m_cBufferWritable = cBuffer;
                    this.m_cbWritable -= cb;
                    long cbReady = this.m_cbReadable += cb;
                    if (cbReady >= Math.abs(this.m_cbRequired)) {
                        if (cBuffer > 0 && aBuffer[of].position() > 0) {
                            ByteBuffer buffLast = aBuffer[of];
                            int iPosWrite = buffLast.position();
                            try {
                                buffLast.limit(iPosWrite).reset();
                                this.onReady();
                            }
                            finally {
                                buffLast.mark();
                                buffLast.limit(buffLast.capacity()).position(iPosWrite);
                            }
                        } else {
                            this.onReady();
                        }
                    }
                } else if (cb < 0L) {
                    MessageConnection.this.migrate(new IOException("input shutdown"));
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onReady() throws IOException {
                ByteBuffer buffer0;
                boolean fFlush = false;
                long cbReadable = this.m_cbReadable;
                long cbRequired = this.m_cbRequired;
                boolean fHeader = this.m_fHeader;
                AtomicLong cbEvents = SocketMessageBus.this.m_cbEventQueue;
                long cMsgIn = MessageConnection.this.m_cMsgIn;
                long cMsgInUser = MessageConnection.this.m_cMsgUserIn;
                long cMsgInSkip = MessageConnection.this.m_cMsgInSkip;
                SharedBuffer buffShared0 = this.m_bufferShared;
                long lCrcBody = this.m_lCrcBodyNext;
                int cbHeader = MessageConnection.this.getMessageHeaderSize();
                int nProt = MessageConnection.this.getProtocolVersion();
                CRC32 crc32 = MessageConnection.this.f_crcRx;
                if (buffShared0 == null) {
                    buffer0 = this.m_aBuffer[this.m_ofReadable];
                    buffShared0 = this.m_bufferShared = new SharedBuffer(buffer0, this);
                } else {
                    buffer0 = buffShared0.get();
                }
                try {
                    while (cbReadable >= Math.abs(cbRequired)) {
                        AtomicInteger event;
                        if (fHeader) {
                            fHeader = false;
                            cbReadable -= (long)cbHeader;
                            int of = this.m_ofReadable;
                            ByteBuffer[] aBuff = this.m_aBuffer;
                            long lCrcHeaderCalc = 0L;
                            if (nProt > 4) {
                                if (crc32 != null) {
                                    crc32.reset();
                                    lCrcHeaderCalc = Buffers.updateCrc(crc32, aBuff, of, cbHeader - 4);
                                    lCrcHeaderCalc = lCrcHeaderCalc == 0L ? 1L : lCrcHeaderCalc;
                                }
                                cbRequired = Buffers.getLong(aBuff, of);
                                lCrcBody = Buffers.getInt(aBuff, of);
                                long lCrcHeaderRx = Buffers.getInt(aBuff, of);
                                if (lCrcHeaderCalc == lCrcHeaderRx || lCrcHeaderRx == 0L || lCrcHeaderCalc == 0L) continue;
                                throw new IOException("incorrect CRC, corrupted header buffer; CRC read: " + lCrcHeaderRx + " CRC re-calculated: " + lCrcHeaderCalc);
                            }
                            cbRequired = Buffers.getInt(aBuff, of);
                            continue;
                        }
                        long cbMsg = Math.abs(cbRequired);
                        long lCrcBodyCalc = 0L;
                        if ((long)buffer0.remaining() >= cbMsg) {
                            if (crc32 != null && lCrcBody != 0L) {
                                crc32.reset();
                                lCrcBodyCalc = Buffers.updateCrc(crc32, buffer0, cbMsg);
                                long l = lCrcBodyCalc = lCrcBodyCalc == 0L ? 1L : lCrcBodyCalc;
                                if (lCrcBodyCalc != lCrcBody) {
                                    throw new IOException("incorrect CRC, corrupted message buffer; CRC read: " + lCrcBody + " CRC re-calculated: " + lCrcBodyCalc);
                                }
                            }
                            int nPos = buffer0.position();
                            event = new SingleBufferMessageEvent(MessageConnection.this, buffShared0.attach(), nPos, (int)cbMsg);
                            buffer0.position(nPos + (int)cbMsg);
                        } else {
                            if (crc32 != null && lCrcBody != 0L) {
                                crc32.reset();
                                lCrcBodyCalc = Buffers.updateCrc(crc32, this.m_aBuffer, this.m_ofReadable, cbMsg);
                                long l = lCrcBodyCalc = lCrcBodyCalc == 0L ? 1L : lCrcBodyCalc;
                                if (lCrcBodyCalc != lCrcBody) {
                                    throw new IOException("incorrect checksum, corrupted message buffer");
                                }
                            }
                            event = this.makeMultiBufferMessageEvent(cbMsg);
                            buffShared0 = this.m_bufferShared;
                            buffer0 = buffShared0.get();
                        }
                        cbEvents.addAndGet(cbMsg);
                        if (cMsgInSkip == 0L) {
                            fFlush = true;
                            if (cbRequired >= 0L) {
                                ++cMsgIn;
                                ++cMsgInUser;
                                SocketMessageBus.this.addEvent((Event)((Object)event));
                            } else if (this.onControlMessage((Event)((Object)event))) {
                                ++cMsgIn;
                            } else {
                                cMsgInSkip = MessageConnection.this.m_cMsgInSkip;
                            }
                        } else {
                            SocketMessageBus.this.getLogger().log(Level.FINER, "{0} skipping" + (cbRequired < 0L ? " control " : " ") + "message of {1} bytes from {2} after migration, {3} remain to be skipped on {4}", new Object[]{SocketMessageBus.this.getLocalEndPoint(), cbMsg, MessageConnection.this.getPeer(), --cMsgInSkip, MessageConnection.this});
                            if (cMsgInSkip == 0L) {
                                SocketMessageBus.this.getLogger().log(Level.FINE, "{0} resuming migrated connection with {1}", new Object[]{SocketMessageBus.this.getLocalEndPoint(), MessageConnection.this});
                            }
                            event.dispose();
                        }
                        fHeader = true;
                        cbReadable -= cbMsg;
                        cbRequired = cbHeader;
                    }
                }
                finally {
                    MessageConnection.this.m_cMsgUserIn = cMsgInUser;
                    MessageConnection.this.m_cMsgIn = cMsgIn;
                    MessageConnection.this.m_cMsgInSkip = cMsgInSkip;
                    this.m_cbReadable = cbReadable;
                    this.m_cbRequired = cbRequired;
                    this.m_fHeader = fHeader;
                    this.m_lCrcBodyNext = lCrcBody;
                    if (fFlush) {
                        if (cbEvents.get() > MessageConnection.this.getReadThrottleThreshold() * 2L / 3L) {
                            SocketMessageBus.this.issueLocalBacklog();
                        }
                        SocketMessageBus.this.flushEvents();
                    }
                }
            }

            protected MultiBufferMessageEvent makeMultiBufferMessageEvent(long cbMsg) {
                long cbEnd;
                ByteBuffer[] aBuffer = this.m_aBuffer;
                SharedBuffer buffShared = this.m_bufferShared;
                int ofReadable = this.m_ofReadable;
                int ofEnd = ofReadable;
                int cbBuf = aBuffer[ofEnd].remaining();
                for (cbEnd = cbMsg; cbEnd > (long)cbBuf; cbEnd -= (long)cbBuf) {
                    cbBuf = aBuffer[++ofEnd].remaining();
                }
                int cBufferMsg = ofEnd - ofReadable + 1;
                ByteBuffer buffer0 = aBuffer[ofReadable];
                ByteBuffer bufferN = aBuffer[ofEnd];
                int iLimitN = bufferN.limit();
                bufferN.limit(bufferN.position() + (int)cbEnd);
                ByteBuffer[] aBufferMsg = new ByteBuffer[cBufferMsg];
                System.arraycopy(aBuffer, ofReadable + 1, aBufferMsg, 1, cBufferMsg - 2);
                aBufferMsg[0] = buffer0;
                SharedBuffer buffShared0 = buffShared;
                this.m_bufferShared = buffShared = new SharedBuffer(bufferN, this);
                aBufferMsg[cBufferMsg - 1] = bufferN.slice();
                MultiBufferMessageEvent event = new MultiBufferMessageEvent(MessageConnection.this, SocketMessageBus.this.getSocketDriver().getDependencies().getBufferManager(), aBufferMsg, 0, cBufferMsg, cbMsg, buffShared0.attach(), buffShared.attach());
                buffShared0.dispose();
                bufferN.position(bufferN.limit()).limit(iLimitN);
                while (ofReadable < ofEnd) {
                    aBuffer[ofReadable] = null;
                    ++ofReadable;
                }
                this.m_ofReadable = ofReadable;
                return event;
            }

            public boolean onControlMessage(Event event) throws IOException {
                try {
                    if (event.getType() != Event.Type.MESSAGE) {
                        throw new IllegalStateException("unexpected control event: " + String.valueOf(event));
                    }
                    BufferSequenceInputStream in = new BufferSequenceInputStream((BufferSequence)event.getContent());
                    byte nType = in.readByte();
                    switch (nType) {
                        case 1: {
                            MessageConnection.this.processReceipt(in);
                            boolean bl = true;
                            return bl;
                        }
                        case 2: {
                            MessageConnection.this.processSync(in);
                            boolean bl = false;
                            return bl;
                        }
                    }
                    String sDump = HeapDump.dumpHeapForBug("28240730");
                    SocketMessageBus.this.getLogger().log(SocketMessageBus.this.makeRecord(Level.WARNING, "{0} received a corrupt message from {1}; collected {2} for analysis", SocketMessageBus.this.getLocalEndPoint(), MessageConnection.this.getPeer(), sDump));
                    throw new IllegalStateException("unexpected control message type: " + nType);
                }
                finally {
                    event.dispose();
                }
            }

            @Override
            public String toString() {
                return "ready=" + String.valueOf(new MemorySize(this.m_cbReadable)) + ", pending=" + String.valueOf(new MemorySize(Math.abs(this.m_cbRequired))) + ", free=" + String.valueOf(new MemorySize(this.m_cbWritable));
            }
        }
    }

    public static class TaskEvent
    implements Event {
        private final Event m_event;
        private final Runnable[] m_aTask;

        public TaskEvent(Event event, Runnable ... aTask) {
            this.m_event = event;
            this.m_aTask = aTask;
        }

        @Override
        public Event.Type getType() {
            return this.m_event.getType();
        }

        @Override
        public EndPoint getEndPoint() {
            return this.m_event.getEndPoint();
        }

        @Override
        public Object getContent() {
            return this.m_event.getContent();
        }

        @Override
        public Object dispose(boolean fTakeContent) {
            Object o = this.m_event.dispose(fTakeContent);
            for (Runnable task : this.m_aTask) {
                task.run();
            }
            return o;
        }

        @Override
        public void dispose() {
            this.dispose(false);
        }

        public String toString() {
            return this.m_event.toString();
        }
    }

    public static class Pair<K, V> {
        private K m_key;
        private V m_value;

        public Pair(K key, V value) {
            this.m_key = key;
            this.m_value = value;
        }

        public K getKey() {
            return this.m_key;
        }

        public V getValue() {
            return this.m_value;
        }
    }
}

