跳转到主要内容
EVM 链按不同网络有不同的出块间隔(Ethereum 主网约 12 秒/块),支持 Ethereum、BSC、Base、Polygon (Matic)、Optimism 等网络,共享统一的 Protobuf Schema。 Schema 仓库github.com/chainstream-io/streaming_protobuf/evm

Message Types 总览

EVM Streams 提供以下消息类型:
Message Type说明Topic
TradeEventsDEX 交易事件{chain}.dex.trades
TokenEventsToken 事件{chain}.tokens
BalanceEvents余额变动事件{chain}.balances
DexPoolEvents流动性池事件{chain}.dex.pools
TransfersMessage转账消息{chain}.v1.transfers.proto
CandlestickEventsK线数据{chain}.candlesticks

Block-Level Data

每个区块包含 BlockHeader,核心字段:
字段类型说明
Numberuint64区块号
Hashbytes区块哈希
ParentHashbytes父区块哈希
TimestampTimestamp出块时间
BaseFeePerGasuint64EIP-1559 基础费用
GasUseduint64区块 Gas 消耗
GasLimituint64区块 Gas 上限
BlockMessage 还包含:
字段说明
Transactions区块内所有交易
Withdrawals验证者提款(以太坊 Shanghai 升级后)
BlobGasUsedBlob Gas 消耗(EIP-4844)

Transaction-Level Data

TransactionHeader — 核心交易元数据

字段类型说明
Hashbytes交易哈希
Indexuint32区块内索引
Frombytes发送方地址
Tobytes接收方地址
ValueBigInt转账金额(wei)
Nonceuint64发送方 nonce
Typeuint32交易类型(0/1/2)

ReceiptHeader — 执行结果

字段说明
Status执行状态(1=成功,0=失败)
GasUsed实际 Gas 消耗
CumulativeGasUsed累计 Gas 消耗
ContractAddress创建的合约地址(如有)

TransactionFee — 费用明细

字段说明
SenderFee发送方支付的总费用
MinerReward矿工/验证者奖励
BurntEIP-1559 燃烧部分
Savings节省的 Gas 费用

Calls — 内部调用 Trace

包含所有嵌套合约调用,每个 call 包含:
  • FromTo:调用方和被调用方
  • InputOutput:输入输出数据
  • GasUsed:Gas 消耗
  • Opcode:操作码(CALL/DELEGATECALL/STATICCALL)
  • Signature:函数签名
  • Logs:事件日志
  • StateChanges:状态变更
  • ReturnValues:返回值

Balance Updates

ERC-20/721/1155 Token 余额变动:
字段说明
TokenToken 信息(地址、是否 Fungible、精度、总供应量)
Wallet钱包地址
PostBalance交易后余额

Transfer Data

TransfersMessage 提供 EVM 链的转账信息(Topic: {chain}.v1.transfers.proto)。

TransfersMessage 结构

message TransfersMessage {
  Chain Chain = 1;
  BlockHeader Header = 2;
  repeated Transfer Transfers = 5;
  optional BlockHeader L1Header = 6;  // L2 链包含
}

Transfer 结构

字段类型说明
CallIndexuint64调用索引
LogIndexuint64日志索引
Senderstring发送方地址
Receiverstring接收方地址
Amountstring转账金额
IdstringToken ID(NFT)
URIstringToken URI
CurrencyTokenInfoToken 信息
Successbool是否成功
Indexuint32转账索引
TransactionHeaderTransactionHeader交易头信息

TokenInfo 结构

字段类型说明
SmartContractstring合约地址
Delegatedbool是否委托
DelegatedTostring委托地址
ProtocolNamestring协议名称
NamestringToken 名称
SymbolstringToken 符号
Decimalsint32精度
HasURIbool是否有 URI
Fungiblebool是否同质化
AssetIdstring资产 ID

支持的 Token 标准

标准说明
ERC-20同质化代币
ERC-721非同质化代币(NFT)
ERC-1155多资产代币

DEX Data

TradeEvents 提供 DEX 交易数据(Topic: {chain}.dex.trades)。

TradeEvent 结构

message TradeEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Trade trade = 100;
  BondingCurve bonding_curve = 110;
  TradeProcessed trade_processed = 200;  // processed topic 包含
}

Trade 核心字段

