跳转到主要内容
ChainStream 通过 Kafka Streams 提供多链实时链上数据流。相较于 GraphQL Subscriptions 和 WebSocket,Kafka Streams 面向对延迟敏感、可靠性要求高的服务端应用场景,提供更低延迟、更强容错的数据消费能力。

Protobuf Schema 仓库

ChainStream 官方 Protobuf Schema 定义,支持 Go 和 Python,包含 EVM、Solana、TRON 所有消息类型。

支持矩阵

dex.tradestokensbalancesdex.poolstransferscandlesticks
Ethereum (eth)
BSC (bsc)
Solana (sol)
TRON (tron)
所有链还支持 token-suppliestoken-pricestoken-holdingstoken-market-capstrade-stats 等 Topics。详见完整 Topic 列表。

Kafka Streams vs WebSocket 选型指南

何时选择 Kafka Streams

延迟敏感

延迟是首要考量,应用部署在云端或专用服务器

消息可靠

不可接受丢失任何消息,需要持久可靠的数据消费

复杂处理

需要对数据做复杂计算、过滤或格式化,超出预处理能力范围

水平扩展

需要多实例水平扩展消费能力

何时选择 WebSocket

快速原型

正在构建原型,开发速度是首要因素

统一接口

应用同时需要历史数据和实时数据,需要统一查询与订阅接口

浏览器端

应用直接在浏览器端消费数据(Kafka Streams 仅支持服务端)

动态过滤

需要根据页面内容动态过滤数据

对比总结

特性Kafka StreamsWebSocket
延迟最低
可靠性持久化,不丢消息断连可能丢失
扩展性原生水平扩展需额外设计
数据过滤消费端处理服务端预过滤
客户端支持仅服务端服务端 + 浏览器
接入复杂度较高较低

接入凭证获取

Kafka Streams 使用独立的认证凭证,需要联系 ChainStream 团队申请开通。
1

联系申请

发送邮件至 support@chainstream.io 申请 Kafka Streams 接入权限
2

获取凭证

审核通过后,您将收到以下凭证信息:
  • Username
  • Password
  • Broker 地址列表
3

配置连接

使用获取的凭证配置 Kafka 客户端连接

连接配置

Broker 地址

Broker 地址将在您的申请审核通过后,随凭证信息一同提供。请勿使用任何未经授权的地址进行连接。

SASL_SSL 连接配置

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='your_group_id'
)

Topic 命名规范与完整列表

命名规范

Topic 命名遵循以下 pattern:
{chain}.{message_type}              # 原始事件数据
{chain}.{message_type}.processed    # 处理后的数据(含价格、标记等增强信息)
{chain}.{message_type}.created      # 创建事件(如代币创建)
其中 {chain} 包括:solbscethtron

消息类型说明

类型说明
dex.tradesDEX 交易事件
dex.pools流动性池事件
tokensToken 事件
balances余额变动事件
transfers转账事件
token-supplies代币供应量事件
token-prices代币价格事件
token-holdings代币持仓数据
token-market-caps代币市值事件
candlesticksK线数据
trade-stats交易统计数据

完整 Topic 列表

以下 Topics 适用于所有支持的链(将 {chain} 替换为 solbsceth):
# DEX 交易
{chain}.dex.trades
{chain}.dex.trades.processed    # 包含 USD/原生币价格、可疑标记

# 代币事件
{chain}.tokens
{chain}.tokens.created          # 代币创建事件
{chain}.tokens.processed        # 包含描述、图片、社交链接

# 余额变动
{chain}.balances
{chain}.balances.processed      # 包含 USD/原生币价值

# 流动性池
{chain}.dex.pools
{chain}.dex.pools.processed     # 包含流动性 USD/原生币价值

# 代币数据
{chain}.token-supplies
{chain}.token-supplies.processed
{chain}.token-prices
{chain}.token-holdings
{chain}.token-market-caps.processed

# 聚合数据
{chain}.candlesticks            # OHLCV K线数据
{chain}.trade-stats             # 交易统计
完整的 Protobuf Schema 和 Topic 映射请参考 streaming_protobuf 仓库

