Why ThreadPoolExecutor has BlockingQueue as its argument?

https://www.devze.com 2023-04-07 22:22 (source: web)

I have tried creating and executing ThreadPoolExecutor with

int poolSize = 2;
int maxPoolSize = 3;
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);
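For context, the question's parameters presumably feed into a `ThreadPoolExecutor` constructor along these lines (the keep-alive time of 10 seconds is an assumption; the question does not state it):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSetup {
    public static void main(String[] args) {
        int poolSize = 2;
        int maxPoolSize = 3;
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

        // keepAliveTime is assumed; any value works for reproducing the issue.
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                poolSize, maxPoolSize, 10, TimeUnit.SECONDS, queue);

        System.out.println(threadPool.getCorePoolSize());    // 2
        System.out.println(threadPool.getMaximumPoolSize()); // 3
        threadPool.shutdown();
    }
}
```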

If I keep submitting tasks (a 7th, an 8th, and so on) with

  threadPool.execute(task);

after the queue has reached its maximum size, the executor starts throwing RejectedExecutionException, meaning I lose those tasks.

So what is the role of the BlockingQueue if it drops tasks? In other words, why does it not wait?

From the definition of BlockingQueue

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

Why can't we use a LinkedList (a normal queue implementation) instead of a blocking queue?


The problem occurs because your task queue is too small, as indicated by the documentation of the execute method:

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

So the first problem is that you're setting your queue size to a very small number:

int poolSize = 2;
int maxPoolSize = 3;
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

And then you state that if you "try [a] 7th, 8th... task", you get a RejectedExecutionException because you're past the capacity of the queue. There are two ways to resolve your problem (I would recommend doing both):

  1. Increase the size of the queue.
  2. Catch the exception and re-try adding the task.

You should have something along the lines of this:

public void executeTask(MyRunnableTask task) {
    boolean taskAdded = false;
    while (!taskAdded) {
        try {
            executor.execute(task);
            taskAdded = true;
        } catch (RejectedExecutionException ex) {
            // Queue is still full; retry. Consider a short sleep here
            // to avoid a tight busy-wait loop.
        }
    }
}

Now, to address your other questions...

Here then what is the role of BlockingQueue if it is missing the tasks?

The role of the BlockingQueue is to complete the Producer/Consumer pattern and if it's large enough, then you shouldn't see the issues you're encountering. As I mentioned above, you need to increase the queue size and catch the exception then retry executing the task.

Why can't we use a linked list?

A linked list is neither thread safe, nor is it blocking. The Producer/Consumer pattern tends to work best with a blocking queue.

Update

Please don't be offended by the following statements, I'm intentionally using more stringent language in order to put emphasis on the fact that your first assumption should never be that there is something wrong with the library you're using (unless you wrote the library yourself and you know that there is a specific problem in it)!

So let's put this concern to rest right now: neither the ThreadPoolExecutor nor the Java library are the problem here. It's entirely your (mis)use of the library that's causing the problem. Javmex has a great tutorial explaining the exact situation you're seeing.

There could be several reasons why you're filling up the queue faster than you're emptying it:

  1. The thread that's submitting tasks for execution is adding them too fast.
  2. The tasks are taking too long to execute.
  3. Your queue is too small.
  4. Any combination of the above 3.

There are a bunch of other reasons too, but I think the above would be the most common.

I would give you a simple solution with an unbounded queue, but it would NOT resolve your (mis)use of the library. So before we go blaming the Java library, let's see a concise example that demonstrates the exact problem you're encountering.

Update 2.0

Here are a couple of other questions addressing the specific problem:

  1. ThreadPoolExecutor Block When Queue Is Full?
  2. How to make ThreadPoolExecutor's submit() method block if it is saturated?


The blocking queue is mainly for the Consumers (threads in the pool). The threads can wait for new tasks to become available on the queue, they will be automatically woken up. A plain linked list would not serve that purpose.

On the producer side, the default behavior is to throw an exception when the queue is full. This can be easily customized by implementing your own RejectedExecutionHandler. In your handler you can get hold of the queue and call its put method, which will block until more space becomes available.
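A minimal sketch of such a handler (the class names here are hypothetical, not part of the JDK; note that handing tasks straight to the queue bypasses the pool's logic for growing beyond the core size):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSubmitDemo {
    // Blocks the submitting thread until queue space frees up,
    // instead of throwing RejectedExecutionException.
    static class BlockOnRejection implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                if (!executor.isShutdown()) {
                    executor.getQueue().put(r); // blocks until space is available
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while enqueueing task", e);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2), new BlockOnRejection());

        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            pool.execute(completed::incrementAndGet); // never throws; may block instead
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("completed " + completed.get());
    }
}
```

One caveat: if the pool is shut down concurrently with a blocked put, the task may sit in the queue without running, which is one more reason the answer below advises against this approach.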

But this is not a good thing to do. The reason is that if there is a problem in this executor (deadlock, slow processing), it would cause a ripple effect on the rest of the system. For example, if you are calling the execute method from a servlet and it blocks, then all of the container's threads will be held up and your application will come to a halt. That is probably why the default behavior is to throw an exception rather than wait, and also why the JDK ships no RejectedExecutionHandler implementation that blocks: to discourage people from using this approach.

There is a built-in option (CallerRunsPolicy) that executes the task in the calling thread, which can be another choice if you still want the processing to happen.
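A short sketch of CallerRunsPolicy with the question's pool settings; when both the pool and the queue are saturated, the submitting thread runs the task itself, which naturally throttles the producer:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 8; i++) {
            final int id = i;
            // If pool and queue are full, this task runs in the submitting thread
            // instead of being rejected.
            pool.execute(() ->
                System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

All 8 tasks run; depending on timing, some may print the main thread's name rather than a pool thread's.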

The general rule is - it is better to fail processing of one request, rather than bring the whole system down. You might want to read about the circuit-breaker pattern.


You're not using BlockingQueue in the way it's intended to be used.

A BlockingQueue is used to implement the producer consumer pattern. Producer thread(s) put items on the queue via the blocking put() method, while consumer thread(s) take items from the queue via the blocking take() method.

put() blocks - meaning if the queue is full, it waits until a consumer has taken an item from the queue before adding it, then returns.

take() blocks - meaning if the queue is empty, it waits until a producer has put an item on the queue before taking it and returning it.

This pattern completely disconnects the producers from consumers, except that they share the queue.

Try using the queue like that: Have an executor run some threads that act as producers and some that act as consumers.
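The put/take handshake described above can be sketched like this (a single producer and consumer sharing a small bounded queue; the -1 poison pill is just an illustrative convention):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumer {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(-1);    // poison pill: tells the consumer to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int item = queue.take(); // blocks while the queue is empty
                    if (item == -1) break;
                    System.out.println("consumed " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```

Neither thread knows about the other; the queue alone coordinates them, which is the point of the pattern.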


The problem occurs because your task queue is too small

IMHO this doesn't answer the OP's question (although Kiril gives more detail after that), because whether a queue is "too small" is entirely subjective. For example, the OP may be protecting an external resource that can't handle more than two concurrent requests (and besides, 2 was probably just for a quick test). Also, what size would not be too small? 1000? What if the application tries to execute 5000 tasks? The scenario remains, because the real question is: why is the producer (caller) thread not blocked, given that the ThreadPoolExecutor uses a blocking queue?

The role of the BlockingQueue is to complete the Producer/Consumer pattern and if it's large enough, then you shouldn't see the issues you're encountering. As I mentioned above, you need to increase the queue size and catch the exception then retry executing the task.

This is true if you know the maximum number of tasks that could be queued at runtime and you can allocate enough heap for them, but that isn't always the case. The benefit of using bounded queues is precisely that you can protect yourself against OutOfMemoryError.

The correct answer should be the one from @gkamal.

The blocking queue is mainly for the Consumers (threads in the pool). The threads can wait for new tasks to become available on the queue, they will be automatically woken up. A plain linked list would not serve that purpose.

My two cents:

The library's developers could have decided that the consumer side won't block either, in which case a BlockingQueue would no longer be needed (although you would still need a concurrent collection or manual synchronization). But then, when a worker tries to poll a task from an empty collection, you must either:

  1. Kill the thread (and handle new tasks by creating fresh threads), or
  2. Busy-wait, wasting CPU.

Since creating new threads is expensive (which is why we use thread pools in the first place) and busy-waiting wastes CPU, the best option is to block the consumer and notify it later when a task becomes available.

Actually, there is a special case where workers won't block (at least not indefinitely), when there are more threads than the configured corePoolSize.

 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.

We can see these two scenarios in ThreadPoolExecutor's getTask() (Java 8):

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // <---- wait at most keepAliveTime
                    workQueue.take(); // <---- if there are no tasks, it awaits on notEmpty.await();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }


With your settings, a task may be rejected once there are more than 5 concurrent tasks, because tasks flow in this order: core pool size -> queue -> max pool size.

  1. Initially, two threads are created.
  2. If more than 2 tasks arrive, subsequent tasks are put into the queue.
  3. If the queue is full (you set its capacity to 2), new threads are created, up to the max pool size (you set it to 3).
  4. If still more tasks arrive while all threads are busy and the queue is full, they are rejected (subject to your rejection-policy configuration).

More detail here: https://dzone.com/articles/scalable-java-thread-pool-executor
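The capacity math above (3 running + 2 queued = 5 accepted, then rejection) can be demonstrated deterministically by blocking every task on a latch so none finishes early:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CapacityDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2));

        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);

        for (int i = 1; i <= 7; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread so the pool stays saturated
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected.incrementAndGet();
            }
        }
        // Tasks 1-2 take the core threads, 3-4 fill the queue,
        // task 5 triggers the third (max) thread, tasks 6-7 are rejected.
        System.out.println("rejected " + rejected.get());
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```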


Your question is perfectly legitimate, and in this case you should ignore the "go read the docs" and other condescending comments you received.

Here then what is the role of BlockingQueue if it is missing the tasks? Means why it is not waiting?

As the 2nd voted answer (not the accepted one) tells you, the use of a BlockingQueue is consistent with the need of consumers to block for work to be available (items in the queue). The blocking nature of the queue is seemingly not used on the enqueue side, and you are right, this is unintuitive and AFAIK undocumented.

A legitimate use-case for the executor is to have a fixed number of threads (e.g. 1, 2, or as many as your machine has cores), yet never drop incoming work items and never accumulate them without bound in the queue. This use-case is solved with a fixed number of core threads and a bounded blocking queue that blocks when full; the system thereby puts backpressure on upstream systems. That is a perfectly reasonable design decision.
