ClickHouse UDF 实践

ClickHouse支持用户自定义函数(User-Defined Functions,UDF),这允许用户编写自定义的函数来扩展ClickHouse的功能。UDF可以用于实现各种自定义逻辑,例如数据处理、转换、计算等。

ClickHouse目前支持内部UDF和外部UDF。

一、内部UDF

ClickHouse支持从lambda表达式创建UDF。表达式必须包含参数、常量、运算符或者其他函数调用。

1. 语法

创建内部UDF的语法如下:

CREATE FUNCTION name [ON CLUSTER cluster] AS (parameter0, ...) -> expression

一个UDF可以包含任意数量的参数,但是有一些限制:

① 函数名必须唯一,并且不能和系统函数名称重复

② 不允许使用递归函数

③ 函数使用的所有变量都必须在其参数列表中指定

如果违反以上限制,就会报错。

2. 举个例子

摘抄自官网例子,比较简单。首先在clickhouse客户端创建一个udf:

CREATE FUNCTION linear_equation AS (x, k, b) -> k*x + b;

使用上面创建的udf:

SELECT number, linear_equation(number21FROM numbers(3);

在udf中使用条件函数:

CREATE FUNCTION parity_str AS (n) -> if(n % 2'odd''even');

使用一下:

SELECT number, parity_str(numberFROM numbers(3);

二、外部UDF

1. 概述

上述提到的内部UDF使用场景比较有限。除此之外,ClickHouse还提供了外部UDF供我们使用。

ClickHouse可以调用任何外部可执行程序或脚本来处理数据。

UDF配置可以位于一个或者多个xml配置文件中,配置文件的路径由/etc/clickhouse-server/config.xml中的user_defined_executable_functions_config属性指定,比如:


<user_defined_executable_functions_config>*_function.xmluser_defined_executable_functions_config>

2. UDF配置文件

UDF配置文件可以包含以下属性:

属性名称 说明
name 函数名称
command execute_direct设置为0时,执行脚本的名称
argument 参数(参数类型和参数名称)
format 传递给command的格式
return_type 返回值类型
return_name 返回值名称
type 执行类型。如果设置为executable,则启动单个命令;如果设置为executable_pool,则会创建一个命令池。
max_command_execution_time 处理数据的最大执行时间(秒),这个设置仅对type = executable_pool 时有效,默认值为10
command_termination_timeout 进程关闭的超时时间,默认为10秒。
command_read_timeout 数据读取的超时时间(毫秒),默认为10000。
command_write_timeout 数据写入的超时时间(毫秒),默认为10000。
pool_size 命令池大小,默认值为16。
send_chunk_header 是否在发送数据之前发送数据总数,默认为false。
execute_direct 如果设置为1,将会从user_scripts目录寻找可执行的文件。如果设置为0,则会将会执行bin/sh -c
lifetime 函数刷新的间隔(秒),默认为0。

执行函数时,输入参数必须来自标准输入(STDIN),并且输出结果到标准输出(STDOUT)。

3. java udf示例

3.1 配置udf名称

找到/etc/clickhouse-server/config.xml,修改如下属性:

<user_defined_executable_functions_config>*_function.xmluser_defined_executable_functions_config>
3.2 创建udf配置文件

在/etc/clickhouse-server目录下创建demo_function.xml,内容如下:

<functions>
    <function>
        <type>executabletype>
        
        <name>demo_clickhouse_udfname>
        
        <return_type>Stringreturn_type>
        
        
        <return_name>resultreturn_name>
        
        
        <argument>
            <type>UInt64type>
            <name>argument_1name>
        argument>
        <argument>
            <type>UInt64type>
            <name>argument_2name>
        argument>
        
        <format>JSONEachRowformat>
        
        <execute_direct>0execute_direct>
        
        <command>/usr/bin/java -jar /data/clickhouse/lib/clickhouse/user_scripts/demo_clickhouse_udf-1.0-SNAPSHOT-jar-with-dependencies.jarcommand>
    function>
functions>
3.3 编写udf代码

① 创建一个maven项目

新建一个maven项目,名称为:clickhouse-java-udf-demo

② 添加依赖

新建pom.xml,内容如下:


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0modelVersion>

    <groupId>com.eyesmoonsgroupId>
    <artifactId>clickhouse-java-udf-demoartifactId>
    <version>1.0-SNAPSHOTversion>

    <properties>
        <maven.compiler.source>8maven.compiler.source>
        <maven.compiler.target>8maven.compiler.target>
        <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
    properties>

    <dependencies>
        <dependency>
            <groupId>com.google.code.gsongroupId>
            <artifactId>gsonartifactId>
            <version>2.10.1version>
        dependency>
    dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.pluginsgroupId>
                <artifactId>maven-assembly-pluginartifactId>
                <version>3.3.0version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.eyesmoons.MainmainClass>
                        manifest>
                    archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependenciesdescriptorRef>
                    descriptorRefs>
                configuration>
                <executions>
                    <execution>
                        <id>assemble-allid>
                        <phase>packagephase>
                        <goals>
                            <goal>singlegoal>
                        goals>
                    execution>
                executions>
            plugin>
        plugins>
    build>
project>

③ 主代码

package com.eyesmoons;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.math.BigDecimal;

/**
 * clickhouse自定义udf函数
 */

public class Main {
    public static void main(String[] args) {
        try {
            String s;
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            // 逐行读取数据
            while (!(s = reader.readLine()).isEmpty()) {
                // 获取输入参数
                Gson gson = new Gson();
                JsonElement jsonElement = gson.fromJson(s, JsonElement.class);
                JsonObject jsonObject = jsonElement.getAsJsonObject();
                String argument_1 = jsonObject.get("argument_1").getAsString();
                String argument_2 = jsonObject.get("argument_2").getAsString();
                // 封装输出结果
                BigDecimal num1 = new BigDecimal(argument_1);
                BigDecimal num2 = new BigDecimal(argument_2);
                BigDecimal result = num1.add(num2);
                System.out.println(gson.toJson(new Result(result.toString())));
            }
            System.out.flush();
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }

    /**
     * 返回结果
     * 返回值名称必须跟xml文件中的return_name一致
     * xml文件return_name默认值result
     */

    public static class Result {
        private String result;

        public String getResult() {
            return result;
        }

        public void setResult(String result) {
            this.result = result;
        }

        public Result(String result) {
            this.result = result;
        }
    }
}
3.4 打包
mvn clean package
3.5 上传

将打包的clickhouse-java-udf-demo-1.0-SNAPSHOT-jar-with-dependencies.jar上传到/var/lib/clickhouse/user_scripts目录。

3.6 验证

在ClickHouse客户端执行验证创建的udf:

# 刷新函数
SYSTEM RELOAD FUNCTIONS;
# 查询刚添加的udf函数
SELECT * FROM system.functions WHERE name = 'demo_clickhouse_udf';
# 使用自定义UDF
select demo_clickhouse_udf(1,2);

4. python udf示例

ClickHouse要求python3环境,所以需要提前配置好python3。

4.1 创建udf配置文件

/etc/clickhouse-server/目录下创建test_sum_function.xml,内容如下:

<functions>
    <function>
        <type>executabletype>
        <name>test_function_sum_jsonname>
        <return_type>UInt64return_type>
        <return_name>result_namereturn_name>
        <argument>
            <type>UInt64type>
            <name>argument_1name>
        argument>
        <argument>
            <type>UInt64type>
            <name>argument_2name>
        argument>
        <format>JSONEachRowformat>
        <command>test_sum_function.pycommand>
    function>
functions>
4.2 创建python脚本

/var/lib/clickhouse/user_scripts 目录下创建test_sum_function.py,内容如下:

#!/usr/bin/python3

import sys
import json

if __name__ == '__main__':
    for line in sys.stdin:
        value = json.loads(line)
        first_arg = int(value['argument_1'])
        second_arg = int(value['argument_2'])
        result = {'result_name': first_arg + second_arg}
        print(json.dumps(result), end='\n')
        sys.stdout.flush()
4.3 授予执行权限

为上述创建的python脚本授予执行权限,否则执行会报错。

chmod +x /var/lib/clickhouse/user_scripts/test_sum_function.py.py
4.4 验证
# 刷新函数
SYSTEM RELOAD FUNCTIONS;
# 使用自定义UDF
SELECT test_function_sum_json(22);



往期推荐



如何将lacus部署到自己的服务器

Flink 自定义各种UDF函数实践

看完这篇,你就知道怎么选用ClickHouse表引擎了


请使用浏览器的分享功能分享到微信等