Trino集群优化:Trino的容错执行配置

上篇介绍了 Trino 在处理 OOM 时,为我们提供的磁盘溢写功能,但官方并不推荐那种方式,今天就来介绍一下官方推荐的方式:容错执行 + Exchange manager。

默认情况下,如果 Trino 的一个节点在执行任务时资源不足,或者由于其他原因导致查询执行失败,那么这个查询就会失败,需要手动重新执行,查询运行的时间越长,查询执行失败的概率就越大。

容错执行是一种允许集群在查询失败时通过重新执行整个 query 或部分 task 来减轻失败影响的机制。当启用容错执行功能时,中间的交换数据会被暂存起来,这样 worker 节点在发生故障时,其他 worker 节点也可以重新使用这些数据。

?注意:

容错机制不适用于由于查询本身错误或用户操作错误导致的问题。例如,Trino不会浪费资源去重试一个因为SQL语法无法解析而失败的查询。简单来说,如果用户输入了错误的SQL语句,导致SQL无法解析,Trino 不会尝试去重试这个查询,而是直接报错。

容错配置

容错执行特性默认是禁用的。我们可以设置retry-policy 这个属性来启用容错执行特性,可选的值有:QUERY 和 TASK,比如:

retry-policy=QUERY

⚠️警告:

如果设置了重试策略(retry-policy),对于没有明确支持容错执行的 connectors 来说,在执行查询时会导致执行失败,并且显示错误信息:This connector does not support query retries。

以下这些 connectors 支持容错执行:

  • BigQuery connector
  • Delta Lake connector
  • Hive connector
  • Iceberg connector
  • MariaDB connector
  • MongoDB connector
  • MySQL connector
  • Oracle connector
  • PostgreSQL connector
  • Redshift connector
  • SQL Server connector

我们可以为容错执行设置以下属性:

