Protobuf Schema 倉庫
ChainStream 官方 Protobuf Schema 定義,支援 Go 和 Python,包含 EVM、Solana、TRON 所有訊息型別。
支援矩陣
| 鏈 | dex.trades | tokens | balances | dex.pools | transfers | candlesticks |
|---|---|---|---|---|---|---|
| Ethereum (eth) | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| BSC (bsc) | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Solana (sol) | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| TRON (tron) | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
所有鏈還支援
token-supplies、token-prices、token-holdings、token-market-caps、trade-stats 等 Topics。詳見完整 Topic 列表。Kafka Streams vs WebSocket 選型指南
何時選擇 Kafka Streams
延遲敏感
延遲是首要考量,應用部署在雲端或專用伺服器
訊息可靠
不可接受丟失任何訊息,需要持久可靠的資料消費
複雜處理
需要對資料做複雜計算、過濾或格式化,超出預處理能力範圍
水平擴充套件
需要多例項水平擴充套件消費能力
何時選擇 WebSocket
快速原型
正在構建原型,開發速度是首要因素
統一介面
應用同時需要歷史資料和實時資料,需要統一查詢與訂閱介面
瀏覽器端
應用直接在瀏覽器端消費資料(Kafka Streams 僅支援服務端)
動態過濾
需要根據頁面內容動態過濾資料
對比總結
| 特性 | Kafka Streams | WebSocket |
|---|---|---|
| 延遲 | 最低 | 低 |
| 可靠性 | 持久化,不丟訊息 | 斷連可能丟失 |
| 擴充套件性 | 原生水平擴充套件 | 需額外設計 |
| 資料過濾 | 消費端處理 | 服務端預過濾 |
| 客戶端支援 | 僅服務端 | 服務端 + 瀏覽器 |
| 接入複雜度 | 較高 | 較低 |
接入憑證獲取
Kafka Streams 使用獨立的認證憑證,需要聯絡 ChainStream 團隊申請開通。聯絡申請
傳送郵件至 support@chainstream.io 申請 Kafka Streams 接入許可權
連線配置
Broker 地址
Broker 地址將在您的申請稽核透過後,隨憑證資訊一同提供。請勿使用任何未經授權的地址進行連線。
SASL_SSL 連線配置
- Python
- JavaScript
- Go
Topic 命名規範與完整列表
命名規範
Topic 命名遵循以下 pattern:{chain} 包括:sol、bsc、eth、tron
訊息型別說明
| 型別 | 說明 |
|---|---|
dex.trades | DEX 交易事件 |
dex.pools | 流動性池事件 |
tokens | Token 事件 |
balances | 餘額變動事件 |
transfers | 轉賬事件 |
token-supplies | 代幣供應量事件 |
token-prices | 代幣價格事件 |
token-holdings | 代幣持倉資料 |
token-market-caps | 代幣市值事件 |
candlesticks | K線資料 |
trade-stats | 交易統計資料 |
完整 Topic 列表
- 跨鏈通用 Topics
- Solana 專用
- EVM 專用
- TRON 專用
以下 Topics 適用於所有支援的鏈(將
{chain} 替換為 sol、bsc、eth):消費模式與 Offset 管理
訂閱 topic 時需要關注兩個核心配置:Offset 策略選擇
消費者在連線 Kafka 後,需要決定從哪個位置開始讀取訊息。兩種常見策略:- 僅消費最新訊息
- 持久消費不丟訊息
每次連線從當前最新位置開始,適合只關心實時資料的場景。重連後不會回溯歷史訊息。
Group ID 規則
多例項部署同一 Group ID 可實現故障轉移和負載均衡——同一 topic 的訊息只會被 Group 中的一個例項消費,Kafka 自動在例項間分配分割槽。Quick Start:5 分鐘跑通第一個 Consumer
以下示例展示如何消費eth.dex.trades topic 並解析 DEX 交易資料。
核心資料結構
所有訊息型別共享以下基礎結構(定義於common/common.proto):
基礎結構
- Block
- Transaction
- Instruction
- DApp
區塊資訊:
| 欄位 | 型別 | 說明 |
|---|---|---|
timestamp | int64 | 區塊時間戳 |
hash | string | 區塊雜湊 |
height | uint64 | 區塊高度 |
slot | uint64 | Slot 號(Solana) |
主要訊息型別
TradeEvent - DEX 交易事件
TradeEvent - DEX 交易事件
Topic: Trade 核心欄位:
TradeProcessed 增強欄位(processed topic):
{chain}.dex.trades| 欄位 | 說明 |
|---|---|
token_a_address / token_b_address | 交易對代幣地址 |
user_a_amount / user_b_amount | 使用者交易數量 |
pool_address | 池子地址 |
vault_a / vault_b | 池子 Vault 地址 |
vault_a_amount / vault_b_amount | Vault 數量 |
| 欄位 | 說明 |
|---|---|
token_a_price_in_usd / token_b_price_in_usd | USD 價格 |
token_a_price_in_native / token_b_price_in_native | 原生幣價格 |
is_token_a_price_in_usd_suspect | 價格是否可疑 |
is_token_a_price_in_usd_suspect_reason | 可疑原因 |
TokenEvent - 代幣事件
TokenEvent - 代幣事件
Topic: Token 核心欄位:
{chain}.tokens, {chain}.tokens.created| 欄位 | 說明 |
|---|---|
address | 代幣地址 |
name / symbol | 名稱和符號 |
decimals | 精度 |
uri | 後設資料 URI |
metadata_address | 後設資料地址 |
creators | 建立者列表 |
solana_extra | Solana 特有欄位 |
evm_extra | EVM 特有欄位(token_standard) |
BalanceEvent - 餘額變動事件
BalanceEvent - 餘額變動事件
Topic: Balance 核心欄位:
{chain}.balances| 欄位 | 說明 |
|---|---|
token_account_address | Token 賬戶地址 |
account_owner_address | 賬戶所有者地址 |
token_address | 代幣地址 |
pre_amount / post_amount | 變動前後餘額 |
decimals | 精度 |
lifecycle | 賬戶生命週期(NEW/EXISTING/CLOSED) |
DexPoolEvent - 流動性池事件
DexPoolEvent - 流動性池事件
Topic: DexPool 核心欄位:
{chain}.dex.pools| 欄位 | 說明 |
|---|---|
address | 池子地址 |
token_a_address / token_b_address | 代幣地址 |
token_a_vault_address / token_b_vault_address | Vault 地址 |
token_a_amount / token_b_amount | 代幣數量 |
lp_wallet | LP 錢包地址 |
CandlestickEvent - K線資料
CandlestickEvent - K線資料
Topic:
{chain}.candlesticks| 欄位 | 說明 |
|---|---|
token_address | 代幣地址 |
resolution | 時間週期(1m, 5m, 15m, 1h 等) |
timestamp | 時間戳 |
open / high / low / close | OHLC 價格(USD) |
open_in_native / high_in_native / low_in_native / close_in_native | OHLC 價格(原生幣) |
volume / volume_in_usd / volume_in_native | 成交量 |
trades | 交易筆數 |
dimension | 維度型別(TOKEN_ADDRESS/POOL_ADDRESS/PAIR) |
TradeStatEvent - 交易統計
TradeStatEvent - 交易統計
Topic:
{chain}.trade-stats| 欄位 | 說明 |
|---|---|
token_address | 代幣地址 |
resolution | 時間週期 |
buys / sells | 買入/賣出筆數 |
buyers / sellers | 買家/賣家數 |
buy_volume / sell_volume | 買入/賣出量 |
buy_volume_in_usd / sell_volume_in_usd | USD 成交量 |
high_in_usd / low_in_usd | 最高/最低價 |
TokenHoldingEvent - 持倉統計
TokenHoldingEvent - 持倉統計
Topic:
{chain}.token-holdings| 欄位組 | 說明 |
|---|---|
top10_holders / top10_amount / top10_ratio | Top 10 持有者統計 |
top50_holders / top50_amount / top50_ratio | Top 50 持有者統計 |
top100_holders / top100_amount / top100_ratio | Top 100 持有者統計 |
holders | 總持有者數 |
creators_count / creators_amount / creators_ratio | 建立者持倉統計 |
fresh_count / fresh_amount / fresh_ratio | 新地址持倉統計 |
smart_count / smart_amount / smart_ratio | Smart Money 持倉統計 |
sniper_count / sniper_amount / sniper_ratio | Sniper 持倉統計 |
insider_count / insider_amount / insider_ratio | 內部人持倉統計 |
TokenPriceEvent - 價格事件
TokenPriceEvent - 價格事件
Topic:
{chain}.token-prices| 欄位 | 說明 |
|---|---|
token_address | 代幣地址 |
price_in_usd | USD 價格 |
price_in_native | 原生幣價格 |
TokenSupplyEvent - 供應量事件
TokenSupplyEvent - 供應量事件
Topic:
{chain}.token-supplies| 欄位 | 說明 |
|---|---|
type | 事件型別(INITIALIZE_MINT/MINT/BURN) |
token_address | 代幣地址 |
amount | 數量 |
decimals | 精度 |
amount_with_decimals | 帶精度的數量 |
訊息特性與注意事項
開發者在消費 Kafka Streams 時需要注意以下訊息特性:無過濾的完整資料流
無過濾的完整資料流
Stream 不做預過濾,包含 topic 內的所有訊息和完整資料。這意味著消費端需要有足夠的網路吞吐、伺服器效能和高效的解析程式碼。
同一實體訊息有序
同一實體訊息有序
同一個代幣或同一個賬號的訊息嚴格按 block 順序到達。這意味著針對特定代幣或錢包地址的事件流是有序的,方便追蹤狀態變化。但不同代幣/賬號之間的訊息到達順序不做保證。
訊息可能重複
訊息可能重複
同一條訊息可能被投遞多次。如果重複處理會造成問題,消費端需要維護快取或儲存來實現冪等處理。
訊息完整性保證
訊息完整性保證
ChainStream 保證每條訊息的完整性,訊息不會被拆分。無論區塊包含多少交易,您收到的訊息都是完整的資料單元。
Protobuf 二進位制格式
Protobuf 二進位制格式
訊息使用 Protobuf 編碼,比 JSON 更緊湊。消費端需要使用對應語言的 Protobuf 庫進行解析。
延遲模型
Kafka Streams 的延遲取決於資料在管道中經過的處理環節。同一條鏈的不同 topic 延遲不同:Broadcasted vs Committed
| 型別 | 說明 | 延遲 | 資料確定性 |
|---|---|---|---|
| Broadcasted | 交易在廣播階段即可消費,無需等待區塊確認 | 最低 | 較低 |
| Committed | 交易經過區塊確認後才進入 stream | 較高 | 最高 |
處理管道延遲
資料從區塊鏈節點到 Kafka topic 的每一層轉換(解析、結構化、enrichment)都會引入約 100-1000ms 的延遲:- raw topic:延遲最低,接近原始節點資料
- transactions topic:經過解析和結構化
- dextrades topic:延遲相對更高,但資料更豐富
最佳實踐
分割槽並行消費
Kafka topic 被劃分為多個分割槽(partition),每個分割槽需要並行讀取以最大化吞吐量。 訊息的分割槽鍵設定為 代幣地址 或 錢包地址(所有鏈統一),這確保:- 同一代幣的所有事件路由到同一分割槽,保證順序性
- 同一錢包的所有餘額變動路由到同一分割槽,方便狀態追蹤
持續消費,不阻塞主迴圈
Consumer 的讀取迴圈應保持持續執行,避免因訊息處理阻塞而導致積壓。如果需要對訊息做處理,應採用非同步處理模式:主迴圈負責讀取,處理邏輯委託給 worker 執行緒。訊息處理效率
批次處理可以降低開銷,但需要在批次大小和延遲之間權衡。在 Go 中可以使用 channel + worker group 實現併發處理。鏈特定文件
EVM Streams
Ethereum、BSC、Base、Polygon、Optimism
Solana Streams
Solana 高吞吐資料流
TRON Streams
TRON 網路資料流
相關文件
實時資料流
WebSocket 實時資料接入指南
認證指南
獲取 Access Token

