메인 콘텐츠로 건너뛰기
Solana는 400ms 블록 시간을 목표로 하며, 실제 처리량은 약 4,000 TPS, 이론적 피크는 65,000 TPS입니다. 매우 높은 메시지 양으로 인해 컨슈머 처리 능력에 높은 요구사항이 필요합니다. Schema 저장소: github.com/chainstream-io/streaming_protobuf/solana

메시지 유형 개요

Solana Streams는 다음 메시지 유형을 제공합니다:
메시지 유형설명Topic
TradeEventsDEX 거래 이벤트sol.dex.trades
TokenEvents토큰 이벤트sol.tokens
BalanceEvents잔고 변동 이벤트sol.balances
DexPoolEvents유동성 풀 이벤트sol.dex.pools
TransferEvents전송 이벤트sol.transfers
CandlestickEvents캔들스틱 데이터sol.candlesticks

블록 레벨 데이터

Solana는 기존 블록 번호 대신 Slot을 타임라인 식별자로 사용합니다.
필드유형설명
Slotuint64Slot 번호 (기본 식별자)
BlockHeightuint64블록 높이
BlockHashbytes블록 해시
ParentSlotuint64부모 Slot
TimestampTimestamp블록 시간
Solana의 shred stream에서 Block Header가 불완전할 수 있으며, Slot 필드만 정확함이 보장됩니다.

트랜잭션 레벨 데이터

트랜잭션 핵심 필드

필드유형설명
Signaturebytes트랜잭션 서명 (고유 식별자)
StatusStatus실행 상태
HeaderTransactionHeader메타데이터 (수수료 및 서명자 포함)
Indexuint32블록 내 위치

Instructions

트랜잭션에는 여러 Instruction이 포함되며, 이는 Solana 실행 모델의 핵심입니다:
필드설명
ProgramAccountIndex호출된 프로그램
Data인코딩된 명령어 데이터
AccountIndexes참조된 계정
BalanceUpdates이 명령어로 인한 SOL 잔고 변동
하나의 Solana 트랜잭션에는 여러 Instruction이 포함될 수 있으며, 각 Instruction은 서로 다른 프로그램을 호출합니다.

전송 데이터

TransferEvents는 Solana 전송 정보를 제공합니다 (Topic: sol.transfers).

TransferEvent 구조

message TransferEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Transfer transfer = 100;
  TransferProcessed transfer_processed = 200;  // included in processed topic
}

Transfer 핵심 필드

필드유형설명
sender_token_account_addressstring발신자 토큰 계정 주소
sender_account_owner_addressstring발신자 계정 소유자 주소
receiver_token_account_addressstring수신자 토큰 계정 주소
receiver_account_owner_addressstring수신자 계정 소유자 주소
token_addressstring토큰 주소 (Mint)
token_amountstring전송 수량

토큰 메타데이터 (TokenEvent)

토큰 이벤트는 공통 TokenEvents (Topic: sol.tokens)를 사용하며, 다음을 포함합니다:
필드설명
address토큰 주소
name / symbol이름 및 심볼
decimals소수점 자릿수
uri메타데이터 URI
metadata_address메타데이터 주소
creators생성자 목록 (주소, 검증 상태, 배분 비율)

SolanaExtra 고유 필드

Solana 토큰에는 추가 메타데이터 필드가 포함됩니다:
필드설명
collection_address컬렉션 주소 (NFT)
fungible대체 가능 여부
is_mutable변경 가능 여부
is_native네이티브 SOL 여부
program_address프로그램 주소
seller_fee_basis_points크리에이터 로열티 기준점 (NFT)
token_standard토큰 표준
mint_authority / freeze_authority / update_authority권한 계정
is_verified_collection검증된 컬렉션 여부

Balance Updates 계층 메커니즘 (Solana 고유)

이는 Solana 스트림 데이터의 중요한 특성으로, 잔고 업데이트가 두 수준에서 제공됩니다:
각 instruction 실행 후 즉시 잔고 변동으로, 단일 단계 작업의 직접적인 영향을 반영합니다.
Transaction
└── Instruction 1
    └── BalanceUpdate: +100 SOL
└── Instruction 2
    └── BalanceUpdate: -50 SOL
└── Instruction 3
    └── BalanceUpdate: +25 SOL
이를 통해 개발자는 세밀한 자금 흐름 추적과 최종 상태 확인을 모두 할 수 있습니다.

DEX 데이터

TradeEvents는 DEX 거래 데이터를 제공합니다 (Topic: sol.dex.trades). 공통 TradeEvent 구조를 사용합니다.

Trade 핵심 필드

필드설명
token_a_address / token_b_address거래 쌍 토큰 주소
user_a_amount / user_b_amount사용자 거래 수량
pool_address풀 주소
vault_a / vault_b풀 Vault 주소
vault_a_amount / vault_b_amountVault 수량
bonding_curve본딩 커브 정보 (해당되는 경우)

