因为不懂 Flink on K8S 类加载机制,引发了线上事故。(建议收藏)

来源:3分钟秒懂大数据


大家好,我是土哥。

这几天写了一篇原创文章,在这里,给大家讲解一下 Flink on Yarn 和 Flink on K8S 的类加载机制的区别及应用。

1 Flink 类加载概述

Flink 任务运行的过程中包含三大部分,分别为客户端,JobManager,TaskManager 等,任务的生命周期包含多种状态,JVM 会随着不同阶段去加载不同的类,根据不同的类,我们可以将这些类分为 3 种类加载方式:

1. Java Classpath 类加载:

Java 通用的 classpath 类加载路径,包含了 JDK 库、Flink lib 目录(Apache Flink及相关依赖类)中的代码。这些统一通过 AppClassLoader 进行加载

2. Flink Plugin 组件类加载:

存放于 Flink 的 /plugins 目录中的插件代码。Flink 的插件机制确保在启动时对它们进行动态加载。

3. 动态用户代码类加载:

用户代码包含一些用户自定义函数和算子等,这些 JAR 会通过  FlinkUserCodeClassLoader 动态加载和卸载。

在 Flink中:

Flink AppClassLoader 和 FlinkUserCodeClassLoader 是两个不同的 ClassLoader。

Flink AppClassLoader 是 Flink 系统级别的 ClassLoader,负责加载 Flink 系统级别的类和库,例如 Flink 的核心库和所有的依赖库。它是由系统启动器创建并管理的,一般情况下,用户无需关心该 ClassLoader。

FlinkUserCodeClassLoader 是用户自定义代码所使用的 ClassLoader,它是在 Flink 的任务管理器(TaskManager)中为每个任务动态创建的,用于加载用户自定义函数和算子实现。FlinkUserCodeClassLoader 的目的是隔离用户代码,以确保用户自定义函数之间的互相隔离和保护。

2 Flink 类加载机制

要说起 Flink 类加载机制,先简单介绍一下 Java 的类加载机制。

在 JVM 中,一个类加载的过程大致分为加载、链接(验证、准备、解析)、初始化 5 个阶段。

而我们通常提到类的加载,就是指利用类加载器(ClassLoader)通过类的全限定名来获取定义此类的二进制字节码流,进而构造出类的定义。

我们先来看一个普通的类加载顺序

从上图可知,通常的类加载都是委托给最顶成的启动类进行加载。这种经典的类加载机制被称作双亲委派机制

所谓“双亲委派”,就是在类加载的过程中,一个类加载器在加载某个类时,会先委托给它的父类加载器进行加载。如果父类加载器无法加载成功,再由自己来加载该类。这样做的好处在于,保证 Java 虚拟机安全、稳定地运行,避免出现恶意代码污染系统。

双亲委派机制可以防止 Java 程序中出现重复的类,保障了上层的 ClassLoader不会出现被下层的 ClassLoader 覆盖的情况,从而保护了 Java 程序的安全性。

flink 同样提供了这样的加载器 ParentFirstClassLoader。

但 flink 作为一个分布式的计算引擎,经常会有一些第三方的 jar 需要被加载,全部委托给系统类加载不现实,如果仍然用双亲委派模型,就会因为 Flink 框架指定版本的类先加载,而出现莫名其妙的兼容性问题。如 NoSuchMethodError、IllegalAccessError 等。

Flink 实现了 ChildFirstClassLoader 类加载器并作为默认策略。它打破了双亲委派模型,使得用户代码的类先加载,官方文档中将这个操作称为 "Inverted Class Loading"

如下为 Flink 中类加载的继承图。

所以在 Flink 中,child-first 作为默认的类加载策略。

Flink 任务在运行过程中,经常会通过如下参数来更改类加载顺序。

3 Flink on K8S 与 on Yarn 的类加载机制

Flink on K8S 类加载机制 和 Flink on Yarn 的类加载机制不同。

1. Session 模式

无论使用 Yarn 还是 Kubernetes 环境:

当使用 Flink Session 集群启动时,JobManager 和 TaskManager 由 Java classpath 中的 Flink 框架类(Flink framework classes)进行启动加载。

当通过 session 提交(REST或命令行方式)的 job 或应用程序时,由 FlinkUserCodeClassLoader 进行加载。

上述两句话意味着在启动 Flink Session 集群之前,需要在 Java classpath 中设置正确的 Flink 框架类路径,以确保 JobManager 和 TaskManager 能够成功启动。

在提交 job 或应用程序时,需要提供正确的 Flink 用户代码和依赖项,并由 FlinkUserCodeClassLoader 加载它们。这个 ClassLoader 与 Flink 框架类 ClassLoader 是不同的,它专门用于加载用户代码和依赖项。

2. Per-job模式

目前,只有 Yarn 支持 Per-Job 模式。默认情况下,在 Per-Job 模式下运行 Flink 集群会将用户 jar(启动命令中指定的 JAR 文件以及 Flink 文件夹中的所有 JAR 文件usrlib)包含到系统类路径( AppClassLoader )中。

可以使用 yarn.classpath.include-user-jar 配置选项来控制此行为。当设置为时 DISABLED,Flink 会将用户 jar 包含在用户类路径中,并通过FlinkUserCodeClassLoader 动态加载它们。

3.Application 模式

在 K8S 环境中,当使用 Application 模式提交任务时,用户 jar 文件(启动命令指定的 jar 文件和 Flink 的 usrlib 目录中的 jar 包)会由 FlinkUserCodeClassLoader 进行动态加载

在 Yarn 环境中,不管是 PerJob 模式还是 Application 模式,用户的 jar 文件会包含在系统的 classpath,会由 AppClassLoader 进行加载。

