Hyperliquid 仓位监控服务 - 实时追踪链上大户仓位变化,捕获交易信号
utrading-hl-monitor 通过 WebSocket 连接 Hyperliquid,实时监听指定地址的仓位变化和订单成交事件,并将交易信号发布到 NATS 供下游服务使用。
- WebSocket 双订阅模式 - 同时订阅 orderFills(成交数据)和 orderUpdates(状态变化)
- 多连接负载均衡 - 自动管理多个 WebSocket 连接(5-10 个),支持大规模地址监控
- 智能订阅管理 - 每连接最多 100 个订阅,自动选择负载最少的连接
- 仓位实时追踪 - 订阅现货余额和合约仓位变化
- 订单成交聚合 - 智能聚合同一订单的多次 fill,计算加权平均价格
- 双触发机制 - 状态触发(filled/canceled)+ 超时触发(5 分钟)
- 反手订单处理 - 自动拆分反手订单为平仓+开仓两个信号
- 平仓比例计算 - 精确计算 CloseRate(平仓数量/持仓数量)
- 消息乱序处理 - OrderStatusTracker 处理先收到状态后收到 fill 的情况
- 订单去重机制 - 服务重启时自动加载已发送订单,防止重复处理
- 异步消息队列 - 4 个 worker 并发处理,队列满时自动降级为同步处理
- 批量数据库写入 - 缓冲区内去重,批量大小 100 条,刷新间隔 2 秒
- 多层缓存机制 - Symbol 转换、价格数据、订单去重、持仓余额缓存
- 协程池优化 - 使用 ants.Pool 管理并发任务(30 workers)
- 数据清理器 - 定期清理历史数据,防止数据库膨胀
- 健康检查 - HTTP 端点监控服务状态
- Prometheus 指标 - 缓存、队列、批量写入、订单聚合等核心指标
- 结构化日志 - 基于 zerolog 的详细日志记录
- 优雅关闭 - 信号处理和资源清理
flowchart TB
subgraph External["🌐 外部数据源"]
HL[("Hyperliquid<br/>WebSocket API")]
DB[("MySQL<br/>监控地址")]
end
subgraph Input["📥 数据采集层"]
AL[AddressLoader<br/>定期加载地址<br/>间隔: 5m]
PM[PoolManager<br/>连接池管理<br/>连接数: 5-10<br/>每连接订阅: ≤100]
end
subgraph WS["🔌 WebSocket 订阅层"]
WS1[连接 1]
WS2[连接 2]
WS3[连接 N]
SUB1[orderFills<br/>成交数据]
SUB2[orderUpdates<br/>状态变化]
SUB3[position<br/>仓位数据]
end
subgraph Aggregate["🔄 订单聚合层"]
OA[OrderAggregator<br/>双触发聚合<br/>● 状态触发: filled/canceled<br/>● 超时触发: 5分钟]
OST[OrderStatusTracker<br/>消息乱序处理<br/>TTL: 10分钟]
end
subgraph Position["📍 仓位管理层"]
POS_MGR[PositionManager<br/>仓位数据处理<br/>● processPositionCache<br/>● 分流处理]
end
subgraph Process["⚙️ 消息处理层"]
MQ[MessageQueue<br/>异步队列<br/>● 队列大小: 1000<br/>● Workers: 4<br/>● 背压保护<br/>● 处理订单+仓位]
OP[OrderProcessor<br/>订单处理引擎<br/>● PendingOrderCache<br/>● TID 去重<br/>● 协程池: 30 workers]
POS_PROC[PositionProcessor<br/>仓位处理引擎<br/>● PositionCacheItem<br/>● 写入缓冲区]
BW[BatchWriter<br/>批量写入<br/>● 批量大小: 100<br/>● 刷新间隔: 2s<br/>● 缓冲区去重<br/>● 订单+仓位双路径]
end
subgraph Cache["💾 缓存层"]
SC[SymbolCache<br/>Symbol 双向转换]
PC[PriceCache<br/>现货/合约价格]
DC[DedupCache<br/>订单去重<br/>TTL: 30分钟]
PBC[PositionBalanceCache<br/>仓位余额缓存<br/>实时查询支持]
end
subgraph Output["📤 输出层"]
NATS[NATS Publisher<br/>交易信号发布]
end
subgraph Storage["💾 持久化层"]
MYSQL[("MySQL<br/>数据持久化<br/>● hl_order_aggregation<br/>● hl_position_cache<br/>● hl_address_signal")]
CONSUMER[下游消费者]
end
subgraph Maintenance["🔧 维护层"]
DCLEANER[DataCleaner<br/>定期清理<br/>● 聚合数据: 2h<br/>● 信号数据: 7d]
MONITOR[HealthServer<br/>健康检查<br/>Prometheus 指标]
end
HL -->|"WebSocket 连接"| PM
DB -->|"SQL 查询"| AL
AL -->|"地址列表"| PM
PM -->|"分配订阅"| WS1
PM -->|"分配订阅"| WS2
PM -->|"分配订阅"| WS3
WS1 --> SUB1
WS1 --> SUB2
WS1 --> SUB3
WS2 --> SUB1
WS2 --> SUB2
WS2 --> SUB3
WS3 --> SUB1
WS3 --> SUB2
WS3 --> SUB3
SUB1 -->|"成交事件"| OA
SUB2 -->|"状态变更"| OA
OA -->|"聚合结果"| OST
OST -->|"处理消息"| MQ
SUB3 -->|"仓位更新"| POS_MGR
POS_MGR -->|"Enqueue<br/>PositionCacheMessage"| MQ
MQ -->|"分发任务<br/>订单消息"| OP
MQ -->|"分发任务<br/>仓位消息"| POS_PROC
OP -->|"发布信号"| NATS
OP -->|"写入缓冲<br/>OrderAggregationItem"| BW
POS_PROC -->|"写入缓冲<br/>PositionCacheItem"| BW
POS_MGR -->|"更新内存缓存"| PBC
BW -->|"批量写入<br/>订单聚合表"| MYSQL
BW -->|"批量写入<br/>仓位缓存表"| MYSQL
NATS -->|"消费信号"| CONSUMER
OP -->|"查询/更新"| DC
OP -->|"查询/更新"| SC
OP -->|"查询/更新"| PC
MYSQL -->|"清理任务"| DCLEANER
MYSQL -->|"监控指标"| MONITOR
classDef sourceStyle fill:#0d1117,stroke:#58a6ff,stroke-width:2px,color:#c9d1d9
classDef wsStyle fill:#161b22,stroke:#a371f7,stroke-width:2px,color:#c9d1d9
classDef positionStyle fill:#161b22,stroke:#f78166,stroke-width:2px,color:#c9d1d9
classDef processStyle fill:#161b22,stroke:#f0883e,stroke-width:2px,color:#c9d1d9
classDef cacheStyle fill:#161b22,stroke:#3fb950,stroke-width:2px,color:#c9d1d9
classDef outputStyle fill:#161b22,stroke:#ff7b72,stroke-width:2px,color:#c9d1d9
classDef storageStyle fill:#161b22,stroke:#79c0ff,stroke-width:2px,color:#c9d1d9
classDef maintainStyle fill:#161b22,stroke:#7ee787,stroke-width:2px,color:#c9d1d9
class HL,DB sourceStyle
class AL,PM,WS1,WS2,WS3,SUB1,SUB2,SUB3 wsStyle
class POS_MGR positionStyle
class OA,OST,MQ,OP,POS_PROC,BW processStyle
class SC,PC,DC,PBC cacheStyle
class NATS outputStyle
class MYSQL,CONSUMER storageStyle
class DCLEANER,MONITOR maintainStyle
数据流关键修正:
- 仓位数据路径更新:PositionManager 现在通过 MessageQueue 进行异步处理,而不是直接更新缓存
- 新增 PositionProcessor:专门处理仓位消息的处理器,将数据写入 BatchWriter
- 双路径设计:
- 写入路径:WebSocket → PositionManager → MessageQueue → PositionProcessor → BatchWriter → MySQL
- 缓存路径:PositionManager → PositionBalanceCache(内存缓存,供实时查询)
- BatchWriter 增强:同时处理订单聚合数据和仓位缓存数据的批量写入
sequenceDiagram
participant DB as 🗄️ MySQL<br/>监控地址
participant PM as 🔌 PoolManager
participant WS as 📡 WebSocket
participant OA as 🔄 OrderAggregator
participant POS as 📍 PositionManager
participant MQ as 📬 MessageQueue
participant OP as ⚙️ OrderProcessor
participant PP as ⚙️ PositionProcessor
participant BW as 💾 BatchWriter
participant NATS as 📤 NATS
participant PBC as 💾 PositionBalanceCache
rect rgb(13, 17, 23)
Note over DB,PM: 📅 定期加载监控地址 (每 5 分钟)
DB->>PM: 1. 查询激活地址列表
PM->>WS: 2. 分配订阅到最优连接
end
rect rgb(22, 27, 34)
Note over WS,OA: ⚡ 实时数据接收与聚合
WS->>WS: 接收 orderFills 事件
WS->>WS: 接收 orderUpdates 事件
WS->>OA: 3. 转发消息到聚合器
OA->>OA: 双触发检测<br/>● 状态触发: filled/canceled<br/>● 超时触发: 5 分钟
end
rect rgb(13, 17, 23)
Note over OA,OP: 🔄 订单消息异步处理
OA->>MQ: 4. 聚合完成,入队
MQ->>OP: 5. Worker 处理 (4 并发)
OP->>OP: ● TID 去重检查<br/>● CloseRate 计算<br/>● 构建交易信号
end
rect rgb(22, 27, 34)
Note over WS,POS: 📍 仓位数据接收与分流
WS->>POS: 6. 仓位变化通知 (WebData2)
POS->>POS: processPositionCache<br/>解析现货余额和合约仓位
end
rect rgb(13, 17, 23)
Note over POS,BW: 📤 仓位数据双路径处理
POS->>MQ: 7a. Enqueue PositionCacheMessage
POS->>PBC: 7b. 更新内存缓存<br/>实时查询支持
MQ->>PP: 8. Worker 处理仓位消息
PP->>BW: 9. 加入批量缓冲区<br/>PositionCacheItem
end
rect rgb(22, 27, 34)
Note over OP,NATS: 📤 信号发布与持久化
OP->>NATS: 10. 发布交易信号
OP->>BW: 11. 加入批量缓冲区<br/>OrderAggregationItem
BW->>BW: 缓冲区去重 (覆盖旧值)
Note over BW: ⏱️ 定时刷新 (100 条 / 2 秒)
BW->>DB: 12. 批量写入持久化<br/>订单聚合表 + 仓位缓存表
end
flowchart LR
subgraph Input["📥 数据输入"]
FILL[OrderFill<br/>成交数据]
UPDATE[OrderUpdate<br/>状态变化]
end
subgraph Layer1["🔒 第一层: OrderDeduper"]
CHECK1{已发送?}
DEDUP1["DedupCache<br/>go-cache<br/>TTL: 30min"]
end
subgraph Layer2["🔐 第二层: seenTids"]
CHECK2{TID<br/>已处理?}
DEDUP2["concurrent.Map<br/>单订单内去重"]
end
subgraph Layer3["💾 第三层: Buffer"]
CHECK3{相同键<br/>存在?}
DEDUP3["Buffer Store<br/>覆盖旧值"]
end
subgraph Output["✅ 结果"]
PROCESS[正常处理]
SKIP[⏭️ 跳过]
FLUSH[🔄 刷新]
end
FILL --> CHECK1
UPDATE --> CHECK1
CHECK1 -->|"✓ 是"| SKIP
CHECK1 -->|"✗ 否"| CHECK2
CHECK2 -->|"✓ 是"| SKIP
CHECK2 -->|"✗ 否"| CHECK3
CHECK3 -->|"✓ 是"| FLUSH
CHECK3 -->|"✗ 否"| PROCESS
CHECK1 -.-> DEDUP1
CHECK2 -.-> DEDUP2
CHECK3 -.-> DEDUP3
classDef inputStyle fill:#0d1117,stroke:#58a6ff,stroke-width:2px,color:#c9d1d9
classDef layerStyle fill:#161b22,stroke:#f0883e,stroke-width:2px,color:#c9d1d9
classDef outputStyle fill:#161b22,stroke:#3fb950,stroke-width:2px,color:#c9d1d9
class FILL,UPDATE inputStyle
class DEDUP1,DEDUP2,DEDUP3 layerStyle
class PROCESS,SKIP,FLUSH outputStyle
| 组件 | 文件 | 职责 | 关键特性 |
|---|---|---|---|
| PoolManager | ws/pool_manager.go |
WebSocket 连接池管理 | • 多连接负载均衡 (5-10 个连接) • 每连接最多 100 个订阅 • 自动选择负载最少的连接 |
| ConnectionWrapper | ws/connection_wrapper.go |
单连接封装与重连 | • 指数退避重连 (1s → 30s) • 最多重试 10 次 • 错误回调机制 |
| OrderAggregator | ws/subscription.go |
订单聚合与触发 | • 双触发机制 (状态 + 超时) • 反手订单拆分 • 聚合多次 fill |
| 组件 | 文件 | 职责 | 关键特性 |
|---|---|---|---|
| OrderProcessor | processor/order_processor.go |
订单处理核心逻辑 | • PendingOrderCache (O(1) 查询) • TID 去重机制 • CloseRate 计算 • 协程池 (30 workers) |
| OrderStatusTracker | processor/status_tracker.go |
消息乱序处理 | • go-cache 实现 • TTL: 10 分钟 • Key 格式: address-oid |
| MessageQueue | processor/message_queue.go |
异步消息队列 | • 缓冲队列 (1000) • 4 个 worker 并发 • 背压保护 (队列满时降级) |
| BatchWriter | processor/batch_writer.go |
批量数据库写入 | • 批量大小: 100 条 • 刷新间隔: 2 秒 • 缓冲区去重 (覆盖旧值) |
| 组件 | 文件 | 职责 | 关键特性 |
|---|---|---|---|
| DedupCache | cache/dedup_cache.go |
订单去重 | • go-cache 实现 • 范围: address-oid-direction • TTL: 30 分钟 |
| SymbolCache | cache/symbol_cache.go |
Symbol 双向转换 | • concurrent.Map 实现 • coin ↔ symbol 映射 • 持久化存储 |
| PriceCache | cache/price_cache.go |
价格数据缓存 | • concurrent.Map 实现 • LRU 淘汰策略 • 现货/合约价格 |
| PositionBalanceCache | cache/position_cache.go |
仓位余额缓存 | • concurrent.Map 实现 • 实时更新 • CloseRate 计算支持 |
| 组件 | 文件 | 职责 | 关键特性 |
|---|---|---|---|
| Symbol Manager | symbol/manager.go |
Symbol 元数据管理 | • 定期从 API 加载 (2 小时) • 统一管理 Symbol 和价格缓存 • 自动刷新机制 |
| Position Manager | position/manager.go |
仓位数据管理 | • 订阅仓位变化 • 更新持仓缓存 • 触发信号计算 |
| 组件 | 文件 | 职责 | 关键特性 |
|---|---|---|---|
| Data Cleaner | cleaner/cleaner.go |
定期清理历史数据 | • 聚合数据: 保留 2 小时 • 信号数据: 保留 7 天 • DAO 层批量删除 (1000 条/次) |
| Health Server | monitor/health.go |
健康检查与指标 | • HTTP 端点监控 • Prometheus 指标暴露 • 服务状态报告 |
- 语言:Go 1.23
- 数据库:MySQL 8.0 + GORM + gorm-gen
- 消息队列:NATS
- WebSocket:自定义实现(基于 gorilla/websocket)
- 缓存:go-cache, concurrent.Map
- 并发:sync.Map, ants 协程池
- 监控:Prometheus
- 日志:zerolog
- Go 1.23+
- MySQL 8.0+
- NATS Server
make docker-up这会启动:
- MySQL (端口 3306)
- NATS (端口 4222)
mysql -h 127.0.0.1 -u root -p < init.sql编辑 cfg.local.toml:
[hl_monitor]
hyperliquid_ws_url = "wss://api.hyperliquid.xyz/ws"
health_server_addr = "0.0.0.0:8080"
address_reload_interval = "5m"
max_connections = 5
[mysql]
dsn = "root:password@tcp(localhost:3306)/utrading?charset=utf8mb4&parseTime=True&loc=Local"
[nats]
endpoint = "nats://localhost:4222"INSERT INTO hl_watch_addresses (address, label, is_active)
VALUES ('0x1234...', 'Trader A', 1);# 前台运行(调试)
make run
# 后台启动
make start
# 查看日志
make logsmake stop监控地址配置表
| 字段 | 类型 | 说明 |
|---|---|---|
| id | uint | 主键 |
| address | string | 链上地址 |
| label | string | 自定义标签 |
| is_active | bool | 是否激活 |
仓位缓存表
| 字段 | 类型 | 说明 |
|---|---|---|
| id | uint | 主键 |
| address | string | 链上地址 |
| spot_balances | json | 现货余额 JSON |
| spot_total_usd | string | 现货总价值 |
| futures_positions | json | 合约仓位 JSON |
| account_value | string | 账户总价值 |
| updated_at | datetime | 更新时间 |
订单聚合表
| 字段 | 类型 | 说明 |
|---|---|---|
| oid | bigint | 订单 ID(主键) |
| address | string | 监控地址 |
| symbol | string | 交易对 |
| total_size | decimal | 总数量 |
| weighted_avg_px | decimal | 加权平均价 |
| order_status | varchar | 订单状态 |
| last_fill_time | bigint | 最后 fill 时间 |
| signal_sent | boolean | 信号是否已发送 |
地址信号表
| 字段 | 类型 | 说明 |
|---|---|---|
| id | bigint | 主键 |
| address | varchar | 监控地址 |
| symbol | varchar | 交易对 |
| asset_type | varchar | spot/futures |
| direction | varchar | open/close |
| side | varchar | LONG/SHORT |
| position_size | varchar | Small/Medium/Large |
| size | decimal | 数量 |
| price | decimal | 价格 |
| close_rate | decimal | 平仓比例 |
| created_at | timestamp | 创建时间 |
type HlAddressSignal struct {
Address string // 监控地址
AssetType string // spot/futures
Symbol string // 交易对
Direction string // open/close
Side string // LONG/SHORT
PositionSize string // Small/Medium/Large
Size float64 // 数量
Price float64 // 加权平均价
CloseRate float64 // 平仓比例
Timestamp int64 // 时间戳
}utrading-hl-monitor/
├── cmd/hl_monitor/ # 主程序入口
├── internal/ # 内部包(领域驱动设计)
│ ├── address/ # 地址加载器
│ ├── cache/ # 缓存层
│ │ ├── dedup_cache.go # 订单去重
│ │ ├── symbol_cache.go # Symbol 转换
│ │ ├── price_cache.go # 价格缓存
│ │ └── position_cache.go # 仓位余额
│ ├── cleaner/ # 数据清理器
│ ├── dal/ # 数据库连接
│ ├── dao/ # 数据访问对象层
│ ├── manager/ # Symbol Manager, PoolManager
│ ├── models/ # 数据模型
│ ├── monitor/ # 健康检查
│ ├── nats/ # NATS 发布
│ ├── position/ # 仓位管理
│ ├── processor/ # 消息处理层
│ │ ├── message_queue.go
│ │ ├── batch_writer.go
│ │ ├── order_processor.go
│ │ └── status_tracker.go
│ └── ws/ # WebSocket 连接
├── pkg/ # 公共包
│ ├── concurrent/ # 线程安全容器
│ ├── go-hyperliquid/ # Hyperliquid SDK
│ ├── goplus/ # GoPlus API
│ ├── logger/ # 日志包
│ └── sigproc/ # 信号处理
├── docs/plans/ # 设计文档
├── cfg.toml # 生产配置
├── cfg.local.toml # 本地配置
├── init.sql # 数据库初始化
├── Dockerfile # Docker 镜像
├── docker-compose.yml # 服务编排
└── Makefile # 构建命令
# 构建
make build # 编译二进制文件
make clean # 清理构建产物
# 运行
make run # 前台运行(调试)
make start # 后台启动
make stop # 停止服务
make restart # 重启服务
make logs # 查看日志
# 测试
make test # 运行测试
make deps # 下载依赖
# Docker
make docker-up # 启动 MySQL 和 NATS
make docker-down # 停止 Docker 服务
make docker-logs # 查看 Docker 日志
make docker-ps # 查看服务状态重要:所有数据库操作必须通过 DAO 层,禁止在业务逻辑中直接使用 dal.MySQL()。
// ✅ 正确:通过 DAO 访问
dao.Position().UpsertPositionCache(cache)
dao.WatchAddress().ListDistinctAddresses()
// ❌ 错误:直接使用 dal
dal.MySQL().Where(...).First(...)- 在
internal/dao/对应的 DAO 文件中添加方法 - 使用 gorm-gen 提供的类型安全查询 API(
gen.Q.*) - 复杂查询可使用
UnderlyingDB()获取底层 GORM 连接
cd cmd/gen
go run main.go[hl_monitor]
hyperliquid_ws_url = "wss://api.hyperliquid.xyz/ws"
health_server_addr = "0.0.0.0:8080"
address_reload_interval = "5m"
max_connections = 5
max_subscriptions_per_connection = 100
[mysql]
dsn = "root:password@tcp(localhost:3306)/utrading?charset=utf8mb4&parseTime=True&loc=Local"
max_idle_connections = 16
max_open_connections = 64
set_conn_max_lifetime = 7200
[nats]
endpoint = "nats://localhost:4222"
[log]
level = "info"
max_size = 50
max_backups = 60
max_age = 15
compress = false
console = false
[order_aggregation]
timeout = "5m"
scan_interval = "30s"
max_retry = 3
retry_delay = "1s"
| 端点 | 说明 |
|---|---|
GET /health |
健康检查 |
GET /health/ready |
就绪检查 |
GET /health/live |
存活检查 |
GET /status |
服务状态 |
GET /metrics |
Prometheus 指标 |
hl_monitor_cache_hit_total{cache_type}- 缓存命中总数(dedup/symbol/price)hl_monitor_cache_miss_total{cache_type}- 缓存未命中总数
hl_monitor_message_queue_size- 消息队列当前大小hl_monitor_message_queue_full_total- 消息队列满事件总数
hl_monitor_batch_write_size- 批量写入大小分布hl_monitor_batch_write_duration_seconds- 批量写入耗时分布
hl_monitor_order_aggregation_active- 当前聚合中的订单数量hl_monitor_order_flush_total{trigger}- 订单发送总数(按触发原因)hl_monitor_order_fills_per_order- 每个 order 的 fill 数量分布
hl_monitor_pool_manager_connection_count- WebSocket 连接池当前连接数
日志文件位置:logs/output.log
日志使用 zerolog,通过 pkg/logger 包统一管理。
# 构建镜像
docker build -t utrading-hl-monitor .
# 运行
docker-compose up -d
# 查看日志
docker-compose logs -f
# 停止
docker-compose down| 日期 | 优化内容 | 设计文档 |
|---|---|---|
| 01-15 | 订单聚合器双触发机制 | order-aggregation-design |
| 01-16 | 反手订单处理优化 | reversed-order-handling-design |
| 01-19 | 仓位比例计算功能 | position-rate-calculation-design |
| 01-20 | Symbol Manager 实现 | symbol-manager-design |
| 01-20 | BatchWriter 去重优化 | batchwriter-dedup-plan |
| 01-21 | OrderProcessor 协程池优化 | orderprocessor-pool-design |
| 01-21 | OrderStatusTracker 状态追踪 | order-status-tracker-design |
| 01-21 | PositionBalanceCache 扩展 | position-cache-extension-design |
更多设计文档请查看 docs/plans/
MIT