EVM チェーンはネットワークによってブロック間隔が異なります(Ethereum メインネットは約 12 秒/ブロック)。Ethereum、BSC、Base、Polygon(Matic)、Optimism などのネットワークをサポートし、統一された Protobuf Schema を共有しています。
Schema リポジトリ:github.com/chainstream-io/streaming_protobuf/evm
メッセージタイプ一覧
EVM Streams は以下のメッセージタイプを提供します:
| メッセージタイプ | 説明 | Topic |
|---|
| TradeEvents | DEX 取引イベント | {chain}.dex.trades |
| TokenEvents | トークンイベント | {chain}.tokens |
| BalanceEvents | 残高変動イベント | {chain}.balances |
| DexPoolEvents | 流動性プールイベント | {chain}.dex.pools |
| TransfersMessage | 送金メッセージ | {chain}.v1.transfers.proto |
| CandlestickEvents | ローソク足データ | {chain}.candlesticks |
ブロックレベルデータ
各ブロックには BlockHeader が含まれ、コアフィールドは以下の通りです:
| フィールド | 型 | 説明 |
|---|
Number | uint64 | ブロック番号 |
Hash | bytes | ブロックハッシュ |
ParentHash | bytes | 親ブロックハッシュ |
Timestamp | Timestamp | ブロック時刻 |
BaseFeePerGas | uint64 | EIP-1559 基本手数料 |
GasUsed | uint64 | ブロック Gas 消費量 |
GasLimit | uint64 | ブロック Gas 上限 |
BlockMessage には以下も含まれます:
| フィールド | 説明 |
|---|
Transactions | ブロック内のすべてのトランザクション |
Withdrawals | バリデータ引き出し(Shanghai アップグレード以降) |
BlobGasUsed | Blob Gas 消費量(EIP-4844) |
トランザクションレベルデータ
| フィールド | 型 | 説明 |
|---|
Hash | bytes | トランザクションハッシュ |
Index | uint32 | ブロック内インデックス |
From | bytes | 送信者アドレス |
To | bytes | 受信者アドレス |
Value | BigInt | 送金額(wei) |
Nonce | uint64 | 送信者 nonce |
Type | uint32 | トランザクションタイプ(0/1/2) |
| フィールド | 説明 |
|---|
Status | 実行ステータス(1=成功、0=失敗) |
GasUsed | 実際の Gas 消費量 |
CumulativeGasUsed | 累積 Gas 消費量 |
ContractAddress | 作成されたコントラクトアドレス(ある場合) |
TransactionFee — 手数料の詳細
| フィールド | 説明 |
|---|
SenderFee | 送信者が支払った総手数料 |
MinerReward | マイナー/バリデータ報酬 |
Burnt | EIP-1559 バーン分 |
Savings | 節約された Gas 手数料 |
Calls — 内部コールトレース
すべてのネストされたコントラクト呼び出しを含み、各 call には以下が含まれます:
From、To:呼び出し元と呼び出し先
Input、Output:入出力データ
GasUsed:Gas 消費量
Opcode:オペコード(CALL/DELEGATECALL/STATICCALL)
Signature:関数シグネチャ
Logs:イベントログ
StateChanges:状態変更
ReturnValues:戻り値
Balance Updates
Token Balance Updates
Native Balance Updates
ERC-20/721/1155 トークンの残高変動:| フィールド | 説明 |
|---|
Token | トークン情報(アドレス、ファンジブル性、精度、総供給量) |
Wallet | ウォレットアドレス |
PostBalance | トランザクション後の残高 |
ネイティブ通貨(例:ETH)の残高変動:| フィールド | 説明 |
|---|
Address | アドレス |
PreBalance | トランザクション前の残高 |
PostBalance | トランザクション後の残高 |
ReasonCode | 変動理由コード |
送金データ
TransfersMessage は EVM チェーンの送金情報を提供します(Topic: {chain}.v1.transfers.proto)。
TransfersMessage 構造
message TransfersMessage {
Chain Chain = 1;
BlockHeader Header = 2;
repeated Transfer Transfers = 5;
optional BlockHeader L1Header = 6; // Included for L2 chains
}
Transfer 構造
| フィールド | 型 | 説明 |
|---|
CallIndex | uint64 | コールインデックス |
LogIndex | uint64 | ログインデックス |
Sender | string | 送信者アドレス |
Receiver | string | 受信者アドレス |
Amount | string | 送金額 |
Id | string | トークン ID(NFT) |
URI | string | トークン URI |
Currency | TokenInfo | トークン情報 |
Success | bool | 成功ステータス |
Index | uint32 | 送金インデックス |
TransactionHeader | TransactionHeader | トランザクションヘッダー |
TokenInfo 構造
| フィールド | 型 | 説明 |
|---|
SmartContract | string | コントラクトアドレス |
Delegated | bool | 委任されているか |
DelegatedTo | string | 委任先アドレス |
ProtocolName | string | プロトコル名 |
Name | string | トークン名 |
Symbol | string | トークンシンボル |
Decimals | int32 | 精度 |
HasURI | bool | URI があるか |
Fungible | bool | ファンジブルか |
AssetId | string | アセット ID |
サポートされるトークン標準
| 標準 | 説明 |
|---|
| ERC-20 | ファンジブルトークン |
| ERC-721 | ノンファンジブルトークン(NFT) |
| ERC-1155 | マルチアセットトークン |
DEX データ
TradeEvents は DEX 取引データを提供します(Topic: {chain}.dex.trades)。
TradeEvent 構造
message TradeEvent {
Instruction instruction = 1;
Block block = 2;
Transaction transaction = 3;
DApp d_app = 4;
Trade trade = 100;
BondingCurve bonding_curve = 110;
TradeProcessed trade_processed = 200; // included in processed topic
}
Trade コアフィールド
| フィールド | 説明 |
|---|
token_a_address / token_b_address | 取引ペアトークンアドレス |
token_a_decimals / token_b_decimals | トークン精度 |
user_a_amount / user_b_amount | ユーザー取引数量 |
user_a_pre_amount / user_a_post_amount | ユーザーの取引前後の残高 |
pool_address | プールアドレス |
vault_a / vault_b | プール Vault アドレス |
vault_a_amount / vault_b_amount | Vault 数量 |
vault_a_pre_amount / vault_a_post_amount | Vault の取引前後の残高 |
was_original_direction | オリジナル方向かどうか |
pool_config_address | プール設定アドレス |
TradeProcessed 拡張フィールド
.processed topic には以下の追加フィールドが含まれます:
| フィールド | 説明 |
|---|
token_a_price_in_usd / token_b_price_in_usd | USD 価格 |
token_a_price_in_native / token_b_price_in_native | ネイティブ通貨価格 |
is_token_a_price_in_usd_suspect | 価格が疑わしいか |
is_token_a_price_in_usd_suspect_reason | 疑わしい理由(例:価格変動、少額取引) |
DexPoolEvent - 流動性プール
DexPoolEvents は流動性プールイベントを提供します(Topic: {chain}.dex.pools)。
| フィールド | 説明 |
|---|
type | イベントタイプ(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
address | プールアドレス |
token_a_address / token_b_address | トークンアドレス |
token_a_vault_address / token_b_vault_address | Vault アドレス |
token_a_amount / token_b_amount | トークン数量 |
lp_wallet | LP ウォレットアドレス |
EVM チェーンの特徴
Gas と EIP-1559 手数料モデル
EVM は Gas メカニズムを使用して計算リソースを計量します。EIP-1559 では動的な BaseFee 調整と手数料バーンが導入され、トランザクション手数料は以下に分割されます:
- バリデータ報酬:
MinerReward
- バーン分:
Burnt
Layer 2 サポート
EVM Streams は L2 チェーン専用のフィールドを提供します:
| フィールド | 説明 | 対応チェーン |
|---|
L1Header | 対応する L1 ブロック情報 | 全 L2 |
SequenceNumber | シーケンス番号 | Optimism |
BatcherAddr | Batcher アドレス | Optimism |
L1FeeOverhead | L1 手数料オーバーヘッド | Optimism |
GasL1 | L1 データコスト | Arbitrum |
Topic → メッセージタイプ マッピング
| Topic | Proto File | メッセージタイプ | 説明 |
|---|
{chain}.dex.trades | trade_event.proto | TradeEvents | DEX 取引イベント |
{chain}.dex.trades.processed | trade_event.proto | TradeEvents | USD 価格、疑わしいフラグ付き |
{chain}.tokens | token_event.proto | TokenEvents | トークンイベント |
{chain}.tokens.processed | token_event.proto | TokenEvents | 説明、画像、SNS リンク付き |
{chain}.balances | balance_event.proto | BalanceEvents | 残高変動イベント |
{chain}.balances.processed | balance_event.proto | BalanceEvents | USD 価値付き |
{chain}.dex.pools | dex_pool_event.proto | DexPoolEvents | 流動性プールイベント |
{chain}.v1.transfers.proto | transfers_message.proto | TransfersMessage | EVM 送金メッセージ |
{chain}.candlesticks | candlestick.proto | CandlestickEvents | ローソク足データ |
{chain} を eth や bsc に置き換えてください。例:eth.dex.trades、bsc.tokens.processed。
コード例
Python 例:DEX 取引データを消費
from kafka import KafkaConsumer
from common import trade_event_pb2 # Get from streaming_protobuf repository
# Create consumer
consumer = KafkaConsumer(
'eth.dex.trades',
bootstrap_servers=['<your_broker_address>'],
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username='your_username',
sasl_plain_password='your_password',
auto_offset_reset='latest',
enable_auto_commit=False,
group_id='my-dex-consumer'
)
# Consume and parse messages
for message in consumer:
# Parse protobuf message
trade_events = trade_event_pb2.TradeEvents()
trade_events.ParseFromString(message.value)
# Iterate DEX trades
for event in trade_events.events:
print(f"Pool: {event.trade.pool_address}")
print(f"Token A: {event.trade.token_a_address}")
print(f"Token B: {event.trade.token_b_address}")
print(f"Amount A: {event.trade.user_a_amount}")
print(f"Amount B: {event.trade.user_b_amount}")
print(f"Block: {event.block.height}, Tx: {event.transaction.signature}")
print("---")
JavaScript 例
const { Kafka } = require('kafkajs');
const protobuf = require('protobufjs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['<your_broker_address>'],
ssl: true,
sasl: {
mechanism: 'scram-sha-512',
username: 'your_username',
password: 'your_password'
}
});
const consumer = kafka.consumer({ groupId: 'my-dex-consumer' });
async function run() {
// Load protobuf definitions
const root = await protobuf.load('common/trade_event.proto');
const TradeEvents = root.lookupType('io.chainstream.v1.dex.trade.TradeEvents');
await consumer.connect();
await consumer.subscribe({ topic: 'eth.dex.trades', fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const tradeEvents = TradeEvents.decode(message.value);
tradeEvents.events.forEach(event => {
console.log(`Pool: ${event.trade.poolAddress}`);
console.log(`Token A: ${event.trade.tokenAAddress}`);
console.log(`Token B: ${event.trade.tokenBAddress}`);
console.log(`Amount A: ${event.trade.userAAmount}`);
console.log(`Amount B: ${event.trade.userBAmount}`);
console.log(`Block: ${event.block.height}`);
});
}
});
}
run().catch(console.error);
関連ドキュメント
コンセプトと統合ガイド
Kafka Streams 統合の基礎
Solana Streams
Solana 高スループットデータストリーム
TRON Streams
TRON ネットワークデータストリーム
WebSocket リアルタイムデータ
WebSocket 統合