KWDB时序数据库在工业级机器手臂生产调度中的落地实践案例

 


作者:2301_79516858

原文链接:https://blog.csdn.net/2301_79516858/article/details/148283623

前言:

随着AI与人工智能的快速发展,工业4.0也迎来了智能制造,机器人等技术的发展,单纯的关系型数据库已经远远不能满足工业量级的存储场景了。为了满足工业级物联网的数据增量问题,急需一款高性能、分布式的物联网、工业大数据平台,来解决时序数据的问题。

在万物互联时代,设备和传感器持续产生海量实时数据,传统数据库和数据处理平台难以满足高性能和实时数据处理的需求,面临严峻挑战。这些海量实时数据的涌现,也导致了企业在研发、运营和维护成本的大幅攀升。

为应对这一挑战,本文介绍一款专为物联网、工业互联网等场景设计并优化的大数据平台“KWDB”, 这个专为物联网和工业互联网等场景设计优化的大数据平台应运而生。作为一款高性能、分布式的物联网、工业大数据平台,其核心模块是高性能、集群开源、云原生、极简的时序数据库。它能安全高效地将大量设备每天产生的高达 TB 甚至 PB 级的数据进行汇聚、存储、分析和分发,并提供 AI 智能体对数据进行预测与异常检测,提供实时的商业洞察。

接下来从“工厂4.0的机器手臂自动化测试”的场景来实际分析与实践,它融合了关系型数据库,时序数据库等多种类型的数据库,很适合用在物联网IoT的应用场景。

一、时序数据增长给MySQL带来的业务痛点分析:

工厂SFC(Shop Floor Control)生产管理系统,简单来说,是一种专注于工厂车间生产现场管控的信息化管理系统,它就像是车间里的智能管家,对生产过程中的各个环节进行细致入微的管理。

在现代的互联网应用中,其中MySQL数据库扮演着重要的角色。然而,随着工厂时序的数据量持续不断的增加,用户可能会发现数据库查询的速度逐渐变慢。本文将探讨一下“KWDB分布式多模数据库"时序数据库的,并提供一些其它的数据库解决方案。

  1. 1. 性能瓶颈:
    ①. 随着公司的业务快速发展,数据库中的时序数据量猛增,访问性能也变慢了,单台MySQL实例无法应对和满足大规模数据管理和请求访问,导致数据库性能下降,成为瓶颈。

②. 关系型数据本身就比较容易形成系统瓶颈,无论是从单机存储容量、连接数、处理能力都有限。

③. 当单表的数据量达到1000W以后,由于查询和操作的维度较广,哪怕使用了MySQL从库读写分离、优化索引等操作时,性能还是无可避免严重下降。

  1. 2. 数据强一致性同步延迟:
    ①. 当架构增加Redis、RabbitMQ等消息队列。

②. OLTP的大数据量统计数据类异构表同步只能满足业务的T+1。

③. 系统架构中,异步设计方案中的中间件故障,导致数据重传、数据丢失。

二、关系型数据库应用的类型:

Item 区分 OLAP OLTP
1 名称 在线分析处理(Online Analytical Processing) 在线事务处理(Online Transaction Processing)
2 作用 处理企业级的决策分析、战略分析以及业务分析等 处理企业级的常规业务操作,如公司的采购、销售、存储、支付等
3 侧重点 多维数据分析技术和聚合算法,方便分析数据 强调数据的精确、事务的原子性和并发性
4 数据类型 历史性、汇总性、非实时性、不可变性数据 实时的、明细的、实时性的、可变性数据
5 场景 数据仓库 常规业务操作
6 查询模式 采用复杂的算法和存储结构,如多维数据库和立方体结构 需要简单的SQL语句,如基本的、事务相关的查询
7 性能要求 更高的存储要求和处理能力 快速且稳定的响应速度,可扩展性和高可用性
8 应用场景 企业级的决策支持和战略分析等领域 采购、销售、库存管理、银行交易等领域,极短的时间内快速响应用户请求,从而保证业务的正常运行

所以,在日常的企业级应用中,OLAP和OLTP针对不同的业务场景,有不同的解决方案。OLAP主要用于企业级决策和战略分析,需要快速的数据查询和分析技术。相反,OLTP主要用于企业日常操作,需要快速的数据更新和处理技术。那么“KWDB分布式多模数据库"有什么特点和区别呢?

2.1 KWDB 分布式多模数据库:

KWDB 2.0 是一款自主研发的面向 AIoT 场景的分布式、多模融合、支持原生 AI 的数据库产品,可通过同一实例同时建立时序库和关系库并融合处理多模数据,具备千万级设备接入、百万级数据秒级写入、亿级数据秒级读取等时序数据高效处理能力;具有稳定安全、高可用、易运维等特点,面向工业物联网、数字能源、车联网、智慧产业等领域,提供一站式数据存储、管理与分析的基座。

2.2 什么是时序数据库?

时序数据,即时间序列数据(Time-Series Data),它们是一组按照时间发生先后顺序进行排列的序列数据。日常生活中,设备、传感器采集的数据就是时序数据,证券交易的记录也是时序数据。因此时序数据的处理并不陌生,特别在是工业自动化以及证券金融行业,专业的时序数据处理软件早已存在,比如工业领域的 PI System 以及金融行业的 KDB。

我们可以来思考一下,工业SFC系统的业务流程是什么呢,先来介绍一下我们生产手机的测试场景,再来结合时序数据的特别来综合分析一下。

①. 首先我们每个设备在车间是有固定的位置,一般流水线是分为A/B边,不同的工站测试的手机数据不同,比如有一些是专门测试声音、Wifi模组、摄像头等不同的工位。

②. 每台设备机器手臂会产出很多时序数据,比如设备的编号、型号、代码等固定的信息,这些可以算是标签的数据,用于记录采集对象的静态数据。

③. 接下来是采集的是字段,用于记录采集对象的实时数据,这里分为2部分:一部分是机器手臂相关的运行状态数据,另外一部分是手机的测试数据,比如某个功能是否成功。

④. 结合上面时序数据的特点,我们这个场景完全是符合要求的,相较于传统使用的关系型数据库充分利用了时序数据特点,比如结构化、无需事务、很少删除或更新、写多读少等等。

2.3 KWDB的优势与特点:

KWDB 具备完善的功能和优异的性能,充分满足不同的应用场景需求,赋能行业企业的数字化建设和转型。能广泛应用于电力、石油、制造、出行、汽车、IT 运维、金融等领域。企业使用KWDB可以安全高效地将大量设备、传感器每天产生的高达 TB 甚至 PB级的数据进行汇聚、存储、分析和分发,对业务运行状态进行实时监测、预警,提供实时的商业洞察,加速数字化转型进程,将数据价值最大化。

从上面KWDB的产品优势来看,KWDB是一款高性能、分布式、支持 SQL 的时序数据库 (Database)的多模数据库,包括集群功能全部开源(开放原子开源基金会孵化及运营的项目)。KWDB能被广泛运用于物联网、工业互联网、车联网、IT 运维、金融等领域。

除核心的时序数据库 (Database) 功能外,KWDB还提关系型数据库查询的功能,可以给大数据平台所需要的系列功能,最大程度减少研发和运维的复杂度。

融合多种数据计算引擎,根据不同模型数据特征选择不同的存储、计算模式,对外提供统一的接口,提供多种分析计算能力,提升查询效率:

①. 自适应时序引擎:支持多种时序数据特色的复杂查询和多维聚合方式。与传统关系数据库相比,KWDB 具备优异的查询性能。另外,KWDB 提供 5-30 倍的压缩能力,数据压缩后无需解压缩即可使用。

②. 事务处理引擎:支持分布式事务和 MVCC(Multi-Version Concurrency Control,多版本并发控制),具备注释、视图、约束、索引、序列等功能。

③. 预测分析引擎:提供模型生命周期管理、模型训练、模型推理预测等功能。任何拥有数据库应用开发背景的开发人员都可以轻松地完成模型管理和预测等操作。
目前,KWDB 在工业物联网、数字能源、数字政务、金融等领域均已成功完成落地实践。未来,KWDB 能够赋能工业物联网、数字能源、车联网、智慧矿山等各大行业领域,助力企业从数据中挖掘更大的商业价值。

