Flink 的侧输出流,怎么玩?

来源:安瑞哥是码农

之所以想到要写这篇文章呢,因为上篇文章写过Doris 通过Flink CDC 只需要一个简单的命令,就能同步整个mysql库的所有表,其中这个多表的同步过程,就使用到了Flink的侧流输出功能。


所谓侧流输出(也叫旁路输出),指的是通过对读取的数据源(DataStream),根据不同的业务规则进行条件筛选,然后根据筛选后生成的不同数据分支,做不同的业务目的输出(比如写到不同的数据目的地中)。


看到Flink官网的如此描述后,我就在想,印象中 Spark 是不是不具备这个功能?


为此,我还专门回过头去折腾了下 Spark 的 structured stream,试图根据两个不同的数据处理逻辑,得到两股不同的流,用两股流分别做输出,伪代码如下:


val rawDF = spark.readStream //获取数据源
            .format("kafka"//确定数据源的来源格式
            .option("kafka.bootstrap.servers""xxxxx")
            .option(....)
            .load()
//第1个数据流            
val df01 = rawDF.map(//第1个处理逻辑)

//第2个数据流 
val df02 = rawDF.map(//第2个处理逻辑)

//第1个数据流输出
df01.writeStream
    .outputMode(OutputMode.Append()) 
    .format(//第1个输出)
    .option("checkpointLocation","xxxx")
    .start()
    .awaitTermination()
    
//第2个数据流输出     
df02.writeStream
    .outputMode(OutputMode.Append()) 
    .format(//第2个输出)
    .option("checkpointLocation","xxxx")
    .start()
    .awaitTermination()    

你们猜,这个代码能运行起来吗?


其实可以,整个过程不会报错,但是第2个数据流,永远都不会输出。


虽然Spark官网没有明确说明,一个spark流式任务中,是否能做多个外部输出,但是通过实际的动手验证发现:不行。


所以目前来看,对比Spark,侧输出流是Flink的专属,而且是DataStream API的专属(SQL API 不具备)。



那么今天这篇内容,咱就来聊聊Flink的这个侧流输出能力,到底实际用起来怎么样?会遇到哪些坑?



0. 准备工作


侧输出流本身是 Flink 内置的功能,所以使用它,不需要跟Flink CDC 一样引入额外的pom依赖,直接编码实现就可以。


官方文档给的示例非常的简单,只演示了一个侧流跟一个主流的情况,但是从它的描述来看,使用的时候,其实是可以根据你的业务要求,切分出任意多个侧输出流的。


那为了尽可能测试该功能,我准备从kafka数据源中,以 client_ip 这个字段值的最后一个数字作为区分,分成4股不同的流,对每股流的输出计划如下:


1. 主输出流:写入到kafka的一个topic中;


2. 侧输出流01:写入到Doris表中;


3. 侧输出流02:直接打印到屏幕输出;


4. 侧输出流03:写入到hdfs文件系统中。


因为官网提到不同的流输出,可以支持不同的数据类型,所以我也决定不同的数据流,用不同的数据类型来表示。


废话少唠,直接上代码。



1. 完整编码


根据官方文档的示例,带有详细注释的编码如下:


package com.anryg.side_output

import java.time.Duration
import java.util.Properties

import com.alibaba.fastjson.{JSONJSONObject}
import org.apache.doris.flink.cfg.{DorisExecutionOptionsDorisOptionsDorisReadOptions}
import org.apache.doris.flink.sink.DorisSink
import org.apache.doris.flink.sink.writer.SimpleStringSerializer
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.{SimpleStringEncoderSimpleStringSchema}
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchemaKafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfigStreamingFileSink}
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector


/**
  * @DESC: Flik侧输出流案例测试
  * @Auther: Anryg
  * @Date: 2023/11/15 10:11
  */

object FlinkSideoutTest01 {

    private final val hdfsPrefix = "hdfs://192.168.211.106:8020" //HDFS路径前缀

    /**
      * @DESC: 定义hdfs输出方式
      * */

    private def hdfsSink[Out](path:String): StreamingFileSink[Out] ={
        val hdfsSink = StreamingFileSink.forRowFormat(new Path(hdfsPrefix + path),
            new SimpleStringEncoder[Out]("UTF-8"))
            .withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd")) /**默认基于时间分配器,以小时为单位进行切分yyyy-MM-dd--HH*/
            .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("kafka").withPartSuffix(".csv").build()) //设置生成文件的配置策略
            .withRollingPolicy( //设置文件的滚动策略,也就是分文件策略,也可以同时设置文件的命名规则,这里暂时用默认
            DefaultRollingPolicy.builder()
                .withRolloverInterval(Duration.ofSeconds(10)) //文件滚动间隔,设为10分钟,即每10分钟生成一个新文件
                .withInactivityInterval(Duration.ofSeconds(10)) //空闲间隔时间,也就是当前文件有多久没有写入数据,则进行滚动
                .withMaxPartSize(MemorySize.ofMebiBytes(1024)) //单个文件的最大文件大小,设置为1G
                .build())
            .build()

        hdfsSink
    }

    /**
      * @DESC: 定义Doris的输出方式
      * */

    private def dorisSink[Sting](label:String,table:String): DorisSink[String] ={
        /**配置Doris相关参数*/
        val dorisBuilder = DorisOptions.builder
        dorisBuilder.setFenodes("192.168.221.173:8030")
            .setTableIdentifier(table) //比如:example_db.dns_logs_from_flink01
            .setUsername("root")
            .setPassword("")
        /**确定数据写入到Doris的方式,即stream load方式*/
        val executionBuilder = DorisExecutionOptions.builder
        executionBuilder.setLabelPrefix(label) //streamload label prefix

        /**添加额外的数据格式设置,否则写不进去*/
        val properties = new Properties
        properties.setProperty("format""json")
        properties.setProperty("read_json_by_line""true")

        executionBuilder.setStreamLoadProp(properties)

        val builder = DorisSink.builder[String//注意这个数据类型String 需要加上
        builder.setDorisReadOptions(DorisReadOptions.builder.build) //确定数据读取策略
            .setDorisExecutionOptions(executionBuilder.build) //确定数据执行策略
            .setSerializer(new SimpleStringSerializer())  //确定数据数据序列化(写入)类型
            .setDorisOptions(dorisBuilder.build) //添加Doris配置
            .build()
        builder.build()
    }

    /**
      * @DESC: 定义kafka的输出方式
      * */

    private def kafkaSink(topic:String): KafkaSink[String] ={
         KafkaSink.builder[String]()
             .setBootstrapServers("192.168.211.107:6667")
             .setRecordSerializer(
                 KafkaRecordSerializationSchema.builder()
                 .setTopic(topic)
                 .setValueSerializationSchema(new SimpleStringSchema())
                 .build()
             )
             .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
             .build()
     }


    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.enableCheckpointing(10000CheckpointingMode.AT_LEAST_ONCE)
        //env.setParallelism(args(0).toInt)

        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式
        env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FLinkSideoutTest01"//设置checkpoint的hdfs目录
        env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION//设置checkpoint记录的保留策略

        val kafkaSource = KafkaSource.builder()  //获取kafka数据源
            .setBootstrapServers("192.168.211.107:6667")
            .setTopics("test")
            .setGroupId("SideoutTest01")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build()

        import org.apache.flink.streaming.api.scala._  //引入隐私转换函数

        val kafkaDS = env.fromSource(kafkaSource,
            WatermarkStrategy.noWatermarks()
            ,"kafka-data"//读取数据源生成DataStream对象

        val rawDS = kafkaDS.map(line => {
                        val rawJson = JSON.parseObject(line)      //原始string是一个json,对其进行解析
                        val message = rawJson.getString("message")  //获取业务数据部分
                        val msgArray = message.split(",")  //指定分隔符进行字段切分
                        msgArray
                    }).filter(_.length == 9//只留字段数为9的数据
                    .filter(array => ! array(0).trim.equals(""))
                    .map(array => (array(0),array(1).toLowerCase,array(2),array(3),array(4),array(5),array(6).toLowerCase(),array(7),array(8))) //将其转化成为元组,为了方便下一步赋予schema

        /**定义第1个侧输出流的标签*/
        val outputTag01 = OutputTag[(String,String,String,String,String,String,String,String,String)]("side-output01")

        /**定义第2个侧输出流的标签*/
        val outputTag02 = OutputTag[String]("side-output02")

        /**定义第3个侧输出流的标签*/
        val outputTag03 = OutputTag[(String,String,String)]("side-output03")

        val mainDS = rawDS.process(new ProcessFunction[(String,String,String,String,String,String,String,String,String),String] {
                        override def processElement(value: (StringStringStringStringStringStringStringStringString), ctx: ProcessFunction[(StringStringStringStringStringStringStringStringString), String]#Context, out: Collector[String]): Unit = {
                            val sufix = value._1.substring(value._1.length - 1//拿到client_ip的最后一个数字,然后根据其数字来进行数据流分组


                            if (sufix.equals("1") || sufix.equals("2")) {
                                out.collect(value._1)  /**主输出流*/
                            }
                            else if (sufix.equals("3") || sufix.equals("4") || sufix.equals("5")){
                                ctx.output(outputTag01, value) /**侧输出流01*/
                            }
                            else if (sufix.equals("6") || sufix.equals("7") || sufix.equals("8")) {
                                ctx.output(outputTag02, value._2)  /**侧输出流02*/
                            }
                            else {
                                val out03 = (value._1, value._2, value._3)
                                ctx.output(outputTag03, out03)  /**侧输出流03*/
                            }
                        }
                    })

        /**获取侧输出流01*/
        val sideDS01 = mainDS.getSideOutput(outputTag01)

        /**获取侧输出流02*/
        val sideDS02 = mainDS.getSideOutput(outputTag02)

        /**获取侧输出流03*/
        val sideDS03 = mainDS.getSideOutput(outputTag03)

        /**将主输出流输出到kafka*/
        mainDS.sinkTo(kafkaSink("example_topic"))

        /**将第1个侧流输出到Doris表*/
        sideDS01.map(array => {
            val json = new JSONObject(9)  //将数据封装为json对象
            json.put("client_ip", array._1)
            json.put("domain",array._2)
            json.put("time",array._3)
            json.put("target_ip",array._4)
            json.put("rcode",array._5)
            json.put("query_type",array._6)
            json.put("authority_record",array._7)
            json.put("add_msg",array._8)
            json.put("dns_ip",array._9)
            json.toJSONString //转换为json string
        }).sinkTo(dorisSink("side_ouput01""example_db.flink_side_stream01"))


        /**将第2个侧流直接打印出来*/
        sideDS02.print("side stream02: ")

        /**将第3个侧流输出到HDFS的CSV文件*/
        sideDS03.addSink(hdfsSink[(String,String,String)]("/tmp/flink_side-stream03"))
        
        env.execute("FlinkSideoutTest01")
    }
}

从代码中可以看出来,整个编码逻辑还是非常清晰的。


只不过对于Flink来说,不同的输出对象,不管是kafka、Doris、还是HDFS,每一个Sink对象的API风格都不一样,没有统一的格式与套路,相比之下,Spark则会更规整一些。


而且随着 Flink 版本的不同,相同功能的 API 写法还不一样,就还挺难受的。



2. 运行结果


整个编码过程虽然看起来内容比较多,而且涉及到多个不同的Sink对象但是编写完却出奇的顺利(居然没有发现坑)。


一跑就通,出奇的丝滑,简直 Amazing...


最关键是4个输出都符合预期。


先看主输出流(写kafka):



再看第1个侧输出流(写Doris表):



然后看第2个侧输出流(直接控制台输出):



最后看第3个侧输出流(写HDFS):




总结


虽然说根据官网文档的要求,整个编码过程很顺利,没有遇到明显的让程序跑不起来的坑。


但是,这里还是有个必须要注意的点,那就是在实践中,会有这么个问题,那就是,但凡有任何一个流在sink的时候发生了异常,程序默认是不会退出的


也就是说,程序在运行过程中有没有问题,除非你去观察日志,否则即便你不对代码逻辑进行try...catch,程序本身是会忽略这些异常继续跑的。


这就需要开发者在编码时,确定多个流之间的关系,是相互不影响呢,还是要作为一个「事务」而捆绑在一起。


请使用浏览器的分享功能分享到微信等