可以通过设置 yarn.classpath.include-user-jar=DISABLED,Flink 会将用户 jar 文件含在用户的 classpath 中,并由 FlinkUserCodeClassLoader 进行动态加载。

4 案例介绍

下面通过生产环境中的真实案例来给大家分析一下,在 K8S 上运行所出现的问题。

4.1 问题描述

Flink 任务在 K8S 环境运行时,报 NoSuchMethodError 错误 。具体报错如下:

4.2 原因分析

该任务报错为 "java.lang.NoSuchMethodError",具体来说是调用了一个不存在的方法 "org.apache.commons.lang3.StringUtils.joinWith(Ljava/lang/String; 可能是因为缺少对应的依赖包或者版本不兼容导致的。

根据问题定位我们知道,Flink 任务在 K8S 环境运行时,通过 StringUtils 调用 joinWith 方法时,没有找到该方法。

StringUtils 类在 commons-lang3 包中,其中 commons-lang3 3.5 以下版本,StringUtils 类中没有 joinWith 方法,在 3.5 及以上版本,StringUtils 类中有 joinWith 方法。

首先,用户的 UDF 中,已经将 commons-lang3  3.12 版本打入依赖,StringUtils 类中有 joinWith 方法。K8S 集群中,Flink 引擎中的  commons-lang3 版本为 3.3.2 版本,StringUtils 类没有 joinWith 方法。所以,可以明确任务在 K8S 集群运行时,调用的 Flink 引擎的 commons-lang3 版本,而没有调用用户的 commons-lang3 版本。

4.3 具体原因

Flink on K8S 类加载机制 和 Flink on Yarn 的类加载机制不同。

在 K8S 环境中,当使用 Application 模式提交任务时,用户 jar 文件(启动命令指定的 jar 文件和 Flink 的 usrlib 目录中的 jar 包)会由 FlinkUserCodeClassLoader 进行动态加载。

在 Yarn 环境中,不管是 PerJob 模式还是 Application 模式,用户的 jar 文件会包含在系统的 classpath,会有 AppClassLoader 进行加载

可以通过设置 yarn.classpath.include-user-jar=DISABLED,Flink 会将用户 jar 文件含在用户的 classpath 中,并由 FlinkUserCodeClassLoader 进行动态加载。

上述内容涉及到第一章和第二章讲的两个知识点。

4.3.1 知识点 1

在 Flink中,Flink AppClassLoaderFlinkUserCodeClassLoader 是两个不同的 ClassLoader。

Flink AppClassLoader 是 Flink 系统级别的 ClassLoader,负责加载 Flink 系统级别的类和库,例如 Flink 的核心库和所有的依赖库。它是由系统启动器创建并管理的,一般情况下,用户无需关心该 ClassLoader。

FlinkUserCodeClassLoader 是用户自定义代码所使用的 ClassLoader,它是在 Flink 的任务管理器(TaskManager)中为每个任务动态创建的,用于加载用户自定义函数和算子实现。FlinkUserCodeClassLoader 的目的是隔离用户代码,以确保用户自定义函数之间的互相隔离和保护。

4.3.2 知识点 2

类加载参数  

1、classloader.resolve-order:parent-first

2、classloader.resolve-order:child-first

如果在 Flink 中将 classloader.resolve-order 设置为 parent-firt,

则 Flink AppClassLoader 将优先于 FlinkUserCodeClassLoader 进行类加载。

也就是说,当用户自定义代码中存在与系统级别库相同的类时,Flink AppClassLoader 将优先加载系统级别库中的类,而不是用户自定义代码中的类。

这种加载机制与传统的 Java 应用程序的类加载机制是相同的,即优先从父 ClassLoader 中加载类,如果父 ClassLoader 中不存在该类,则委托给子 ClassLoader 进行加载。

4.4 深度分析

在上述任务报错中,由于是 K8S 环境,当使用 Application 模式提交任务时,用户 jar 文件(启动命令指定的 jar 文件和 Flink 的 usrlib 目录中的 jar 包)会由 FlinkUserCodeClassLoader 进行动态加载。

但是由于设置了 classloader.resolve-orde 设置 parent-first ,尽管用户的 Flink UDF 已经将 commons-lang3 3.5 版本打入依赖,但是由于 Flink 引擎中的 commons-lang3 版本为 3.3.2 版本,因此在 FlinkUserCodeClassLoader 尝试加载用户自定义代码时,仍然会优先加载 Flink 引擎中的 commons-lang3 3.3.2 版本,而不是用户定义的 commons-lang3 3.12 版本

因为当用户自定义代码中存在与系统级别库相同的类时,Flink AppClassLoader 将优先加载系统级别库中的类,而不是用户自定义代码中的类。

因此,当用户的代码中使用了 Flink 引擎中不存在的方法时,如 StringUtils.joinWith 方法,就会出现 NoSuchMethodError 异常。

4.5 解决方案

1、修改 Flink 引擎中的 commons-lang3 版本,更新为 commons-lang3:jar:3.12 版本。

2、修改 K8S 环境,类加载机制,设置 classloader.resolve-order = child-first.(前提是 用户 jar 放在 FlinkUserCodeClassLoader 的 ClassPath 路径下)。

3、修改用户 UDF 中的 StringUtils.joinWith 方法,使用其他方法代替,这样可以避免任务报错。

5 总结

经过上述的知识点讲解和实际案例分析,相信各位小伙伴已经对 Flink 的类加载机制以及 Fink on Yarn 和 Flink on K8S 之间的加载机制有所收获,还想了解什么技术内容及生产环境遇到的问题,欢迎在土哥评论区进行回复,我会在空余时间出相关技术问题,感谢各位的支持~

请使用浏览器的分享功能分享到微信等