相关文档:
KWDB官方开源仓库地址:https://gitee.com/kwdb/kwdb
KWDB官网地址:https://www.kaiwudb.com/
KWDB官网文档地址: https://www.kaiwudb.com/kaiwudb_docs/#/
KWDB 2.2.0 下载地址: https://gitee.com/kwdb/kwdb/releases/tag/V2.2.0

三、KWDB多模数据库集群搭建:

KWDB 支持用户根据自己的需求来选择安装的方式,这里分为单节点部署和集群部署两种方式,因为工厂内部的数据量比较大,所以,我们这里使用集群的方式来进行部署与使用:

①. 单节点部署分为:二进制安装包、容器和源码安装部署。

②. 集群部署分为:裸机部署、容器部署。

从KWDB支持的部署集群方式来看,如果是单副本集群的话,整个集群只有一份数据副本,从数据库的CAP理论来讲,从一致性 (Consistency)、可用性 (Availability) 和分区容错性 (Partition Tolerance)角度来讲,单副本集群部署方式是不太推荐的。

那么,我们只能通过最少三台物理设备来搭建KWDB集群的方式,才能符合多副本集群的基础节点要求,从官方的文档查阅,支持有3种不同的集群部署支持方式:

接下来,那我们选择一个最简单的方式“使用脚本部署”来进行安装KWDB 3节点集群方案。

为了提高可用性,降低数据丢失的风险,建议在单台计算机上只运行一个节点。KWDB 采用跨节点复制机制,如果在一台计算机上同时运行多个节点,当计算机发生故障时,更有可能丢失数据。

3.1 安装的软理件检查:

在安装KWDB集群,先了解一下基本的配置情况,使用裸机安装包部署 KWDB 所需的硬件规格要求和软件要求,部署 KWDB 时,系统将对配置文件、运行环境、硬件配置和软件依赖进行检查:

①. 首先单节点配置的CPU 和内存,建议不低于 4 核 8G,如果对于数据量大、复杂工作负载、高并发和高性能场景,建议配置更高的 CPU 和内存资源以确保系统的高效运行。

②. 操作系统推荐使用Ubuntu版本的。

③. KWDB有一些依赖的要求,如果缺少依赖会退出安装并提示依赖缺失,如OpenSSL、libprotobuf等。

④. KWDB的服务映射端口防止被占用,且没有被防火墙拦截,也可以自定义修改 deploy.cfg 文件中的端口配置参数。

  • • 8080:数据库 Web 服务端口
  • • 26257:数据库服务端口、节点监听端口和对外连接端口

3.2 节点“SSH免密登录”和“时钟同步”配置:

一般情况下,我们都是使用密码来登陆服务器进行管理,如果如果频繁登录的场景,如自动化脚本或持续集成环境,就可以使用SSH免密登陆,用户无需每次登录时输入密码,节省了时间和精力,同时,最主要的是免密登录可以避免密码在网络中明文传输,减少密码被窃取的风险。

SSH免密登录:

因为测试的集群是3台节点,所以,这里我们先把3台服务器做一个标识,用来区分不同的设备机器,kwdb-db-01、kwdb-db-02、kwdb-db-03。在第一个服务器,登录当前节点,生成公私密钥对,再把生成的密钥分发到集群其它节点:

ssh-keygen -f ~
/.ssh/id_rsa -N 
""

ssh-copy-id -f -i ~ /.ssh/id_rsa. pub -o  StrictHostKeyChecking=no 

参数说明:

①. -f ~/.ssh/id_rsa:指定生成的密钥对文件目录。

②. -N:将密钥密码设置为空,以实现免密登录。
确认是否可以使用 SSH 免密登录到集群其它节点。

ssh 

NTP时钟同步:
在Linux系统中,时钟同步通常指的是确保系统时间与外部时间源(如互联网上的NTP服务器)同步,以保证系统时间的准确性和一致性。这对于服务器尤其重要,因为它们需要精确的时间来执行各种任务,如日志记录、数据库操作等。

过网络连接到指定的NTP(Network Time Protocol)服务器,获取准确的时间并调整本地系统时钟,KWDB采用中等强度的时钟同步机制来维持数据的一致性:

①. 当节点检测到自身的机器时间与集群中至少 50% 的节点的机器时间的误差值超过集群最大允许时间误差值(默认为 500 ms)的 80% 时,该节点会自动停止。

②. 可以避免违反数据一致性,带来读写旧数据的风险。

③. 每个节点都必须运行 NTP(Network Time Protocol,网络时间协议)或其它时钟同步软件,防止时钟漂移得太远。

# 启用 
NTP 服务

sudo timedatectl set-ntp on
sudo apt update && sudo apt install ntp
timedatectl status
# 重启服务
sudo systemctl restart ntp
sudo ntpq -p

3.3 依赖相关:

检查当前系统中是否已安装 libprotobuf 及其版本是否符合要求(3.6.1 及以上版本),安装完成后可以查看 libprotobuf 版本的相关依赖:

sudo apt install -y libprotobuf-dev

sudo apt-cache rdepends libprotobuf-dev

3.4 KWDB脚本部署集群:

上面我们使用脚本部署集群时,已经对配置文件、运行环境、硬件配置、软件依赖和 SSH 免密登录进行安装,接下来可以正式进入KWDB脚本集群部署环境。

wget 
https:
//gitee.com/kwdb/kwdb/releases/download/V2.2.0/KWDB-2.2.0-ubuntu22.04-x86_64-debs.tar.gz

tar -xzvf  KWDB- 2.2 .0-ubuntu22 .04-x86_64-debs. tar. gz
vi deploy. cfg
# 编辑deploy. cfg 配置文件,设置local本机 IP地址、设置cluster服务node_addr以及ssh_user用户等信息
./deploy. sh install --multi-replica
./deploy. sh cluster -i
./deploy. sh cluster -s

这里我们需要下载对应的安装版本,请注意一下您服务器对应的版本,如果下错了会出现报错(明明依赖安装过了),接下来,需要修改deploy.cfg的文件,这里需要注意是使用的安全模式(默认是tls的,即有一个CA证书),另外,需要修改cluster需要对应的所有服务器节点加进来,最后ssh_user要特别注意,刚开始没注意,结果一直卡在那里,默认是admin,但是我的用户名是root。

以下是kwdb安装脚本的相关的目录结构:

.

├── add_user. sh     # 安装、启动  KaiwuDB 后,为  KaiwuDB 数据库创建用户。
├── deploy. cfg      # 安装部署配置文件,用于配置部署节点的  IP 地址、端口等配置信息。
├── deploy. sh       # 安装部署脚本,用于安装、卸载、启动、状态获取、关停和重启等操作。
├── packages       # 存放  DEBRPM、镜像包和 libprotobuf 包。
│   ├── kwdb-libcommon_2 .2 .0-ubuntu- 22.04_amd64. deb
│   └── kwdb-server_2 .2 .0-ubuntu- 22.04_amd64. deb
└── utils           # 存放工具类脚本。
    ├── container_shell. sh
    ├── kaiwudb_cluster. sh
    ├── kaiwudb_common. sh
    ├── kaiwudb_hardware. sh
    ├── kaiwudb_install. sh
    ├── kaiwudb_log. sh
    ├── kaiwudb_operate. sh
    ├── kaiwudb_uninstall. sh
    ├── kaiwudb_upgrade. sh
    ├── process_bar. sh
    └── utils. sh

2 directories,  16 files

最后,通过命令–multi-replica即可进行多副本集群安装,如果使用命令–single-replica是进行单副本集群安装,过程中会提示需要输入密码,在输入密码等待几分钟后(取决你的网速),当提示“INSTALL COMPLETED: KaiwuDB has been installed successfuly!”时,表示已经安装集群成功了。

接下来,就是初始化并启动集群,输入命令cluster -i,就会进行初始化集群,在几秒后会输出“Cluster init successfully”即表示初始化并启动集群成功,这里有一个小小问题,就是上面我只安装了,但是在写文章,耽误了大概半小时,没有进行初始化集群操作,再进行初始化会提示我KWDB没有安装,需要注意一下。

