TiDB分布式数据库架构:Raft协议与HTAP混合负载的工程实践

# TiDB分布式数据库架构:Raft协议与HTAP混合负载的工程实践


在金融级应用场景中,数据库系统需要同时满足高可用性、强一致性和实时分析能力。TiDB作为新一代分布式数据库,基于Raft一致性协议构建了弹性扩展的存储层,并通过HTAP架构实现了事务处理与实时分析的混合负载支持。


## TiDB整体架构设计


TiDB采用分层架构设计,将计算层(TiDB Server)、存储层(TiKV)和调度层(PD)分离,实现了计算与存储的解耦。


```sql

-- TiDB cluster initialization.
-- NOTE(review): TiDB creates the `mysql` schema and `mysql.user` itself during
-- bootstrap, so the two IF NOT EXISTS statements below are presumably illustrative
-- and act as no-ops on a running cluster -- confirm before relying on them.
CREATE DATABASE IF NOT EXISTS mysql;

-- Privilege table: one row per (Host, User) account.
CREATE TABLE IF NOT EXISTS mysql.user (
    Host CHAR(60) COLLATE utf8_bin NOT NULL DEFAULT '',
    User CHAR(32) COLLATE utf8_bin NOT NULL DEFAULT '',
    -- Credentials
    authentication_string TEXT DEFAULT NULL,
    -- Privilege flags
    Select_priv ENUM('N','Y') NOT NULL DEFAULT 'N',
    Insert_priv ENUM('N','Y') NOT NULL DEFAULT 'N',
    Update_priv ENUM('N','Y') NOT NULL DEFAULT 'N',
    Delete_priv ENUM('N','Y') NOT NULL DEFAULT 'N',
    -- Other privileges...
    PRIMARY KEY (Host, User)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Cluster-wide settings.
-- These are set through global system variables instead of inserting rows into
-- mysql.tidb: a two-value INSERT does not match that table's three columns
-- (VARIABLE_NAME, VARIABLE_VALUE, COMMENT), and the GC rows already exist after bootstrap.
SET GLOBAL tidb_gc_life_time = '10m0s';
SET GLOBAL tidb_gc_run_interval = '10m0s';
SET GLOBAL tidb_gc_concurrency = 1;
-- NOTE(review): max_connections is applied per TiDB instance and is only available
-- as a system variable on recent versions -- confirm against the deployed version.
SET GLOBAL max_connections = 4000;


-- Business schema.
CREATE DATABASE IF NOT EXISTS financial_data;
USE financial_data;

-- Transaction ledger, hash-partitioned into 16 partitions on transaction_id.
CREATE TABLE account_transactions (
    transaction_id BIGINT NOT NULL AUTO_INCREMENT,
    account_id VARCHAR(36) NOT NULL,
    transaction_type ENUM('DEPOSIT', 'WITHDRAWAL', 'TRANSFER') NOT NULL,
    amount DECIMAL(15,2) NOT NULL,
    currency CHAR(3) NOT NULL DEFAULT 'CNY',
    transaction_time TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    status ENUM('PENDING', 'COMPLETED', 'FAILED') NOT NULL DEFAULT 'PENDING',
    -- Counterparty account for transfers; NULL for deposits and withdrawals.
    related_account VARCHAR(36) DEFAULT NULL,
    -- The partitioning column must be part of every unique key, so transaction_id
    -- stays in the primary key. NONCLUSTERED keeps the hidden row id, which is what
    -- SHARD_ROW_ID_BITS scatters.
    PRIMARY KEY (transaction_id, account_id) NONCLUSTERED
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
  -- Table option (not a column definition): spread row ids over 2^4 shards
  -- to avoid a write hotspot.
  SHARD_ROW_ID_BITS = 4
-- HASH partitioning already takes the value modulo the partition count,
-- so an explicit MOD() is unnecessary.
PARTITION BY HASH(transaction_id) PARTITIONS 16;
-- The TABLESPACE clause was dropped: TiDB has no tablespaces, and the clause is
-- not valid after PARTITION BY.

-- TiFlash replicas are configured with ALTER TABLE, not as a CREATE TABLE option.
ALTER TABLE account_transactions SET TIFLASH REPLICA 1;

-- Secondary indexes
CREATE INDEX idx_account_time ON account_transactions(account_id, transaction_time);
CREATE INDEX idx_status_time ON account_transactions(status, transaction_time);

```


