从官网说起

-yn或者--yarncontainer参数也不生效了。并行度和任务槽
并行度(Parallelism)
什么是并行度(Parallelism)
如何设置并行度
• 算子级别 dataStream.map(new CustomerMapFunction()).setParallelism(1);• 执行环境级别 streamExecutionEnvironment.setParallelism(1);• 客户端级别 bin/flink -run --parallelism 1 demo.jar• 配置文件级别可以在 flink-conf.yaml配置文件中设置并行度:parallelism.default: 1
任务槽(Task Slot)
什么是任务槽(Task Slot)
如何设置任务槽
Task Slot时可以将它理解成一个TaskManager可以利用的CPU核心数,因此也要根据实际情况(集群的CPU资源和作业的计算量)来确定。在Flink程序中,有2中方式可以设置任务槽。• 如果我们用 flink run来提交任务时,可以通过-ys或者--yarnslots参数来指定task slot 数量;• 另外一种方式是通过 flink-conf.yaml配置文件来设置:taskManager.numberOfTaskSlots = 1
如何确定TaskManager数量
// 设置执行环境并行度为6
env.setParallelism(6);
// 设置Source并行度为1
DataStreamtext = env.readTextFile("/Users/casey/demo.txt").setParallelism(1);
DataStream> counts = text.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
counts.print();
--yarnslots 3,即每个TaskManager分配3个Slot。TaskManager、Slot和task的分布将如下图所示:
Flink on Yarn内存计算
flink run -m yarn-cluster -ys 2 -p 1 -yjm 1G -ytm 2G -c com.example.FlinkDemo -ynm test_task testFlink.jar
• -m:指定flink运行模式• -ys:指定TaskManager的slot数量• -p:指定并行度• -yjm:设置jobManager内存• -ytm:设置taskManager内存• -c:指定主类名• -ynm:设置flink任务名称
• jobManager个数 = 1 • taskManager个数 = p / ys + 1 向上取整 • yarn的vcore个数=slot个数+1 • yarn的container个数 = taskManager个数 + jobManager个数 • yarn内存数 = jobManager个数 * yjm + taskManager个数 * ytm
ys=2,p=1,yjm=1,ytm=1。
yarn内存数 = jobManager个数 * yjm + taskManager个数 * ytm = 1*1 + (1/1+1)*1 = 3G
往期推荐