您的位置:首页 > 博客中心 > 前端开发 >

flink解析canal-json数据

时间:2022-04-11 14:43

引入依赖

    <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.33</version>
        </dependency>

  

val env = StreamExecutionEnvironment.getExecutionEnvironment
    println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)+"flink 代码开始运行")
    val begin_date = new EQTJStreamUtil().getParamDate(ParameterTool.fromArgs(args))
    println(begin_date)

    //添加kakka数据源

    val reportStreamSouce = env.addSource(new FlinkKafkaConsumer[String]("bymm_topic", new SimpleStringSchema(), new EQTJStreamUtil().getKafkaProps())
      .setStartFromEarliest())  //设置消费kafka位置
      .map(JSON.parseObject(_))
      .filter(_.get("table")=="epidemic_report")
      .filter(_.get("type").toString.matches("(INSERT|UPDATE)"))
      .map(_.getJSONArray("data").getObject(0,new Dxxbs_epidemic_report().getClass))
//      .filter(_.getSet_id=="1")
      .filter(_.getCreat_time > begin_date)

 

本类排行

今日推荐

热门手游