属性名称 描述 默认值
retry-policy 重试策略,可选值:QUERY(重试整个查询) 和 TASK(重试失败的任务),详见retry policy NONE
exchange.deduplication-buffer-size 协调器(coordinator)使用一个内存缓冲区(in-memory buffer),这个缓冲区用于容错执行(fault-tolerant execution)过程中存储查询阶段(query stages)的输出结果。如果这个缓冲区在查询执行时满了,会导致执行失败,并输出错误信息:"Exchange manager must be configured for the failure recovery capabilities to be fully functional",除非配置了交换管理器exchange manager](https://trino.io/docs/current/admin/fault-tolerant-execution.html#fte-exchange-manager)。 32MB
fault-tolerant-execution.exchange-encryption-enabled 启用对暂存数据的加密,如果 Trino 处理的是敏感数据,建议设置为 True。详见Encryption true

更多相关的属性设置可以参考Properties reference,尤其是 Resource management properties 和 Exchange properties。

重试策略

retry-policy属性决定了在查询失败时执行整个 query 还是查询中特定的 task。

QUERY

如果重试策略设置为QUERY,在 worker 节点发生错误时,Trino 会自动重试整个 query,当 Trino 集群包含大量小查询时,建议使用QUERY这种重试策略。

默认情况下,对于执行结果不超过32MB的查询来说,Trino并不会实现容错。比如,如果一个SELECT语句返回给用户的数据集非常大,就可能超出这个限制,我们可以修改exchange.deduplication-buffer-size这个属性来增加这个限制的大小,使其超过默认的32MB,但是这样会在 coordinator 节点上消耗更多的内存。

为了使查询能够对具有较大结果集的查询执行容错处理,强烈建议配置一个交换管理器:exchange manager,该管理器使用外部存储来处理中间数据存储,可以在内存不足时,将数据溢出到外部存储。

TASK

如果重试策略设置为TASK,在查询执行失败时,Trino会重新尝试执行这个失败的 task,而不是整个查询。使用这个策略时,必须配置交换管理器:exchange manager。当执行大量的批处理查询时,建议使用TASK重试策略,集群可以针对查询中的小任务进行更有效的重试,而不是当一个任务失败时就需要重试整个查询,这样做可以节省资源并提高效率。

当重试策略设置为TASK时,一些相关的配置属性的默认值会被自动更改,以适应容错集群的需求。这种自动更改不会影响那些已经手动设置过这些属性的集群。如果你已经在config.properties中手动配置了某些属性,并且重试策略设置为TASK,强烈建议设置task.low-memory-killer.policy的值为:total-reservation-on-blocked-nodes,以避免在内存不足时需要手动终止查询。

?温馨提示:

TASK重试策略最适合用于处理大量的批查询,但是对于高频执行的短查询,可能会导致比较大的延迟。

最佳实践是,建议运行一个专门的集群,该集群使用TASK重试策略来处理大量的批处理。这个集群应该与处理大量小查询的集群独立开来。

加密

Trino在将数据暂存之前会对数据进行加密。这种加密措施防止了除写入数据的Trino集群之外的任何人访问查询数据,包括存储系统的管理员。对于每一个查询,Trino都会随机生成一个新的加密密钥,并且在查询完成后,这个密钥会被丢弃。

这种做法确保了每次查询的加密都是独立的,进一步增强了安全性,因为即使某个密钥被泄露,它也只能解密对应的那次查询的数据,而不会影响到其他查询的数据安全。

高级配置

你可以使用以下列出的配置属性来进一步配置容错执行。这些属性的默认值应该适用于大多数部署场景,但如果需要进行测试或故障排查,你可以修改这些值。

Retry limits

以下配置决定了在什么情况下,如果查询或任务连续失败,系统将不再尝试重新执行它们。这些配置属性通常用于避免在失败的情况下无休止地重试,从而节省资源并防止潜在的系统过载。

属性名称 描述 默认值 重试策略
query-retry-attempts 最大重试次数 4 仅针对 QUERY 策略
task-retry-attempts-per-task 每个Task最大重试次数 4 仅针对 TASK 策略
retry-initial-delay 查询失败后重试之前最小等待时间 10s QUERY 和 TASK
retry-max-delay 查询失败后重试之前最大等待时间,每次连续失败之后,等待时间都会增加。 1m QUERY 和 TASK
retry-delay-scale-factor 每次查询或任务失败时重试时间增加的系数。 2.0 QUERY 和 TASK

Task sizing

对于TASK重试策略,如何管理每个Task中处理的数据量非常重要。如果 Task 太小,任务协调的开销可能会超过执行任务本身所需的时间和资源。如果 Task 太大,单个任务可能需要的资源量会超过任何单一节点所能提供的资源,这将导致任务无法在一个节点上完成,从而阻碍查询的执行。

Trino 支持自动调整 Task 大小,但这个调整是有限的。如果在执行具有容错能力的任务时出现问题时,你可以手动设置 Task 的大小,下面这些属性仅支持TASK重试策略:

属性名称 描述 默认值
fault-tolerant-execution-standard-split-size Task 从源表读取数据时处理的标准数据分片大小 64MB
fault-tolerant-execution-max-task-split-count 单个 Task 能处理最大的分片数 256
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period 在任意分布的non-writer阶段,任务大小增加之前,所创建的任务数量 64
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-factor 任意分布的non-writer任务增长因子,最小是1.0,对于每次任务大小的增加,新的任务大小是旧的任务目标大小乘以这个增长因子。 1.26
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min 任意分布的non-writer任务最小数据大小 512MB
fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max 任意分布的non-writer任务最大数据大小 50GB
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period 任意分布的non-writer任务目标数据大小 512MB
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-factor 任意分布的writer写入任务增长因子,最小是1.0,对于每次任务大小的增加,新的任务大小是旧的任务目标大小乘以这个增长因子。 1.26
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min 任意分布的writer任务最小数据大小 4GB
fault-tolerant-execution-arbitrary-distribution-write-task-target-size-max 任意分布的writer任务最大数据大小 50GB
fault-tolerant-execution-hash-distribution-compute-task-target-size 非写入任务的哈希分布的目标输入数据大小 512MB
fault-tolerant-execution-hash-distribution-write-task-target-size 任意分布的writer任务目标数据大小 4GB
fault-tolerant-execution-hash-distribution-write-task-target-max-count 在一个容错执行的哈希分配阶段中,写入任务数量的软上限 2000

Node allocation

对于TASK重试策略,节点会根据可用内存和估计的内存使用量来为任务分配资源。如果一个任务因为超出了节点上的可用内存而失败,那么这个任务将会被重新启动,并请求分配一个完整的节点来执行该任务。

初始 Task 的内存是通过fault-tolerant-task-memory来设置的,这个属性仅适用于TASK重试策略,默认为5GB。

其他参数

属性名称 描述 默认值 重试策略
fault-tolerant-execution-task-descriptor-storage-max-memory 内存数据大小上限 (JVM heap size * 0.15) TASK
fault-tolerant-execution-max-partition-count 任务最大分区数,不建议大于50 50 TASK
fault-tolerant-execution-min-partition-count 最小任务分区数 4 TASK
fault-tolerant-execution-min-partition-count-for-write 对于写入操作,最小任务分区数
TASK
max-tasks-waiting-for-node-per-stage 在每个阶段,允许配置的一定数量的任务等待节点分配,而在这些任务等待的过程中,会暂停对当前阶段其他任务的调度,直到这些任务获得所需的资源 5 TASK

Exchange manager

交换缓冲(Exchange spooling)的功能是存储和管理缓冲数据,以实现容错执行。你可以配置一个基于文件系统的交换管理器(exchange manager),它能够将这些缓冲数据存储在指定的位置。这些位置包括:AWS S3,S3-compatible systems,Azure Blob Storage,Google Cloud Storage,HDFS。

配置

为了配置 exchange manager,需要在 coordinator 和 worker 节点都创建一个配置文件:etc/exchange-manager.properties,配置文件中设置exchange-manager.name属性为 filesystem 或者 hdfs,并且可以为不同的存储设置附加属性,以下是一些常见的属性:

属性名称 描述 默认值 支持的文件系统
exchange.base-directories 逗号分隔的URI地址
Any
exchange.sink-buffer-pool-min-size sink端最小的缓冲区大小,缓冲区越大,写入的并行度也越高,使用的内存也会增加。 10 Any
exchange.sink-buffers-per-partition 每个分区的缓冲区数量,缓冲区越大,写入的并行度也越高,使用的内存也会增加。 2 Any
exchange.sink-max-file-size sink端文件的最大数据大小 1GB Any
exchange.source-concurrent-readers 从缓冲区读取数据的并行度,并行度越大,使用的内存也会增加。 4 Any
exchange.s3.aws-access-key AWS access key
AWS S3, GCS
exchange.s3.aws-secret-key AWS secret key
AWS S3, GCS
exchange.s3.iam-role IAM role
AWS S3, GCS
exchange.s3.external-id IAM role 的 External ID
AWS S3, GCS
exchange.s3.region S3存储的地区
AWS S3, GCS
exchange.s3.endpoint S3 storage endpoint
Any S3-compatible storage
exchange.s3.max-error-retries S3客户端最大重试次数 10 Any S3-compatible storage
exchange.s3.path-style-access 是否使用path-style-access false Any S3-compatible storage
exchange.s3.upload.part-size S3多部分上传时每个部分的大小 5MB Any S3-compatible storage
exchange.gcs.json-key-file-path

GCS
exchange.gcs.json-key

GCS
exchange.azure.endpoint 端口
Azure Blob Storage
exchange.azure.connection-string 连接串
Azure Blob Storage
exchange.azure.block-size Azure block blob并行上传的数据大小 4MB Azure Blob Storage
exchange.azure.max-error-retries Azure客户端最大重试次数 10 Azure Blob Storage
exchange.hdfs.block-size HDFS存储块大小 4MB HDFS
hdfs.config.resources 逗号分隔的hdfs配置文件,比如:/etc/hdfs-site.xml。每个节点都必须存在这些文件。
HDFS

推荐在config.properties配置文件中设置exchange.compression-codec的值为LZ4,来减少I/O负载。

在配置交换管理器(exchange manager)时,建议设置一个存储桶生命周期规则,以便在节点崩溃时自动使被遗弃的对象过期。

AWS S3

下面是AWS S3配置示例:

exchange-manager.name=filesystem
exchange.base-directories=s3://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key

也可以配置多个S3地址:

exchange.base-directories=s3://exchange-spooling-bucket-1,s3://exchange-spooling-bucket-2

Azure Blob Storage

下面是Azure Blob配置示例:

exchange-manager.name=filesystem
exchange.base-directories=abfs://container_name@account_name.dfs.core.windows.net
exchange.azure.connection-string=connection-string

Google Cloud Storage

下面是GCS配置示例:

exchange-manager.name=filesystem
exchange.base-directories=gs://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key
exchange.s3.endpoint=https://storage.googleapis.com
exchange.gcs.json-key-file-path=/path/to/gcs_keyfile.json

HDFS

下面是HDFS配置示例:

exchange-manager.name=hdfs
# 如果是HA,需要配置namespace:exchange.base-directories=hdfs://hdpv6/trino-exchange-spolling
exchange.base-directories=hdfs://hadoop-master:8020/trino-exchange-spolling
hdfs.config.resources=/usr/hdp/3.1.5.0-152/hadoop/etc/hadoop/core-site.xml,/usr/hdp/3.1.5.0-152/hadoop/etc/hadoop/hdfs-site.xml

Local filesystem storage

下面是本地文件配置示例:

exchange-manager.name=filesystem
exchange.base-directories=/tmp/trino-exchange-manager

?温馨提示:

仅在单机模式,非生产环境使用本地文件存储。在分布式环境中,交换目录是共享的,并且所有节点都能访问到的情况下,才能使用本地文件存储。

配置参考

config.properties

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery.uri=http://xxx.xxx.xxx.xxx:8080
# 单次查询最大分布式内存
query.max-memory=128GB
# 单次查询单节点最大内存
query.max-memory-per-node=50GB
# 重试策略
retry-policy=TASK
# 缓冲区大小
exchange.deduplication-buffer-size=128MB
# 启用对暂存数据的加密
fault-tolerant-execution.exchange-encryption-enabled=false
# 最大重试次数,仅针对 QUERY 策略
query-retry-attempts=4
# task最大重试次数,仅针对 TASK 策略
task-retry-attempts-per-task=4
# 查询失败后重试之前最小等待时间
retry-initial-delay=3s
# 查询失败后重试之前最大等待时间,每次连续失败之后,等待时间都会增加。
retry-max-delay=1m
# 每次查询或任务失败时重试时间增加的系数
retry-delay-scale-factor=2.0

exchange-manager.properties

exchange-manager.name=hdfs
exchange.base-directories=hdfs://namespace/trino-exchange-spolling
hdfs.config.resources=/app/hadoop-3.1.3/etc/hadoop/core-site.xml,/app/hadoop-3.1.3/etc/hadoop/hdfs-site.xml


请使用浏览器的分享功能分享到微信等