
Producer Consumer - Using Executors.newFixedThreadPool

Developer https://www.devze.com 2023-03-27 18:12 Source: Internet

My understanding of a Producer-Consumer pattern is that it could be implemented using a queue shared between the producer and the consumer. Producer submits work to a shared queue, consumer retrieves it and processes it. It could also be implemented by the producer directly submitting to the consumer (Producer threads submitting to Consumer's executor service directly).

Now, I've been looking at the Executors class that provides some common implementations of thread pools. The method newFixedThreadPool, according to the spec, "reuses a fixed number of threads operating off a shared unbounded queue". Which queue are they talking about here? 

If the Producer directly submits a task to a consumer, is it the internal queue of the ExecutorService that contains the list of Runnables?

Or is it the intermediate queue, in case the producer submits to a shared queue? 

Maybe I'm missing the whole point, but would someone please clarify?


You are right: an ExecutorService is not just a thread pool, it is a complete Producer-Consumer implementation. The queue the spec mentions is the executor's internal one: a thread-safe queue of Runnables (FutureTask instances, to be precise) that holds the tasks you submit().
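To see that internal queue for yourself, here is a minimal sketch. It assumes the configuration that, as far as I know, OpenJDK uses behind Executors.newFixedThreadPool (n core threads, n max threads, an unbounded LinkedBlockingQueue); the class name InternalQueueDemo and the helper fixedPool are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class InternalQueueDemo {

    // Hypothetical helper: builds a pool configured the way a fixed thread
    // pool is assumed to be configured (same core and max size, no keep-alive,
    // unbounded work queue).
    static ThreadPoolExecutor fixedPool(int nThreads) {
        return new ThreadPoolExecutor(
                nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = fixedPool(2);

        // This is the "shared unbounded queue": it lives inside the executor.
        BlockingQueue<Runnable> queue = pool.getQueue();

        System.out.println(queue.getClass().getSimpleName());               // LinkedBlockingQueue
        System.out.println(queue.remainingCapacity() == Integer.MAX_VALUE); // true

        pool.shutdown();
    }
}
```

The producers are whichever threads call submit() or execute(); the consumers are the pool's worker threads. No extra intermediate queue is needed.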

Idle threads in the pool block on that queue, waiting for tasks to execute. When you submit() a task, exactly one thread picks it up and runs it. Of course, submit() itself does not wait for a pool thread to finish processing the task; it returns a Future right away.
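A small sketch of that non-blocking behaviour: the task is held back with a CountDownLatch, yet submit() has already returned a Future. The class and method names (SubmitDemo, submitReturnsBeforeTaskFinishes) are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo {

    // Returns true if submit() handed back a Future while the task was
    // still blocked, i.e. the caller did not wait for the task to finish.
    static boolean submitReturnsBeforeTaskFinishes() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<String> future = pool.submit(() -> {
                release.await();                         // hold the task until released
                return Thread.currentThread().getName(); // name of the one worker that ran it
            });

            boolean stillPending = !future.isDone();     // the task cannot have completed yet

            release.countDown();                         // let the task finish
            future.get();                                // now wait for its result
            return stillPending;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitReturnsBeforeTaskFinishes()); // true
    }
}
```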

On the other hand, if you submit a huge number of tasks (or long-running ones), you might end up with all threads in the pool occupied and some tasks waiting in the queue. Once any thread is done with its task, it will immediately pick the first one from the queue.
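The backlog can be observed directly. In this sketch (BacklogDemo and queuedWhileBusy are made-up names, and the pool is built by hand so that getQueue() is available without a cast), every task blocks on a latch, so with 2 threads and 5 tasks the remaining 3 sit in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BacklogDemo {

    // Submits `tasks` blocking tasks to a pool of `threads` workers and
    // reports how many are waiting in the queue while all workers are busy.
    static int queuedWhileBusy(int threads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();               // keep the worker occupied
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The first `threads` tasks went straight to new workers;
            // the rest are parked in the internal queue.
            return pool.getQueue().size();
        } finally {
            release.countDown();                       // unblock everything
            pool.shutdown();                           // queued tasks still run, then workers exit
        }
    }

    public static void main(String[] args) {
        System.out.println(queuedWhileBusy(2, 5));     // 3
    }
}
```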


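import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// The executor supplies the threads, so a plain Runnable is enough.
public class Producer implements Runnable {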
    static List<String> list = new ArrayList<String>();  

    public static void main(String[] args) {  
        ScheduledExecutorService executor = Executors  
                .newScheduledThreadPool(12);  
        int initialDelay = 5;  
        int pollingFrequency = 5;  
        Producer producer = new Producer();  
        @SuppressWarnings({ "rawtypes", "unused" })  
        ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay(  
                producer, initialDelay, pollingFrequency, TimeUnit.SECONDS);  
        for (int i = 0; i < 3; i++) {  
            Consumer consumer = new Consumer();  
            @SuppressWarnings({ "rawtypes", "unused" })  
            ScheduledFuture schedFutureConsumer = executor  
                    .scheduleWithFixedDelay(consumer, initialDelay,  
                            pollingFrequency, TimeUnit.SECONDS);  
        }  

    }  

    @Override  
    public void run() {  
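        // Consumers lock on this list, so the producer must take the same lock;
        // ArrayList is not thread-safe. The lock contention also slows adding.
        synchronized (list) {
            list.add("object added to list is " + System.currentTimeMillis());
        }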
    }  
}  

class Consumer implements Runnable {

    @Override  
    public void run() {  
        getObjectFromList();  
    }  

    private void getObjectFromList() {  
        synchronized (Producer.list) {  
            if (Producer.list.size() > 0) {  
                System.out.println("Object name removed by "
                        + Thread.currentThread().getName() + " is "
                        + Producer.list.get(0));
                Producer.list.remove(0);
            }  
        }  
    }  
}  
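
For comparison, here is a rough sketch of the shared-queue variant using a LinkedBlockingQueue instead of a hand-synchronized ArrayList. It is not a drop-in replacement for the code above: the names (BlockingQueueDemo, run, POISON) and the "poison pill" end marker are my own assumptions, and there is no scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueDemo {

    // Hypothetical end-of-stream marker so the consumer knows when to stop.
    private static final String POISON = "__done__";

    // Produces `items` strings on one pool thread, consumes them on another,
    // and returns what the consumer saw, in order.
    static List<String> run(int items) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Producer: put() is thread-safe, no explicit locking needed.
            pool.submit(() -> {
                for (int i = 0; i < items; i++) {
                    queue.put("item-" + i);
                }
                queue.put(POISON);
                return null;
            });

            // Consumer: take() blocks until something is available.
            Future<List<String>> consumed = pool.submit(() -> {
                List<String> seen = new ArrayList<>();
                for (String s = queue.take(); !s.equals(POISON); s = queue.take()) {
                    seen.add(s);
                }
                return seen;
            });

            return consumed.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // [item-0, item-1, item-2]
    }
}
```

Here the executor only supplies the threads; the producer and consumer hand work to each other through the intermediate queue, which is the other arrangement the question asks about.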


Check this out:
Producer-Consumer example in Java (RabbitMQ). It's written for another library, but it's in Java and it demonstrates the concept clearly ;)
Hope it helps!

P.S.: Actually, it has several examples, but you get the idea ;)
