开发者

SpringBoot CommandLineRunner的异步任务机制使用

开发者 https://www.devze.com 2024-08-21 10:18 出处:网络 作者: 架构随笔录
目录1.概要分析1.1 手动创建线程1.2 使用 Spring 的异步执行功能2.核心原理分析2.1 接口定义与功能2.2 执行时机2.3 命令行参数传递2.4 实现与注册2.5 执行顺序2.6 与 ApplicationRunner 的区别2.7 应用场景3.部分源码
目录
  • 1.概要分析
    • 1.1 手动创建线程
    • 1.2 使用 Spring 的异步执行功能
  • 2.核心原理分析
    • 2.1 接口定义与功能
    • 2.2 执行时机
    • 2.3 命令行参数传递
    • 2.4 实现与注册
    • 2.5 执行顺序
    • 2.6 与 ApplicationRunner 的区别
    • 2.7 应用场景
  • 3.部分源码分析
    • 3.1 启动Spring Boot应用程序
    • 3.2 调用所有的Runner实现类
  • 4.典型的应用场景分析
    • 4.1 TM(Transaction Manager,事务管理器)
    • 4.2 RM(Resource Manager,资源管理器)
    • 4.3 Seata RM和TM与Seata 编程Server之间的RPC通信
    • 4.4 Seata Server利用SpringBoot CommandLineRunner启动服务端通信渠道
  • 总的来说

    SpringBoot中,CommandLineRunner 本身并不是一个直接支持异步任务的机制。

    CommandLineRunner 接口定义了一个在 Spring Boot 应用程序启动后立即同步执行的方法 run(String... args)

    这意味着,通过实现 CommandLineRunner 接口定义的任务将在主线程中顺序执行,而不会创建新的线程来异步执行这些任务。

    然而,如果你希望在 CommandLineRunner 中执行异步任务,你可以手动创建线程或使用 Spring 的异步执行功能。

    以下是一些实现异步任务的方法。

    1.概要分析

    1.1 手动创建线程

    CommandLineRunnerrun 方法中,你可以直接创建并启动一个新的线程来执行异步任务。

    这种方法简单直接,但需要注意线程管理和异常处理。

    @Component  
    public class MyCommandLineRunner implements CommandLineRunner {  
      
        @Override  
        public void run(String... args) throws Exception {  
            new Thread(() -> {  
                // 异步执行的代码  
                System.out.println("异步任务执行中...");  
                // 模拟耗时操作  
                try {  
                    Thread.sleep(1000);  
                } catch (InterruptedException e) {  
                    Thread.currentThread().interrupt();  
                }  
                System.out.println("异步任务完成");  
            }).start();  
      
            // 主线程继续执行,不会等待异步任务完成  
            System.out.println("CommandLineRunner 执行完毕,主线程继续");  
        }  
    }

    1.2 使用 Spring 的异步执行功能

    如果你希望利用 Spring 的异步支持来执行异步任务,你可以在 CommandLineRunner 中注入一个使用 @Async 注解的服务。但是,需要注意的是,由于 CommandLineRunnerrun 方法本身是在 Spring 容器完全初始化之后同步执行的,因此即使你调用了一个异步服务方法,run 方法本身仍然会立即返回,不会等待异步任务完成。

    首先,你需要在 Spring Boot 应用中启用异步支持,通过在启动类上添加 @EnableAsync 注解。

    然后,你可以创建一个异步服务:

    @Service  
    public class AsyncService {  
      
        @Async  
        public void executeAsyncTask() {  
            // 异步执行的代码  
            System.out.println("异步任务执行中...");  
            // 模拟耗时操作  
            try {  
                Thread.sleep(1000);  
            } catch (InterruptedException e) {  
                Thread.currentThread().interrupt();  
            }  
            System.out.println("异步任务完成");  
        }  
    }

    CommandLineRunner 中注入并使用这个服务

    @Component  
    public class MyCommandLineRunner implements CommandLineRunner {  
      
        @Autowired  
        private AsyncService asyncService;  
      
        @Override  
        public void run(String... args) throws Exception {  
            asyncService.executeAsyncTask(); // 调用异步方法,但 CommandLineRunner 的 run 方法会立即返回  
      
            System.out.println("CommandLineRunner 执行完毕,主线程继续,不会等待异步任务完成");  
        }  
    }

    虽然 CommandLineRunner 本身不支持异步执行,但你可以通过手动创建线程或使用 Spring 的异步支持来在 CommandLineRunner 中执行异步任务。然而,需要注意的是,CommandLineRunnerrun 方法本身仍然是同步执行的,它不会等待任何异步任务完成。

    如果你的应用程序依赖于异步任务的结果,你可能需要采用其他机制(如 FutureCompletableFuture 或消息队列)来管理异步任务的执行和结果。

    2.核心原理分析

    org.springframework.boot.CommandLineRunner Spring Boot 框架中的一个核心接口,其原理分析可以从以下几个方面进行。

    2.1 接口定义与功能

    CommandLineRunner 是一个函数式接口(Functional Interface),它只定义了一个抽象方法 run(String... args)

    这个方法在 Spring Boot 应用程序启动完成后被调用,允许开发者执行一些初始化操作或启动后的任务。这些任务可能包括数据初始化、缓存预热、日志记录等。

    2.2 执行时机

    Spring Boot 应用程序启动时,Spring 容器会完成一系列的初始化操作,包括 Bean 的加载和依赖注入等。

    在所有 Spring Bean 都初始化完成后,Spring Boot 会查找所有实现了 CommandLineRunner 接口的 Bean,并依次调用它们的 run 方法。

    这意味着 CommandLineRunner 的执行时机是在 Spring 上下文准备好之后,但在应用程序对外提供服务之前。

    2.3 命令行参数传递

    CommandLineRunnerrun 方法接收一个 String... args 参数,这个参数包含了启动应用程序时传递给它的命令行参数。

    这使得开发者可以根据命令行参数的不同来执行不同的初始化逻辑。

    2.4 实现与注册

    要使用 CommandLineRunner,开发者需要创建一个类并实现这个接口,然后重写 run 方法以定义自己的初始化逻辑。

    为了让 Spring 容器能够扫描到这个实现类并将其注册为一个 Bean,通常会在类上添加 @Component 或其他类似的注解(如 @Service@Repository 等)。此外,也可以通过编程方式在配置编程客栈类中显式地注册这个 Bean

    2.5 执行顺序

    如果应用程序中有多个实现了 CommandLineRunner 接口的类,那么它们的 run 方法将按照一定的顺序执行。

    默认情况下,执行顺序取决于 Spring 容器注册这些 Bean 的顺序。但是,开发者可以通过 @Order 注解或实现 Ordered 接口来指定执行顺序。

    @Order 注解的值越小,优先级越高,相应的 run 方法就会越早执行。

    2.6 与 ApplicationRunner 的区别

    值得注意的是,Spring Boot 还提供了另一个类似的接口 ApplicationRunner,它也用于在应用程序启动后执行初始化任务。与 CommandLineRunner 不同的是,ApplicationRunnerrun 方法接收一个 ApplicationArguments 参数而不是 String... args

    ApplicationArguments 提供了对命令行参数的更高级别访问,包括选项和非选项参数等。此外,如果同时存在 CommandLineRunnerApplicationRunner 的实现,那么 CommandLineRunner 的实现会先于 ApplicationRunner 的实现被调用。

    2.7 应用场景

    CommandLineRunner 适用于需要在应用程序启动后立即执行的任务场景,如数据初始化、配置加载、缓存预热等。通过使用 CommandLineRunner,开发者可以确保这些任务在应用程序对外提供服务之前完成,从而提高应用程序的性能和用户体验。

    综上所述,org.sphttp://www.devze.comringframework.boot.CommandLineRunner Spring Boot 框架中用于执行启动后任务的强大机制,它通过简单的接口定义和灵活的注册方式,为开发者提供了方便、高效的初始化操作手段。

    3.部分源码分析

    3.1 启动Spring Boot应用程序

    /**
     * 启动应用程序。
     * 
     * @param args 命令行参数
     * @return ConfigurableApplicationContext 应用程序上下文
     */
    public ConfigurableApplicationContext run(String... args) {
        // 记录应用程序启动时间
        long startTime = System.nanoTime();
        
        // 创建引导上下文
        DefaultBootstrapContext bootstrapContext = createBootstrapContext();
        ConfigurableApplicationContext context = null;
        
        // 配置无头模式属性
        configureHeadlessProperty();
        
        // 获取启动监听器
        SpringApplicationRunListeners listeners = getRunListeners(args);
        
        // 通知监听器应用程序即将启动
        listeners.starting(bootstrapContext, this.mainApplicationClass);
        
        try {
            // 创建并配置应用参数
            ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
            ConfigurableEnvironment environment = prepareEnvironment(listeners, bootstrapContext, applicationArguments);
            
            // 配置忽略BeanInfo的设置
            configureIgnoreBeanInfo(environment);
            
            // 打印启动横幅
            Banner printedBanner = printBanner(environment);
            
            // 创建应用程序上下文
    js        context = createApplicationContext();
            
            // 设置应用启动器
            context.setApplicationStartup(this.applicationStartup);
            
            // 准备应用程序上下文
            prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
            
            // 刷新上下文,使配置生效
            refreshContext(context);
            
            // 启动后配置
            afterRefresh(context, applicationArguments);
            
            // 计算应用程序启动时间
            Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
            
            // 如果启用了启动信息日志,则记录启动信息
            if (this.logStartupInfo) {
                new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), timeTakenToStartup);
            }
            
            // 通知监听器应用程序已启动
            listeners.started(context, timeTakenToStartup);
            
            // 调用应用程序运行者
            callRunners(context, applicationArguments);
        } catch (Throwable ex) {
            // 处理启动失败
            handleRunFailure(context, ex, listeners);
            throw new IllegalStateException(ex);
        }
        
        try {
            // 计算应用程序就绪时间
            Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime);
            
            // 通知监听器应用程序已就绪
            listeners.ready(context, timeTakenToReady);
        } catch (Throwable ex) {
            // 处理就绪失败
            handleRunFailure(context, ex, null);
            throw new IllegalStateException(ex);
        }
        
        // 返回应用程序上下文
        return context;
    }

    3.2 调用所有的Runner实现类

    /**
     * 调用所有的Runner实现类。
     * 
     * 本方法的目的是遍历ApplicationContext中所有的Runner实例,并根据它们的类型分别调用相应的方法。
     * Runner和CommandLineRunner是Spring Boot提供的一组接口,用于在应用程序启动后执行一些自定义的初始化逻辑。
     * 这里通过判断Runner的类型,来决定是调用ApplicationRunner还是CommandLineRunner的方法,从而实现对不同类型Runner的兼容处理。
     * 
     * @param context Spring应用上下文,用于获取BeanProvider以获取Runner实例。
     * @param args 命令行参数,传递给每个Runner的调用方法。
     */
    private void callRunners(ApplicationContext context, ApplicationArguments args) {
        // 通过BeanProvider获取所有Runner类型的bean,并以有序的方式遍历它们
        context.getBeanProvider(Runner.class).orderedStream().forEach((runner) -> {
            // 如果runner是ApplicationRunner类型,则调用callRunner方法,并传入ApplicationRunner和命令行参数
            if (runner instanceof ApplicationRunner) {
                callRunner((ApplicationRunner) runner, args);
            }
            // 如果runner是CommandLineRunner类型,则同样调用callRunner方法,并传入CommandLineRunner和命令行参数
            if (runner instanceof CommandLineRunner) {
                callRunner((CommandLineRunner) runner, args);
            }
        });
    }

    4.典型的应用场景分析

    Seata中的TMTransaction Manager,事务管理器)和RMResource Manager,资源管理器)是分布式事务框架中的关键角色,它们各自承担着特定的职责,以确保分布式事务的一致性和完整性。

    4.1 TM(Transaction Manager,事务管理器)

    4.1.1 定义与职责

    • (1)TM负责定义全局事务的范围,即开始全局事务、提交或回滚全局事务。它是分布式事务的发起者和终结者,类似于本地事务中的begin…commit…rollback操作,但针对的是全局的分布式事务。
    • (2)Seata框架中,TM与业务系统集成在一起,作为客户端存在。当业务操作需要跨多个服务或数据库时,TM会启动一个全局事务,并管理这个事务的生命周期。

    4.1.2 工作流程

    • (1)TMTCTransaction Coordinator,事务协调者)请求开启一个全局事务,TC生成一个全局唯一的事务ID(XID)并返回给TM
    • (2)TM携带XID进行远程服务调用,XID在微服务调用链中传播,确保所有参与的分支事务都能被正确关联到全局事务中。
    • (3)当业务操作完成后,TM根据业务逻辑向TC发起全局事务的提交或回滚请求。
    • (4)TC根据各分支事务的执行结果,决定全局事务的提交或回滚,并通知所有RM进行相应的操作。

    4.2 RM(Resource Manager,资源管理器)

    4.2.1 定义与职责

    • (1)RM负责管理分支事务处理的资源,如数据库连接、消息队列等。它是分布式事务中具体执行操作的组件。
    • (2)RMTC进行通信,注册分支事务、报告分支事务的状态,并根据TC的指令驱动分支事务的提交或回滚。
    • (3)Seata框架中,RM同样与业务系统集成在一起,作为客户端存在。每个参与全局事务的服务或数据库操作,都会有一个对应的RM来管理。

    4.2.2 工作流程

    • (1)在执行具体业务操作之前,RM会向TC注册分支事务,并将其纳入全局事务的管辖范围。
    • (2)RM执行本地事务操作,如数据库更新、消息发送等,并确保这些操作是可回滚和持久化的。
    • (3)RM将分支事务的执行结果(提交或回滚)上报给TC
    • (4)TC收到TM的全局事务提交或回滚请求时,它会根据各分支事务的状态决定全局事务的结果,并通知所有RM进行相应的提交或回滚操作。

    4.3 Seata RM和TM与Seata Server之间的RPC通信

    Seata中,TMTransaction Manager,事务管理器)与Seata Server(即TCTransaction Coordinator,事务协调者)之间的通信是通过RPCRemote Procedure Call,远程过程调用)实现的。

    RPC是一种允许程序在网络上调用远程计算机上程序的技术,就像调用本地计算机上的程序一样。

    4.3.1 TM与Seata Server之间的RPC通信

    (1)建立连接

    • TM在启动时会尝试与Seata Server建立长连接。这个连接是通过Netty等网络通信框架实现的,Netty提供了高效、异步的网络通信能力。
    • 在建立连接的过程中,TM会向Seata Server发送注册请求,包括应用ID、事务组名称等信息,以便Seata Server能够识别和管理该TM

    (2)事务管理

    • 一旦连接建立,TM就可以通过RPC调用Seata Server提供的服务来管理全局事务。
    • 例如,当业务操作需要跨多个服务或数据库时,TM会向Swww.devze.comeata Server请求开启一个全局事务,并获取一个全局唯一的事务ID(XID)
    • 在执行远程服务调用时,TM会将XID携带在调用中,以便参与的RM能够识别并将分支事务注册到全局事务中。
    • 当业务操作完成后,TM会根据业务逻辑向Seata Server发起全局事务的提交或回滚请求。

    (3)通信协议

    • Seata使用自定义的通信协议来进行RPC通信,该协议支持事务的创建、提交、回滚等操作。
    • 通信过程中,Seata还实现了心跳检测、超时重试等机制来确保通信的可靠性和稳定性。

    (4)性能优化

    • 为了提高RPC通信的性能,Seata采用了多种优化策略,如使用Netty的主从Reactor多线程模型来处理并发请求、采用批量发送请求来减少网络开销等。

    4.3.2 RM与Seata Server之间的RPC通信

    Seata中,RM负责管理分支事务处理的资源,如数据库连接等。

    RM执行分支事务时,它需要与Seata Server进行通信,以注册分支事务、报告分支事务的状态,并根据Seata Server的指令驱动分支事务的提交或回滚。

    这种通信是通过RPC机制实现的,它允许RM远程调用Seata Server上的服务。

    (1)建立连接

    • RM启动时,它会根据配置尝试与Seata Server建立长连接。这个连接是通过Netty等网络通信框架实现的,Netty提供了高效、异步的网络通信能力。
    • 在建立连接的过程中,RM会向Seata Server发送注册请求,包括应用ID、事务组名称等信息,以便Seata Server能够识别和管理该RM。

    (2)分支事务注册

    • 当RM执行一个分支事务时,它会向Seata Server注册该分支事务。注册过程中,RM会携带全局事务ID(XID)等信息,以便Seata Server能够将该分支事务关联到相应的全局事务中。
    • Seata Server在收到注册请求后,会为该分支事务分配一个唯一的分支事务ID,并将其注册到全局事务中。

    (3)状态报告与指令执行

    • 在分支事务执行过程中,RM会定期向Seata Server报告分支事务的状态,如正在执行、已提交、已回滚等。
    • 当全局事务需要提交或回滚时,Seata Server会根据各分支事务的状态和结果,向RM发送相应的提交或回滚指令。
    • RM在收到指令后,会执行相应的操作,如提交本地事务、回滚本地事务等,并将执行结果报告给Seata Server

    (4)心跳检测与异常处理

    • 为了保持连接的活跃状态,RM会定期向Seata Server发送心跳消息。
    • 如果Seata Server在一段时间内没有收到RM的心跳消息,它可能会认为RM已经离线,并采取相应的异常处理措施,如重试连接、记录日志等。

    Seata RMSeata Server之间的RPC通信是Seata分布式事务框架中的重要组成部分。通过高效的RPC通信机制,RM能够远程调用Seata Server提供的服务来管理分支事务,确保分布式事务的一致性和完整性。同时,Seata还通过多种优化策略来提高RPC通信的性能和可靠性。

    4.4 Seata Server利用SpringBoot CommandLineRunner启动服务端通信渠道

    Seata Server中,ServerRunner类是一个重要的组件,它继承自Spring BootCommandLineRunner接口。

    这意味着在Spring Boot应用启动后,ServerRunnerrun()方法会被自动执行。这种方法通常用于在应用启动后立即执行一段特定的代码,比如初始化资源、启动服务等。

    ServerRunner类的主要职责是初始化Netty通信渠道,即NettyRemotingServer

    Netty是一个高性能的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。

    Seata中,Netty用于节点间的通信,包括事务协调器(TC)、事务管理器(TM)和资源管理器(RM)之间的通信。

    以下是ServerRunner类中run()方法可能包含的逻辑的一个简单示例:

    @Override  
    public void run(String... args) throws Exception {  
        // 初始化Netty通信服务器  
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer();  
        nettyRemotingServer.setPort(serverPort); // 设置服务器端口  
        nettyRemotingServer.setHost(serverHost); // 设置服务器主机地址  
        nettyRemotingServer.start(); // 启动服务器  
      
        // 其他初始化逻辑,比如注册服务等  
    }

    在这个例子中,run()方法首先创建了一个NettyRemotingServer实例,并设置了服务器的主机地址和端口号。然后,它调用start()方法来启动Netty服务器,这样Seata Server就可以监听来自其他节点的请求了。

    总的来说

    ServerRunner类在Seata Server中扮演着重要的角色,它负责初始化Netty通信渠道,为Seata节点间的通信提供基础设施。

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    关注公众号