Trino 动态Catalog 体验

相信很多人没有听说过 Trino,但是基本都听说过 Presto,所以本文将分享 Trino 的历史背景、集群部署方式、客户端操作以及动态 Catalog 的操作。

关于Presto

Presto 是用于数据分析和开放式湖仓一体的快速可靠的SQL查询引擎。

Presto 一个很重要的特性是支持几乎现在市面上所有的数据源,并且支持联邦查询。

历史背景

Presto 诞生于 Facebook,在没有 Presto 之前,大家都是通过 Hive 基于PB级的数据仓库执行SQL,Hive的引擎是基于MapReduce的,它虽然能够完成定时跑批的任务,而且很稳定。但在交互式查询这种场景,速度是非常慢的。

在这样的业务场景推动下,Presto诞生了。它的出现,就是要来填补交互式快速查询的空白。

关于 Trino

所以 Trino 是由 Presto 演变而来的,关于 Trino,我从官网截了一张图,来介绍下 Trino 的关键特性。

翻译过来就是:

  • 快速查询

    Trino 是一个高效的并行和分布式查询引擎,可以构建高效、低延迟的分析系统。

  • 大规模部署

    基于Trino可以查询EB级的数据湖、以及海量数据仓库。

  • 简单

    基于标准ANSI SQL查询引擎,可以与TableAU、PowerBI、Superset等BI工具配合使用。

  • 适用于多种场景

    可以支持多种场景:快速的多维分析,海量小时级的批量查询,亚秒级查询大规模数据的应用程序

  • 就地分析

    不需要复制数据,直接在 hadoop、s3、cassandra、mysql 等本地直接分析。

  • 联邦查询

    可以在Trino中查询多个系统的数据,例如:将 Mysql 的数据和 S3 中的数据联合查询。

  • 任何地方运行

    可以将Trino部署在本地集群,或者是云环境,比如 Amazon,Azure,Google Cloud等。

  • 信任

    Trino 被世界各地一些最大的组织用于关键业务运营。

  • 开放

    Trino 是基于 Trino 软件基金会的社区驱动的项目。

安装部署

介绍完 Trino 之后,我们再来进行实际部署吧,对于部署过 Presto 的小伙伴来说,这简直太容易了,因为 Trino 部署不能说和 Presto 很相似,简直是一模一样。

1. 环境准备

安装任何软件的开始都是准备环境,所以按照以下指引先准备好环境吧。

  • 安装包

    下载trino-server-449.tar.gz,解压到/opt/software目录下,并重命名为trino-server-449

  • 服务器

    hadoop1 hadoop2 hadoop3
    coordinator worker worker

三台机器自行配置免密,便于文件分发和集群启动。

  • 操作系统

    要求使用64位的 Linux 系统,且为部署 trino 的用户提供足够的 unlimit,我这里部署 trino 使用的用户是 casey。

    使用 root 用户编辑limits.conf

    vim /etc/security/limits.conf

    casey soft nofile 131072
    casey hard nofile 131072
    casey soft nproc 128000
    casey hard nproc 128000

    修改完后,切换到 casey 用户,重新登录即可,可以通过如下命令查看配置是否生效:

    su -l casey
    ulimit  -a
  • jdk

    目前最新版本的 trino-449 需要 jdk 22。这里下载完解压放到 /opt/software 下,并重命名为 jdk-22.0.1

  • python

    2.6.x2.7.x、或者3.x

2. 开始安装

