ConcurrentLinkedQueue$Node remains in heap after remove()

Developer https://www.devze.com 2022-12-25 07:11 Source: Internet

I have a multithreaded app writing and reading a ConcurrentLinkedQueue, which is conceptually used to back entries in a list/table. I originally used a ConcurrentHashMap for this, which worked well. A new requirement called for tracking the order in which entries arrived, so that they could be removed oldest-first, depending on some conditions. ConcurrentLinkedQueue appeared to be a good choice, and functionally it works well.

A configurable number of entries is held in memory. When a new entry is offered and the limit has been reached, the queue is searched in oldest-first order for an entry that can be removed. Certain entries must not be removed by the system; they wait for client interaction.
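The eviction scheme described above might look roughly like the following sketch. The names BoundedEntryBuffer, Entry, and the pinned flag are hypothetical and stand in for whatever the real application uses; the check-then-act around size() is also not atomic, so this shows only the shape of the approach, not a production-ready bound.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

class BoundedEntryBuffer {
    /** Hypothetical entry type: pinned entries wait for client interaction and are never evicted. */
    static final class Entry {
        final int id;
        final boolean pinned;
        Entry(int id, boolean pinned) { this.id = id; this.pinned = pinned; }
    }

    private final ConcurrentLinkedQueue<Entry> queue = new ConcurrentLinkedQueue<>();
    private final int limit;

    BoundedEntryBuffer(int limit) { this.limit = limit; }

    /** Adds an entry; when the limit is reached, first evicts the oldest unpinned entry. */
    boolean offer(Entry e) {
        // size() walks the whole queue, and this check is not atomic with the add.
        if (queue.size() >= limit && !evictOldestRemovable()) {
            return false; // every held entry is pinned
        }
        return queue.offer(e);
    }

    /** Scans oldest-first and removes the first entry that is not pinned. */
    private boolean evictOldestRemovable() {
        for (Iterator<Entry> it = queue.iterator(); it.hasNext(); ) {
            if (!it.next().pinned) {
                it.remove(); // interior removal: this is the operation the question is about
                return true;
            }
        }
        return false;
    }

    int size() { return queue.size(); }

    Entry oldest() { return queue.peek(); }
}
```

With a pinned entry sitting at the head, every eviction is an interior removal through the iterator, which is the situation in which the stale nodes were observed.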

What appears to be happening is that I have an entry at the front of the queue that was added, say, 100K entries ago. The queue appears to hold the configured number of entries (size() == 100), but when profiling I found ~100K ConcurrentLinkedQueue$Node objects in memory. This appears to be by design: from a glance at the source for ConcurrentLinkedQueue, a remove merely clears the reference to the stored object but leaves the linked list in place for iteration.

Finally, my question: is there a "better" lazy way to handle a collection of this nature? I love the speed of the ConcurrentLinkedQueue; I just can't afford the unbounded leak that appears to be possible in this case. If not, it seems I'd have to create a second structure to track order, which may have the same issues plus a synchronization concern.


What is actually happening here is that the remove method leaves it to a later polling thread to null out the linked reference.

The ConcurrentLinkedQueue is a non-blocking, thread-safe Queue implementation. However, polling a node from the queue is a two-step process: first you null the value, then you null the reference. A CAS operation is a single atomic step, so one CAS cannot resolve a poll immediately.

What happens when you poll is that the first thread that succeeds gets the value of the node and nulls that value out; that thread will then try to null the reference. It is possible that another thread will come in at that point and try to poll from the queue. To ensure the queue keeps its non-blocking property (that is, the failure of one thread will not lead to the failure of another), the incoming thread checks whether the value is null; if it is, that thread nulls the reference itself and retries poll().
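The two-step poll with helping described above can be sketched with a deliberately simplified lock-free queue. This is not the JDK's ConcurrentLinkedQueue code; TwoStepPollQueue and its Node class are hypothetical names, and the sketch only illustrates "claim the value with one CAS, then advance the reference with another, and let any thread finish the second step".

```java
import java.util.concurrent.atomic.AtomicReference;

class TwoStepPollQueue<E> {
    static final class Node<E> {
        final AtomicReference<E> item;
        final AtomicReference<Node<E>> next = new AtomicReference<>(null);
        Node(E item) { this.item = new AtomicReference<>(item); }
    }

    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    TwoStepPollQueue() {
        Node<E> dummy = new Node<>(null); // placeholder node with no item
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    void offer(E e) {
        Node<E> node = new Node<>(e);
        while (true) {
            Node<E> t = tail.get();
            Node<E> next = t.next.get();
            if (next != null) {
                tail.compareAndSet(t, next);       // help a lagging tail forward
            } else if (t.next.compareAndSet(null, node)) {
                tail.compareAndSet(t, node);       // may fail; another thread will help
                return;
            }
        }
    }

    E poll() {
        while (true) {
            Node<E> h = head.get();
            E item = h.item.get();
            // Step 1: claim the value by CASing the item to null.
            if (item != null && h.item.compareAndSet(item, null)) {
                // Step 2: try to move head past the emptied node.
                Node<E> next = h.next.get();
                if (next != null) {
                    head.compareAndSet(h, next);
                }
                return item;
            }
            // The item was already null: finish step 2 on behalf of whoever claimed it.
            Node<E> next = h.next.get();
            if (next == null) {
                return null;                       // nothing left to poll
            }
            head.compareAndSet(h, next);
        }
    }
}
```

If the thread that won step 1 stalls before step 2, the next caller of poll() sees the null item and advances head itself, so no thread ever waits on another.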

So what you see happening here is that the removing thread is simply preparing any new polling thread to null the reference. Achieving a fully non-blocking remove would, I think, be nearly impossible, because it would require three atomic operations: nulling the value, nulling the reference to that node, and finally setting the new reference from that node's parent to its successor.

To answer your last question: unfortunately, there is no better way to implement remove and keep the queue non-blocking, at least at this point. Once processors start coming out with 2- and 3-way CAS, that will be possible.


The queue's main semantics are add/poll. If you use poll() on the ConcurrentLinkedQueue, it will be cleaned up as it should. Based on your description, poll() should give you removal of the oldest entry. Why not use it instead of remove()?
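A minimal sketch of the poll()-based approach, assuming every entry is allowed to be evicted (entries that must wait for client interaction would need separate handling, which is not shown). PollEviction and addBounded are hypothetical names.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

class PollEviction {
    /**
     * Adds a value, then polls from the head until at most 'limit' elements remain.
     * Returns how many elements were evicted.
     */
    static <E> int addBounded(ConcurrentLinkedQueue<E> queue, E value, int limit) {
        queue.add(value);
        int evicted = 0;
        // size() is O(n) on ConcurrentLinkedQueue; with several writers the bound is approximate.
        while (queue.size() > limit && queue.poll() != null) {
            evicted++;
        }
        return evicted;
    }
}
```

Because poll() always removes from the head, the nodes it empties are the ones the queue moves past, rather than interior nodes left in the chain.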


Looking at the source code for 1.6.0_29, it seems that CLQ's iterator was modified to try removing nodes with null items. Instead of:

p = p.getNext();

The code is now:

Node<E> next = succ(p);
if (pred != null && next != null)
    pred.casNext(p, next);
p = next; 

This was added as part of the fix for bug: http://bugs.sun.com/view_bug.do?bug_id=6785442
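The unlinking step in the newer iterator code can be illustrated on a simplified chain of nodes. This is a sketch under assumptions, not the JDK source: UnlinkOnTraverse, its Node class, and the sweep and length helpers are hypothetical, and only the CAS from the predecessor to the successor mirrors the quoted snippet.

```java
import java.util.concurrent.atomic.AtomicReference;

class UnlinkOnTraverse {
    static final class Node {
        volatile Integer item; // null means "logically removed"
        final AtomicReference<Node> next = new AtomicReference<>(null);
        Node(Integer item) { this.item = item; }
    }

    /** Builds a chain holding 0..n-1 and returns the nodes in order. */
    static Node[] chain(int n) {
        Node[] nodes = new Node[n];
        for (int i = 0; i < n; i++) {
            nodes[i] = new Node(i);
            if (i > 0) {
                nodes[i - 1].next.set(nodes[i]);
            }
        }
        return nodes;
    }

    /** Walks the chain and unlinks interior nodes whose item is null. */
    static void sweep(Node first) {
        Node pred = null; // last node seen that still holds an item
        for (Node p = first; p != null; ) {
            Node next = p.next.get();
            if (p.item != null) {
                pred = p;
            } else if (pred != null && next != null) {
                pred.next.compareAndSet(p, next); // same shape as pred.casNext(p, next)
            }
            p = next;
        }
    }

    /** Counts the nodes still reachable from 'first'. */
    static int length(Node first) {
        int count = 0;
        for (Node p = first; p != null; p = p.next.get()) {
            count++;
        }
        return count;
    }
}
```

Without the compareAndSet, a traversal would step over the emptied nodes but leave them linked, which matches the growth in Node objects reported in the question.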

Indeed when I try the following I get an OOME with the old version but not with the new one:

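import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Wrapper class and main added so the snippet compiles on its own; the class name is arbitrary.
public class ClqIteratorRemoveTest
{
    public static void main(String[] args)
    {
        Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        for (int i=0; i<10000; i++)
        {
            for (int j=0; j<100000; j++)
            {
                queue.add(j);
            }
            // Remove every element except the first, so the original head is never removed.
            boolean start = true;
            for (Iterator<Integer> iter = queue.iterator(); iter.hasNext(); )
            {
                iter.next();
                if (!start)
                    iter.remove();
                start = false;
            }
            System.out.println(i);
        }
    }
}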