跳轉到主要內容
EVM 鏈按不同網路有不同的出塊間隔(Ethereum 主網約 12 秒/塊),支援 Ethereum、BSC、Base、Polygon (Matic)、Optimism 等網路,共享統一的 Protobuf Schema。 Schema 倉庫github.com/chainstream-io/streaming_protobuf/evm

Message Types 總覽

EVM Streams 提供以下訊息類型:
Message Type說明Topic
TradeEventsDEX 交易事件{chain}.dex.trades
TokenEventsToken 事件{chain}.tokens
BalanceEvents餘額變動事件{chain}.balances
DexPoolEvents流動性池事件{chain}.dex.pools
TransfersMessage轉帳訊息{chain}.v1.transfers.proto
CandlestickEventsK線資料{chain}.candlesticks

Block-Level Data

每個區塊包含 BlockHeader,核心欄位:
欄位類型說明
Numberuint64區塊號
Hashbytes區塊雜湊
ParentHashbytes父區塊雜湊
TimestampTimestamp出塊時間
BaseFeePerGasuint64EIP-1559 基礎費用
GasUseduint64區塊 Gas 消耗
GasLimituint64區塊 Gas 上限
BlockMessage 還包含:
欄位說明
Transactions區塊內所有交易
Withdrawals驗證者提款(以太坊 Shanghai 升級後)
BlobGasUsedBlob Gas 消耗(EIP-4844)

Transaction-Level Data

TransactionHeader — 核心交易中繼資料

欄位類型說明
Hashbytes交易雜湊
Indexuint32區塊內索引
Frombytes發送方地址
Tobytes接收方地址
ValueBigInt轉帳金額(wei)
Nonceuint64發送方 nonce
Typeuint32交易類型(0/1/2)

ReceiptHeader — 執行結果

欄位說明
Status執行狀態(1=成功,0=失敗)
GasUsed實際 Gas 消耗
CumulativeGasUsed累計 Gas 消耗
ContractAddress建立的合約地址(如有)

TransactionFee — 費用明細

欄位說明
SenderFee發送方支付的總費用
MinerReward礦工/驗證者獎勵
BurntEIP-1559 燃燒部分
Savings節省的 Gas 費用

Calls — 內部呼叫 Trace

包含所有巢狀合約呼叫,每個 call 包含:
  • FromTo:呼叫方和被呼叫方
  • InputOutput:輸入輸出資料
  • GasUsed:Gas 消耗
  • Opcode:操作碼(CALL/DELEGATECALL/STATICCALL)
  • Signature:函數簽名
  • Logs:事件日誌
  • StateChanges:狀態變更
  • ReturnValues:傳回值

Balance Updates

ERC-20/721/1155 Token 餘額變動:
欄位說明
TokenToken 資訊(地址、是否 Fungible、精度、總供應量)
Wallet錢包地址
PostBalance交易後餘額

Transfer Data

TransfersMessage 提供 EVM 鏈的轉帳資訊(Topic: {chain}.v1.transfers.proto)。

TransfersMessage 結構

message TransfersMessage {
  Chain Chain = 1;
  BlockHeader Header = 2;
  repeated Transfer Transfers = 5;
  optional BlockHeader L1Header = 6;  // L2 鏈包含
}

Transfer 結構

欄位類型說明
CallIndexuint64呼叫索引
LogIndexuint64日誌索引
Senderstring發送方地址
Receiverstring接收方地址
Amountstring轉帳金額
IdstringToken ID(NFT)
URIstringToken URI
CurrencyTokenInfoToken 資訊
Successbool是否成功
Indexuint32轉帳索引
TransactionHeaderTransactionHeader交易頭資訊

TokenInfo 結構

欄位類型說明
SmartContractstring合約地址
Delegatedbool是否委託
DelegatedTostring委託地址
ProtocolNamestring協議名稱
NamestringToken 名稱
SymbolstringToken 符號
Decimalsint32精度
HasURIbool是否有 URI
Fungiblebool是否同質化
AssetIdstring資產 ID

支援的 Token 標準

標準說明
ERC-20同質化代幣
ERC-721非同質化代幣(NFT)
ERC-1155多資產代幣

DEX Data

TradeEvents 提供 DEX 交易資料(Topic: {chain}.dex.trades)。

TradeEvent 結構

message TradeEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Trade trade = 100;
  BondingCurve bonding_curve = 110;
  TradeProcessed trade_processed = 200;  // processed topic 包含
}

Trade 核心欄位

