Trying to packetize TCP with non-blocking IO is hard! Am I doing something wrong?

Developer https://www.devze.com 2022-12-26 05:03 Source: Internet
Oh how I wish TCP was packet-based like UDP is! [see comments] But alas, that's not the case, so I'm trying to implement my own packet layer. Here's the chain of events so far (ignoring writing packets)

Oh, and my Packets are very simply structured: two unsigned bytes for length, and then byte[length] data. (I can't imagine if they were any more complex, I'd be up to my ears in if statements!)
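For reference, here is a minimal sketch of that wire format, assuming the two length bytes are sent high byte first (the order getLength() implies). The class name Frames and both method names are made up for illustration. The main point is that Java's byte is signed, so each length byte has to be masked with 0xFF before the two are combined.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the original code.
final class Frames {
    static final int MAX_PAYLOAD = 0xFFFF; // largest value two unsigned bytes can hold

    // Builds [2-byte big-endian length][payload].
    static byte[] encode(byte[] payload) {
        if (payload.length > MAX_PAYLOAD) {
            throw new IllegalArgumentException("payload too large: " + payload.length);
        }
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length); // big-endian by default
        buf.putShort((short) payload.length); // the cast keeps the low 16 bits
        buf.put(payload);
        return buf.array();
    }

    // Decodes the two length bytes as an unsigned value in 0..65535.
    static int readLength(byte hi, byte lo) {
        return ((hi & 0xFF) << 8) | (lo & 0xFF);
    }
}
```

Without the masks, any length byte of 0x80 or above would be sign-extended and corrupt the result.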

  • Server is in an infinite loop, accepting connections and adding them to a list of Connections.
  • PacketGatherer (another thread) uses a Selector to figure out which Connection.SocketChannels are ready for reading.
  • It loops over the results and tells each Connection to read().
  • Each Connection has a partial IncomingPacket and a list of Packets which have been fully read and are waiting to be processed.
  • On read():
    • Tell the partial IncomingPacket to read more data. (IncomingPacket.readData below)
    • If it's done reading (IncomingPacket.complete()), make a Packet from it and stick the Packet into the list waiting to be processed and then replace it with a new IncomingPacket.

There are a couple problems with this. First, only one packet is being read at a time. If the IncomingPacket needs only one more byte, then only one byte is read this pass. This can of course be fixed with a loop but it starts to get sorta complicated and I wonder if there is a better overall way.

Second, the logic in IncomingPacket is a little bit crazy, to be able to read the two bytes for the length and then read the actual data. Here is the code, boiled down for quick & easy reading:

int readBytes;         // number of total bytes read so far
byte length1, length2; // each byte in an unsigned short int (see getLength())

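public int getLength() { // will be inaccurate if readBytes < 2
    // mask each byte: Java bytes are signed, so a value >= 0x80 would otherwise sign-extend
    return ((length1 & 0xFF) << 8) | (length2 & 0xFF);
}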

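public void readData(SocketChannel c) throws IOException {
    if (readBytes < 2) { // we don't yet know the length of the actual data
        ByteBuffer lengthBuffer = ByteBuffer.allocate(2 - readBytes);
        int numBytesRead = c.read(lengthBuffer);
        if (numBytesRead < 0) // read() returns -1 once the peer has closed the stream
            throw new EOFException("connection closed");
        lengthBuffer.flip(); // switch to read mode so get() returns the bytes just read

        if(readBytes == 0) {
            if(numBytesRead >= 1)
                length1 = lengthBuffer.get();

            if(numBytesRead == 2)
                length2 = lengthBuffer.get();
        } else if(readBytes == 1) {
            if(numBytesRead == 1)
                length2 = lengthBuffer.get();
        }
        readBytes += numBytesRead;
    }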

    if(readBytes >= 2) { // then we know we have the entire length variable
        // lazily-instantiate data buffers based on getLength()
        // read into data buffers, increment readBytes

        // (does not read more than the amount of this packet, so it does not
        // need to handle overflow into the next packet's data)
    }
}

public boolean complete() {
    // >= 2 (not > 2) so that a zero-length packet can also complete
    return (readBytes >= 2 && readBytes == getLength()+2);
}

Basically I need feedback on my code and overall process. Please suggest any improvements. Even overhauling my entire system would be okay, if you have suggestions for how better to implement the whole thing. Book recommendations are welcome too; I love books. I just get the feeling that something isn't quite right.


Here's the general solution I came up with thanks to Juliano's answer: (feel free to comment if you have any questions)

public void fillWriteBuffer() {
    while(!writePackets.isEmpty() && writeBuf.remaining() >= writePackets.peek().size()) {
        Packet p = writePackets.poll();
        assert p != null;
        p.writeTo(writeBuf);
    }
}

public void fillReadPackets() {
    // readBuf stays in write mode between calls, so position() is the number of bytes received
    while(readBuf.position() >= 1+2) { // 1 type byte + 2 length bytes
        int packetLength = readBuf.getShort(1) & 0xFFFF; // absolute read, treated as unsigned

        if(readBuf.position() < 1+2 + packetLength) {
            // not a complete packet
            break;
        }

        // we have a complete packet!
        readBuf.flip();

        byte packetType = readBuf.get();
        readBuf.getShort(); // skip the length, already decoded above

        byte[] packetData = new byte[packetLength];
        readBuf.get(packetData);

        Packet p = new Packet(packetType, packetData);
        readPackets.add(p);
        readBuf.compact(); // move leftover bytes to the front, back to write mode
    }
}


Probably this is not the answer you are looking for, but someone would have to say it: You are probably overengineering the solution for a very simple problem.

You do not have packets before they arrive completely, not even IncomingPackets. You have just a stream of bytes without defined meaning. The usual, simple solution is to keep the incoming data in a buffer (it can be a simple byte[] array, but a proper elastic and circular buffer is recommended if performance is an issue). After each read, you check the contents of the buffer to see if you can extract an entire packet from it. If you can, you construct your Packet, discard the correct number of bytes from the beginning of the buffer, and repeat. When you cannot extract an entire packet, you keep the incoming bytes there until the next time you successfully read something from the socket.
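A rough sketch of that accumulate-and-extract loop, assuming the question's framing (a 2-byte unsigned length followed by the payload). FrameBuffer, feed and extract are made-up names, and a fixed-size ByteBuffer stands in for the elastic buffer mentioned above. It is sized to hold one maximal frame, so there is always room for more input after extraction.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: accumulates stream bytes and cuts out complete frames.
final class FrameBuffer {
    // Room for one maximal frame; kept in write mode between calls.
    private final ByteBuffer buf = ByteBuffer.allocate(2 + 0xFFFF);

    // Append bytes just read from the socket and return every payload now complete.
    List<byte[]> feed(byte[] data, int offset, int count) {
        List<byte[]> out = new ArrayList<>();
        while (count > 0) {
            int n = Math.min(count, buf.remaining()); // copy only what fits
            buf.put(data, offset, n);
            offset += n;
            count -= n;
            extract(out);
        }
        return out;
    }

    private void extract(List<byte[]> out) {
        buf.flip(); // read mode: position = 0, limit = bytes stored
        while (buf.remaining() >= 2) {
            int length = buf.getShort(buf.position()) & 0xFFFF; // peek, unsigned
            if (buf.remaining() < 2 + length) {
                break; // frame not complete yet, wait for the next read
            }
            buf.getShort(); // consume the length
            byte[] payload = new byte[length];
            buf.get(payload);
            out.add(payload);
        }
        buf.compact(); // move leftover bytes to the front, back to write mode
    }
}
```

One call to feed can return zero, one or several payloads, which also takes care of the "only one packet per pass" problem from the question.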

While you are at it, if you are doing datagram-based communication over a stream channel, I would recommend including a magic number at the beginning of each "packet" so that you can test that both ends of the connection are still synchronized. They may get out of sync if, because of a bug, one of them reads or writes the wrong number of bytes from or to the stream.


Can't you just read however many bytes are ready to be read, and feed all incoming bytes into a packet parsing state machine? That would mean treating the incoming (TCP) data stream like any other incoming data stream (via serial line, or USB, a pipe, or whatever...)

So you would have some Selector determining from which connection(s) there are incoming bytes to be read, and how many. Then you would (for each connection) read the available bytes, and then feed those bytes into a (connection specific) state machine instance (the reading and feeding could be done from the same class, though). This packet parsing state machine class would then spit out finished packets from time to time, and hand those over to whoever will handle those complete and parsed packets.

For an example packet format like

    2 magic header bytes to mark the start
    2 bytes of payload size (n)
    n bytes of payload data
    2 bytes of checksum

the state machine would have states like (try an enum, Java has those now, I gather)

wait_for_magic_byte_0,
wait_for_magic_byte_1,
wait_for_length_byte_0,
wait_for_length_byte_1,
wait_for_payload_byte (with a payload_offset variable counting),
wait_for_chksum_byte_0,
wait_for_chksum_byte_1

and on each incoming byte you can switch the state accordingly. If the incoming byte does not properly advance the state machine, discard the byte by resetting the state machine to wait_for_magic_byte_0.
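Here is one way such a state machine might look in Java. The answer does not specify the magic bytes or the checksum algorithm, so both are assumptions here: the values 0xCA 0xFE, and a 16-bit sum of the payload bytes. PacketParser, feed and drain are made-up names too.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical byte-at-a-time parser for: magic(2) | length(2) | payload(n) | checksum(2).
// Magic values and checksum (16-bit sum of payload bytes) are assumptions.
final class PacketParser {
    enum State {
        WAIT_MAGIC_0, WAIT_MAGIC_1, WAIT_LENGTH_0, WAIT_LENGTH_1,
        WAIT_PAYLOAD, WAIT_CHKSUM_0, WAIT_CHKSUM_1
    }

    static final int MAGIC_0 = 0xCA, MAGIC_1 = 0xFE;

    private State state = State.WAIT_MAGIC_0;
    private byte[] payload;
    private int length, offset, sum, received;
    private final List<byte[]> packets = new ArrayList<>();

    void feed(byte raw) {
        int b = raw & 0xFF; // work with the unsigned value
        switch (state) {
            case WAIT_MAGIC_0:
                if (b == MAGIC_0) state = State.WAIT_MAGIC_1; // anything else is discarded
                break;
            case WAIT_MAGIC_1:
                if (b == MAGIC_1) state = State.WAIT_LENGTH_0;
                else if (b != MAGIC_0) state = State.WAIT_MAGIC_0; // a repeated MAGIC_0 keeps waiting
                break;
            case WAIT_LENGTH_0:
                length = b << 8;
                state = State.WAIT_LENGTH_1;
                break;
            case WAIT_LENGTH_1:
                length |= b;
                payload = new byte[length];
                offset = 0;
                sum = 0;
                state = (length == 0) ? State.WAIT_CHKSUM_0 : State.WAIT_PAYLOAD;
                break;
            case WAIT_PAYLOAD:
                payload[offset++] = raw;
                sum = (sum + b) & 0xFFFF;
                if (offset == length) state = State.WAIT_CHKSUM_0;
                break;
            case WAIT_CHKSUM_0:
                received = b << 8;
                state = State.WAIT_CHKSUM_1;
                break;
            case WAIT_CHKSUM_1:
                received |= b;
                if (received == sum) packets.add(payload); // bad checksum: drop the packet
                state = State.WAIT_MAGIC_0; // either way, hunt for the next packet
                break;
        }
    }

    // Hands over the packets finished so far and forgets them.
    List<byte[]> drain() {
        List<byte[]> out = new ArrayList<>(packets);
        packets.clear();
        return out;
    }
}
```

The caller feeds every byte it read from the channel and calls drain() afterwards; partial packets simply stay inside the parser until the next read.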


Ignoring client disconnects and server shutdown for now, here's the more or less traditional structure of a socket server:

  • Selector, handles sockets:
    • polls open sockets
    • if it's the server socket, create new Connection object
    • for each active client socket find the Connection, call it with event (read or write)
  • Connection (one per socket), handles I/O on one socket:
    • Communicates to Protocol via two queues, input and output
    • keeps two buffers, one for reading, one for writing, and respective offsets
    • on read event: read all available input bytes, look for message boundaries, put whole messages onto Protocol input queue, call Protocol
    • on write event: write the buffer, or if it's empty, take message from output queue into buffer, start writing it
  • Protocol (one per connection), handles application protocol exchange on one connection:
    • take message from input queue, parse application portion of the message
    • do the server work (here's where the state machine is - some messages are appropriate in one state, while not in the other), generate response message, put it onto output queue

That's it. Everything could be in a single thread. The key here is separation of responsibilities. Hope this helps.
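To make the Connection part of that outline a bit more concrete, here is a sketch under a few assumptions: messages use the question's 2-byte unsigned length prefix, and the class, field and method names are invented for illustration. The methods take plain ReadableByteChannel / WritableByteChannel so they work with a non-blocking SocketChannel but can also be tried without a network.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical Connection: one per socket, two buffers, two queues.
final class Connection {
    final Queue<byte[]> input = new ArrayDeque<>();  // whole messages for Protocol
    final Queue<byte[]> output = new ArrayDeque<>(); // responses from Protocol (each <= 0xFFFF bytes)
    private final ByteBuffer readBuf = ByteBuffer.allocate(2 + 0xFFFF); // write mode between calls
    private ByteBuffer writeBuf = ByteBuffer.allocate(0);               // read mode, starts drained

    // Read event: take all available bytes, queue whole messages.
    // Returns false once the peer has closed the stream.
    boolean onReadable(ReadableByteChannel ch) throws IOException {
        int n;
        while ((n = ch.read(readBuf)) > 0) {
            readBuf.flip();
            while (readBuf.remaining() >= 2) {
                int len = readBuf.getShort(readBuf.position()) & 0xFFFF; // peek
                if (readBuf.remaining() < 2 + len) break; // message boundary not reached
                readBuf.getShort();
                byte[] msg = new byte[len];
                readBuf.get(msg);
                input.add(msg);
            }
            readBuf.compact();
        }
        return n != -1;
    }

    // Write event: returns true while bytes are still pending,
    // i.e. the caller should stay interested in OP_WRITE.
    boolean onWritable(WritableByteChannel ch) throws IOException {
        while (true) {
            if (!writeBuf.hasRemaining()) {
                byte[] msg = output.poll();
                if (msg == null) return false; // nothing left to send
                writeBuf = ByteBuffer.allocate(2 + msg.length);
                writeBuf.putShort((short) msg.length).put(msg).flip();
            }
            ch.write(writeBuf);
            if (writeBuf.hasRemaining()) return true; // socket buffer full, retry on next event
        }
    }
}
```

In the Selector loop, the idea would be to call onReadable when key.isReadable() and onWritable when key.isWritable(), with the Protocol object sitting between the two queues.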


I think you're approaching the issue from a slightly wrong direction. Instead of thinking of packets, think of a data structure. That's what you're sending. Effectively, yes, it's an application layer packet, but just think of it as a data object. Then, at the lowest level, write a routine which will read off the wire, and output data objects. That will give you the abstraction layer I think you're looking for.
