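# TiDB Distributed Database Architecture: Engineering Practice with the Raft Protocol and HTAP Mixed Workloads
Financial-grade applications need a database that delivers high availability, strong consistency, and real-time analytics at the same time. TiDB, a distributed SQL database, builds an elastically scalable storage layer on the Raft consensus protocol and uses an HTAP architecture to serve transactional and analytical workloads from the same cluster.
## TiDB Overall Architecture Design
TiDB uses a layered architecture that separates the compute layer (TiDB Server), the storage layer (TiKV), and the scheduling layer (PD), so compute and storage are decoupled and can scale independently.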
```sql
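-- TiDB cluster initialization
-- TiDB creates the mysql system schema and its tables during bootstrap.
-- The two statements below are no-ops on a running cluster and only
-- illustrate the layout of the privilege table.
CREATE DATABASE IF NOT EXISTS mysql;
CREATE TABLE IF NOT EXISTS mysql.user (
    Host CHAR(60) COLLATE utf8_bin NOT NULL DEFAULT '',
    User CHAR(32) COLLATE utf8_bin NOT NULL DEFAULT '',
    -- Authentication data
    authentication_string TEXT DEFAULT NULL,
    -- Privilege columns
    Select_priv ENUM('N','Y') NOT NULL DEFAULT 'N',
    Insert_priv ENUM('N','Y') NOT NULL DEFAULT 'N',
    Update_priv ENUM('N','Y') NOT NULL DEFAULT 'N',
    Delete_priv ENUM('N','Y') NOT NULL DEFAULT 'N',
    -- Other privileges...
    PRIMARY KEY (Host, User)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- ENGINE is accepted for MySQL compatibility and ignored
-- Cluster-level GC settings. These are system variables since TiDB v5.0;
-- earlier versions stored them as rows in mysql.tidb.
SET GLOBAL tidb_gc_life_time = '10m0s';
SET GLOBAL tidb_gc_run_interval = '10m0s';
SET GLOBAL tidb_gc_concurrency = 1;
-- Per-instance connection limit (a system variable in recent releases;
-- older ones use the max-server-connections configuration item)
SET GLOBAL max_connections = 4000;
-- Create the business database
CREATE DATABASE IF NOT EXISTS financial_data;
USE financial_data;
-- Transaction table: hash-partitioned, with row IDs scattered across Regions
CREATE TABLE account_transactions (
    transaction_id BIGINT NOT NULL AUTO_INCREMENT,
    account_id VARCHAR(36) NOT NULL,
    related_account VARCHAR(36) DEFAULT NULL, -- counterparty account for transfers
    transaction_type ENUM('DEPOSIT', 'WITHDRAWAL', 'TRANSFER') NOT NULL,
    amount DECIMAL(15,2) NOT NULL,
    currency CHAR(3) NOT NULL DEFAULT 'CNY',
    transaction_time TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    status ENUM('PENDING', 'COMPLETED', 'FAILED') NOT NULL DEFAULT 'PENDING',
    -- The primary key must include the partitioning column. NONCLUSTERED (v5.0+)
    -- keeps the hidden _tidb_rowid, which is what SHARD_ROW_ID_BITS scatters.
    PRIMARY KEY (transaction_id, account_id) NONCLUSTERED
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
  SHARD_ROW_ID_BITS = 4
PARTITION BY HASH(transaction_id) PARTITIONS 16;
-- TiFlash replicas are added with ALTER TABLE, not inside CREATE TABLE
ALTER TABLE account_transactions SET TIFLASH REPLICA 1;
-- Create secondary indexes
CREATE INDEX idx_account_time ON account_transactions(account_id, transaction_time);
CREATE INDEX idx_status_time ON account_transactions(status, transaction_time);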
```
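
Because compute and storage are decoupled, the compute layer has to work out which storage shard owns a given key before it can read or write. The following is a minimal sketch of range-based routing, assuming Regions are contiguous, non-overlapping key ranges sorted by start key. The `Region` struct and `locate` function are illustrative names, not PD or TiKV APIs.

```go
package main

import (
	"fmt"
	"sort"
)

// Region is a simplified stand-in for a TiKV Region: a contiguous key range
// [StartKey, EndKey). An empty EndKey means the range is unbounded.
type Region struct {
	ID       uint64
	StartKey string
	EndKey   string
}

// locate returns the Region whose range contains key.
// regions must be sorted by StartKey and must not overlap.
func locate(regions []Region, key string) (Region, bool) {
	// Find the first Region that starts after key, then step back by one.
	i := sort.Search(len(regions), func(i int) bool {
		return regions[i].StartKey > key
	})
	if i == 0 {
		return Region{}, false
	}
	r := regions[i-1]
	if r.EndKey != "" && key >= r.EndKey {
		return Region{}, false
	}
	return r, true
}

func main() {
	regions := []Region{
		{ID: 1, StartKey: "", EndKey: "g"},
		{ID: 2, StartKey: "g", EndKey: "p"},
		{ID: 3, StartKey: "p", EndKey: ""},
	}
	for _, k := range []string{"account", "ledger", "txn"} {
		if r, ok := locate(regions, k); ok {
			fmt.Printf("key %q -> region %d\n", k, r.ID)
		}
	}
}
```

A binary search keeps the lookup logarithmic in the number of Regions, which matters once a cluster holds many of them.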
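## TiKV Storage Engine and Raft Implementation
TiKV is built on RocksDB and uses Multi-Raft: the key space is split into Regions, and each Region is replicated by its own Raft group.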
```rust
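// tikv-raft-engine.rs - simplified sketch of a Raft-backed storage engine.
// Helpers that are not part of raft-rs or rust-rocksdb (for example
// EngineError, encode_key, and the save_*/send_* methods) are illustrative
// and not shown.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};

use kvproto::metapb;
use raft::eraftpb;
use raft::storage::MemStorage;
use raft::{Config, RawNode};
use rocksdb::{Options, WriteBatch, WriteOptions, DB};

pub struct TiKVEngine {
    db: Arc<DB>,
    raft_config: Config,
    // One Raft group per Region, keyed by Region ID
    raft_groups: RwLock<HashMap<u64, RaftGroup>>,
}

pub struct RaftGroup {
    raw_node: RawNode<MemStorage>,
    region: metapb::Region,
    peers: Vec<metapb::Peer>,
}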
impl TiKVEngine {
    pub fn new(db_path: &str) -> Result<Self, EngineError> {
        // Initialize RocksDB
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.set_max_background_jobs(4);
        opts.set_bytes_per_sync(1024 * 1024); // 1 MB
        let db = DB::open(&opts, db_path)?;
        // Raft configuration
        let mut raft_config = Config::default();
        raft_config.id = 1;
        raft_config.election_tick = 10;
        raft_config.heartbeat_tick = 3;
        raft_config.max_size_per_msg = 1024 * 1024; // 1 MB
        raft_config.max_inflight_msgs = 256;
        Ok(TiKVEngine {
            db: Arc::new(db),
            raft_config,
            raft_groups: RwLock::new(HashMap::new()),
        })
    }
    pub fn propose(&self, region_id: u64, data: Vec<u8>) -> Result<u64, EngineError> {
        let index = {
            let mut groups = self.raft_groups.write().unwrap();
            let group = groups
                .get_mut(&region_id)
                .ok_or(EngineError::RegionNotFound(region_id))?;
            // Hand the payload to Raft; raft-rs wraps it in a normal log entry
            group.raw_node.propose(vec![], data)?;
            // On the leader, the proposal is appended at the current last index
            group.raw_node.raft.raft_log.last_index()
        }; // The write lock is released here, before wait_for_commit takes a read lock
        // Block until the entry is committed
        self.wait_for_commit(region_id, index)?;
        Ok(index)
    }
    pub fn write_batch(&self, batch: WriteBatch) -> Result<(), EngineError> {
        let write_opts = WriteOptions::default();
        self.db.write_opt(batch, &write_opts)?;
        Ok(())
    }
    fn wait_for_commit(&self, region_id: u64, index: u64) -> Result<(), EngineError> {
        let start = Instant::now();
        let timeout = Duration::from_secs(5);
        while start.elapsed() < timeout {
            // Scope the read guard so it is released before sleeping
            let committed = {
                let groups = self.raft_groups.read().unwrap();
                groups
                    .get(&region_id)
                    .map(|g| g.raw_node.raft.raft_log.committed)
            };
            if matches!(committed, Some(c) if c >= index) {
                return Ok(());
            }
            thread::sleep(Duration::from_millis(10));
        }
        Err(EngineError::CommitTimeout)
    }
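    // raft-rs messages carry no Region ID; TiKV wraps them in
    // kvproto::raft_serverpb::RaftMessage, which does.
    pub fn handle_raft_message(
        &self,
        mut msg: kvproto::raft_serverpb::RaftMessage,
    ) -> Result<(), EngineError> {
        let region_id = msg.get_region_id();
        let mut groups = self.raft_groups.write().unwrap();
        if let Some(group) = groups.get_mut(&region_id) {
            group.raw_node.step(msg.take_message())?;
            // Drive the Raft state machine
            self.process_raft_ready(region_id, group)?;
        }
        Ok(())
    }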
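    fn process_raft_ready(&self, region_id: u64, group: &mut RaftGroup) -> Result<(), EngineError> {
        // Nothing to do unless Raft has produced a Ready
        if !group.raw_node.has_ready() {
            return Ok(());
        }
        let mut ready = group.raw_node.ready();
        // 1. Send messages that do not depend on persistence
        for msg in ready.take_messages() {
            self.send_raft_message(msg)?;
        }
        // 2. Apply committed entries
        for entry in ready.take_committed_entries() {
            if entry.get_entry_type() == eraftpb::EntryType::EntryNormal {
                self.apply_entry(region_id, &entry)?;
            }
        }
        // 3. Persist new log entries
        if !ready.entries().is_empty() {
            self.save_entries(region_id, ready.entries())?;
        }
        // 4. Persist the hard state (term, vote, commit) if it changed
        if let Some(hs) = ready.hs() {
            self.save_hard_state(region_id, hs)?;
        }
        // 5. Send messages that had to wait for the writes above
        for msg in ready.take_persisted_messages() {
            self.send_raft_message(msg)?;
        }
        // 6. Advance Raft and handle the follow-up LightReady.
        //    A durable store would also persist light_ready.commit_index() here.
        let mut light_ready = group.raw_node.advance(ready);
        for msg in light_ready.take_messages() {
            self.send_raft_message(msg)?;
        }
        for entry in light_ready.take_committed_entries() {
            if entry.get_entry_type() == eraftpb::EntryType::EntryNormal {
                self.apply_entry(region_id, &entry)?;
            }
        }
        group.raw_node.advance_apply();
        Ok(())
    }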
    fn apply_entry(&self, region_id: u64, entry: &eraftpb::Entry) -> Result<(), EngineError> {
        // A newly elected leader appends an entry with empty data; it carries no command
        if entry.get_data().is_empty() {
            return Ok(());
        }
        // raft-rs treats entry data as opaque bytes. `Command` is a hypothetical
        // application-level enum with its own decoder.
        let mut write_batch = WriteBatch::default();
        match Command::decode(entry.get_data())? {
            Command::Put { key, value } => {
                // Build the RocksDB key-value pair
                write_batch.put(encode_key(region_id, &key), value);
            }
            Command::Delete { key } => {
                write_batch.delete(encode_key(region_id, &key));
            }
        }
        // Write to storage
        self.write_batch(write_batch)?;
        Ok(())
    }
}
```
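
The commit check that `wait_for_commit` polls for comes down to one rule: an entry is committed once a majority of voters have stored it. The sketch below shows that calculation in isolation, assuming the leader tracks a match index per voter. `commitIndex` is an illustrative helper, not the raft-rs implementation, and it leaves out the additional Raft rule that the entry must belong to the leader's current term.

```go
package main

import (
	"fmt"
	"sort"
)

// commitIndex returns the highest log index stored on a majority of voters.
// matchIndex holds, for every voter (leader included), the last log index
// known to be stored on that voter.
func commitIndex(matchIndex []uint64) uint64 {
	if len(matchIndex) == 0 {
		return 0
	}
	// Copy before sorting so the caller's slice is left untouched.
	sorted := append([]uint64(nil), matchIndex...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	quorum := len(sorted)/2 + 1
	// The quorum-th largest value is present on at least `quorum` voters.
	return sorted[quorum-1]
}

func main() {
	fmt.Println(commitIndex([]uint64{10, 7, 9}))       // 3 voters: prints 9
	fmt.Println(commitIndex([]uint64{10, 7, 9, 4, 8})) // 5 voters: prints 8
}
```

With three replicas, one slow or failed follower does not hold back the commit index, which is why a Region stays writable after losing a single replica.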
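## HTAP Architecture and the TiFlash Columnar Engine
TiFlash, the columnar storage engine, joins each Region's Raft group as a learner and replicates data from TiKV in near real time, so analytical queries can read fresh data.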
```sql
-- Configuring and using TiFlash
-- Enable TiFlash replicas
ALTER TABLE account_transactions SET TIFLASH REPLICA 2;
-- Check the TiFlash replica status
SELECT * FROM information_schema.tiflash_replica
WHERE table_schema = 'financial_data'
AND table_name = 'account_transactions';
-- Force the query to read from TiFlash (columnar engine)
SELECT /*+ READ_FROM_STORAGE(TIFLASH[at]) */
account_id,
DATE(transaction_time) as trans_date,
COUNT(*) as transaction_count,
SUM(amount) as total_amount
FROM account_transactions at
WHERE transaction_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
AND status = 'COMPLETED'
GROUP BY account_id, DATE(transaction_time)
ORDER BY total_amount DESC
LIMIT 100;
-- Mixed-workload query (reads from both TiKV and TiFlash)
WITH realtime_stats AS (
-- TiFlash: analysis of historical data
SELECT /*+ READ_FROM_STORAGE(TIFLASH[at]) */
account_id,
AVG(amount) as avg_amount_30d,
COUNT(*) as trans_count_30d
FROM account_transactions at
WHERE transaction_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
AND status = 'COMPLETED'
GROUP BY account_id
),
current_balance AS (
-- TiKV: real-time balance lookup
SELECT
account_id,
balance
FROM account_balance
WHERE update_time >= DATE_SUB(NOW(), INTERVAL 5 MINUTE)
)
SELECT
cb.account_id,
cb.balance as current_balance,
rs.avg_amount_30d,
rs.trans_count_30d,
CASE
WHEN cb.balance > rs.avg_amount_30d * 3 THEN 'HIGH_LIQUIDITY'
WHEN cb.balance < rs.avg_amount_30d THEN 'LOW_LIQUIDITY'
ELSE 'NORMAL_LIQUIDITY'
END as liquidity_status
FROM current_balance cb
LEFT JOIN realtime_stats rs ON cb.account_id = rs.account_id
WHERE cb.balance > 0
ORDER BY cb.balance DESC;
```
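
A learner receives the Raft log but does not vote, so a lagging columnar replica never blocks transactional commits. To still return fresh results, a read on the learner has to wait until the learner has applied everything up to the index the leader reported as committed. The following is a minimal sketch of that check. The `learner` type and its methods are illustrative assumptions, not TiFlash code.

```go
package main

import "fmt"

// learner models a non-voting replica: it receives log entries but is not
// counted in the commit quorum.
type learner struct {
	applied uint64 // last log index applied to the local column store
}

// apply advances the applied index; indexes never move backwards.
func (l *learner) apply(index uint64) {
	if index > l.applied {
		l.applied = index
	}
}

// canServe reports whether a read pinned at readIndex can be answered
// locally without missing committed writes.
func (l *learner) canServe(readIndex uint64) bool {
	return l.applied >= readIndex
}

func main() {
	l := &learner{}
	l.apply(41)
	readIndex := uint64(42) // hypothetical commit index reported by the leader

	fmt.Println(l.canServe(readIndex)) // false: entry 42 is not applied yet
	l.apply(42)
	fmt.Println(l.canServe(readIndex)) // true: the replica has caught up
}
```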
## PD Scheduler and Elastic Scaling
The Placement Driver (PD) manages cluster metadata and makes the scheduling decisions for a TiDB cluster.
```go
// pd-scheduler.go - simplified scheduler sketch.
// The import paths follow an older PD layout; recent releases live under
// github.com/tikv/pd. Methods that are not defined here are illustrative.
package scheduler

import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/pingcap/pd/server/core"
	"github.com/pingcap/pd/server/schedule"
	"github.com/pingcap/pd/server/schedule/operator"
)

type BalanceRegionScheduler struct {
	name    string
	cluster schedule.Cluster
	conf    *balanceRegionSchedulerConfig
	filters []schedule.Filter
}
func (s *BalanceRegionScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
	// Fetch all stores
	stores := cluster.GetStores()
	// Keep only healthy stores
	var candidates []*core.StoreInfo
	for _, store := range stores {
		if s.IsHealthyStore(store) {
			candidates = append(candidates, store)
		}
	}
	// Balancing needs at least a source and a target
	if len(candidates) < 2 {
		return nil
	}
	// Balance Region counts first
	balanceOps := s.balanceRegions(cluster, candidates)
	if len(balanceOps) > 0 {
		return balanceOps
	}
	// Otherwise handle hot Regions
	hotspotOps := s.handleHotspotRegions(cluster, candidates)
	return hotspotOps
}
func (s *BalanceRegionScheduler) balanceRegions(cluster schedule.Cluster, stores []*core.StoreInfo) []*operator.Operator {
	var ops []*operator.Operator
	// Sort stores by Region count, descending
	sort.Slice(stores, func(i, j int) bool {
		return stores[i].GetRegionCount() > stores[j].GetRegionCount()
	})
	source := stores[0]             // store with the most Regions
	target := stores[len(stores)-1] // store with the fewest Regions
	// A gap of one Region or less is already balanced
	diff := source.GetRegionCount() - target.GetRegionCount()
	if diff <= 1 {
		return nil
	}
	// Pick the Regions to move
	regions := cluster.GetRegionsByStore(source.GetID())
	for _, region := range regions {
		// Skip Regions that are not suitable for moving
		if !s.isRegionSuitableForMove(region, source, target) {
			continue
		}
		op := s.createMoveRegionOperator(region, source, target)
		if op == nil {
			continue
		}
		ops = append(ops, op)
		if len(ops) >= diff/2 { // move only half of the gap
			break
		}
	}
	return ops
}
func (s *BalanceRegionScheduler) handleHotspotRegions(cluster schedule.Cluster, stores []*core.StoreInfo) []*operator.Operator {
	// Detect hot Regions
	hotRegions := cluster.GetHotRegions()
	var ops []*operator.Operator
	for _, hotRegion := range hotRegions {
		// Find a suitable target store
		targetStore := s.findBestTargetForHotRegion(hotRegion, stores)
		if targetStore != nil {
			op := s.createHotRegionMoveOperator(hotRegion, targetStore)
			ops = append(ops, op)
		}
	}
	return ops
}
func (s *BalanceRegionScheduler) createMoveRegionOperator(region *core.RegionInfo, source, target *core.StoreInfo) *operator.Operator {
	// Describe the Region move
	desc := fmt.Sprintf("move-region-%d-from-%d-to-%d",
		region.GetID(), source.GetID(), target.GetID())
	// Allocate an ID for the new peer. This assumes the cluster interface
	// exposes AllocID, as the original sketch does; nil is returned on failure.
	peerID, err := s.cluster.AllocID()
	if err != nil {
		return nil
	}
	// Add the new replica as a learner, promote it to voter once it has
	// caught up, then remove the replica on the source store.
	steps := []operator.OpStep{
		operator.AddLearner{ToStore: target.GetID(), PeerID: peerID},
		operator.PromoteLearner{ToStore: target.GetID(), PeerID: peerID},
		operator.RemovePeer{FromStore: source.GetID()},
	}
	// The NewOperator signature differs between PD versions; adjust the
	// arguments to the version in use.
	return operator.NewOperator(desc, region.GetID(), region.GetRegionEpoch(), operator.OpRegion, steps...)
}
// Auto-scaling controller
type AutoScalingController struct {
	cluster     schedule.Cluster
	metrics     *MetricsCollector
	scaleConfig *ScaleConfig
}

func (c *AutoScalingController) Run(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.evaluateAndScale()
		}
	}
}

func (c *AutoScalingController) evaluateAndScale() {
	// Collect metrics
	metrics := c.metrics.Collect()
	// CPU utilization check
	if metrics.AvgCPUUsage > c.scaleConfig.ScaleUpCPUThreshold {
		c.scaleOut()
	} else if metrics.AvgCPUUsage < c.scaleConfig.ScaleDownCPUThreshold {
		c.scaleIn()
	}
	// Storage usage check
	if metrics.TotalStorageUsage > c.scaleConfig.ScaleUpStorageThreshold {
		c.addStorageNode()
	}
	// Hotspot QPS check
	if metrics.HotspotQPS > c.scaleConfig.HotspotThreshold {
		c.splitHotRegions()
	}
}

func (c *AutoScalingController) scaleOut() {
	// Choose the spec for the new node
	nodeSpec := c.chooseNodeSpec()
	// Provision a new TiKV node (done by the deployment tooling, not by PD itself)
	newNode := c.createTiKVNode(nodeSpec)
	// Register the new store with the cluster
	c.cluster.AddStore(newNode)
	// Trigger Region rebalancing
	c.triggerRebalance()
}
```
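
To make the balancing rule concrete, the sketch below simulates it on plain Region counts: keep moving one Region from the fullest store to the emptiest until the gap is at most one. `planMoves` and the `move` type are illustrative and ignore everything a real scheduler also weighs, such as Region size, store capacity, labels, and rate limits.

```go
package main

import (
	"fmt"
	"sort"
)

// move records one planned Region transfer between two stores.
type move struct{ From, To uint64 }

// planMoves returns the transfers needed until the Region counts of the
// fullest and emptiest store differ by at most one.
// counts maps store ID to Region count and is not modified.
func planMoves(counts map[uint64]int) []move {
	type store struct {
		id uint64
		n  int
	}
	stores := make([]store, 0, len(counts))
	for id, n := range counts {
		stores = append(stores, store{id, n})
	}
	var moves []move
	if len(stores) < 2 {
		return moves
	}
	for {
		// Sort by count descending; break ties by store ID for determinism.
		sort.Slice(stores, func(i, j int) bool {
			if stores[i].n != stores[j].n {
				return stores[i].n > stores[j].n
			}
			return stores[i].id < stores[j].id
		})
		src, dst := &stores[0], &stores[len(stores)-1]
		if src.n-dst.n <= 1 {
			return moves
		}
		src.n--
		dst.n++
		moves = append(moves, move{From: src.id, To: dst.id})
	}
}

func main() {
	// A newly added store (ID 3) starts with fewer Regions than the others.
	moves := planMoves(map[uint64]int{1: 10, 2: 4, 3: 4})
	fmt.Println(len(moves), "moves:", moves)
}
```

The same loop describes what happens after a scale-out: the new store starts empty and absorbs Regions until the counts level off.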
## Financial-Grade Transaction Guarantees
```sql
-- Distributed transaction example
BEGIN;
-- Debit account A
UPDATE account_balance
SET balance = balance - 1000,
    version = version + 1,
    update_time = NOW(6)
WHERE account_id = 'ACC001'
  AND balance >= 1000
  AND version = 1;
-- Capture the number of affected rows
SET @rows_affected = ROW_COUNT();
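-- TiDB does not support stored-procedure control flow (IF ... END IF, SIGNAL),
-- so the application reads @rows_affected and, when it is 0, issues ROLLBACK
-- and reports "insufficient balance or version conflict".
SELECT @rows_affected;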
-- Credit account B
UPDATE account_balance
SET balance = balance + 1000,
    version = version + 1,
    update_time = NOW(6)
WHERE account_id = 'ACC002'
  AND version = 1;
-- Record the transaction entries
INSERT INTO account_transactions (
    account_id,
    transaction_type,
    amount,
    currency,
    related_account,
    status
) VALUES
('ACC001', 'TRANSFER', -1000, 'CNY', 'ACC002', 'COMPLETED'),
('ACC002', 'TRANSFER', 1000, 'CNY', 'ACC001', 'COMPLETED');
-- Commit the transaction
COMMIT;
-- Large transaction (supported by TiDB)
SET SESSION tidb_enable_1pc = 0; -- disable one-phase commit
SET SESSION tidb_txn_mode = 'optimistic'; -- optimistic transaction mode
BEGIN;
-- Batch processing
INSERT INTO batch_processing_log (batch_id, start_time)
VALUES (UUID(), NOW(6));
-- Complex business logic
-- ...
-- The commit goes through two-phase commit (2PC) to guarantee consistency
COMMIT;
-- Monitor long-running statements.
-- `time` is the number of seconds the session has spent in its current state.
SELECT
    id,
    user,
    db,
    command,
    time,
    state,
    info
FROM information_schema.processlist
WHERE command != 'Sleep'
  AND time > 60 -- longer than 60 seconds
ORDER BY time DESC;
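
The `version = 1` predicate in the UPDATE statements is an optimistic concurrency check: the write only succeeds if nobody else has changed the row since it was read. The sketch below models that check in memory so the three outcomes are easy to see. The `account` type, `debit` function, and error values are illustrative. In the real system the comparison and the update happen atomically inside the database, and the application only inspects the affected row count.

```go
package main

import (
	"errors"
	"fmt"
)

// account mirrors the columns the UPDATE statement touches.
type account struct {
	Balance int64 // amount in minor currency units
	Version int64
}

var (
	errConflict     = errors.New("version conflict")
	errInsufficient = errors.New("insufficient balance")
)

// debit mirrors `UPDATE ... SET balance = balance - ?, version = version + 1
// WHERE balance >= ? AND version = ?`: it changes the row only when both
// predicates hold.
func debit(a *account, amount, expectedVersion int64) error {
	if a.Version != expectedVersion {
		return errConflict
	}
	if a.Balance < amount {
		return errInsufficient
	}
	a.Balance -= amount
	a.Version++
	return nil
}

func main() {
	acc := &account{Balance: 1500, Version: 1}

	fmt.Println(debit(acc, 1000, 1)) // succeeds, version becomes 2
	fmt.Println(debit(acc, 100, 1))  // stale version: conflict
	fmt.Println(debit(acc, 1000, 2)) // correct version, but balance is too low
	fmt.Printf("balance=%d version=%d\n", acc.Balance, acc.Version)
}
```

On a conflict the caller would typically re-read the row and retry with the new version rather than fail the transfer outright.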
## Monitoring and Disaster Recovery
```yaml
# tidb-cluster-monitoring.yaml - monitoring configuration
apiVersion: monitoring.coreos.com/v1
kind: Prometheus
metadata:
  name: tidb-prometheus
  namespace: tidb-cluster
spec:
  serviceAccountName: prometheus
  serviceMonitorSelector: {}
  podMonitorSelector: {}
  ruleSelector: {}
  resources:
    requests:
      memory: 4Gi
    limits:
      memory: 8Gi
  # additionalScrapeConfigs references a key in a Secret;
  # the scrape jobs themselves live in the Secret below
  additionalScrapeConfigs:
    name: tidb-additional-scrape-configs
    key: prometheus-additional.yaml
---
# Scrape targets for the TiDB cluster (the Secret name is illustrative)
apiVersion: v1
kind: Secret
metadata:
  name: tidb-additional-scrape-configs
  namespace: tidb-cluster
stringData:
  prometheus-additional.yaml: |
    - job_name: 'tidb'
      metrics_path: /metrics
      static_configs:
        - targets:
            - 'tidb-0.tidb-peer.tidb-cluster.svc:10080'
            - 'tidb-1.tidb-peer.tidb-cluster.svc:10080'
            - 'tidb-2.tidb-peer.tidb-cluster.svc:10080'
    - job_name: 'tikv'
      metrics_path: /metrics
      static_configs:
        - targets:
            - 'tikv-0.tikv-peer.tidb-cluster.svc:20180'
            - 'tikv-1.tikv-peer.tidb-cluster.svc:20180'
            - 'tikv-2.tikv-peer.tidb-cluster.svc:20180'
    - job_name: 'pd'
      metrics_path: /metrics
      static_configs:
        - targets:
            - 'pd-0.pd-peer.tidb-cluster.svc:2379'
            - 'pd-1.pd-peer.tidb-cluster.svc:2379'
            - 'pd-2.pd-peer.tidb-cluster.svc:2379'
---
# Alerting rules
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: tidb-alert-rules
  namespace: tidb-cluster
spec:
  groups:
    - name: tidb.rules
      rules:
        - alert: TiKVStoreDown
          expr: up{job="tikv"} == 0
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "TiKV node down"
            description: "TiKV instance {{ $labels.instance }} has been down for more than 5 minutes"
        - alert: HighTiKVLeaderBalance
          # Relative deviation of each instance's leader count from the cluster mean.
          # The metric name is kept from the original; check it against the TiKV version in use.
          expr: |
            abs(
              avg by (instance) (tikv_store_leader_count)
              - scalar(avg(avg by (instance) (tikv_store_leader_count)))
            )
            / scalar(avg(avg by (instance) (tikv_store_leader_count)))
            > 0.3
          for: 10m
          labels:
            severity: warning
          annotations:
            summary: "TiKV leaders unevenly distributed"
            description: "Leader count on TiKV node {{ $labels.instance }} deviates from the mean by more than 30%"
        - alert: TiDBHighQueryLatency
          expr: histogram_quantile(0.95, sum by (le) (rate(tidb_server_handle_query_duration_seconds_bucket[5m]))) > 1
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "TiDB query latency too high"
            description: "95th-percentile query latency is above 1 second"
        - alert: PDLeaderChange
          expr: changes(pd_server_is_leader[1h]) > 3
          labels:
            severity: warning
          annotations:
            summary: "PD leader changes too often"
            description: "The PD leader changed more than 3 times in the past hour"
```
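
The leader-balance alert computes, for each TiKV instance, how far its leader count sits from the cluster mean, relative to that mean. The sketch below reproduces the arithmetic outside PromQL so the 0.3 threshold can be checked against sample numbers. `imbalanced` is an illustrative helper and the instance names are made up.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// imbalanced returns the instances whose leader count deviates from the mean
// by more than threshold, expressed as a fraction of the mean.
func imbalanced(leaders map[string]float64, threshold float64) []string {
	if len(leaders) == 0 {
		return nil
	}
	var sum float64
	for _, n := range leaders {
		sum += n
	}
	mean := sum / float64(len(leaders))
	if mean == 0 {
		return nil // no leaders anywhere, so nothing to compare
	}
	var out []string
	for inst, n := range leaders {
		if math.Abs(n-mean)/mean > threshold {
			out = append(out, inst)
		}
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}

func main() {
	leaders := map[string]float64{"tikv-0": 100, "tikv-1": 100, "tikv-2": 40}
	// Mean is 80; deviations are 25%, 25%, and 50%.
	fmt.Println(imbalanced(leaders, 0.3)) // prints [tikv-2]
}
```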
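Built on the Raft consensus protocol and an HTAP architecture, TiDB gives financial-grade applications high availability, strong consistency, and real-time analytics. Separating compute from storage, managing replicas with Multi-Raft, scheduling through PD, and serving analytics from a columnar engine let it handle large-scale online transaction processing and real-time analysis in one system. In production, design the data sharding strategy around the workload's characteristics, and put monitoring, alerting, and disaster recovery in place so the service stays stable and reliable. As data volumes grow, capacity can be added by scaling the cluster out.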