跳轉到主要內容
ChainStream 透過 Kafka Streams 提供多鏈實時鏈上資料流。相較於 GraphQL Subscriptions 和 WebSocket,Kafka Streams 面向對延遲敏感、可靠性要求高的服務端應用場景,提供更低延遲、更強容錯的資料消費能力。

Protobuf Schema 倉庫

ChainStream 官方 Protobuf Schema 定義,支援 Go 和 Python,包含 EVM、Solana、TRON 所有訊息型別。

支援矩陣

dex.tradestokensbalancesdex.poolstransferscandlesticks
Ethereum (eth)
BSC (bsc)
Solana (sol)
TRON (tron)
所有鏈還支援 token-suppliestoken-pricestoken-holdingstoken-market-capstrade-stats 等 Topics。詳見完整 Topic 列表。

Kafka Streams vs WebSocket 選型指南

何時選擇 Kafka Streams

延遲敏感

延遲是首要考量,應用部署在雲端或專用伺服器

訊息可靠

不可接受丟失任何訊息,需要持久可靠的資料消費

複雜處理

需要對資料做複雜計算、過濾或格式化,超出預處理能力範圍

水平擴充套件

需要多例項水平擴充套件消費能力

何時選擇 WebSocket

快速原型

正在構建原型,開發速度是首要因素

統一介面

應用同時需要歷史資料和實時資料,需要統一查詢與訂閱介面

瀏覽器端

應用直接在瀏覽器端消費資料(Kafka Streams 僅支援服務端)

動態過濾

需要根據頁面內容動態過濾資料

對比總結

特性Kafka StreamsWebSocket
延遲最低
可靠性持久化,不丟訊息斷連可能丟失
擴充套件性原生水平擴充套件需額外設計
資料過濾消費端處理服務端預過濾
客戶端支援僅服務端服務端 + 瀏覽器
接入複雜度較高較低

接入憑證獲取

Kafka Streams 使用獨立的認證憑證,需要聯絡 ChainStream 團隊申請開通。
1

聯絡申請

傳送郵件至 support@chainstream.io 申請 Kafka Streams 接入許可權
2

獲取憑證

稽核透過後,您將收到以下憑證資訊:
  • Username
  • Password
  • Broker 地址列表
3

配置連線

使用獲取的憑證配置 Kafka 客戶端連線

連線配置

Broker 地址

Broker 地址將在您的申請稽核透過後,隨憑證資訊一同提供。請勿使用任何未經授權的地址進行連線。

SASL_SSL 連線配置

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='your_group_id'
)

Topic 命名規範與完整列表

命名規範

Topic 命名遵循以下 pattern:
{chain}.{message_type}              # 原始事件数据
{chain}.{message_type}.processed    # 处理后的数据(含价格、标记等增强信息)
{chain}.{message_type}.created      # 创建事件(如代币创建)
其中 {chain} 包括:solbscethtron

訊息型別說明

型別說明
dex.tradesDEX 交易事件
dex.pools流動性池事件
tokensToken 事件
balances餘額變動事件
transfers轉賬事件
token-supplies代幣供應量事件
token-prices代幣價格事件
token-holdings代幣持倉資料
token-market-caps代幣市值事件
candlesticksK線資料
trade-stats交易統計資料

完整 Topic 列表

以下 Topics 適用於所有支援的鏈(將 {chain} 替換為 solbsceth):
# DEX 交易
{chain}.dex.trades
{chain}.dex.trades.processed    # 包含 USD/原生币价格、可疑标记

# 代币事件
{chain}.tokens
{chain}.tokens.created          # 代币创建事件
{chain}.tokens.processed        # 包含描述、图片、社交链接

# 余额变动
{chain}.balances
{chain}.balances.processed      # 包含 USD/原生币价值

# 流动性池
{chain}.dex.pools
{chain}.dex.pools.processed     # 包含流动性 USD/原生币价值

# 代币数据
{chain}.token-supplies
{chain}.token-supplies.processed
{chain}.token-prices
{chain}.token-holdings
{chain}.token-market-caps.processed

# 聚合数据
{chain}.candlesticks            # OHLCV K线数据
{chain}.trade-stats             # 交易统计
完整的 Protobuf Schema 和 Topic 對映請參考 streaming_protobuf 倉庫

消費模式與 Offset 管理

訂閱 topic 時需要關注兩個核心配置:

Offset 策略選擇