消费模式与 Offset 管理

订阅 topic 时需要关注两个核心配置:

Offset 策略选择

消费者在连接 Kafka 后,需要决定从哪个位置开始读取消息。两种常见策略:
每次连接从当前最新位置开始,适合只关心实时数据的场景。重连后不会回溯历史消息。
{
  autoCommit: false,
  fromBeginning: false,
  'auto.offset.reset': 'latest'
}

Group ID 规则

多实例部署同一 Group ID 可实现故障转移和负载均衡——同一 topic 的消息只会被 Group 中的一个实例消费,Kafka 自动在实例间分配分区。
通常建议每个 topic 对应一个独立 consumer,因为不同 topic 的消息解析逻辑不同。

Quick Start:5 分钟跑通第一个 Consumer

以下示例展示如何消费 eth.dex.trades topic 并解析 DEX 交易数据。
1

获取 Protobuf Schema

从官方仓库克隆 Schema 定义:
git clone https://github.com/chainstream-io/streaming_protobuf.git
或作为 Git submodule 添加到项目:
git submodule add https://github.com/chainstream-io/streaming_protobuf.git
2

安装依赖

pip install kafka-python protobuf
3

配置连接并消费

from kafka import KafkaConsumer
from common import trade_event_pb2  # 从 streaming_protobuf 仓库获取

# 创建 consumer
consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-dex-consumer'
)

# 消费消息
for message in consumer:
    # 解析 protobuf 消息
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    # 打印 DEX 交易信息
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Block: {event.block.height}")
        print("---")

核心数据结构

所有消息类型共享以下基础结构(定义于 common/common.proto):

基础结构

区块信息:
字段类型说明
timestampint64区块时间戳
hashstring区块哈希
heightuint64区块高度
slotuint64Slot 号(Solana)

主要消息类型

Topic: {chain}.dex.trades
message TradeEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Trade trade = 100;
  BondingCurve bonding_curve = 110;
  TradeProcessed trade_processed = 200;  // processed topic 包含
}
Trade 核心字段
字段说明
token_a_address / token_b_address交易对代币地址
user_a_amount / user_b_amount用户交易数量
pool_address池子地址
vault_a / vault_b池子 Vault 地址
vault_a_amount / vault_b_amountVault 数量
TradeProcessed 增强字段(processed topic):
字段说明
token_a_price_in_usd / token_b_price_in_usdUSD 价格
token_a_price_in_native / token_b_price_in_native原生币价格
is_token_a_price_in_usd_suspect价格是否可疑
is_token_a_price_in_usd_suspect_reason可疑原因
Topic: {chain}.tokens, {chain}.tokens.created
message TokenEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  EventType type = 100;        // CREATED, UPDATED
  Token token = 101;
  TokenProcessed token_processed = 200;
}
Token 核心字段
字段说明
address代币地址
name / symbol名称和符号
decimals精度
uri元数据 URI
metadata_address元数据地址
creators创建者列表
solana_extraSolana 特有字段
evm_extraEVM 特有字段(token_standard)
Topic: {chain}.balances
message BalanceEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Balance balance = 100;
  BalanceProcessed balance_processed = 200;
}
Balance 核心字段
字段说明
token_account_addressToken 账户地址
account_owner_address账户所有者地址
token_address代币地址
pre_amount / post_amount变动前后余额
decimals精度
lifecycle账户生命周期(NEW/EXISTING/CLOSED)
Topic: {chain}.dex.pools
message DexPoolEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  DexPoolEventType type = 100;  // INITIALIZE, INCREASE_LIQUIDITY, DECREASE_LIQUIDITY, SWAP
  DexPool pool = 101;
  DexPoolProcessed pool_processed = 200;
}
DexPool 核心字段
字段说明
address池子地址
token_a_address / token_b_address代币地址
token_a_vault_address / token_b_vault_addressVault 地址
token_a_amount / token_b_amount代币数量
lp_walletLP 钱包地址
Topic: {chain}.candlesticks
字段说明
token_address代币地址
resolution时间周期(1m, 5m, 15m, 1h 等)
timestamp时间戳
open / high / low / closeOHLC 价格(USD)
open_in_native / high_in_native / low_in_native / close_in_nativeOHLC 价格(原生币)
volume / volume_in_usd / volume_in_native成交量
trades交易笔数
dimension维度类型(TOKEN_ADDRESS/POOL_ADDRESS/PAIR)
Topic: {chain}.trade-stats
字段说明
token_address代币地址
resolution时间周期
buys / sells买入/卖出笔数
buyers / sellers买家/卖家数
buy_volume / sell_volume买入/卖出量
buy_volume_in_usd / sell_volume_in_usdUSD 成交量
high_in_usd / low_in_usd最高/最低价
Topic: {chain}.token-holdings
字段组说明
top10_holders / top10_amount / top10_ratioTop 10 持有者统计
top50_holders / top50_amount / top50_ratioTop 50 持有者统计
top100_holders / top100_amount / top100_ratioTop 100 持有者统计
holders总持有者数
creators_count / creators_amount / creators_ratio创建者持仓统计
fresh_count / fresh_amount / fresh_ratio新地址持仓统计
smart_count / smart_amount / smart_ratioSmart Money 持仓统计
sniper_count / sniper_amount / sniper_ratioSniper 持仓统计
insider_count / insider_amount / insider_ratio内部人持仓统计
Topic: {chain}.token-prices
字段说明
token_address代币地址
price_in_usdUSD 价格
price_in_native原生币价格
Topic: {chain}.token-supplies
字段说明
type事件类型(INITIALIZE_MINT/MINT/BURN)
token_address代币地址
amount数量
decimals精度
amount_with_decimals带精度的数量
完整的 Protobuf 定义请参考 streaming_protobuf 仓库