DApp 정보

필드설명
program_addressDEX 프로그램 주소 (예: Raydium, Orca)
inner_program_address내부 프로그램 주소
chain체인 식별자 (CHAIN_SOLANA)

DexPoolEvent - 유동성 풀

필드설명
type이벤트 유형 (INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP)
address풀 주소
token_a_address / token_b_address토큰 주소
token_a_amount / token_b_amount토큰 수량
lp_walletLP 지갑 주소

Solana 체인 특성

Slot 메커니즘

Solana는 기존 블록 번호 대신 Slot을 타임라인 식별자로 사용합니다:
특성설명
블록 간격약 400ms
메시지 양EVM 체인보다 훨씬 많음
기본 식별자Slot 번호

메시지 패키징

트랜잭션은 소규모 배치로 패키징되며, 각 Kafka 메시지는 최대 250개의 트랜잭션을 포함합니다.

Block Header 완전성

Solana의 shred stream에서 Block Header가 불완전할 수 있으며, Slot 필드만 정확함이 보장됩니다. 다른 필드는 비어 있거나 부정확할 수 있습니다.

고처리량 처리 권장 사항

Solana의 매우 높은 처리량으로 인해 다음을 권장합니다:
  1. 충분한 처리 능력: 컨슈머에 충분한 CPU와 메모리 확보
  2. 병렬 처리: 멀티스레드/코루틴으로 병렬 메시지 처리
  3. 효율적인 파싱: Protobuf 파싱 코드 최적화
  4. 배치 쓰기: 영속화가 필요한 경우 데이터베이스 배치 쓰기

Topic → 메시지 유형 매핑

TopicProto File메시지 유형설명
sol.dex.tradestrade_event.protoTradeEventsDEX 거래 이벤트
sol.dex.trades.processedtrade_event.protoTradeEventsUSD 가격, 의심 플래그 포함
sol.tokenstoken_event.protoTokenEvents토큰 이벤트
sol.tokens.processedtoken_event.protoTokenEvents설명, 이미지, SNS 링크 포함
sol.balancesbalance_event.protoBalanceEvents잔고 변동 이벤트
sol.transferssolana/transfer_event.protoTransferEventsSolana 전송 이벤트
sol.transfers.processedsolana/transfer_processed_event.protoTransferProcessedEventsUSD 가치 포함
sol.dex.poolsdex_pool_event.protoDexPoolEvents유동성 풀 이벤트
sol.candlestickscandlestick.protoCandlestickEvents캔들스틱 데이터

코드 예제

Python 예제: Solana DEX 거래 소비

from kafka import KafkaConsumer
from common import trade_event_pb2  # Get from streaming_protobuf repository

# Create consumer
consumer = KafkaConsumer(
    'sol.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-solana-consumer'
)

# Consume and parse messages
for message in consumer:
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Slot: {event.block.slot}")
        print(f"DEX Program: {event.d_app.program_address}")
        print("---")

Go 예제: 고성능 소비

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/scram"
    "google.golang.org/protobuf/proto"
    
    common "github.com/chainstream-io/streaming_protobuf/common/messages"
)

func main() {
    mechanism, _ := scram.Mechanism(scram.SHA512, "your_username", "your_password")
    
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"<your_broker_address>"},
        Topic:   "sol.dex.trades",
        GroupID: "my-solana-consumer",
        Dialer: &kafka.Dialer{
            SASLMechanism: mechanism,
            TLS:           &tls.Config{},
        },
    })
    defer reader.Close()

    // Use worker pool for parallel processing
    messages := make(chan kafka.Message, 100)
    
    // Start workers
    for i := 0; i < 10; i++ {
        go func() {
            for msg := range messages {
                var tradeEvents common.TradeEvents
                if err := proto.Unmarshal(msg.Value, &tradeEvents); err != nil {
                    log.Printf("Failed to parse: %v", err)
                    continue
                }
                
                // Process trades
                for _, event := range tradeEvents.Events {
                    log.Printf("Pool: %s, Token A: %s, Token B: %s",
                        event.Trade.PoolAddress,
                        event.Trade.TokenAAddress,
                        event.Trade.TokenBAddress)
                    log.Printf("Amount A: %s, Amount B: %s, Slot: %d",
                        event.Trade.UserAAmount,
                        event.Trade.UserBAmount,
                        event.Block.Slot)
                }
            }
        }()
    }

    // Read messages
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Printf("Error reading: %v", err)
            continue
        }
        messages <- msg
    }
}

관련 문서

개념 및 통합 가이드

Kafka Streams 통합 기초

EVM Streams

EVM 체인 데이터 스트림

TRON Streams

TRON 네트워크 데이터 스트림

WebSocket 실시간 데이터

WebSocket 통합