EVM 鏈按不同網路有不同的出塊間隔(Ethereum 主網約 12 秒/塊),支援 Ethereum、BSC、Base、Polygon (Matic)、Optimism 等網路,共享統一的 Protobuf Schema。
Schema 倉庫:github.com/chainstream-io/streaming_protobuf/evm
Message Types 總覽
EVM Streams 提供以下訊息類型:
| Message Type | 說明 | Topic |
|---|
| TradeEvents | DEX 交易事件 | {chain}.dex.trades |
| TokenEvents | Token 事件 | {chain}.tokens |
| BalanceEvents | 餘額變動事件 | {chain}.balances |
| DexPoolEvents | 流動性池事件 | {chain}.dex.pools |
| TransfersMessage | 轉帳訊息 | {chain}.v1.transfers.proto |
| CandlestickEvents | K線資料 | {chain}.candlesticks |
Block-Level Data
每個區塊包含 BlockHeader,核心欄位:
| 欄位 | 類型 | 說明 |
|---|
Number | uint64 | 區塊號 |
Hash | bytes | 區塊雜湊 |
ParentHash | bytes | 父區塊雜湊 |
Timestamp | Timestamp | 出塊時間 |
BaseFeePerGas | uint64 | EIP-1559 基礎費用 |
GasUsed | uint64 | 區塊 Gas 消耗 |
GasLimit | uint64 | 區塊 Gas 上限 |
BlockMessage 還包含:
| 欄位 | 說明 |
|---|
Transactions | 區塊內所有交易 |
Withdrawals | 驗證者提款(以太坊 Shanghai 升級後) |
BlobGasUsed | Blob Gas 消耗(EIP-4844) |
Transaction-Level Data
| 欄位 | 類型 | 說明 |
|---|
Hash | bytes | 交易雜湊 |
Index | uint32 | 區塊內索引 |
From | bytes | 發送方地址 |
To | bytes | 接收方地址 |
Value | BigInt | 轉帳金額(wei) |
Nonce | uint64 | 發送方 nonce |
Type | uint32 | 交易類型(0/1/2) |
| 欄位 | 說明 |
|---|
Status | 執行狀態(1=成功,0=失敗) |
GasUsed | 實際 Gas 消耗 |
CumulativeGasUsed | 累計 Gas 消耗 |
ContractAddress | 建立的合約地址(如有) |
TransactionFee — 費用明細
| 欄位 | 說明 |
|---|
SenderFee | 發送方支付的總費用 |
MinerReward | 礦工/驗證者獎勵 |
Burnt | EIP-1559 燃燒部分 |
Savings | 節省的 Gas 費用 |
Calls — 內部呼叫 Trace
包含所有巢狀合約呼叫,每個 call 包含:
From、To:呼叫方和被呼叫方
Input、Output:輸入輸出資料
GasUsed:Gas 消耗
Opcode:操作碼(CALL/DELEGATECALL/STATICCALL)
Signature:函數簽名
Logs:事件日誌
StateChanges:狀態變更
ReturnValues:傳回值
Balance Updates
Token Balance Updates
Native Balance Updates
ERC-20/721/1155 Token 餘額變動:| 欄位 | 說明 |
|---|
Token | Token 資訊(地址、是否 Fungible、精度、總供應量) |
Wallet | 錢包地址 |
PostBalance | 交易後餘額 |
原生幣(如 ETH)餘額變動:| 欄位 | 說明 |
|---|
Address | 地址 |
PreBalance | 交易前餘額 |
PostBalance | 交易後餘額 |
ReasonCode | 變動原因碼 |
Transfer Data
TransfersMessage 提供 EVM 鏈的轉帳資訊(Topic: {chain}.v1.transfers.proto)。
TransfersMessage 結構
message TransfersMessage {
Chain Chain = 1;
BlockHeader Header = 2;
repeated Transfer Transfers = 5;
optional BlockHeader L1Header = 6; // L2 鏈包含
}
Transfer 結構
| 欄位 | 類型 | 說明 |
|---|
CallIndex | uint64 | 呼叫索引 |
LogIndex | uint64 | 日誌索引 |
Sender | string | 發送方地址 |
Receiver | string | 接收方地址 |
Amount | string | 轉帳金額 |
Id | string | Token ID(NFT) |
URI | string | Token URI |
Currency | TokenInfo | Token 資訊 |
Success | bool | 是否成功 |
Index | uint32 | 轉帳索引 |
TransactionHeader | TransactionHeader | 交易頭資訊 |
TokenInfo 結構
| 欄位 | 類型 | 說明 |
|---|
SmartContract | string | 合約地址 |
Delegated | bool | 是否委託 |
DelegatedTo | string | 委託地址 |
ProtocolName | string | 協議名稱 |
Name | string | Token 名稱 |
Symbol | string | Token 符號 |
Decimals | int32 | 精度 |
HasURI | bool | 是否有 URI |
Fungible | bool | 是否同質化 |
AssetId | string | 資產 ID |
支援的 Token 標準
| 標準 | 說明 |
|---|
| ERC-20 | 同質化代幣 |
| ERC-721 | 非同質化代幣(NFT) |
| ERC-1155 | 多資產代幣 |
DEX Data
TradeEvents 提供 DEX 交易資料(Topic: {chain}.dex.trades)。
TradeEvent 結構
message TradeEvent {
Instruction instruction = 1;
Block block = 2;
Transaction transaction = 3;
DApp d_app = 4;
Trade trade = 100;
BondingCurve bonding_curve = 110;
TradeProcessed trade_processed = 200; // processed topic 包含
}
Trade 核心欄位
| 欄位 | 說明 |
|---|
token_a_address / token_b_address | 交易對代幣地址 |
token_a_decimals / token_b_decimals | 代幣精度 |
user_a_amount / user_b_amount | 使用者交易數量 |
user_a_pre_amount / user_a_post_amount | 使用者交易前後餘額 |
pool_address | 池子地址 |
vault_a / vault_b | 池子 Vault 地址 |
vault_a_amount / vault_b_amount | Vault 數量 |
vault_a_pre_amount / vault_a_post_amount | Vault 交易前後餘額 |
was_original_direction | 是否原始方向 |
pool_config_address | 池子設定地址 |
TradeProcessed 增強欄位
.processed topic 額外包含:
| 欄位 | 說明 |
|---|
token_a_price_in_usd / token_b_price_in_usd | USD 價格 |
token_a_price_in_native / token_b_price_in_native | 原生幣價格 |
is_token_a_price_in_usd_suspect | 價格是否可疑 |
is_token_a_price_in_usd_suspect_reason | 可疑原因(如價格波動異常、小額交易等) |
DexPoolEvent - 流動性池
DexPoolEvents 提供流動性池事件(Topic: {chain}.dex.pools)。
| 欄位 | 說明 |
|---|
type | 事件類型(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
address | 池子地址 |
token_a_address / token_b_address | 代幣地址 |
token_a_vault_address / token_b_vault_address | Vault 地址 |
token_a_amount / token_b_amount | 代幣數量 |
lp_wallet | LP 錢包地址 |
EVM 鏈特性
Gas 與 EIP-1559 費用模型
EVM 使用 Gas 機制計量計算資源。EIP-1559 引入了 BaseFee 動態調整和費用燃燒,交易費拆分為:
- 驗證者獎勵:
MinerReward
- 燃燒部分:
Burnt
Layer 2 支援
EVM Streams 為 L2 鏈提供專屬欄位:
| 欄位 | 說明 | 適用鏈 |
|---|
L1Header | 對應的 L1 區塊資訊 | 所有 L2 |
SequenceNumber | 序列號 | Optimism |
BatcherAddr | Batcher 地址 | Optimism |
L1FeeOverhead | L1 費用開銷 | Optimism |
GasL1 | L1 資料成本 | Arbitrum |
Topic → Message Type 映射表
| Topic | Proto File | Message Type | 說明 |
|---|
{chain}.dex.trades | trade_event.proto | TradeEvents | DEX 交易事件 |
{chain}.dex.trades.processed | trade_event.proto | TradeEvents | 含 USD 價格、可疑標記 |
{chain}.tokens | token_event.proto | TokenEvents | Token 事件 |
{chain}.tokens.processed | token_event.proto | TokenEvents | 含描述、圖片、社交連結 |
{chain}.balances | balance_event.proto | BalanceEvents | 餘額變動事件 |
{chain}.balances.processed | balance_event.proto | BalanceEvents | 含 USD 價值 |
{chain}.dex.pools | dex_pool_event.proto | DexPoolEvents | 流動性池事件 |
{chain}.v1.transfers.proto | transfers_message.proto | TransfersMessage | EVM 轉帳訊息 |
{chain}.candlesticks | candlestick.proto | CandlestickEvents | K線資料 |
將 {chain} 替換為 eth 或 bsc。例如 eth.dex.trades、bsc.tokens.processed。
程式碼示例
Python 示例:消費 DEX 交易資料
from kafka import KafkaConsumer
from common import trade_event_pb2 # 從 streaming_protobuf 倉庫取得
# 建立 consumer
consumer = KafkaConsumer(
'eth.dex.trades',
bootstrap_servers=['<your_broker_address>'],
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username='your_username',
sasl_plain_password='your_password',
auto_offset_reset='latest',
enable_auto_commit=False,
group_id='my-dex-consumer'
)
# 消費並解析訊息
for message in consumer:
# 解析 protobuf 訊息
trade_events = trade_event_pb2.TradeEvents()
trade_events.ParseFromString(message.value)
# 遍歷 DEX 交易
for event in trade_events.events:
print(f"Pool: {event.trade.pool_address}")
print(f"Token A: {event.trade.token_a_address}")
print(f"Token B: {event.trade.token_b_address}")
print(f"Amount A: {event.trade.user_a_amount}")
print(f"Amount B: {event.trade.user_b_amount}")
print(f"Block: {event.block.height}, Tx: {event.transaction.signature}")
print("---")
JavaScript 示例
const { Kafka } = require('kafkajs');
const protobuf = require('protobufjs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['<your_broker_address>'],
ssl: true,
sasl: {
mechanism: 'scram-sha-512',
username: 'your_username',
password: 'your_password'
}
});
const consumer = kafka.consumer({ groupId: 'my-dex-consumer' });
async function run() {
// 載入 protobuf 定義
const root = await protobuf.load('common/trade_event.proto');
const TradeEvents = root.lookupType('io.chainstream.v1.dex.trade.TradeEvents');
await consumer.connect();
await consumer.subscribe({ topic: 'eth.dex.trades', fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const tradeEvents = TradeEvents.decode(message.value);
tradeEvents.events.forEach(event => {
console.log(`Pool: ${event.trade.poolAddress}`);
console.log(`Token A: ${event.trade.tokenAAddress}`);
console.log(`Token B: ${event.trade.tokenBAddress}`);
console.log(`Amount A: ${event.trade.userAAmount}`);
console.log(`Amount B: ${event.trade.userBAmount}`);
console.log(`Block: ${event.block.height}`);
});
}
});
}
run().catch(console.error);
相關文件
概念與接入指南
Kafka Streams 接入基礎
Solana Streams
Solana 高吞吐資料流
WebSocket 即時資料
WebSocket 接入方式