メインコンテンツへスキップ
EVM チェーンはネットワークによってブロック間隔が異なります(Ethereum メインネットは約 12 秒/ブロック)。Ethereum、BSC、Base、Polygon(Matic)、Optimism などのネットワークをサポートし、統一された Protobuf Schema を共有しています。 Schema リポジトリgithub.com/chainstream-io/streaming_protobuf/evm

メッセージタイプ一覧

EVM Streams は以下のメッセージタイプを提供します:
メッセージタイプ説明Topic
TradeEventsDEX 取引イベント{chain}.dex.trades
TokenEventsトークンイベント{chain}.tokens
BalanceEvents残高変動イベント{chain}.balances
DexPoolEvents流動性プールイベント{chain}.dex.pools
TransfersMessage送金メッセージ{chain}.v1.transfers.proto
CandlestickEventsローソク足データ{chain}.candlesticks

ブロックレベルデータ

各ブロックには BlockHeader が含まれ、コアフィールドは以下の通りです:
フィールド説明
Numberuint64ブロック番号
Hashbytesブロックハッシュ
ParentHashbytes親ブロックハッシュ
TimestampTimestampブロック時刻
BaseFeePerGasuint64EIP-1559 基本手数料
GasUseduint64ブロック Gas 消費量
GasLimituint64ブロック Gas 上限
BlockMessage には以下も含まれます:
フィールド説明
Transactionsブロック内のすべてのトランザクション
Withdrawalsバリデータ引き出し(Shanghai アップグレード以降)
BlobGasUsedBlob Gas 消費量(EIP-4844)

トランザクションレベルデータ

TransactionHeader — コアトランザクションメタデータ

フィールド説明
Hashbytesトランザクションハッシュ
Indexuint32ブロック内インデックス
Frombytes送信者アドレス
Tobytes受信者アドレス
ValueBigInt送金額(wei)
Nonceuint64送信者 nonce
Typeuint32トランザクションタイプ(0/1/2)

ReceiptHeader — 実行結果

フィールド説明
Status実行ステータス(1=成功、0=失敗)
GasUsed実際の Gas 消費量
CumulativeGasUsed累積 Gas 消費量
ContractAddress作成されたコントラクトアドレス(ある場合)

TransactionFee — 手数料の詳細

フィールド説明
SenderFee送信者が支払った総手数料
MinerRewardマイナー/バリデータ報酬
BurntEIP-1559 バーン分
Savings節約された Gas 手数料

Calls — 内部コールトレース

すべてのネストされたコントラクト呼び出しを含み、各 call には以下が含まれます:
  • FromTo:呼び出し元と呼び出し先
  • InputOutput:入出力データ
  • GasUsed:Gas 消費量
  • Opcode:オペコード(CALL/DELEGATECALL/STATICCALL)
  • Signature:関数シグネチャ
  • Logs:イベントログ
  • StateChanges:状態変更
  • ReturnValues:戻り値

Balance Updates

ERC-20/721/1155 トークンの残高変動:
フィールド説明
Tokenトークン情報(アドレス、ファンジブル性、精度、総供給量)
Walletウォレットアドレス
PostBalanceトランザクション後の残高

送金データ

TransfersMessage は EVM チェーンの送金情報を提供します(Topic: {chain}.v1.transfers.proto)。

TransfersMessage 構造

message TransfersMessage {
  Chain Chain = 1;
  BlockHeader Header = 2;
  repeated Transfer Transfers = 5;
  optional BlockHeader L1Header = 6;  // Included for L2 chains
}

Transfer 構造

フィールド説明
CallIndexuint64コールインデックス
LogIndexuint64ログインデックス
Senderstring送信者アドレス
Receiverstring受信者アドレス
Amountstring送金額
Idstringトークン ID(NFT)
URIstringトークン URI
CurrencyTokenInfoトークン情報
Successbool成功ステータス
Indexuint32送金インデックス
TransactionHeaderTransactionHeaderトランザクションヘッダー

TokenInfo 構造

フィールド説明
SmartContractstringコントラクトアドレス
Delegatedbool委任されているか
DelegatedTostring委任先アドレス
ProtocolNamestringプロトコル名
Namestringトークン名
Symbolstringトークンシンボル
Decimalsint32精度
HasURIboolURI があるか
Fungibleboolファンジブルか
AssetIdstringアセット ID

サポートされるトークン標準

標準説明
ERC-20ファンジブルトークン
ERC-721ノンファンジブルトークン(NFT)
ERC-1155マルチアセットトークン

DEX データ

TradeEvents は DEX 取引データを提供します(Topic: {chain}.dex.trades)。

TradeEvent 構造

message TradeEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Trade trade = 100;
  BondingCurve bonding_curve = 110;
  TradeProcessed trade_processed = 200;  // included in processed topic
}

Trade コアフィールド