消費者在連線 Kafka 後,需要決定從哪個位置開始讀取訊息。兩種常見策略:
每次連線從當前最新位置開始,適合只關心實時資料的場景。重連後不會回溯歷史訊息。
{
  autoCommit: false,
  fromBeginning: false,
  'auto.offset.reset': 'latest'
}

Group ID 規則

多例項部署同一 Group ID 可實現故障轉移和負載均衡——同一 topic 的訊息只會被 Group 中的一個例項消費,Kafka 自動在例項間分配分割槽。
通常建議每個 topic 對應一個獨立 consumer,因為不同 topic 的訊息解析邏輯不同。

Quick Start:5 分鐘跑通第一個 Consumer

以下示例展示如何消費 eth.dex.trades topic 並解析 DEX 交易資料。
1

獲取 Protobuf Schema

從官方倉庫克隆 Schema 定義:
git clone https://github.com/chainstream-io/streaming_protobuf.git
或作為 Git submodule 新增到專案:
git submodule add https://github.com/chainstream-io/streaming_protobuf.git
2

安裝依賴

pip install kafka-python protobuf
3

配置連線並消費

from kafka import KafkaConsumer
from common import trade_event_pb2  # 从 streaming_protobuf 仓库获取

# 创建 consumer
consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-dex-consumer'
)

# 消费消息
for message in consumer:
    # 解析 protobuf 消息
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    # 打印 DEX 交易信息
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Block: {event.block.height}")
        print("---")

核心資料結構

所有訊息型別共享以下基礎結構(定義於 common/common.proto):

基礎結構

區塊資訊:
欄位型別說明
timestampint64區塊時間戳
hashstring區塊雜湊
heightuint64區塊高度
slotuint64Slot 號(Solana)

主要訊息型別

Topic: {chain}.dex.trades
message TradeEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Trade trade = 100;
  BondingCurve bonding_curve = 110;
  TradeProcessed trade_processed = 200;  // processed topic 包含
}
Trade 核心欄位
欄位說明
token_a_address / token_b_address交易對代幣地址
user_a_amount / user_b_amount使用者交易數量
pool_address池子地址
vault_a / vault_b池子 Vault 地址
vault_a_amount / vault_b_amountVault 數量
TradeProcessed 增強欄位(processed topic):
欄位說明
token_a_price_in_usd / token_b_price_in_usdUSD 價格
token_a_price_in_native / token_b_price_in_native原生幣價格
is_token_a_price_in_usd_suspect價格是否可疑
is_token_a_price_in_usd_suspect_reason可疑原因
Topic: {chain}.tokens, {chain}.tokens.created
message TokenEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  EventType type = 100;        // CREATED, UPDATED
  Token token = 101;
  TokenProcessed token_processed = 200;
}
Token 核心欄位
欄位說明
address代幣地址
name / symbol名稱和符號
decimals精度
uri後設資料 URI
metadata_address後設資料地址
creators建立者列表
solana_extraSolana 特有欄位
evm_extraEVM 特有欄位(token_standard)
Topic: {chain}.balances
message BalanceEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Balance balance = 100;
  BalanceProcessed balance_processed = 200;
}
Balance 核心欄位
欄位說明
token_account_addressToken 賬戶地址
account_owner_address賬戶所有者地址
token_address代幣地址
pre_amount / post_amount變動前後餘額
decimals精度
lifecycle賬戶生命週期(NEW/EXISTING/CLOSED)
Topic: {chain}.dex.pools
message DexPoolEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  DexPoolEventType type = 100;  // INITIALIZE, INCREASE_LIQUIDITY, DECREASE_LIQUIDITY, SWAP
  DexPool pool = 101;
  DexPoolProcessed pool_processed = 200;
}
DexPool 核心欄位
欄位說明
address池子地址
token_a_address / token_b_address代幣地址
token_a_vault_address / token_b_vault_addressVault 地址
token_a_amount / token_b_amount代幣數量
lp_walletLP 錢包地址
Topic: {chain}.candlesticks
欄位說明
token_address代幣地址
resolution時間週期(1m, 5m, 15m, 1h 等)
timestamp時間戳
open / high / low / closeOHLC 價格(USD)
open_in_native / high_in_native / low_in_native / close_in_nativeOHLC 價格(原生幣)
volume / volume_in_usd / volume_in_native成交量
trades交易筆數
dimension維度型別(TOKEN_ADDRESS/POOL_ADDRESS/PAIR)
Topic: {chain}.trade-stats
欄位說明
token_address代幣地址
resolution時間週期
buys / sells買入/賣出筆數
buyers / sellers買家/賣家數
buy_volume / sell_volume買入/賣出量
buy_volume_in_usd / sell_volume_in_usdUSD 成交量
high_in_usd / low_in_usd最高/最低價
Topic: {chain}.token-holdings
欄位組說明
top10_holders / top10_amount / top10_ratioTop 10 持有者統計
top50_holders / top50_amount / top50_ratioTop 50 持有者統計
top100_holders / top100_amount / top100_ratioTop 100 持有者統計
holders總持有者數
creators_count / creators_amount / creators_ratio建立者持倉統計
fresh_count / fresh_amount / fresh_ratio新地址持倉統計
smart_count / smart_amount / smart_ratioSmart Money 持倉統計
sniper_count / sniper_amount / sniper_ratioSniper 持倉統計
insider_count / insider_amount / insider_ratio內部人持倉統計
Topic: {chain}.token-prices
欄位說明
token_address代幣地址
price_in_usdUSD 價格
price_in_native原生幣價格
Topic: {chain}.token-supplies
欄位說明
type事件型別(INITIALIZE_MINT/MINT/BURN)
token_address代幣地址
amount數量
decimals精度
amount_with_decimals帶精度的數量
完整的 Protobuf 定義請參考 streaming_protobuf 倉庫