字段说明
token_a_address / token_b_address交易对代币地址
token_a_decimals / token_b_decimals代币精度
user_a_amount / user_b_amount用户交易数量
user_a_pre_amount / user_a_post_amount用户交易前后余额
pool_address池子地址
vault_a / vault_b池子 Vault 地址
vault_a_amount / vault_b_amountVault 数量
vault_a_pre_amount / vault_a_post_amountVault 交易前后余额
was_original_direction是否原始方向
pool_config_address池子配置地址

TradeProcessed 增强字段

.processed topic 额外包含:
字段说明
token_a_price_in_usd / token_b_price_in_usdUSD 价格
token_a_price_in_native / token_b_price_in_native原生币价格
is_token_a_price_in_usd_suspect价格是否可疑
is_token_a_price_in_usd_suspect_reason可疑原因(如价格波动异常、小额交易等)

DexPoolEvent - 流动性池

DexPoolEvents 提供流动性池事件(Topic: {chain}.dex.pools)。
字段说明
type事件类型(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP)
address池子地址
token_a_address / token_b_address代币地址
token_a_vault_address / token_b_vault_addressVault 地址
token_a_amount / token_b_amount代币数量
lp_walletLP 钱包地址

EVM 链特性

Gas 与 EIP-1559 费用模型

EVM 使用 Gas 机制计量计算资源。EIP-1559 引入了 BaseFee 动态调整和费用燃烧,交易费拆分为:
  • 验证者奖励MinerReward
  • 燃烧部分Burnt

Layer 2 支持

EVM Streams 为 L2 链提供专属字段:
字段说明适用链
L1Header对应的 L1 区块信息所有 L2
SequenceNumber序列号Optimism
BatcherAddrBatcher 地址Optimism
L1FeeOverheadL1 费用开销Optimism
GasL1L1 数据成本Arbitrum

Topic → Message Type 映射表

TopicProto FileMessage Type说明
{chain}.dex.tradestrade_event.protoTradeEventsDEX 交易事件
{chain}.dex.trades.processedtrade_event.protoTradeEvents含 USD 价格、可疑标记
{chain}.tokenstoken_event.protoTokenEventsToken 事件
{chain}.tokens.processedtoken_event.protoTokenEvents含描述、图片、社交链接
{chain}.balancesbalance_event.protoBalanceEvents余额变动事件
{chain}.balances.processedbalance_event.protoBalanceEvents含 USD 价值
{chain}.dex.poolsdex_pool_event.protoDexPoolEvents流动性池事件
{chain}.v1.transfers.prototransfers_message.protoTransfersMessageEVM 转账消息
{chain}.candlestickscandlestick.protoCandlestickEventsK线数据
{chain} 替换为 ethbsc。例如 eth.dex.tradesbsc.tokens.processed

代码示例

Python 示例:消费 DEX 交易数据

from kafka import KafkaConsumer
from common import trade_event_pb2  # 从 streaming_protobuf 仓库获取

# 创建 consumer
consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-dex-consumer'
)

# 消费并解析消息
for message in consumer:
    # 解析 protobuf 消息
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    # 遍历 DEX 交易
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Block: {event.block.height}, Tx: {event.transaction.signature}")
        print("---")

JavaScript 示例

const { Kafka } = require('kafkajs');
const protobuf = require('protobufjs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['<your_broker_address>'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: 'your_username',
    password: 'your_password'
  }
});

const consumer = kafka.consumer({ groupId: 'my-dex-consumer' });

async function run() {
  // 加载 protobuf 定义
  const root = await protobuf.load('common/trade_event.proto');
  const TradeEvents = root.lookupType('io.chainstream.v1.dex.trade.TradeEvents');

  await consumer.connect();
  await consumer.subscribe({ topic: 'eth.dex.trades', fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const tradeEvents = TradeEvents.decode(message.value);
      
      tradeEvents.events.forEach(event => {
        console.log(`Pool: ${event.trade.poolAddress}`);
        console.log(`Token A: ${event.trade.tokenAAddress}`);
        console.log(`Token B: ${event.trade.tokenBAddress}`);
        console.log(`Amount A: ${event.trade.userAAmount}`);
        console.log(`Amount B: ${event.trade.userBAmount}`);
        console.log(`Block: ${event.block.height}`);
      });
    }
  });
}

run().catch(console.error);

相关文档