## TiKV存储引擎与Raft协议实现


TiKV基于RocksDB构建,通过Multi-Raft Group实现数据分片和副本管理。


```rust

// tikv-raft-engine.rs - Raft存储引擎核心实现

use raft::{RawNode, ProgressTracker, Config};

use raft::storage::{MemStorage, Storage};

use protobuf::Message;

use std::sync::{Arc, RwLock};

use rocksdb::{DB, Options, WriteBatch};


pub struct TiKVEngine {

    db: Arc,

    raft_config: Config,

    raft_groups: RwLock>,

}


pub struct RaftGroup {

    raw_node: RawNode,

    region: metapb::Region,

    peers: Vec,

}


impl TiKVEngine {

    pub fn new(db_path: &str) -> Result {

        // 初始化RocksDB

        let mut opts = Options::default();

        opts.create_if_missing(true);

        opts.set_max_background_jobs(4);

        opts.set_bytes_per_sync(1024 * 1024); // 1MB

        

        let db = DB::open(&opts, db_path)?;

        

        // Raft配置

        let mut raft_config = Config::default();

        raft_config.id = 1;

        raft_config.election_tick = 10;

        raft_config.heartbeat_tick = 3;

        raft_config.max_size_per_msg = 1024 * 1024; // 1MB

        raft_config.max_inflight_msgs = 256;

        

        Ok(TiKVEngine {

            db: Arc::new(db),

            raft_config,

            raft_groups: RwLock::new(HashMap::new()),

        })

    }

    

    pub fn propose(&self, region_id: u64, data: Vec) -> Result {

        let mut groups = self.raft_groups.write().unwrap();

        

        if let Some(group) = groups.get_mut(®ion_id) {

            // 创建Raft日志条目

            let mut entry = eraftpb::Entry::new();

            entry.set_entry_type(eraftpb::EntryType::EntryNormal);

            entry.set_data(data);

            

            // 提交到Raft

            group.raw_node.propose(vec![], vec![entry])?;

            

            // 获取日志索引

            let last_index = group.raw_node.raft.raft_log.last_index();

            

            // 等待提交

            self.wait_for_commit(region_id, last_index)?;

            

            Ok(last_index)

        } else {

            Err(EngineError::RegionNotFound(region_id))

        }

    }

    

    pub fn write_batch(&self, batch: WriteBatch) -> Result<(), EngineError> {

        let write_opts = WriteOptions::default();

        self.db.write_opt(batch, &write_opts)?;

        Ok(())

    }

    

    fn wait_for_commit(&self, region_id: u64, index: u64) -> Result<(), EngineError> {

        let start = Instant::now();

        let timeout = Duration::from_secs(5);

        

        while start.elapsed() < timeout {

            let groups = self.raft_groups.read().unwrap();

            if let Some(group) = groups.get(®ion_id) {

                let committed = group.raw_node.raft.raft_log.committed;

                if committed >= index {

                    return Ok(());

                }

            }

            

            thread::sleep(Duration::from_millis(10));

        }

        

        Err(EngineError::CommitTimeout)

    }

    

    pub fn handle_raft_message(&self, msg: raftpb::Message) -> Result<(), EngineError> {

        let region_id = msg.region_id();

        let mut groups = self.raft_groups.write().unwrap();

        

        if let Some(group) = groups.get_mut(®ion_id) {

            group.raw_node.step(msg)?;

            

            // 处理Raft状态机

            self.process_raft_ready(region_id, group)?;

        }

        

        Ok(())

    }

    

    fn process_raft_ready(&self, region_id: u64, group: &mut RaftGroup) -> Result<(), EngineError> {

        // 检查是否有待处理的Ready

        if !group.raw_node.has_ready() {

            return Ok(());

        }

        

        let mut ready = group.raw_node.ready();

        

        // 1. 持久化未保存的状态

        if !ready.hs().is_empty() {

            self.save_hard_state(region_id, ready.hs())?;

        }

        

        // 2. 保存日志条目

        if !ready.entries().is_empty() {

            self.save_entries(region_id, ready.entries())?;

        }

        

        // 3. 应用已提交的日志

        let mut applied = false;

        for entry in ready.committed_entries() {

            if entry.get_entry_type() == eraftpb::EntryType::EntryNormal {

                self.apply_entry(region_id, entry)?;

                applied = true;

            }

        }

        

        // 4. 发送消息

        for msg in ready.messages() {

            self.send_raft_message(msg)?;

        }

        

        // 5. 推进Raft状态

        if applied {

            let mut light_ready = group.raw_node.advance_apply();

            if !light_ready.commit_index().is_empty() {

                group.raw_node.advance_apply_to(light_ready.commit_index()[0]);

            }

        }

        

        group.raw_node.advance(ready);

        

        Ok(())

    }

    

    fn apply_entry(&self, region_id: u64, entry: &eraftpb::Entry) -> Result<(), EngineError> {

        // 解析Entry数据

        let mut write_batch = WriteBatch::default();

        

        match entry.get_data_type() {

            EntryDataType::Put => {

                let put_cmd: PutRequest = protobuf::parse_from_bytes(entry.get_data())?;

                

                // 构建RocksDB KV

                write_batch.put(

                    encode_key(region_id, &put_cmd.key),

                    put_cmd.value

                )?;

            }

            EntryDataType::Delete => {

                let delete_cmd: DeleteRequest = protobuf::parse_from_bytes(entry.get_data())?;

                

                write_batch.delete(encode_key(region_id, &delete_cmd.key))?;

            }

            _ => {}

        }

        

        // 写入存储

        self.write_batch(write_batch)?;

        

        Ok(())

    }

}

```


