OceanBase学习之路15|体验 Operational OLAP

OceanBase 数据库可以处理混合负载类型的场景。由于 OceanBase 数据库是基于对等节点的分布式架构,使得它既可以承载高并发和可扩展的 OLTP 任务,还可以在同一套数据引擎中基于 MPP 架构进行 OLAP 的并行计算,无需维护两套数据。

在 OceanBase 数据库中,您不但可以在大量在线业务数据上直接进行并行分析,还可以通过 PDML 能力(Parallel DML)将批量写入数据的大事务以并发的方式快速安全的执行。并且,这一切都是在严格保证事务一致性的前提下做到的。

下面让我们手动进行 TPC-H 测试,演示 OceanBase 数据库在 Operational OLAP 场景的特点和用法。TPC-H 是一个业界常用的基于决策支持业务的 Benchmark,通过一系列在大量数据集上面执行的复杂查询请求,检验数据库系统的分析以及决策支持能力。详细相信可参见 TPC 组织官方网站。(2021年5月20日,OceanBase 数据库以 1526 万 QphH 的成绩刷新了 TPC-H 世界纪录,并且是唯一一个同时刷新了TPC-C 以及 TPC-H 纪录的数据库,证明了其能够同时处理在线交易和实时分析两类业务场景能力。详细信息请参见  TPC-H Result )

手动进行 TPC-H 测试

以下内容为基于 TPC 官方 TPC-H 工具进行手动 Step-by-Step 进行 TPC-H 测试。手动测试可以帮助更好的学习和了解 OceanBase 数据库,尤其是一些参数的设置。

进行环境调优

请在系统租户  sys 下进行环境调优。

  1. OceanBase 数据库调优。 在系统租户下执行  obclient -h$host_ip -P$host_port -uroot@sys -A 命令。

    alter system set enable_sql_audit=False;
    alter system set enable_sql_extension=True tenant=;
    alter system set syslog_level='PERF';
    alter system set max_syslog_file_count=100;
    alter system set trace_log_slow_query_watermark='100s';
    alter system set _hash_area_size='3g' tenant=;
    alter system set enable_rebalance=False;
    alter system set memory_chunk_cache_size=0;
    alter system set cache_wash_threshold='30g';
    alter system set ob_enable_batched_multi_statement=True tenant=";
    alter system set _bloom_filter_ratio=10;
    alter system set _rowsets_enabled=True tenant=";
    alter system set _parallel_server_sleep_time=10;
    alter system set cpu_quota_concurrency=4;
    alter system set syslog_io_bandwidth_limit='30m';
    alter system set enable_async_syslog=True;
    alter system set large_query_worker_percentage=10;
    alter system set builtin_db_data_verify_cycle=0;
    alter system set micro_block_merge_verify_level=0;
    alter system set freeze_trigger_percentage=50;
    alter system set enable_perf_event=False;
    alter system set large_query_threshold='1s';
    # 存储下压和向量化设置后,如果非第一次设置,一定要刷新 plan cache 才生效
    alter system flush plan cache global;
  2. 设置租户。 在测试用户下执行  obclient -h$host_ip -P$host_port -u$user@$tenant -p$password -A 命令。

    set global ob_sql_work_area_percentage = 80;
    set global optimizer_use_sql_plan_baselines = True;
    set global optimizer_capture_sql_plan_baselines = True;
    set global ob_query_timeout = 36000000000;
    set global ob_trx_timeout = 36000000000;
    set global max_allowed_packet = 67108864;
    # parallel_servers_target = max_cpu * server_num * 8
    set global parallel_servers_target = 624;
    set global _groupby_nopushdown_cut_ratio = 1;
    # 由于安全原因,只能使用observer本地client变更secure_file_priv
    set global secure_file_priv = '';
  3. 调优参数设置完毕后请执行 obd cluster restart $cluster_name 命令重启集群。