2.1 配置 coordinator 节点

  • 登录 hadoop1 并进入安装目录

    cd /opt/software/trino-server-449/
  • 创建配置目录

    在安装目录创建一个etc目录,用于放置 trino 的配置文件

    mkdir etc
  • 配置节点属性

    vim etc/node.properties

    #
     环境名称,集群中所有 trino 节点环境名称必须一致
    node.environment=production
    # 节点唯一标识符,每个 trino 节点必须唯一
    node.id=1
    # trino 数据目录,需要自行创建
    node.data-dir=/opt/software/trino_data
    # 开启动态 catalog,如果使用静态 catalog,请注释掉这行
    catalog.management=dynamic
  • 配置 jvm

    vim etc/jvm.config

    -server
    -Xmx8G
    -XX:+ExplicitGCInvokesConcurrent
    -XX:+ExitOnOutOfMemoryError
    -XX:+HeapDumpOnOutOfMemoryError
    -XX:+UnlockDiagnosticVMOptions
    -XX:G1NumCollectionsKeepPinned=10000000
    -Dfile.encoding=UTF-8
    -Duser.timezone=GMT+8
  • 配置 coordinator

    vim etc/config.properties

    #
     设置为 true 表示 coordinator 节点,false 表示 worker 节点
    coordinator=true
    # 设置为 false 表示在 coordinator 节点只负责调度,true 表示调度节点也作为 worker
    node-scheduler.include-coordinator=false
    # 设置 HTTP server 端口
    http-server.http.port=18080
    # coordinator 的 URL,coordinator有一个发现服务,所有节点通过它来发现其他节点,每个 trino 启动时,向服务器发现注册自己,并不断发送心跳保持活动状态。
    discovery.uri=http://hadoop1:18080
  • 配置日志级别

    vim etc/log.properties

    io.trino=INFO
  • 配置 catalog

    创建一个 catalog 目录,用来存放 trino 的数据源

    mkdir etc/catalog

至此,hadoop1 上的 coordinator 节点配置完成,接下来继续配置 worker 节点。

2.2 分发 trino

这里为了方便,写一个分发脚本,内容如下:

vim syncfiles.sh


#
!/bin/bash
# 1. 遍历所有服务器
for hostname in hadoop1 hadoop2 hadoop3
do
    echo ------  $hostname  ------
    # 2. 遍历所有目录
    for file in $@
    do
        # 3. 判断文件是否存在
        if [ -e $file ]
        then
            # 4. 获取父目录
            pdir=$(cd -P $(dirname $file); pwd)
            # 5. 获取当前文件的名称
            fname=$(basename $file)
            ssh $hostname "mkdir -p $pdir"
            rsync -av $pdir/$fname $hostname:$pdir
        else
            echo $file 不存在!
        fi
    done
done

编写完分发脚本,授权一下:

chmod +x syncfiles.sh

然后将 hadoop1 上配置好的 trino 安装包分发到 hadoop2 和 hadoop3 上:

syncfiles.sh /opt/software/trino-server-449

2.3 配置 worker 节点

  • 登录 hadoop2 并进入安装目录

    cd /opt/software/trino-server-449/
  • 修改节点属性

    vim etc/node.properties

    node.environment=production
    # 修改节点id
    node.id=2
    # 自行创建数据目录
    node.data-dir=/opt/software/trino_data
    catalog.management=dynamic
  • 配置 worker

    vim etc/config.properties

    #
     coordinator 为 false 表示 worker 节点
    coordinator=false
    node-scheduler.include-coordinator=false
    http-server.http.port=18081
    discovery.uri=http://hadoop1:18080

至此,hadoop2 上的 worker 节点配置完毕,hadoop3 配置类似,需要注意节点 id 必须唯一。

3. 启动集群

这里为了方便,编写一个集群启动脚本:

vim trino-cluster.sh

#
! /bin/bash

#
 start, restart, status, stop
command=$1

for node in  hadoop1 hadoop2 hadoop3
do
    echo -------$node---------- 
    ssh $node "/opt/software/trino-server-449/bin/launcher ${command}"
done

给启动脚本授权:

chmod +x trino-cluster.sh

然后执行如下命令即可启动集群:

trino-cluster.sh start

出现如下,表示启动成功:

[casey@hadoop1 etc]$ trino-cluster.sh start
-------hadoop1----------
Started as 53031
-------hadoop2----------
Started as 15204
-------hadoop3----------
Started as 21021

想要停止集群也很简单,只需执行如下命令:

trino-cluster.sh stop

Trino 客户端