## HTAP架构与列存引擎TiFlash


TiFlash作为列式存储引擎,通过Raft Learner协议实时同步TiKV数据,支持实时分析查询。


```sql

-- TiFlash configuration and usage.
-- Enable two TiFlash replicas for the table.
ALTER TABLE account_transactions SET TIFLASH REPLICA 2;

-- Check the replication status of the TiFlash replicas.
SELECT * FROM information_schema.tiflash_replica
WHERE table_schema = 'financial_data'
AND table_name = 'account_transactions';

-- Force the query to read from TiFlash (columnar engine).
SELECT /*+ READ_FROM_STORAGE(TIFLASH[at]) */
    account_id,
    DATE(transaction_time) as trans_date,
    COUNT(*) as transaction_count,
    SUM(amount) as total_amount
FROM account_transactions at
WHERE transaction_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
    AND status = 'COMPLETED'
GROUP BY account_id, DATE(transaction_time)
ORDER BY total_amount DESC
LIMIT 100;


-- Hybrid-workload example: 30-day aggregates read from TiFlash are joined with
-- recently updated balances.
WITH realtime_stats AS (
    -- TiFlash: analysis over the last 30 days of completed transactions
    SELECT /*+ READ_FROM_STORAGE(TIFLASH[at]) */
        account_id,
        AVG(amount) as avg_amount_30d,
        COUNT(*) as trans_count_30d
    FROM account_transactions at
    WHERE transaction_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
        AND status = 'COMPLETED'
    GROUP BY account_id
),
current_balance AS (
    -- Balances updated within the last 5 minutes
    SELECT account_id, balance
    FROM account_balance
    WHERE update_time >= DATE_SUB(NOW(), INTERVAL 5 MINUTE)
)
SELECT
    cb.account_id,
    cb.balance as current_balance,
    rs.avg_amount_30d,
    rs.trans_count_30d,
    -- Classify liquidity by comparing the balance with the 30-day average amount
    CASE
        WHEN cb.balance > rs.avg_amount_30d * 3 THEN 'HIGH_LIQUIDITY'
        WHEN cb.balance < rs.avg_amount_30d THEN 'LOW_LIQUIDITY'
        ELSE 'NORMAL_LIQUIDITY'
    END as liquidity_status
FROM current_balance cb
LEFT JOIN realtime_stats rs ON cb.account_id = rs.account_id
WHERE cb.balance > 0
ORDER BY cb.balance DESC;

```


## PD调度器与弹性扩缩容


Placement Driver(PD)负责TiDB集群的元数据管理和调度决策。


