EVM 체인은 네트워크에 따라 블록 간격이 다릅니다 (Ethereum 메인넷 약 12초/블록). Ethereum, BSC, Base, Polygon (Matic), Optimism 등의 네트워크를 지원하며, 통합 Protobuf Schema를 공유합니다.
Schema 저장소 : github.com/chainstream-io/streaming_protobuf/evm
메시지 유형 개요
EVM Streams는 다음 메시지 유형을 제공합니다:
메시지 유형 설명 Topic TradeEvents DEX 거래 이벤트 {chain}.dex.tradesTokenEvents 토큰 이벤트 {chain}.tokensBalanceEvents 잔고 변동 이벤트 {chain}.balancesDexPoolEvents 유동성 풀 이벤트 {chain}.dex.poolsTransfersMessage 전송 메시지 {chain}.v1.transfers.protoCandlestickEvents 캔들스틱 데이터 {chain}.candlesticks
블록 레벨 데이터
각 블록에는 BlockHeader가 포함되며, 핵심 필드는 다음과 같습니다:
필드 유형 설명 Numberuint64 블록 번호 Hashbytes 블록 해시 ParentHashbytes 부모 블록 해시 TimestampTimestamp 블록 시간 BaseFeePerGasuint64 EIP-1559 기본 수수료 GasUseduint64 블록 Gas 소비량 GasLimituint64 블록 Gas 한도
BlockMessage에는 다음도 포함됩니다:
필드 설명 Transactions블록 내 모든 트랜잭션 Withdrawals검증자 인출 (Shanghai 업그레이드 이후) BlobGasUsedBlob Gas 소비량 (EIP-4844)
트랜잭션 레벨 데이터
필드 유형 설명 Hashbytes 트랜잭션 해시 Indexuint32 블록 내 인덱스 Frombytes 발신자 주소 Tobytes 수신자 주소 ValueBigInt 전송 금액 (wei) Nonceuint64 발신자 nonce Typeuint32 트랜잭션 유형 (0/1/2)
필드 설명 Status실행 상태 (1=성공, 0=실패) GasUsed실제 Gas 소비량 CumulativeGasUsed누적 Gas 소비량 ContractAddress생성된 컨트랙트 주소 (있는 경우)
TransactionFee — 수수료 상세
필드 설명 SenderFee발신자가 지불한 총 수수료 MinerReward마이너/검증자 보상 BurntEIP-1559 소각분 Savings절약된 Gas 수수료
Calls — 내부 호출 트레이스s
모든 중첩된 컨트랙트 호출을 포함하며, 각 call에는 다음이 포함됩니다:
From, To: 호출자와 피호출자
Input, Output: 입출력 데이터
GasUsed: Gas 소비량
Opcode: 연산 코드 (CALL/DELEGATECALL/STATICCALL)
Signature: 함수 시그니처
Logs: 이벤트 로그
StateChanges: 상태 변경
ReturnValues: 반환 값
Balance Updates
Token Balance Updates
Native Balance Updates
ERC-20/721/1155 토큰 잔고 변동: 필드 설명 Token토큰 정보 (주소, 대체 가능 여부, 소수점 자릿수, 총 공급량) Wallet지갑 주소 PostBalance트랜잭션 후 잔고
네이티브 통화 (예: ETH) 잔고 변동: 필드 설명 Address주소 PreBalance트랜잭션 전 잔고 PostBalance트랜잭션 후 잔고 ReasonCode변동 사유 코드
전송 데이터
TransfersMessage는 EVM 체인 전송 정보를 제공합니다 (Topic: {chain}.v1.transfers.proto).
TransfersMessage 구조
message TransfersMessage {
Chain Chain = 1 ;
BlockHeader Header = 2 ;
repeated Transfer Transfers = 5 ;
optional BlockHeader L1Header = 6 ; // Included for L2 chains
}
Transfer 구조
필드 유형 설명 CallIndexuint64 호출 인덱스 LogIndexuint64 로그 인덱스 Senderstring 발신자 주소 Receiverstring 수신자 주소 Amountstring 전송 금액 Idstring 토큰 ID (NFT) URIstring 토큰 URI CurrencyTokenInfo 토큰 정보 Successbool 성공 상태 Indexuint32 전송 인덱스 TransactionHeaderTransactionHeader 트랜잭션 헤더
TokenInfo 구조
필드 유형 설명 SmartContractstring 컨트랙트 주소 Delegatedbool 위임 여부 DelegatedTostring 위임 주소 ProtocolNamestring 프로토콜 이름 Namestring 토큰 이름 Symbolstring 토큰 심볼 Decimalsint32 소수점 자릿수 HasURIbool URI 존재 여부 Fungiblebool 대체 가능 여부 AssetIdstring 자산 ID
지원하는 토큰 표준
표준 설명 ERC-20 대체 가능 토큰 ERC-721 대체 불가능 토큰 (NFT) ERC-1155 멀티 자산 토큰
DEX 데이터
TradeEvents는 DEX 거래 데이터를 제공합니다 (Topic: {chain}.dex.trades).
TradeEvent 구조
message TradeEvent {
Instruction instruction = 1 ;
Block block = 2 ;
Transaction transaction = 3 ;
DApp d_app = 4 ;
Trade trade = 100 ;
BondingCurve bonding_curve = 110 ;
TradeProcessed trade_processed = 200 ; // included in processed topic
}
Trade 핵심 필드
필드 설명 token_a_address / token_b_address거래 쌍 토큰 주소 token_a_decimals / token_b_decimals토큰 소수점 자릿수 user_a_amount / user_b_amount사용자 거래 수량 user_a_pre_amount / user_a_post_amount사용자 거래 전후 잔고 pool_address풀 주소 vault_a / vault_b풀 Vault 주소 vault_a_amount / vault_b_amountVault 수량 vault_a_pre_amount / vault_a_post_amountVault 거래 전후 잔고 was_original_direction원래 방향 여부 pool_config_address풀 구성 주소
TradeProcessed 확장 필드
.processed topic에는 추가 필드가 포함됩니다:
필드 설명 token_a_price_in_usd / token_b_price_in_usdUSD 가격 token_a_price_in_native / token_b_price_in_native네이티브 통화 가격 is_token_a_price_in_usd_suspect가격 의심 여부 is_token_a_price_in_usd_suspect_reason의심 사유 (예: 가격 변동, 소액 거래)
DexPoolEvent - 유동성 풀
DexPoolEvents는 유동성 풀 이벤트를 제공합니다 (Topic: {chain}.dex.pools).
필드 설명 type이벤트 유형 (INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) address풀 주소 token_a_address / token_b_address토큰 주소 token_a_vault_address / token_b_vault_addressVault 주소 token_a_amount / token_b_amount토큰 수량 lp_walletLP 지갑 주소
EVM 체인 특성
Gas 및 EIP-1559 수수료 모델
EVM은 Gas 메커니즘을 사용하여 계산 리소스를 측정합니다. EIP-1559는 동적 BaseFee 조정과 수수료 소각을 도입했으며, 트랜잭션 수수료는 다음으로 나뉩니다:
검증자 보상 : MinerReward
소각분 : Burnt
Layer 2 지원
EVM Streams는 L2 체인을 위한 전용 필드를 제공합니다:
필드 설명 적용 체인 L1Header해당 L1 블록 정보 모든 L2 SequenceNumber시퀀스 번호 Optimism BatcherAddrBatcher 주소 Optimism L1FeeOverheadL1 수수료 오버헤드 Optimism GasL1L1 데이터 비용 Arbitrum
Topic → 메시지 유형 매핑
Topic Proto File 메시지 유형 설명 {chain}.dex.tradestrade_event.proto TradeEvents DEX 거래 이벤트 {chain}.dex.trades.processedtrade_event.proto TradeEvents USD 가격, 의심 플래그 포함 {chain}.tokenstoken_event.proto TokenEvents 토큰 이벤트 {chain}.tokens.processedtoken_event.proto TokenEvents 설명, 이미지, SNS 링크 포함 {chain}.balancesbalance_event.proto BalanceEvents 잔고 변동 이벤트 {chain}.balances.processedbalance_event.proto BalanceEvents USD 가치 포함 {chain}.dex.poolsdex_pool_event.proto DexPoolEvents 유동성 풀 이벤트 {chain}.v1.transfers.prototransfers_message.proto TransfersMessage EVM 전송 메시지 {chain}.candlestickscandlestick.proto CandlestickEvents 캔들스틱 데이터
{chain}을 eth 또는 bsc로 교체하세요. 예: eth.dex.trades, bsc.tokens.processed.
코드 예제
Python 예제: DEX 거래 데이터 소비
from kafka import KafkaConsumer
from common import trade_event_pb2 # Get from streaming_protobuf repository
# Create consumer
consumer = KafkaConsumer(
'eth.dex.trades' ,
bootstrap_servers = [ '<your_broker_address>' ],
security_protocol = 'SASL_SSL' ,
sasl_mechanism = 'SCRAM-SHA-512' ,
sasl_plain_username = 'your_username' ,
sasl_plain_password = 'your_password' ,
auto_offset_reset = 'latest' ,
enable_auto_commit = False ,
group_id = 'my-dex-consumer'
)
# Consume and parse messages
for message in consumer:
# Parse protobuf message
trade_events = trade_event_pb2.TradeEvents()
trade_events.ParseFromString(message.value)
# Iterate DEX trades
for event in trade_events.events:
print ( f "Pool: { event.trade.pool_address } " )
print ( f "Token A: { event.trade.token_a_address } " )
print ( f "Token B: { event.trade.token_b_address } " )
print ( f "Amount A: { event.trade.user_a_amount } " )
print ( f "Amount B: { event.trade.user_b_amount } " )
print ( f "Block: { event.block.height } , Tx: { event.transaction.signature } " )
print ( "---" )
JavaScript 예제
const { Kafka } = require ( 'kafkajs' );
const protobuf = require ( 'protobufjs' );
const kafka = new Kafka ({
clientId: 'my-app' ,
brokers: [ '<your_broker_address>' ],
ssl: true ,
sasl: {
mechanism: 'scram-sha-512' ,
username: 'your_username' ,
password: 'your_password'
}
});
const consumer = kafka . consumer ({ groupId: 'my-dex-consumer' });
async function run () {
// Load protobuf definitions
const root = await protobuf . load ( 'common/trade_event.proto' );
const TradeEvents = root . lookupType ( 'io.chainstream.v1.dex.trade.TradeEvents' );
await consumer . connect ();
await consumer . subscribe ({ topic: 'eth.dex.trades' , fromBeginning: false });
await consumer . run ({
eachMessage : async ({ message }) => {
const tradeEvents = TradeEvents . decode ( message . value );
tradeEvents . events . forEach ( event => {
console . log ( `Pool: ${ event . trade . poolAddress } ` );
console . log ( `Token A: ${ event . trade . tokenAAddress } ` );
console . log ( `Token B: ${ event . trade . tokenBAddress } ` );
console . log ( `Amount A: ${ event . trade . userAAmount } ` );
console . log ( `Amount B: ${ event . trade . userBAmount } ` );
console . log ( `Block: ${ event . block . height } ` );
});
}
});
}
run (). catch ( console . error );
관련 문서
개념 및 통합 가이드 Kafka Streams 통합 기초
Solana Streams Solana 고처리량 데이터 스트림
TRON Streams TRON 네트워크 데이터 스트림
WebSocket 실시간 데이터 WebSocket 통합