EVM Streams covers Ethereum, BSC, Base, Polygon (Matic), Optimism, and other EVM networks, all sharing a unified Protobuf schema. Block intervals differ by network (Ethereum mainnet produces a block roughly every 12 seconds). Schema repository: github.com/chainstream-io/streaming_protobuf/evm

Message Types Overview

EVM Streams provides the following message types:

| Message Type | Description | Topic |
| --- | --- | --- |
| TradeEvents | DEX trade events | {chain}.dex.trades |
| TokenEvents | Token events | {chain}.tokens |
| BalanceEvents | Balance change events | {chain}.balances |
| DexPoolEvents | Liquidity pool events | {chain}.dex.pools |
| TransfersMessage | Transfer messages | {chain}.v1.transfers.proto |
| CandlestickEvents | Candlestick data | {chain}.candlesticks |

Block-Level Data

Each block contains a BlockHeader with the following core fields:

| Field | Type | Description |
| --- | --- | --- |
| Number | uint64 | Block number |
| Hash | bytes | Block hash |
| ParentHash | bytes | Parent block hash |
| Timestamp | Timestamp | Block time |
| BaseFeePerGas | uint64 | EIP-1559 base fee |
| GasUsed | uint64 | Block gas consumption |
| GasLimit | uint64 | Block gas limit |

BlockMessage also contains:

| Field | Description |
| --- | --- |
| Transactions | All transactions in the block |
| Withdrawals | Validator withdrawals (post-Shanghai upgrade) |
| BlobGasUsed | Blob gas consumption (EIP-4844) |
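
As a rough illustration of how the header fields combine, the sketch below computes how full a block is from GasUsed and GasLimit. `BlockHeaderView` and `gas_utilization` are hypothetical names, and the snake_case attributes are an assumption; the generated Protobuf classes may expose these fields differently.

```python
from dataclasses import dataclass


@dataclass
class BlockHeaderView:
    """Hypothetical plain-Python mirror of a few BlockHeader fields."""
    number: int
    gas_used: int
    gas_limit: int
    base_fee_per_gas: int  # wei per gas unit


def gas_utilization(header: BlockHeaderView) -> float:
    """Return the fraction of the block gas limit that was consumed."""
    if header.gas_limit == 0:
        return 0.0
    return header.gas_used / header.gas_limit


# Illustrative values only, not taken from a real block.
header = BlockHeaderView(
    number=19_000_000,
    gas_used=15_000_000,
    gas_limit=30_000_000,
    base_fee_per_gas=25_000_000_000,
)
print(f"Block {header.number}: {gas_utilization(header):.1%} of the gas limit used")
```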

Transaction-Level Data

TransactionHeader — Core Transaction Metadata

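| Field | Type | Description |
| --- | --- | --- |
| Hash | bytes | Transaction hash |
| Index | uint32 | Index within block |
| From | bytes | Sender address |
| To | bytes | Recipient address |
| Value | BigInt | Transfer amount (wei) |
| Nonce | uint64 | Sender nonce |
| Type | uint32 | Transaction type (0 = legacy, 1 = EIP-2930 access list, 2 = EIP-1559 dynamic fee) |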
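
Value is denominated in wei, so most consumers convert it before display. The sketch below assumes the BigInt has already been turned into a Python int (how the schema encodes BigInt on the wire is not specified here); `wei_to_ether` and `describe_tx_type` are hypothetical helper names.

```python
from decimal import Decimal

WEI_PER_ETHER = Decimal(10) ** 18

# Standard EVM transaction type identifiers.
TX_TYPE_NAMES = {
    0: "legacy",
    1: "access list (EIP-2930)",
    2: "dynamic fee (EIP-1559)",
}


def wei_to_ether(wei: int) -> Decimal:
    """Convert an integer wei amount to a Decimal amount of the native coin."""
    return Decimal(wei) / WEI_PER_ETHER


def describe_tx_type(tx_type: int) -> str:
    """Map the numeric Type field to a readable label."""
    return TX_TYPE_NAMES.get(tx_type, f"unknown ({tx_type})")


print(wei_to_ether(1_500_000_000_000_000_000), describe_tx_type(2))
```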

ReceiptHeader — Execution Result

| Field | Description |
| --- | --- |
| Status | Execution status (1=success, 0=failure) |
| GasUsed | Actual gas consumption |
| CumulativeGasUsed | Cumulative gas consumption |
| ContractAddress | Created contract address (if any) |

TransactionFee — Fee Details

| Field | Description |
| --- | --- |
| SenderFee | Total fee paid by sender |
| MinerReward | Miner/validator reward |
| Burnt | EIP-1559 burnt portion |
| Savings | Saved gas fees |
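
The following sketch shows one way these four values could relate under the standard EIP-1559 rules. It is an assumption that the stream computes them this way, in particular that Savings means the gap between the sender's maximum fee and the fee actually charged. `split_fee` and its parameter names are hypothetical, and the function assumes max_fee is at least base_fee (otherwise the transaction would not be includable).

```python
from typing import Dict


def split_fee(gas_used: int, base_fee: int, max_fee: int,
              max_priority_fee: int) -> Dict[str, int]:
    """Split an EIP-1559 transaction fee into its components (all in wei)."""
    # The sender pays at most max_fee per gas; the tip is capped accordingly.
    effective_price = min(max_fee, base_fee + max_priority_fee)
    return {
        "sender_fee": effective_price * gas_used,
        "miner_reward": (effective_price - base_fee) * gas_used,
        "burnt": base_fee * gas_used,
        "savings": (max_fee - effective_price) * gas_used,
    }


gwei = 10 ** 9
fees = split_fee(gas_used=21_000, base_fee=20 * gwei,
                 max_fee=30 * gwei, max_priority_fee=2 * gwei)
print(fees)
```

Under these assumptions the sender fee always equals the burnt portion plus the validator reward, which is a useful sanity check when reconciling stream data.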

Calls — Internal Call Traces

Contains all nested contract calls. Each call includes:
  • From, To: Caller and callee
  • Input, Output: Input and output data
  • GasUsed: Gas consumption
  • Opcode: Operation code (CALL/DELEGATECALL/STATICCALL)
  • Signature: Function signature
  • Logs: Event logs
  • StateChanges: State changes
  • ReturnValues: Return values
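
When the Signature field is not enough, the called function can be identified from the first four bytes of Input, which hold the function selector in standard ABI encoding. The sketch assumes Input is available as raw bytes; `function_selector` is a hypothetical helper.

```python
from typing import Optional


def function_selector(call_input: bytes) -> Optional[str]:
    """Return the 4-byte function selector as a hex string, if present."""
    if len(call_input) < 4:
        # Plain value transfers and fallback calls carry no selector.
        return None
    return "0x" + call_input[:4].hex()


# ERC-20 transfer(address,uint256) calldata: selector followed by two
# 32-byte arguments (zeroed here for brevity).
calldata = bytes.fromhex("a9059cbb" + "00" * 64)
print(function_selector(calldata))
```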

Balance Updates

ERC-20/721/1155 token balance changes:

| Field | Description |
| --- | --- |
| Token | Token info (address, fungibility, decimals, total supply) |
| Wallet | Wallet address |
| PostBalance | Balance after transaction |
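
Because PostBalance is the balance after the transaction rather than a delta, a consumer can keep a running snapshot by overwriting the previous value. The sketch below assumes addresses arrive as hex strings and balances as ints; `apply_balance_updates` and the tuple layout are hypothetical.

```python
from typing import Dict, Iterable, Tuple

BalanceKey = Tuple[str, str]           # (token address, wallet address)
BalanceUpdate = Tuple[str, str, int]   # (token address, wallet address, post balance)


def apply_balance_updates(state: Dict[BalanceKey, int],
                          updates: Iterable[BalanceUpdate]) -> Dict[BalanceKey, int]:
    """Overwrite each (token, wallet) entry with its latest PostBalance."""
    for token, wallet, post_balance in updates:
        # Lowercase the addresses so checksummed and plain forms share a key.
        state[(token.lower(), wallet.lower())] = post_balance
    return state


snapshot: Dict[BalanceKey, int] = {}
apply_balance_updates(snapshot, [
    ("0xTokenA", "0xWallet1", 100),
    ("0xTokenA", "0xWallet1", 70),   # later update replaces the earlier one
    ("0xTokenA", "0xWallet2", 30),
])
print(snapshot)
```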

Transfer Data

TransfersMessage provides EVM chain transfer information (Topic: {chain}.v1.transfers.proto).

TransfersMessage Structure

message TransfersMessage {
  Chain Chain = 1;
  BlockHeader Header = 2;
  repeated Transfer Transfers = 5;
  optional BlockHeader L1Header = 6;  // Included for L2 chains
}

Transfer Structure

| Field | Type | Description |
| --- | --- | --- |
| CallIndex | uint64 | Call index |
| LogIndex | uint64 | Log index |
| Sender | string | Sender address |
| Receiver | string | Receiver address |
| Amount | string | Transfer amount |
| Id | string | Token ID (NFT) |
| URI | string | Token URI |
| Currency | TokenInfo | Token information |
| Success | bool | Success status |
| Index | uint32 | Transfer index |
| TransactionHeader | TransactionHeader | Transaction header |

TokenInfo Structure

| Field | Type | Description |
| --- | --- | --- |
| SmartContract | string | Contract address |
| Delegated | bool | Is delegated |
| DelegatedTo | string | Delegation address |
| ProtocolName | string | Protocol name |
| Name | string | Token name |
| Symbol | string | Token symbol |
| Decimals | int32 | Decimals |
| HasURI | bool | Has URI |
| Fungible | bool | Is fungible |
| AssetId | string | Asset ID |
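
Transfer.Amount is a string and TokenInfo.Decimals gives the token's precision, so a display amount can be derived from the two. The sketch assumes Amount is a base-10 integer string in the token's smallest unit, which this page does not state explicitly; `normalize_amount` is a hypothetical helper.

```python
from decimal import Decimal, localcontext


def normalize_amount(amount: str, decimals: int) -> Decimal:
    """Scale a raw integer amount string by the token's decimals."""
    with localcontext() as ctx:
        # uint256 values can have up to 78 digits, more than the default
        # Decimal precision of 28, so widen it for this calculation.
        ctx.prec = 80
        return Decimal(amount) / (Decimal(10) ** decimals)


# 1,500,000 raw units of a 6-decimal token.
print(normalize_amount("1500000", 6))
```

Using Decimal rather than float avoids rounding errors on large token amounts.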

Supported Token Standards

| Standard | Description |
| --- | --- |
| ERC-20 | Fungible tokens |
| ERC-721 | Non-fungible tokens (NFT) |
| ERC-1155 | Multi-asset tokens |

DEX Data

TradeEvents provides DEX trade data (Topic: {chain}.dex.trades).

TradeEvent Structure

message TradeEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Trade trade = 100;
  BondingCurve bonding_curve = 110;
  TradeProcessed trade_processed = 200;  // included in processed topic
}

Trade Core Fields

| Field | Description |
| --- | --- |
| token_a_address / token_b_address | Trading pair token addresses |
| token_a_decimals / token_b_decimals | Token decimals |
| user_a_amount / user_b_amount | User trade amounts |
| user_a_pre_amount / user_a_post_amount | User balance before/after |
| pool_address | Pool address |
| vault_a / vault_b | Pool vault addresses |
| vault_a_amount / vault_b_amount | Vault amounts |
| vault_a_pre_amount / vault_a_post_amount | Vault balance before/after |
| was_original_direction | Is original direction |
| pool_config_address | Pool config address |
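
The amounts and decimals above are enough to derive an execution price for a single trade. The sketch assumes user_a_amount and user_b_amount are non-negative base-10 integer strings in each token's smallest unit; `execution_price` is a hypothetical helper, not part of the schema.

```python
from decimal import Decimal, localcontext


def execution_price(user_a_amount: str, user_b_amount: str,
                    token_a_decimals: int, token_b_decimals: int) -> Decimal:
    """Return the price of one unit of token A expressed in token B."""
    with localcontext() as ctx:
        ctx.prec = 80  # room for uint256-sized raw amounts
        amount_a = Decimal(user_a_amount) / (Decimal(10) ** token_a_decimals)
        amount_b = Decimal(user_b_amount) / (Decimal(10) ** token_b_decimals)
        if amount_a == 0:
            raise ValueError("token A amount is zero; price is undefined")
        return amount_b / amount_a


# 1 unit of an 18-decimal token traded for 2,500 units of a 6-decimal token.
price = execution_price("1000000000000000000", "2500000000", 18, 6)
print(price)
```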

TradeProcessed Enhanced Fields

The .processed topic includes these additional fields:

| Field | Description |
| --- | --- |
| token_a_price_in_usd / token_b_price_in_usd | USD price |
| token_a_price_in_native / token_b_price_in_native | Native currency price |
| is_token_a_price_in_usd_suspect | Is price suspicious |
| is_token_a_price_in_usd_suspect_reason | Suspicious reason (e.g., price volatility, small amount) |
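
A consumer will usually want to skip prices flagged as suspect before feeding them into charts or alerts. The sketch below works on a plain dict keyed by the field names above, which is an assumption about how the decoded message is represented; `trusted_token_a_price` is a hypothetical helper.

```python
from decimal import Decimal
from typing import Mapping, Optional


def trusted_token_a_price(processed: Mapping[str, object]) -> Optional[Decimal]:
    """Return token A's USD price, or None if it is missing or flagged."""
    if processed.get("is_token_a_price_in_usd_suspect"):
        return None
    price = processed.get("token_a_price_in_usd")
    if price in (None, ""):
        return None
    # Go through str() so float inputs are not expanded to binary noise.
    return Decimal(str(price))


clean = {"token_a_price_in_usd": "2500.12", "is_token_a_price_in_usd_suspect": False}
flagged = {"token_a_price_in_usd": "0.01", "is_token_a_price_in_usd_suspect": True}
print(trusted_token_a_price(clean), trusted_token_a_price(flagged))
```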

DexPoolEvent - Liquidity Pools

DexPoolEvents provides liquidity pool events (Topic: {chain}.dex.pools).

| Field | Description |
| --- | --- |
| type | Event type (INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
| address | Pool address |
| token_a_address / token_b_address | Token addresses |
| token_a_vault_address / token_b_vault_address | Vault addresses |
| token_a_amount / token_b_amount | Token amounts |
| lp_wallet | LP wallet address |
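
One possible use of these events is tracking net liquidity added to each pool. The sketch below only counts INCREASE_LIQUIDITY and DECREASE_LIQUIDITY events and assumes amounts are already ints; how INITIALIZE and SWAP events report amounts is not covered here, so they are skipped. `net_liquidity` and the tuple layout are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# (event type, pool address, token A amount, token B amount)
PoolEvent = Tuple[str, str, int, int]


def net_liquidity(events: Iterable[PoolEvent]) -> Dict[str, Tuple[int, int]]:
    """Sum liquidity added minus liquidity removed, per pool."""
    totals: Dict[str, List[int]] = defaultdict(lambda: [0, 0])
    for event_type, pool, amount_a, amount_b in events:
        if event_type == "INCREASE_LIQUIDITY":
            sign = 1
        elif event_type == "DECREASE_LIQUIDITY":
            sign = -1
        else:
            continue  # INITIALIZE and SWAP are ignored in this sketch
        totals[pool][0] += sign * amount_a
        totals[pool][1] += sign * amount_b
    return {pool: (a, b) for pool, (a, b) in totals.items()}


result = net_liquidity([
    ("INCREASE_LIQUIDITY", "0xpool", 100, 200),
    ("DECREASE_LIQUIDITY", "0xpool", 30, 50),
    ("SWAP", "0xpool", 5, 5),
])
print(result)
```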

EVM Chain Characteristics

Gas & EIP-1559 Fee Model

EVM chains use gas to measure computational resources. EIP-1559 introduced dynamic BaseFee adjustment and fee burning, so transaction fees are split into:
  • Validator reward: MinerReward
  • Burnt portion: Burnt
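
The dynamic BaseFee adjustment can be sketched from two consecutive header values. The constants below are the Ethereum mainnet parameters from EIP-1559 (elasticity multiplier 2, maximum change of 1/8 per block); other EVM networks may use different values, so treat the result as an estimate. `next_base_fee` is a hypothetical helper.

```python
BASE_FEE_MAX_CHANGE_DENOMINATOR = 8  # base fee moves by at most 1/8 per block
ELASTICITY_MULTIPLIER = 2            # gas target is half of the gas limit


def next_base_fee(base_fee: int, gas_used: int, gas_limit: int) -> int:
    """Estimate the next block's base fee (wei) from the current block."""
    gas_target = gas_limit // ELASTICITY_MULTIPLIER
    if gas_used == gas_target:
        return base_fee
    if gas_used > gas_target:
        # Block was fuller than the target: raise the fee by at least 1 wei.
        delta = max(
            base_fee * (gas_used - gas_target)
            // gas_target // BASE_FEE_MAX_CHANGE_DENOMINATOR,
            1,
        )
        return base_fee + delta
    # Block was emptier than the target: lower the fee.
    delta = (base_fee * (gas_target - gas_used)
             // gas_target // BASE_FEE_MAX_CHANGE_DENOMINATOR)
    return base_fee - delta


gwei = 10 ** 9
print(next_base_fee(100 * gwei, 30_000_000, 30_000_000))  # after a full block
print(next_base_fee(100 * gwei, 0, 30_000_000))           # after an empty block
```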

Layer 2 Support

EVM Streams provides dedicated fields for L2 chains:

| Field | Description | Applicable Chains |
| --- | --- | --- |
| L1Header | Corresponding L1 block information | All L2 |
| SequenceNumber | Sequence number | Optimism |
| BatcherAddr | Batcher address | Optimism |
| L1FeeOverhead | L1 fee overhead | Optimism |
| GasL1 | L1 data cost | Arbitrum |

Topic → Message Type Mapping

| Topic | Proto File | Message Type | Description |
| --- | --- | --- | --- |
| {chain}.dex.trades | trade_event.proto | TradeEvents | DEX trade events |
| {chain}.dex.trades.processed | trade_event.proto | TradeEvents | With USD price, suspicious flag |
| {chain}.tokens | token_event.proto | TokenEvents | Token events |
| {chain}.tokens.processed | token_event.proto | TokenEvents | With description, image, social links |
| {chain}.balances | balance_event.proto | BalanceEvents | Balance change events |
| {chain}.balances.processed | balance_event.proto | BalanceEvents | With USD value |
| {chain}.dex.pools | dex_pool_event.proto | DexPoolEvents | Liquidity pool events |
| {chain}.v1.transfers.proto | transfers_message.proto | TransfersMessage | EVM transfer messages |
| {chain}.candlesticks | candlestick.proto | CandlestickEvents | Candlestick data |

Replace {chain} with eth or bsc. E.g., eth.dex.trades, bsc.tokens.processed.
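
The naming pattern above can be captured in a small helper that rejects combinations not listed in the mapping. `topic_name` is a hypothetical function, and the set of streams with a .processed variant is taken only from the table on this page.

```python
# Streams that have a ".processed" variant in the mapping table.
PROCESSED_STREAMS = {"dex.trades", "tokens", "balances"}
ALL_STREAMS = PROCESSED_STREAMS | {"dex.pools", "v1.transfers.proto", "candlesticks"}


def topic_name(chain: str, stream: str, processed: bool = False) -> str:
    """Build a topic name such as 'eth.dex.trades' or 'bsc.tokens.processed'."""
    if stream not in ALL_STREAMS:
        raise ValueError(f"unknown stream: {stream}")
    if processed and stream not in PROCESSED_STREAMS:
        raise ValueError(f"no processed topic listed for: {stream}")
    suffix = ".processed" if processed else ""
    return f"{chain}.{stream}{suffix}"


print(topic_name("eth", "dex.trades"))
print(topic_name("bsc", "tokens", processed=True))
```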

Code Examples

Python Example: Consume DEX Trade Data

from kafka import KafkaConsumer
from common import trade_event_pb2  # Get from streaming_protobuf repository

# Create consumer
consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-dex-consumer'
)

# Consume and parse messages
for message in consumer:
    # Parse protobuf message
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    # Iterate DEX trades
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Block: {event.block.height}, Tx: {event.transaction.signature}")
        print("---")

JavaScript Example

const { Kafka } = require('kafkajs');
const protobuf = require('protobufjs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['<your_broker_address>'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: 'your_username',
    password: 'your_password'
  }
});

const consumer = kafka.consumer({ groupId: 'my-dex-consumer' });

async function run() {
  // Load protobuf definitions
  const root = await protobuf.load('common/trade_event.proto');
  const TradeEvents = root.lookupType('io.chainstream.v1.dex.trade.TradeEvents');

  await consumer.connect();
  await consumer.subscribe({ topic: 'eth.dex.trades', fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const tradeEvents = TradeEvents.decode(message.value);
      
      tradeEvents.events.forEach(event => {
        console.log(`Pool: ${event.trade.poolAddress}`);
        console.log(`Token A: ${event.trade.tokenAAddress}`);
        console.log(`Token B: ${event.trade.tokenBAddress}`);
        console.log(`Amount A: ${event.trade.userAAmount}`);
        console.log(`Amount B: ${event.trade.userBAmount}`);
        console.log(`Block: ${event.block.height}`);
      });
    }
  });
}

run().catch(console.error);