安装 TPC-H Tool

  1. 下载 TPC-H Tool。详细信息请参考  TPC-H Tool 下载页面

  2. 下载完成后解压文件,进入 TPC-H 解压后的目录。

    [wieck@localhost ~] $ unzip 7e965ead-8844-4efa-a275-34e35f8ab89b-tpc-h-tool.zip
    [wieck@localhost ~] $ cd TPC-H_Tools_v3.0.0
  3. 复制  Makefile.suite

    [wieck@localhost TPC-H_Tools_v3.0.0] $ cd dbgen/
    [wieck@localhost dbgen] $ cp Makefile.suite Makefile
  4. 修改  Makefile 文件中的 CC、DATABASE、MACHINE、WORKLOAD 等参数定义。

    [wieck@localhost dbgen] $ vim Makefile
    CC      = gcc
    # Current values for DATABASE are: INFORMIX, DB2, TDAT (Teradata)
    #                                  SQLSERVER, SYBASE, ORACLE, VECTORWISE
    # Current values for MACHINE are:  ATT, DOS, HP, IBM, ICL, MVS,
    #                                  SGI, SUN, U2200, VMS, LINUX, WIN32
    # Current values for WORKLOAD are:  TPCH
    DATABASE= MYSQL
    MACHINE = LINUX
    WORKLOAD = TPCH
  5. 修改  tpcd.h 文件,并添加新的宏定义。

    [wieck@localhost dbgen] $ vim tpcd.h
    #ifdef MYSQL
    #define GEN_QUERY_PLAN ""
    #define START_TRAN "START TRANSACTION"
    #define END_TRAN "COMMIT"
    #define SET_OUTPUT ""
    #define SET_ROWCOUNT "limit %d;\n"
    #define SET_DBASE "use %s;\n"
    #endif

6.编译文件。

make

生成数据

您可以根据实际环境生成 TCP-H 10G、100G 或者 1T 数据。本文以生成 100G 数据为例。

./dbgen -s 100
mkdir tpch100
mv *.tbl tpch100

生成查询 SQL

说明

您可参考本节中的下述步骤生成查询 SQL 后进行调整,也可直接使用  GitHub 中给出的查询 SQL。

若您选择使用 GitHub 中的查询 SQL,您需将 SQL 语句中的  cpu_num 修改为实际并发数。

  1. 复制  qgen 和  dists.dss 文件至  queries 目录。

    cp qgen queries
    cp dists.dss queries
  2. 在  queries 目录下创建  gen.sh 脚本生成查询 SQL。

    [wieck@localhost queries] $ vim gen.sh
    #!/usr/bin/bash
    for i in {1..22}
    do  
    ./qgen -d $i -s 100 > db"$i".sql
    done
  3. 执行  gen.sh 脚本。

    chmod +x  gen.sh
    ./gen.sh
  4. 查询 SQL 进行调整。

    dos2unix *

调整后的查询 SQL 请参考  GitHub。您需将 GitHub 给出的 SQL 语句中的  cpu_num 修改为实际并发数。建议并发数的数值与可用 CPU 总数相同,两者相等时性能最好。

您可在  sys 租户下使用如下命令查看租户的可用 CPU 总数。

select sum(cpu_capacity_max) from __all_virtual_server;

以  q1 为例,修改后的 SQL 语句如下:

select /*+    parallel(96) */   ---增加 parallel 并发执行
   l_returnflag,
   l_linestatus,
   sum(l_quantity) as sum_qty,
   sum(l_extendedprice) as sum_base_price,
   sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
   sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
   avg(l_quantity) as avg_qty,
   avg(l_extendedprice) as avg_price,
   avg(l_discount) as avg_disc,
   count(*) as count_order
from
   lineitem
where
   l_shipdate <= date '1998-12-01' - interval '90' day 
group by
   l_returnflag,
   l_linestatus
order by
   l_returnflag,
   l_linestatus;

新建表

创建表结构文件  create_tpch_mysql_table_part.ddl

