开发者

flink RichFunction之坑及解决

开发者 https://www.devze.com 2022-12-18 10:23 出处:网络 作者: jingxian
目录flink RichFunction之坑NO原因解决方案flink中richfunction的一点小作用①传递参数②传递广播变量总结flink RichFunction之坑
目录
  • flink RichFunction之坑
    • NO
    • 原因
    • 解决方案
  • flink中richfunction的一点小作用
    • ①传递参数
    • ②传递广播变量
  • 总结

    flink RichFunction之坑

    flink的RichMapFunction,RichSinkFunction等,并不能百分百做到每次只open一个数据库连接。

    在有些情况下他会一直创建然后销毁,创建销毁。

    举例: 重点在第三行的注释

      val value = jsenv.socketTextStream("192.168.1编程客栈3.11", 9090)
        val value2 = value.filter(x => {
          try {
            var a = 1 / 0   //此处若没有异常处理,任务不会断,但是会重复打开数据库连接
          } catch {
            case e: Exception =>
          }
          isInter(x)
        }).map(fun = x => {
          x.toLong
        })
        val value1 = value2.assignTimestampsAndwatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) {
          override def extractTimestamp(element: Long): Long = {
            println(element + "***************")
            element
          }
        })
    
        try {
          var a = 1 / 0
        } catch {
          case e: Exception =>
        }
        value1.map(new mymap)
        env.execute("test")
    
      }
    
      def isInter(input: String): Boole开发者_Go培训an = {
        val matcher = Pattern.compile("^[0-9]+$").matcher(input)
        matcher.find()
      }
    }
    
    
    class myRichMapfun6() extends RichMapFunction[ListBuffer[String], Unit] {
      var conn: Connection = _
      var pst: PreparedStatement = _
    
      override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://xxxxxxx:3306/zzt?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true", "root", "bigdata@mysql")
        println(conn)
        pst = conn.prepareStatement("i编程客栈nsert into testa 编程客栈(str) values (?)")
      }
    
      override def close(): Unit = {
        conn.close()
        pst.close()
      }
    
      override def map(in: ListBuffer[String]): Unit = {
        pst.setString(1, in.head)
        pst.execute()
      }
    }
    

    所以你是不是觉得那就价格异常处理不就得了?

    NO

    再看:

    flink RichFunction之坑及解决

    这个时候,如果传进来line不是数字或者格式不对,就会触发异常,然而此时就不会像上面那样帮你解决问题,而是一遍遍创建对象销毁对象,一条消息创建一个连接,我就问你慌不慌,

    原因

    据观察是因为,输入的数据有问题,直接导致

     val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) {
          override def extractTimestamp(element: Long): Long = {
            println(element + "***************")
            element
          }
        })
    
    

    这个崩溃了,不走这行代码了,没有获得eventime,然后估计。。。 剩下的我也没详细测。。。

    解决方案

    先fiiter过滤任何可能导致异常的脏数据确保数据都没问题就可以了。 

    flink中richfunction的一点小作用

    ①传递参数

    所有需要用户定义的函数都可以转换成richfunction,例如实现map operator中你需要实现一个内部类,并实现它的map方法:

    data.map (new MapFunction<String, Integer>() {
      public Integer map(String value) { return Integer.parseInt(value); }
    });

    然后我们可以将其转换为RichMapFunction:

    dataDgcYOJfdUP.map (new RichMapFunction<String, Integer>() {
      public Integer map(String value) { return Integer.parseInt(value); }
    });

    当然,RichFuction除了提供原来MapFuction的方法之外,还提供open, close, getRuntimeContext 和setRuntimeContext方法,这些功能可用于参数化函数(传递参数),创建和完成本地状态,访问广播变量以及访问运行时信息以及有关迭代中的信息。

    下面我们来看看RichFuction中传递参数的例子,以下代码是测试RichFilterFuction的例子,基于DataSet而非DataStream。

    flink RichFunction之坑及解决

    由代码可见,可以将Configuration中的limit参数的值传递进RichFuction里面,通过后面withParameters方法传递进去,最后的结果是

    flink RichFunction之坑及解决

    由此可见,我从configuration中获取了limit的值,并设定了fliter的阈值是2,从而过滤了1,2。

    ②传递广播变量

    原理和上面差不多,下面我直接把代码贴出来:

    flink RichFunction之坑及解决

    这是目前我学习到的RichFunction的用法,和大家分享一下。

    总结

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    关注公众号