Smart way of processing list with threads

Developer (https://www.devze.com), 2023-02-18 23:59. Source: the web
I have a list of read-only data, a bunch of endpoints on the internet, that I want to process. Is there any kind of built-in class or pattern that I should follow to process it?

Right now I'm just dividing the initial list into N blocks and creating N threads, each of which processes the requests in one block.


Use an ExecutorService to handle your concurrent processing.

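import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public void processAll(List<Endpoint> endpoints, int numThreads) {
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);

    // One task per endpoint; the pool runs at most numThreads of them at a time.
    for(final Endpoint endpoint : endpoints) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                doProcessing(endpoint);
            }
        });
    }
    // instead of a loop, you could also use ExecutorService#invokeAll()
    // with the passed-in list if Endpoint implements
    // java.util.concurrent.Callable

    // Stop accepting new tasks; tasks already submitted still run to completion.
    // shutdown() does not block, so call awaitTermination() if you need to wait.
    executor.shutdown();
}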

private void doProcessing(Endpoint endpoint) {
    // do whatever you do with each one
}

This is just a bare-bones example. Have a look at the API for some examples about how to use a more specific type of ExecutorService, handle Futures, and do all kinds of nifty stuff.
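As a rough sketch of the invokeAll() and Future route mentioned above, something like the following should work. It assumes Java 8+ for the lambda, uses plain String values in place of the Endpoint type, and fetch() is a hypothetical stand-in for the real per-endpoint work:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {

    // Hypothetical placeholder for whatever is done with each endpoint.
    static String fetch(String endpoint) {
        return "processed " + endpoint;
    }

    static List<String> processAll(List<String> endpoints, int numThreads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        try {
            // Wrap each endpoint in a Callable so the task can return a value.
            List<Callable<String>> tasks = new ArrayList<>();
            for (final String endpoint : endpoints) {
                tasks.add(() -> fetch(endpoint));
            }

            // invokeAll() blocks until every task has completed and returns
            // the futures in the same order as the submitted tasks.
            List<Future<String>> futures = executor.invokeAll(tasks);

            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                // get() rethrows a task failure wrapped in ExecutionException.
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
```

Because invokeAll() waits for all tasks, there is no need for a separate latch here; the trade-off is that you only see results once the slowest endpoint has finished.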


Any reason why you can't use the appropriate concurrent container? http://download.oracle.com/javase/6/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html


Sounds like a Queue (use one of the implementations in java.util.concurrent) is what you need. That way each thread can pick up a link when it's ready, which makes more sense than partitioning in advance.
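A minimal sketch of that idea, assuming Java 8+ and String endpoints; handle() is a hypothetical placeholder for the real work, and the one-minute timeout is an arbitrary choice:

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QueueWorkersSketch {

    // Hypothetical placeholder for the per-endpoint work.
    static void handle(String endpoint) {
        // e.g. open a connection and read the response
    }

    // Returns the number of endpoints that were processed.
    static int drain(List<String> endpoints, int numThreads)
            throws InterruptedException {
        final Queue<String> queue = new ConcurrentLinkedQueue<>(endpoints);
        final AtomicInteger processed = new AtomicInteger();

        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            executor.execute(() -> {
                String endpoint;
                // Each worker pulls the next endpoint when it is ready.
                // poll() returns null once the queue is empty.
                while ((endpoint = queue.poll()) != null) {
                    handle(endpoint);
                    processed.incrementAndGet();
                }
            });
        }

        // No new tasks; wait (up to the assumed timeout) for the workers.
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }
}
```

With this shape a slow endpoint only holds up the one thread that picked it, whereas with fixed blocks a slow block leaves the other threads idle once they finish.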


You will need three things:

  • two blocking queues: the first holds the data to process, the second collects the results
  • an ExecutorService
  • some kind of latch

My example application:

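import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class App {

    private static final int NUMBER_OF_THREADS = 3;

    public static void main(String[] args) throws InterruptedException {

        BlockingQueue<String> data = prepareData();

        BlockingQueue<String> results = new LinkedBlockingQueue<String>();

        ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
        // One count per worker; each worker counts down when it is finished.
        CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_THREADS);

        for (int i = 0; i < NUMBER_OF_THREADS; i++)
            executor.execute(new Processor<String>(data, results,
                    countDownLatch, i + ""));

        // Block until every worker has counted down.
        countDownLatch.await();
        // Release the pool's non-daemon threads so the JVM can exit.
        executor.shutdown();
    }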

    private static BlockingQueue<String> prepareData() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        for (int i = 0; i < 1000; i++) {
            queue.add(i + "");
        }
        return queue;
    }

}

class Processor<T> implements Runnable {

    private BlockingQueue<T> dataSource;

    private CountDownLatch latch;

    private String name;

    private BlockingQueue<String> results;

    public Processor(BlockingQueue<T> dataSource,
            BlockingQueue<String> results, CountDownLatch countDownLatch,
            String processName) {
        this.dataSource = dataSource;
        this.results = results;
        this.latch = countDownLatch;
        this.name = processName;
    }

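    @Override
    public void run() {
        try {
            T t;
            // poll() returns null once the queue is empty, which ends the loop.
            while ((t = dataSource.poll()) != null) {
                String result = "Process " + name + " processing: "
                        + t.toString();
                System.out.println(result);
                results.put(result);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop working.
            Thread.currentThread().interrupt();
        } finally {
            // Always signal completion, even if processing failed.
            latch.countDown();
        }
    }
}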

After preparing the data, create some Processors to process it. Each Processor holds a reference to the thread-safe data source. It takes an object, processes it, and puts the result into another thread-safe collection that holds the results.

When the data source is empty, each Processor calls latch.countDown(). Once every Processor has done so, the main thread (or whichever thread is waiting for the results) knows that everything is done.


Take a look at the java.util.concurrent package and the ExecutorService.

Brian Goetz's book Java Concurrency in Practice is a must to understand this stuff.


Blocking queues are probably the best way to go for you. Search for the term and you'll find plenty of info; this one, for example, is a good tutorial: http://www.developer.com/java/ent/article.php/3645111/Java-5s-BlockingQueue.htm
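For illustration, here is a small producer/consumer sketch built on a BlockingQueue. It is not taken from the tutorial: the "poison pill" sentinel used to stop the consumers, the queue capacity of 10, and the String endpoints are all assumptions made for the example (Java 8+):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingQueueSketch {

    // Sentinel that tells a consumer to stop. It is compared by identity,
    // so a real endpoint with the same text is not mistaken for it.
    private static final String POISON = new String("STOP");

    static List<String> run(List<String> endpoints, int consumers)
            throws InterruptedException {
        // Bounded queue: the producer blocks when consumers fall behind.
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
        final List<String> handled =
                Collections.synchronizedList(new ArrayList<>());

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String endpoint = queue.take(); // blocks while empty
                        if (endpoint == POISON) {
                            return;
                        }
                        handled.add("done " + endpoint);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            threads.add(t);
        }

        for (String endpoint : endpoints) {
            queue.put(endpoint); // blocks while the queue is full
        }
        // One sentinel per consumer, queued after all the real work.
        for (int i = 0; i < consumers; i++) {
            queue.put(POISON);
        }
        for (Thread t : threads) {
            t.join();
        }
        return handled;
    }
}
```

Compared with poll() on a pre-filled queue, take() lets consumers wait for work that arrives later, which matters if the endpoint list is produced incrementally rather than known up front.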
