ClickHouse支持用户自定义函数(User-Defined Functions,UDF),这允许用户编写自定义的函数来扩展ClickHouse的功能。UDF可以用于实现各种自定义逻辑,例如数据处理、转换、计算等。
ClickHouse目前支持内部UDF和外部UDF。
一、内部UDF
ClickHouse支持从lambda表达式创建UDF。表达式必须包含参数、常量、运算符或者其他函数调用。
1. 语法
创建内部UDF的语法如下:
CREATE FUNCTION name [ON CLUSTER cluster] AS (parameter0, ...) -> expression
一个UDF可以包含任意数量的参数,但是有一些限制:
① 函数名必须唯一,并且不能和系统函数名称重复
② 不允许使用递归函数
③ 函数使用的所有变量都必须在其参数列表中指定
如果违反以上限制,就会报错。
2. 举个例子
摘抄自官网例子,比较简单。首先在clickhouse客户端创建一个udf:
CREATE FUNCTION linear_equation AS (x, k, b) -> k*x + b;
使用上面创建的udf:
SELECT number, linear_equation(number, 2, 1) FROM numbers(3);
在udf中使用条件函数:
CREATE FUNCTION parity_str AS (n) -> if(n % 2, 'odd', 'even');
使用一下:
SELECT number, parity_str(number) FROM numbers(3);
二、外部UDF
1. 概述
上述提到的内部UDF使用场景比较有限。除此之外,ClickHouse还提供了外部UDF供我们使用。
ClickHouse可以调用任何外部可执行程序或脚本来处理数据。
UDF配置可以位于一个或者多个xml配置文件中,配置文件的路径由/etc/clickhouse-server/config.xml中的user_defined_executable_functions_config属性指定,比如:
<user_defined_executable_functions_config>*_function.xmluser_defined_executable_functions_config>
2. UDF配置文件
UDF配置文件可以包含以下属性:
| 属性名称 | 说明 |
|---|---|
| name | 函数名称 |
| command | execute_direct设置为0时,执行脚本的名称 |
| argument | 参数(参数类型和参数名称) |
| format | 传递给command的格式 |
| return_type | 返回值类型 |
| return_name | 返回值名称 |
| type | 执行类型。如果设置为executable,则启动单个命令;如果设置为executable_pool,则会创建一个命令池。 |
| max_command_execution_time | 处理数据的最大执行时间(秒),这个设置仅对type = executable_pool 时有效,默认值为10 |
| command_termination_timeout | 进程关闭的超时时间,默认为10秒。 |
| command_read_timeout | 数据读取的超时时间(毫秒),默认为10000。 |
| command_write_timeout | 数据写入的超时时间(毫秒),默认为10000。 |
| pool_size | 命令池大小,默认值为16。 |
| send_chunk_header | 是否在发送数据之前发送数据总数,默认为false。 |
| execute_direct | 如果设置为1,将会从user_scripts目录寻找可执行的文件。如果设置为0,则会将会执行bin/sh -c
|
| lifetime | 函数刷新的间隔(秒),默认为0。 |
执行函数时,输入参数必须来自标准输入(STDIN),并且输出结果到标准输出(STDOUT)。
3. java udf示例
3.1 配置udf名称
找到/etc/clickhouse-server/config.xml,修改如下属性:
<user_defined_executable_functions_config>*_function.xmluser_defined_executable_functions_config>
3.2 创建udf配置文件
在/etc/clickhouse-server目录下创建demo_function.xml,内容如下:
<functions>
<function>
<type>executabletype>
<name>demo_clickhouse_udfname>
<return_type>Stringreturn_type>
<return_name>resultreturn_name>
<argument>
<type>UInt64type>
<name>argument_1name>
argument>
<argument>
<type>UInt64type>
<name>argument_2name>
argument>
<format>JSONEachRowformat>
<execute_direct>0execute_direct>
<command>/usr/bin/java -jar /data/clickhouse/lib/clickhouse/user_scripts/demo_clickhouse_udf-1.0-SNAPSHOT-jar-with-dependencies.jarcommand>
function>
functions>
3.3 编写udf代码
① 创建一个maven项目
新建一个maven项目,名称为:clickhouse-java-udf-demo
② 添加依赖
新建pom.xml,内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>
<groupId>com.eyesmoonsgroupId>
<artifactId>clickhouse-java-udf-demoartifactId>
<version>1.0-SNAPSHOTversion>
<properties>
<maven.compiler.source>8maven.compiler.source>
<maven.compiler.target>8maven.compiler.target>
<project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
properties>
<dependencies>
<dependency>
<groupId>com.google.code.gsongroupId>
<artifactId>gsonartifactId>
<version>2.10.1version>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-assembly-pluginartifactId>
<version>3.3.0version>
<configuration>
<archive>
<manifest>
<mainClass>com.eyesmoons.MainmainClass>
manifest>
archive>
<descriptorRefs>
<descriptorRef>jar-with-dependenciesdescriptorRef>
descriptorRefs>
configuration>
<executions>
<execution>
<id>assemble-allid>
<phase>packagephase>
<goals>
<goal>singlegoal>
goals>
execution>
executions>
plugin>
plugins>
build>
project>
③ 主代码
package com.eyesmoons;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.math.BigDecimal;
/**
* clickhouse自定义udf函数
*/
public class Main {
public static void main(String[] args) {
try {
String s;
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
// 逐行读取数据
while (!(s = reader.readLine()).isEmpty()) {
// 获取输入参数
Gson gson = new Gson();
JsonElement jsonElement = gson.fromJson(s, JsonElement.class);
JsonObject jsonObject = jsonElement.getAsJsonObject();
String argument_1 = jsonObject.get("argument_1").getAsString();
String argument_2 = jsonObject.get("argument_2").getAsString();
// 封装输出结果
BigDecimal num1 = new BigDecimal(argument_1);
BigDecimal num2 = new BigDecimal(argument_2);
BigDecimal result = num1.add(num2);
System.out.println(gson.toJson(new Result(result.toString())));
}
System.out.flush();
} catch (Exception e) {
System.err.println(e.getMessage());
}
}
/**
* 返回结果
* 返回值名称必须跟xml文件中的return_name一致
* xml文件return_name默认值result
*/
public static class Result {
private String result;
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
public Result(String result) {
this.result = result;
}
}
}
3.4 打包
mvn clean package
3.5 上传
将打包的clickhouse-java-udf-demo-1.0-SNAPSHOT-jar-with-dependencies.jar上传到/var/lib/clickhouse/user_scripts目录。
3.6 验证
在ClickHouse客户端执行验证创建的udf:
# 刷新函数
SYSTEM RELOAD FUNCTIONS;
# 查询刚添加的udf函数
SELECT * FROM system.functions WHERE name = 'demo_clickhouse_udf';
# 使用自定义UDF
select demo_clickhouse_udf(1,2);
4. python udf示例
ClickHouse要求python3环境,所以需要提前配置好python3。
4.1 创建udf配置文件
在/etc/clickhouse-server/目录下创建test_sum_function.xml,内容如下:
<functions>
<function>
<type>executabletype>
<name>test_function_sum_jsonname>
<return_type>UInt64return_type>
<return_name>result_namereturn_name>
<argument>
<type>UInt64type>
<name>argument_1name>
argument>
<argument>
<type>UInt64type>
<name>argument_2name>
argument>
<format>JSONEachRowformat>
<command>test_sum_function.pycommand>
function>
functions>
4.2 创建python脚本
在/var/lib/clickhouse/user_scripts 目录下创建test_sum_function.py,内容如下:
#!/usr/bin/python3
import sys
import json
if __name__ == '__main__':
for line in sys.stdin:
value = json.loads(line)
first_arg = int(value['argument_1'])
second_arg = int(value['argument_2'])
result = {'result_name': first_arg + second_arg}
print(json.dumps(result), end='\n')
sys.stdout.flush()
4.3 授予执行权限
为上述创建的python脚本授予执行权限,否则执行会报错。
chmod +x /var/lib/clickhouse/user_scripts/test_sum_function.py.py
4.4 验证
# 刷新函数
SYSTEM RELOAD FUNCTIONS;
# 使用自定义UDF
SELECT test_function_sum_json(2, 2);
往期推荐