开发者

Flink DataStream基础框架源码分析

开发者 https://www.devze.com 2022-12-02 10:31 出处:网络 作者: xiangel
目录引言概览深入DataStreamDataStream属性和方法类体系Transformation属性和方法类体系StreamOperator属性和方法类体系FunctionDataStream生成提交执行的GraphStreamGraph属性和方法StreamGraph生成JobGraph属性和方
目录
  • 引言
  • 概览
  • 深入DataStream
    • DataStream
      • 属性和方法
      • 类体系
    • Transformation
      • 属性和方法
      • 类体系
    • StreamOperator
      • 属性和方法
      • 类体系
    • Function
    • DataStream生成提交执行的Graph
      • StreamGraph
        • 属性和方法
        • StreamGraph生成
      • JobGraph
        • 属性和方法
    • 总结

      引言

      希望通过对Flink底层源码的学习来更深入了解Flink的相关实现逻辑。这里新开一个Flink源码解析的系列来深入介绍底层源码逻辑。说明:这里默认相关读者具备Flink相关基础知识和开发经验,所以不会过多介绍相关的基础概念相关内容,Flink使用的版本为1.15.2。初步确定按如下几个大的方面来介绍

      计算模型

      • DataStream基础框架

      部署&调度

      存储体系

      底层支撑

      概览

      本篇是第一篇,介绍计算模型的基础DataStream的相关内容,这一篇只介绍DataStream的基础内容,如如何实现相关的操作,数据结构等,不会涉及到窗口、事件事件和状态等信息

      DataStream是对数据流的一个抽象,其提供了丰富的操作算子(例如过滤、map、join、聚合、定义窗口等)来对数据流进行处理,下图描述了Flink中源数据通过DataStream的转换最后输出的整个过程。

      Flink DataStream基础框架源码分析

      通过上图可以来构想下,一般一个DataStream具有如下主要属性

      Flink DataStream基础框架源码分析

      属性说明
      上游依赖标识上游依赖信息,这样能把整个处理流程串联起来
      并行度处理逻辑的并行度信息,这样可以提高处理的速度
      输入格式指定输入数据的格式,如InnerType { public int id; public String www.devze.comtext; }
      输出格式指定输出数据的格式
      处理逻辑上游datastream转换到目前的datastream的具体逻辑操作,如map的具体逻辑信息。

      最终整个数据流会生成一个DAG图(有向无环图),通过这个DAG图就可以生成对应的任务来运行了。下面来具体分析DataStream的实现和生成DAG图(Flink中叫Graph)

      深入DataStream

      首先我们通过下图来看看,DataStream中的一些主要的辅助类,DataStream类本身主要逻辑是对各类转换关系和sink的操作,而前面说到的一些主要属性信息都是通过辅助类来处理的。

      Flink DataStream基础框架源码分析

      Transformation:本身主要管理了输出格式、上游依赖、并行度、id编号等信息以及StreamOperator的工厂类(Stre开发者_JS开发amOperatorFactory)

      StreamOperator:主要是各编程客栈类操作的具体处理逻辑

      Function:用户自定义函数的接口,如DataStream中map处理时需要传入的MapFunction就是Function的子接口

      DataStream

      属性和方法

      Flink DataStream基础框架源码分析

      DataStream的属性比较简单,就2个,1个是实行的环境信息,另一个是Transformation。

      DataStream中的方法主要分为以下几类

      • 基础属性信息:如获取并行度,id,输出格式等,大多数是代理来调用Transformation中对应的方法
      • 转换操作:各类的转换处理,如map、filter、shuffle、join等
      • 输出处理:各类输出的sink处理,如保存为文本等,不过大多数方法都不推荐使用了,这里主要的方法是addSink()
      • 触发执行: 如executeAndCollect,内部是调用了env.executeAsync来执行streaming dataflow

      除了转换操作外,其他几类的逻辑都比较直观和简单,这里重点介绍下转换操作的处理,转换操作这里分为3类,1.返回是一个DataStream。如map、filter、union、keyBy等;2.返回的是一个Streams,即输入是多个DataStream,这类的操作主要是多流关联的操作,如join、coGroup。这些Streams的类中实现了一些方法,来返回一个DataStream; 3.window类,返回的是AllWindowedStream类型,同样这些类中也是有方法,来返回一个DataStream。

      说明:如上的各个分类都是个人基于理解上做的各个分类处理,非官方定义

      类体系

      DataStream的类图关系比较简单,就如下这几个类,具体每个子类的信息见下表

      Flink DataStream基础框架源码分析

      子类名说明
      SingleOutputStreamOperator只有1个输出的DataStream
      IterativeStream迭代的DataStream,具体使用场景后面分析
      DataStreamSource最开始的DataStream,里面有source的信息
      KeyedStream有一个Key信息的DataStream

      Transformation

      属性和方法

      DataStream类本身主要是提供了给外部的编程接口的支持,而对Streaming flow算子节点本身的一些属性和操作则由Transformation来负责

      Flink DataStream基础框架源码分析

      从上图可以看出其主要属性有节点id,名称,并行度,输出类型还有一些与资源相关的内容,还有一个是上游的输入Transformation,由于这个因不同的Transformation会有不同的数据个数,所以这个信息是放在各个子类中的。如ReduceTransformation是有一个input属性来记录上游依赖,而如TwoInputTransformation则是有2个属性input1和input2来记录上游依赖,另外如SourceTransformation,这个是源头的Transformation,是没有上游依赖的Transformation,所以没有属性来记录,但是有个Source属性来记录Source输入

      具体对数据的操作处理,在Transformation里面有个StreamOperatorFactory属性,其中的StreamOperator实现了各种的处理算子。注意这里不是所有的Transformation都包含StreamOperatorFactory,如SourceTransformation中就没有,这个具体大家可以看看相关的代码。

      Transformation的方法基本上是对上述属性的get和set操作,这里重点要说明一下的是PhysicalTransformation(下面类体系来介绍)中的setChainingStrategy方法,这里的ChainingStrategy是一个枚举类,主要是控制多个连续的算子是否可以进行链式处理,这个具体的我们在下面介绍St编程客栈reamGraph时再介绍

      Flink DataStream基础框架源码分析

      类体系

      Flink DataStream基础框架源码分析

      Transformation的大多数类均为PhysicalTransformation的子类,PhysicalTransformation为有物理操作的,重点是这类的子类是支持Chaining操作的。我们先来看看其重要子类

      子类名说明
      SourceTransformation连接Source的Transformation,是整个streaming flow的最开始的转换处理
      SinkTransformation输出的转换处理,是整个streaming flow的最后一个
      OneInputTransformation只有1个输入的转换处理,如map、filter这类的处理
      TwoInputTransformation有2个输入的转换处理

      StreamOperator

      属性和方法

      StreamOperator负责对数据进行处理的具体逻辑,如map处理的StreamMap,由于各个Operator的处理方式的不同,这里主要以AbstractStreamOperator来介绍一些主要的属性,如output的数据,StreamConfig,StreamingRuntimeContext等。

      下面我们重点介绍下相关的方法

      StreamOperator接口有定义了重要的3个方法(这里只介绍与数据基础处理相关的部分)

      方法说明
      open()数据处理的前处理,如算子的初始化操作等
      finish()数据处理的后处理,如缓存数据的flush操作等
      close()该方法在算子生命周期的最后调用,不管是算子运行成功还是失败或者取消,主要是对算子使用到的资源的各种释放处理

      另外关注的对数据进行实际处理的方法,

      接口方法说明
      OneInputStreamOperatorprocessElement()对数据元素进行处理,实际该接口在OneInputStreamOperator的父接口Input中定义
      TwoInputStreamOperatorprocessElement1()对input1的数据元素进行处理
      processElement2()对input2的数据元素进行处理

      类体系

      StreamOpterator的子类非常多,包括测试类的一起有287个,这些大致可以归属到如下3个子类中,

      Flink DataStream基础框架源码分析

      类名说明
      OneInputStreamOperator只有1个输入的源
      TwoInputStreamOperator有2个输入源
      AbstractStreamOperator

      Function

      Function是针对所有的用户自定义的函数,各子类主要是实现对应的,这里定义了种类丰富的各类Function的子接口类来适配各种不同的加工场景,具体的就看源码了,这里就不详细介绍了

      本节的最后,我们通过一个例子来看看这几个类是怎么组合的。如下是一个常见的对DataStream进行map处理的操作

            text
              .flatMap(new Tokenizer)
      

      处理后对应的DataStream的结构如下图

      Flink DataStream基础框架源码分析

      DataStream生成提交执行的Graph

      前面分析了DataStream,是单个节点的,接下来看看整个streaming flow在flink中是怎么转换为可以执行的逻辑的。一般整个数据流我们叫做DAG,那在Flink中叫PipeLine,其实现类是StreamGraph。这里先介绍2个概念

      • StreamNode:streaming流中的一个节点,代表对应的算子
      • StreamEdge:Graph中的边,来连接上下游的StreamNode

        如上图所示,圆形为StreamNode,箭头为StreamEdge,这样通过这2者就可以构建一个StreamGraph了。

        StreamGraph是最原始的Graph,而其中会做一些优化生成JobGraph,最后会生成待执行的ExecutionGraph,这里我们先介绍下基础概念,后面会深入介绍相关的内容。

      • JobGraph: 优化后的StreamGraph,具体做的优化就是把相连的算子,如果支持chaining的,合并到一个StreamNode;
      • ExecutionGraph: 和JobGraph结构一致

      StreamGraph

      下面我们来看看StreamGraph的主要属性和方法,以及如何从DataStream转换为StreamGraph的。

      属性和方法

      重要属性如下(这里只介绍与生成图相关的属性,还有一些如状态,存储类的后面介绍)

      属性说明
      Map<Integer, StreamNode> streamNodesStreamNode数据,kv格式,key为Transformation的id
      Set sourcesStreamGraph的所有source集合,存储的是Transformation的id
      Set sinksStreamGraph的sink集合

      说明:StreamGraph只记录了StreamNode的信息,StreamEdge的信息是记录在StreamNode中的。如下2个属性记录了StreamNode的输入Edge和输出Edge

          //StreamNode.Java
          private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
          private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
      

      主要方法

      方法说明
      addSource()添加source节点
      addSink()添加sink节点
      addOperator()添加算子节点
      addVirtualSideOandroidutputNode()添加一个虚拟的siteOutput节点

      StreamGraph生成

      下面我们来看看DataStream是如何生成StreamGraph的。通过前面对DataStream的分析可知,DataStream的前后依赖关系是通过Transformation来存储的,这里StreamExecutionEnvironment有个transformations记录了所有的Transformation

          //StreamExecutionEnvironment.java
          List<Transformation<?>> transformations
      

      这里的数据是在DataStream进行转换处理生成了新的Transformation,同时会把该实例添加到transformations里面,使用的是如下方法

          getExecutionEnvironment().addOperator(resultTransform);
      

      而具体转换为StreamGraph是通过StreamExecutionEnvironment的 getStreamGraph()方法。最终转换的逻辑是通过StreamGraphGenerator类来实现。

      这里要介绍一个新的类体系TransformationTranslator,有各种的子类来转换对应类型的Transformation。这里有定义了2个方法分别支持转换Streaming和BATch。

       //TransformationTranslator.java
       Collection<Integer> translateForBatch(final T transformation, final Context context);
       Collection<Integer> translateForStreaming(final T transformation, final Context context);
      

      对应的映射关系存储在StreamGraphGenerator类的translatorMap中。

      //StreamGraphGenerator.java 
      private static final Map<
                          Class<? extends Transformation>,
                          TransformationTranslator<?, ? extends Transformation>>
                  translatorMap;
      static {
              @SuppressWarnings("rawtypes")
              Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
                      tmp = new HashMap<>();
              tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
              tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
              tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
              tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
              tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
              tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
          ...
      

      下面我们通过OneInputTransformationTranslator为例来看看是如何进行转换的。具体逻辑如下

       //调用awww.devze.comddOperator添加StreamNode
       streamGraph.addOperator(
                      transformationId,
                      slotSharingGroup,
                      transformation.getCoLocationGroupKey(),
                      operatorFactory,
                      inputType,
                      transformation.getOutputType(),
                      transformation.getName());
      //获取上游依赖的transformations,然后添加边
              final List<Transformation<?>> parentTransformations = transformation.getInputs();
              for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
                  streamGraph.addEdge(inputId, transformationId, 0);
              }
      

      除了添加节点和边外,还有一些如设置节点的并行度等操作,这块大家可以去看看具体的代码。

      这样当把所有的Transformation都转换完,这样StreamGraph就生成好了。

      JobGraph

      有了StreamGraph,为什么还需要一个JobGraph呢,这个和Spark中的Stage类似,如果有多个算子能够合并到一起处理,那这样性能可以提高很多。所以这里 根据一定的规则进行,先我们介绍相关的类

      • JobVertex:job的顶点,即对应的计算逻辑(这里用的是Vertex, 而前面用的是Node,有点差异),通过inputs记录了所有来源的Edge,而输出是ArrayList来记录
      • JobEdge: job的边,记录了源Vertex和咪表Vertex.
      • IntermediateDataSet: 定义了一个中间数据集,但并没有存储,只是记录了一个Producer(JobVertex)和一个Consumer(JobEdge)

        主要的概念就这些,下面我们看看JobGraph的结构以及如何从StreamGraph转换为JobGraph

      属性和方法

      JobGraph的属性主要是通过Map<JobVertexID, JobVertex> taskVertices记录了JobVertex的信息。

      另外这个JobGraph是提交到集群去执行的,所以会有一些执行相关的信息,相关的如下:

          private JobID jobID;
          private final String jobName;
          private SerializedValue<ExecutionConfig> serializedExecutionConfig;
          /** Set of JAR files required to run this job. */
          private final List<Path> userJars = new ArrayList<Path>();
           /** Set of custom files required to run this job. */
          private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts =
                  new HashMap<>();
          /** Set of blob keys identifying the JAR files required to run this job. */
          private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
          /** List of classpaths required to run this job. */
          private List<URL> classpaths = Collections.emptyList();
      

      而相关的方法主要是

      方法说明
      addVertex()添加顶点
      getVertices()获取顶点

      而如何从StreamGraph转换到JobGraph这块的内容还是比较多,这块后续我们单独开一篇来介绍

      总结

      本篇从0开始介绍了DataStream的相关内容,并深入介绍了DataStream、Transformation、StreamOperator和Function之间的关系。另外介绍了streaming flow转换为提交执行的StreamGraph的过程及StreamGraph的存储结构。而从StreamGraph->JobGraph->ExecutionGraph这块涉及的内容也较多,且还涉及到提交部署的内容,这块后面单独来介绍。最后本篇介绍的DataStream只是介绍了最基础的计算框架,没有涉及到flink的streaming flow中的时间、状态、window等内容,更多关于Flink DataStream基础的资料请关注我们其它相关文章!

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      关注公众号