场景案例 - KWDB分布式多模数据库在净水机物联网IoT方案落地最佳实践:

 

作者:完美句号

原文链接:https://blog.csdn.net/wanmeijuhao/article/details/147965173?spm=1001.2014.3001.5501

一、序列概括:

随着物联网技术在各行业的使用将越来越广,物联网与人工智能的深度结合,特别是在智能家居、智慧城市、工业4.0等领域有着广泛的应用‌,AIoT(Artificial Intelligence of Things)正成为企业实现智能化转型的关键路径。

上面一篇长文已经介绍了KWDB的安装以及踩过的坑,而且也对TSBS专门针对时间序列场景做了压力测试。

接下来,将从第2个方向“场景案例”一起来探索公司实际物联网IoT项目的思考,是否能让KWDB 分布式多模数据库碰撞出不一样的火花呢?

二、正文前言:

本人在学校毕业后一直从事开发工作,经过了以下几个阶段的软件开发历程,从早期的Web时代编程、到云时代分布式编程,到如今的物联网AIoT时代,都是解决企业级项目开发的效率问题,通过不断的技术演进,来为公司带来降本增效的原则。

2.1 传统Web时代编程开发时代:

传统Web时代编程开发时代是一个以静态网页为主,依赖HTML、CSS和JavaScript等技术栈,通过服务器端处理逻辑和数据库交互,实现网页动态内容和交互功能的时代。

2.2 云时代分布式编程时代:

云时代分布式编程时代,强调利用云计算平台进行数据的分布式处理和应用程序的编写与开发。依托分布式架构构建高可用服务集群,以服务化SaaS交付模式降低IT运维复杂度,推动企业从固定硬件投入向灵活智能的数字化服务转型。

2.3 物联网AIoT时代:

物联网AIoT时代,就像给世界装上“超级感官”与“智慧大脑”——机器之间能“对话”,数据像血液般流动:红灯预判车流、工厂设备自主巡检,甚至冰箱会提醒牛奶过期;万物从沉默到觉醒,效率与便利在指尖悄然生长,生活像被施了魔法,一切变得更聪明、更贴心。

三、物联网AIoT时代数据管理的新途径 – “时序数据库”:

KWDB 的时序数据库(Timeseries Database)和关系型数据库(Relational Database)使用不同的存储引擎和元数据管理方式:

①. 时序数据库:专门存储带时间戳的序列化数据,要求表结构包含主时间戳字段和标签字段。

②. 关系型数据库:支持传统关系模型,可定义主键、外键、索引等约束,适用于非时间序列数据。

KWDB 正在成为新一代物联网与智能系统的核心数据引擎。如果您正面临复杂的数据接入、多模数据管理或实时计算需求,不妨尝试一下这款专为 AIoT 设计的数据库产品,拥抱真正面向未来的数据基础设施。

四、企业内部物联网IoT净水机业务平台方案思考:

公司企业的净水机业务就是采用了物联网IoT的设计方案,能够实时对设备的运行状态、水质状态以及滤芯状态进行全面而精确的监测。通过高效的数据传输机制,该系统实现了对设备的智能化管理,包括:

①. 同步水卡消费记录、商家或设备的记录。

②. 查询TDS值、CPF/DF/CF值上传。

③. 远程开关机、重启机器、机器预警、调水费。

④. 滤芯寿命、水温、水量、pH值、电导率、温度。

五、KWDB在净水机物联网IoT方案落地最佳实践:GoLang应用程序 + 香橙派Orange Pi AI Pro 开发板场景案例测试:

在实际项目中,我们通过开发语言使用数据库提供的SDK连接到KWDB数据库实例进行CURD的操作,可以看到这里分为2种场景:

①. 应用程序配置连接器KWDB,可以看到官网提供了10几种不同应用程序连接器的应用开发。

②. 第三方工具与KWDB数据读写同步,有分布式消息中间件、离线数据同步工具、开源指标收集工具的数据读写同步。

5.1 使用 pgx 驱动连接 KWDB:

KWDB 支持可以通过 pgx 驱动连接数据库,并执行创建、插入和查询操作。下面代码演示如何使用 Go 语言通过 pgx 驱动连接 KWDB。

pgx 是用 Go 语言编写的 PostgreSQL 驱动和工具包,提供了高性能的低级接口,支持用户直接利用 PostgreSQL 的特性,pgx 还包含一个适配器,与标准的数据库或 SQL 接口兼容,方便开发者进行数据库操作。

go get github.com/jackc/pgx/v5@v5.5.1

5.2 香橙派Orange Pi AI Pro 开发板:

香橙派Orange Pi AI Pro开发板凭借其硬件性能与生态适配能力,在物联网(IoT)领域展现了显著优势,开发板通过高性能AI加速能力与灵活扩展性,提供了强大的训练能力,优异的推理能力,可以满足大多数AI算法原型验证、推理应用开发的需求,可以构建智能物联网节点的理想硬件平台。

以下是简单的香橙派Orange Pi AI Pro 开发板相关硬件设备(电源线、开发板、TF卡),经过几分钟简单的硬件组装后,因为需要进行调试,所以,插入相关的USB和HDMI插头后,开机登陆后即可进入桌面。

5.3 Golang模拟测试场景实践:

在香橙派Orange Pi AI Pro 开发板上,跑一个基于Go语言开发的物联网净水器性能压测系统,用于模拟大量设备并发上传数据和请求操作的场景。

5.3.1 KWDB - 时序数据库:

在数据库管理中,KWDB的时序数据库(Time-Series Database, TSD)是一种专门用于存储和管理随时间变化的数据的数据库。这类数据库特别适用于物联网(IoT)、监控系统、日志分析、金融分析等领域,能够高效地处理大量时间序列数据。