drop table if exists lineitem;
  drop table if exists orders;
  drop table if exists partsupp;
  drop table if exists part;
  drop table if exists customer;
  drop table if exists supplier;
  drop table if exists nation;
  drop table if exists region;
  drop tablegroup if exists tpch_tg_lineitem_order_group;
  drop tablegroup if exists tpch_tg_partsupp_part;
  create tablegroup if not exists tpch_tg_lineitem_order_group binding true partition by key 1 partitions cpu_num;
  create tablegroup if not exists tpch_tg_partsupp_part binding true partition by key 1 partitions cpu_num;
  drop table if exists lineitem;
  create table lineitem (
  l_orderkey BIGINT NOT NULL,
  l_partkey BIGINT NOT NULL,
  l_suppkey INTEGER NOT NULL,
  l_linenumber INTEGER NOT NULL,
  l_quantity DECIMAL(15,2) NOT NULL,
  l_extendedprice DECIMAL(15,2) NOT NULL,
  l_discount DECIMAL(15,2) NOT NULL,
  l_tax DECIMAL(15,2) NOT NULL,
  l_returnflag char(1) DEFAULT NULL,
  l_linestatus char(1) DEFAULT NULL,
  l_shipdate date NOT NULL,
  l_commitdate date DEFAULT NULL,
  l_receiptdate date DEFAULT NULL,
  l_shipinstruct char(25) DEFAULT NULL,
  l_shipmode char(10) DEFAULT NULL,
  l_comment varchar(44) DEFAULT NULL,
  primary key(l_orderkey, l_linenumber))row_format = condensed
  tablegroup = tpch_tg_lineitem_order_group
  partition by key (l_orderkey) partitions cpu_num;
  drop table if exists orders;
  create table orders (
  o_orderkey bigint not null,
  o_custkey bigint not null,
  o_orderstatus char(1) default null,
  o_totalprice bigint default null,
  o_orderdate date not null,
  o_orderpriority char(15) default null,
  o_clerk char(15) default null,
  o_shippriority bigint default null,
  o_comment varchar(79) default null,
  primary key (o_orderkey))row_format = condensed
  tablegroup = tpch_tg_lineitem_order_group
  partition by key(o_orderkey) partitions cpu_num;
  drop table if exists partsupp;
  create table partsupp (
  ps_partkey bigint not null,
  ps_suppkey bigint not null,
  ps_availqty bigint default null,
  ps_supplycost bigint default null,
  ps_comment varchar(199) default null,
  primary key (ps_partkey, ps_suppkey))row_format = condensed
  tablegroup tpch_tg_partsupp_part
  partition by key(ps_partkey) partitions cpu_num;
  drop table if exists part;
  create table part (
  p_partkey bigint not null,
  p_name varchar(55) default null,
  p_mfgr char(25) default null,
  p_brand char(10) default null,
  p_type varchar(25) default null,
  p_size bigint default null,
  p_container char(10) default null,
  p_retailprice bigint default null,
  p_comment varchar(23) default null,
  primary key (p_partkey))row_format = condensed
  tablegroup tpch_tg_partsupp_part
  partition by key(p_partkey) partitions cpu_num;
  drop table if exists customer;
  create table customer (
  c_custkey bigint not null,
  c_name varchar(25) default null,
  c_address varchar(40) default null,
  c_nationkey bigint default null,
  c_phone char(15) default null,
  c_acctbal bigint default null,
  c_mktsegment char(10) default null,
  c_comment varchar(117) default null,
  primary key (c_custkey))row_format = condensed
  partition by key(c_custkey) partitions cpu_num;
  drop table if exists supplier;
  create table supplier (
  s_suppkey bigint not null,
  s_name char(25) default null,
  s_address varchar(40) default null,
  s_nationkey bigint default null,
  s_phone char(15) default null,
  s_acctbal bigint default null,
  s_comment varchar(101) default null,
  primary key (s_suppkey))row_format = condensed
  partition by key(s_suppkey) partitions cpu_num;
  drop table if exists nation;
  create table nation (
  n_nationkey bigint not null,
  n_name char(25) default null,
  n_regionkey bigint default null,
  n_comment varchar(152) default null,
  primary key (n_nationkey))row_format = condensed;
  drop table if exists region;
  create table region (
  r_regionkey bigint not null,
  r_name char(25) default null,
  r_comment varchar(152) default null,
  primary key (r_regionkey))row_format = condensed;

加载数据