フィールド説明
token_a_address / token_b_address取引ペアトークンアドレス
token_a_decimals / token_b_decimalsトークン精度
user_a_amount / user_b_amountユーザー取引数量
user_a_pre_amount / user_a_post_amountユーザーの取引前後の残高
pool_addressプールアドレス
vault_a / vault_bプール Vault アドレス
vault_a_amount / vault_b_amountVault 数量
vault_a_pre_amount / vault_a_post_amountVault の取引前後の残高
was_original_directionオリジナル方向かどうか
pool_config_addressプール設定アドレス

TradeProcessed 拡張フィールド

.processed topic には以下の追加フィールドが含まれます:
フィールド説明
token_a_price_in_usd / token_b_price_in_usdUSD 価格
token_a_price_in_native / token_b_price_in_nativeネイティブ通貨価格
is_token_a_price_in_usd_suspect価格が疑わしいか
is_token_a_price_in_usd_suspect_reason疑わしい理由(例:価格変動、少額取引)

DexPoolEvent - 流動性プール

DexPoolEvents は流動性プールイベントを提供します(Topic: {chain}.dex.pools)。
フィールド説明
typeイベントタイプ(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP)
addressプールアドレス
token_a_address / token_b_addressトークンアドレス
token_a_vault_address / token_b_vault_addressVault アドレス
token_a_amount / token_b_amountトークン数量
lp_walletLP ウォレットアドレス

EVM チェーンの特徴

Gas と EIP-1559 手数料モデル

EVM は Gas メカニズムを使用して計算リソースを計量します。EIP-1559 では動的な BaseFee 調整と手数料バーンが導入され、トランザクション手数料は以下に分割されます:
  • バリデータ報酬MinerReward
  • バーン分Burnt

Layer 2 サポート

EVM Streams は L2 チェーン専用のフィールドを提供します:
フィールド説明対応チェーン
L1Header対応する L1 ブロック情報全 L2
SequenceNumberシーケンス番号Optimism
BatcherAddrBatcher アドレスOptimism
L1FeeOverheadL1 手数料オーバーヘッドOptimism
GasL1L1 データコストArbitrum

Topic → メッセージタイプ マッピング

TopicProto Fileメッセージタイプ説明
{chain}.dex.tradestrade_event.protoTradeEventsDEX 取引イベント
{chain}.dex.trades.processedtrade_event.protoTradeEventsUSD 価格、疑わしいフラグ付き
{chain}.tokenstoken_event.protoTokenEventsトークンイベント
{chain}.tokens.processedtoken_event.protoTokenEvents説明、画像、SNS リンク付き
{chain}.balancesbalance_event.protoBalanceEvents残高変動イベント
{chain}.balances.processedbalance_event.protoBalanceEventsUSD 価値付き
{chain}.dex.poolsdex_pool_event.protoDexPoolEvents流動性プールイベント
{chain}.v1.transfers.prototransfers_message.protoTransfersMessageEVM 送金メッセージ
{chain}.candlestickscandlestick.protoCandlestickEventsローソク足データ
{chain}ethbsc に置き換えてください。例:eth.dex.tradesbsc.tokens.processed

コード例

Python 例:DEX 取引データを消費

from kafka import KafkaConsumer
from common import trade_event_pb2  # Get from streaming_protobuf repository

# Create consumer
consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-dex-consumer'
)

# Consume and parse messages
for message in consumer:
    # Parse protobuf message
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    # Iterate DEX trades
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Block: {event.block.height}, Tx: {event.transaction.signature}")
        print("---")

JavaScript 例

const { Kafka } = require('kafkajs');
const protobuf = require('protobufjs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['<your_broker_address>'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: 'your_username',
    password: 'your_password'
  }
});

const consumer = kafka.consumer({ groupId: 'my-dex-consumer' });

async function run() {
  // Load protobuf definitions
  const root = await protobuf.load('common/trade_event.proto');
  const TradeEvents = root.lookupType('io.chainstream.v1.dex.trade.TradeEvents');

  await consumer.connect();
  await consumer.subscribe({ topic: 'eth.dex.trades', fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const tradeEvents = TradeEvents.decode(message.value);
      
      tradeEvents.events.forEach(event => {
        console.log(`Pool: ${event.trade.poolAddress}`);
        console.log(`Token A: ${event.trade.tokenAAddress}`);
        console.log(`Token B: ${event.trade.tokenBAddress}`);
        console.log(`Amount A: ${event.trade.userAAmount}`);
        console.log(`Amount B: ${event.trade.userBAmount}`);
        console.log(`Block: ${event.block.height}`);
      });
    }
  });
}

run().catch(console.error);

関連ドキュメント

コンセプトと統合ガイド

Kafka Streams 統合の基礎

Solana Streams

Solana 高スループットデータストリーム

TRON Streams

TRON ネットワークデータストリーム

WebSocket リアルタイムデータ

WebSocket 統合