Bounded, auto-discarding, non-blocking, concurrent collection

I'm looking for a collection that:

  • is a Deque/List - i.e. supports inserting elements at "the top" (newest items go to the top) - deque.addFirst(..) / list.add(0, ..). It could be a Queue, but the iteration order should be reverse - i.e. the most recently added items should come first.
  • is bounded - i.e. has a limit of 20 items
  • auto-discards the oldest items (those "at the bottom", added first) when the capacity is reached
  • non-blocking - if the deque is empty, retrievals should not block. It should also not block / return false / return null / throw an exception if the deque is full.
  • concurrent - multiple threads should be able to operate on it

I can take LinkedBlockingDeque and wrap it into my custom collection that, on add operations, checks the size and discards the last item(s). Is there a better option?


I made this simple implementation:

import java.util.concurrent.LinkedBlockingDeque;

public class AutoDiscardingDeque<E> extends LinkedBlockingDeque<E> {

    public AutoDiscardingDeque() {
        super();
    }

    public AutoDiscardingDeque(int capacity) {
        super(capacity);
    }

    @Override
    public synchronized boolean offerFirst(E e) {
        if (remainingCapacity() == 0) {
            removeLast();
        }
        super.offerFirst(e);
        return true;
    }
}

For my needs this suffices, but it should be well documented that methods other than addFirst / offerFirst still follow the semantics of a blocking deque.
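For illustration, a quick single-threaded sketch of how the wrapper above behaves (the numbers simply follow from offerFirst always succeeding and evicting from the tail):

AutoDiscardingDeque<Integer> deque = new AutoDiscardingDeque<>(20);
for (int i = 0; i < 100; i++) {
    deque.offerFirst(i);   // once full, the oldest element is dropped from the tail
}
// deque now holds 99, 98, ..., 80 - newest first, size stays at 20
Integer newest = deque.peekFirst();  // 99
Integer oldest = deque.peekLast();   // 80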


I believe what you're looking for is a bounded stack. There isn't a core library class that does this, so I think the best way is to take a non-synchronized stack (LinkedList) and wrap it in a synchronized collection that does the auto-discard and returns null on an empty pop. Something like this:

import java.util.Iterator;
import java.util.LinkedList;

public class BoundedStack<T> implements Iterable<T> {
    private final LinkedList<T> ll = new LinkedList<T>();
    private final int bound;

    public BoundedStack(int bound) {
        this.bound = bound;
    }

    public synchronized void push(T item) {
        ll.push(item);
        if (ll.size() > bound) {
            ll.removeLast();                
        }
    }

    public synchronized T pop() {
        return ll.poll();
    }

    public synchronized Iterator<T> iterator() {
        return ll.iterator();
    }
}

...adding methods like isEmpty as required, or more if you want it to implement e.g. List.
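For illustration, a quick single-threaded usage sketch of the class above, just to show the eviction and newest-first iteration order:

BoundedStack<String> stack = new BoundedStack<String>(3);
stack.push("a");
stack.push("b");
stack.push("c");
stack.push("d");               // "a" is discarded from the bottom

for (String s : stack) {
    System.out.println(s);     // prints d, c, b - newest first
}

String top = stack.pop();      // "d"
String none = new BoundedStack<String>(3).pop();  // null, never blocks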


The simplest and classic solution is a bounded ring buffer that overwrites the oldest elements.

The implementation is rather easy: one AtomicInteger/AtomicLong for the index plus an AtomicReferenceArray, and you have a lock-free, general-purpose stack with only two methods, offer/poll, and no size(). Most concurrent/lock-free structures have a hard time with size(). A non-overwriting stack can have O(1) size, but at the cost of an allocation on each put.

Something along the lines of:

package bestsss.util;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class ConcurrentArrayStack<E> extends AtomicReferenceArray<E> {
    //extending AtomicReferenceArray directly avoids an extra indirection;
    //feel free to use composition instead if you prefer

    final AtomicLong index = new AtomicLong(-1);

    public ConcurrentArrayStack(int length) {
        super(length);
    }

    /**
     * @param e the element to offer into the stack
     * @return the previously evicted element
     */
    public E offer(E e){
        for (;;){
            long i = index.get();
            //read the current occupant (the expected value for the CAS) before claiming the slot
            int idx = idx(i+1);
            E result = get(idx);

            if (!index.compareAndSet(i, i+1))//claim index spot
                continue;

            if (compareAndSet(idx, result, e)){
                return result;
            }
        }
    }

    private int idx(long idx){//can/should use golden ratio to spread the index around and reduce false sharing
        return (int)(idx%length());
    }

    public E poll(){
        for (;;){
            long i = index.get();
            if (i==-1)
                return null;

            int idx = idx(i);
            E result = get(idx);//get before the claim

            if (!index.compareAndSet(i, i-1))//claim index spot
                continue;

            if (compareAndSet(idx, result, null)){
                return result;
            }
        }
    }
}
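For illustration, a single-threaded sketch of the ring behaviour, based on the code above (offer returns the element it evicts, or null if the slot was free):

ConcurrentArrayStack<String> stack = new ConcurrentArrayStack<>(2);
stack.offer("a");                   // slot was empty, returns null
stack.offer("b");                   // returns null
String evicted = stack.offer("c");  // ring wraps around, "a" is overwritten and returned
String top = stack.poll();          // "c" - the most recently offered element
String next = stack.poll();         // "b"
String none = stack.poll();         // null - the stack is now empty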

Last note: the mod operation is expensive, so a power-of-2 capacity is preferable; the index can then be computed with & (length() - 1), which also guards against long overflow of the index.
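As a sketch, the idx method above could then be replaced like this, assuming the constructor enforces a power-of-2 length (that check is not shown in the original code):

    private int idx(long idx) {
        // assumes length() is a power of 2; masking avoids the division
        // and stays non-negative even if the index overflows
        return (int) (idx & (length() - 1));
    }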


Here is an implementation that handles concurrency and never returns null.

import com.google.common.base.Optional;

import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.ReentrantLock;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

public class BoundedStack<T> {
   private final Deque<T> list = new ConcurrentLinkedDeque<>();
   private final int maxEntries;
   private final ReentrantLock lock = new ReentrantLock();

   public BoundedStack(final int maxEntries) {
      checkArgument(maxEntries > 0, "maxEntries must be greater than zero");
      this.maxEntries = maxEntries;
   }

   public void push(final T item) {
      checkNotNull(item, "item must not be null");

      lock.lock();
      try {
         list.push(item);
         if (list.size() > maxEntries) { 
            list.removeLast();
         }
      } finally {
         lock.unlock();
      }
   } 

   public Optional<T> pop() {
      lock.lock();
      try {
         return Optional.fromNullable(list.poll());
      } finally {
         lock.unlock();
      }
   }

   public Optional<T> peek() {
      return Optional.fromNullable(list.peekFirst());
   }

   public boolean empty() {
      return list.isEmpty();
   }
}
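For illustration, a brief single-threaded usage sketch of the class above (pop and peek return Guava's Optional rather than null):

BoundedStack<String> stack = new BoundedStack<>(2);
stack.push("a");
stack.push("b");
stack.push("c");                      // "a" is evicted from the bottom

Optional<String> top = stack.pop();   // contains "c"
Optional<String> next = stack.peek(); // contains "b"
Optional<String> last = stack.pop();  // contains "b"
boolean empty = stack.empty();        // true once everything is popped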


For the solution @remery gave, could you not run into a race condition where, after the if (list.size() > maxEntries) check, you erroneously remove the last element because another thread ran pop() in that window and the list is now within capacity, given that there is no thread synchronization across pop() and push(final T item)?

For the solution @Bozho gave, I would think a similar scenario is possible: the synchronization happens on the AutoDiscardingDeque itself and not with the ReentrantLock inside LinkedBlockingDeque, so after running remainingCapacity() another thread could remove some objects from the deque, and the removeLast() would then remove an extra object?
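As a sketch of one way to close that window (not from either answer), the AutoDiscardingDeque wrapper could rely on the deque's own atomic operations instead of a separate remainingCapacity() check:

    @Override
    public boolean offerFirst(E e) {
        // LinkedBlockingDeque.offerFirst returns false when full, and pollLast
        // atomically removes the oldest element, so no separate size check is needed
        while (!super.offerFirst(e)) {
            super.pollLast();
        }
        return true;
    }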
