Solana는 400ms 블록 시간을 목표로 하며, 실제 처리량은 약 4,000 TPS, 이론적 피크는 65,000 TPS입니다. 매우 높은 메시지 양으로 인해 컨슈머 처리 능력에 높은 요구사항이 필요합니다.
Schema 저장소: github.com/chainstream-io/streaming_protobuf/solana
메시지 유형 개요
Solana Streams는 다음 메시지 유형을 제공합니다:
| 메시지 유형 | 설명 | Topic |
|---|
| TradeEvents | DEX 거래 이벤트 | sol.dex.trades |
| TokenEvents | 토큰 이벤트 | sol.tokens |
| BalanceEvents | 잔고 변동 이벤트 | sol.balances |
| DexPoolEvents | 유동성 풀 이벤트 | sol.dex.pools |
| TransferEvents | 전송 이벤트 | sol.transfers |
| CandlestickEvents | 캔들스틱 데이터 | sol.candlesticks |
블록 레벨 데이터
Solana는 기존 블록 번호 대신 Slot을 타임라인 식별자로 사용합니다.
| 필드 | 유형 | 설명 |
|---|
Slot | uint64 | Slot 번호 (기본 식별자) |
BlockHeight | uint64 | 블록 높이 |
BlockHash | bytes | 블록 해시 |
ParentSlot | uint64 | 부모 Slot |
Timestamp | Timestamp | 블록 시간 |
Solana의 shred stream에서 Block Header가 불완전할 수 있으며, Slot 필드만 정확함이 보장됩니다.
트랜잭션 레벨 데이터
트랜잭션 핵심 필드
| 필드 | 유형 | 설명 |
|---|
Signature | bytes | 트랜잭션 서명 (고유 식별자) |
Status | Status | 실행 상태 |
Header | TransactionHeader | 메타데이터 (수수료 및 서명자 포함) |
Index | uint32 | 블록 내 위치 |
Instructions
트랜잭션에는 여러 Instruction이 포함되며, 이는 Solana 실행 모델의 핵심입니다:
| 필드 | 설명 |
|---|
ProgramAccountIndex | 호출된 프로그램 |
Data | 인코딩된 명령어 데이터 |
AccountIndexes | 참조된 계정 |
BalanceUpdates | 이 명령어로 인한 SOL 잔고 변동 |
하나의 Solana 트랜잭션에는 여러 Instruction이 포함될 수 있으며, 각 Instruction은 서로 다른 프로그램을 호출합니다.
전송 데이터
TransferEvents는 Solana 전송 정보를 제공합니다 (Topic: sol.transfers).
TransferEvent 구조
message TransferEvent {
Instruction instruction = 1;
Block block = 2;
Transaction transaction = 3;
DApp d_app = 4;
Transfer transfer = 100;
TransferProcessed transfer_processed = 200; // included in processed topic
}
Transfer 핵심 필드
| 필드 | 유형 | 설명 |
|---|
sender_token_account_address | string | 발신자 토큰 계정 주소 |
sender_account_owner_address | string | 발신자 계정 소유자 주소 |
receiver_token_account_address | string | 수신자 토큰 계정 주소 |
receiver_account_owner_address | string | 수신자 계정 소유자 주소 |
token_address | string | 토큰 주소 (Mint) |
token_amount | string | 전송 수량 |
토큰 메타데이터 (TokenEvent)
토큰 이벤트는 공통 TokenEvents (Topic: sol.tokens)를 사용하며, 다음을 포함합니다:
| 필드 | 설명 |
|---|
address | 토큰 주소 |
name / symbol | 이름 및 심볼 |
decimals | 소수점 자릿수 |
uri | 메타데이터 URI |
metadata_address | 메타데이터 주소 |
creators | 생성자 목록 (주소, 검증 상태, 배분 비율) |
Solana 토큰에는 추가 메타데이터 필드가 포함됩니다:
| 필드 | 설명 |
|---|
collection_address | 컬렉션 주소 (NFT) |
fungible | 대체 가능 여부 |
is_mutable | 변경 가능 여부 |
is_native | 네이티브 SOL 여부 |
program_address | 프로그램 주소 |
seller_fee_basis_points | 크리에이터 로열티 기준점 (NFT) |
token_standard | 토큰 표준 |
mint_authority / freeze_authority / update_authority | 권한 계정 |
is_verified_collection | 검증된 컬렉션 여부 |
Balance Updates 계층 메커니즘 (Solana 고유)
이는 Solana 스트림 데이터의 중요한 특성으로, 잔고 업데이트가 두 수준에서 제공됩니다:
각 instruction 실행 후 즉시 잔고 변동으로, 단일 단계 작업의 직접적인 영향을 반영합니다.Transaction
└── Instruction 1
└── BalanceUpdate: +100 SOL
└── Instruction 2
└── BalanceUpdate: -50 SOL
└── Instruction 3
└── BalanceUpdate: +25 SOL
트랜잭션 내 모든 instruction 완료 후 최종 잔고 상태.Transaction
└── FinalBalanceUpdate: +75 SOL (net change)
이를 통해 개발자는 세밀한 자금 흐름 추적과 최종 상태 확인을 모두 할 수 있습니다.
DEX 데이터
TradeEvents는 DEX 거래 데이터를 제공합니다 (Topic: sol.dex.trades). 공통 TradeEvent 구조를 사용합니다.
Trade 핵심 필드
| 필드 | 설명 |
|---|
token_a_address / token_b_address | 거래 쌍 토큰 주소 |
user_a_amount / user_b_amount | 사용자 거래 수량 |
pool_address | 풀 주소 |
vault_a / vault_b | 풀 Vault 주소 |
vault_a_amount / vault_b_amount | Vault 수량 |
bonding_curve | 본딩 커브 정보 (해당되는 경우) |
DApp 정보
| 필드 | 설명 |
|---|
program_address | DEX 프로그램 주소 (예: Raydium, Orca) |
inner_program_address | 내부 프로그램 주소 |
chain | 체인 식별자 (CHAIN_SOLANA) |
DexPoolEvent - 유동성 풀
| 필드 | 설명 |
|---|
type | 이벤트 유형 (INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
address | 풀 주소 |
token_a_address / token_b_address | 토큰 주소 |
token_a_amount / token_b_amount | 토큰 수량 |
lp_wallet | LP 지갑 주소 |
Solana 체인 특성
Slot 메커니즘
Solana는 기존 블록 번호 대신 Slot을 타임라인 식별자로 사용합니다:
| 특성 | 설명 |
|---|
| 블록 간격 | 약 400ms |
| 메시지 양 | EVM 체인보다 훨씬 많음 |
| 기본 식별자 | Slot 번호 |
메시지 패키징
트랜잭션은 소규모 배치로 패키징되며, 각 Kafka 메시지는 최대 250개의 트랜잭션을 포함합니다.
Solana의 shred stream에서 Block Header가 불완전할 수 있으며, Slot 필드만 정확함이 보장됩니다. 다른 필드는 비어 있거나 부정확할 수 있습니다.
고처리량 처리 권장 사항
Solana의 매우 높은 처리량으로 인해 다음을 권장합니다:
- 충분한 처리 능력: 컨슈머에 충분한 CPU와 메모리 확보
- 병렬 처리: 멀티스레드/코루틴으로 병렬 메시지 처리
- 효율적인 파싱: Protobuf 파싱 코드 최적화
- 배치 쓰기: 영속화가 필요한 경우 데이터베이스 배치 쓰기
Topic → 메시지 유형 매핑
| Topic | Proto File | 메시지 유형 | 설명 |
|---|
sol.dex.trades | trade_event.proto | TradeEvents | DEX 거래 이벤트 |
sol.dex.trades.processed | trade_event.proto | TradeEvents | USD 가격, 의심 플래그 포함 |
sol.tokens | token_event.proto | TokenEvents | 토큰 이벤트 |
sol.tokens.processed | token_event.proto | TokenEvents | 설명, 이미지, SNS 링크 포함 |
sol.balances | balance_event.proto | BalanceEvents | 잔고 변동 이벤트 |
sol.transfers | solana/transfer_event.proto | TransferEvents | Solana 전송 이벤트 |
sol.transfers.processed | solana/transfer_processed_event.proto | TransferProcessedEvents | USD 가치 포함 |
sol.dex.pools | dex_pool_event.proto | DexPoolEvents | 유동성 풀 이벤트 |
sol.candlesticks | candlestick.proto | CandlestickEvents | 캔들스틱 데이터 |
코드 예제
Python 예제: Solana DEX 거래 소비
from kafka import KafkaConsumer
from common import trade_event_pb2 # Get from streaming_protobuf repository
# Create consumer
consumer = KafkaConsumer(
'sol.dex.trades',
bootstrap_servers=['<your_broker_address>'],
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username='your_username',
sasl_plain_password='your_password',
auto_offset_reset='latest',
enable_auto_commit=False,
group_id='my-solana-consumer'
)
# Consume and parse messages
for message in consumer:
trade_events = trade_event_pb2.TradeEvents()
trade_events.ParseFromString(message.value)
for event in trade_events.events:
print(f"Pool: {event.trade.pool_address}")
print(f"Token A: {event.trade.token_a_address}")
print(f"Token B: {event.trade.token_b_address}")
print(f"Amount A: {event.trade.user_a_amount}")
print(f"Amount B: {event.trade.user_b_amount}")
print(f"Slot: {event.block.slot}")
print(f"DEX Program: {event.d_app.program_address}")
print("---")
Go 예제: 고성능 소비
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
"google.golang.org/protobuf/proto"
common "github.com/chainstream-io/streaming_protobuf/common/messages"
)
func main() {
mechanism, _ := scram.Mechanism(scram.SHA512, "your_username", "your_password")
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"<your_broker_address>"},
Topic: "sol.dex.trades",
GroupID: "my-solana-consumer",
Dialer: &kafka.Dialer{
SASLMechanism: mechanism,
TLS: &tls.Config{},
},
})
defer reader.Close()
// Use worker pool for parallel processing
messages := make(chan kafka.Message, 100)
// Start workers
for i := 0; i < 10; i++ {
go func() {
for msg := range messages {
var tradeEvents common.TradeEvents
if err := proto.Unmarshal(msg.Value, &tradeEvents); err != nil {
log.Printf("Failed to parse: %v", err)
continue
}
// Process trades
for _, event := range tradeEvents.Events {
log.Printf("Pool: %s, Token A: %s, Token B: %s",
event.Trade.PoolAddress,
event.Trade.TokenAAddress,
event.Trade.TokenBAddress)
log.Printf("Amount A: %s, Amount B: %s, Slot: %d",
event.Trade.UserAAmount,
event.Trade.UserBAmount,
event.Block.Slot)
}
}
}()
}
// Read messages
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
log.Printf("Error reading: %v", err)
continue
}
messages <- msg
}
}
관련 문서
개념 및 통합 가이드
Kafka Streams 통합 기초
EVM Streams
EVM 체인 데이터 스트림
TRON Streams
TRON 네트워크 데이터 스트림
WebSocket 실시간 데이터
WebSocket 통합