EVM 链按不同网络有不同的出块间隔(Ethereum 主网约 12 秒/块),支持 Ethereum、BSC、Base、Polygon (Matic)、Optimism 等网络,共享统一的 Protobuf Schema。
Schema 仓库:github.com/chainstream-io/streaming_protobuf/evm
Message Types 总览
EVM Streams 提供以下消息类型:
| Message Type | 说明 | Topic |
|---|
| TradeEvents | DEX 交易事件 | {chain}.dex.trades |
| TokenEvents | Token 事件 | {chain}.tokens |
| BalanceEvents | 余额变动事件 | {chain}.balances |
| DexPoolEvents | 流动性池事件 | {chain}.dex.pools |
| TransfersMessage | 转账消息 | {chain}.v1.transfers.proto |
| CandlestickEvents | K线数据 | {chain}.candlesticks |
Block-Level Data
每个区块包含 BlockHeader,核心字段:
| 字段 | 类型 | 说明 |
|---|
Number | uint64 | 区块号 |
Hash | bytes | 区块哈希 |
ParentHash | bytes | 父区块哈希 |
Timestamp | Timestamp | 出块时间 |
BaseFeePerGas | uint64 | EIP-1559 基础费用 |
GasUsed | uint64 | 区块 Gas 消耗 |
GasLimit | uint64 | 区块 Gas 上限 |
BlockMessage 还包含:
| 字段 | 说明 |
|---|
Transactions | 区块内所有交易 |
Withdrawals | 验证者提款(以太坊 Shanghai 升级后) |
BlobGasUsed | Blob Gas 消耗(EIP-4844) |
Transaction-Level Data
| 字段 | 类型 | 说明 |
|---|
Hash | bytes | 交易哈希 |
Index | uint32 | 区块内索引 |
From | bytes | 发送方地址 |
To | bytes | 接收方地址 |
Value | BigInt | 转账金额(wei) |
Nonce | uint64 | 发送方 nonce |
Type | uint32 | 交易类型(0/1/2) |
| 字段 | 说明 |
|---|
Status | 执行状态(1=成功,0=失败) |
GasUsed | 实际 Gas 消耗 |
CumulativeGasUsed | 累计 Gas 消耗 |
ContractAddress | 创建的合约地址(如有) |
TransactionFee — 费用明细
| 字段 | 说明 |
|---|
SenderFee | 发送方支付的总费用 |
MinerReward | 矿工/验证者奖励 |
Burnt | EIP-1559 燃烧部分 |
Savings | 节省的 Gas 费用 |
Calls — 内部调用 Trace
包含所有嵌套合约调用,每个 call 包含:
From、To:调用方和被调用方
Input、Output:输入输出数据
GasUsed:Gas 消耗
Opcode:操作码(CALL/DELEGATECALL/STATICCALL)
Signature:函数签名
Logs:事件日志
StateChanges:状态变更
ReturnValues:返回值
Balance Updates
Token Balance Updates
Native Balance Updates
ERC-20/721/1155 Token 余额变动:| 字段 | 说明 |
|---|
Token | Token 信息(地址、是否 Fungible、精度、总供应量) |
Wallet | 钱包地址 |
PostBalance | 交易后余额 |
原生币(如 ETH)余额变动:| 字段 | 说明 |
|---|
Address | 地址 |
PreBalance | 交易前余额 |
PostBalance | 交易后余额 |
ReasonCode | 变动原因码 |
Transfer Data
TransfersMessage 提供 EVM 链的转账信息(Topic: {chain}.v1.transfers.proto)。
TransfersMessage 结构
message TransfersMessage {
Chain Chain = 1;
BlockHeader Header = 2;
repeated Transfer Transfers = 5;
optional BlockHeader L1Header = 6; // L2 链包含
}
Transfer 结构
| 字段 | 类型 | 说明 |
|---|
CallIndex | uint64 | 调用索引 |
LogIndex | uint64 | 日志索引 |
Sender | string | 发送方地址 |
Receiver | string | 接收方地址 |
Amount | string | 转账金额 |
Id | string | Token ID(NFT) |
URI | string | Token URI |
Currency | TokenInfo | Token 信息 |
Success | bool | 是否成功 |
Index | uint32 | 转账索引 |
TransactionHeader | TransactionHeader | 交易头信息 |
TokenInfo 结构
| 字段 | 类型 | 说明 |
|---|
SmartContract | string | 合约地址 |
Delegated | bool | 是否委托 |
DelegatedTo | string | 委托地址 |
ProtocolName | string | 协议名称 |
Name | string | Token 名称 |
Symbol | string | Token 符号 |
Decimals | int32 | 精度 |
HasURI | bool | 是否有 URI |
Fungible | bool | 是否同质化 |
AssetId | string | 资产 ID |
支持的 Token 标准
| 标准 | 说明 |
|---|
| ERC-20 | 同质化代币 |
| ERC-721 | 非同质化代币(NFT) |
| ERC-1155 | 多资产代币 |
DEX Data
TradeEvents 提供 DEX 交易数据(Topic: {chain}.dex.trades)。
TradeEvent 结构
message TradeEvent {
Instruction instruction = 1;
Block block = 2;
Transaction transaction = 3;
DApp d_app = 4;
Trade trade = 100;
BondingCurve bonding_curve = 110;
TradeProcessed trade_processed = 200; // processed topic 包含
}
Trade 核心字段
| 字段 | 说明 |
|---|
token_a_address / token_b_address | 交易对代币地址 |
token_a_decimals / token_b_decimals | 代币精度 |
user_a_amount / user_b_amount | 用户交易数量 |
user_a_pre_amount / user_a_post_amount | 用户交易前后余额 |
pool_address | 池子地址 |
vault_a / vault_b | 池子 Vault 地址 |
vault_a_amount / vault_b_amount | Vault 数量 |
vault_a_pre_amount / vault_a_post_amount | Vault 交易前后余额 |
was_original_direction | 是否原始方向 |
pool_config_address | 池子配置地址 |
TradeProcessed 增强字段
.processed topic 额外包含:
| 字段 | 说明 |
|---|
token_a_price_in_usd / token_b_price_in_usd | USD 价格 |
token_a_price_in_native / token_b_price_in_native | 原生币价格 |
is_token_a_price_in_usd_suspect | 价格是否可疑 |
is_token_a_price_in_usd_suspect_reason | 可疑原因(如价格波动异常、小额交易等) |
DexPoolEvent - 流动性池
DexPoolEvents 提供流动性池事件(Topic: {chain}.dex.pools)。
| 字段 | 说明 |
|---|
type | 事件类型(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
address | 池子地址 |
token_a_address / token_b_address | 代币地址 |
token_a_vault_address / token_b_vault_address | Vault 地址 |
token_a_amount / token_b_amount | 代币数量 |
lp_wallet | LP 钱包地址 |
EVM 链特性
Gas 与 EIP-1559 费用模型
EVM 使用 Gas 机制计量计算资源。EIP-1559 引入了 BaseFee 动态调整和费用燃烧,交易费拆分为:
- 验证者奖励:
MinerReward
- 燃烧部分:
Burnt
Layer 2 支持
EVM Streams 为 L2 链提供专属字段:
| 字段 | 说明 | 适用链 |
|---|
L1Header | 对应的 L1 区块信息 | 所有 L2 |
SequenceNumber | 序列号 | Optimism |
BatcherAddr | Batcher 地址 | Optimism |
L1FeeOverhead | L1 费用开销 | Optimism |
GasL1 | L1 数据成本 | Arbitrum |
Topic → Message Type 映射表
| Topic | Proto File | Message Type | 说明 |
|---|
{chain}.dex.trades | trade_event.proto | TradeEvents | DEX 交易事件 |
{chain}.dex.trades.processed | trade_event.proto | TradeEvents | 含 USD 价格、可疑标记 |
{chain}.tokens | token_event.proto | TokenEvents | Token 事件 |
{chain}.tokens.processed | token_event.proto | TokenEvents | 含描述、图片、社交链接 |
{chain}.balances | balance_event.proto | BalanceEvents | 余额变动事件 |
{chain}.balances.processed | balance_event.proto | BalanceEvents | 含 USD 价值 |
{chain}.dex.pools | dex_pool_event.proto | DexPoolEvents | 流动性池事件 |
{chain}.v1.transfers.proto | transfers_message.proto | TransfersMessage | EVM 转账消息 |
{chain}.candlesticks | candlestick.proto | CandlestickEvents | K线数据 |
将 {chain} 替换为 eth 或 bsc。例如 eth.dex.trades、bsc.tokens.processed。
代码示例
Python 示例:消费 DEX 交易数据
from kafka import KafkaConsumer
from common import trade_event_pb2 # 从 streaming_protobuf 仓库获取
# 创建 consumer
consumer = KafkaConsumer(
'eth.dex.trades',
bootstrap_servers=['<your_broker_address>'],
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username='your_username',
sasl_plain_password='your_password',
auto_offset_reset='latest',
enable_auto_commit=False,
group_id='my-dex-consumer'
)
# 消费并解析消息
for message in consumer:
# 解析 protobuf 消息
trade_events = trade_event_pb2.TradeEvents()
trade_events.ParseFromString(message.value)
# 遍历 DEX 交易
for event in trade_events.events:
print(f"Pool: {event.trade.pool_address}")
print(f"Token A: {event.trade.token_a_address}")
print(f"Token B: {event.trade.token_b_address}")
print(f"Amount A: {event.trade.user_a_amount}")
print(f"Amount B: {event.trade.user_b_amount}")
print(f"Block: {event.block.height}, Tx: {event.transaction.signature}")
print("---")
JavaScript 示例
const { Kafka } = require('kafkajs');
const protobuf = require('protobufjs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['<your_broker_address>'],
ssl: true,
sasl: {
mechanism: 'scram-sha-512',
username: 'your_username',
password: 'your_password'
}
});
const consumer = kafka.consumer({ groupId: 'my-dex-consumer' });
async function run() {
// 加载 protobuf 定义
const root = await protobuf.load('common/trade_event.proto');
const TradeEvents = root.lookupType('io.chainstream.v1.dex.trade.TradeEvents');
await consumer.connect();
await consumer.subscribe({ topic: 'eth.dex.trades', fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const tradeEvents = TradeEvents.decode(message.value);
tradeEvents.events.forEach(event => {
console.log(`Pool: ${event.trade.poolAddress}`);
console.log(`Token A: ${event.trade.tokenAAddress}`);
console.log(`Token B: ${event.trade.tokenBAddress}`);
console.log(`Amount A: ${event.trade.userAAmount}`);
console.log(`Amount B: ${event.trade.userBAmount}`);
console.log(`Block: ${event.block.height}`);
});
}
});
}
run().catch(console.error);
相关文档