您可以根据上述步骤生成的数据和 SQL 自行编写脚本。加载数据示例操作如下:

  1. 创建加载脚本目录。

    [wieck@localhost dbgen] $ mkdir load
    [wieck@localhost dbgen] $ cd load
    [wieck@localhost load] $ cp ../dss.ri  ../dss.ddl ./
  2. 创建  load.py 脚本。

    [wieck@localhost load] $ vim load.py
    #!/usr/bin/env python
    #-*- encoding:utf-8 -*-
    import os
    import sys
    import time
    import commands
    hostname='$host_ip'  # 注意!!请填写某个 observer,如 observer A 所在服务器的 IP 地址
    port='$host_port'               # observer A 的端口号
    tenant='$tenant_name'              # 租户名
    user='$user'               # 用户名
    password='$password'           # 密码
    data_path='$path'         # 注意!!请填写 observer A 所在服务器下 tbl 所在目录
    db_name='$db_name'             # 数据库名
    # 创建表
    cmd_str='obclient -h%s -P%s -u%s@%s -p%s -D%s < create_tpch_mysql_table_part.ddl'%(hostname,port,user,tenant,password,db_name)
    result = commands.getstatusoutput(cmd_str)
    print result
    cmd_str='obclient -h%s -P%s -u%s@%s -p%s  -D%s -e "show tables;" '%(hostname,port,user,tenant,password,db_name)
    result = commands.getstatusoutput(cmd_str)
    print result
    cmd_str=""" obclient -h%s -P%s -u%s@%s -p%s -c  -D%s -e "load data /*+ parallel(80) */ infile '%s/customer.tbl' into table customer fields terminated by '|';" """ %(hostname,port,user,tenant,password,db_name,data_path)
    result = commands.getstatusoutput(cmd_str)
    print result
    cmd_str=""" obclient -h%s -P%s -u%s@%s -p%s -c  -D%s -e "load data /*+ parallel(80) */ infile '%s/lineitem.tbl' into table lineitem fields terminated by '|';" """ %(hostname,port,user,tenant,password,db_name,data_path)
    result = commands.getstatusoutput(cmd_str)
    print result
    cmd_str=""" obclient -h%s -P%s -u%s@%s -p%s -c -D%s -e "load data /*+ parallel(80) */ infile '%s/nation.tbl' into table nation fields terminated by '|';" """ %(hostname,port,user,tenant,password,db_name,data_path)
    result = commands.getstatusoutput(cmd_str)
    print result
    cmd_str=""" obclient -h%s -P%s -u%s@%s -p%s -c  -D%s -e "load data /*+ parallel(80) */ infile '%s/orders.tbl' into table orders fields terminated by '|';" """ %(hostname,port,user,tenant,password,db_name,data_path)
    result = commands.getstatusoutput(cmd_str)
    print result
    cmd_str=""" obclient -h%s -P%s -u%s@%s -p%s   -D%s -e "load data /*+ parallel(80) */ infile '%s/partsupp.tbl' into table partsupp fields terminated by '|';" """ %(hostname,port,user,tenant,password,db_name,data_path)
    result = commands.getstatusoutput(cmd_str)
    print result
    cmd_str=""" obclient -h%s -P%s -u%s@%s -p%s -c  -D%s -e "load data /*+ parallel(80) */ infile '%s/part.tbl' into table part fields terminated by '|';" """ %(hostname,port,user,tenant,password,db_name,data_path)
    result = commands.getstatusoutput(cmd_str)
    print result
    cmd_str=""" obclient -h%s -P%s -u%s@%s -p%s -c  -D%s -e "load data /*+ parallel(80) */ infile '%s/region.tbl' into table region fields terminated by '|';" """ %(hostname,port,user,tenant,password,db_name,data_path)
    result = commands.getstatusoutput(cmd_str)
    print result
    cmd_str=""" obclient -h%s -P%s -u%s@%s -p%s -c  -D%s -e "load data /*+ parallel(80) */ infile '%s/supplier.tbl' into table supplier fields terminated by '|';" """ %(hostname,port,user,tenant,password,db_name,data_path)
    result = commands.getstatusoutput(cmd_str)
    print result
  3. 加载数据。

    注意

    加载数据需要安装 OBClient 客户端。

    $ python load.py
    (0,'')
    (0, 'obclient: [Warning] Using a password on the command line interface can be insecure.\nTABLE_NAME\nT1\nLINEITEM\nORDERS\nPARTSUPP\nPART\nCUSTOMER\nSUPPLIER\nNATION\nREGION')
    (0, 'obclient: [Warning] Using a password on the command line interface can be insecure.')
    (0, 'obclient: [Warning] Using a password on the command line interface can be insecure.')
    (0, 'obclient: [Warning] Using a password on the command line interface can be insecure.')
    (0, 'obclient: [Warning] Using a password on the command line interface can be insecure.')
    (0, 'obclient: [Warning] Using a password on the command line interface can be insecure.')
    (0, 'obclient: [Warning] Using a password on the command line interface can be insecure.')
    (0, 'obclient: [Warning] Using a password on the command line interface can be insecure.')
  4. 执行合并。

    Major 合并将当前大版本的 SSTable 和 MemTable 与前一个大版本的全量静态数据进行合并,使存储层统计信息更准确,生成的执行计划更稳定。

    注意

    执行合并需要使用  sys 租户登录。

    MySQL [(none)]> use oceanbase
    Database changed
    MySQL [oceanbase]> alter system major freeze tenant=;
    Query OK, 0 rows affected
  5. 查看合并是否完成。

    MySQL [oceanbase]> select FROZEN_SCN, LAST_SCN from oceanbase.CDB_OB_MAJOR_COMPACTION;
    +---------------------+---------------------+
    | FROZEN_SCN          | LAST_SCN            |
    +---------------------+---------------------+
    | 1667239201167716767 | 1667239201167716767 |
    | 1667239200111919300 | 1667239200111919300 |
    | 1667239201167452168 | 1667239201167452168 |
    | 1667239201168053124 | 1667239201168053124 |
    | 1667239201167520213 | 1667239201167520213 |
    +---------------------+---------------------+

    说明

    所有的 FROZEN_SCN 和 LAST_SCN 的值相等即表示合并完成。

  6. 手动收集统计信息

