I just discovered that just has an NIO facility, Java NIO Pipe t开发者_如何学Gohat's designed for passing data between threads. Is there any advantage of using this mechanism over the more conventional message passing over a queue, such as an ArrayBlockingQueue?
Usually the simplest way to pass data for another thread to process is to use an ExecutorService. This wraps up both a queue and a thread pool (can have one thread)
You can use a Pipe when you have a library which supports NIO channels. It is also useful if you want to pass ByteBuffers of data between threads.
Otherwise its usually simple/faster to use a ArrayBlockingQueue.
If you want a faster way to exchange data between threads I suggest you look at the Exchanger however it is not as general purpose as an ArrayBlockingQueue.
The Exchanger and GC-less Java
I believe a NIO Pipe was designed so that you can send data to a channel inside the selector loop in a thread safe way, in other words, any thread can write to the pipe and the data will be handled in the other extreme of the pipe, inside the selector loop. When you write to a pipe you make the channel in the other side readable.
So after having a lot of trouble with pipe (check here) I decided to favor non-blocking concurrent queues over NIO pipes. So I did some benchmarks on Java's ConcurrentLinkedQueue. See below:
public static void main(String[] args) throws Exception {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
// first test nothing:
for (int j = 0; j < 20; j++) {
Benchmarker bench = new Benchmarker();
String s = "asd";
for (int i = 0; i < 1000000; i++) {
bench.mark();
// s = queue.poll();
bench.measure();
}
System.out.println(bench.results());
Thread.sleep(100);
}
System.out.println();
// first test empty queue:
for (int j = 0; j < 20; j++) {
Benchmarker bench = new Benchmarker();
String s = "asd";
for (int i = 0; i < 1000000; i++) {
bench.mark();
s = queue.poll();
bench.measure();
}
System.out.println(bench.results());
Thread.sleep(100);
}
System.out.println();
// now test polling one element on a queue with size one
for (int j = 0; j < 20; j++) {
Benchmarker bench = new Benchmarker();
String s = "asd";
String x = "pela";
for (int i = 0; i < 1000000; i++) {
queue.offer(x);
bench.mark();
s = queue.poll();
bench.measure();
if (s != x) throw new Exception("bad!");
}
System.out.println(bench.results());
Thread.sleep(100);
}
System.out.println();
// now test polling one element on a queue with size two
for (int j = 0; j < 20; j++) {
Benchmarker bench = new Benchmarker();
String s = "asd";
String x = "pela";
for (int i = 0; i < 1000000; i++) {
queue.offer(x);
queue.offer(x);
bench.mark();
s = queue.poll();
bench.measure();
if (s != x) throw new Exception("bad!");
queue.poll();
}
System.out.println(bench.results());
Thread.sleep(100);
}
}
The results:
totalLogs=1000000, minTime=0, maxTime=85000, avgTime=58.61 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=5281000, avgTime=63.35 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=725000, avgTime=59.71 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=25000, avgTime=58.13 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=378000, avgTime=58.45 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=15000, avgTime=57.71 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=170000, avgTime=58.11 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1495000, avgTime=59.87 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=232000, avgTime=63.0 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=184000, avgTime=57.89 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=2600000, avgTime=65.22 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=850000, avgTime=60.5 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=150000, avgTime=63.83 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=43000, avgTime=59.75 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=276000, avgTime=60.02 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=457000, avgTime=61.69 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=204000, avgTime=60.44 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=154000, avgTime=63.67 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=355000, avgTime=60.75 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=338000, avgTime=60.44 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=345000, avgTime=110.93 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=396000, avgTime=100.32 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=298000, avgTime=98.93 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1891000, avgTime=101.9 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=254000, avgTime=103.06 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1894000, avgTime=100.97 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=230000, avgTime=99.21 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=348000, avgTime=99.63 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=922000, avgTime=99.53 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=168000, avgTime=99.12 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=686000, avgTime=107.41 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=320000, avgTime=95.58 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=248000, avgTime=94.94 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=217000, avgTime=95.01 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=159000, avgTime=93.62 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=155000, avgTime=95.28 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=106000, avgTime=98.57 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=370000, avgTime=95.01 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1836000, avgTime=96.21 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=212000, avgTime=98.62 (times in nanos)
Conclusion:
The maxTime can be scary but I think it is safe to conclude we are in the 50 nanos range for polling a concurrent queue.
I suppose the pipe will have better latency as it could very likely be implemented with coroutines behind the scenes. Thus, the producer immediately yields to the consumer when data is available, not when the thread scheduler decides.
Pipes in general represent a consumer-producer problem and are very likely to be implemented this way so that both threads cooperate and are not preempted externally.
精彩评论