訊息特性與注意事項

開發者在消費 Kafka Streams 時需要注意以下訊息特性:
Stream 不做預過濾,包含 topic 內的所有訊息和完整資料。這意味著消費端需要有足夠的網路吞吐、伺服器效能和高效的解析程式碼。
同一個代幣或同一個賬號的訊息嚴格按 block 順序到達。這意味著針對特定代幣或錢包地址的事件流是有序的,方便追蹤狀態變化。但不同代幣/賬號之間的訊息到達順序不做保證。
同一條訊息可能被投遞多次。如果重複處理會造成問題,消費端需要維護快取或儲存來實現冪等處理。
ChainStream 保證每條訊息的完整性,訊息不會被拆分。無論區塊包含多少交易,您收到的訊息都是完整的資料單元。
訊息使用 Protobuf 編碼,比 JSON 更緊湊。消費端需要使用對應語言的 Protobuf 庫進行解析。

延遲模型

Kafka Streams 的延遲取決於資料在管道中經過的處理環節。同一條鏈的不同 topic 延遲不同:

Broadcasted vs Committed

型別說明延遲資料確定性
Broadcasted交易在廣播階段即可消費,無需等待區塊確認最低較低
Committed交易經過區塊確認後才進入 stream較高最高

處理管道延遲

資料從區塊鏈節點到 Kafka topic 的每一層轉換(解析、結構化、enrichment)都會引入約 100-1000ms 的延遲:
  • raw topic:延遲最低,接近原始節點資料
  • transactions topic:經過解析和結構化
  • dextrades topic:延遲相對更高,但資料更豐富
如果延遲是首要考量,優先選擇離原始資料最近、你能有效解析的 topic。

最佳實踐

分割槽並行消費

Kafka topic 被劃分為多個分割槽(partition),每個分割槽需要並行讀取以最大化吞吐量。 訊息的分割槽鍵設定為 代幣地址錢包地址(所有鏈統一),這確保:
  • 同一代幣的所有事件路由到同一分割槽,保證順序性
  • 同一錢包的所有餘額變動路由到同一分割槽,方便狀態追蹤
建議為每個分割槽分配一個獨立執行緒,確保負載均衡。

持續消費,不阻塞主迴圈

Consumer 的讀取迴圈應保持持續執行,避免因訊息處理阻塞而導致積壓。如果需要對訊息做處理,應採用非同步處理模式:主迴圈負責讀取,處理邏輯委託給 worker 執行緒。

訊息處理效率

批次處理可以降低開銷,但需要在批次大小和延遲之間權衡。在 Go 中可以使用 channel + worker group 實現併發處理。

鏈特定文件

EVM Streams

Ethereum、BSC、Base、Polygon、Optimism

Solana Streams

Solana 高吞吐資料流

TRON Streams

TRON 網路資料流

相關文件

實時資料流

WebSocket 實時資料接入指南

認證指南

獲取 Access Token