Usually you have a single bound TCP port with several connections on it; at least, there are usually more connections than bound ports. My case is different: I want to bind a lot of ports and will usually have no (or at least very few) connections.
So I want to use NIO to accept the incoming connections.
However, I need to pass the accepted connections to the existing JSch SSH library. It requires IO sockets instead of NIO sockets and spawns one (or two) threads per connection, but that's fine for me.
Now, I thought that the following lines would deliver the very same result:
Socket a = serverSocketChannel.accept().socket();
Socket b = serverSocketChannel.socket().accept();
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking( true );
Socket c = channel.socket();
Socket d = serverSocket.accept();
However, the getInputStream() and getOutputStream() functions of the returned sockets seem to work differently. JSch can only work with the socket if it was accepted using the last call. In the first three cases it fails (and I am sorry: I don't know why).
So is there a way to convert such a socket?
The input and output streams returned by sockets obtained from SocketChannels are internally synchronized on the channel at some points. So you can't use them for full-duplex protocols like SSH, because the system will lock up. Same applies to streams converted from channels via the Channels class (which is what the first case amounts to).
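One possible workaround, sketched here as an assumption rather than as part of the answer above: a SocketChannel in blocking mode supports one reading thread and one writing thread at the same time, so streams that call the channel directly do not go through the socket adaptor's streams. The class name DirectChannelStreams and its methods are hypothetical, and the loopback demo only illustrates the idea.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: blocking streams that call the channel directly. */
final class DirectChannelStreams {
    private DirectChannelStreams() {}

    /** Input stream that reads straight from a SocketChannel left in blocking mode. */
    static InputStream input(final SocketChannel ch) {
        return new InputStream() {
            @Override public int read() throws IOException {
                byte[] one = new byte[1];
                int n;
                do { n = read(one, 0, 1); } while (n == 0);
                return n < 0 ? -1 : (one[0] & 0xff);
            }
            @Override public int read(byte[] b, int off, int len) throws IOException {
                if (len == 0) return 0;
                return ch.read(ByteBuffer.wrap(b, off, len));
            }
        };
    }

    /** Output stream that writes straight to the channel until the buffer is drained. */
    static OutputStream output(final SocketChannel ch) {
        return new OutputStream() {
            @Override public void write(int b) throws IOException {
                write(new byte[] {(byte) b}, 0, 1);
            }
            @Override public void write(byte[] b, int off, int len) throws IOException {
                ByteBuffer buf = ByteBuffer.wrap(b, off, len);
                while (buf.hasRemaining()) ch.write(buf);
            }
        };
    }

    /** Loopback demo: a reader thread waits for 4 bytes while this thread writes on the same channel. */
    static String demo() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                final InputStream in = input(accepted);
                final OutputStream out = output(accepted);
                final byte[] reply = new byte[4];
                Thread reader = new Thread(() -> {
                    try {
                        int got = 0;
                        while (got < reply.length) {
                            int n = in.read(reply, got, reply.length - got);
                            if (n < 0) break;
                            got += n;
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                reader.start();
                // Write on the accepted channel while the reader thread is (or soon will be) blocked in read().
                out.write("ping".getBytes(StandardCharsets.US_ASCII));
                ByteBuffer echo = ByteBuffer.allocate(4);
                while (echo.hasRemaining()) {
                    if (client.read(echo) < 0) break;
                }
                echo.flip();
                while (echo.hasRemaining()) client.write(echo); // the peer echoes the bytes back
                reader.join();
                return new String(reply, StandardCharsets.US_ASCII);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

Whether JSch accepts such streams depends on how the sockets are handed to it, which this sketch does not cover.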
I had to do this because we use JSch and had thousands of threads listening on ports. I'll leave it to you to handle accepting the connections, but this works with JSch from a thread launched to handle the connection:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Handler that allows for input and output streams of an NIO socket to be
 * read as blocking I/O
 * <p>
 * Reading and writing from the streams <b>must</b> be done on the same thread
 * @author Eric
 */
public class NioBlockingSocketStreamer {
    private static long NIO_SELECT_TIMEOUT = 0;
    private static long NIO_WRITE_TIMEOUT_SECONDS = 30;
    /** poison pill result that signals a disconnected channel */
    private static int NIO_READ_IOEXCEPTION = -2;

    private final SocketChannel socketChannel;
    private Selector selector;
    private final InputStream inputStream = new MyInputStream(this);
    private final OutputStream outputStream = new MyOutputStream(this);
    private final LinkedBlockingQueue<ByteBuffer> pendingWrites;
    private final MyReadQueue<ReadRequest> pendingRead;
    private volatile boolean disconnected = false;

    public NioBlockingSocketStreamer(SocketChannel socketChannel,
                                     int writeQueueSize) {
        pendingRead = new MyReadQueue<ReadRequest>( );
        pendingWrites = new LinkedBlockingQueue<ByteBuffer>(writeQueueSize);
        //the channel must be in non-blocking mode, because nioServiceLoop( ) registers it with a selector
        this.socketChannel = socketChannel;
    }

    public NioBlockingSocketStreamer start( ) throws IOException {
        this.selector = Selector.open( );
        final NioBlockingSocketStreamer streamer = this;
        Thread t = new Thread(new Runnable( ) {
            public void run( ) {
                try {
                    streamer.nioServiceLoop( );
                }
                catch (ClosedChannelException e) {
                    //shutting down
                }
                catch (ClosedSelectorException e) {
                    //shutting down
                }
                catch (IOException e) {
                    e.printStackTrace( );
                }
            }
        });
        t.start( );
        return this;
    }

    public void nioServiceLoop( ) throws IOException {
        ByteBuffer writeBuffer = null;
        ReadRequest readRequest = null;
        try {
            for (;;) {
                if (readRequest == null) {
                    readRequest = pendingRead.poll( );
                }
                int interestOps = readRequest != null
                    ? SelectionKey.OP_READ
                    : 0;
                if (writeBuffer == null && pendingWrites.isEmpty( )) {
                    interestOps &= ~SelectionKey.OP_WRITE;
                }
                else {
                    interestOps |= SelectionKey.OP_WRITE;
                }
                socketChannel.register(selector, interestOps);
                int n = selector.select(NIO_SELECT_TIMEOUT);
                if (n > 0) {
                    Set<SelectionKey> selectedKeys = selector.selectedKeys( );
                    for (SelectionKey selectedKey : selectedKeys) {
                        if (selectedKey.isValid( )) {
                            if (selectedKey.isReadable( )) {
                                int cbRead;
                                try {
                                    cbRead = socketChannel.read(readRequest.buffer);
                                }
                                catch (IOException e) {
                                    //" An existing connection was forcibly closed by the remote host"
                                    //is the typical disconnection
                                    cbRead = NIO_READ_IOEXCEPTION;
                                }
                                interestOps &= ~SelectionKey.OP_READ;
                                readRequest.submitResult(cbRead);
                                readRequest = null;
                            }
                            if (selectedKey.isWritable( )) {
                                if (writeBuffer == null) {
                                    writeBuffer = pendingWrites.poll( );
                                }
                                if (writeBuffer != null) {
                                    try {
                                        socketChannel.write(writeBuffer);
                                    }
                                    catch (IOException e) {
                                        //" An existing connection was forcibly closed by the remote host"
                                        //is the typical disconnection
                                        throw e; //assumed handling: end the loop so the finally block flags the disconnect
                                    }
                                    if (!writeBuffer.hasRemaining( )) {
                                        writeBuffer = null;
                                    }
                                }
                            }
                        }
                    }
                    selectedKeys.clear( );
                }
            }
        }
        finally {
            disconnected = true;
            //submit poison pill result for outstanding read request
            if (readRequest != null) {
                readRequest.submitResult(NIO_READ_IOEXCEPTION);
            }
            //since the read request queue became empty before we disconnected, check for the next
            //read request that may have queued in the meantime
            readRequest = pendingRead.poll( );
            if (readRequest != null) {
                readRequest.submitResult(NIO_READ_IOEXCEPTION);
            }
        }
    }

    private ReadRequest readRequest = new ReadRequest( );

    public synchronized int read(ByteBuffer buffer) throws IOException {
        if (disconnected) {
            throw new IOException("disconnected");
        }
        readRequest.buffer = buffer;
        int result;
        try {
            pendingRead.offer(readRequest);
            selector.wakeup( );
            result = readRequest.getResult( );
        }
        catch (InterruptedException e) {
            throw new IOException(e);
        }
        if (result == NIO_READ_IOEXCEPTION) {
            throw new IOException("disconnected");
        }
        return result;
    }

    public void write(ByteBuffer buffer) throws IOException {
        if (disconnected) {
            throw new IOException("disconnected");
        }
        boolean queued = false;
        try {
            if (pendingWrites.remainingCapacity( ) > 0) {
                try {
                    queued = pendingWrites.add(buffer);
                }
                catch (IllegalStateException e) {
                    throw new IOException("write queue capacity exceeded");
                }
            }
            else {
                queued = pendingWrites.offer(buffer, NIO_WRITE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
        }
        catch (InterruptedException e) {
            throw new IOException(e);
        }
        if (!queued) {
            throw new IOException("unable to queue write");
        }
        if (selector != null) { //selector should never be null
            selector.wakeup( );
        }
    }

    public InputStream getInputStream( ) {
        return inputStream;
    }

    public OutputStream getOutputStream( ) {
        return outputStream;
    }

    /**
     * Implements a simple blocking mechanism for getting the results of a read
     * <p>
     * <b>Note</b> This class must avoid autoboxing due to Retroweaver
     */
    static class ReadRequest {
        ByteBuffer buffer;
        LinkedBlockingQueue<Integer> result = new LinkedBlockingQueue<Integer>(1);

        void submitResult(int cb) {
            result.add(new Integer(cb));
        }

        int getResult( ) throws InterruptedException {
            return (int) result.take( );
        }
    }

    static class MyInputStream extends InputStream {
        private final NioBlockingSocketStreamer streamer;

        MyInputStream(NioBlockingSocketStreamer streamer) {
            this.streamer = streamer;
        }

        public int read(byte[] b, int off, int len) throws IOException {
            ByteBuffer buffer = ByteBuffer.allocate(len);
            int n = streamer.read(buffer);
            if (n > 0) {
                System.arraycopy(buffer.array( ), 0, b, off, n);
            }
            return n;
        }

        public int read(byte[] b) throws IOException {
            return read(b, 0, b.length);
        }

        public int read( ) throws IOException {
            //return the byte that was read (0-255), not the byte count
            byte[] one = new byte[1];
            int n = read(one, 0, 1);
            return n > 0 ? (one[0] & 0xff) : -1;
        }

        public void close( ) throws IOException {
            // TODO Auto-generated method stub
        }
    }

    static class MyOutputStream extends OutputStream {
        private final NioBlockingSocketStreamer streamer;

        MyOutputStream(NioBlockingSocketStreamer streamer) {
            this.streamer = streamer;
        }

        public void write(byte[] b, int off, int len) throws IOException {
            ByteBuffer buffer = ByteBuffer.allocate(len);
            buffer.put(b, off, len);
            buffer.flip( );
            streamer.write(buffer);
        }

        public void write(byte[] b) throws IOException {
            write(b, 0, b.length);
        }

        public void write(int b) throws IOException {
            //write the given byte, not an empty array
            write(new byte[] { (byte) b }, 0, 1);
        }

        public void close( ) throws IOException {
            // TODO Auto-generated method stub
        }

        public void flush( ) throws IOException {
            // TODO Auto-generated method stub
        }
    }

    static class MyReadQueue<T> {
        //FIXME using LinkedBlockingQueue is overkill
        private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<T>(1);

        public T poll( ) {
            return queue.poll( );
        }

        public void offer(T request) throws InterruptedException {
            queue.offer(request, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        }
    }
}
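The accepting side, which the answer leaves to the reader, might be sketched roughly as follows: one selector watches many non-blocking ServerSocketChannels, so no thread has to sit in accept() per port. The class name MultiPortAcceptor, its methods, and the use of ephemeral loopback ports are assumptions made for this illustration only.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical acceptor: a single selector watches many listening ports. */
final class MultiPortAcceptor implements AutoCloseable {
    private final Selector selector;
    private final List<ServerSocketChannel> servers = new ArrayList<>();

    MultiPortAcceptor() throws IOException {
        selector = Selector.open();
    }

    /** Binds one more listening port on loopback (0 = ephemeral) and returns the port actually bound. */
    int listen(int port) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
        server.configureBlocking(false); // a channel must be non-blocking before register()
        server.register(selector, SelectionKey.OP_ACCEPT);
        servers.add(server);
        return ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    /** Waits up to timeoutMillis (must be > 0 here) and returns the connections accepted. */
    List<SocketChannel> acceptReady(long timeoutMillis) throws IOException {
        List<SocketChannel> accepted = new ArrayList<>();
        if (selector.select(timeoutMillis) == 0) {
            return accepted;
        }
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (key.isValid() && key.isAcceptable()) {
                // The accepted channel starts out in blocking mode, whatever the server channel's mode.
                SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
                if (ch != null) {
                    accepted.add(ch);
                }
            }
        }
        return accepted;
    }

    @Override public void close() throws IOException {
        selector.close();
        for (ServerSocketChannel s : servers) {
            s.close();
        }
    }

    /** Loopback demo: three ports are bound, a client connects to the second one. */
    static boolean demo() throws Exception {
        try (MultiPortAcceptor acceptor = new MultiPortAcceptor()) {
            acceptor.listen(0);
            int second = acceptor.listen(0);
            acceptor.listen(0);
            InetSocketAddress target = new InetSocketAddress(InetAddress.getLoopbackAddress(), second);
            try (SocketChannel client = SocketChannel.open(target)) {
                List<SocketChannel> accepted = acceptor.acceptReady(5000);
                boolean ok = accepted.size() == 1;
                for (SocketChannel ch : accepted) {
                    ok &= ((InetSocketAddress) ch.getLocalAddress()).getPort() == second;
                    ch.close();
                }
                return ok;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

Each accepted channel could then be handed to a per-connection thread. Since the class above registers its channel with a selector, the channel would first have to be switched to non-blocking mode with configureBlocking(false).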
This sequence works in our production environment:
final SocketAddress serverAddr =
new InetSocketAddress(
final ServerSocketChannel serverChannel = ServerSocketChannel.open( );
serverChannel.socket( ).bind(serverAddr);
final SocketChannel socketChannel = serverChannel.accept( );
final Socket socket = socketChannel.socket( );
final OutputStream out = socket.getOutputStream( );
final InputStream in = socket.getInputStream( );
All channels are in blocking mode by default.
Note that I have not shown any cleanup or exception-handling code.
Also, there are some bugs related to NIO and socket streams
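For illustration, the sequence above could be fleshed out with the cleanup it omits, roughly as in this sketch. The class name BlockingAcceptDemo, the ephemeral loopback port, and the one-byte echo are assumptions added for the demo. It uses the streams from a single thread, one operation at a time, so it says nothing about concurrent reading and writing.

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/** Hypothetical demo of a blocking accept through a ServerSocketChannel, with cleanup. */
final class BlockingAcceptDemo {
    private BlockingAcceptDemo() {}

    /** Accepts one loopback connection, echoes one byte, and returns what the client got back. */
    static int acceptOne() throws Exception {
        // try-with-resources closes the channels and sockets even if an exception is thrown
        try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            // Port 0 (ephemeral) on loopback is an assumption for the demo.
            serverChannel.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            if (!serverChannel.isBlocking()) {
                throw new IllegalStateException("expected a newly opened channel to be blocking");
            }
            int port = serverChannel.socket().getLocalPort();
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), port);
                 SocketChannel socketChannel = serverChannel.accept()) {
                Socket socket = socketChannel.socket();
                OutputStream out = socket.getOutputStream();
                InputStream in = socket.getInputStream();
                client.getOutputStream().write(42); // the client sends one byte
                int received = in.read();           // the server side reads it
                out.write(received);                // and echoes it back
                return client.getInputStream().read();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(acceptOne());
    }
}
```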