开发者

Queue with notifications on isEmpty() changes

开发者 https://www.devze.com 2023-02-07 10:50 出处:网络
I have an BlockingQueue<Runnable>(taken from ScheduledT开发者_StackOverflow社区hreadPoolExecutor) in producer-consumer environment. There is one thread adding tasks to the queue, and a thread po

I have an BlockingQueue<Runnable>(taken from ScheduledT开发者_StackOverflow社区hreadPoolExecutor) in producer-consumer environment. There is one thread adding tasks to the queue, and a thread pool executing them.

I need notifications on two events:

  1. First item added to empty queue
  2. Last item removed from queue

Notification = writing a message to database.

Is there any sensible way to implement that?


A simple and naïve approach would be to decorate your BlockingQueue with an implementation that simply checks the underlying queue and then posts a task to do the notification.

NotifyingQueue<T> extends ForwardingBlockingQueue<T> implements BlockingQueue<T> {
  private final Notifier notifier; // injected not null

  …

  @Override public void put(T element) {
    if (getDelegate().isEmpty()) {
      notifier.notEmptyAnymore();
    }
    super.put(element);
  }

  @Override public T poll() {
    final T result = super.poll();
    if ((result != null) && getDelegate().isEmpty())
      notifier.nowEmpty();
  }
  … etc
}

This approach though has a couple of problems. While the empty -> notEmpty is pretty straightforward – particularly for a single producer case, it would be easy for two consumers to run concurrently and both see the queue go from non-empty -> empty.

If though, all you want is to be notified that the queue became empty at some time, then this will be enough as long as your notifier is your state machine, tracking emptiness and non-emptiness and notifying when it changes from one to the other:

AtomicStateNotifier implements Notifier {
  private final AtomicBoolean empty = new AtomicBoolean(true); // assume it starts empty
  private final Notifier delegate; // injected not null

  public void notEmptyAnymore() {
    if (empty.get() && empty.compareAndSet(true, false))
      delegate.notEmptyAnymore();
  }

  public void nowEmpty() {
    if (!empty.get() && empty.compareAndSet(false, true))
      delegate.nowEmpty();
  }
}

This is now a thread-safe guard around an actual Notifier implementation that perhaps posts tasks to an Executor to asynchronously write the events to the database.


The design is most likely flawed but you can do it relatively simple: You have a single thread adding, so you can check before adding. i.e. pool.getQueue().isEmpty() - w/ one producer, this is safe.

Last item removed cannot be guaranteed but you can override beforeExecute and check the queue again. Possibly w/ a small timeout after isEmpty() returns true. Probably the code below will be better off executed in afterExecute instead.

protected void beforeExecute(Thread t, Runnable r) { 
  if (getQueue().isEmpty()){
    try{
      Runnable r = getQueue().poll(200, TimeUnit.MILLISECONDS);
      if (r!=null){ 
        execute(r);
      }  else{
        //last message - or on after execute by Setting a threadLocal and check it there
        //alternatively you may need to do so ONLY in after execute, depending on your needs
     }
    }catch(InterruptedException _ie){
      Thread.currentThread().interrupt();
    }
  }
}

sometime like that

I can explain why doing notifications w/ the queue itself won't work well: imagine you add a task to be executed by the pool, the task is scheduled immediately, the queue is empty again and you will need notification.

0

精彩评论

暂无评论...
验证码 换一张
取 消