메인 콘텐츠로 건너뛰기
EVM 체인은 네트워크에 따라 블록 간격이 다릅니다 (Ethereum 메인넷 약 12초/블록). Ethereum, BSC, Base, Polygon (Matic), Optimism 등의 네트워크를 지원하며, 통합 Protobuf Schema를 공유합니다. Schema 저장소: github.com/chainstream-io/streaming_protobuf/evm

메시지 유형 개요

EVM Streams는 다음 메시지 유형을 제공합니다:
메시지 유형설명Topic
TradeEventsDEX 거래 이벤트{chain}.dex.trades
TokenEvents토큰 이벤트{chain}.tokens
BalanceEvents잔고 변동 이벤트{chain}.balances
DexPoolEvents유동성 풀 이벤트{chain}.dex.pools
TransfersMessage전송 메시지{chain}.v1.transfers.proto
CandlestickEvents캔들스틱 데이터{chain}.candlesticks

블록 레벨 데이터

각 블록에는 BlockHeader가 포함되며, 핵심 필드는 다음과 같습니다:
필드유형설명
Numberuint64블록 번호
Hashbytes블록 해시
ParentHashbytes부모 블록 해시
TimestampTimestamp블록 시간
BaseFeePerGasuint64EIP-1559 기본 수수료
GasUseduint64블록 Gas 소비량
GasLimituint64블록 Gas 한도
BlockMessage에는 다음도 포함됩니다:
필드설명
Transactions블록 내 모든 트랜잭션
Withdrawals검증자 인출 (Shanghai 업그레이드 이후)
BlobGasUsedBlob Gas 소비량 (EIP-4844)

트랜잭션 레벨 데이터

TransactionHeader — 핵심 트랜잭션 메타데이터

필드유형설명
Hashbytes트랜잭션 해시
Indexuint32블록 내 인덱스
Frombytes발신자 주소
Tobytes수신자 주소
ValueBigInt전송 금액 (wei)
Nonceuint64발신자 nonce
Typeuint32트랜잭션 유형 (0/1/2)

ReceiptHeader — 실행 결과

필드설명
Status실행 상태 (1=성공, 0=실패)
GasUsed실제 Gas 소비량
CumulativeGasUsed누적 Gas 소비량
ContractAddress생성된 컨트랙트 주소 (있는 경우)

TransactionFee — 수수료 상세

필드설명
SenderFee발신자가 지불한 총 수수료
MinerReward마이너/검증자 보상
BurntEIP-1559 소각분
Savings절약된 Gas 수수료

Calls — 내부 호출 트레이스s

모든 중첩된 컨트랙트 호출을 포함하며, 각 call에는 다음이 포함됩니다:
  • From, To: 호출자와 피호출자
  • Input, Output: 입출력 데이터
  • GasUsed: Gas 소비량
  • Opcode: 연산 코드 (CALL/DELEGATECALL/STATICCALL)
  • Signature: 함수 시그니처
  • Logs: 이벤트 로그
  • StateChanges: 상태 변경
  • ReturnValues: 반환 값

Balance Updates

ERC-20/721/1155 토큰 잔고 변동:
필드설명
Token토큰 정보 (주소, 대체 가능 여부, 소수점 자릿수, 총 공급량)
Wallet지갑 주소
PostBalance트랜잭션 후 잔고

전송 데이터

TransfersMessage는 EVM 체인 전송 정보를 제공합니다 (Topic: {chain}.v1.transfers.proto).

TransfersMessage 구조

message TransfersMessage {
  Chain Chain = 1;
  BlockHeader Header = 2;
  repeated Transfer Transfers = 5;
  optional BlockHeader L1Header = 6;  // Included for L2 chains
}

Transfer 구조

필드유형설명
CallIndexuint64호출 인덱스
LogIndexuint64로그 인덱스
Senderstring발신자 주소
Receiverstring수신자 주소
Amountstring전송 금액
Idstring토큰 ID (NFT)
URIstring토큰 URI
CurrencyTokenInfo토큰 정보
Successbool성공 상태
Indexuint32전송 인덱스
TransactionHeaderTransactionHeader트랜잭션 헤더

TokenInfo 구조

필드유형설명
SmartContractstring컨트랙트 주소
Delegatedbool위임 여부
DelegatedTostring위임 주소
ProtocolNamestring프로토콜 이름
Namestring토큰 이름
Symbolstring토큰 심볼
Decimalsint32소수점 자릿수
HasURIboolURI 존재 여부
Fungiblebool대체 가능 여부
AssetIdstring자산 ID

지원하는 토큰 표준

표준설명
ERC-20대체 가능 토큰
ERC-721대체 불가능 토큰 (NFT)
ERC-1155멀티 자산 토큰

DEX 데이터

TradeEvents는 DEX 거래 데이터를 제공합니다 (Topic: {chain}.dex.trades).

TradeEvent 구조

message TradeEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Trade trade = 100;
  BondingCurve bonding_curve = 110;
  TradeProcessed trade_processed = 200;  // included in processed topic
}

Trade 핵심 필드