在测试用户下执行  obclient -h$host_ip -P$host_port -u$user@$tenant -p$password -A -D$database 命令。

set _force_parallel_query_dop = 96;
analyze table lineitem partition(lineitem) compute statistics for all columns size auto; 
analyze table orders partition(orders) compute statistics for all columns size auto; 
analyze table partsupp partition(partsupp) compute statistics for all columns size auto; 
analyze table part partition(part) compute statistics for all columns size auto; 
analyze table customer partition(customer) compute statistics for all columns size auto; 
analyze table supplier partition(supplier) compute statistics for all columns size auto; 
analyze table nation compute statistics for all columns size auto; 
analyze table region compute statistics for all columns size auto;

说明

_force_parallel_query_dop 可以设置当前  session 上的  query 并发度,取值为  cpu_num

执行测试

您可以根据上述步骤生成的数据和 SQL 自行编写脚本。执行测试示例操作如下:

  1. 在  queries 目录下编写测试脚本  tpch.sh

    [wieck@localhost queries] $ vim tpch.sh
    #!/bin/bash
    TPCH_TEST="obclient -h $host_ip -P $host_port -utpch_100g_part@tpch_mysql  -D tpch_100g_part  -ptest -c"
    # warmup预热
    for i in {1..22}
    do
       sql1="source db${i}.sql"
       echo $sql1| $TPCH_TEST >db${i}.log  || ret=1
    done
    # 正式执行
    for i in {1..22}
    do
       starttime=`date +%s%N`
       echo `date  '+[%Y-%m-%d %H:%M:%S]'` "BEGIN Q${i}"
       sql1="source db${i}.sql"
       echo $sql1| $TPCH_TEST >db${i}.log  || ret=1
       stoptime=`date +%s%N`
       costtime=`echo $stoptime $starttime | awk '{printf "%0.2f\n", ($1 - $2) / 1000000000}'`
       echo `date  '+[%Y-%m-%d %H:%M:%S]'` "END,COST ${costtime}s"
    done
  2. 执行测试脚本。

    sh tpch.sh

FAQ

  • 导入数据失败。报错信息如下:

    ERROR 1017 (HY000) at line 1: File not exist

    tbl 文件必须放在所连接的 OceanBase 数据库所在机器的某个目录下,因为加载数据必须本地导入。

  • 查看数据报错。报错信息如下:

    ERROR 4624 (HY000):No memory or reach tenant memory limit

    内存不足,建议增大租户内存。

  • 导入数据报错。报错信息如下:

    ERROR 1227 (42501) at line 1: Access denied

    需要授予用户访问权限。运行以下命令,授予权限:

    grant file on *.* to tpch_100g_part;
  • 查询 SQL 进行调整 dos2unix * 时报错,报错信息如下:

    -bash: dos2unix: command not found

    需要安装 dos2unix。执行以下命令,即可安装:

    yum install -y dos2unix

手动体验 Operational OLAP

通过上一步的操作,我们已经获得了一个 TPCH 的测试环境,下面让我们通过手动执行来看看,OceanBase 数据库在 OLAP 方面的能力和特性。 我们先使用 OBClient 登录到数据库中,如果您没有安装 OBClient,使用  mysql 客户端也是可以的。

obclient -h127.0.0.1 -P2881 -uroot@test  -Dtest -A -p -c