连接 trino 有3种常见方式:通过 dbeaver 等工具、 trino client、Jdbc

  • 通过 dbeaver

  • 执行几条命令试试:

    show catalogs;
    show schemas from hive;
    show tables from hive.default;
  • 通过 trino-client

    通过官网下载:trino-cli-449-executable.jar,解压到 /opt/software,并重命名为trino

    执行如下命令即可:

    chmod +x trino
    ./trino http://hadoop1:18080

    执行几条命令试试:

    show catalogs;
    show schemas from hive;
    show tables from hive.default;
  • 通过Jdbc

    首先加入 trino-jdbc 的依赖

    <dependency>
        <groupId>io.trinogroupId>
        <artifactId>trino-jdbcartifactId>
        <version>448version>
    dependency>

    编写测试代码:

    import java.sql.*;
    import java.util.Properties;

    public class TestTrino {
        public static void main(String[] args) {
            Connection connection = null;
            Statement statement = null;
            try {
                String url = "jdbc:trino://hadoop1:18080";
                Properties properties = new Properties();
                properties.setProperty("user""demo");
                // 创建连接对象
                connection = DriverManager.getConnection(url, properties);
                // 创建Statement对象
                statement = connection.createStatement();
                // 执行查询
                ResultSet rs = statement.executeQuery("show catalogs");
                // 获取结果
                int columnNum = rs.getMetaData().getColumnCount();
                while (rs.next()) {
                    for (int i = 1; i <= columnNum; i++) {
                        System.out.println(rs.getString(i));
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }
                // 关闭连接
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }
            }
        }
    }

动态 catalog

动态 Catalog 这个功能还是很实用的,在没有动态 Catalog 之前,我们只能写一大堆的配置文件放在 Trino 数据目录下,每次新增或者删除 Catalog 都需要重启集群,非常的麻烦,所以动态 Catalog 的出现,极大的方便了人们对于 Catalog 的操作,只需要通过简单的几条命令即可创建或删除 Catalog,并且不需要重启集群,听起来就觉得很棒!

要使用 Trino 的动态 catalog 功能,首先需要在 node.properties 中开启动态 catalog。

然后,我们使用客户端连接到 Trino,这里我们直接用 Dbeaver 来连接。

1. 创建 Catalog

create catalog 命令用来动态创建 Catalog,如下例子所示:

# 创建一个Hive Catalog
CREATE CATALOG hive USING hive
WITH (
  "hive.config.resources" = 'your path/core-site.xml,your path/hdfs-site.xml,your path/hive-site.xml',
  "hive.metastore.uri" = 'thrift://hadoop1:9083'
);

# 创建一个Postgresql Catalog
CREATE CATALOG pg USING postgresql
WITH (
  "connection-url" = 'jdbc:postgresql://localhost:5432',
  "connection-user" = 'demo',
  "connection-password" = '123456',
  "case-insensitive-name-matching" = 'true'
);

# 创建一个Greenplum Catalog
CREATE CATALOG gp USING postgresql
WITH (
  "connection-url" = 'jdbc:postgresql://localhost:5432',
  "connection-user" = 'demo',
  "connection-password" = '123456',
  "case-insensitive-name-matching" = 'true'
);

# 创建一个Mysql Catalog
CREATE CATALOG mysql USING mysql
WITH (
  "connection-url" = 'jdbc:mysql://localhost:3306',
  "connection-user" = 'demo',
  "connection-password" = '123456',
  "case-insensitive-name-matching" = 'true'
);

# 创建一个 Kafka Catalog
create catalog kafka using kafka
with (
    "kafka.table-names"='test,demo',
    "kafka.nodes"='hadoop1:9092,hadoop2:9092,hadoop3:9092',
    "kafka.hide-internal-columns"='false'
);

这样就成功在 Trino 中实现了动态创建 Catalog。

2. 删除 Catalog

既然可以动态创建 Catalog,那当然也可以动态删除 Catalog 了,通过 delete catalog 就可以动态删除 Catalog,如下例子所示:

# 删除 mysql catalog
drop catalog mysql;



往期推荐



关于clickhouse的几点总结和建议

ClickHouse UDF 实践

Doris Catalog入门学习

Flink 自定义各种UDF函数实践


请使用浏览器的分享功能分享到微信等