A source-code walkthrough of the PowerJob ProcessorLoader workflow

Developer https://www.devze.com 2023-12-23 10:38 Source: Internet Author: codecraft
Contents
  • ProcessorLoader
    • ProcessorDefinition
    • ProcessorBean
  • PowerJobProcessorLoader
    • ProcessorFactory
      • BuiltInDefaultProcessorFactory
      • JarContainerProcessorFactory
      • AbstractBuildInSpringProcessorFactory
      • BuiltInSpringProcessorFactory
      • BuildInSpringMethodProcessorFactory
    • Summary

      This article takes a look at PowerJob's ProcessorLoader.

      ProcessorLoader

      tech/powerjob/worker/processor/ProcessorLoader.java

      public interface ProcessorLoader {
          ProcessorBean load(ProcessorDefinition definition);
      }
      ProcessorLoader defines a single load method, which loads a ProcessorBean from a ProcessorDefinition.

      ProcessorDefinition

      tech/powerjob/worker/extension/processor/ProcessorDefinition.java

      @Getter
      @Setter
      @ToString
      @Accessors(chain = true)
      public class ProcessorDefinition implements Serializable {
          /**
           * Processor type configured in the admin console
           */
          private String processorType;
          /**
           * Processor info configured in the admin console
           */
          private String processorInfo;
          @Override
          public boolean equals(Object o) {
              if (this == o) {
                  return true;
              }
              if (o == null || getClass() != o.getClass()) {
                  return false;
              }
              ProcessorDefinition that = (ProcessorDefinition) o;
              return Objects.equals(processorType, that.processorType) && Objects.equals(processorInfo, that.processorInfo);
          }
          @Override
          public int hashCode() {
              return Objects.hash(processorType, processorInfo);
          }
      }
      ProcessorDefinition defines two properties, processorType and processorInfo, and bases equals and hashCode on both of them.
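
The value-based equals and hashCode are what allow a definition to serve as a map key. The following is a minimal sketch of that property, not PowerJob code: `Def` is a hypothetical stand-in for ProcessorDefinition written without Lombok, and the class name `com.example.DemoProcessor` is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class DefinitionKeySketch {

    // Hypothetical stand-in for ProcessorDefinition (assumption: same two fields).
    static final class Def {
        final String processorType;
        final String processorInfo;

        Def(String processorType, String processorInfo) {
            this.processorType = processorType;
            this.processorInfo = processorInfo;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Def that = (Def) o;
            return Objects.equals(processorType, that.processorType)
                    && Objects.equals(processorInfo, that.processorInfo);
        }

        @Override
        public int hashCode() {
            return Objects.hash(processorType, processorInfo);
        }
    }

    // Stores a value under one Def instance and reads it back with a second, equal instance.
    static String lookupWithEqualKey() {
        Map<Def, String> cache = new HashMap<>();
        cache.put(new Def("BUILT_IN", "com.example.DemoProcessor"), "cached-bean");
        return cache.get(new Def("BUILT_IN", "com.example.DemoProcessor"));
    }

    public static void main(String[] args) {
        System.out.println(lookupWithEqualKey());
    }
}
```

One caveat that follows from general Java rules: the real class also has setters, so changing a definition after it has been used as a key would make later lookups miss.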

      ProcessorBean

      tech/powerjob/worker/extension/processor/ProcessorBean.java

      @Getter
      @Setter
      @Accessors(chain = true)
      public class ProcessorBean {
          /**
           * The processor object that actually executes the logic
           */
          private transient BasicProcessor processor;
          /**
           * The classLoader that loaded this processor object; nullable, in which case the processor's own getClass().getClassLoader() is used instead
           */
          private transient ClassLoader classLoader;
      }
      ProcessorBean defines two properties: the BasicProcessor and the ClassLoader that loaded it.

      PowerJobProcessorLoader

      tech/powerjob/worker/processor/PowerJobProcessorLoader.java

      @Slf4j
      public class PowerJobProcessorLoader implements ProcessorLoader {
          private final List<ProcessorFactory> processorFactoryList;
          private final Map<ProcessorDefinition, ProcessorBean> def2Bean = new ConcurrentHashMap<>(128);
          public PowerJobProcessorLoader(List<ProcessorFactory> processorFactoryList) {
              this.processorFactoryList = processorFactoryList;
          }
          @Override
          public ProcessorBean load(ProcessorDefinition definition) {
              return def2Bean.computeIfAbsent(definition, ignore -> {
                  final String processorType = definition.getProcessorType();
                  log.info("[ProcessorFactory] start to load Processor: {}", definition);
                  for (ProcessorFactory pf : processorFactoryList) {
                      final String pfName = pf.getClass().getSimpleName();
                      if (!Optional.ofNullable(pf.supportTypes()).orElse(Collections.emptySet()).contains(processorType)) {
                          log.info("[ProcessorFactory] [{}] can't load type={}, skip!", pfName, processorType);
                          continue;
                      }
                      log.info("[ProcessorFactory] [{}] try to load processor: {}", pfName, definition);
                      try {
                          ProcessorBean processorBean = pf.build(definition);
                          if (processorBean != null) {
                              log.info("[ProcessorFactory] [{}] load processor successfully: {}", pfName, definition);
                              return processorBean;
                          }
                      } catch (Throwable t) {
                          log.error("[ProcessorFactory] [{}] load processor failed: {}", pfName, definition, t);
                      }
                  }
                  throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config");
              });
          }
      }
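      PowerJobProcessorLoader implements the ProcessorLoader interface. Its constructor takes a processorFactoryList, and it keeps a def2Bean map from ProcessorDefinition to ProcessorBean. The load method uses ConcurrentHashMap's computeIfAbsent, so a bean that loads successfully is stored in def2Bean and reused. Loading iterates over processorFactoryList, skips every ProcessorFactory whose supportTypes does not contain the processorType, and calls build on the rest. The first non-null ProcessorBean is returned; a factory that returns null or throws is passed over, and if no factory succeeds a PowerJobException is thrown.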
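
The loading flow described above can be sketched without any PowerJob dependency. Everything in this block is an assumption made for illustration: `Factory` and `Loader` are hypothetical stand-ins for ProcessorFactory and PowerJobProcessorLoader, beans are plain strings, the cache key is a concatenated string instead of a ProcessorDefinition, and only RuntimeException is caught where the original catches Throwable.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class LoaderChainSketch {

    // Hypothetical stand-in for ProcessorFactory; "beans" are plain strings here.
    interface Factory {
        Set<String> supportTypes();
        String build(String type, String info);
    }

    static final class Loader {
        private final List<Factory> factories;
        private final Map<String, String> cache = new ConcurrentHashMap<>();

        Loader(List<Factory> factories) {
            this.factories = factories;
        }

        String load(String type, String info) {
            // On success the result is cached, so later calls for the same key skip the factories.
            return cache.computeIfAbsent(type + "|" + info, ignore -> {
                for (Factory f : factories) {
                    Set<String> types = f.supportTypes();
                    if (types == null || !types.contains(type)) {
                        continue; // this factory does not handle the type
                    }
                    try {
                        String bean = f.build(type, info);
                        if (bean != null) {
                            return bean; // first non-null result wins
                        }
                    } catch (RuntimeException e) {
                        // a failing factory does not stop the chain
                    }
                }
                // Thrown out of computeIfAbsent, so nothing is cached for this key.
                throw new IllegalStateException("no factory could load " + type + "/" + info);
            });
        }
    }

    // Counts how often the succeeding factory actually builds something.
    static final AtomicInteger BUILD_CALLS = new AtomicInteger();

    static Loader demoLoader() {
        Factory declines = new Factory() {
            @Override public Set<String> supportTypes() { return Collections.singleton("BUILT_IN"); }
            @Override public String build(String type, String info) { return null; }
        };
        Factory succeeds = new Factory() {
            @Override public Set<String> supportTypes() { return Collections.singleton("BUILT_IN"); }
            @Override public String build(String type, String info) {
                BUILD_CALLS.incrementAndGet();
                return "bean:" + info;
            }
        };
        return new Loader(Arrays.asList(declines, succeeds));
    }

    public static void main(String[] args) {
        Loader loader = demoLoader();
        System.out.println(loader.load("BUILT_IN", "com.example.DemoProcessor"));
        System.out.println(loader.load("BUILT_IN", "com.example.DemoProcessor"));
        System.out.println("build calls: " + BUILD_CALLS.get());
    }
}
```

Because ConcurrentHashMap records no mapping when the mapping function throws, a failed load is not cached and the factories are tried again on the next call for the same key.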

      ProcessorFactory

      tech/powerjob/worker/extension/processor/ProcessorFactory.java

      public interface ProcessorFactory {
          /**
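           * Supported processor types; if the type does not match, this ProcessorFactory's loading logic is skipped
           * Corresponds to the 'processor type' TAB in the console; without any customization, the range of values is {@link ProcessorType#name()}
           * @return the supported processor types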
           */
          Set<String> supportTypes();
          /**
           * Builds the processor object from the processor definition
           * Note: a Processor is a singleton, i.e. PowerJob calls build only once for each ProcessorBean
           * @param processorDefinition the processor definition
           * @return null or ProcessorBean
           */
          ProcessorBean build(ProcessorDefinition processorDefinition);
      }
      The ProcessorFactory interface defines the supportTypes and build methods. It has four implementations: BuiltInSpringProcessorFactory and BuildInSpringMethodProcessorFactory, which both extend AbstractBuildInSpringProcessorFactory, plus BuiltInDefaultProcessorFactory and JarContainerProcessorFactory.

      BuiltInDefaultProcessorFactory

      tech/powerjob/worker/processor/impl/BuiltInDefaultProcessorFactory.java

      @Slf4j
      public class BuiltInDefaultProcessorFactory implements ProcessorFactory {
          @Override
          public Set<String> supportTypes() {
              return Sets.newHashSet(ProcessorType.BUILT_IN.name());
          }
          @Override
          public ProcessorBean build(ProcessorDefinition processorDefinition) {
              String className = processorDefinition.getProcessorInfo();
              try {
                  Class<?> clz = Class.forName(className);
                  BasicProcessor basicProcessor = (BasicProcessor) clz.getDeclaredConstructor().newInstance();
                  return new ProcessorBean()
                          .setProcessor(basicProcessor)
                          .setClassLoader(basicProcessor.getClass().getClassLoader());
              }catch (Exception e) {
                  log.warn("[ProcessorFactory] load local Processor(className = {}) failed.", className, e);
              }
              return null;
          }
      }
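      BuiltInDefaultProcessorFactory is the default processor factory. It loads the processor by its fully qualified class name and instantiates it through the no-arg constructor, so there is no IoC support. Any failure is logged and build returns null.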
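
The reflective construction can be tried in isolation. This is a minimal sketch under stated assumptions: `Task` is a hypothetical stand-in for BasicProcessor, `HelloTask` is a made-up implementation, and the failure path simply returns null as the original does (without the logging).

```java
class ReflectiveBuildSketch {

    // Hypothetical stand-in for BasicProcessor.
    public interface Task {
        String run();
    }

    // A made-up processor with the public no-arg constructor that the reflective call needs.
    public static class HelloTask implements Task {
        public HelloTask() {
        }

        @Override
        public String run() {
            return "hello";
        }
    }

    // Loads the class by name and instantiates it; returns null on any failure.
    static Task build(String className) {
        try {
            Class<?> clz = Class.forName(className);
            return (Task) clz.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            // Covers a missing class, a missing no-arg constructor and a wrong type.
            return null;
        }
    }

    public static void main(String[] args) {
        Task task = build(HelloTask.class.getName());
        System.out.println(task == null ? "not loaded" : task.run());
        System.out.println(build("no.such.Clazz") == null ? "not loaded" : "loaded");
    }
}
```

Since the instance is created with `newInstance()` and nothing else, a processor loaded this way cannot rely on injected dependencies, which is the "no IoC" limitation mentioned above.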

      JarContainerProcessorFactory

      tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java

      @Slf4j
      public class JarContainerProcessorFactory implements ProcessorFactory {
          private final WorkerRuntime workerRuntime;
          public JarContainerProcessorFactory(WorkerRuntime workerRuntime) {
              this.workerRuntime = workerRuntime;
          }
          @Override
          public Set<String> supportTypes() {
              return Sets.newHashSet(ProcessorType.EXTERNAL.name());
          }
          @Override
          public ProcessorBean build(ProcessorDefinition processorDefinition) {
              String processorInfo = processorDefinition.getProcessorInfo();
              String[] split = processorInfo.split("#");
              String containerName = split[0];
              String className = split[1];
              log.info("[ProcessorFactory] try to load processor({}) in container({})", className, containerName);
              OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), workerRuntime);
              if (omsContainer != null) {
                  return new ProcessorBean()
                          .setProcessor(omsContainer.getProcessor(className))
                          .setClassLoader(omsContainer.getContainerClassLoader());
              } else {
                  log.warn("[ProcessorFactory] load container failed. processor info : {}", processorInfo);
              }
              return null;
          }
      }
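      JarContainerProcessorFactory handles the EXTERNAL type and loads the ProcessorBean through an OmsContainer. It splits processorInfo on # into a container id and a class name, fetches the container by that id, and asks it for the processor and its class loader.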
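
The build method reads `split[0]` and `split[1]` and calls `Long.valueOf` without validating the input, so a malformed processorInfo surfaces as an exception that the loader's catch block logs. The sketch below shows one way the same `containerId#className` format could be parsed defensively. It is an illustration only: `ContainerRef` and `parse` are hypothetical names, not part of PowerJob.

```java
import java.util.Optional;

class ContainerInfoSketch {

    // Hypothetical holder for the two parts of an EXTERNAL processorInfo.
    static final class ContainerRef {
        final long containerId;
        final String className;

        ContainerRef(long containerId, String className) {
            this.containerId = containerId;
            this.className = className;
        }
    }

    // Parses "containerId#className"; returns empty instead of throwing on bad input.
    static Optional<ContainerRef> parse(String processorInfo) {
        if (processorInfo == null) {
            return Optional.empty();
        }
        String[] split = processorInfo.split("#");
        if (split.length != 2 || split[0].isEmpty() || split[1].isEmpty()) {
            return Optional.empty();
        }
        try {
            return Optional.of(new ContainerRef(Long.parseLong(split[0]), split[1]));
        } catch (NumberFormatException e) {
            return Optional.empty(); // container id is not a number
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1#com.example.DemoProcessor").isPresent());
        System.out.println(parse("com.example.DemoProcessor").isPresent());
    }
}
```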

      AbstractBuildInSpringProcessorFactory

      tech/powerjob/worker/processor/impl/AbstractBuildInSpringProcessorFactory.java

      @Slf4j
      public abstract class AbstractBuildInSpringProcessorFactory implements ProcessorFactory {
          protected final ApplicationContext applicationContext;
          protected AbstractBuildInSpringProcessorFactory(ApplicationContext applicationContext) {
              this.applicationContext = applicationContext;
          }
          @Override
          public Set<String> supportTypes() {
              return Sets.newHashSet(ProcessorType.BUILT_IN.name());
          }
          protected boolean checkCanLoad() {
              try {
                  ApplicationContext.class.getClassLoader();
                  return applicationContext != null;
              } catch (Throwable ignore) {
              }
              return false;
          }
          @SuppressWarnings("unchecked")
          protected static <T> T getBean(String className, ApplicationContext ctx) throws Exception {
              // 0. Try to load directly by bean name
              try {
                  final Object bean = ctx.getBean(className);
                  if (bean != null) {
                      return (T) bean;
                  }
              } catch (Exception ignore) {
              }
              // 1. If the ClassLoader exists, load by class directly
              ClassLoader classLoader = ctx.getClassLoader();
              if (classLoader != null) {
                  return (T) ctx.getBean(classLoader.loadClass(className));
              }
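              // 2. The ClassLoader does not exist (system class loader not visible); try the simple class name with a lowercase first letter
              String[] split = className.split("\\.");
              String beanName = split[split.length - 1];
              // Convert the first letter from uppercase to lowercase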
              char[] cs = beanName.toCharArray();
              cs[0] += 32;
              String beanName0 = String.valueOf(cs);
              log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", ctx, beanName0);
              return (T) ctx.getBean(beanName0);
          }
      }
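      AbstractBuildInSpringProcessorFactory is the abstract base class of the two Spring-related ProcessorFactory implementations. It loads beans from Spring's ApplicationContext: getBean first tries the given string as a bean name, then looks the bean up by class, and finally falls back to the simple class name with its first letter lowercased.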
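
The last fallback derives a bean name by adding 32 to the first character of the simple class name, which only yields a lowercase letter when that character is an ASCII uppercase letter. The sketch below reproduces that step next to a guarded variant; `fallbackBeanName` and `guardedBeanName` are hypothetical helper names, and the guarded variant is an alternative shown for comparison, not what PowerJob does.

```java
class BeanNameSketch {

    // Mirrors the derivation in getBean: simple class name, first char shifted by 32.
    static String fallbackBeanName(String className) {
        String[] split = className.split("\\.");
        String simpleName = split[split.length - 1];
        char[] cs = simpleName.toCharArray();
        cs[0] += 32; // 'D' (68) becomes 'd' (100); other characters are shifted as well
        return String.valueOf(cs);
    }

    // Alternative (assumption, not PowerJob code): lowercase via Character.toLowerCase.
    static String guardedBeanName(String className) {
        String simpleName = className.substring(className.lastIndexOf('.') + 1);
        if (simpleName.isEmpty()) {
            return simpleName;
        }
        return Character.toLowerCase(simpleName.charAt(0)) + simpleName.substring(1);
    }

    public static void main(String[] args) {
        System.out.println(fallbackBeanName("com.example.DemoProcessor"));
        System.out.println(guardedBeanName("com.example.DemoProcessor"));
    }
}
```

For a conventionally named class both helpers agree. They differ when the simple name already starts with a lowercase letter, where the `+= 32` shift produces a non-letter character.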

      BuiltInSpringProcessorFactory

      tech/powerjob/worker/processor/impl/BuiltInSpringProcessorFactory.java

      @Slf4j
      public class BuiltInSpringProcessorFactory extends AbstractBuildInSpringProcessorFactory {
          public BuiltInSpringProcessorFactory(ApplicationContext applicationContext) {
              super(applicationContext);
          }
          @Override
          public ProcessorBean build(ProcessorDefinition processorDefinition) {
              try {
                  boolean canLoad = checkCanLoad();
                  if (!canLoad) {
                      log.info("[ProcessorFactory] can't find Spring env, this processor can't load by 'BuiltInSpringProcessorFactory'");
                      return null;
                  }
                  String processorInfo = processorDefinition.getProcessorInfo();
                  // Used to tell apart method-level processor info
                  if (processorInfo.contains("#")) {
                      return null;
                  }
                  BasicProcessor basicProcessor = getBean(processorInfo, applicationContext);
                  return new ProcessorBean()
                          .setProcessor(basicProcessor)
                          .setClassLoader(basicProcessor.getClass().getClassLoader());
              } catch (NoSuchBeanDefinitionException ignore) {
                  log.warn("[ProcessorFactory] can't find the processor in SPRING");
              } catch (Throwable t) {
                  log.warn("[ProcessorFactory] load by BuiltInSpringProcessorFactory failed. If you are using Spring, make sure this bean was managed by Spring", t);
              }
              return null;
          }
      }
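      BuiltInSpringProcessorFactory loads the processor as a Spring bean through the ApplicationContext. It returns null for any processorDefinition whose processorInfo contains #, leaving those to the method-level factory.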

      BuildInSpringMethodProcessorFactory

      tech/powerjob/worker/processor/impl/BuildInSpringMethodProcessorFactory.java

      @Slf4j
      public class BuildInSpringMethodProcessorFactory extends AbstractBuildInSpringProcessorFactory {
          private static final List<String> jobHandlerRepository = new LinkedList<>();
          private final static String DELIMITER = "#";
          public BuildInSpringMethodProcessorFactory(ApplicationContext applicationContext) {
              super(applicationContext);
          }
          @Override
          public ProcessorBean build(ProcessorDefinition processorDefinition) {
              try {
                  boolean canLoad = checkCanLoad();
                  if (!canLoad) {
                      log.info("[ProcessorFactory] can't find Spring env, this processor can't load by 'BuildInSpringMethodProcessorFactory'");
                      return null;
                  }
                  String processorInfo = processorDefinition.getProcessorInfo();
                  if (!processorInfo.contains(DELIMITER)) {
                      log.info("[ProcessorFactory] can't parse processorDefinition, this processor can't load by 'BuildInSpringMethodProcessorFactory'");
                      return null;
                  }
                  String[] split = processorInfo.split(DELIMITER);
                  String methodName = split[1];
                  String className = split[0];
                  Object bean = getBean(className,applicationContext);
                  Method[] methods = bean.getClass().getDeclaredMethods();
                  for (Method method : methods) {
                      PowerJobHandler powerJob = method.getAnnotation(PowerJobHandler.class);
                      if (powerJob == null) {
                          continue;
                      }
                      String name = powerJob.name();
                      // Match the methodName defined in the console
                      if (!name.equals(methodName)) {
                          continue;
                      }
                      if (name.trim().length() == 0) {
                          throw new RuntimeException("method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                      }
                      if (containsJobHandler(name)) {
                          throw new RuntimeException("jobhandler[" + name + "] naming conflicts.");
                      }
                      method.setAccessible(true);
                      registerJobHandler(methodName);
                      MethodBasicProcessor processor = new MethodBasicProcessor(bean, method);
                      return new ProcessorBean()
                              .setProcessor(processor)
                              .setClassLoader(processor.getClass().getClassLoader());
                  }
              } catch (NoSuchBeanDefinitionException ignore) {
                  log.warn("[ProcessorFactory] can't find the processor in SPRING");
              } catch (Throwable t) {
                  log.warn("[ProcessorFactory] load by BuiltInSpringProcessorFactory failed. If you are using Spring, make sure this bean was managed by Spring", t);
              }
              return null;
          }
          public static void registerJobHandler(String name) {
              jobHandlerRepository.add(name);
          }
          private boolean containsJobHandler(String name) {
              return jobHandlerRepository.contains(name);
          }
      }
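      BuildInSpringMethodProcessorFactory handles only a processorDefinition whose processorInfo contains #. It splits processorInfo into a class name and a method name, fetches the bean, and iterates over the bean class's declared methods looking for one annotated with @PowerJobHandler whose annotation name equals the configured method name. The match is on the name attribute of the annotation, not on the Java method name. The matched name is registered in jobHandlerRepository (a name that is already registered raises a naming-conflict exception), and the factory wraps the bean and method in a MethodBasicProcessor.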
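
The annotation scan can be illustrated with plain reflection. This is a sketch under stated assumptions rather than PowerJob code: `Handler` is a hypothetical stand-in for @PowerJobHandler, `DemoJobs` is a made-up bean, the handler method takes no arguments, and the registry and conflict check are left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class MethodHandlerSketch {

    // Hypothetical stand-in for @PowerJobHandler.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Handler {
        String name();
    }

    // A made-up bean with one annotated and one plain method.
    public static class DemoJobs {
        @Handler(name = "cleanup")
        public String runCleanup() {
            return "cleaned";
        }

        public String notAHandler() {
            return "ignored";
        }
    }

    // Finds the declared method whose annotation name equals handlerName, or null.
    static Method findHandler(Object bean, String handlerName) {
        for (Method m : bean.getClass().getDeclaredMethods()) {
            Handler h = m.getAnnotation(Handler.class);
            if (h != null && h.name().equals(handlerName)) {
                m.setAccessible(true);
                return m;
            }
        }
        return null;
    }

    // Splits "className#handlerName", resolves the handler on the given bean and invokes it.
    static Object invoke(String processorInfo, Object bean) {
        String[] split = processorInfo.split("#");
        Method m = findHandler(bean, split[1]);
        if (m == null) {
            return null;
        }
        try {
            return m.invoke(bean);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("handler invocation failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(invoke("com.example.DemoJobs#cleanup", new DemoJobs()));
    }
}
```

Looking up `runCleanup` finds nothing in this sketch, because the comparison uses the annotation's name attribute and not the Java method name.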

      Summary

      PowerJob's ProcessorLoader defines a load method that loads a ProcessorBean from a ProcessorDefinition. PowerJobProcessorLoader implements the interface: it iterates over processorFactoryList, finds a ProcessorFactory that supports the processorType, and calls its build method to construct the bean. The ProcessorFactory interface defines the supportTypes and build methods. It has four implementations: BuiltInSpringProcessorFactory and BuildInSpringMethodProcessorFactory, which both extend AbstractBuildInSpringProcessorFactory, plus BuiltInDefaultProcessorFactory and JarContainerProcessorFactory.
