Protobuf Schema 仓库
ChainStream 官方 Protobuf Schema 定义,支持 Go 和 Python,包含 EVM、Solana、TRON 所有消息类型。
支持矩阵
| 链 | dex.trades | tokens | balances | dex.pools | transfers | candlesticks |
|---|---|---|---|---|---|---|
| Ethereum (eth) | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| BSC (bsc) | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Solana (sol) | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| TRON (tron) | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
所有链还支持
token-supplies、token-prices、token-holdings、token-market-caps、trade-stats 等 Topics。详见完整 Topic 列表。Kafka Streams vs WebSocket 选型指南
何时选择 Kafka Streams
延迟敏感
延迟是首要考量,应用部署在云端或专用服务器
消息可靠
不可接受丢失任何消息,需要持久可靠的数据消费
复杂处理
需要对数据做复杂计算、过滤或格式化,超出预处理能力范围
水平扩展
需要多实例水平扩展消费能力
何时选择 WebSocket
快速原型
正在构建原型,开发速度是首要因素
统一接口
应用同时需要历史数据和实时数据,需要统一查询与订阅接口
浏览器端
应用直接在浏览器端消费数据(Kafka Streams 仅支持服务端)
动态过滤
需要根据页面内容动态过滤数据
对比总结
| 特性 | Kafka Streams | WebSocket |
|---|---|---|
| 延迟 | 最低 | 低 |
| 可靠性 | 持久化,不丢消息 | 断连可能丢失 |
| 扩展性 | 原生水平扩展 | 需额外设计 |
| 数据过滤 | 消费端处理 | 服务端预过滤 |
| 客户端支持 | 仅服务端 | 服务端 + 浏览器 |
| 接入复杂度 | 较高 | 较低 |
接入凭证获取
Kafka Streams 使用独立的认证凭证,需要联系 ChainStream 团队申请开通。联系申请
发送邮件至 support@chainstream.io 申请 Kafka Streams 接入权限
连接配置
Broker 地址
Broker 地址将在您的申请审核通过后,随凭证信息一同提供。请勿使用任何未经授权的地址进行连接。
SASL_SSL 连接配置
- Python
- JavaScript
- Go
Topic 命名规范与完整列表
命名规范
Topic 命名遵循以下 pattern:{chain} 包括:sol、bsc、eth、tron
消息类型说明
| 类型 | 说明 |
|---|---|
dex.trades | DEX 交易事件 |
dex.pools | 流动性池事件 |
tokens | Token 事件 |
balances | 余额变动事件 |
transfers | 转账事件 |
token-supplies | 代币供应量事件 |
token-prices | 代币价格事件 |
token-holdings | 代币持仓数据 |
token-market-caps | 代币市值事件 |
candlesticks | K线数据 |
trade-stats | 交易统计数据 |
完整 Topic 列表
- 跨链通用 Topics
- Solana 专用
- EVM 专用
- TRON 专用
以下 Topics 适用于所有支持的链(将
{chain} 替换为 sol、bsc、eth):消费模式与 Offset 管理
订阅 topic 时需要关注两个核心配置:Offset 策略选择
消费者在连接 Kafka 后,需要决定从哪个位置开始读取消息。两种常见策略:- 仅消费最新消息
- 持久消费不丢消息
每次连接从当前最新位置开始,适合只关心实时数据的场景。重连后不会回溯历史消息。
Group ID 规则
多实例部署同一 Group ID 可实现故障转移和负载均衡——同一 topic 的消息只会被 Group 中的一个实例消费,Kafka 自动在实例间分配分区。Quick Start:5 分钟跑通第一个 Consumer
以下示例展示如何消费eth.dex.trades topic 并解析 DEX 交易数据。
核心数据结构
所有消息类型共享以下基础结构(定义于common/common.proto):
基础结构
- Block
- Transaction
- Instruction
- DApp
区块信息:
| 字段 | 类型 | 说明 |
|---|---|---|
timestamp | int64 | 区块时间戳 |
hash | string | 区块哈希 |
height | uint64 | 区块高度 |
slot | uint64 | Slot 号(Solana) |
主要消息类型
TradeEvent - DEX 交易事件
TradeEvent - DEX 交易事件
Topic: Trade 核心字段:
TradeProcessed 增强字段(processed topic):
{chain}.dex.trades| 字段 | 说明 |
|---|---|
token_a_address / token_b_address | 交易对代币地址 |
user_a_amount / user_b_amount | 用户交易数量 |
pool_address | 池子地址 |
vault_a / vault_b | 池子 Vault 地址 |
vault_a_amount / vault_b_amount | Vault 数量 |
| 字段 | 说明 |
|---|---|
token_a_price_in_usd / token_b_price_in_usd | USD 价格 |
token_a_price_in_native / token_b_price_in_native | 原生币价格 |
is_token_a_price_in_usd_suspect | 价格是否可疑 |
is_token_a_price_in_usd_suspect_reason | 可疑原因 |
TokenEvent - 代币事件
TokenEvent - 代币事件
Topic: Token 核心字段:
{chain}.tokens, {chain}.tokens.created| 字段 | 说明 |
|---|---|
address | 代币地址 |
name / symbol | 名称和符号 |
decimals | 精度 |
uri | 元数据 URI |
metadata_address | 元数据地址 |
creators | 创建者列表 |
solana_extra | Solana 特有字段 |
evm_extra | EVM 特有字段(token_standard) |
BalanceEvent - 余额变动事件
BalanceEvent - 余额变动事件
Topic: Balance 核心字段:
{chain}.balances| 字段 | 说明 |
|---|---|
token_account_address | Token 账户地址 |
account_owner_address | 账户所有者地址 |
token_address | 代币地址 |
pre_amount / post_amount | 变动前后余额 |
decimals | 精度 |
lifecycle | 账户生命周期(NEW/EXISTING/CLOSED) |
DexPoolEvent - 流动性池事件
DexPoolEvent - 流动性池事件
Topic: DexPool 核心字段:
{chain}.dex.pools| 字段 | 说明 |
|---|---|
address | 池子地址 |
token_a_address / token_b_address | 代币地址 |
token_a_vault_address / token_b_vault_address | Vault 地址 |
token_a_amount / token_b_amount | 代币数量 |
lp_wallet | LP 钱包地址 |
CandlestickEvent - K线数据
CandlestickEvent - K线数据
Topic:
{chain}.candlesticks| 字段 | 说明 |
|---|---|
token_address | 代币地址 |
resolution | 时间周期(1m, 5m, 15m, 1h 等) |
timestamp | 时间戳 |
open / high / low / close | OHLC 价格(USD) |
open_in_native / high_in_native / low_in_native / close_in_native | OHLC 价格(原生币) |
volume / volume_in_usd / volume_in_native | 成交量 |
trades | 交易笔数 |
dimension | 维度类型(TOKEN_ADDRESS/POOL_ADDRESS/PAIR) |
TradeStatEvent - 交易统计
TradeStatEvent - 交易统计
Topic:
{chain}.trade-stats| 字段 | 说明 |
|---|---|
token_address | 代币地址 |
resolution | 时间周期 |
buys / sells | 买入/卖出笔数 |
buyers / sellers | 买家/卖家数 |
buy_volume / sell_volume | 买入/卖出量 |
buy_volume_in_usd / sell_volume_in_usd | USD 成交量 |
high_in_usd / low_in_usd | 最高/最低价 |
TokenHoldingEvent - 持仓统计
TokenHoldingEvent - 持仓统计
Topic:
{chain}.token-holdings| 字段组 | 说明 |
|---|---|
top10_holders / top10_amount / top10_ratio | Top 10 持有者统计 |
top50_holders / top50_amount / top50_ratio | Top 50 持有者统计 |
top100_holders / top100_amount / top100_ratio | Top 100 持有者统计 |
holders | 总持有者数 |
creators_count / creators_amount / creators_ratio | 创建者持仓统计 |
fresh_count / fresh_amount / fresh_ratio | 新地址持仓统计 |
smart_count / smart_amount / smart_ratio | Smart Money 持仓统计 |
sniper_count / sniper_amount / sniper_ratio | Sniper 持仓统计 |
insider_count / insider_amount / insider_ratio | 内部人持仓统计 |
TokenPriceEvent - 价格事件
TokenPriceEvent - 价格事件
Topic:
{chain}.token-prices| 字段 | 说明 |
|---|---|
token_address | 代币地址 |
price_in_usd | USD 价格 |
price_in_native | 原生币价格 |
TokenSupplyEvent - 供应量事件
TokenSupplyEvent - 供应量事件
Topic:
{chain}.token-supplies| 字段 | 说明 |
|---|---|
type | 事件类型(INITIALIZE_MINT/MINT/BURN) |
token_address | 代币地址 |
amount | 数量 |
decimals | 精度 |
amount_with_decimals | 带精度的数量 |
消息特性与注意事项
开发者在消费 Kafka Streams 时需要注意以下消息特性:无过滤的完整数据流
无过滤的完整数据流
Stream 不做预过滤,包含 topic 内的所有消息和完整数据。这意味着消费端需要有足够的网络吞吐、服务器性能和高效的解析代码。
同一实体消息有序
同一实体消息有序
同一个代币或同一个账号的消息严格按 block 顺序到达。这意味着针对特定代币或钱包地址的事件流是有序的,方便追踪状态变化。但不同代币/账号之间的消息到达顺序不做保证。
消息可能重复
消息可能重复
同一条消息可能被投递多次。如果重复处理会造成问题,消费端需要维护缓存或存储来实现幂等处理。
消息完整性保证
消息完整性保证
ChainStream 保证每条消息的完整性,消息不会被拆分。无论区块包含多少交易,您收到的消息都是完整的数据单元。
Protobuf 二进制格式
Protobuf 二进制格式
消息使用 Protobuf 编码,比 JSON 更紧凑。消费端需要使用对应语言的 Protobuf 库进行解析。
延迟模型
Kafka Streams 的延迟取决于数据在管道中经过的处理环节。同一条链的不同 topic 延迟不同:Broadcasted vs Committed
| 类型 | 说明 | 延迟 | 数据确定性 |
|---|---|---|---|
| Broadcasted | 交易在广播阶段即可消费,无需等待区块确认 | 最低 | 较低 |
| Committed | 交易经过区块确认后才进入 stream | 较高 | 最高 |
处理管道延迟
数据从区块链节点到 Kafka topic 的每一层转换(解析、结构化、enrichment)都会引入约 100-1000ms 的延迟:- raw topic:延迟最低,接近原始节点数据
- transactions topic:经过解析和结构化
- dextrades topic:延迟相对更高,但数据更丰富
最佳实践
分区并行消费
Kafka topic 被划分为多个分区(partition),每个分区需要并行读取以最大化吞吐量。 消息的分区键设置为 代币地址 或 钱包地址(所有链统一),这确保:- 同一代币的所有事件路由到同一分区,保证顺序性
- 同一钱包的所有余额变动路由到同一分区,方便状态追踪