首先我们需要新建一个测试使用的数据库,这里需要注意一下,跟平时的建表语句不同的是,需要携带一个TS的参数标识,表示是Time-Series时序数据库。

这里的分区时间范围比较实用,会进行定期检测删除,删除的依据就是分区中最新的数据已经超过了保存周期的话就执行删除,比较类似于Redis的“定时删除策略”,会创建一个定时事件,当时间到达时,由事件处理器自动执行 key 的删除操作相当于会维护一个进程去进行检测。

# 创建iot时序数据库成功

CREATE  TS  DATABASE iot  RETENTIONS 50d  PARTITION  INTERVAL 2d;
# 查看所有数据库
show databases;
# 查看时序数据库的建库语句
show create database iot;
# 切换指定的数据库
use iot;

这种数据库的特点,非常适合我们的这种业务,比如在业务的设计中存在“冷热数据”,MongoDB的存在是物联网最新上传的数据,老数据会针对性的进行数据刷选备份到MySQL进行存档保存。

5.3.2 KWDB - 时序表:

KWDB时序表(TIME SERIES TABLE)是用于存储时间序列数据的数据表,在与平常接触的建表语句不太一样,而且有些数据类型、函数也是不支持的,具体支持的数据类型可以直接查看官方的文档 - 时序数据类型。

时序表跟关系型常规的建表语句有一些差异性,这里我做了一些总结,一张表包含3种类型的列:

①. 时间戳列:用于记录数据采集的时间。

②. 标签列:用于记录采集对象的静态数据,比如设备编号、标识位置、设备的型号,固定不变的数据。

③. 字段列:用于记录采集对象的实时数据,比如一直变化的数据,设备产生了不同的时间的收费、电流、电压。

接着创建2张时序表,一张时序表是记录净水机设备相关信息,比如这台净水机设备实时相关设备数据:TDS值、CPF/DF/CF值、pH值、水温、水量、电导率、滤芯寿命、设备状态等信息。


