上篇介绍了 Trino 在处理 OOM 时,为我们提供的磁盘溢写功能,但官方并不推荐那种方式,今天就来介绍一下官方推荐的方式:容错执行 + Exchange manager。
默认情况下,如果 Trino 的一个节点在执行任务时资源不足,或者由于其他原因导致查询执行失败,那么这个查询就会失败,需要手动重新执行,查询运行的时间越长,查询执行失败的概率就越大。
容错执行是一种允许集群在查询失败时通过重新执行整个 query 或部分 task 来减轻失败影响的机制。当启用容错执行功能时,中间的交换数据会被暂存起来,这样 worker 节点在发生故障时,其他 worker 节点也可以重新使用这些数据。
?注意:
容错机制不适用于由于查询本身错误或用户操作错误导致的问题。例如,Trino不会浪费资源去重试一个因为SQL语法无法解析而失败的查询。简单来说,如果用户输入了错误的SQL语句,导致SQL无法解析,Trino 不会尝试去重试这个查询,而是直接报错。
容错配置
容错执行特性默认是禁用的。我们可以设置retry-policy 这个属性来启用容错执行特性,可选的值有:QUERY 和 TASK,比如:
retry-policy=QUERY
⚠️警告:
如果设置了重试策略(retry-policy),对于没有明确支持容错执行的 connectors 来说,在执行查询时会导致执行失败,并且显示错误信息:This connector does not support query retries。
以下这些 connectors 支持容错执行:
BigQuery connector Delta Lake connector Hive connector Iceberg connector MariaDB connector MongoDB connector MySQL connector Oracle connector PostgreSQL connector Redshift connector SQL Server connector
我们可以为容错执行设置以下属性:
| 属性名称 | 描述 | 默认值 |
|---|---|---|
| retry-policy | 重试策略,可选值:QUERY(重试整个查询) 和 TASK(重试失败的任务),详见retry policy | NONE |
| exchange.deduplication-buffer-size | 协调器(coordinator)使用一个内存缓冲区(in-memory buffer),这个缓冲区用于容错执行(fault-tolerant execution)过程中存储查询阶段(query stages)的输出结果。如果这个缓冲区在查询执行时满了,会导致执行失败,并输出错误信息:"Exchange manager must be configured for the failure recovery capabilities to be fully functional",除非配置了交换管理器exchange manager](https://trino.io/docs/current/admin/fault-tolerant-execution.html#fte-exchange-manager)。 | 32MB |
| fault-tolerant-execution.exchange-encryption-enabled | 启用对暂存数据的加密,如果 Trino 处理的是敏感数据,建议设置为 True。详见Encryption | true |
更多相关的属性设置可以参考Properties reference,尤其是 Resource management properties 和 Exchange properties。
重试策略
retry-policy属性决定了在查询失败时执行整个 query 还是查询中特定的 task。
QUERY
如果重试策略设置为QUERY,在 worker 节点发生错误时,Trino 会自动重试整个 query,当 Trino 集群包含大量小查询时,建议使用QUERY这种重试策略。
默认情况下,对于执行结果不超过32MB的查询来说,Trino并不会实现容错。比如,如果一个SELECT语句返回给用户的数据集非常大,就可能超出这个限制,我们可以修改exchange.deduplication-buffer-size这个属性来增加这个限制的大小,使其超过默认的32MB,但是这样会在 coordinator 节点上消耗更多的内存。
为了使查询能够对具有较大结果集的查询执行容错处理,强烈建议配置一个交换管理器:exchange manager,该管理器使用外部存储来处理中间数据存储,可以在内存不足时,将数据溢出到外部存储。
TASK
如果重试策略设置为TASK,在查询执行失败时,Trino会重新尝试执行这个失败的 task,而不是整个查询。使用这个策略时,必须配置交换管理器:exchange manager。当执行大量的批处理查询时,建议使用TASK重试策略,集群可以针对查询中的小任务进行更有效的重试,而不是当一个任务失败时就需要重试整个查询,这样做可以节省资源并提高效率。
当重试策略设置为TASK时,一些相关的配置属性的默认值会被自动更改,以适应容错集群的需求。这种自动更改不会影响那些已经手动设置过这些属性的集群。如果你已经在config.properties中手动配置了某些属性,并且重试策略设置为TASK,强烈建议设置task.low-memory-killer.policy的值为:total-reservation-on-blocked-nodes,以避免在内存不足时需要手动终止查询。
?温馨提示:
TASK重试策略最适合用于处理大量的批查询,但是对于高频执行的短查询,可能会导致比较大的延迟。最佳实践是,建议运行一个专门的集群,该集群使用
TASK重试策略来处理大量的批处理。这个集群应该与处理大量小查询的集群独立开来。
加密
Trino在将数据暂存之前会对数据进行加密。这种加密措施防止了除写入数据的Trino集群之外的任何人访问查询数据,包括存储系统的管理员。对于每一个查询,Trino都会随机生成一个新的加密密钥,并且在查询完成后,这个密钥会被丢弃。
这种做法确保了每次查询的加密都是独立的,进一步增强了安全性,因为即使某个密钥被泄露,它也只能解密对应的那次查询的数据,而不会影响到其他查询的数据安全。
高级配置
你可以使用以下列出的配置属性来进一步配置容错执行。这些属性的默认值应该适用于大多数部署场景,但如果需要进行测试或故障排查,你可以修改这些值。
Retry limits
以下配置决定了在什么情况下,如果查询或任务连续失败,系统将不再尝试重新执行它们。这些配置属性通常用于避免在失败的情况下无休止地重试,从而节省资源并防止潜在的系统过载。
| 属性名称 | 描述 | 默认值 | 重试策略 |
|---|---|---|---|
| query-retry-attempts | 最大重试次数 | 4 | 仅针对 QUERY 策略 |
| task-retry-attempts-per-task | 每个Task最大重试次数 | 4 | 仅针对 TASK 策略 |
| retry-initial-delay | 查询失败后重试之前最小等待时间 | 10s | QUERY 和 TASK |
| retry-max-delay | 查询失败后重试之前最大等待时间,每次连续失败之后,等待时间都会增加。 | 1m | QUERY 和 TASK |
| retry-delay-scale-factor | 每次查询或任务失败时重试时间增加的系数。 | 2.0 | QUERY 和 TASK |
Task sizing
对于TASK重试策略,如何管理每个Task中处理的数据量非常重要。如果 Task 太小,任务协调的开销可能会超过执行任务本身所需的时间和资源。如果 Task 太大,单个任务可能需要的资源量会超过任何单一节点所能提供的资源,这将导致任务无法在一个节点上完成,从而阻碍查询的执行。
Trino 支持自动调整 Task 大小,但这个调整是有限的。如果在执行具有容错能力的任务时出现问题时,你可以手动设置 Task 的大小,下面这些属性仅支持TASK重试策略:
| 属性名称 | 描述 | 默认值 |
|---|---|---|
| fault-tolerant-execution-standard-split-size | Task 从源表读取数据时处理的标准数据分片大小 | 64MB |
| fault-tolerant-execution-max-task-split-count | 单个 Task 能处理最大的分片数 | 256 |
| fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period | 在任意分布的non-writer阶段,任务大小增加之前,所创建的任务数量 | 64 |
| fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-factor | 任意分布的non-writer任务增长因子,最小是1.0,对于每次任务大小的增加,新的任务大小是旧的任务目标大小乘以这个增长因子。 | 1.26 |
| fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min | 任意分布的non-writer任务最小数据大小 | 512MB |
| fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max | 任意分布的non-writer任务最大数据大小 | 50GB |
| fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period | 任意分布的non-writer任务目标数据大小 | 512MB |
| fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-factor | 任意分布的writer写入任务增长因子,最小是1.0,对于每次任务大小的增加,新的任务大小是旧的任务目标大小乘以这个增长因子。 | 1.26 |
| fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min | 任意分布的writer任务最小数据大小 | 4GB |
| fault-tolerant-execution-arbitrary-distribution-write-task-target-size-max | 任意分布的writer任务最大数据大小 | 50GB |
| fault-tolerant-execution-hash-distribution-compute-task-target-size | 非写入任务的哈希分布的目标输入数据大小 | 512MB |
| fault-tolerant-execution-hash-distribution-write-task-target-size | 任意分布的writer任务目标数据大小 | 4GB |
| fault-tolerant-execution-hash-distribution-write-task-target-max-count | 在一个容错执行的哈希分配阶段中,写入任务数量的软上限 | 2000 |
Node allocation
对于TASK重试策略,节点会根据可用内存和估计的内存使用量来为任务分配资源。如果一个任务因为超出了节点上的可用内存而失败,那么这个任务将会被重新启动,并请求分配一个完整的节点来执行该任务。
初始 Task 的内存是通过fault-tolerant-task-memory来设置的,这个属性仅适用于TASK重试策略,默认为5GB。
其他参数
| 属性名称 | 描述 | 默认值 | 重试策略 |
|---|---|---|---|
| fault-tolerant-execution-task-descriptor-storage-max-memory | 内存数据大小上限 | (JVM heap size * 0.15) | TASK |
| fault-tolerant-execution-max-partition-count | 任务最大分区数,不建议大于50 | 50 | TASK |
| fault-tolerant-execution-min-partition-count | 最小任务分区数 | 4 | TASK |
| fault-tolerant-execution-min-partition-count-for-write | 对于写入操作,最小任务分区数 | TASK | |
| max-tasks-waiting-for-node-per-stage | 在每个阶段,允许配置的一定数量的任务等待节点分配,而在这些任务等待的过程中,会暂停对当前阶段其他任务的调度,直到这些任务获得所需的资源 | 5 | TASK |
Exchange manager
交换缓冲(Exchange spooling)的功能是存储和管理缓冲数据,以实现容错执行。你可以配置一个基于文件系统的交换管理器(exchange manager),它能够将这些缓冲数据存储在指定的位置。这些位置包括:AWS S3,S3-compatible systems,Azure Blob Storage,Google Cloud Storage,HDFS。
配置
为了配置 exchange manager,需要在 coordinator 和 worker 节点都创建一个配置文件:etc/exchange-manager.properties,配置文件中设置exchange-manager.name属性为 filesystem 或者 hdfs,并且可以为不同的存储设置附加属性,以下是一些常见的属性:
| 属性名称 | 描述 | 默认值 | 支持的文件系统 |
|---|---|---|---|
| exchange.base-directories | 逗号分隔的URI地址 | Any | |
| exchange.sink-buffer-pool-min-size | sink端最小的缓冲区大小,缓冲区越大,写入的并行度也越高,使用的内存也会增加。 | 10 | Any |
| exchange.sink-buffers-per-partition | 每个分区的缓冲区数量,缓冲区越大,写入的并行度也越高,使用的内存也会增加。 | 2 | Any |
| exchange.sink-max-file-size | sink端文件的最大数据大小 | 1GB | Any |
| exchange.source-concurrent-readers | 从缓冲区读取数据的并行度,并行度越大,使用的内存也会增加。 | 4 | Any |
| exchange.s3.aws-access-key | AWS access key | AWS S3, GCS | |
| exchange.s3.aws-secret-key | AWS secret key | AWS S3, GCS | |
| exchange.s3.iam-role | IAM role | AWS S3, GCS | |
| exchange.s3.external-id | IAM role 的 External ID | AWS S3, GCS | |
| exchange.s3.region | S3存储的地区 | AWS S3, GCS | |
| exchange.s3.endpoint | S3 storage endpoint | Any S3-compatible storage | |
| exchange.s3.max-error-retries | S3客户端最大重试次数 | 10 | Any S3-compatible storage |
| exchange.s3.path-style-access | 是否使用path-style-access | false | Any S3-compatible storage |
| exchange.s3.upload.part-size | S3多部分上传时每个部分的大小 | 5MB | Any S3-compatible storage |
| exchange.gcs.json-key-file-path | GCS | ||
| exchange.gcs.json-key | GCS | ||
| exchange.azure.endpoint | 端口 | Azure Blob Storage | |
| exchange.azure.connection-string | 连接串 | Azure Blob Storage | |
| exchange.azure.block-size | Azure block blob并行上传的数据大小 | 4MB | Azure Blob Storage |
| exchange.azure.max-error-retries | Azure客户端最大重试次数 | 10 | Azure Blob Storage |
| exchange.hdfs.block-size | HDFS存储块大小 | 4MB | HDFS |
| hdfs.config.resources | 逗号分隔的hdfs配置文件,比如:/etc/hdfs-site.xml。每个节点都必须存在这些文件。 | HDFS |
推荐在config.properties配置文件中设置exchange.compression-codec的值为LZ4,来减少I/O负载。
在配置交换管理器(exchange manager)时,建议设置一个存储桶生命周期规则,以便在节点崩溃时自动使被遗弃的对象过期。
AWS S3
下面是AWS S3配置示例:
exchange-manager.name=filesystem
exchange.base-directories=s3://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key
也可以配置多个S3地址:
exchange.base-directories=s3://exchange-spooling-bucket-1,s3://exchange-spooling-bucket-2
Azure Blob Storage
下面是Azure Blob配置示例:
exchange-manager.name=filesystem
exchange.base-directories=abfs://container_name@account_name.dfs.core.windows.net
exchange.azure.connection-string=connection-string
Google Cloud Storage
下面是GCS配置示例:
exchange-manager.name=filesystem
exchange.base-directories=gs://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key
exchange.s3.endpoint=https://storage.googleapis.com
exchange.gcs.json-key-file-path=/path/to/gcs_keyfile.json
HDFS
下面是HDFS配置示例:
exchange-manager.name=hdfs
# 如果是HA,需要配置namespace:exchange.base-directories=hdfs://hdpv6/trino-exchange-spolling
exchange.base-directories=hdfs://hadoop-master:8020/trino-exchange-spolling
hdfs.config.resources=/usr/hdp/3.1.5.0-152/hadoop/etc/hadoop/core-site.xml,/usr/hdp/3.1.5.0-152/hadoop/etc/hadoop/hdfs-site.xml
Local filesystem storage
下面是本地文件配置示例:
exchange-manager.name=filesystem
exchange.base-directories=/tmp/trino-exchange-manager
?温馨提示:
仅在单机模式,非生产环境使用本地文件存储。在分布式环境中,交换目录是共享的,并且所有节点都能访问到的情况下,才能使用本地文件存储。
配置参考
config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery.uri=http://xxx.xxx.xxx.xxx:8080
# 单次查询最大分布式内存
query.max-memory=128GB
# 单次查询单节点最大内存
query.max-memory-per-node=50GB
# 重试策略
retry-policy=TASK
# 缓冲区大小
exchange.deduplication-buffer-size=128MB
# 启用对暂存数据的加密
fault-tolerant-execution.exchange-encryption-enabled=false
# 最大重试次数,仅针对 QUERY 策略
query-retry-attempts=4
# task最大重试次数,仅针对 TASK 策略
task-retry-attempts-per-task=4
# 查询失败后重试之前最小等待时间
retry-initial-delay=3s
# 查询失败后重试之前最大等待时间,每次连续失败之后,等待时间都会增加。
retry-max-delay=1m
# 每次查询或任务失败时重试时间增加的系数
retry-delay-scale-factor=2.0
exchange-manager.properties
exchange-manager.name=hdfs
exchange.base-directories=hdfs://namespace/trino-exchange-spolling
hdfs.config.resources=/app/hadoop-3.1.3/etc/hadoop/core-site.xml,/app/hadoop-3.1.3/etc/hadoop/hdfs-site.xml