消息特性与注意事项

开发者在消费 Kafka Streams 时需要注意以下消息特性:
Stream 不做预过滤,包含 topic 内的所有消息和完整数据。这意味着消费端需要有足够的网络吞吐、服务器性能和高效的解析代码。
同一个代币或同一个账号的消息严格按 block 顺序到达。这意味着针对特定代币或钱包地址的事件流是有序的,方便追踪状态变化。但不同代币/账号之间的消息到达顺序不做保证。
同一条消息可能被投递多次。如果重复处理会造成问题,消费端需要维护缓存或存储来实现幂等处理。
ChainStream 保证每条消息的完整性,消息不会被拆分。无论区块包含多少交易,您收到的消息都是完整的数据单元。
消息使用 Protobuf 编码,比 JSON 更紧凑。消费端需要使用对应语言的 Protobuf 库进行解析。

延迟模型

Kafka Streams 的延迟取决于数据在管道中经过的处理环节。同一条链的不同 topic 延迟不同:

Broadcasted vs Committed

类型说明延迟数据确定性
Broadcasted交易在广播阶段即可消费,无需等待区块确认最低较低
Committed交易经过区块确认后才进入 stream较高最高

处理管道延迟

数据从区块链节点到 Kafka topic 的每一层转换(解析、结构化、enrichment)都会引入约 100-1000ms 的延迟:
  • raw topic:延迟最低,接近原始节点数据
  • transactions topic:经过解析和结构化
  • dextrades topic:延迟相对更高,但数据更丰富
如果延迟是首要考量,优先选择离原始数据最近、你能有效解析的 topic。

最佳实践

分区并行消费

Kafka topic 被划分为多个分区(partition),每个分区需要并行读取以最大化吞吐量。 消息的分区键设置为 代币地址钱包地址(所有链统一),这确保:
  • 同一代币的所有事件路由到同一分区,保证顺序性
  • 同一钱包的所有余额变动路由到同一分区,方便状态追踪
建议为每个分区分配一个独立线程,确保负载均衡。

持续消费,不阻塞主循环

Consumer 的读取循环应保持持续运行,避免因消息处理阻塞而导致积压。如果需要对消息做处理,应采用异步处理模式:主循环负责读取,处理逻辑委托给 worker 线程。

消息处理效率

批量处理可以降低开销,但需要在批量大小和延迟之间权衡。在 Go 中可以使用 channel + worker group 实现并发处理。

链特定文档


相关文档