필드설명
token_a_address / token_b_address거래 쌍 토큰 주소
token_a_decimals / token_b_decimals토큰 소수점 자릿수
user_a_amount / user_b_amount사용자 거래 수량
user_a_pre_amount / user_a_post_amount사용자 거래 전후 잔고
pool_address풀 주소
vault_a / vault_b풀 Vault 주소
vault_a_amount / vault_b_amountVault 수량
vault_a_pre_amount / vault_a_post_amountVault 거래 전후 잔고
was_original_direction원래 방향 여부
pool_config_address풀 구성 주소

TradeProcessed 확장 필드

.processed topic에는 추가 필드가 포함됩니다:
필드설명
token_a_price_in_usd / token_b_price_in_usdUSD 가격
token_a_price_in_native / token_b_price_in_native네이티브 통화 가격
is_token_a_price_in_usd_suspect가격 의심 여부
is_token_a_price_in_usd_suspect_reason의심 사유 (예: 가격 변동, 소액 거래)

DexPoolEvent - 유동성 풀

DexPoolEvents는 유동성 풀 이벤트를 제공합니다 (Topic: {chain}.dex.pools).
필드설명
type이벤트 유형 (INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP)
address풀 주소
token_a_address / token_b_address토큰 주소
token_a_vault_address / token_b_vault_addressVault 주소
token_a_amount / token_b_amount토큰 수량
lp_walletLP 지갑 주소

EVM 체인 특성

Gas 및 EIP-1559 수수료 모델

EVM은 Gas 메커니즘을 사용하여 계산 리소스를 측정합니다. EIP-1559는 동적 BaseFee 조정과 수수료 소각을 도입했으며, 트랜잭션 수수료는 다음으로 나뉩니다:
  • 검증자 보상: MinerReward
  • 소각분: Burnt

Layer 2 지원

EVM Streams는 L2 체인을 위한 전용 필드를 제공합니다:
필드설명적용 체인
L1Header해당 L1 블록 정보모든 L2
SequenceNumber시퀀스 번호Optimism
BatcherAddrBatcher 주소Optimism
L1FeeOverheadL1 수수료 오버헤드Optimism
GasL1L1 데이터 비용Arbitrum

Topic → 메시지 유형 매핑

TopicProto File메시지 유형설명
{chain}.dex.tradestrade_event.protoTradeEventsDEX 거래 이벤트
{chain}.dex.trades.processedtrade_event.protoTradeEventsUSD 가격, 의심 플래그 포함
{chain}.tokenstoken_event.protoTokenEvents토큰 이벤트
{chain}.tokens.processedtoken_event.protoTokenEvents설명, 이미지, SNS 링크 포함
{chain}.balancesbalance_event.protoBalanceEvents잔고 변동 이벤트
{chain}.balances.processedbalance_event.protoBalanceEventsUSD 가치 포함
{chain}.dex.poolsdex_pool_event.protoDexPoolEvents유동성 풀 이벤트
{chain}.v1.transfers.prototransfers_message.protoTransfersMessageEVM 전송 메시지
{chain}.candlestickscandlestick.protoCandlestickEvents캔들스틱 데이터
{chain}eth 또는 bsc로 교체하세요. 예: eth.dex.trades, bsc.tokens.processed.

코드 예제

Python 예제: DEX 거래 데이터 소비

from kafka import KafkaConsumer
from common import trade_event_pb2  # Get from streaming_protobuf repository

# Create consumer
consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-dex-consumer'
)

# Consume and parse messages
for message in consumer:
    # Parse protobuf message
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    # Iterate DEX trades
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Block: {event.block.height}, Tx: {event.transaction.signature}")
        print("---")

JavaScript 예제

const { Kafka } = require('kafkajs');
const protobuf = require('protobufjs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['<your_broker_address>'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: 'your_username',
    password: 'your_password'
  }
});

const consumer = kafka.consumer({ groupId: 'my-dex-consumer' });

async function run() {
  // Load protobuf definitions
  const root = await protobuf.load('common/trade_event.proto');
  const TradeEvents = root.lookupType('io.chainstream.v1.dex.trade.TradeEvents');

  await consumer.connect();
  await consumer.subscribe({ topic: 'eth.dex.trades', fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const tradeEvents = TradeEvents.decode(message.value);
      
      tradeEvents.events.forEach(event => {
        console.log(`Pool: ${event.trade.poolAddress}`);
        console.log(`Token A: ${event.trade.tokenAAddress}`);
        console.log(`Token B: ${event.trade.tokenBAddress}`);
        console.log(`Amount A: ${event.trade.userAAmount}`);
        console.log(`Amount B: ${event.trade.userBAmount}`);
        console.log(`Block: ${event.block.height}`);
      });
    }
  });
}

run().catch(console.error);

관련 문서

개념 및 통합 가이드

Kafka Streams 통합 기초

Solana Streams

Solana 고처리량 데이터 스트림

TRON Streams

TRON 네트워크 데이터 스트림

WebSocket 실시간 데이터

WebSocket 통합