在开始之前,您需要根据 OceanBase 集群和租户的配置,进行并行度的设置,具体大小建议不超过当前租户配置的 CPU 核数的 2 倍。例如您的租户 CPU 最大配置为8,那么此处建议并行度设置为16:

MySQL [test]> SET GLOBAL parallel_servers_target=16;
Query OK, 0 rows affected

OceanBase 数据库兼容大多数 MySQL 的内部视图,我们可以通过如下查询查看当前环境中表的大小:

MySQL [test]> SELECT table_name, table_rows, CONCAT(ROUND(data_length/(1024*1024*1024),2),' GB')  table_size FROM information_schema.TABLES WHERE table_schema = 'test' order by table_rows desc;
+------------+------------+------------+
| table_name | table_rows | table_size |
+------------+------------+------------+
| lineitem   |    6001215 | 0.37 GB    |
| orders     |    1500000 | 0.08 GB    |
| partsupp   |     800000 | 0.04 GB    |
| part       |     200000 | 0.01 GB    |
| customer   |     150000 | 0.01 GB    |
| supplier   |      10000 | 0.00 GB    |
| nation     |         25 | 0.00 GB    |
| region     |          5 | 0.00 GB    |
+------------+------------+------------+
8 rows in set

下面我们通过 TPC-H 测试中的 Q1 来体验 OceanBase 数据库查询能力,Q1 查询会在最大的  lineitem 表上,汇总分析指定时间内各类商品的价格、折扣、发货、数量等信息。这个查询对全表数据都会进行读取、并进行分区、排序、聚合等计算。

不开启并发查询

首先,我们在默认不开启并发的情况下执行该查询:

select 
 l_returnflag,
 l_linestatus,
 sum(l_quantity) as sum_qty,
 sum(l_extendedprice) as sum_base_price,
 sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
 sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
 avg(l_quantity) as avg_qty,
 avg(l_extendedprice) as avg_price,
 avg(l_discount) as avg_disc,
 count(*) as count_order
from
 lineitem
where
 l_shipdate <= date '1998-12-01' - interval '90' day
group by
 l_returnflag,
 l_linestatus
order by
 l_returnflag,
 l_linestatus;

在本例的测试环境中,执行结果如下:

+--------------+--------------+----------+----------------+----------------+--------------+---------+------------+----------+-------------+
| l_returnflag | l_linestatus | sum_qty  | sum_base_price | sum_disc_price | sum_charge   | avg_qty | avg_price  | avg_disc | count_order |
+--------------+--------------+----------+----------------+----------------+--------------+---------+------------+----------+-------------+
| A            | F            | 37734107 |    56586577106 |    56586577106 |  56586577106 | 25.5220 | 38273.1451 |   0.0000 |     1478493 |
| N            | F            |   991417 |     1487505208 |     1487505208 |   1487505208 | 25.5165 | 38284.4806 |   0.0000 |       38854 |
| N            | O            | 74476040 |   111701776272 |   111701776272 | 111701776272 | 25.5022 | 38249.1339 |   0.0000 |     2920374 |
| R            | F            | 37719753 |    56568064200 |    56568064200 |  56568064200 | 25.5058 | 38250.8701 |   0.0000 |     1478870 |
+--------------+--------------+----------+----------------+----------------+--------------+---------+------------+----------+-------------+
4 rows in set (6.791 sec)

开启并发查询

OceanBase 数据库的 Operational OLAP 能力基于一套数据以及执行引擎,无需进行异构的数据同步和维护。下面我们通过添加一个  parallel Hint,以并行度为8的方式再次执行这条语句:

select /*+parallel(8) */
 l_returnflag,
 l_linestatus,
 sum(l_quantity) as sum_qty,
 sum(l_extendedprice) as sum_base_price,
 sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
 sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
 avg(l_quantity) as avg_qty,
 avg(l_extendedprice) as avg_price,
 avg(l_discount) as avg_disc,
 count(*) as count_order
from
 lineitem
where
 l_shipdate <= date '1998-12-01' - interval '90' day
group by
 l_returnflag,
 l_linestatus
order by
 l_returnflag,
 l_linestatus;

在相同的环境和数据集中,执行结果如下:

+--------------+--------------+----------+----------------+----------------+--------------+---------+------------+----------+-------------+
| l_returnflag | l_linestatus | sum_qty  | sum_base_price | sum_disc_price | sum_charge   | avg_qty | avg_price  | avg_disc | count_order |
+--------------+--------------+----------+----------------+----------------+--------------+---------+------------+----------+-------------+
| A            | F            | 37734107 |    56586577106 |    56586577106 |  56586577106 | 25.5220 | 38273.1451 |   0.0000 |     1478493 |
| N            | F            |   991417 |     1487505208 |     1487505208 |   1487505208 | 25.5165 | 38284.4806 |   0.0000 |       38854 |
| N            | O            | 74476040 |   111701776272 |   111701776272 | 111701776272 | 25.5022 | 38249.1339 |   0.0000 |     2920374 |
| R            | F            | 37719753 |    56568064200 |    56568064200 |  56568064200 | 25.5058 | 38250.8701 |   0.0000 |     1478870 |
+--------------+--------------+----------+----------------+----------------+--------------+---------+------------+----------+-------------+
4 rows in set (1.197 sec)

可以看到,对比默认无并发的执行耗时,并行查询下速度提升了将近 6 倍。如果我们通过  EXPLAIN 命令查看执行计划,也可以看到并行度的展示(第 18 行,1 号算子,dop=8):

