I'm writing a program to simulate peers on a P2P network using Java NIO non-blocking sockets. The idea is to have each peer use the same code for sending and receiving messages as well as a server to allow peers to bootstrap into the network.
The problem I'm having is although up to four peers can successfully join the network and talk to each other (Ping, Pong, Query and QueryHit), when I add a fifth peer the server always reports a "StreamCorruptedException". I've checked this website as well as Java NIO code/tutorial websites for solutions, but to no avail. I understand sending objects over non-blocking sockets is not easy/ideal (especially with ObjectOutputStream and ObjectInputStream), but I want to minimise the use of threads (also I don't want to re-write this from scratch!).
I'll show the most important methods first (message send and receive), but I can add more later if required.
Write method:
public void write(SelectionKey selKey){
SocketChannel channel = (SocketChannel)selKey.channel();
ArrayList<Message> queue = pending.get(channel);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos;
try{
oos = new ObjectOutputStream(baos);
}catch(Exception e){
System.err.println("Could not create object output stream. Aborting...");
return;
}
while(!queue.isEmpty()){
Message message = queue.get(0);
buffer.clear();
try{
oos.writeObject(message);
buffer = ByteBuffer.wrap(baos.toByteArray());
channel.write(buffer);
oos.flush();
baos.flush();
}catch(Exception e){
System.err.println("Could not parse object. Aborting...");
queue.remove(0);
return;
}
queue.remove(0);
}
selKey.interestOps(SelectionKey.OP_READ);
}
And the read method:
public Message read(SelectionKey selKey) throws IOException, ClassNotFoundException{
SocketChannel channel = (SocketChannel)selKey.channel();
Message message = null;
buffer = ByteBuffer.allocate(8192);
int bytesRead = channel.read(bu开发者_开发技巧ffer);
if(bytesRead > 0){
buffer.flip();
InputStream bais = new ByteArrayInputStream(buffer.array(), 0, buffer.limit());
ObjectInputStream ois = new ObjectInputStream(bais); //Offending line. Produces the StreamCorruptedException.
message = (Message)ois.readObject();
ois.close();
}
return message;
}
Any help will be greatly appreciated!
int bytesRead = channel.read(buffer);
if(bytesRead > 0){
buffer.flip();
InputStream bais = new ByteArrayInputStream(buffer.array(), 0, buffer.limit());
ObjectInputStream ois = new ObjectInputStream(bais); //Offending line. Produces the StreamCorruptedException.
message = (Message)ois.readObject();
ois.close();
}
Here you're mixing blocking and non-blocking I/O. The buffer will not have exhaustively read from the socket, it will only have read what is available. If you're going to take this approach you need to read all of the data into the buffer first.
You can't assume that your read will return the entire object in one chunk. You need an "out-of-band" (well, out of ObjectStream band) protocol to insure that you are receiving complete messages, assembling messages from read chunks as necessary. For non-blocking this will be a good deal more complicated than what you have now.
精彩评论