```go

// pd-scheduler.go - 调度器实现

package scheduler


import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/pd/server/core"
	"github.com/pingcap/pd/server/schedule"
	"github.com/pingcap/pd/server/schedule/operator"
)


type BalanceRegionScheduler struct {

name          string

cluster       schedule.Cluster

conf          *balanceRegionSchedulerConfig

filters       []schedule.Filter

}


// Schedule returns the operators for one scheduling round. It first tries to balance
// region counts across healthy stores and only falls back to hot-region handling
// when balancing produced nothing. It returns nil when fewer than two stores are healthy.
func (s *BalanceRegionScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
	// Keep only the stores that pass the health check.
	var healthy []*core.StoreInfo
	for _, st := range cluster.GetStores() {
		if !s.IsHealthyStore(st) {
			continue
		}
		healthy = append(healthy, st)
	}

	// Moving a region needs a source and a target.
	if len(healthy) < 2 {
		return nil
	}

	// Region-count balancing takes priority.
	if ops := s.balanceRegions(cluster, healthy); len(ops) > 0 {
		return ops
	}

	// Otherwise deal with hot regions.
	return s.handleHotspotRegions(cluster, healthy)
}


func (s *BalanceRegionScheduler) balanceRegions(cluster schedule.Cluster, stores []*core.StoreInfo) []*operator.Operator {

var ops []*operator.Operator

// 按Region数量排序

sort.Slice(stores, func(i, j int) bool {

return stores[i].GetRegionCount() > stores[j].GetRegionCount()

})

source := stores[0]  // Region最多的节点

target := stores[len(stores)-1]  // Region最少的节点

// 计算需要迁移的Region数量

avgRegionCount := s.calculateAverageRegionCount(stores)

diff := source.GetRegionCount() - target.GetRegionCount()

if diff <= 1 {

return nil

}

// 选择要迁移的Region

regions := cluster.GetRegionsByStore(source.GetID())

for _, region := range regions {

// 检查Region是否适合迁移

if s.isRegionSuitableForMove(region, source, target) {

op := s.createMoveRegionOperator(region, source, target)

ops = append(ops, op)

if len(ops) >= diff/2 {  // 只迁移一半的差异

break

}

}

}

return ops

}


// handleHotspotRegions builds one move operator for every hot region reported by the
// cluster for which a target store can be found among stores.
func (s *BalanceRegionScheduler) handleHotspotRegions(cluster schedule.Cluster, stores []*core.StoreInfo) []*operator.Operator {
	var ops []*operator.Operator
	for _, hot := range cluster.GetHotRegions() {
		// Skip hot regions that have no suitable destination.
		dest := s.findBestTargetForHotRegion(hot, stores)
		if dest == nil {
			continue
		}
		ops = append(ops, s.createHotRegionMoveOperator(hot, dest))
	}
	return ops
}


func (s *BalanceRegionScheduler) createMoveRegionOperator(region *core.RegionInfo, source, target *core.StoreInfo) *operator.Operator {

// 创建Region迁移操作

desc := fmt.Sprintf("move-region-%d-from-%d-to-%d", 

region.GetID(), source.GetID(), target.GetID())

// 添加Peer到目标节点

addPeer := operator.AddPeer{

ToStore: target.GetID(),

PeerID:  cluster.AllocID(),

}

// 从源节点移除Peer

removePeer := operator.RemovePeer{

FromStore: source.GetID(),

}

// 创建操作步骤

steps := []operator.OpStep{

operator.AddLearner{ToStore: target.GetID(), PeerID: addPeer.PeerID},

operator.ChangePeerV2Enter{},

operator.ChangePeerV2Leave{},

operator.RemovePeer{FromStore: source.GetID()},

}

return operator.NewOperator(desc, region.GetID(), region.GetRegionEpoch(), operator.OpRegion, steps...)

}


// 自动扩缩容控制器

type AutoScalingController struct {

cluster     schedule.Cluster

metrics     *MetricsCollector

scaleConfig *ScaleConfig

}


// Run evaluates the scaling rules every five minutes until ctx is cancelled.
func (c *AutoScalingController) Run(ctx context.Context) {
	interval := time.NewTicker(5 * time.Minute)
	defer interval.Stop()

	for {
		select {
		case <-interval.C:
			c.evaluateAndScale()
		case <-ctx.Done():
			// Stop as soon as the caller cancels the context.
			return
		}
	}
}


// evaluateAndScale collects the current metrics once and applies three independent
// rules: CPU-based scale out / scale in, storage-based node addition, and hot-region
// splitting.
func (c *AutoScalingController) evaluateAndScale() {
	snapshot := c.metrics.Collect()

	// CPU usage: scale out above the upper threshold, scale in below the lower one.
	switch {
	case snapshot.AvgCPUUsage > c.scaleConfig.ScaleUpCPUThreshold:
		c.scaleOut()
	case snapshot.AvgCPUUsage < c.scaleConfig.ScaleDownCPUThreshold:
		c.scaleIn()
	}

	// Storage usage: add a storage node above the threshold.
	if snapshot.TotalStorageUsage > c.scaleConfig.ScaleUpStorageThreshold {
		c.addStorageNode()
	}

	// Hotspot QPS: split hot regions above the threshold.
	if snapshot.HotspotQPS > c.scaleConfig.HotspotThreshold {
		c.splitHotRegions()
	}
}


// scaleOut adds one TiKV node to the cluster and then triggers a region rebalance.
func (c *AutoScalingController) scaleOut() {
	// The previously fetched topology was never used, and an unused local is a
	// compile error in Go, so it is no longer fetched.

	// Choose the specification of the new node.
	nodeSpec := c.chooseNodeSpec()

	// Create the new TiKV node and register it with the cluster.
	newNode := c.createTiKVNode(nodeSpec)
	c.cluster.AddStore(newNode)

	// Let regions move onto the new node.
	c.triggerRebalance()
}

```