欄位說明
token_a_address / token_b_address交易對代幣地址
token_a_decimals / token_b_decimals代幣精度
user_a_amount / user_b_amount使用者交易數量
user_a_pre_amount / user_a_post_amount使用者交易前後餘額
pool_address池子地址
vault_a / vault_b池子 Vault 地址
vault_a_amount / vault_b_amountVault 數量
vault_a_pre_amount / vault_a_post_amountVault 交易前後餘額
was_original_direction是否原始方向
pool_config_address池子設定地址

TradeProcessed 增強欄位

.processed topic 額外包含:
欄位說明
token_a_price_in_usd / token_b_price_in_usdUSD 價格
token_a_price_in_native / token_b_price_in_native原生幣價格
is_token_a_price_in_usd_suspect價格是否可疑
is_token_a_price_in_usd_suspect_reason可疑原因(如價格波動異常、小額交易等)

DexPoolEvent - 流動性池

DexPoolEvents 提供流動性池事件(Topic: {chain}.dex.pools)。
欄位說明
type事件類型(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP)
address池子地址
token_a_address / token_b_address代幣地址
token_a_vault_address / token_b_vault_addressVault 地址
token_a_amount / token_b_amount代幣數量
lp_walletLP 錢包地址

EVM 鏈特性

Gas 與 EIP-1559 費用模型

EVM 使用 Gas 機制計量計算資源。EIP-1559 引入了 BaseFee 動態調整和費用燃燒,交易費拆分為:
  • 驗證者獎勵MinerReward
  • 燃燒部分Burnt

Layer 2 支援

EVM Streams 為 L2 鏈提供專屬欄位:
欄位說明適用鏈
L1Header對應的 L1 區塊資訊所有 L2
SequenceNumber序列號Optimism
BatcherAddrBatcher 地址Optimism
L1FeeOverheadL1 費用開銷Optimism
GasL1L1 資料成本Arbitrum

Topic → Message Type 映射表

TopicProto FileMessage Type說明
{chain}.dex.tradestrade_event.protoTradeEventsDEX 交易事件
{chain}.dex.trades.processedtrade_event.protoTradeEvents含 USD 價格、可疑標記
{chain}.tokenstoken_event.protoTokenEventsToken 事件
{chain}.tokens.processedtoken_event.protoTokenEvents含描述、圖片、社交連結
{chain}.balancesbalance_event.protoBalanceEvents餘額變動事件
{chain}.balances.processedbalance_event.protoBalanceEvents含 USD 價值
{chain}.dex.poolsdex_pool_event.protoDexPoolEvents流動性池事件
{chain}.v1.transfers.prototransfers_message.protoTransfersMessageEVM 轉帳訊息
{chain}.candlestickscandlestick.protoCandlestickEventsK線資料
{chain} 替換為 ethbsc。例如 eth.dex.tradesbsc.tokens.processed

程式碼示例

Python 示例:消費 DEX 交易資料

from kafka import KafkaConsumer
from common import trade_event_pb2  # 從 streaming_protobuf 倉庫取得

# 建立 consumer
consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-dex-consumer'
)

# 消費並解析訊息
for message in consumer:
    # 解析 protobuf 訊息
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    # 遍歷 DEX 交易
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Block: {event.block.height}, Tx: {event.transaction.signature}")
        print("---")

JavaScript 示例

const { Kafka } = require('kafkajs');
const protobuf = require('protobufjs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['<your_broker_address>'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: 'your_username',
    password: 'your_password'
  }
});

const consumer = kafka.consumer({ groupId: 'my-dex-consumer' });

async function run() {
  // 載入 protobuf 定義
  const root = await protobuf.load('common/trade_event.proto');
  const TradeEvents = root.lookupType('io.chainstream.v1.dex.trade.TradeEvents');

  await consumer.connect();
  await consumer.subscribe({ topic: 'eth.dex.trades', fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const tradeEvents = TradeEvents.decode(message.value);
      
      tradeEvents.events.forEach(event => {
        console.log(`Pool: ${event.trade.poolAddress}`);
        console.log(`Token A: ${event.trade.tokenAAddress}`);
        console.log(`Token B: ${event.trade.tokenBAddress}`);
        console.log(`Amount A: ${event.trade.userAAmount}`);
        console.log(`Amount B: ${event.trade.userBAmount}`);
        console.log(`Block: ${event.block.height}`);
      });
    }
  });
}

run().catch(console.error);

相關文件

概念與接入指南

Kafka Streams 接入基礎

Solana Streams

Solana 高吞吐資料流

TRON Streams

TRON 網路資料流

WebSocket 即時資料

WebSocket 接入方式