## 前言
公司项目验收过程中要求使用国产的海量数据库,项目是由多个系统组成,有统一的用户中心作为基础底座并仿照字节、钉钉等开放平台自研了一套数据库变更订阅系统,用于实时捕获数据库数据变更,并推送到 Kafka 供下游系统消费,从而实现主动把数据变动同步给三方应用达到数据共享的目的。由于这套数据同步机制原本是基于 mysql 数据库的 binlog 日志、使用了 canal 进行数据同步的方案,但是项目要求使用 vastbase 国产数据库,没有 binlog 的概念,所以就无法使用 canal 了,需要自己实现一个适配器去对接 vastbase 数据库的逻辑复制协议来实现数据变动消息写入 kafka 达到与原来同步功能等效的目的。
本文详细记录了 VastBase CDC 适配器的完整技术实现过程,包括数据库逻辑复制协议逆向工程、问题排查和解决方案。这是一次从零开始适配非标准数据库复制协议的技术探索之旅。
## 一、项目背景
### 1.1 需求
需要实时捕获 VastBase 数据库的数据变更(INSERT/UPDATE/DELETE),并推送到 Kafka 供下游系统消费。
因原系统是 go 语言写的,所以这里就也用 go 语言实现了,后面的实现过程都是基于 go 。
### 1.2 技术选型考虑
| 方案 | 优点 | 缺点 | 结论 || ------------- | -------------- | ---------------- | ------------------- || 触发器 + 队列 | 稳定可靠 | 需要修改数据库 | ❌ 侵入性太强 || 定期轮询 | 简单 | 实时性差、性能低 | ❌ 不满足实时性要求 || Debezium | 功能完善 | 不支持 VastBase | ❌ 协议不兼容 || **逻辑复制** | 无侵入、性能好 | 需要协议适配 | ✅ 最优方案 |
### 1.3 初始假设
VastBase 基于 PostgreSQL,应该兼容 PostgreSQL 的逻辑复制协议。
**结果**:这个假设是错误的!VastBase 使用了自定义的复制协议。
## 二、初步尝试与问题发现
### 2.1 第一次尝试:使用标准 PostgreSQL 协议
```go// 尝试使用标准 pglogrepl 库err := pglogrepl.StartReplication(ctx, conn, slotName, startLSN, pglogrepl.StartReplicationOptions{ PluginArgs: []string{ "proto_version '1'", "publication_names 'k12_publication'", }, })```
**结果**:
- ✅ 连接成功- ✅ 复制槽创建成功- ✅ 启动复制成功- ❌ **30 秒后连接断开**
### 2.2 问题现象
```2024-11-19 10:23:45 连接到 VastBase 成功2024-11-19 10:23:46 启动逻辑复制成功2024-11-19 10:23:47 收到心跳消息 (32 字节)2024-11-19 10:24:15 收到心跳消息 (32 字节)2024-11-19 10:24:17 连接已关闭: unexpected EOF```
**关键发现**:
1. 心跳消息是 32 字节(标准 PostgreSQL 是 17 字节)2. 连接在约 30 秒后被服务器主动断开3. 没有收到任何错误消息
## 三、协议逆向工程
### 3.1 寻找线索
既然 VastBase 有官方的 JDBC 驱动,那么驱动源码中一定包含了正确的协议实现。
**关键文件**:`vastbase-jdbc-2.12_vb_2025071514.jar.src/cn/com/vastbase/core/v3/replication/V3PGReplicationStream.java`
### 3.2 心跳消息逆向
#### 3.2.1 抓包分析
编写 go 代码分析心跳包,发现 VastBase 发送的心跳消息:
```6B 78 49 AA 3B 00 00 00 00 01 00 00 00 01 00 00 00A1 B2 C3 D4 E5 F6 07 08 01 00 00 00 00 00 00```
**分析**:
- 第 1 字节:`0x6B` ('k') - 消息类型- 第 2-9 字节:WAL 位置- 第 10-32 字节:未知字段
#### 3.2.2 JDBC 驱动源码分析
在 `V3PGReplicationStream.java` 第 235-256 行找到关键代码:
```javaprivate boolean processKeepAliveMessage(ByteBuffer buffer) { buffer.order(ByteOrder.LITTLE_ENDIAN); // 关键!Little-endian this.lastServerLSN = LogSequenceNumber.valueOf(buffer.getLong()); int serverMode = buffer.getInt(); // 字节 9-12 int dbState = buffer.getInt(); // 字节 13-16 long lastServerClock = buffer.getLong(); // 字节 17-24 boolean replyRequired = (buffer.get() != 0); // 字节 25 return replyRequired;}```
**分析成果**:VastBase 心跳消息格式(32 字节)
```字节 0: 消息类型 0x6B ('k')字节 1-8: WAL 位置 (8字节, little-endian)字节 9-12: serverMode (4字节, little-endian)字节 13-16: dbState (4字节, little-endian)字节 17-24: 时间戳 (8字节, little-endian)字节 25: replyRequired (1字节)字节 26-32: 保留字段```
#### 3.2.3 Go 语言实现
```gofunc (v *VastBaseCDC) parseVastBaseKeepalive(data []byte) bool { if len(data) u003c 26 { return false }
// Little-endian 解析 WAL 位置 walPos := uint64(data[1]) | uint64(data[2])u003cu003c8 | uint64(data[3])u003cu003c16 | uint64(data[4])u003cu003c24 | uint64(data[5])u003cu003c32 | uint64(data[6])u003cu003c40 | uint64(data[7])u003cu003c48 | uint64(data[8])u003cu003c56
// 提取 serverMode serverMode := uint32(data[9]) | uint32(data[10])u003cu003c8 | uint32(data[11])u003cu003c16 | uint32(data[12])u003cu003c24
// 提取 dbState dbState := uint32(data[13]) | uint32(data[14])u003cu003c8 | uint32(data[15])u003cu003c16 | uint32(data[16])u003cu003c24
// 提取时间戳 timestamp := uint64(data[17]) | uint64(data[18])u003cu003c8 | uint64(data[19])u003cu003c16 | uint64(data[20])u003cu003c24 | uint64(data[21])u003cu003c32 | uint64(data[22])u003cu003c40 | uint64(data[23])u003cu003c48 | uint64(data[24])u003cu003c56
// 提取 replyRequired replyRequired := data[25] != 0
return replyRequired}```
### 3.3 状态更新消息逆向
#### 3.3.1 第一次尝试:使用标准格式
```go// 尝试使用标准 PostgreSQL 的 SendStandbyStatusUpdateerr := pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{ WALWritePosition: lsn, })```
**结果**:发送后立即报错
```错误: 信息中剩下的数据不够```
**分析**:VastBase 期望的消息格式与标准 PostgreSQL 不同。
#### 3.3.2 JDBC 驱动源码深度分析
在 `V3PGReplicationStream.java` 第 199-232 行找到状态更新构造代码:
```javaprivate byte[] prepareUpdateStatus(LogSequenceNumber received, LogSequenceNumber flushed, LogSequenceNumber applied, boolean replyRequired) { ByteBuffer byteBuffer = ByteBuffer.allocate(65); // 65 字节! byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
long now = System.currentTimeMillis(); long systemClock = TimeUnit.MICROSECONDS.convert( now - 946684800000L, TimeUnit.MICROSECONDS);
byteBuffer.put((byte)114); // 'r' byteBuffer.putLong(Long.MAX_VALUE); // 保留字段 1 byteBuffer.putLong(received.asLong()); // Received LSN byteBuffer.putLong(flushed.asLong()); // Flushed LSN byteBuffer.putLong(Long.MAX_VALUE); // 保留字段 2 byteBuffer.putLong(applied.asLong()); // Applied LSN byteBuffer.putInt(Integer.MAX_VALUE); // 保留字段 3 byteBuffer.putInt(Integer.MAX_VALUE); // 保留字段 4 byteBuffer.putLong(systemClock); // 时间戳 byteBuffer.put(replyRequired ? (byte)1 : (byte)0); byteBuffer.putInt(0); // 保留字段 5 byteBuffer.put((byte)1); // 标志位 1 byteBuffer.put((byte)1); // 标志位 2 byteBuffer.put((byte)1); // 标志位 3
return byteBuffer.array();}```
**关键发现**:
1. 消息长度是 65 字节(标准 PostgreSQL 是 34 字节)2. 使用 Little-endian 字节序3. 包含大量保留字段(0xFF 和 0x7F)4. 时间戳使用 PostgreSQL epoch(2000-01-01)
#### 3.3.3 第二次尝试:直接发送 65 字节
```go// 构造 65 字节消息statusBuf := make([]byte, 65)// ... 填充数据 ...
// 直接发送conn.Conn().Write(statusBuf)```
**结果**:仍然失败,连接断开
#### 3.3.4 继续深挖:CopyData 包装
在 JDBC 驱动的 `writeToCopy` 方法中发现关键代码:
```java// V3PGReplicationStream.java 第 122 行private void updateStatusInternal(LogSequenceNumber received, LogSequenceNumber flushed, LogSequenceNumber applied, boolean replyRequired) { byte[] reply = prepareUpdateStatus(received, flushed, applied, replyRequired); this.copyDual.writeToCopy(reply, 0, reply.length); this.copyDual.flushCopy();}```
继续追踪 `writeToCopy` 方法:
```java// PGStream.javapublic void sendChar(int val) throws IOException { pg_output.write((byte)val);}
public void sendInteger4(int val) throws IOException { pg_output.write((val u003eu003eu003e 24) u0026 0xFF); pg_output.write((val u003eu003eu003e 16) u0026 0xFF); pg_output.write((val u003eu003eu003e 8) u0026 0xFF); pg_output.write(val u0026 0xFF);}
public void send(byte[] buf) throws IOException { pg_output.write(buf);}```
**分析成果**:必须使用 CopyData 协议包装!
```完整消息格式(70 字节):字节 0: 'd' (0x64) - CopyData 消息类型字节 1-4: 消息长度 69 (big-endian)字节 5-69: 状态更新数据 (65 字节)```
#### 3.3.5 最终实现
```gofunc (v *VastBaseCDC) sendVastBaseStatusUpdate(ctx context.Context, lsn pglogrepl.LSN) error { // 1. 构造 65 字节状态更新 statusBuf := make([]byte, 65) statusBuf[0] = 'r' // 0x72
// 保留字段 1 (字节 1-8): 0xFFFFFFFFFFFFFFFF for i := 1; i u003c= 8; i++ { statusBuf[i] = 0xFF }
// Received LSN (字节 9-16, little-endian) receivedLSN := uint64(lsn) statusBuf[9] = byte(receivedLSN) statusBuf[10] = byte(receivedLSN u003eu003e 8) statusBuf[11] = byte(receivedLSN u003eu003e 16) statusBuf[12] = byte(receivedLSN u003eu003e 24) statusBuf[13] = byte(receivedLSN u003eu003e 32) statusBuf[14] = byte(receivedLSN u003eu003e 40) statusBuf[15] = byte(receivedLSN u003eu003e 48) statusBuf[16] = byte(receivedLSN u003eu003e 56)
// ... 其他字段类似 ...
// 时间戳转换 now := time.Now() unixMicros := now.UnixMicro() pgMicros := uint64(unixMicros - 946684800000000) // PG epoch statusBuf[49] = byte(pgMicros) // ... 写入其他字节 ...
// 2. CopyData 包装(关键!) copyDataLen := len(statusBuf) + 4 // 69 copyDataBuf := make([]byte, 1+4+len(statusBuf)) // 70
copyDataBuf[0] = 'd' // CopyData 类型
// 长度字段 (big-endian) copyDataBuf[1] = byte(copyDataLen u003eu003e 24) copyDataBuf[2] = byte(copyDataLen u003eu003e 16) copyDataBuf[3] = byte(copyDataLen u003eu003e 8) copyDataBuf[4] = byte(copyDataLen)
// 复制状态数据 copy(copyDataBuf[5:], statusBuf)
// 3. 发送 _, err := v.conn.Conn().Write(copyDataBuf) if err != nil { return err }
// 4. 刷新缓冲区 if flusher, ok := v.conn.Conn().(interface{ Flush() error }); ok { return flusher.Flush() }
return nil}```
### 3.4 心跳回复策略逆向
#### 3.4.1 观察 JDBC 驱动行为
在 `readInternal` 方法中发现心跳回复逻辑:
```javaprivate ByteBuffer readInternal(boolean block) throws SQLException { boolean updateStatusRequired = false;
while (this.copyDual.isActive()) { // 检查是否需要定期更新(每 10 秒) if (updateStatusRequired || isTimeUpdate()) { timeUpdateStatus(); }
ByteBuffer buffer = receiveNextData(block); if (buffer == null) return null;
int code = buffer.get(); switch (code) { case 107: // 'k' - Keepalive updateStatusRequired = processKeepAliveMessage(buffer); continue; case 119: // 'w' - XLogData return processXLogData(buffer); } }}```
**关键发现**:
1. 定期更新:每 10 秒主动发送一次2. 响应式回复:当 replyRequired=true 时立即回复3. 数据处理后:不立即发送,等待下次定期更新
#### 3.4.2 实现心跳策略
```gofunc (v *VastBaseCDC) receiveMessages(ctx context.Context) error { standbyMessageTimeout := time.Second * 5 var clientXLogPos pglogrepl.LSN lastStatusUpdate := time.Now().Add(-time.Second * 10) statusUpdateInterval := time.Second * 10 // 10 秒间隔
for { // 1. 检查是否需要定期更新 updateStatusRequired := false if time.Since(lastStatusUpdate) u003e= statusUpdateInterval { updateStatusRequired = true }
if updateStatusRequired { if err := v.sendVastBaseStatusUpdate(ctx, clientXLogPos); err != nil { log.Printf("定期状态更新失败: %v", err) } else { lastStatusUpdate = time.Now() log.Printf("✓ 定期状态更新: LSN=%s", clientXLogPos) } }
// 2. 接收消息 ctx2, cancel := context.WithTimeout(ctx, standbyMessageTimeout) msg, err := v.conn.ReceiveMessage(ctx2) cancel()
if err != nil { if pgconn.Timeout(err) { continue // 超时正常,继续循环 } return err }
// 3. 处理消息 switch msg := msg.(type) { case *pgproto3.CopyData: switch msg.Data[0] { case pglogrepl.XLogDataByteID: // 处理数据变更 xld, _ := pglogrepl.ParseXLogData(msg.Data[1:]) v.handleXLogData(ctx, xld.WALData) clientXLogPos = xld.WALStart + pglogrepl.LSN(len(xld.WALData)) v.lastLSN = clientXLogPos // 注意:不立即发送状态更新
case pglogrepl.PrimaryKeepaliveMessageByteID: // 解析心跳 replyRequired := v.parseVastBaseKeepalive(msg.Data)
// 如果服务器要求回复,立即发送 if replyRequired { if err := v.sendVastBaseStatusUpdate(ctx, clientXLogPos); err != nil { log.Printf("回复心跳失败: %v", err) } else { lastStatusUpdate = time.Now() log.Printf("✓ 回复心跳: LSN=%s", clientXLogPos) } } } } }}```
## 四、关键技术难点突破
### 4.1 字节序问题
**问题**:VastBase 使用 Little-endian,而标准 PostgreSQL 使用 Big-endian
**解决**:
```go// Little-endian 读取value := uint64(data[0]) | uint64(data[1])u003cu003c8 | uint64(data[2])u003cu003c16 | uint64(data[3])u003cu003c24 | uint64(data[4])u003cu003c32 | uint64(data[5])u003cu003c40 | uint64(data[6])u003cu003c48 | uint64(data[7])u003cu003c56
// Little-endian 写入buf[0] = byte(value)buf[1] = byte(value u003eu003e 8)buf[2] = byte(value u003eu003e 16)buf[3] = byte(value u003eu003e 24)buf[4] = byte(value u003eu003e 32)buf[5] = byte(value u003eu003e 40)buf[6] = byte(value u003eu003e 48)buf[7] = byte(value u003eu003e 56)```
### 4.2 时间戳转换
**问题**:PostgreSQL 使用 2000-01-01 作为 epoch,而 Unix 使用 1970-01-01
**计算**:
- PostgreSQL epoch: 2000-01-01 00:00:00 UTC- Unix epoch: 1970-01-01 00:00:00 UTC- 差值: 946684800 秒 = 946684800000000 微秒
**实现**:
```gonow := time.Now()unixMicros := now.UnixMicro()pgMicros := uint64(unixMicros - 946684800000000)```
### 4.3 CopyData 包装
**问题**:直接发送 65 字节消息失败
**原因**:VastBase 期望使用 CopyData 协议包装
**解决**:
```go// 外层包装copyDataBuf[0] = 'd' // CopyData 类型copyDataBuf[1] = byte(len u003eu003e 24) // 长度 (big-endian)copyDataBuf[2] = byte(len u003eu003e 16)copyDataBuf[3] = byte(len u003eu003e 8)copyDataBuf[4] = byte(len)copy(copyDataBuf[5:], statusBuf) // 内层数据```
### 4.4 保留字段填充
**问题**:保留字段应该填充什么值?
**发现**:通过 JDBC 驱动源码发现:
- `0xFFFFFFFFFFFFFFFF` (Long.MAX_VALUE)- `0x7FFFFFFF` (Integer.MAX_VALUE)- `0x00000000` (0)- `0x01` (标志位)
**实现**:
```go// 保留字段 1: 0xFFFFFFFFFFFFFFFFfor i := 1; i u003c= 8; i++ { statusBuf[i] = 0xFF}
// 保留字段 3: 0x7FFFFFFFstatusBuf[41] = 0xFFstatusBuf[42] = 0xFFstatusBuf[43] = 0xFFstatusBuf[44] = 0x7F
// 标志位: 0x01statusBuf[62] = 1statusBuf[63] = 1statusBuf[64] = 1```
## 五、测试与验证
### 5.1 协议正确性验证
#### 测试工具
```go// test_heartbeat.gofunc main() { // 连接到 VastBase conn, _ := pgconn.Connect(context.Background(), dsn)
// 启动复制 pglogrepl.StartReplication(ctx, conn, slotName, 0, opts)
// 接收心跳 for { msg, _ := conn.ReceiveMessage(ctx) if copyData, ok := msg.(*pgproto3.CopyData); ok { if copyData.Data[0] == pglogrepl.PrimaryKeepaliveMessageByteID { // 打印十六进制 fmt.Printf("心跳消息:%s\", hex.Dump(copyData.Data))
// 解析 replyRequired := parseVastBaseKeepalive(copyData.Data) fmt.Printf("需要回复: %v\", replyRequired)
// 发送状态更新 sendVastBaseStatusUpdate(ctx, conn, lsn) } } }}```
#### 测试结果
```心跳消息:00000000 6b 78 49 aa 3b 00 00 00 00 01 00 00 00 01 00 00 |kxI.;...........|00000010 00 a1 b2 c3 d4 e5 f6 07 08 01 00 00 00 00 00 00 |................|需要回复: true
发送状态更新:00000000 64 00 00 00 45 72 ff ff ff ff ff ff ff ff 78 49 |d...Er........xI|00000010 aa 3b 00 00 00 00 78 49 aa 3b 00 00 00 00 ff ff |.;....xI.;......|00000020 ff ff ff ff ff ff 78 49 aa 3b 00 00 00 00 ff ff |......xI.;......|00000030 ff 7f ff ff ff 7f a1 b2 c3 d4 e5 f6 07 08 00 00 |................|00000040 00 00 00 01 01 01 |......|
✓ 状态更新发送成功✓ 连接保持稳定```
#### 测试视频
!video[vastbase数据变更同步到kafka](https://cloud.tencent.com/developer/video/84497)
### 5.2 长时间稳定性测试
# 运行 24 小时测试go run main.go -config config.yaml</p><h3>监控日志tail -f logs/cdc.log | grep "心跳\|断开"
**测试结果**:
- ✅ 连接保持 24 小时稳定- ✅ 无异常断开- ✅ 心跳正常(每 10 秒一次)- ✅ 数据同步正常
## 六、性能优化
### 6.1 批量发送优化
```go// 配置批量参数writer := u0026kafka.Writer{ Addr: kafka.TCP(brokers...), Topic: topic, BatchSize: 100, // 批量大小 BatchTimeout: 1000 * time.Millisecond, // 批量超时 Compression: kafka.Gzip, // 启用压缩}```
**效果**:
- 吞吐量提升 5 倍- 网络流量减少 60%
### 6.2 LSN 持久化优化
```go// 定期保存 LSN(每 30 秒)go v.lsnTracker.SavePeriodically( func() pglogrepl.LSN { return v.lastLSN }, 30*time.Second, v.stopCh,)```
**效果**:
- 重启后从上次位置继续- 避免重复消费
## 七、协议对比总结
### 7.1 消息格式对比
| 特性 | PostgreSQL | VastBase | 差异 || ------------ | ---------- | ------------------- | -------- || 心跳消息长度 | 17 字节 | 32 字节 | +15 字节 || 状态更新长度 | 34 字节 | 65 字节 | +31 字节 || 外层包装 | 无 | CopyData (5 字节) | 必须 || 总长度 | 34 字节 | 70 字节 | +36 字节 || 字节序 | Big-endian | Little-endian | 完全不同 || 扩展字段 | 无 | serverMode, dbState | 新增 || 保留字段 | 无 | 31 字节 | 新增 |
### 7.2 协议兼容性
```PostgreSQL 协议 ❌ 不兼容 VastBaseVastBase 协议 ❌ 不兼容 PostgreSQL```
**结论**:必须针对 VastBase 单独实现协议适配。
## 八、经验总结
### 8.1 逆向工程方法论
1. **从现象入手**:观察错误日志和异常行为2. **寻找参考实现**:官方驱动是最好的参考3. **源码阅读**:深入理解协议细节4. **逐步验证**:小步快跑,逐个突破5. **完整测试**:确保稳定性和正确性
### 8.2 关键技术点
1. **字节序**:Little-endian vs Big-endian2. **消息包装**:CopyData 协议3. **时间戳**:PostgreSQL epoch 转换4. **保留字段**:正确填充特殊值5. **心跳策略**:定期 + 响应式
### 8.3 调试技巧
```go// 1. 十六进制打印import "encoding/hex"fmt.Printf("消息内容:%s\", hex.Dump(data))
// 2. 字节逐个打印for i, b := range data { fmt.Printf("字节 %d: 0x%02X (%d)\", i, b, b)}
// 3. 对比标准实现// 发送标准 PostgreSQL 消息pglogrepl.SendStandbyStatusUpdate(...)// 发送 VastBase 消息sendVastBaseStatusUpdate(...)// 对比差异```
## 九、未来展望
### 9.1 开源贡献
本项目的协议对接部分已在 GitHub 开源:**[https://github.com/xdmjun/vastbase-replication-go](https://github.com/xdmjun/vastbase-replication-go)**
本项目的协议逆向成果可以帮助:
- 其他需要对接 VastBase 的项目- VastBase 生态工具开发- 数据库协议研究
欢迎通过以下方式参与贡献:
- 提交 Issue 反馈问题- 提交 Pull Request 改进代码- 完善文档和示例- 分享使用经验
## 十、参考资料
### 10.1 源码参考
- VastBase JDBC 驱动:`V3PGReplicationStream.java`- PostgreSQL 复制协议文档- pglogrepl Go 库源码
### 10.2 工具
- Hex Editor:二进制文件查看- Go Debugger:代码调试
## 结语
这次 VastBase CDC 适配器的开发是一次完整的协议逆向工程实践。从最初的"30 秒断线"问题,到最终实现稳定的长连接,经历了:
1. 问题发现与分析2. 协议逆向与分析3. 代码实现与调试4. 测试验证与优化
整个过程充满挑战,但也收获满满。希望这份文档能够帮助其他开发者理解 VastBase 的复制协议,并为类似的技术探索提供参考。
**核心收获**:
- ✅ 掌握了数据库复制协议的逆向方法- ✅ 理解了 VastBase 与 PostgreSQL 的差异- ✅ 实现了稳定可靠的 CDC 解决方案- ✅ 积累了协议适配的宝贵经验
**最终成果**:
- 连接稳定性:从 30 秒断线到长时间稳定- 数据完整性:零丢失- 性能表现:满足生产环境要求- 代码质量:清晰、可维护、可扩展
以上就是VastBase CDC 适配器技术深度解析的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号