## 金融级事务保障机制


```sql

-- Distributed transaction example: transfer 1000 CNY from ACC001 to ACC002.
-- Flow control (IF ... THEN, SIGNAL) is only valid inside stored programs, which TiDB
-- does not support, so the affected-row checks below must be made by the client.
BEGIN;

-- Debit account A. The balance and version predicates make the update a no-op when
-- funds are insufficient or another writer changed the row.
UPDATE account_balance
SET balance = balance - 1000,
    version = version + 1,
    update_time = NOW(6)
WHERE account_id = 'ACC001'
    AND balance >= 1000
    AND version = 1;

-- Capture the affected-row count of the debit.
SET @rows_affected = ROW_COUNT();
SELECT @rows_affected AS rows_affected;
-- Client: if rows_affected = 0, issue ROLLBACK and report the failure
-- ('账户余额不足或版本冲突'); do not run the remaining statements.

-- Credit account B.
UPDATE account_balance
SET balance = balance + 1000,
    version = version + 1,
    update_time = NOW(6)
WHERE account_id = 'ACC002'
    AND version = 1;

-- Capture the affected-row count of the credit.
SET @rows_affected = ROW_COUNT();
SELECT @rows_affected AS rows_affected;
-- Client: if rows_affected = 0, issue ROLLBACK; otherwise the debit would be
-- committed without the matching credit.

-- Record both legs of the transfer.
-- NOTE(review): requires a related_account column on account_transactions -- confirm
-- it exists in the deployed schema.
INSERT INTO account_transactions (
    account_id,
    transaction_type,
    amount,
    currency,
    related_account,
    status
) VALUES
('ACC001', 'TRANSFER', -1000, 'CNY', 'ACC002', 'COMPLETED'),
('ACC002', 'TRANSFER', 1000, 'CNY', 'ACC001', 'COMPLETED');

-- Commit the transaction.
COMMIT;


-- Large transaction example.
SET SESSION tidb_enable_1pc = 0;           -- disable one-phase commit
SET SESSION tidb_txn_mode = 'optimistic';  -- use optimistic transactions

BEGIN;

-- Batch processing
INSERT INTO batch_processing_log (batch_id, start_time)
VALUES (UUID(), NOW(6));

-- Complex business logic
-- ...

-- With 1PC disabled, the commit goes through two-phase commit.
COMMIT;


-- Monitor long-running statements and transactions.
-- information_schema.processlist has no tidb_current_ts or start_time columns
-- (tidb_current_ts is a session variable). TIME is the number of seconds the
-- connection has spent in its current state, and TxnStart is the start time of its
-- open transaction.
SELECT
    id,
    user,
    db,
    command,
    time AS duration_seconds,
    txnstart,
    info
FROM information_schema.processlist
WHERE command != 'Sleep'
    AND time > 60  -- longer than 60 seconds
ORDER BY time DESC;

```


## 监控与灾备方案