备注:多副本集群初始化和启动大约需要 10 秒左右时间,在此期间,如果有节点死亡,可能会导致集群无法触发高可用机制。

初始化集群会发生的问题:

在进行安装与初始化集群的过程中,我也遇到了不少问题,希望可以给其它同学带来参考的价值。

①. 提示:“lost connection. 9 15:04:05 Distribute files failed in 120.55.76.178:Connection closed by 120.55.76.178 port 22

原因:因为以为是第一台kwdb-db-01机器操作,所以,不需要进行设置SSH免登录,结果发现还是需要进行SSH免登录。
解决:执行ssh-copy-id -f -i ~/.ssh/id_rsa.pub -o StrictHostKeyChecking=no

②. 提示:“[ERROR] 2025-05-29 23:38:42 install exec failed in 118.178.56.167: dpkg: error: dpkg frontend lock was locked by another process with pid 4343 Note: removing the lock file is always wrong, and can end up damaging the locked area and the entir e system.”
原因:可能存在网络波动,连接超时的原因。
解决:重新进行安装命令“./deploy.sh install --multi-replica”即可正常成功。

③. 提示:“KaiwuDB not install.”
原因:未知,如果再执行命令“./deploy.sh install --multi-replica”会提示已经安装了。
解决:机器重新构建镜像即可,恢复初始化状态。

3.5 创建时序数据库:

在集群搭建完成后,我们可以来查看一下第一个节点相关的集群状态信息,使用“systemctl status kaiwudb”来进行查看,会输出很多相关KWDB实例服务相关的信息:

①. 显示绿色的active(running)表示服务正在运行中,服务是正常在使用的,包括服务使用的内存大小,CPU的运行时间。

②. 系统命令kaisudb.service服务,实际上是执行了一条/usr/local/kaiwudb/bin/kwbase这条命令,里面包括一些参数,比如监听服务器的IP与端口,对外通信的节点IP与端口、数据库存储的路径、加入指定IP集群的方式。

③. 还会生成RPC客户端连接的方式,通过/usr/local/kaiwudb/bin/kwbase命令,用来访问数据库控制台的shell命令。

另外,在连接到访问数据库shell控制台后,可以看到基本上操作的命令也是跟平时写的SQL语句是一样的,但是在创建时序数据库的时候,有些区别,需要加一个ts的关键词来指定是时序数据库。

create ts database factory_ts;

3.6 分析KWDB集群服务状态:

在集群搭建完成后,我们也可以来查看一下其它的每个节点相关的集群状态信息,这里我们来进行操作kwdb-db-02和kwdb-db-03来查看一下状态,这里可以看到在kwdb-db-02和kwdb-db-03的服务也是都是active(running)启动起来的,而且各自都有各自的RPC客户端启动命令。

上面创建的时序数据库factory_ts也可以发现已经同步到其它3个KWDB服务节点上了,说明集群已经成功了,而且说明数据也是可以进行同步的。

3.7 创建时序数据表:

接下来,我们来一起在时序数据库factory_ts下面创建3张表(在实际的场景中,肯定会更多),时序表(TIME SERIES TABLE表)是用于存储时间序列数据的数据。这里面创建三张时序数据表:测试记录表(test_records)、机器手臂数据(robot_arms)、警告信息(alerts)三张纬度的时序数据表。

①. 创建表时不能使用IF NOT EXISTS语句,会报错,不过,建议还是加上比较好。

②. 时间戳字段,时序数据表对于时间类型的列(TIMESTAMPTZ 或 TIMESTAMP),默认值可以是常量,也可以是 now() 函数。如果默认值类型与列类型不匹配,设置默认值时,系统报错。支持默认值设置为 NULL。

③. 字段也不支持double类型,这里我们只能使用Float类型字段。

④. Tags表示标签列表,支持添加一个或多个标签定义,最多可指定 128 个标签。标签定义包含标签名和数据类型,标签名的最大长度为 128 字节,支持指定 NOT NULL,默认为空值。不支持 TIMESTAMP、TIMESTAMPTZ、NVARCHAR 和 GEOMETRY 数据类型。

⑤. Primary_tags表示主标签列表,支持添加一个或多个主标签名称,最多可指定 4 个。主标签必须包含在标签列表内且指定为 NOT NULL,不支持浮点类型和除 VARCHAR 之外的变长数据类型。

如下所示是插入的时序数据表的SQL语句:


