What Java concurrency classes might help me?

Developer https://www.devze.com 2022-12-23 18:15 Source: Internet
I'm trying to write some code that has the following behavior:

  • There are many concurrent and random calls to X by various threads (Thread X)
  • At some point in the future, one call is made to Y by one thread (Thread Y)
  • Until Y is called, X should be allowed to pass through unchallenged, with concurrent calls to X perfectly valid
  • Once Y is called, any existing calls to X should be allowed to finish up, but new calls to X should be rejected in some way (RuntimeException, etc)
  • Y should not proceed to execute until all existing calls to X have been completed
  • Update: When Y is called, it should send a signal to the objects that are running in Thread X that tells them to abort() in some graceful way (ideally finishing up very quickly so that Y can proceed)

I looked at using Semaphore, CountDownLatch, and even writing up my own AbstractQueuedSynchronizer, but none fit the above requirements. For example, CountDownLatch assumes you know how many calls to X are going to be made, which we don't know.

It seems like I almost want a mixture of a CountUpDownLatch and maybe some sort of simple AtomicBoolean, but that's what I'm currently using and I'm finding myself getting deadlocked from time to time. Then again, I'm using a sketchy looking implementation of a CountUpDownLatch that is part of HSQLDB that seriously doesn't look thread safe.

Any ideas on how I should approach this problem?


Looks like a task for ReentrantReadWriteLock. Something like this:

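import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class A {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    public void x() {
        // tryLock() fails only while another thread holds the write lock, i.e. while y() is running
        if (!lock.readLock().tryLock()) throw new RuntimeException();
        try {
            // ...
        } finally {
            lock.readLock().unlock();
        }
    }

    public void y() {
        // Blocks until every in-flight x() has released its read lock
        lock.writeLock().lock();
        try {
            // ...
        } finally {
            lock.writeLock().unlock();
        }
    }
}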
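
One gap in the snippet above: tryLock() on the read lock only fails while y() actually holds the write lock, so x() calls that arrive while y() is still waiting for the lock, or after y() has returned, are accepted. Here is a minimal sketch of one way to close that gap, assuming a volatile flag is acceptable. The names ClosableGate, closed and isClosed() are hypothetical and not part of the answer.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical class; a sketch under the assumptions stated above.
class ClosableGate {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Set by y() before it waits for the write lock and never reset,
    // so the gate stays closed once y() has been called.
    private volatile boolean closed = false;

    /** Long-running X work can poll this as its "abort" signal. */
    boolean isClosed() {
        return closed;
    }

    /** Runs work as an X call, or throws if y() has been called. */
    void x(Runnable work) {
        // Fails only while y() currently holds the write lock.
        if (!lock.readLock().tryLock()) {
            throw new IllegalStateException("y() is running");
        }
        try {
            // Checked while holding the read lock: if the flag is still
            // false here, y() cannot take the write lock until we unlock.
            if (closed) {
                throw new IllegalStateException("y() has been called");
            }
            work.run();
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Closes the gate, waits for in-flight X calls, then runs work. */
    void y(Runnable work) {
        closed = true;
        lock.writeLock().lock(); // blocks until all read locks are released
        try {
            work.run();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

The flag is read only after the read lock is held, which is what keeps a late x() from slipping in after y() has finished. Work passed to x() can call isClosed() periodically and return early, which would cover the "abort gracefully" update in the question.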


Java 7 is coming out with a new concurrency construct, java.util.concurrent.Phaser. The phaser is a glorified cyclic barrier that allows awaiting on specific phases (or iterations) of a concurrent execution. You can download the latest binary jar at JSR 166's Interest Site.

My thought here is that you have a volatile boolean yHasEntered flag that is initialized to false. So in your X execution you could do:

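        if (yHasEntered)
            throw new IllegalStateException();
        phaser.register();
        try {
            // do work here
        } finally {
            // deregister as well as arrive, so finished callers stop counting as parties
            phaser.arriveAndDeregister();
        }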

The phaser itself will keep a running count of all registered parties so that when Y thread has entered, it can set the flag accordingly, register itself and await advance.

        yHasEntered = true;
        phaser.register();
        // arriveAndAwaitAdvance() takes no argument; it blocks until all registered parties have arrived
        phaser.arriveAndAwaitAdvance();
        // do y work here
        yHasEntered = false;

At this point the Y thread registers itself, arrives, and waits for all other executing threads to reach their respective arrive call.
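
Putting the two fragments together, here is a minimal sketch of how this might look as one class. It rests on a few assumptions that are not in the answer: the class name PhaserGate is hypothetical, Y is registered up front via new Phaser(1) so the phaser never drops to zero parties (which would terminate it), the flag is re-checked after register() to close the race between the check and the registration, and the flag is never reset, so X stays rejected after Y.

```java
import java.util.concurrent.Phaser;

// Hypothetical class; a sketch under the assumptions stated above.
class PhaserGate {
    // One permanent party stands in for Y.
    private final Phaser phaser = new Phaser(1);
    private volatile boolean yHasEntered = false;

    /** Runs work as an X call, or throws if y() has been called. */
    void x(Runnable work) {
        if (yHasEntered) {
            throw new IllegalStateException("y() has been called");
        }
        phaser.register();
        try {
            // Re-check: y() may have set the flag between the first
            // check and register(). If it is still false here, y() has
            // not arrived yet and will have to wait for this party.
            if (yHasEntered) {
                throw new IllegalStateException("y() has been called");
            }
            work.run();
        } finally {
            // Arrive and drop out, so this call no longer holds Y back.
            phaser.arriveAndDeregister();
        }
    }

    /** Intended to be called once: waits for in-flight X calls, then runs work. */
    void y(Runnable work) {
        yHasEntered = true;
        // Returns once every registered X party has arrived.
        phaser.arriveAndAwaitAdvance();
        work.run();
    }
}
```

X callers that want the graceful abort from the question could read yHasEntered inside their work loop, for example through an accessor added to this class.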


Consider using an implementation of ExecutorService. Add tasks using the submit() methods, then tell the service not to accept more tasks with the shutdown() method -- tasks that have already been accepted will complete normally.


Ok, how about using an ActiveObject?

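import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class X {
    private static final ExecutorService service = Executors.newCachedThreadPool();

    public void x() {
        // Using an anonymous class for brevity. A normal Runnable is fine; it may be a static nested class.
        // Once the service is shut down, submit() throws RejectedExecutionException.
        service.submit(new Runnable() {
            @Override
            public void run() {
                // do stuff
            }
        });
    }

    // Derived from the Java 6 ExecutorService API documentation
    public void shutdownAndAwait() {
        service.shutdownNow(); // reject new tasks and interrupt running ones
        try {
            if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
                // The original threw an undefined TerminationException; IllegalStateException stands in for it
                throw new IllegalStateException("Service did not terminate");
            }
        } catch (InterruptedException ie) {
            // (Re-)cancel if the current thread was also interrupted
            service.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    } // shutdownAndAwait

} // X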


class Y {

    private final X x;

    // Inject the X instance whose executor should be shut down
    Y(X x) {
        this.x = x;
    }

    public void y() {
        x.shutdownAndAwait();
    }
}

The problem is that execution of x() is now asynchronous, so the threads that originally called x() cannot tell when their work has completed. And those calling threads are not interrupted when y() is called...
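
One way to soften the first half of that problem is to have the caller block on the Future that submit() returns. A minimal sketch follows, assuming a cached thread pool as in the code above. The class name BlockingX and the boolean return value are hypothetical and not part of the answer.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of the X class above; a sketch only.
class BlockingX {
    private final ExecutorService service = Executors.newCachedThreadPool();

    /**
     * Submits work and blocks the caller until it has finished.
     * After shutdown, submit() throws RejectedExecutionException.
     */
    public void x(Runnable work) {
        Future<?> done = service.submit(work);
        try {
            done.get(); // wait for the pool thread to finish the task
        } catch (ExecutionException e) {
            throw new IllegalStateException("work failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    /**
     * Rejects new tasks, interrupts running ones, and waits up to
     * 60 seconds. Returns true if all tasks terminated in time.
     */
    public boolean shutdownAndAwait() {
        service.shutdownNow();
        try {
            return service.awaitTermination(60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }
}
```

The interrupt from shutdownNow() reaches the pool thread running the task, not the thread blocked in x(), so the submitted work still has to check Thread.currentThread().isInterrupted() or use interruptible calls to finish quickly. With a queue-backed pool, shutdownNow() would also return queued tasks without running them, and callers waiting on those tasks would need separate handling.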