```yaml

# tidb-cluster-monitoring.yaml - monitoring configuration
apiVersion: monitoring.coreos.com/v1
kind: Prometheus
metadata:
  name: tidb-prometheus
  namespace: tidb-cluster
spec:
  serviceAccountName: prometheus
  serviceMonitorSelector: {}
  podMonitorSelector: {}
  resources:
    requests:
      memory: 4Gi
    limits:
      memory: 8Gi

  # Scrape targets of the TiDB cluster.
  # In the Prometheus custom resource, additionalScrapeConfigs is a reference to a
  # key of a Secret, not an inline list; the scrape configs live in the Secret below.
  additionalScrapeConfigs:
    name: tidb-additional-scrape-configs
    key: scrape-configs.yaml

---
# Secret holding the raw Prometheus scrape configuration referenced above.
apiVersion: v1
kind: Secret
metadata:
  name: tidb-additional-scrape-configs
  namespace: tidb-cluster
stringData:
  scrape-configs.yaml: |
    - job_name: 'tidb'
      static_configs:
      - targets:
        - 'tidb-0.tidb-peer.tidb-cluster.svc:10080'
        - 'tidb-1.tidb-peer.tidb-cluster.svc:10080'
        - 'tidb-2.tidb-peer.tidb-cluster.svc:10080'
      metrics_path: /metrics

    - job_name: 'tikv'
      static_configs:
      - targets:
        - 'tikv-0.tikv-peer.tidb-cluster.svc:20180'
        - 'tikv-1.tikv-peer.tidb-cluster.svc:20180'
        - 'tikv-2.tikv-peer.tidb-cluster.svc:20180'
      metrics_path: /metrics

    - job_name: 'pd'
      static_configs:
      - targets:
        - 'pd-0.pd-peer.tidb-cluster.svc:2379'
        - 'pd-1.pd-peer.tidb-cluster.svc:2379'
        - 'pd-2.pd-peer.tidb-cluster.svc:2379'
      metrics_path: /metrics


---

# Alerting rules
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: tidb-alert-rules
  namespace: tidb-cluster
spec:
  groups:
  - name: tidb.rules
    rules:
    # Fires when a TiKV scrape target has been down for 5 minutes.
    - alert: TiKVStoreDown
      expr: up{job="tikv"} == 0
      for: 5m
      labels:
        severity: critical
      annotations:
        summary: "TiKV节点宕机"
        description: "TiKV实例 {{ $labels.instance }} 已宕机超过5分钟"

    # Fires when an instance's leader count deviates from the cluster mean by more than 30%.
    # The cluster mean carries no labels, so it is wrapped in scalar(); combining it
    # as a vector with the per-instance series would match nothing and the alert
    # could never fire.
    # NOTE(review): confirm that tikv_store_leader_count is the metric name actually
    # exported by the deployed TiKV version.
    - alert: HighTiKVLeaderBalance
      expr: |
        abs(
          avg by (instance) (tikv_store_leader_count)
          - scalar(avg(tikv_store_leader_count))
        )
        / scalar(avg(tikv_store_leader_count))
        > 0.3
      for: 10m
      labels:
        severity: warning
      annotations:
        summary: "TiKV Leader分布不均"
        description: "TiKV节点 {{ $labels.instance }} 的Leader数量偏离平均值超过30%"

    # Fires when the 95th-percentile query latency stays above 1 second for 5 minutes.
    - alert: TiDBHighQueryLatency
      expr: histogram_quantile(0.95, rate(tidb_server_handle_query_duration_seconds_bucket[5m])) > 1
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "TiDB查询延迟过高"
        description: "95%的查询延迟超过1秒"

    # Fires when the PD leader flag changed more than 3 times within one hour.
    - alert: PDLeaderChange
      expr: changes(pd_server_is_leader[1h]) > 3
      labels:
        severity: warning
      annotations:
        summary: "PD Leader频繁切换"
        description: "过去1小时内PD Leader切换超过3次"


基于Raft一致性协议和HTAP混合负载架构的TiDB分布式数据库,为金融级应用提供了高可用、强一致性和实时分析能力。通过计算与存储分离、Multi-Raft副本管理、智能调度和列存引擎等技术,TiDB能够满足大规模在线事务处理和实时分析的双重需求。在实际部署中,需要根据业务特征合理设计数据分片策略,建立完善的监控告警和灾备体系,确保数据库服务的稳定可靠运行。随着数据规模的增长,TiDB的弹性扩展能力将为企业数字化转型提供坚实的数据基础设施支撑。


请使用浏览器的分享功能分享到微信等