Flink确定TaskManager个数以及内存计算

从官网说起

我们先来看一段描述,以下摘抄自Apache Flink官方Release Note:
这段话的意思是:从flink1.5版本开始,Flink on Yarn模式下的容器数量,也就是TaskManager的数量将由程序的并行度自动推算,即使你在启动flink程序时主动设置了-yn或者--yarncontainer参数也不生效了。
那么具体是怎么推算出来的呢?我们先来回忆一下flink的并行度(Parallelism)和任务槽(Task Slot)。

并行度和任务槽

并行度(Parallelism)

什么是并行度(Parallelism)

一个Flink Job在生成执行计划时被划分成多个Task。Task可以由多线程并发执行,每个线程处理Task输入数据的一个子集。而并发的数量就称为并行度(Parallelism)。

如何设置并行度

在Flink程序中,有4种方式可以设置并行度。从低到高分别为:算子级别、执行环境级别、客户端级别、配置文件级别。实际执行时,优先级则是反过来的,算子级别最高。
  • • 算子级别dataStream.map(new CustomerMapFunction()).setParallelism(1);
  • • 执行环境级别streamExecutionEnvironment.setParallelism(1);
  • • 客户端级别bin/flink -run --parallelism 1 demo.jar
  • • 配置文件级别可以在flink-conf.yaml配置文件中设置并行度:parallelism.default: 1

任务槽(Task Slot)

什么是任务槽(Task Slot)

Flink运行时由两个组件组成:JobManager与TaskManager。JobManager和TaskManager本质上都是JVM进程,为了提高Flink程序的运行效率和资源利用率,Flink在TaskManager中实现了任务槽(Task Slot)。任务槽是Flink计算资源的基本单位,每个任务槽可以在同一时间执行一个Task,而TaskManager可以拥有一个或者多个任务槽。
任务槽可以实现TaskManager中不同Task的资源隔离,不过是逻辑隔离,并且只隔离内存,也就是说在调度层面认为每个任务槽“应该”得到taskmanager.heap.size的N分之一大小的内存,CPU资源不算在内。

如何设置任务槽

一般来说,我们设定Task Slot时可以将它理解成一个TaskManager可以利用的CPU核心数,因此也要根据实际情况(集群的CPU资源和作业的计算量)来确定。在Flink程序中,有2中方式可以设置任务槽。
  • • 如果我们用flink run来提交任务时,可以通过-ys或者--yarnslots参数来指定task slot 数量;
  • • 另外一种方式是通过flink-conf.yaml配置文件来设置:taskManager.numberOfTaskSlots = 1

如何确定TaskManager数量

了解了并行度和任务槽的概念以及如何设置之后,以Flink自带示例中的WordCount程序为例,我们来看下TaskManager数量是如何推算的:
// 设置执行环境并行度为6
env.setParallelism(6);
// 设置Source并行度为1
DataStream text = env.readTextFile("/Users/casey/demo.txt").setParallelism(1);
DataStream> counts = text.flatMap(new Tokenizer())
    .keyBy(0)
    .sum(1);
counts.print();
在执行flink程序时,我们指定--yarnslots 3,即每个TaskManager分配3个Slot。TaskManager、Slot和task的分布将如下图所示:
由上图很容易可以得出结论:TaskManager数量 = max(parallelism) / yarnslots(向上取整)。比如:一个并行度为6,每个TaskManager有3个任务槽为3的flink任务,就会启动2个TaskManager。

Flink on Yarn内存计算

最后我们来看一下Flink on Yarn模式的内存计算。我们一般用如下命令来提交Flink on Yarn任务:
flink run -m yarn-cluster -ys 2 -p 1 -yjm 1G -ytm 2G -c com.example.FlinkDemo -ynm test_task testFlink.jar
  • • -m :指定flink运行模式
  • • -ys:指定TaskManager的slot数量
  • • -p :指定并行度
  • • -yjm :设置jobManager内存
  • • -ytm :设置taskManager内存
  • • -c :指定主类名
  • • -ynm:设置flink任务名称
计算公式:
  • • jobManager个数 = 1
  • • taskManager个数 = p / ys + 1 向上取整
  • • yarn的vcore个数=slot个数+1
  • • yarn的container个数 = taskManager个数 + jobManager个数
  • • yarn内存数 = jobManager个数 * yjm + taskManager个数 * ytm
举个例子:
ys=2,p=1,yjm=1,ytm=1。
yarn内存数 = jobManager个数 * yjm + taskManager个数 * ytm = 1*1 + (1/1+1)*1 = 3G

往期推荐

Flink CDC Mysql 启动模式详解

本地提交Flink任务到远程 Yarn 集群

Flink CDC零代码实现数据同步实践

Flink CDC数据接入Apache Doris实践

请使用浏览器的分享功能分享到微信等