开发者

SpringBoot线程池和Java线程池的使用和实现原理解析

开发者 https://www.devze.com 2023-04-12 10:35 出处:网络 作者: Twilight's
目录SpringBoot线程池和Java线程池的用法和实现原理使用默认的线程池方式一:通过@Async注解调用方式二:直接注入 ThreadPoolTaskExecutor线程池默认配置信息SpringBoot 线程池的实现原理覆盖默认的线程池管理多
目录
  • SpringBoot线程池和Java线程池的用法和实现原理
  • 使用默认的线程池
    • 方式一:通过@Async注解调用
    • 方式二:直接注入 ThreadPoolTaskExecutor
    • 线程池默认配置信息
  • SpringBoot 线程池的实现原理
    • 覆盖默认的线程池
    • 管理多个线程池
  • JAVA常用的四种线程池
    • newCachedThreadPool
    • newFixedThreadPool
    • newScheduledThreadPool
    • newSingleThreadExecutor
  • Java 线程池中的四种拒绝策略
    • CallerRunsPolicy
    • AbortPolicy
  • Java 线程复用的原理

    SpringBoot线程池和Java线程池的用法和实现原理

    使用默认的线程池

    方式一:通过@Async注解调用

    public class AsyncTest {
        @Async
        public void async(String name) throws InterruptedException {
            System.out.println("async" + name + " " + Thread.currentThread().getName());
            Thre开发者_JAVAad.sleep(1000);
        }
    }

    启动类上需要添加@EnableAsync注解,否则不会生效。

    @SpringBootApplication
    //@EnableAsync
    public class Test1Application {
       public static void main(String[] args) throws InterruptedException {
          ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
          AsyncTest bean = run.getBean(AsyncTest.class);
          for(int index = 0; index <= 10; ++index){
             bean.async(String.valueOf(index));
          }
       }
    }

    方式二:直接注入 ThreadPoolTaskExecutor

    此时可不加 @EnableAsync注解

    @SpringBootTest
    class Test1ApplicationTests {
    
       @Resource
       ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
       @Test
       void contextLoads() {
          Runnable runnable = () -> {
             System.out.println(Thread.currentThread().getName());
          };
    
          for(int index = 0; index <= 10; ++index){
             threadPoolTaskExecutor.submit(runnable);
          }
       }
    
    }

    线程池默认配置信息

    SpringBoot线程池的常见配置:

    spring:
      task:
        execution:
          pool:
            core-size: 8
            max-size: 16                          # 默认是 Integer.MAX_VALUE
            keep-alive: 60s                       # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止
            allow-core-thread-timeout: true       # 是否允许核心线程超时,默认true
            queue-capacity: 100                   # 线程队列的大小,默认Integer.MAX_VALUE
          shutdown:
            await-termination: false              # 线程关闭等待
          thread-name-prefix: task-               # 线程名称的前缀

    SpringBoot 线程池的实现原理

    TaskExecutionAutoConfiguration 类中定义了 ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的 ThreadPoolExecutor类。initializeExecutor()方法在其父类中被调用,但是在父类中 RejectedExecutionHandler 被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); ,并通过initialize()方法将AbortPolicy传入initializeExecutor()中。

    注意在TaskExecutionAutoConfiguration 类中,ThreadPoolTaskExecutor类的bean的名称为: applicationTaskExecutor 和 taskExecutor

    // TaskExecutionAutoConfiguration#applicationTaskExecutor()
    @Lazy
    @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
          AsyncAnnotationBeanPostProcessor.DEFAUL
              T_TASK_EXECUTOR_BEAN_NAME })
    @ConditionalOnMissingBean(Executor.class)
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
       return builder.build();
    }
    // ThreadPoolTaskExecutor#initializeExecutor()
    @Override
    protected ExecutorService initializeExecutor(
          ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    
       blockingQueue<Runnable> queue = createQueue(this.queueCapacity);
    
       ThreadPoolExecutor executor;
       if (this.taskDecorator != null) {
          executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler) {
             @Override
             public void execute(Runnable command) {
                Runnable decorated = taskDecorator.decorate(command);
                if (decorated != command) {
                   decoratedTaskMap.put(decorated, command);
                }
                super.execute(decorated);
             }
          };
       }
       else {
          executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);
    
       }
    
       if (this.allowCoreThreadTimeOut) {
          executor.allowCoreThreadTimeOut(true);
       }
    
       this.threadPoolExecutor = executor;
       return executor;
    }
    
    // ExecutorConfigurationSupport#initialize()
    public void initialize() {
       if (logger.isInfoEnabled()) {
          logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
       }
       if (!this.threadNamePrefixSet && this.beanName != null) {
          setThreadNamePrefix(this.beanName + "-");
       }
       this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    }

    覆盖默认的线程池

    覆盖默认的 taskExecutor对象,bean的返回类型可以是ThreadPoolTaskExecutor也可以是Executor

    @Configuration
    public class ThreadPoolConfiguration {
    
        @Bean("taskExecutor")
        public ThreadPoolTaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            //设置线程池参数信息
            taskExecutor.setCorePoolSize(10);
            taskExecutor.setMaxPoolSize(50);
            taskExecutor.setQueueCapacity(200);
            taskExecutor.setKeepAliveSeconds(60);
            taskExecutor.setThreadNamePrefix("myExecutor--");
            taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
            taskExecutor.setAwaitTerminationSeconds(60);
            //修改拒绝策略为使用当前线程执行
            taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //初始化线程池
            taskExecutor.initialize();
            return taskExecutor;
        }
    }

    管理多个线程池

    如果出现了多个线程池,例如再定义一个线程池 taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。

    @Bean("taskExecutor2")
    public ThreadPoolTaskExecutor taskExecutor2() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor2--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }

    引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

    @Resource
    ThreadPoolTaskExecutor taskExecutor2;

    对于使用@Async注解的多线程则在注解中指定bean的名字即可。

    @Async("taskExecutor2")
        public void async(String name) throws InterruptedException {
            System.out.println("async" + name + " " + Thread.currentThread().getName());
            Thread.sleep(1000);
        }

    线程池的四种拒绝策略

    JAVA常用的四种线程池

    ThreadPoolExecutor 类的构造函数如下:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    newCachedThreadPool

    不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。

    new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());

    newFixedThreadPool

    定长线程池,超出线程数的任务会在队列中等待。

    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

    newScheduledThreadPool

    类似于newCachedThreadPool,线程数无上限,但是可以指定corePoolSize。可实现延迟执行、周期执行。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedworkQueue());
    }

    周期执行:

    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    scheduledThreadPool.scheduleAtFixedRate(()->{
       System.out.println("rate");
    }, 1, 1, TimeUnit.SECONDS);

    延时执行:

    scheduledThreadPool.schedule(()->{
       System.out.println("delay 3 seconds");
    }, 3, TimeUnit.SECONDS);

    newSingleThreadExecutor

    单线程线程池,可以实现线程的顺序执行。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    Java 线程池中的四种拒绝策略

    • CallerRunsPolicy:线程池让调用者去执行。

    • AbortPolicy:如果线程池拒绝了任务,直接报错。

    • DiscardPolicy:如果线程池拒绝了任务,直接丢弃。

    • DiscardOldestPolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。

    CallerRunsPolicy

    直接在主线程中执行了run方法。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
     
        public CallerRunsPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    效果类似于:

    Runnable thread = ()->{
       System.out.println(Thread.currentThread().getName());
       try {
          Thread.sleep(0);
       } catch (InterruptedException e) {
          throw new RuntimeException(e);
       }
    };
    
    thread.run();

    AbortPolicy

    直接抛出RejectedExecutionException异常,并指示任务的信息,线程池的信息。、

    public static class AbortPolicy implements RejectedExecutionHandler {
     
        public AbortPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecuandroidtor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    DiscardPolicy

    什么也不做。

    public static class DiscardPolicy implements RejectedExecutionHandler {
     
        public DiscardPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    DiscardOldestPolicy

    • e.getQueue().poll() : 取出队列最旧的任务。

    • e.execute(r) : 当前任务入队。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
     
        public DiscardOldestPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

    Java 线程复用的原理

    java的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker 对象,该对象在 被维护在private final HashSet<Worker> workers = new HashSet<Worker>();workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue队列中。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never bephp serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        /**
         * Cr编程客栈eates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHmxvNCEzSeldExclusively(); }
    
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    

    work对象的执行依赖于 runWorker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), javascriptSTOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    到此这篇关于SpringBoot线程池和Java线程池的用法和实现原理的文章就介绍到这了,更多相关SpringBoot线程池和Java线程池用法内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    关注公众号