开发者

Why can't I construct a ThreadPoolExecutor backed by a DelayQueue?

开发者 https://www.devze.com 2023-02-15 11:03 出处:网络
I\'m trying to create a ThreadPoolExecutor: // Thingy implements Delayed and Runnable ExecutorService executor = new ThreadPoolExe开发者_StackOverflow中文版cutor(1, 1, 0l, TimeUnit.SECONDS, new Delay

I'm trying to create a ThreadPoolExecutor:

// Thingy implements Delayed and Runnable
ExecutorService executor = new ThreadPoolExe开发者_StackOverflow中文版cutor(1, 1, 0l, TimeUnit.SECONDS, new DelayQueue<Thingy>());

The compiler is saying "cannot find symbol":

symbol  : constructor ThreadPoolExecutor(int,int,long,java.util.concurrent.TimeUnit,java.util.concurrent.DelayQueue<Thingy>)

but I don't understand — DelayQueue implements BlockingQueue, so shouldn't I be able to use this constructor?


This is a generics problem. You can't use DelayQueue<Thingy>, it has to be DelayQueue<Runnable> as the ThreadPoolExecutor constructor is not declared to accept queues of sub-types of Runnable.


RunnableScheduledFuture is Runnable and Delayed, but it cannot be cast to BlockingQueue<Runnable>. See why at The Java Tutorials

Have a look to ScheduledThreadPoolExecutor, it can schedule commands to run after a given delay, or to execute periodically.


While this is an old question I wanted to post my answer as I was searching for just this solution recently. It is possible to use a DelayQueue behind a ThreadPoolExecutor, it just takes a bit of code wrapping. The trick is to get the DelayQueue to present itself as a BlockingQueue.

I started by defining an interface DR that extends both Runnable and Delayed. Note that the static methods here create instances of the DR (Instance class not shown).

public interface DR extends Delayed, Runnable {    
    public static DR make( Runnable r )
    {
        if (r instanceof DR)
        {
            return (DR)r;
        }
        Impl impl = new Impl(r);
        if (r instanceof Delayed)
        {
            impl.expires = ((Delayed) r).getDelay( TimeUnit.MILLISECONDS );
        }
        return impl;
    }

    public static DR make( Runnable r, long expires )
    {
        if (r instanceof DR)
        {
            if (expires == ((DR)r).getDelay( TimeUnit.MILLISECONDS ))
            {
                return (DR)r;
            }
        }
        return new Impl(r, expires);
    }
}

Implementations will should override: public int compareTo(Delayed o), public boolean equals( Object o ), and public int hashCode().

Create a class that extends DelayQueue. This class adds a single method that presents the DelayQuue as a BlockingQueue. The class returned simply wraps the DelayQueue and uses the make methods of the DR interface to convert from Runnable to DR where necessary.

public class DelayedBlockingQueue extends DelayQueue<DR>  {

    public BlockingQueue<Runnable> asRunnableQueue() {
        return new BlockingQueue<Runnable>(){
            DelayedBlockingQueue dbq = DelayedBlockingQueue.this;
            public boolean add(Runnable e) {
                return dbq.add( DR.make( e ));
            }

            private List<DR> makeList( Collection<? extends Runnable> coll)
            {
               return coll.stream().map( r -> DR.make( r ) ).collect( Collectors.toList() ) ;
            }

            public boolean addAll(Collection<? extends Runnable> arg0) {
                return dbq.addAll(makeList( arg0 ) );
            }

            public void clear() {
                dbq.clear();
            }

            public boolean contains(Object o) {
                if (o instanceof Runnable) { 
                    return dbq.contains( DR.make( (Runnable)o ) );
                } 
                return false;
            }

            public boolean containsAll(Collection<?> arg0) {
                List<DR> lst = new ArrayList<DR>();
                for (Object o : arg0)
                {
                    if (o instanceof Runnable)
                    {
                        lst.add(  DR.make(  (Runnable)o ) );
                    }
                    else {
                        return false;
                    }
                }

                return dbq.containsAll( lst );
            }

            public int drainTo(Collection<? super Runnable> c, int maxElements) {
                return dbq.drainTo( c, maxElements );
            }

            public int drainTo(Collection<? super Runnable> c) {
                return dbq.drainTo( c );
            }

            public Runnable element() {
                return dbq.element();
            }                

            public void forEach(Consumer<? super Runnable> arg0) {
                dbq.forEach( arg0 );
            }

            public boolean isEmpty() {
                return dbq.isEmpty();
            }

            public Iterator<Runnable> iterator() {
                return WrappedIterator.create( dbq.iterator() ).mapWith( dr -> (Runnable)dr );
            }

            public boolean offer(Runnable e, long timeout, TimeUnit unit) throws InterruptedException {
                return dbq.offer( DR.make( e ), timeout, unit );
            }

            public boolean offer(Runnable e) {
                return dbq.offer( DR.make( e ) );
            }

            public Runnable peek() {
                return dbq.peek();
            }

            public Runnable poll() {
                return dbq.poll();
            }

            public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
                return dbq.poll( timeout, unit );
            }

            public void put(Runnable e) throws InterruptedException {
                dbq.put( DR.make(e) );
            }

            public int remainingCapacity() {
                return dbq.remainingCapacity();
            }

            public Runnable remove() {
                return dbq.remove();
            }

            public boolean remove(Object o) {
                if (o instanceof Runnable)
                {
                    return dbq.remove( DR.make(  (Runnable)o) );
                }
                return false;
            }

            public boolean removeAll(Collection<?> arg0) {
                List<DR> lst = new ArrayList<DR>();
                for (Object o : arg0)
                {
                    if (o instanceof Runnable)
                    {
                        lst.add(  DR.make(  (Runnable)o ) );
                    }

                }               
                return dbq.removeAll( lst );
            }

            public boolean retainAll(Collection<?> arg0) {
                return dbq.retainAll( arg0 );
            }

            public int size() {
                return dbq.size();
            }

            public Runnable take() throws InterruptedException {
                return dbq.take();
            }

            public Object[] toArray() {
                return dbq.toArray();
            }

            public <T> T[] toArray(T[] arg0) {
                return dbq.toArray( arg0 );
            }                
    };                       
}

To use the solution create the DelayedBlockingQueue and use the asRunnableQueue() method to pass the runnable queue to the ThreadPoolExecutor constructor.

DelayedBlockingQueue queue = new DelayedBlockingQueue();
ThreadPoolExecutor execService = new ThreadPoolExecutor( 1, 5, 30, TimeUnit.SECONDS, queue.asRunnableQueue() );
0

精彩评论

暂无评论...
验证码 换一张
取 消