CREATE 
TABLE factory_ts.
test_records (k_TIMESTAMPTZ 
TIMESTAMPTZ 
NOT 
NULL, test_type 
VARCHAR 
NOT 
NULL, test_result 
VARCHAR 
NOT 
NULL, fail_reason 
VARCHAR 
NOT 
NULL,

start_time  TIMESTAMPTZ  NOT  NULL, end_time  TIMESTAMPTZ  NOT  NULL, phone_model  VARCHAR  NOT  NULL, imei  VARCHAR  NOT  NULL,
battery_level  FLOAT  NOT  NULL, signal_strength  FLOAT  NOT  NULL, test_duration  FLOAT  NOT  NULL, supplier  VARCHAR  NOT  NULL,
material_batch  VARCHAR  NOT  NULL, production_line  VARCHAR  NOT  NULL, operator  VARCHAR  NOT  NULL,
quality_inspector  VARCHAR  NOT  NULL, mes_order_number  VARCHAR  NOT  NULL, alert_count  INT  NOT  NULL,
quality_score  FLOAT  NOT  NULLTAGS ( batch_number  VARCHAR  NOT  NULL, product_model  VARCHAR  NOT  NULL
PRIMARY  TAGS(batch_number);

CREATE  TABLE factory_ts. robot_arms (
k_TIMESTAMPTZ  TIMESTAMPTZ  NOT  NULL, availability  FLOAT  NOT  NULL, performance  FLOAT  NOT  NULL, quality  FLOAT  NOT  NULL, oee  FLOAT  NOT  NULL,
total_faults  INT  NOT  NULL, fault_rate  FLOAT  NOT  NULL,
last_fault_time  TIMESTAMPTZ  NOT  NULL, last_fault_type  VARCHAR  NOT  NULL, operation_speed  FLOAT  NOT  NULL, temperature  FLOAT  NOT  NULL, voltage  FLOAT  NOT  NULL, current  FLOAT  NOT  NULL,
last_maintenance_time  TIMESTAMPTZ  NOT  NULL, next_maintenance_time  TIMESTAMPTZ  NOT  NULL, operation_start_time  TIMESTAMPTZ  NOT  NULL, total_runtime  FLOAT  NOT  NULL, status  VARCHAR  NOT  NULL
) TAGS(
serial_number  VARCHAR  NOT  NULL, model  VARCHAR  NOT  NULL, position  VARCHAR  NOT  NULL
PRIMARY  TAGS(serial_number)  ACTIVETIME 30d;

CREATE  TABLE factory_ts. alerts (
k_TIMESTAMPTZ  TIMESTAMPTZ  NOT  NULL, alert_type  VARCHAR  NOT  NULL, level  VARCHAR  NOT  NULL, title  VARCHAR  NOT  NULL, status  VARCHAR  NOT  NULL, handler  VARCHAR  NOT  NULL,
handle_method  VARCHAR  NOT  NULL, handle_time  TIMESTAMPTZ  NOT  NULL, resolve_time  TIMESTAMPTZ  NOT  NULL, parameter_name  VARCHAR  NOT  NULL, current_value  FLOAT  NOT  NULL, threshold_value  FLOAT  NOT  NULL, unit  VARCHAR  NOT  NULL,
product_batch  VARCHAR  NOT  NULL, production_line  VARCHAR  NOT  NULL, workstation  VARCHAR  NOT  NULL, operator  VARCHAR  NOT  NULL
) TAGS(
source  VARCHAR  NOT  NULL, category  VARCHAR  NOT  NULL,
notify_group  VARCHAR  NOT  NULL
PRIMARY  TAGS(source)  ACTIVETIME 30d;

四、KWDB应用开发之Golang实战时序数据火力覆盖:

KWDB 为不同角色开发者提供以下支持(包括但不限于):

①. 为开发者提供通用连接接口,具备高速写入、极速查询、SQL 支持、随需压缩、数据生命周期管理、集群部署等特性,与第三方工具无缝集成,降低开发及学习难度,提升开发使用效率。

②. 为运维管理人员提供快速安装部署、升级、迁移、监控等能力,降低数据库运维管理成本。

4.1 使用 pgx 驱动连接 KWDB:

从官方的文档来看,pgx 是用 
Go 语言编写的 
PostgreSQL 驱动和工具包,提供了高性能的低级接口,支持用户直接利用 
PostgreSQL 的特性。pgx 还包含一个适配器,与标准的数据库或 
SQL 接口兼容,方便开发者进行数据库操作。注意:需要安装 
Go 
1.16 及以上版本。


KWDB 支持用户通过 pgx 驱动连接数据库,并执行创建、插入和查询操作。下面演示了如何使用  Go 语言通过 pgx 驱动连接  KWDB

go env -w  GOPROXY= https: //goproxy.cn
go mod tidy
go get github. com/jackc/pgx/v5

接下来,通过使用代码编写3张表的数据生成的代码,可以进行批量插入的动作。

package main


import (
     "context"
     "fmt"
     "log"
     "time"

     "github.com/jackc/pgx/v5"
)

func  main( ) {
     // 使用账号密码连接
    url := fmt. Sprintf( "postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s",
         "root""120.55.76.178:26257""factory_ts",
         "./certs/ca.crt",
         "./certs/client.root.crt",
         "./certs/client.root.key")

    config, err := pgx. ParseConfig(url)
     if err != nil {
        log. Fatalf( "error parsing connection configuration: %v", err)
    }

    config. RuntimeParams[ "application_name"] =  "factory_timeseries"
    conn, err := pgx. ConnectConfig(context. Background(), config)
     if err != nil {
        log. Fatalf( "error connecting to database: %v", err)
    }
    defer conn. Close(context. Background())

     // 批量插入告警数据
    _, err = conn. Exec(context. Background(),  `
        INSERT INTO factory_ts.alerts VALUES
        ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21),
        ($22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42);
    `
,
         // 第一条告警记录
        time. Now(),  "温度异常""高""设备温度过高""设备A温度超过阈值""未处理""张工""停机检查",
        time. Now(), time. Now(). Add(time. Hour),  "温度"85.580.0"℃",
         "BATCH001""生产线1""工位A""王工",
         "设备A""设备告警""运维组",
         // 第二条告警记录
        time. Now(),  "电压波动""中""电压不稳定""设备B电压波动较大""已处理""李工""调整电源",
        time. Now(), time. Now(). Add(time. Hour),  "电压"235.0220.0"V",
         "BATCH002""生产线2""工位B""赵工",
         "设备B""电气告警""电气组",
    )
     if err != nil {
        log. Printf( "error inserting alert data: %v\n", err)
    }  else {
        fmt. Println( "告警数据插入成功!")
    }

     // 批量插入机器手臂数据
    _, err = conn. Exec(context. Background(),  `
        INSERT INTO factory_ts.robot_arms VALUES
        ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21),
        ($22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42);
    `
,
         // 第一个机器手臂数据
        time. Now(),  95.592.098.085.5,
         20.5, time. Now(). Add(- 24*time. Hour),  "轻微故障",
         60.036.5220.010.0,
        time. Now(). Add(- 7* 24*time. Hour), time. Now(). Add( 7* 24*time. Hour), time. Now(). Add(- 8*time. Hour),  2000.5,
         "运行中""RA001""A型""生产线1",
         // 第二个机器手臂数据
        time. Now(),  88.090.095.075.2,
         51.2, time. Now(). Add(- 12*time. Hour),  "通信故障",
         45.038.0218.012.0,
        time. Now(). Add(- 14* 24*time. Hour), time. Now(). Add( 14* 24*time. Hour), time. Now(). Add(- 4*time. Hour),  1500.8,
         "待维护""RA002""B型""生产线2",
    )
     if err != nil {
        log. Printf( "error inserting robot arm data: %v\n", err)
    }  else {
        fmt. Println( "机器手臂数据插入成功!")
    }

     // 批量插入测试记录数据
    _, err = conn. Exec(context. Background(),  `
        INSERT INTO factory_ts.test_records VALUES
        ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21),
        ($22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42);
    `
,
         // 第一条测试记录
        time. Now(),  "性能测试""通过""",
        time. Now(). Add(-time. Hour), time. Now(),
         "iPhone 14""123456789"95.085.03600.0,
         "供应商A""MAT001""生产线1""张三""李四",
         "MES001"098.5,
         "BATCH001""产品型号A",
         // 第二条测试记录
        time. Now(),  "稳定性测试""失败""信号不稳定",
        time. Now(). Add(- 2*time. Hour), time. Now(). Add(-time. Hour),
         "iPhone 13""987654321"88.072.07200.0,
         "供应商B""MAT002""生产线2""王五""赵六",
         "MES002"285.0,
         "BATCH002""产品型号B",
    )
     if err != nil {
        log. Printf( "error inserting test record data: %v\n", err)
    }  else {
        fmt. Println( "测试记录数据插入成功!")
    }
}

执行上面的代码,结果发现报错,提示“insert (row 2) has more expressions than target columns, 42 expressions for 20 targets (SQLSTATE 42601)”,执行报错后,发现shell控制台也是报错了,提示“write:broken pipe”。

然后,我查看第一台kwdb-db-01服务节点的状态,发现从active(running)变成了failed(code=exited,status=2),然后,输出了一段go相关的报错信息,但是只能看到一少部分信息,直接报错就终止服务了。

接着,只能查找更详细的日志信息了,由于也是第一次接触,没有找到像常见软件在的日志目录中,后面在官方文档中找到了日志系统解读,感兴趣的同学可以阅读一下。

在/var/lib/kaiwudb/logs日志目录文件下,我们找到了相关日志,如下为详细的报错信息““INSERT INTO factory_ts.public.robot_arms DEFAULT VALUES”: runtime error: index out of range [21] with length 0”,官方人员可以针对以下报错看看能不能修改这个bug。


E250529 
08:
53:
32.295737 
81461 sql/conn_executor.
go:
969  [n1,client=
114.255
.155
.2:
64850,hostssl,user=root] a 
SQL panic has occurred 
while executing 
"INSERT INTO factory_ts.public.robot_arms DEFAULT VALUES": runtime 
error: index out 
of range [
21with length 
0

E250529  08: 53: 32.295877  81461 util/log/crash_reporting. go: 222  [n1,client= 114.255 .155 .2: 64850,hostssl,user=root] a panic has occurred!
panic: runtime  error: index out  of range [ 21with length  0 [recovered]
     panic: panic  while executing  1  statementsINSERT  INTO _. _. _  DEFAULT  VALUES; caused by runtime  error: index out  of range [ 21with length  0

goroutine  81461 [running]:
gitee. com/kwbasedb/kwbase/pkg/sql.(*connExecutor). closeWrapper( 0xc004ee80000x4cef3b00xc0051b94000x421c6c00xc005548990)
    /home/inspur/src/gitee. com/kwbasedb/kwbase/pkg/sql/conn_executor. go: 983 + 0x412
gitee. com/kwbasedb/kwbase/pkg/sql.(* Server). ServeConn. func1( 0xc004ee80000x4cef3b00xc0051b9400)
    /home/inspur/src/gitee. com/kwbasedb/kwbase/pkg/sql/conn_executor. go: 606 + 0x65
panic( 0x421c6c00xc005548990)
    /usr/local/go/src/runtime/panic. go: 965 + 0x1b9
gitee. com/kwbasedb/kwbase/pkg/sql/sem/tree.(* PlaceholderTypesInfo). ValueType(...)
    /home/inspur/src/gitee. com/kwbasedb/kwbase/pkg/sql/sem/tree/placeholders. go: 128
gitee. com/kwbasedb/kwbase/pkg/sql.(*connExecutor). execPrepare( 0xc004ee80000x4cef4580xc0062862400xc0043441740x3a0x4d1af980xc006bffd500xc0043441b20xf80x2a, ...)
    /home/inspur/src/gitee. com/kwbasedb/kwbase/pkg/sql/conn_executor_prepare. go: 130 + 0xb5e
gitee. com/kwbasedb/kwbase/pkg/sql.(*connExecutor). execCmd( 0xc004ee80000x4cef3b00xc0051b94000x00x0)
    /home/inspur/src/gitee. com/kwbasedb/kwbase/pkg/sql/conn_executor. go: 1701 + 0x2d98
gitee. com/kwbasedb/kwbase/pkg/sql.(*connExecutor). run( 0xc004ee80000x4cef3b00xc0051b94000xc0002968880x54000x150000xc0002969200xc0070533300x00x0)
    /home/inspur/src/gitee. com/kwbasedb/kwbase/pkg/sql/conn_executor. go: 1501 + 0x1fc
gitee. com/kwbasedb/kwbase/pkg/sql.(* Server). ServeConn( 0xc00078cb000x4cef3b00xc0051b94000xc004ee80000x54000x150000xc0002969200xc0070533300x00x0)
    /home/inspur/src/gitee. com/kwbasedb/kwbase/pkg/sql/conn_executor. go: 608 + 0xce
gitee. com/kwbasedb/kwbase/pkg/sql/pgwire.(*conn). processCommandsAsync. func1( 0xc000357b100xc004db83180x4cef3b00xc0051b94000xc0070533300xc00078cb000xc0042dd8000x4d08a980xc0055a20000xc0041fdaa0, ...)
    /home/inspur/src/gitee. com/kwbasedb/kwbase/pkg/sql/pgwire/conn. go: 750 + 0x505
created by gitee. com/kwbasedb/kwbase/pkg/sql/pgwire.(*conn). processCommandsAsync
    /home/inspur/src/gitee. com/kwbasedb/kwbase/pkg/sql/pgwire/conn. go: 664 + 0x17e

上面原因找出来了,就是数据库是20个字段,结果我们插入的代码是21个,所以会报错。通过修复这个bug的问题,就可以得到如下一直循环的代码:

package main


import (
     "context"
     "fmt"
     "log"
     "math/rand"
     "time"

     "github.com/jackc/pgx/v5"
)

// 生成随机时间
func  randomTime() time. Time {
    now := time. Now()
    minusHours := rand. Float64() *  24 *  7  // 随机生成过去7天内的时间
     return now. Add(-time. Duration(minusHours) * time. Hour)
}

// 生成随机浮点数
func  randomFloat(min, max float64) float64 {
     return min + rand. Float64()*(max-min)
}

// 生成随机整数
func  randomInt(min, max int) int {
     return min + rand. Intn(max-min+ 1)
}

// 从切片中随机选择一个元素
func  randomChoice(items []string) string {
     return items[rand. Intn( len(items))]
}

// 生成随机字符串
func  randomString(prefix string, length int) string {
     const charset =  "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    result :=  make([]byte, length)
     for i := range result {
        result[i] = charset[rand. Intn( len(charset))]
    }
     return prefix +  string(result)
}

func  main( ) {
     // 初始化随机数生成器
    rand. Seed(time. Now(). UnixNano())

     // 使用账号密码连接
    url := fmt. Sprintf( "postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s",
         "root""47.110.144.145:26257""defaultdb",
         "./certs/ca.crt",
         "./certs/client.root.crt",
         "./certs/client.root.key")

    config, err := pgx. ParseConfig(url)
     if err != nil {
        log. Fatalf( "error parsing connection configuration: %v", err)
    }

    config. RuntimeParams[ "application_name"] =  "factory_timeseries"
    conn, err := pgx. ConnectConfig(context. Background(), config)
     if err != nil {
        log. Fatalf( "error connecting to database: %v", err)
    }
    defer conn. Close(context. Background())

     for {
         // 预定义一些随机选择项
        alertLevels := []string{ "低""中""高""紧急"}
        alertStatus := []string{ "未处理""处理中""已解决""已关闭"}
        handleMethods := []string{ "远程处理""现场处理""停机检查""更换零件"}
        productionLines := []string{ "生产线1""生产线2""生产线3""生产线4"}
        workstations := []string{ "工位A""工位B""工位C""工位D"}
        notifyGroups := []string{ "运维组""工程组""质检组""管理组"}
        faultTypes := []string{ "轻微故障""传感器异常""电机过热""通信中断""校准偏差"}
        testTypes := []string{ "性能测试""稳定性测试""压力测试""功能测试"}
        testResults := []string{ "通过""未通过""待复检"}
        failReasons := []string{ """性能不达标""稳定性差""参数超限"}
        phoneModels := []string{ "iPhone 14""iPhone 14 Pro""iPhone 15""iPhone 15 Pro"}
        suppliers := []string{ "供应商A""供应商B""供应商C""供应商D"}

         // 插入告警数据
        _, err = conn. Exec(context. Background(),  `
            INSERT INTO factory_ts.alerts (k_timestamptz, alert_type, level, title, status, handler, 
                handle_method, handle_time, resolve_time, parameter_name, current_value, threshold_value, unit, 
                product_batch, production_line, workstation, operator, source, category, notify_group)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20);
        `
,
             randomTime(). Format( "2006-01-02 15:04:05.000Z07:00"),
             "设备告警",
             randomChoice(alertLevels),
             "设备温度异常",
             randomChoice(alertStatus),
             randomString( "OP"4),
             randomChoice(handleMethods),
             randomTime(). Format( "2006-01-02 15:04:05.000Z07:00"),
             randomTime(). Format( "2006-01-02 15:04:05.000Z07:00"),
             "温度",
             randomFloat( 6090),
             80.0,
             "℃",
             randomString( "BATCH"4),
             randomChoice(productionLines),
             randomChoice(workstations),
             randomString( "WK"4),
             "温度异常",
             "设备温度过高",
             randomChoice(notifyGroups),
        )
         if err != nil {
            log. Printf( "error inserting alert data: %v\n", err)
        }  else {
            fmt. Println( "告警数据插入成功!")
        }

         // 插入机器手臂数据
        _, err = conn. Exec(context. Background(),  `
            INSERT INTO factory_ts.robot_arms (k_timestamptz, availability, performance, quality, oee, 
                total_faults, fault_rate, last_fault_time, last_fault_type, operation_speed, temperature, voltage, current, 
                last_maintenance_time, next_maintenance_time, operation_start_time, total_runtime, status,
                serial_number, model, position)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21);
        `
,
             randomTime(). Format( "2006-01-02 15:04:05.000Z07:00"),
             randomFloat( 90100),
             randomFloat( 85100),
             randomFloat( 90100),
             randomFloat( 8095),
             randomInt( 05),
             randomFloat( 02),
             randomTime(). Format( "2006-01-02 15:04:05.000Z07:00"),
             randomChoice(faultTypes),
             randomFloat( 5070),
             randomFloat( 3040),
             randomFloat( 210230),
             randomFloat( 812),
             randomTime(). Format( "2006-01-02 15:04:05.000Z07:00"),
             randomTime(). Add( 30* 24*time. Hour). Format( "2006-01-02 15:04:05.000Z07:00"),
             randomTime(). Format( "2006-01-02 15:04:05.000Z07:00"),
             randomFloat( 10003000),
             "运行中",
             randomString( "RA"4),
             randomString( "MODEL"2),
             randomChoice(productionLines),
        )
         if err != nil {
            log. Printf( "error inserting robot arm data: %v\n", err)
        }  else {
            fmt. Println( "机器手臂数据插入成功!")
        }

         // 插入测试记录数据
        _, err = conn. Exec(context. Background(),  `
            INSERT INTO factory_ts.test_records (k_timestamptz, test_type, test_result, fail_reason, 
                start_time, end_time, phone_model, imei, battery_level, signal_strength, test_duration, supplier, 
                material_batch, production_line, operator, quality_inspector, mes_order_number, alert_count, quality_score,
                batch_number, product_model)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21);
        `
,
             randomTime(). Format( "2006-01-02 15:04:05.000Z07:00"),
             randomChoice(testTypes),
             randomChoice(testResults),
             randomChoice(failReasons),
             randomTime(). Format( "2006-01-02 15:04:05.000Z07:00"),
             randomTime(). Format( "2006-01-02 15:04:05.000Z07:00"),
             randomChoice(phoneModels),
             randomString( ""15),
             randomFloat( 80100),
             randomFloat( 70100),
             randomFloat( 18007200),
             randomChoice(suppliers),
             randomString( "MAT"4),
             randomChoice(productionLines),
             randomString( "OP"4),
             randomString( "QA"4),
             randomString( "MES"6),
             randomInt( 05),
             randomFloat( 90100),
             randomString( "BATCH"4),
             randomString( "PM"4),
        )
         if err != nil {
            log. Printf( "error inserting test record data: %v\n", err)
        }  else {
            fmt. Println( "测试记录数据插入成功!")
        }
    }
}

可以看到这里在代码修复后,可以往KWDB数据库中写入数据也成功了,在查看其它节点02和节点03时,发现数据也是可以进行同步过来的,都显示38722条数据。

4.2 KWDB集群监控集成:

KaiwuDB内置监控平台,但是KWDB是浪潮KaiwuDB的开源版本哈,目前KWDB已经捐赠给开放原子开源基金会咯,是基金会的孵化项目,所以部署 KWDB 集群后,需要自己通过实际需求选择另一种监控方案,集成Prometheus 和 Grafana 开源组件监控集群状态。

Prometheus 是一款开源的系统监控和告警平台,用于采集和存储 KWDB 集群的监控和性能指标信息。Grafana 是一款开源的数据可视化工具,可以从多种数据源获取数据,并在数据面板中展示所有数据。Grafana 读取 KWDB 集群的指标数据,以可视化方式展示数据库的集群节点状态、监控指标。

KWDB 使用 Prometheus 采集和存储 KWDB 集群的监控和性能指标信息,使用 Grafana 作为可视化组件进行展示,接下来介绍如何部署 Prometheus 和 Grafana。

部署 Prometheus:

wget 
https:
//github.com/prometheus/prometheus/releases/download/v2.53.4/prometheus-2.53.4.linux-amd64.tar.gz

tar -zxvf prometheus- 2.53 .4. linux-amd64. tar. gz

在prometheus-2.53.0.linux-amd64 目录下创建 rules 子目录,再下载 Prometheus 告警规则和聚合规则配置文件并将其放置在 rules 子目录。

KWDB 在 monitoring/rules 目录下提供 alerts.rules.yml 和 aggregation.rules.yml 文件的Prometheus 告警规则和 Prometheus 聚合规则。

①. alerts.rules.yml:告警规则配置文件。

②. aggregation.rules.yml:聚合规则配置文件。

cd prometheus-2.53.0.linux-amd64 && vi prometheus.yml

以下是配置文件示例,可以根据实际部署情况,调整配置参数及取值。

Prometheus configuration 
for kaiwudb clusters.

Requires prometheus  2.X
#
Run  with:
# $ prometheus -config. file=prometheus. yml
global:
   scrape_interval: 10s
   evaluation_interval: 10s

rule_files:
"rules/alerts.rules.yml"
"rules/aggregation.rules.yml"

scrape_configs:
  -  job_name'kaiwudb'
     metrics_path'/_status/vars'
    #  Insecure  mode:
     scheme'http'
    #  Secure  mode:
    #  scheme'https'
     tls_config:
       insecure_skip_verifytrue

     static_configs:
    -  targets: [ '118.178.131.120:8080''47.110.254.183:8080''47.110.144.145:8080']
       labels:
         cluster'my-kaiwudb-cluster'

启动 Prometheus 服务,默认情况下,Prometheus 的启动端口是 9090:

./prometheus --config.file=prometheus.yml

通过IP:9090端口即可访问prometheus的web地址。

部署 Grafana:

下载 Grafana 安装包并解压缩到本地目录,以下示例下载 Grafana v11.1.0 安装包:

wget 
https:
//dl.grafana.com/enterprise/release/grafana-enterprise-11.1.0.linux-amd64.tar.gz

tar -zxvf grafana-enterprise- 11.1 .0. linux-amd64. tar. gz
# 启动  Grafana 服务
cd grafana-v11 .1 .0/bin && ./grafana-server

此时,通过ip:3000端口访问grafana的web服务地址。

配置 Grafana以及面板:

登录 Grafana控制台,进行添加 Prometheus 数据源,默认情况下,Grafana 的登录地址是 http://IP:3000。用户可以使用默认的用户名和密码(均为 admin)登录 Grafana。

在 Grafana 左侧边栏,单击 Connections > Data sources,在 Data sources 窗口,单击 Add data source,然后选择 Prometheus,配置完Prometheus Server 的 IP 地址,单击 Save & test,保存 Prometheus 数据源。

登录 Grafana控制台,进行添加 Prometheus 数据源,默认情况下,Grafana 的登录地址是 http://IP:3000。用户可以使用默认的用户名和密码(均为 admin)登录 Grafana。

在 Grafana 左侧边栏,单击 Connections > Data sources,在 Data sources 窗口,单击 Add data source,然后选择 Prometheus,配置完Prometheus Server 的 IP 地址,单击 Save & test,保存 Prometheus 数据源。

导入 Grafana 面板,默认情况下,KWDB 在 monitoring/grafana-dashboards 目录下提供以下指标面板模板。用户将指标面板模板(.json 格式)导入 Grafana 后,即可监控 KWDB 集群。

①. 概览:展示集群和节点的关键指标 - KaiwuDB_Console_Overview.json

②. 硬件:展示硬件相关的监控指标 - KaiwuDB_Console_Hardware.json

③. 运行时:展示运行时相关的监控指标 - KaiwuDB_Console_Runtime.json

④. SQL:展示 SQL 相关的监控指标 - KaiwuDB_Console_SQL.json

⑤. 存储:展示存储相关的监控指标 - KaiwuDB_Console_Storage.json

⑥. 副本:展示副本相关的监控指标 - KaiwuDB_Console_Replication.json

⑦. 分布式:展示分布式相关的监控指标 - KaiwuDB_Console_Distribution.json

⑧. 队列:展示队列相关的监控指标 - KaiwuDB_Console_Queue.json

⑨. 慢查询:展示慢查询相关的监控指标 - KaiwuDB_Console_Slow_Query.json

4.3 压力测试对比:

以下为三个表(test_records、robot_arms、alerts时序数据表)同时插入语句,但是是顺序同步执行的,可以看到从100条数据到2000条数据,不同的批量插入方式存在不同的时间差,经过对比可以发现:

①. 这里并没有使用到地址池。

②. 可以发现在批量插入时,由于组装的数据量越少的表,在批量2000时插入的时间越短,但是组装数据量越多的表,在批量100条时插入时间越短。

接下来,修改脚本,将同步改为异步的情况,就是增加三个goroutine可以真正并行执行:

①. 告警数据插入goroutine

②. 机器手臂数据插入goroutine

③. 测试记录数据插入goroutine

每个goroutine都会立即启动,互不干扰,同时在各自的循环中通过 time.Sleep 来控制数据插入的频率。这样可以确保三个数据插入任务并行执行,提高数据生成的效率。

接下来使用三台机器同时进行数据的写入,但是这里不是三台机器同时往一台机器进行写入数据,而是分不同的节点,刚好对应开始写入数据,刚好测试一下,在集群的情况下,数据一致性准不准,但是这个也跟网速有一定的关系。

可以从上面的结果看出,基本上数据同步也是比较快的,误差平均在100以内,而且还有可能是没有同时执行的情况下,最终结果的误差可能会更小,这里需要注意如果在部署集群的话,使用的是云厂商的服务器,最好在同一个地区,这样网络延迟会小一点。

4.4 时序数据库查询性能:

接下来,我们使用10个SQL查询的语句来进行KWDB时序数据库的试一下查询效率的测试,以下是10个SQL语句相关查询的内容:

以下是相关查询的代码:

package main


import (
     "context"
     "fmt"
     "log"
     "time"

     "github.com/jackc/pgx/v5"
)


func  main( ) {
    ctx := context. Background()
    conn, err :=  createDBConnection()
     if err != nil {
        log. Fatalf( "连接数据库失败: %v", err)
    }
    defer conn. Close(ctx)

     // 执行所有查询
    queryFuncs := [] func(context. Context, *pgx. Conn) error{
        queryTestRecordsTop 10,
        queryRobotArmFaults,
        queryAlertsByProductionLine,
        queryTestQualityDistribution,
        queryRobotArmPerformance,
        queryHighPriorityAlerts,
        queryTestFailureAnalysis,
        queryRobotArmMaintenance,
        queryAlertHandlingEfficiency,
        queryTestTimeDistribution,
    }

     for _, fn := range queryFuncs {
        fmt. Println( "\n----------------------------------------")
         if err :=  fn(ctx, conn); err != nil {
            log. Printf( "查询执行失败: %v\n", err)
        }
    }
}

// createDBConnection 创建数据库连接
func  createDBConnection() (*pgx. Conn, error) {
    url := fmt. Sprintf( "postgresql://%s@%s/%s?sslmode=verify-full&sslrootcert=%s&sslcert=%s&sslkey=%s",
         "root""118.178.131.120:26257""defaultdb",
         "./certs/ca.crt",
         "./certs/client.root.crt",
         "./certs/client.root.key")

    config, err := pgx. ParseConfig(url)
     if err != nil {
         return nil, fmt. Errorf( "error parsing connection configuration: %v", err)
    }

    config. RuntimeParams[ "application_name"] =  "factory_timeseries"
    conn, err := pgx. ConnectConfig(context. Background(), config)
     if err != nil {
         return nil, fmt. Errorf( "error connecting to database: %v", err)
    }
     return conn, nil
}


// 查询各生产线最近24小时的测试记录数量TOP 10
func  queryTestRecordsTop 10(ctx context. Context, conn *pgx. Conn) error {
    query :=  `
        SELECT 
            production_line,
            COUNT(*) as test_count,
            AVG(quality_score) as avg_quality
        FROM factory_ts.test_records
        WHERE k_timestamptz >= NOW() - INTERVAL '24 hours'
        GROUP BY production_line
        ORDER BY test_count DESC
        LIMIT 10;
    `


    rows, err := conn. Query(ctx, query)
     if err != nil {
         return fmt. Errorf( "执行查询失败: %v", err)
    }
    defer rows. Close()

    fmt. Println( "生产线测试记录统计(24小时):")
    fmt. Printf( "%-15s %-10s %-10s\n""生产线""测试数量""平均质量分")
     for rows. Next() {
         var line string
         var count int
         var quality float64
         if err := rows. Scan(&line, &count, &quality); err != nil {
             return fmt. Errorf( "读取数据失败: %v", err)
        }
        fmt. Printf( "%-15s %-10d %-10.2f\n", line, count, quality)
    }
     return nil
}

// 查询不同故障类型的机器手臂数量统计
func  queryRobotArmFaults(ctx context. Context, conn *pgx. Conn) error {
    query :=  `
        SELECT 
            last_fault_type,
            COUNT(*) as fault_count,
            AVG(fault_rate) as avg_fault_rate
        FROM factory_ts.robot_arms
        WHERE last_fault_type IS NOT NULL
        GROUP BY last_fault_type
        ORDER BY fault_count DESC;
    `


    rows, err := conn. Query(ctx, query)
     if err != nil {
         return fmt. Errorf( "执行查询失败: %v", err)
    }
    defer rows. Close()

    fmt. Println( "机器手臂故障类型统计:")
    fmt. Printf( "%-20s %-10s %-15s\n""故障类型""故障数量""平均故障率(%)")
     for rows. Next() {
         var faultType string
         var count int
         var rate float64
         if err := rows. Scan(&faultType, &count, &rate); err != nil {
             return fmt. Errorf( "读取数据失败: %v", err)
        }
        fmt. Printf( "%-20s %-10d %-15.2f\n", faultType, count, rate* 100)
    }
     return nil
}

// 查询各生产线的告警统计
func  queryAlertsByProductionLine(ctx context. Context, conn *pgx. Conn) error {
    query :=  `
        SELECT 
            production_line,
            COUNT(*) as alert_count,
            COUNT(DISTINCT alert_type) as alert_types,
            AVG(CASE WHEN status = '已解决' THEN 1 ELSE 0 END) as resolve_rate
        FROM factory_ts.alerts
        GROUP BY production_line
        ORDER BY alert_count DESC;
    `


    rows, err := conn. Query(ctx, query)
     if err != nil {
         return fmt. Errorf( "执行查询失败: %v", err)
    }
    defer rows. Close()

    fmt. Println( "生产线告警统计:")
    fmt. Printf( "%-15s %-10s %-10s %-15s\n""生产线""告警数量""告警类型""解决率(%)")
     for rows. Next() {
         var line string
         var count, types int
         var resolveRate float64
         if err := rows. Scan(&line, &count, &types, &resolveRate); err != nil {
             return fmt. Errorf( "读取数据失败: %v", err)
        }
        fmt. Printf( "%-15s %-10d %-10d %-15.2f\n", line, count, types, resolveRate* 100)
    }
     return nil
}

// 查询测试记录的质量分布
func  queryTestQualityDistribution(ctx context. Context, conn *pgx. Conn) error {
    query :=  `
        SELECT 
            WIDTH_BUCKET(quality_score, 0, 100, 10) as score_range,
            COUNT(*) as count,
            MIN(quality_score) as min_score,
            MAX(quality_score) as max_score
        FROM factory_ts.test_records
        GROUP BY score_range
        ORDER BY score_range;
    `


    rows, err := conn. Query(ctx, query)
     if err != nil {
         return fmt. Errorf( "执行查询失败: %v", err)
    }
    defer rows. Close()

    fmt. Println( "测试质量分数分布:")
    fmt. Printf( "%-15s %-10s %-10s %-10s\n""分数区间""数量""最低分""最高分")
     for rows. Next() {
         var range_, count int
         var min, max float64
         if err := rows. Scan(&range_, &count, &min, &max); err != nil {
             return fmt. Errorf( "读取数据失败: %v", err)
        }
        fmt. Printf( "%d-%-12d %-10d %-10.2f %-10.2f\n", (range_- 1)* 10, range_* 10, count, min, max)
    }
     return nil
}

// 查询机器手臂性能指标
func  queryRobotArmPerformance(ctx context. Context, conn *pgx. Conn) error {
    query :=  `
        SELECT 
            position,
            ROUND(AVG(availability)::numeric, 2) as avg_availability,
            ROUND(AVG(performance)::numeric, 2) as avg_performance,
            ROUND(AVG(quality)::numeric, 2) as avg_quality,
            ROUND(AVG(oee)::numeric, 2) as avg_oee
        FROM factory_ts.robot_arms
        GROUP BY position
        ORDER BY avg_oee DESC;
    `


    rows, err := conn. Query(ctx, query)
     if err != nil {
         return fmt. Errorf( "执行查询失败: %v", err)
    }
    defer rows. Close()

    fmt. Println( "机器手臂性能指标:")
    fmt. Printf( "%-15s %-10s %-10s %-10s %-10s\n""位置""可用率""性能""质量""OEE")
     for rows. Next() {
         var pos string
         var avail, perf, qual, oee float64
         if err := rows. Scan(&pos, &avail, &perf, &qual, &oee); err != nil {
             return fmt. Errorf( "读取数据失败: %v", err)
        }
        fmt. Printf( "%-15s %-10.2f %-10.2f %-10.2f %-10.2f\n", pos, avail, perf, qual, oee)
    }
     return nil
}

// 查询未处理的高优先级告警
func  queryHighPriorityAlerts(ctx context. Context, conn *pgx. Conn) error {
    query :=  `
        SELECT 
            alert_type,
            level,
            title,
            production_line,
            workstation,
            k_timestamptz
        FROM factory_ts.alerts
        WHERE status = '未处理'
            AND level IN ('高', '紧急')
        ORDER BY 
            CASE level 
                WHEN '紧急' THEN 1
                WHEN '高' THEN 2
            END,
            k_timestamptz DESC;
    `


    rows, err := conn. Query(ctx, query)
     if err != nil {
         return fmt. Errorf( "执行查询失败: %v", err)
    }
    defer rows. Close()

    fmt. Println( "未处理的高优先级告警:")
    fmt. Printf( "%-15s %-6s %-20s %-15s %-10s %-20s\n""类型""级别""标题""生产线""工位""时间")
     for rows. Next() {
         var alertType, level, title, line, station string
         var ts time. Time
         if err := rows. Scan(&alertType, &level, &title, &line, &station, &ts); err != nil {
             return fmt. Errorf( "读取数据失败: %v", err)
        }
        fmt. Printf( "%-15s %-6s %-20s %-15s %-10s %-20s\n"
            alertType, level, title, line, station, ts. Format( "2006-01-02 15:04:05"))
    }
     return nil
}

// 查询测试记录失败原因分析
func  queryTestFailureAnalysis(ctx context. Context, conn *pgx. Conn) error {
    query :=  `
        SELECT 
            fail_reason,
            COUNT(*) as failure_count,
            ROUND(AVG(quality_score)::numeric, 2) as avg_quality,
            COUNT(DISTINCT phone_model) as affected_models
        FROM factory_ts.test_records
        WHERE test_result = '未通过'
            AND fail_reason IS NOT NULL
        GROUP BY fail_reason
        ORDER BY failure_count DESC;
    `


    rows, err := conn. Query(ctx, query)
     if err != nil {
         return fmt. Errorf( "执行查询失败: %v", err)
    }
    defer rows. Close()

    fmt. Println( "测试失败原因分析:")
    fmt. Printf( "%-20s %-10s %-15s %-15s\n""失败原因""失败次数""平均质量分""影响机型数")
     for rows. Next() {
         var reason string
         var count, models int
         var quality float64
         if err := rows. Scan(&reason, &count, &quality, &models); err != nil {
             return fmt. Errorf( "读取数据失败: %v", err)
        }
        fmt. Printf( "%-20s %-10d %-15.2f %-15d\n", reason, count, quality, models)
    }
     return nil
}

// 查询机器手臂维护计划
func  queryRobotArmMaintenance(ctx context. Context, conn *pgx. Conn) error {
    query :=  `
        SELECT 
            serial_number,
            model,
            position,
            total_runtime,
            last_maintenance_time,
            next_maintenance_time,
            total_faults
        FROM factory_ts.robot_arms
        WHERE next_maintenance_time <= NOW() + INTERVAL '7 days'
        ORDER BY next_maintenance_time;
    `


    rows, err := conn. Query(ctx, query)
     if err != nil {
         return fmt. Errorf( "执行查询失败: %v", err)
    }
    defer rows. Close()

    fmt. Println( "即将需要维护的机器手臂:")
    fmt. Printf( "%-15s %-10s %-10s %-15s %-20s %-20s %-10s\n",
         "序列号""型号""位置""运行时间(h)""上次维护""下次维护""故障数")
     for rows. Next() {
         var sn, model, pos string
         var runtime, faults float64
         var lastMaint, nextMaint time. Time
         if err := rows. Scan(&sn, &model, &pos, &runtime, &lastMaint, &nextMaint, &faults); err != nil {
             return fmt. Errorf( "读取数据失败: %v", err)
        }
        fmt. Printf( "%-15s %-10s %-10s %-15.2f %-20s %-20s %-10.0f\n",
            sn, model, pos, runtime,
            lastMaint. Format( "2006-01-02 15:04"),
            nextMaint. Format( "2006-01-02 15:04"),
            faults)
    }
     return nil
}

// 查询告警处理效率分析
func  queryAlertHandlingEfficiency(ctx context. Context, conn *pgx. Conn) error {
    query :=  `
        WITH alert_times AS (
            SELECT 
                alert_type,
                level,
                handler,
                handle_method,
                EXTRACT(EPOCH FROM (handle_time - k_timestamptz)) as response_time,
                EXTRACT(EPOCH FROM (resolve_time - handle_time)) as resolve_time
            FROM factory_ts.alerts
            WHERE status = '已解决'
                AND handle_time IS NOT NULL
                AND resolve_time IS NOT NULL
        )
        SELECT 
            alert_type,
            level,
            COUNT(*) as alert_count,
            ROUND(AVG(response_time/60)::numeric, 2) as avg_response_minutes,
            ROUND(AVG(resolve_time/60)::numeric, 2) as avg_resolve_minutes
        FROM alert_times
        GROUP BY alert_type, level
        ORDER BY level DESC, avg_response_minutes DESC;
    `


    rows, err := conn. Query(ctx, query)
     if err != nil {
         return fmt. Errorf( "执行查询失败: %v", err)
    }
    defer rows. Close()

    fmt. Println( "告警处理效率分析:")
    fmt. Printf( "%-15s %-6s %-10s %-20s %-20s\n",
         "告警类型""级别""数量""平均响应时间(分)""平均解决时间(分)")
     for rows. Next() {
         var alertType, level string
         var count int
         var respTime, resolveTime float64
         if err := rows. Scan(&alertType, &level, &count, &respTime, &resolveTime); err != nil {
             return fmt. Errorf( "读取数据失败: %v", err)
        }
        fmt. Printf( "%-15s %-6s %-10d %-20.2f %-20.2f\n",
            alertType, level, count, respTime, resolveTime)
    }
     return nil
}

// 查询测试记录的时间分布
func  queryTestTimeDistribution(ctx context. Context, conn *pgx. Conn) error {
    query :=  `
        SELECT 
            EXTRACT(HOUR FROM k_timestamptz) as hour,
            COUNT(*) as test_count,
            ROUND(AVG(quality_score)::numeric, 2) as avg_quality,
            ROUND(AVG(test_duration)::numeric, 2) as avg_duration
        FROM factory_ts.test_records
        WHERE k_timestamptz >= NOW() - INTERVAL '10 minutes'
        GROUP BY hour
        ORDER BY hour;
    `


    rows, err := conn. Query(ctx, query)
     if err != nil {
         return fmt. Errorf( "执行查询失败: %v", err)
    }
    defer rows. Close()

    fmt. Println( "24小时测试记录分布:")
    fmt. Printf( "%-6s %-10s %-15s %-15s\n""小时""测试数""平均质量分""平均耗时(秒)")
     for rows. Next() {
         var hour float64
         var count int
         var quality, duration float64
         if err := rows. Scan(&hour, &count, &quality, &duration); err != nil {
             return fmt. Errorf( "读取数据失败: %v", err)
        }
        fmt. Printf( "%02.0f:00 %-10d %-15.2f %-15.2f\n", hour, count, quality, duration)
    }
     return nil
}

由于篇幅关系,这里只描述其中的一个查询场景,以下是在近270w的数据中查询相关的数据,可以看到,只需要3s左右就可以查询出来,非常的快速,但是这种SQL在MySQL中进行了对比,需要10几秒才能查出来,查询的性能还是非常的不错。

4.5 使用 Grafana 查看指标数据:

Grafana 支持查看 KaiwuDB 集群及各个节点的监控指标,包括指标概览、硬件指标、运行指标、SQL 指标、存储指标、副本指标、分布式指标、队列指标和慢查询指标。

总结:

在工业4.0和智能制造快速发展的背景下,传统关系型数据库已难以应对海量时序数据的存储与处理需求。KWDB作为专为物联网和工业互联网设计的分布式大数据平台,通过高性能时序数据库核心模块,有效解决了TB/PB级数据的实时汇聚、存储分析和智能预测等关键问题。该平台融合了关系型和时序数据库优势,支持云原生部署,具备集群扩展能力,特别适用于工厂自动化等IoT场景。

相比MySQL在千万级数据量时出现的性能瓶颈、同步延迟等问题,KWDB通过分布式架构和AI集成,不仅提升了数据处理效率,还实现了实时商业洞察,显著降低了企业的研发和运维成本,为工业数字化转型提供了可靠的数据基础设施支撑。

 


请使用浏览器的分享功能分享到微信等