CREATE 
TABLE 
device_data (

    timestamp timestamptz  NOT  NULL,
    tds float  NOT  NULL,
    cpf float  NOT  NULL,
    df float  NOT  NULL,
    cf float  NOT  NULL,
    ph float  NOT  NULL,
    temperature float  NOT  NULL,
    water_volume float  NOT  NULL,
    filter_life  INTEGER  NOT  NULL,
    conductivity float  NOT  NULL,
    power_status  BOOLEAN  NOT  NULL,
    created_at timestamptz  NOT  NULL  default  now()
TAGS (
    device_id  VARCHAR( 50NOT  NULL
PRIMARY  TAGS (device_id);

另外一张时序表是与消费相关的,主要记录净水机设备交易数据,比如水卡消费记录、商家/设备记录。


CREATE 
TABLE 
consumption_record (

    timestamp timestamptz  NOT  NULL,
    amount float  NOT  NULL,
    created_at timestamptz  NOT  NULL  default  now()
TAGS (
    device_id  VARCHAR( 50NOT  NULL,
    card_id  VARCHAR( 50NOT  NULL
PRIMARY  TAGS (device_id);

5.4 Golang模拟压测脚本

上面可以看到数据库相关的准备工作已经就绪,接下来就是编写go相关代码,以下是go代码压测方案的示意图:

①. 在香橙派Orange Pi AI Pro 开发板上运行go压测程序,支持动态并发数、测试持续时间、模拟设备数量,上传支持水卡消费记录同步和模拟设备数据指标(TDS值、CPF/DF/CF值、pH值等)。

  • • 支持模拟多个设备并发上传数据
  • • 模拟多种数据指标:TDS值、CPF/DF/CF值、pH值等
  • • 模拟设备操作:开关机、重启、调水费等
  • • 支持水卡消费记录同步
  • • 实时性能监控和测试报告生成

②. 我们这次压力测试的是KWDB单节点版本安全模式下进行测试,推荐在集群环境进行测试,这样会更容易贴近生产环境。

③. 最后通过KWDB平台集成部署 Prometheus 和 Grafana,配置好告警规则和聚合规则配置文件,即可在Grafana中查看相关监控指标数据。

监控部署,可以直接查看官方的手册进行安装与配置(https://www.kaiwudb.com/kaiwudb_docs/#/db-monitor/os-monitor-component/deploy-monitoring.html)。

生成模拟设备数据指标(TDS值、CPF/DF/CF值、pH值等),和生成模拟支持水卡消费记录的Mock数据:


// generateDeviceData 生成模拟设备数据

func  generateDeviceData(deviceID string) db. DeviceData {
     return db. DeviceData{
         DeviceID:     deviceID,
         Timestamp:    time. Now(),
         TDS:          rand. Float64() *  1000,
         CPF:          rand. Float64() *  100,
         DF:           rand. Float64() *  100,
         CF:           rand. Float64() *  100,
         PH:           rand. Float64()* 6 +  1,    // pH 1-7
         Temperature:  rand. Float64()* 30 +  10// 10-40度
         WaterVolume:  rand. Float64() *  1000,
         FilterLife:   rand. Intn( 100),
         Conductivity: rand. Float64() *  2000,
         PowerStatus:  rand. Intn( 2) ==  1,
    }
}

// generateConsumptionRecord 生成模拟消费记录
func  generateConsumptionRecord(deviceID string) db. ConsumptionRecord {
     return db. ConsumptionRecord{
         CardID:    fmt. Sprintf( "CARD_%d", rand. Intn( 1000)),
         DeviceID:  deviceID,
         Amount:    rand. Float64() *  100,
         Timestamp: time. Now(),
    }
}

在main.go中,可以定义TestConfig结构体的参数来调整测试配置,如Concurrency(并发数)、Duration:(测试持续时间)、DeviceCount(模拟设备数量)、ReportInterval(报告输出间隔),并且通过使用goroutine池可以管理并发数量,还可以通过channel控制并发数量,使用sync.Mutex确保指标统计的准确性。


// 定期输出连接池状态

    go  func( ) {
        ticker := time. NewTicker(time. Minute)
        defer ticker. Stop()

         for {
            select {
             case  <-ticker.C:
                stats := kwdb.GetStats()
                log.Printf("连接池状态 - 总连接数: %d, 空闲连接数: %d, 使用中连接数: %d\n",
                    stats.TotalConns(), stats.IdleConns(), stats.TotalConns()-stats.IdleConns())
            }
        }
    }()

    // 测试配置
    config := TestConfig{
        Concurrency:    100,
        Duration:       1 * time.Minute,
        DeviceCount:    1000,
        ReportInterval: 5 * time.Second,
    }

    // 初始化测试指标
    metrics := &TestMetrics{}

    // 创建等待组
    var wg sync.WaitGroup

    // 开始时间
    startTime := time.Now()

    // 定期报告测试进度
    go func() {
        for {
            time.Sleep(config.ReportInterval)
            if time.Since(startTime) >= config.Duration {
                break
            }
            log.Printf("进行中 - 总请求: %d, 成功: %d, 失败: %d, 平均延迟: %v\n",
                metrics.TotalRequests,
                metrics.SuccessRequests,
                metrics.FailedRequests,
                metrics.AverageLatency)
        }
    }()

    // 主测试循环
    for time.Since(startTime) < config.Duration {
        // 控制并发数
        if metrics.TotalRequests%int64(config.Concurrency) == 0 {
            time.Sleep(100 * time.Millisecond)
        }

        wg.Add(1)
        deviceID := fmt.Sprintf("DEVICE_%d", rand.Intn(config.DeviceCount))
        go simulateDevice(deviceID, metrics, &wg, kwdb)
    }

    // 等待所有goroutine完成
    wg.Wait()

注意:时间字段去掉CURRENT_TIMESTAMP (且设置了not_null),在插入数据时创建时间是空,这里就会提示报错提示:

ERROR: null value in column "created_at" violates not-null constraint (SQLSTATE 23502)

另外,也在思考,如果多台设备同时运行,肯定会存在并发的问题,上面可以看到第一列是时间列,而且是默认的索引,那么会不会在同一时间段,产生多条时间一样的数据呢?这样会不会存在问题呢?会不会产生时间一样的冲突呢?带着这个问题,我们可以做以下的测试:

结果,可以得出结论:时间对于不同设备来说是可以重复的 ,同一个设备不行,因为同一设备不可能在同一时间产生2个值。

相关代码:

package main


import (
     "fmt"
     "log"
     "math/rand"
     "sync"
     "time"

     "trae/db"
     "trae/config"
)

// TestConfig 测试配置
type  TestConfig struct {
     Concurrency     int            `json:"concurrency"`
     Duration        time. Duration  `json:"duration"`
     DeviceCount     int            `json:"device_count"`
     ReportInterval  time. Duration  `json:"report_interval"`
}

// TestMetrics 测试指标
type  TestMetrics struct {
     TotalRequests    int64
     SuccessRequests  int64
     FailedRequests   int64
     AverageLatency   time. Duration
     MaxLatency       time. Duration
     MinLatency       time. Duration
    mutex            sync. Mutex
}

// generateDeviceData 生成模拟设备数据
func  generateDeviceData(deviceID string) db. DeviceData {
     return db. DeviceData{
         DeviceID:     deviceID,
         Timestamp:    time. Now(),
         TDS:         rand. Float64() *  1000,
         CPF:         rand. Float64() *  100,
         DF:          rand. Float64() *  100,
         CF:          rand. Float64() *  100,
         PH:          rand. Float64()* 6 +  1// pH 1-7
         Temperature: rand. Float64()* 30 +  10// 10-40度
         WaterVolume: rand. Float64() *  1000,
         FilterLife:  rand. Intn( 100),
         Conductivity: rand. Float64() *  2000,
         PowerStatus: rand. Intn( 2) ==  1,
    }
}

// generateConsumptionRecord 生成模拟消费记录
func  generateConsumptionRecord(deviceID string) db. ConsumptionRecord {
     return db. ConsumptionRecord{
         CardID:    fmt. Sprintf( "CARD_%d", rand. Intn( 1000)),
         DeviceID:  deviceID,
         Amount:    rand. Float64() *  100,
         Timestamp: time. Now(),
    }
}

// simulateDevice 模拟单个设备的数据上传
func  simulateDevice( deviceID string, metrics *TestMetrics, wg *sync.WaitGroup, kwdb *db.KWDB) {
    defer wg. Done()

    start := time. Now()
    data :=  generateDeviceData(deviceID)
    consumption :=  generateConsumptionRecord(deviceID)

     // 模拟数据上传延迟
    time. Sleep(time. Duration(rand. Intn( 100)) * time. Millisecond)

     // 保存到KWDB
     var saveErr, saveErr2 error
    saveErr = kwdb. SaveDeviceData(&data)
    saveErr2 = kwdb. SaveConsumptionRecord(&consumption)

    metrics. mutex. Lock()
    metrics. TotalRequests++
     if saveErr == nil && saveErr2 == nil {
        metrics. SuccessRequests++
    }  else {
        metrics. FailedRequests++
         if saveErr != nil {
            log. Printf( "设备数据保存失败: %v", saveErr)
        }
         if saveErr2 != nil {
            log. Printf( "消费记录保存失败: %v", saveErr2)
        }
    }

    latency := time. Since(start)
     if metrics. MaxLatency < latency {
        metrics. MaxLatency = latency
    }
     if metrics. MinLatency ==  0 || metrics. MinLatency > latency {
        metrics. MinLatency = latency
    }
    metrics. AverageLatency = time. Duration( int64(metrics. AverageLatency+latency) /  2)
    metrics. mutex. Unlock()
}

func  main( ) {
     // 设置随机数种子
    rand. Seed(time. Now(). UnixNano())

     // 加载数据库配置
    dbConfig, err := config. LoadConfig( "config/database.yaml")
     if err != nil {
        log. Fatalf( "加载数据库配置失败: %v", err)
    }

     // 初始化数据库连接
    kwdb, err := db. NewKWDB(&dbConfig. Postgresql)
     if err != nil {
        log. Fatalf( "连接数据库失败: %v", err)
    }

     // 注册优雅关闭
    defer  func( ) {
        log. Println( "正在关闭数据库连接...")
        kwdb. Close()
        log. Println( "数据库连接已关闭")
    }()

     // 定期输出连接池状态
    go  func( ) {
        ticker := time. NewTicker(time. Minute)
        defer ticker. Stop()

         for {
            select {
             case  <-ticker.C:
                stats := kwdb.GetStats()
                log.Printf("连接池状态 - 总连接数: %d, 空闲连接数: %d, 使用中连接数: %d\n",
                    stats.TotalConns(), stats.IdleConns(), stats.TotalConns()-stats.IdleConns())
            }
        }
    }()

    // 测试配置
    config := TestConfig{
        Concurrency:    100,
        Duration:       1 * time.Minute,
        DeviceCount:    1000,
        ReportInterval: 5 * time.Second,
    }

    // 初始化测试指标
    metrics := &TestMetrics{}

    // 创建等待组
    var wg sync.WaitGroup

    // 开始时间
    startTime := time.Now()

    // 定期报告测试进度
    go func() {
        for {
            time.Sleep(config.ReportInterval)
            if time.Since(startTime) >= config.Duration {
                break
            }
            log.Printf("进行中 - 总请求: %d, 成功: %d, 失败: %d, 平均延迟: %v\n",
                metrics.TotalRequests,
                metrics.SuccessRequests,
                metrics.FailedRequests,
                metrics.AverageLatency)
        }
    }()

    // 主测试循环
    for time.Since(startTime) < config.Duration {
        // 控制并发数
        if metrics.TotalRequests%int64(config.Concurrency) == 0 {
            time.Sleep(100 * time.Millisecond)
        }

        wg.Add(1)
        deviceID := fmt.Sprintf("DEVICE_%d", rand.Intn(config.DeviceCount))
        go simulateDevice(deviceID, metrics, &wg, kwdb)
    }

    // 等待所有goroutine完成
    wg.Wait()

    // 输出最终测试报告
    log.Printf("\n测试完成\n")
    log.Printf("总请求数: %d\n", metrics.TotalRequests)
    log.Printf("成功请求: %d\n", metrics.SuccessRequests)
    log.Printf("失败请求: %d\n", metrics.FailedRequests)
    log.Printf("平均延迟: %v\n", metrics.AverageLatency)
    log.Printf("最大延迟: %v\n", metrics.MaxLatency)
    log.Printf("最小延迟: %v\n", metrics.MinLatency)
    log.Printf("QPS: %.2f\n", float64(metrics.TotalRequests)/config.Duration.Seconds())
}

下面是KWDB数据库连接的代码:

package db


import (
     "context"
     "fmt"
     "sync"
     "time"

     "github.com/jackc/pgx/v5/pgxpool"
     "trae/config"
)

// KWDB 数据库连接管理器
type  KWDB struct {
    pool     *pgxpool. Pool
    config   *config. DatabaseConfig
    ctx      context. Context
    cancel   context. CancelFunc
    mutex    sync. RWMutex
    isClosing bool
}

// NewKWDB 创建新的KWDB实例
func  NewKWDB(dbConfig *config. DatabaseConfig) (* KWDB, error) {
    ctx, cancel := context. WithCancel(context. Background())

     // 创建连接池配置
    poolConfig, err := pgxpool. ParseConfig(dbConfig. GetDSN())
     if err != nil {
         cancel()
         return nil, fmt. Errorf( "解析数据库配置失败: %v", err)
    }

     // 应用连接池设置
    poolConfig. MaxConns =  int32(dbConfig. Pool. MaxConns)
    poolConfig. MinConns =  int32(dbConfig. Pool. MinConns)
    poolConfig. MaxConnLifetime = dbConfig. Pool. MaxConnLifetime
    poolConfig. MaxConnIdleTime = dbConfig. Pool. MaxConnIdleTime
    poolConfig. HealthCheckPeriod = dbConfig. Pool. HealthCheckPeriod

     // 创建连接池(带重试机制)
     var pool *pgxpool. Pool
     for retries :=  3; retries >  0; retries-- {
        pool, err = pgxpool. NewWithConfig(ctx, poolConfig)
         if err == nil {
             break
        }
         if retries >  1 {
            time. Sleep(time. Second *  2)
        }
    }

     if err != nil {
         cancel()
         return nil, fmt. Errorf( "连接数据库失败: %v", err)
    }

     // 创建KWDB实例
    db := & KWDB{
         pool:     pool,
         config:   dbConfig,
         ctx:      ctx,
         cancel:   cancel,
         mutex:    sync. RWMutex{},
         isClosingfalse,
    }

     // 启动健康检查
    go db. healthCheck()

     return db, nil
}

// healthCheck 定期检查数据库连接健康状态
func (db * KWDBhealthCheck( ) {
    ticker := time. NewTicker(db. config. Pool. HealthCheckPeriod)
    defer ticker. Stop()

     for {
        select {
         case <-db.ctx.Done():
            return
        case <-ticker.C:
            if err := db.pool.Ping(db.ctx); err != nil {
                fmt.Printf("数据库健康检查失败: %v\n", err)
            }
        }
    }
}

// Close 优雅关闭数据库连接
func (db *KWDB) Close() {
    db.mutex.Lock()
    db.isClosing = true
    db.mutex.Unlock()

    // 取消上下文
    db.cancel()

    // 等待所有连接完成
    time.Sleep(time.Second * 5)

    // 关闭连接池
    db.pool.Close()
}

// GetStats 获取连接池统计信息
func (db *KWDB) GetStats() *pgxpool.Stat {
    return db.pool.Stat()
}

// SaveDeviceData 保存设备数据
func (db *KWDB) SaveDeviceData(data *DeviceData) error {
    db.mutex.RLock()
    if db.isClosing {
        db.mutex.RUnlock()
        return fmt.Errorf("数据库连接正在关闭")
    }
    db.mutex.RUnlock()

    // 重试机制
    var err error
    for retries := 3; retries > 0; retries-- {
        // 开启事务
        tx, err := db.pool.Begin(db.ctx)
        if err != nil {
            if retries > 1 {
                time.Sleep(time.Second)
                continue
            }
            return fmt.Errorf("开启事务失败: %v", err)
        }

        // 执行插入
        _, err = tx.Exec(db.ctx,
            "INSERT INTO device_data (device_id, timestamp, tds, cpf, df, cf, ph, temperature, water_volume, filter_life, conductivity, power_status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
            data.DeviceID, data.Timestamp, data.TDS, data.CPF, data.DF, data.CF, data.PH, data.Temperature, data.WaterVolume, data.FilterLife, data.Conductivity, data.PowerStatus)

        if err != nil {
            tx.Rollback(db.ctx)
            if retries > 1 {
                time.Sleep(time.Second)
                continue
            }
            return fmt.Errorf("保存设备数据失败: %v", err)
        }

        // 提交事务
        if err = tx.Commit(db.ctx); err != nil {
            if retries > 1 {
                time.Sleep(time.Second)
                continue
            }
            return fmt.Errorf("提交事务失败: %v", err)
        }

        break
    }

    return err
}

// SaveDeviceDataBatch 批量保存设备数据
func (db *KWDB) SaveDeviceDataBatch(dataList []*DeviceData) error {
    if len(dataList) == 0 {
        return nil
    }

    db.mutex.Lock()
    defer db.mutex.Unlock()

    // 开启事务
    tx, err := db.pool.Begin(db.ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(db.ctx)

    // 批量插入数据
    for _, data := range dataList {
        _, err := tx.Exec(db.ctx, "INSERT INTO device_data (device_id, timestamp, tds, cpf, df, cf, ph, temperature, water_volume, filter_life, conductivity, power_status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
            data.DeviceID, data.Timestamp, data.TDS, data.CPF, data.DF, data.CF, data.PH, data.Temperature, data.WaterVolume, data.FilterLife, data.Conductivity, data.PowerStatus)
        if err != nil {
            return fmt.Errorf("批量插入设备数据失败: %v", err)
        }
    }

    return tx.Commit(db.ctx)
}

// SaveConsumptionRecord 保存消费记录
func (db *KWDB) SaveConsumptionRecord(record *ConsumptionRecord) error {
    db.mutex.Lock()
    defer db.mutex.Unlock()

    // 插入消费记录到PostgreSQL
    _, err := db.pool.Exec(db.ctx,
        "INSERT INTO consumption_record (card_id, device_id, amount, timestamp) VALUES ($1, $2, $3, $4)",
        record.CardID, record.DeviceID, record.Amount, record.Timestamp)
    return err
}

// SaveConsumptionRecordBatch 批量保存消费记录
func (db *KWDB) SaveConsumptionRecordBatch(records []*ConsumptionRecord) error {
    if len(records) == 0 {
        return nil
    }

    db.mutex.Lock()
    defer db.mutex.Unlock()

    // 开启事务
    tx, err := db.pool.Begin(db.ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(db.ctx)

    // 批量插入记录
    for _, record := range records {
        _, err := tx.Exec(db.ctx, "INSERT INTO consumption_record (card_id, device_id, amount, timestamp) VALUES ($1, $2, $3, $4)",
            record.CardID, record.DeviceID, record.Amount, record.Timestamp)
        if err != nil {
            return fmt.Errorf("批量插入消费记录失败: %v", err)
        }
    }

    return tx.Commit(db.ctx)
}

// Ping 测试连接是否正常
func (db *KWDB) Ping() error {
    return db.pool.Ping(db.ctx)
}

// ReConnect 重新连接
func (db *KWDB) ReConnect() error {
    db.mutex.Lock()
    defer db.mutex.Unlock()

    // 关闭旧连接池
    db.pool.Close()

    // 创建新的连接池
    pool, err := pgxpool.New(db.ctx, db.pool.Config().ConnString())
    if err != nil {
        return fmt.Errorf("重新连接失败: %v", err)
    }

    db.pool = pool
    return nil
}

5.5 压测结果指标数据分析:

上面我们一共进行了4次的压力测试,可以使用 Grafana支持查看 KWDB 集群及各个节点的监控指标,包括指标概览、硬件指标、运行指标、SQL 指标、存储指标、副本指标、分布式指标、队列指标和慢查询指标。

以下不是所有指标都来查看,我们只调一些比较重要的指标来进一步分析与总结。

5.5.1概览页面指标 - SQL Queries指标:

在以下节点视图中,SQL Queries指标表示该时间序列图展示指定节点处理客户端请求的 QPS(Queries Per Second,每秒查询数)。并且支持查询的类型包括查询、更新、插入、删除。

采样值为 10 秒内的平均值。在如下集群视图中,该时间序列图展示当前集群查询负载的估计值,该估计值为每个节点最近 10 秒的活动情况的汇总值。可以在看到在以下4次压测中,在Inserts的操作最高值分别为:

①. 第一次压测时,在时间段为2025-05-08 02:19:45时,Inserts操作达到了4.15K每秒。

② 第二次压测时,在时间段为2025-05-08 02:22:15时,Inserts操作达到了2.87K每秒。

③ 第三次压测时,在时间段为2025-05-08 02:42:30时,Inserts操作达到了3.29K每秒。

④ 第四次压测时,在时间段为2025-05-08 02:52:45时,Inserts操作达到了3.74K每秒。

5.5.2 概览页面指标 - Service Latency: SQL 99th percentile:

在数据库管理中,服务延迟(Service Latency)指的是数据库操作(如查询、事务等)的执行时间,即集群从接收到查询请求到查询结束之间的时间,不包含将查询结果传输给客户端的时间。

特别是在高并发环境下,了解99th百分位延迟(也称为99百分位延迟或最长延迟)对于优化数据库性能至关重要。99th百分位延迟指的是在最坏情况下,有1%的请求超过了此延迟时间。

该时间序列图展示指定节点或者集群内所有节点的服务延迟的 99th 百分位数,即在观察时间内,百分之九十九(99%)的节点的服务延迟低于或等于这个值,这里可以看到最高的是369ms。

5.5.3 硬件页面指标 – Capacity & Memory Usage:

Capacity表示磁盘容量‌,这个指标可以显示磁盘的使用情况,包括总容量、已用容量和剩余容量等,帮助管理磁盘空间。用户可以通过监控存储容量图来判断什么时候需要为集群添加新的存储空间,比如如下,可以实时去查看时序数据库和关系型数据库磁盘空间占比,可以有针对性的进行调整。

Memory Usage表示内存容量,随着KWDB加载的数据源数据量增加、缓存的图表数据增多以及运行的插件功能增多,内存使用量会相应变化。这个指标用于及时发现内存使用是否超出合理范围,避免因内存耗尽导致KWDB服务中断,如果内存使用率持续上升且无合理回落,可能暗示存在内存泄漏等问题‌。

如上所示,可以看到我们实验的机器设备是8G,但是在压测的过程中,最高才使用到内存总量为2.02G,只占用了25%左右。

5.5.4 运行时页面指标 - GC Pause Time:

GC Pause Time指标指的是垃圾回收(Garbage Collection,简称GC)暂停时间,即在进行垃圾回收时,程序的工作线程被暂停的时间‌‌。
GC Pause Time指标反映了垃圾回收过程中应用程序暂停的时间长度。这个指标对于评估垃圾回收对应用程序性能的影响非常重要。较长的暂停时间会降低应用程序的响应速度和吞吐量,尤其是在交互式应用中,长时间的暂停会导致用户体验下降。因此,优化垃圾回收策略,减少暂停时间,是提高应用程序性能的关键‌。

在下面节点视图中,该时间序列图展示指定节点的 GC 阻塞时间。在集群视图中,该时间序列图展示集群中所有节点的 GC 阻塞时间总和,可以看到整个生命周期维持在2ms左右,最高是8.58ms。

GC Runs表示该时间序列图展示指定节点或者集群内所有节点的 GC 运行次数,可以看到平均稳定在0.5次左右,最高GC运行次数在1.27。

六、Java优化批量数据写入:

由于上面插入的效果没有实际体现出KWDB时序数据库的优势,经过官方的沟通,同时也说出了我代码的几个缺陷问题,这里我们以官方的示例来演示一下。

KaiwuDB JDBC 是 KWDB 的官方 Java 语言连接器,基于 PgJDBC 扩展实现,符合 JDBC 4.0、JDBC 4.1 和 JDBC 4.2 规范。Java 开发人员可以使用 KaiwuDB JDBC 驱动程序连接 KWDB 的服务进程,进行数据增删改查操作。使用批量接口写入数据时,如果待写入的值与列的数据类型不符或者待写入的字段不存在,KaiwuDB JDBC 会返回成功写入条数、写入失败条数,并将具体错误信息记录到日志中。

KaiwuDB JDBC 提供了传统的批量执行 SQL 接口,用户可以通过手动拼接 SQL 实现批量数据写入,同时提供了 addBatchInsert、executeBatchInsert 和 clearBatchInsert 接口,能够将同一张时序表的多次数据写入合并到一条 SQL 语句,降低 CPU 占用,提升写入性能。

注意:目前,批量写入功能只适用于 KWDB 单机版本。

6.1 版本说明:

①. 安装 openJDK(1.8 及以上版本)。

②. 安装 Maven(3.6 及以上版本)。

③. 安装 KWDB 2.2.0数据库、配置数据库认证方式、创建数据库。

④. 获取 KWDB JDBC 驱动包。

6.2 配置数据库:

测试使用的数据库是阿里云的ECS,配置是16核与32G的机器配置:

用时序写入短接功能,该功能默认关闭。启用后可以直接将数据写入存储,减少中间处理环节,提高性能。

# 为当前会话启用时序写入短接功能。

SET  SESSION tsinsert_direct= true;

#为  KaiwuDB 集群启用时序写入短接功能:
SET  CLUSTER  SETTING server. tsinsert_direct. enabled =  TRUE;

#允许写入时跳过错误数据,正常写入其他数据:
SET  SESSION ts_ignore_batcherror= true;

6.3 配置连接:

在 pom.xml 中添加依赖,将 KaiwuDB JDBC 引入 Java 项目:



   < groupId>com.kaiwudb groupId>
   < artifactId>kaiwudb-jdbc artifactId>
   < version>2.2.0 version>

创建一下时序表:

CREATE TABLE tsdb.tbl_raw_1 (ts TIMESTAMPTZ NOT NULL, data FLOAT8 NULL, type CHAR(10) NULL, parse VARCHAR NULL) TAGS (device CHAR(20) NOT NULL, iot_hub_name VARCHAR(64) NOT NULL) PRIMARY TAGS (device, iot_hub_name);

6.4 代码演示:

package com.
winterchen;


import com. kaiwudb. jdbc. KwStatement;
import org. springframework. boot. autoconfigure. SpringBootApplication;

import java. sql. Connection;
import java. sql. DriverManager;
import java. sql. SQLException;
import java. util. LinkedHashMap;
import java. util. UUID;
import java. util. concurrent. ThreadLocalRandom;

@ SpringBootApplication
public  class  SpringbootMybatisDemo2Application {

    public  static  void  main( String[] args) {
         String url =  "jdbc:kaiwudb://47.111.23.173:26257/tsdb?preferQueryMode=simple";
         String user =  "root";
         String password =  "123456";

         try ( Connection connection =  DriverManager. getConnection(url, user, password)) {
             KwStatement statement = ( KwStatement) connection. createStatement();
            long timestamp = 1731373200000L;  // 2024-11-12 09:00:00.000 初始时间戳
             for (int i =  0; i <  1000; i++) {
                 // 循环1000次,每次写入1000行数据,共计100万行数据;每次循环插入20个设备,每个设备50行的数据
                 for (int row =  1; row <=  999; row++) {
                    int index = (row -  1) %  10 +  1;
                    long finalTime = timestamp + (row * 1000L) + (i *  50 * 1000L);
                     for (int num =  1; num <=  999; num++) {
                         String device =  "device_" + num;
                         String iot =  "iot_" + num;
                        statement. addBatchInsert(finalTime, ( "tbl_raw_1"),
                                 new  LinkedHashMap< StringObject>() {{
                                     put( "ts", finalTime);
                                     put( "data"ThreadLocalRandom. current(). nextDouble());
                                     put( "type""t_001");
                                     put( "parse"UUID. randomUUID() +  "'123");
                                }},
                                 new  LinkedHashMap< StringObject>() {{
                                     put( "device", device);
                                     put( "iot_hub_name", iot);
                                }});

                    }
                }

                 // execute batch insert sql data
                long inserTime =  System. currentTimeMillis();
                int[] updateCounts = statement. executeBatchInsert();
                int totalCount =  0;
                 for (int  count: updateCounts) {
                    totalCount += count;
                }
                long insertEndTime =  System. currentTimeMillis() - inserTime;
                 System. out. println( "插入数据时间:" + insertEndTime +  "插入了数据:" + totalCount +  "条");

                 // clear batch insert temp data
                statement. clearBatchInsert();
            }
             // close statement
            statement. close();
        }  catch ( SQLException ex) {
            ex. printStackTrace();
        }
    }
}

通过对上面的数据进行总结分析如下,可以看到每秒插入的数据可以平均在1.7w左右,其实还可以加大,但是由于我机器的原因,只能到最大一次性插入10w条数据,有更好机器的朋友可以尝试一下。

这里也可能与网速有关系,公司的网可能会限速,我使用自己的5G就可以达到上面的测试标准数据。

七、跨模查询:

跨模查询是一种用于检索相关联数据的查询方法,通常用于在不同类型的数据库之间进行查询,例如在关系数据库和时序数据库之间检索相关的数据,KWDB 跨模查询支持对关系表和时序表进行关联查询、嵌套查询、联合查询。

KWDB 跨模查询支持以下关联查询:

  • • 内连接(INNER JOIN)
  • • 左连接(LEFT JOIN)
  • • 右连接(RIGHT JOIN)
  • • 全连接(FULL JOIN)

KWDB 跨模查询支持以下嵌套查询:

  • • 相关子查询(Correlated Subquery):内部查询依赖于外部查询的结果,每次外部查询的都触发内部查询的执行。
  • • 非相关子查询(Non-Correlated Subquery):内部查询独立于外部查询,只执行一次内部查询并返回固定的结果。
  • • 相关投影子查询(Correlated Scalar Subquery): 内部查询依赖于外部查询的结果,并且只返回一个单一的值作为外部查询的结果。
  • • 非相关投影子查询(Non-Correlated Scalar Subquery):内部查询独立于外部查询,并且只返回一个单一的值作为外部查询的结果。
  • • FROM 子查询:将一个完整的 SQL 查询嵌套在另一个查询的 FROM 子句中,作为临时表格使用。

KWDB 跨模查询支持以下联合查询:

  • • UNION:合并多个查询结果集,并去除重复行。
  • • UNION ALL:合并多个查询结果集,但不去除重复行。
  • • INTERSECT:返回两个查询结果集中都存在的所有行,去除重复行。
  • • INTERSECT ALL:返回两个查询结果集中都存在的所有行,但不去除重复行。
  • • EXCEPT:返回第一个查询结果集中不包含在第二个结果集中的行,去除重复行。
  • • EXCEPT ALL:返回第一个查询结果集中不包含在第二个结果集中的行,不去除重复行。

warning 说明

  • • KWDB 支持显式事务内执行时序数据的查询以及写入,但不保证时序引擎的事务性,也不保证跨模查询结果的一致性。
  • • 使用 FULL JOIN 时,避免在连接条件中使用子查询。

以下为SQL查询相关语句:

-- 设备数据表

CREATE  TABLE  device_data (
    id  SERIAL  PRIMARY  KEY,
    device_id  VARCHAR( 50),
    timestamp  TIMESTAMPTZ  DEFAULT  CURRENT_TIMESTAMP,
    voltage  FLOAT,
    current  FLOAT,
    power  FLOAT,
    energy  FLOAT,
    power_factor  FLOAT,
    frequency  FLOAT,
    temperature  FLOAT,
    status  VARCHAR( 20),
    created_at  TIMESTAMPTZ  DEFAULT  CURRENT_TIMESTAMP
);

-- 消费记录表
CREATE  TABLE  consumption_record (
    id  SERIAL  PRIMARY  KEY,
    device_id  VARCHAR( 50),
    timestamp  TIMESTAMPTZ,
    energy_usage  FLOAT,
    amount  DECIMAL( 10, 2),
    created_at  TIMESTAMPTZ  DEFAULT  CURRENT_TIMESTAMP
);

-- 查询设备运行状态统计
SELECT 
    status,
     COUNT(*)  as device_count,
     AVG(voltage)  as avg_voltage,
     AVG(current)  as avg_current,
     AVG(power)  as avg_power,
     AVG(temperature)  as avg_temperature
FROM device_data
WHERE timestamp >=  NOW() -  INTERVAL  '1 hour'
GROUP  BY status;

-- 查询高能耗设备
SELECT 
    device_id,
     SUM(energy_usage)  as total_energy,
     SUM(amount)  as total_amount,
     COUNT(*)  as record_count
FROM consumption_record
WHERE timestamp >=  NOW() -  INTERVAL  '30 days'
GROUP  BY device_id
HAVING  SUM(energy_usage) >  1000
ORDER  BY total_energy  DESC;

-- 查询设备运行参数异常记录
SELECT 
    device_id,
    timestamp,
    voltage,
    current,
    power,
    temperature,
    status,
     CASE 
         WHEN voltage  NOT  BETWEEN  210  AND  230  THEN  '电压异常'
         WHEN current >  100  THEN  '电流过高'
         WHEN power_factor <  0.85  THEN  '功率因数低'
         WHEN temperature >  75  THEN  '温度过高'
         ELSE  '其他异常'
     END  as alarm_type
FROM device_data
WHERE timestamp >=  NOW() -  INTERVAL  '24 hours'
     AND (
        voltage  NOT  BETWEEN  210  AND  230
         OR current >  100
         OR power_factor <  0.85
         OR temperature >  75
    )
ORDER  BY timestamp  DESC;

-- 查询设备能耗趋势
SELECT 
     DATE_TRUNC( 'hour', timestamp)  as hour,
     COUNT( DISTINCT device_id)  as device_count,
     SUM(energy_usage)  as total_energy,
     SUM(amount)  as total_amount,
     AVG(energy_usage)  as avg_energy_per_device
FROM consumption_record
WHERE timestamp >=  NOW() -  INTERVAL  '24 hours'
GROUP  BY  DATE_TRUNC( 'hour', timestamp)
ORDER  BY hour;

-- 查询设备维护建议
SELECT 
    dd. device_id,
     MAX(dd. timestampas last_update,
     COUNT(*)  as abnormal_count,
     STRING_AGG( DISTINCT 
         CASE 
             WHEN dd. voltage  NOT  BETWEEN  210  AND  230  THEN  '电压异常'
             WHEN dd. current >  100  THEN  '电流过高'
             WHEN dd. power_factor <  0.85  THEN  '功率因数低'
             WHEN dd. temperature >  75  THEN  '温度过高'
         END,
         ', '
    )  as abnormal_types
FROM device_data dd
WHERE dd. timestamp >=  NOW() -  INTERVAL  '7 days'
     AND (
        dd. voltage  NOT  BETWEEN  210  AND  230
         OR dd. current >  100
         OR dd. power_factor <  0.85
         OR dd. temperature >  75
    )
GROUP  BY dd. device_id
HAVING  COUNT(*) >  5
ORDER  BY abnormal_count  DESC;

-- 查询设备能效分析
WITH hourly_stats  AS (
     SELECT 
        device_id,
         DATE_TRUNC( 'hour', timestamp)  as hour,
         SUM(energy_usage)  as hourly_energy,
         SUM(amount)  as hourly_amount
     FROM consumption_record
     WHERE timestamp >=  NOW() -  INTERVAL  '24 hours'
     GROUP  BY device_id,  DATE_TRUNC( 'hour', timestamp)
)
SELECT 
    device_id,
     AVG(hourly_energy)  as avg_hourly_energy,
     MAX(hourly_energy)  as max_hourly_energy,
     MIN(hourly_energy)  as min_hourly_energy,
     SUM(hourly_energy)  as total_energy,
     SUM(hourly_amount)  as total_amount,
     COUNT(*)  as active_hours
FROM hourly_stats
GROUP  BY device_id
HAVING  COUNT(*) >=  12
ORDER  BY avg_hourly_energy  DESC;

八、总结:

本文深入探讨了KWDB分布式多模数据库在净水机物联网IoT项目中的应用实践与压力测试。通过结合Go语言与香橙派Orange Pi AI Pro开发板,成功构建了模拟大量设备并发上传数据和请求操作的场景,验证了KWDB在处理时间序列数据方面的卓越性能。

在项目实施过程中,首先通过pgx驱动连接KWDB数据库,实现了数据的CURD操作。接着,针对净水机业务特点,设计了包含设备数据和消费记录的时序表,有效支持了设备状态监测和水卡消费记录同步等核心功能。

压力测试环节,通过模拟不同并发数、测试持续时间及模拟设备数量,全面评估了KWDB在高负载下的表现。测试结果显示,KWDB在插入操作方面表现出色,每秒插入数据达到最高2.2w(受限机器配置的原因,官方测试甚至可达几十万甚至上百万的级别),同时服务延迟保持在合理范围内,确保了系统的实时性和稳定性。

此外,通过Grafana监控平台,对KWDB集群及节点的各项监控指标进行了深入分析,包括SQL查询量、服务延迟、磁盘容量、内存使用及垃圾回收暂停时间等。这些指标数据为系统的优化和调整提供了有力支持,确保了KWDB在净水机物联网IoT项目中的高效稳定运行。

综上所述,KWDB分布式多模数据库凭借其强大的时间序列数据处理能力和灵活的部署方式,在物联网IoT领域展现出广阔的应用前景。通过本文的实践与压测总结,为KWDB在类似项目中的推广和应用提供了宝贵的参考经验。

 


请使用浏览器的分享功能分享到微信等