EVM chains have different block intervals depending on the network (Ethereum mainnet ~12 seconds/block), supporting Ethereum, BSC, Base, Polygon (Matic), Optimism and other networks, sharing a unified Protobuf Schema.
Schema Repository: github.com/chainstream-io/streaming_protobuf/evm
Message Types Overview
EVM Streams provides the following message types:
| Message Type | Description | Topic |
|---|
| TradeEvents | DEX trade events | {chain}.dex.trades |
| TokenEvents | Token events | {chain}.tokens |
| BalanceEvents | Balance change events | {chain}.balances |
| DexPoolEvents | Liquidity pool events | {chain}.dex.pools |
| TransfersMessage | Transfer messages | {chain}.v1.transfers.proto |
| CandlestickEvents | Candlestick data | {chain}.candlesticks |
Block-Level Data
Each block contains BlockHeader with core fields:
| Field | Type | Description |
|---|
Number | uint64 | Block number |
Hash | bytes | Block hash |
ParentHash | bytes | Parent block hash |
Timestamp | Timestamp | Block time |
BaseFeePerGas | uint64 | EIP-1559 base fee |
GasUsed | uint64 | Block gas consumption |
GasLimit | uint64 | Block gas limit |
BlockMessage also contains:
| Field | Description |
|---|
Transactions | All transactions in the block |
Withdrawals | Validator withdrawals (post-Shanghai upgrade) |
BlobGasUsed | Blob gas consumption (EIP-4844) |
Transaction-Level Data
| Field | Type | Description |
|---|
Hash | bytes | Transaction hash |
Index | uint32 | Index within block |
From | bytes | Sender address |
To | bytes | Recipient address |
Value | BigInt | Transfer amount (wei) |
Nonce | uint64 | Sender nonce |
Type | uint32 | Transaction type (0/1/2) |
| Field | Description |
|---|
Status | Execution status (1=success, 0=failure) |
GasUsed | Actual gas consumption |
CumulativeGasUsed | Cumulative gas consumption |
ContractAddress | Created contract address (if any) |
TransactionFee — Fee Details
| Field | Description |
|---|
SenderFee | Total fee paid by sender |
MinerReward | Miner/validator reward |
Burnt | EIP-1559 burnt portion |
Savings | Saved gas fees |
Calls — Internal Call Traces
Contains all nested contract calls, each call includes:
From, To: Caller and callee
Input, Output: Input and output data
GasUsed: Gas consumption
Opcode: Operation code (CALL/DELEGATECALL/STATICCALL)
Signature: Function signature
Logs: Event logs
StateChanges: State changes
ReturnValues: Return values
Balance Updates
Token Balance Updates
Native Balance Updates
ERC-20/721/1155 Token balance changes:| Field | Description |
|---|
Token | Token info (address, fungibility, decimals, total supply) |
Wallet | Wallet address |
PostBalance | Balance after transaction |
Native currency (e.g., ETH) balance changes:| Field | Description |
|---|
Address | Address |
PreBalance | Balance before transaction |
PostBalance | Balance after transaction |
ReasonCode | Change reason code |
Transfer Data
TransfersMessage provides EVM chain transfer information (Topic: {chain}.v1.transfers.proto).
TransfersMessage Structure
message TransfersMessage {
Chain Chain = 1;
BlockHeader Header = 2;
repeated Transfer Transfers = 5;
optional BlockHeader L1Header = 6; // Included for L2 chains
}
Transfer Structure
| Field | Type | Description |
|---|
CallIndex | uint64 | Call index |
LogIndex | uint64 | Log index |
Sender | string | Sender address |
Receiver | string | Receiver address |
Amount | string | Transfer amount |
Id | string | Token ID (NFT) |
URI | string | Token URI |
Currency | TokenInfo | Token information |
Success | bool | Success status |
Index | uint32 | Transfer index |
TransactionHeader | TransactionHeader | Transaction header |
TokenInfo Structure
| Field | Type | Description |
|---|
SmartContract | string | Contract address |
Delegated | bool | Is delegated |
DelegatedTo | string | Delegation address |
ProtocolName | string | Protocol name |
Name | string | Token name |
Symbol | string | Token symbol |
Decimals | int32 | Decimals |
HasURI | bool | Has URI |
Fungible | bool | Is fungible |
AssetId | string | Asset ID |
Supported Token Standards
| Standard | Description |
|---|
| ERC-20 | Fungible tokens |
| ERC-721 | Non-fungible tokens (NFT) |
| ERC-1155 | Multi-asset tokens |
DEX Data
TradeEvents provides DEX trade data (Topic: {chain}.dex.trades).
TradeEvent Structure
message TradeEvent {
Instruction instruction = 1;
Block block = 2;
Transaction transaction = 3;
DApp d_app = 4;
Trade trade = 100;
BondingCurve bonding_curve = 110;
TradeProcessed trade_processed = 200; // included in processed topic
}
Trade Core Fields
| Field | Description |
|---|
token_a_address / token_b_address | Trading pair token addresses |
token_a_decimals / token_b_decimals | Token decimals |
user_a_amount / user_b_amount | User trade amounts |
user_a_pre_amount / user_a_post_amount | User balance before/after |
pool_address | Pool address |
vault_a / vault_b | Pool vault addresses |
vault_a_amount / vault_b_amount | Vault amounts |
vault_a_pre_amount / vault_a_post_amount | Vault balance before/after |
was_original_direction | Is original direction |
pool_config_address | Pool config address |
TradeProcessed Enhanced Fields
.processed topic includes additional fields:
| Field | Description |
|---|
token_a_price_in_usd / token_b_price_in_usd | USD price |
token_a_price_in_native / token_b_price_in_native | Native currency price |
is_token_a_price_in_usd_suspect | Is price suspicious |
is_token_a_price_in_usd_suspect_reason | Suspicious reason (e.g., price volatility, small amount) |
DexPoolEvent - Liquidity Pools
DexPoolEvents provides liquidity pool events (Topic: {chain}.dex.pools).
| Field | Description |
|---|
type | Event type (INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
address | Pool address |
token_a_address / token_b_address | Token addresses |
token_a_vault_address / token_b_vault_address | Vault addresses |
token_a_amount / token_b_amount | Token amounts |
lp_wallet | LP wallet address |
EVM Chain Characteristics
Gas & EIP-1559 Fee Model
EVM uses Gas mechanism to measure computational resources. EIP-1559 introduced dynamic BaseFee adjustment and fee burning, transaction fees split into:
- Validator reward:
MinerReward
- Burnt portion:
Burnt
Layer 2 Support
EVM Streams provides dedicated fields for L2 chains:
| Field | Description | Applicable Chains |
|---|
L1Header | Corresponding L1 block information | All L2 |
SequenceNumber | Sequence number | Optimism |
BatcherAddr | Batcher address | Optimism |
L1FeeOverhead | L1 fee overhead | Optimism |
GasL1 | L1 data cost | Arbitrum |
Topic → Message Type Mapping
| Topic | Proto File | Message Type | Description |
|---|
{chain}.dex.trades | trade_event.proto | TradeEvents | DEX trade events |
{chain}.dex.trades.processed | trade_event.proto | TradeEvents | With USD price, suspicious flag |
{chain}.tokens | token_event.proto | TokenEvents | Token events |
{chain}.tokens.processed | token_event.proto | TokenEvents | With description, image, social links |
{chain}.balances | balance_event.proto | BalanceEvents | Balance change events |
{chain}.balances.processed | balance_event.proto | BalanceEvents | With USD value |
{chain}.dex.pools | dex_pool_event.proto | DexPoolEvents | Liquidity pool events |
{chain}.v1.transfers.proto | transfers_message.proto | TransfersMessage | EVM transfer messages |
{chain}.candlesticks | candlestick.proto | CandlestickEvents | Candlestick data |
Replace {chain} with eth or bsc. E.g., eth.dex.trades, bsc.tokens.processed.
Code Examples
Python Example: Consume DEX Trade Data
from kafka import KafkaConsumer
from common import trade_event_pb2 # Get from streaming_protobuf repository
# Create consumer
consumer = KafkaConsumer(
'eth.dex.trades',
bootstrap_servers=['<your_broker_address>'],
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username='your_username',
sasl_plain_password='your_password',
auto_offset_reset='latest',
enable_auto_commit=False,
group_id='my-dex-consumer'
)
# Consume and parse messages
for message in consumer:
# Parse protobuf message
trade_events = trade_event_pb2.TradeEvents()
trade_events.ParseFromString(message.value)
# Iterate DEX trades
for event in trade_events.events:
print(f"Pool: {event.trade.pool_address}")
print(f"Token A: {event.trade.token_a_address}")
print(f"Token B: {event.trade.token_b_address}")
print(f"Amount A: {event.trade.user_a_amount}")
print(f"Amount B: {event.trade.user_b_amount}")
print(f"Block: {event.block.height}, Tx: {event.transaction.signature}")
print("---")
JavaScript Example
const { Kafka } = require('kafkajs');
const protobuf = require('protobufjs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['<your_broker_address>'],
ssl: true,
sasl: {
mechanism: 'scram-sha-512',
username: 'your_username',
password: 'your_password'
}
});
const consumer = kafka.consumer({ groupId: 'my-dex-consumer' });
async function run() {
// Load protobuf definitions
const root = await protobuf.load('common/trade_event.proto');
const TradeEvents = root.lookupType('io.chainstream.v1.dex.trade.TradeEvents');
await consumer.connect();
await consumer.subscribe({ topic: 'eth.dex.trades', fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const tradeEvents = TradeEvents.decode(message.value);
tradeEvents.events.forEach(event => {
console.log(`Pool: ${event.trade.poolAddress}`);
console.log(`Token A: ${event.trade.tokenAAddress}`);
console.log(`Token B: ${event.trade.tokenBAddress}`);
console.log(`Amount A: ${event.trade.userAAmount}`);
console.log(`Amount B: ${event.trade.userBAmount}`);
console.log(`Block: ${event.block.height}`);
});
}
});
}
run().catch(console.error);