===============================================================
|ID|OPERATOR                      |NAME    |EST. ROWS|COST    |
---------------------------------------------------------------
|0 |PX COORDINATOR MERGE SORT     |        |6        |13507125|
|1 | EXCHANGE OUT DISTR           |:EX10001|6        |13507124|
|2 |  SORT                        |        |6        |13507124|
|3 |   HASH GROUP BY              |        |6        |13507107|
|4 |    EXCHANGE IN DISTR         |        |6        |8379337 |
|5 |     EXCHANGE OUT DISTR (HASH)|:EX10000|6        |8379335 |
|6 |      HASH GROUP BY           |        |6        |8379335 |
|7 |       PX BLOCK ITERATOR      |        |5939712  |3251565 |
|8 |        TABLE SCAN            |lineitem|5939712  |3251565 |
===============================================================
Outputs & filters:
-------------------------------------
  0 - output([lineitem.l_returnflag], [lineitem.l_linestatus], [T_FUN_SUM(T_FUN_SUM(lineitem.l_quantity))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount * 1 + lineitem.l_tax))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_quantity)) / cast(T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_quantity)), DECIMAL(20, 0))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice)) / cast(T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_extendedprice)), DECIMAL(20, 0))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_discount)) / cast(T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_discount)), DECIMAL(20, 0))], [T_FUN_COUNT_SUM(T_FUN_COUNT(*))]), filter(nil), sort_keys([lineitem.l_returnflag, ASC], [lineitem.l_linestatus, ASC])
  1 - output([lineitem.l_returnflag], [lineitem.l_linestatus], [T_FUN_SUM(T_FUN_SUM(lineitem.l_quantity))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount * 1 + lineitem.l_tax))], [T_FUN_COUNT_SUM(T_FUN_COUNT(*))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_quantity)) / cast(T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_quantity)), DECIMAL(20, 0))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice)) / cast(T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_extendedprice)), DECIMAL(20, 0))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_discount)) / cast(T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_discount)), DECIMAL(20, 0))]), filter(nil), dop=8
  2 - output([lineitem.l_returnflag], [lineitem.l_linestatus], [T_FUN_SUM(T_FUN_SUM(lineitem.l_quantity))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount * 1 + lineitem.l_tax))], [T_FUN_COUNT_SUM(T_FUN_COUNT(*))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_quantity)) / cast(T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_quantity)), DECIMAL(20, 0))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice)) / cast(T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_extendedprice)), DECIMAL(20, 0))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_discount)) / cast(T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_discount)), DECIMAL(20, 0))]), filter(nil), sort_keys([lineitem.l_returnflag, ASC], [lineitem.l_linestatus, ASC])
  3 - output([lineitem.l_returnflag], [lineitem.l_linestatus], [T_FUN_SUM(T_FUN_SUM(lineitem.l_quantity))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount * 1 + lineitem.l_tax))], [T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_quantity))], [T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_extendedprice))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_discount))], [T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_discount))], [T_FUN_COUNT_SUM(T_FUN_COUNT(*))]), filter(nil),
      group([lineitem.l_returnflag], [lineitem.l_linestatus]), agg_func([T_FUN_SUM(T_FUN_SUM(lineitem.l_quantity))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount * 1 + lineitem.l_tax))], [T_FUN_COUNT_SUM(T_FUN_COUNT(*))], [T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_quantity))], [T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_extendedprice))], [T_FUN_SUM(T_FUN_SUM(lineitem.l_discount))], [T_FUN_COUNT_SUM(T_FUN_COUNT(lineitem.l_discount))])
  4 - output([lineitem.l_returnflag], [lineitem.l_linestatus], [T_FUN_SUM(lineitem.l_quantity)], [T_FUN_SUM(lineitem.l_extendedprice)], [T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount)], [T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount * 1 + lineitem.l_tax)], [T_FUN_COUNT(lineitem.l_quantity)], [T_FUN_COUNT(lineitem.l_extendedprice)], [T_FUN_SUM(lineitem.l_discount)], [T_FUN_COUNT(lineitem.l_discount)], [T_FUN_COUNT(*)]), filter(nil)
  5 - (#keys=2, [lineitem.l_returnflag], [lineitem.l_linestatus]), output([lineitem.l_returnflag], [lineitem.l_linestatus], [T_FUN_SUM(lineitem.l_quantity)], [T_FUN_SUM(lineitem.l_extendedprice)], [T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount)], [T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount * 1 + lineitem.l_tax)], [T_FUN_COUNT(lineitem.l_quantity)], [T_FUN_COUNT(lineitem.l_extendedprice)], [T_FUN_SUM(lineitem.l_discount)], [T_FUN_COUNT(lineitem.l_discount)], [T_FUN_COUNT(*)]), filter(nil), dop=8
  6 - output([lineitem.l_returnflag], [lineitem.l_linestatus], [T_FUN_SUM(lineitem.l_quantity)], [T_FUN_SUM(lineitem.l_extendedprice)], [T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount)], [T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount * 1 + lineitem.l_tax)], [T_FUN_COUNT(lineitem.l_quantity)], [T_FUN_COUNT(lineitem.l_extendedprice)], [T_FUN_SUM(lineitem.l_discount)], [T_FUN_COUNT(lineitem.l_discount)], [T_FUN_COUNT(*)]), filter(nil),
      group([lineitem.l_returnflag], [lineitem.l_linestatus]), agg_func([T_FUN_SUM(lineitem.l_quantity)], [T_FUN_SUM(lineitem.l_extendedprice)], [T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount)], [T_FUN_SUM(lineitem.l_extendedprice * 1 - lineitem.l_discount * 1 + lineitem.l_tax)], [T_FUN_COUNT(*)], [T_FUN_COUNT(lineitem.l_quantity)], [T_FUN_COUNT(lineitem.l_extendedprice)], [T_FUN_SUM(lineitem.l_discount)], [T_FUN_COUNT(lineitem.l_discount)])
  7 - output([lineitem.l_returnflag], [lineitem.l_linestatus], [lineitem.l_quantity], [lineitem.l_extendedprice], [lineitem.l_discount], [lineitem.l_extendedprice * 1 - lineitem.l_discount], [lineitem.l_extendedprice * 1 - lineitem.l_discount * 1 + lineitem.l_tax]), filter(nil)
  8 - output([lineitem.l_returnflag], [lineitem.l_linestatus], [lineitem.l_quantity], [lineitem.l_extendedprice], [lineitem.l_discount], [lineitem.l_extendedprice * 1 - lineitem.l_discount], [lineitem.l_extendedprice * 1 - lineitem.l_discount * 1 + lineitem.l_tax]), filter([lineitem.l_shipdate <= ?]),
      access([lineitem.l_shipdate], [lineitem.l_returnflag], [lineitem.l_linestatus], [lineitem.l_quantity], [lineitem.l_extendedprice], [lineitem.l_discount], [lineitem.l_tax]), partitions(p[0-15])

本文中的例子使用单节点环境部署,值得特别说明的是,OceanBase 数据库的并行执行框架最大的特点是还可以将大量数据的分析查询以多节点并发执行的方式进行分析,例如一张表包含上亿行数据,分布在多个 OceanBase 数据库节点上,当进行分析查询时,OceanBase 数据库的分布式执行框架可以生成一个分布式并行执行计划,利用多个节点的资源进行分析,因此具备很好的扩展性,同时针对并行的设置还可以在 SQL、会话、表上多个维度进行设置。


请使用浏览器的分享功能分享到微信等