EVM chains have different block intervals depending on the network (Ethereum mainnet ~12 seconds/block), supporting Ethereum, BSC, Base, Polygon (Matic), Optimism and other networks, sharing a unified Protobuf Schema.
Schema Repository : github.com/chainstream-io/streaming_protobuf/evm
Message Types Overview
EVM Streams provides the following message types:
Message Type Description Topic TradeEvents DEX trade events {chain}.dex.tradesTokenEvents Token events {chain}.tokensBalanceEvents Balance change events {chain}.balancesDexPoolEvents Liquidity pool events {chain}.dex.poolsTransfersMessage Transfer messages {chain}.v1.transfers.protoCandlestickEvents Candlestick data {chain}.candlesticks
Block-Level Data
Each block contains BlockHeader with core fields:
Field Type Description Numberuint64 Block number Hashbytes Block hash ParentHashbytes Parent block hash TimestampTimestamp Block time BaseFeePerGasuint64 EIP-1559 base fee GasUseduint64 Block gas consumption GasLimituint64 Block gas limit
BlockMessage also contains:
Field Description TransactionsAll transactions in the block WithdrawalsValidator withdrawals (post-Shanghai upgrade) BlobGasUsedBlob gas consumption (EIP-4844)
Transaction-Level Data
Field Type Description Hashbytes Transaction hash Indexuint32 Index within block Frombytes Sender address Tobytes Recipient address ValueBigInt Transfer amount (wei) Nonceuint64 Sender nonce Typeuint32 Transaction type (0/1/2)
Field Description StatusExecution status (1=success, 0=failure) GasUsedActual gas consumption CumulativeGasUsedCumulative gas consumption ContractAddressCreated contract address (if any)
TransactionFee — Fee Details
Field Description SenderFeeTotal fee paid by sender MinerRewardMiner/validator reward BurntEIP-1559 burnt portion SavingsSaved gas fees
Calls — Internal Call Traces
Contains all nested contract calls, each call includes:
From, To: Caller and callee
Input, Output: Input and output data
GasUsed: Gas consumption
Opcode: Operation code (CALL/DELEGATECALL/STATICCALL)
Signature: Function signature
Logs: Event logs
StateChanges: State changes
ReturnValues: Return values
Balance Updates
Token Balance Updates
Native Balance Updates
ERC-20/721/1155 Token balance changes: Field Description TokenToken info (address, fungibility, decimals, total supply) WalletWallet address PostBalanceBalance after transaction
Native currency (e.g., ETH) balance changes: Field Description AddressAddress PreBalanceBalance before transaction PostBalanceBalance after transaction ReasonCodeChange reason code
Transfer Data
TransfersMessage provides EVM chain transfer information (Topic: {chain}.v1.transfers.proto).
TransfersMessage Structure
message TransfersMessage {
Chain Chain = 1 ;
BlockHeader Header = 2 ;
repeated Transfer Transfers = 5 ;
optional BlockHeader L1Header = 6 ; // Included for L2 chains
}
Transfer Structure
Field Type Description CallIndexuint64 Call index LogIndexuint64 Log index Senderstring Sender address Receiverstring Receiver address Amountstring Transfer amount Idstring Token ID (NFT) URIstring Token URI CurrencyTokenInfo Token information Successbool Success status Indexuint32 Transfer index TransactionHeaderTransactionHeader Transaction header
TokenInfo Structure
Field Type Description SmartContractstring Contract address Delegatedbool Is delegated DelegatedTostring Delegation address ProtocolNamestring Protocol name Namestring Token name Symbolstring Token symbol Decimalsint32 Decimals HasURIbool Has URI Fungiblebool Is fungible AssetIdstring Asset ID
Supported Token Standards
Standard Description ERC-20 Fungible tokens ERC-721 Non-fungible tokens (NFT) ERC-1155 Multi-asset tokens
DEX Data
TradeEvents provides DEX trade data (Topic: {chain}.dex.trades).
TradeEvent Structure
message TradeEvent {
Instruction instruction = 1 ;
Block block = 2 ;
Transaction transaction = 3 ;
DApp d_app = 4 ;
Trade trade = 100 ;
BondingCurve bonding_curve = 110 ;
TradeProcessed trade_processed = 200 ; // included in processed topic
}
Trade Core Fields
Field Description token_a_address / token_b_addressTrading pair token addresses token_a_decimals / token_b_decimalsToken decimals user_a_amount / user_b_amountUser trade amounts user_a_pre_amount / user_a_post_amountUser balance before/after pool_addressPool address vault_a / vault_bPool vault addresses vault_a_amount / vault_b_amountVault amounts vault_a_pre_amount / vault_a_post_amountVault balance before/after was_original_directionIs original direction pool_config_addressPool config address
TradeProcessed Enhanced Fields
.processed topic includes additional fields:
Field Description token_a_price_in_usd / token_b_price_in_usdUSD price token_a_price_in_native / token_b_price_in_nativeNative currency price is_token_a_price_in_usd_suspectIs price suspicious is_token_a_price_in_usd_suspect_reasonSuspicious reason (e.g., price volatility, small amount)
DexPoolEvent - Liquidity Pools
DexPoolEvents provides liquidity pool events (Topic: {chain}.dex.pools).
Field Description typeEvent type (INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) addressPool address token_a_address / token_b_addressToken addresses token_a_vault_address / token_b_vault_addressVault addresses token_a_amount / token_b_amountToken amounts lp_walletLP wallet address
EVM Chain Characteristics
Gas & EIP-1559 Fee Model
EVM uses Gas mechanism to measure computational resources. EIP-1559 introduced dynamic BaseFee adjustment and fee burning, transaction fees split into:
Validator reward : MinerReward
Burnt portion : Burnt
Layer 2 Support
EVM Streams provides dedicated fields for L2 chains:
Field Description Applicable Chains L1HeaderCorresponding L1 block information All L2 SequenceNumberSequence number Optimism BatcherAddrBatcher address Optimism L1FeeOverheadL1 fee overhead Optimism GasL1L1 data cost Arbitrum
Topic → Message Type Mapping
Topic Proto File Message Type Description {chain}.dex.tradestrade_event.proto TradeEvents DEX trade events {chain}.dex.trades.processedtrade_event.proto TradeEvents With USD price, suspicious flag {chain}.tokenstoken_event.proto TokenEvents Token events {chain}.tokens.processedtoken_event.proto TokenEvents With description, image, social links {chain}.balancesbalance_event.proto BalanceEvents Balance change events {chain}.balances.processedbalance_event.proto BalanceEvents With USD value {chain}.dex.poolsdex_pool_event.proto DexPoolEvents Liquidity pool events {chain}.v1.transfers.prototransfers_message.proto TransfersMessage EVM transfer messages {chain}.candlestickscandlestick.proto CandlestickEvents Candlestick data
Replace {chain} with eth or bsc. E.g., eth.dex.trades, bsc.tokens.processed.
Code Examples
Python Example: Consume DEX Trade Data
from kafka import KafkaConsumer
from common import trade_event_pb2 # Get from streaming_protobuf repository
# Create consumer
consumer = KafkaConsumer(
'eth.dex.trades' ,
bootstrap_servers = [ '<your_broker_address>' ],
security_protocol = 'SASL_SSL' ,
sasl_mechanism = 'SCRAM-SHA-512' ,
sasl_plain_username = 'your_username' ,
sasl_plain_password = 'your_password' ,
auto_offset_reset = 'latest' ,
enable_auto_commit = False ,
group_id = 'my-dex-consumer'
)
# Consume and parse messages
for message in consumer:
# Parse protobuf message
trade_events = trade_event_pb2.TradeEvents()
trade_events.ParseFromString(message.value)
# Iterate DEX trades
for event in trade_events.events:
print ( f "Pool: { event.trade.pool_address } " )
print ( f "Token A: { event.trade.token_a_address } " )
print ( f "Token B: { event.trade.token_b_address } " )
print ( f "Amount A: { event.trade.user_a_amount } " )
print ( f "Amount B: { event.trade.user_b_amount } " )
print ( f "Block: { event.block.height } , Tx: { event.transaction.signature } " )
print ( "---" )
JavaScript Example
const { Kafka } = require ( 'kafkajs' );
const protobuf = require ( 'protobufjs' );
const kafka = new Kafka ({
clientId: 'my-app' ,
brokers: [ '<your_broker_address>' ],
ssl: true ,
sasl: {
mechanism: 'scram-sha-512' ,
username: 'your_username' ,
password: 'your_password'
}
});
const consumer = kafka . consumer ({ groupId: 'my-dex-consumer' });
async function run () {
// Load protobuf definitions
const root = await protobuf . load ( 'common/trade_event.proto' );
const TradeEvents = root . lookupType ( 'io.chainstream.v1.dex.trade.TradeEvents' );
await consumer . connect ();
await consumer . subscribe ({ topic: 'eth.dex.trades' , fromBeginning: false });
await consumer . run ({
eachMessage : async ({ message }) => {
const tradeEvents = TradeEvents . decode ( message . value );
tradeEvents . events . forEach ( event => {
console . log ( `Pool: ${ event . trade . poolAddress } ` );
console . log ( `Token A: ${ event . trade . tokenAAddress } ` );
console . log ( `Token B: ${ event . trade . tokenBAddress } ` );
console . log ( `Amount A: ${ event . trade . userAAmount } ` );
console . log ( `Amount B: ${ event . trade . userBAmount } ` );
console . log ( `Block: ${ event . block . height } ` );
});
}
});
}
run (). catch ( console . error );
Concepts & Integration Guide Kafka Streams integration basics
Solana Streams Solana high-throughput data streams
TRON Streams TRON network data streams
WebSocket Real-time Data WebSocket integration