来源:安瑞哥是码农
之所以想到要写这篇文章呢,因为上篇文章写过Doris 通过Flink CDC 只需要一个简单的命令,就能同步整个mysql库的所有表,其中这个多表的同步过程,就使用到了Flink的侧流输出功能。
所谓侧流输出(也叫旁路输出),指的是通过对读取的数据源(DataStream),根据不同的业务规则进行条件筛选,然后根据筛选后生成的不同数据分支,做不同的业务目的输出(比如写到不同的数据目的地中)。
看到Flink官网的如此描述后,我就在想,印象中 Spark 是不是不具备这个功能?
为此,我还专门回过头去折腾了下 Spark 的 structured stream,试图根据两个不同的数据处理逻辑,得到两股不同的流,用两股流分别做输出,伪代码如下:
val rawDF = spark.readStream //获取数据源
.format("kafka") //确定数据源的来源格式
.option("kafka.bootstrap.servers", "xxxxx")
.option(....)
.load()
//第1个数据流
val df01 = rawDF.map(//第1个处理逻辑)
//第2个数据流
val df02 = rawDF.map(//第2个处理逻辑)
//第1个数据流输出
df01.writeStream
.outputMode(OutputMode.Append())
.format(//第1个输出)
.option("checkpointLocation","xxxx")
.start()
.awaitTermination()
//第2个数据流输出
df02.writeStream
.outputMode(OutputMode.Append())
.format(//第2个输出)
.option("checkpointLocation","xxxx")
.start()
.awaitTermination()
你们猜,这个代码能运行起来吗?
其实可以,整个过程不会报错,但是第2个数据流,永远都不会输出。
虽然Spark官网没有明确说明,一个spark流式任务中,是否能做多个外部输出,但是通过实际的动手验证发现:不行。
所以目前来看,对比Spark,侧输出流是Flink的专属,而且是DataStream API的专属(SQL API 不具备)。

那么今天这篇内容,咱就来聊聊Flink的这个侧流输出能力,到底实际用起来怎么样?会遇到哪些坑?
0. 准备工作
侧输出流本身是 Flink 内置的功能,所以使用它,不需要跟Flink CDC 一样引入额外的pom依赖,直接编码实现就可以。
官方文档给的示例非常的简单,只演示了一个侧流跟一个主流的情况,但是从它的描述来看,使用的时候,其实是可以根据你的业务要求,切分出任意多个侧输出流的。

那为了尽可能测试该功能,我准备从kafka数据源中,以 client_ip 这个字段值的最后一个数字作为区分,分成4股不同的流,对每股流的输出计划如下:
1. 主输出流:写入到kafka的一个topic中;
2. 侧输出流01:写入到Doris表中;
3. 侧输出流02:直接打印到屏幕输出;
4. 侧输出流03:写入到hdfs文件系统中。
因为官网提到不同的流输出,可以支持不同的数据类型,所以我也决定不同的数据流,用不同的数据类型来表示。
废话少唠,直接上代码。
1. 完整编码
根据官方文档的示例,带有详细注释的编码如下:
package com.anryg.side_output
import java.time.Duration
import java.util.Properties
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.doris.flink.cfg.{DorisExecutionOptions, DorisOptions, DorisReadOptions}
import org.apache.doris.flink.sink.DorisSink
import org.apache.doris.flink.sink.writer.SimpleStringSerializer
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.configuration.MemorySize
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
/**
* @DESC: Flik侧输出流案例测试
* @Auther: Anryg
* @Date: 2023/11/15 10:11
*/
object FlinkSideoutTest01 {
private final val hdfsPrefix = "hdfs://192.168.211.106:8020" //HDFS路径前缀
/**
* @DESC: 定义hdfs输出方式
* */
private def hdfsSink[Out](path:String): StreamingFileSink[Out] ={
val hdfsSink = StreamingFileSink.forRowFormat(new Path(hdfsPrefix + path),
new SimpleStringEncoder[Out]("UTF-8"))
.withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd")) /**默认基于时间分配器,以小时为单位进行切分yyyy-MM-dd--HH*/
.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("kafka").withPartSuffix(".csv").build()) //设置生成文件的配置策略
.withRollingPolicy( //设置文件的滚动策略,也就是分文件策略,也可以同时设置文件的命名规则,这里暂时用默认
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofSeconds(10)) //文件滚动间隔,设为10分钟,即每10分钟生成一个新文件
.withInactivityInterval(Duration.ofSeconds(10)) //空闲间隔时间,也就是当前文件有多久没有写入数据,则进行滚动
.withMaxPartSize(MemorySize.ofMebiBytes(1024)) //单个文件的最大文件大小,设置为1G
.build())
.build()
hdfsSink
}
/**
* @DESC: 定义Doris的输出方式
* */
private def dorisSink[Sting](label:String,table:String): DorisSink[String] ={
/**配置Doris相关参数*/
val dorisBuilder = DorisOptions.builder
dorisBuilder.setFenodes("192.168.221.173:8030")
.setTableIdentifier(table) //比如:example_db.dns_logs_from_flink01
.setUsername("root")
.setPassword("")
/**确定数据写入到Doris的方式,即stream load方式*/
val executionBuilder = DorisExecutionOptions.builder
executionBuilder.setLabelPrefix(label) //streamload label prefix
/**添加额外的数据格式设置,否则写不进去*/
val properties = new Properties
properties.setProperty("format", "json")
properties.setProperty("read_json_by_line", "true")
executionBuilder.setStreamLoadProp(properties)
val builder = DorisSink.builder[String] //注意这个数据类型String 需要加上
builder.setDorisReadOptions(DorisReadOptions.builder.build) //确定数据读取策略
.setDorisExecutionOptions(executionBuilder.build) //确定数据执行策略
.setSerializer(new SimpleStringSerializer()) //确定数据数据序列化(写入)类型
.setDorisOptions(dorisBuilder.build) //添加Doris配置
.build()
builder.build()
}
/**
* @DESC: 定义kafka的输出方式
* */
private def kafkaSink(topic:String): KafkaSink[String] ={
KafkaSink.builder[String]()
.setBootstrapServers("192.168.211.107:6667")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic(topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000, CheckpointingMode.AT_LEAST_ONCE)
//env.setParallelism(args(0).toInt)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) //新的设置state backend的方式
env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FLinkSideoutTest01") //设置checkpoint的hdfs目录
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置checkpoint记录的保留策略
val kafkaSource = KafkaSource.builder() //获取kafka数据源
.setBootstrapServers("192.168.211.107:6667")
.setTopics("test")
.setGroupId("SideoutTest01")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
import org.apache.flink.streaming.api.scala._ //引入隐私转换函数
val kafkaDS = env.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks()
,"kafka-data") //读取数据源生成DataStream对象
val rawDS = kafkaDS.map(line => {
val rawJson = JSON.parseObject(line) //原始string是一个json,对其进行解析
val message = rawJson.getString("message") //获取业务数据部分
val msgArray = message.split(",") //指定分隔符进行字段切分
msgArray
}).filter(_.length == 9) //只留字段数为9的数据
.filter(array => ! array(0).trim.equals(""))
.map(array => (array(0),array(1).toLowerCase,array(2),array(3),array(4),array(5),array(6).toLowerCase(),array(7),array(8))) //将其转化成为元组,为了方便下一步赋予schema
/**定义第1个侧输出流的标签*/
val outputTag01 = OutputTag[(String,String,String,String,String,String,String,String,String)]("side-output01")
/**定义第2个侧输出流的标签*/
val outputTag02 = OutputTag[String]("side-output02")
/**定义第3个侧输出流的标签*/
val outputTag03 = OutputTag[(String,String,String)]("side-output03")
val mainDS = rawDS.process(new ProcessFunction[(String,String,String,String,String,String,String,String,String),String] {
override def processElement(value: (String, String, String, String, String, String, String, String, String), ctx: ProcessFunction[(String, String, String, String, String, String, String, String, String), String]#Context, out: Collector[String]): Unit = {
val sufix = value._1.substring(value._1.length - 1) //拿到client_ip的最后一个数字,然后根据其数字来进行数据流分组
if (sufix.equals("1") || sufix.equals("2")) {
out.collect(value._1) /**主输出流*/
}
else if (sufix.equals("3") || sufix.equals("4") || sufix.equals("5")){
ctx.output(outputTag01, value) /**侧输出流01*/
}
else if (sufix.equals("6") || sufix.equals("7") || sufix.equals("8")) {
ctx.output(outputTag02, value._2) /**侧输出流02*/
}
else {
val out03 = (value._1, value._2, value._3)
ctx.output(outputTag03, out03) /**侧输出流03*/
}
}
})
/**获取侧输出流01*/
val sideDS01 = mainDS.getSideOutput(outputTag01)
/**获取侧输出流02*/
val sideDS02 = mainDS.getSideOutput(outputTag02)
/**获取侧输出流03*/
val sideDS03 = mainDS.getSideOutput(outputTag03)
/**将主输出流输出到kafka*/
mainDS.sinkTo(kafkaSink("example_topic"))
/**将第1个侧流输出到Doris表*/
sideDS01.map(array => {
val json = new JSONObject(9) //将数据封装为json对象
json.put("client_ip", array._1)
json.put("domain",array._2)
json.put("time",array._3)
json.put("target_ip",array._4)
json.put("rcode",array._5)
json.put("query_type",array._6)
json.put("authority_record",array._7)
json.put("add_msg",array._8)
json.put("dns_ip",array._9)
json.toJSONString //转换为json string
}).sinkTo(dorisSink("side_ouput01", "example_db.flink_side_stream01"))
/**将第2个侧流直接打印出来*/
sideDS02.print("side stream02: ")
/**将第3个侧流输出到HDFS的CSV文件*/
sideDS03.addSink(hdfsSink[(String,String,String)]("/tmp/flink_side-stream03"))
env.execute("FlinkSideoutTest01")
}
}
从代码中可以看出来,整个编码逻辑还是非常清晰的。
只不过对于Flink来说,不同的输出对象,不管是kafka、Doris、还是HDFS,每一个Sink对象的API风格都不一样,没有统一的格式与套路,相比之下,Spark则会更规整一些。
而且随着 Flink 版本的不同,相同功能的 API 写法还不一样,就还挺难受的。
2. 运行结果
整个编码过程虽然看起来内容比较多,而且涉及到多个不同的Sink对象,但是编写完却出奇的顺利(居然没有发现坑)。
一跑就通,出奇的丝滑,简直 Amazing...
最关键是4个输出都符合预期。
先看主输出流(写kafka):

再看第1个侧输出流(写Doris表):

然后看第2个侧输出流(直接控制台输出):

最后看第3个侧输出流(写HDFS):

总结
虽然说根据官网文档的要求,整个编码过程很顺利,没有遇到明显的让程序跑不起来的坑。
但是,这里还是有个必须要注意的点,那就是在实践中,会有这么个问题,那就是,但凡有任何一个流在sink的时候发生了异常,程序默认是不会退出的。
也就是说,程序在运行过程中有没有问题,除非你去观察日志,否则即便你不对代码逻辑进行try...catch,程序本身是会忽略这些异常继续跑的。
这就需要开发者在编码时,确定多个流之间的关系,是相互不影响呢,还是要作为一个「事务」而捆绑在一起。