Solana 目標出塊時間為 400ms,實際吞吐量約 4,000 TPS,理論峰值可達 65,000 TPS。極高的訊息量對消費端的處理能力要求較高。
Schema 倉庫:github.com/chainstream-io/streaming_protobuf/solana
Message Types 總覽
Solana Streams 提供以下訊息類型:
| Message Type | 說明 | Topic |
|---|
| TradeEvents | DEX 交易事件 | sol.dex.trades |
| TokenEvents | Token 事件 | sol.tokens |
| BalanceEvents | 餘額變動事件 | sol.balances |
| DexPoolEvents | 流動性池事件 | sol.dex.pools |
| TransferEvents | 轉帳事件 | sol.transfers |
| CandlestickEvents | K線資料 | sol.candlesticks |
Block-Level Data
Solana 使用 Slot 而非傳統區塊號作為時間線標識。
| 欄位 | 類型 | 說明 |
|---|
Slot | uint64 | Slot 號(主要標識) |
BlockHeight | uint64 | 區塊高度 |
BlockHash | bytes | 區塊雜湊 |
ParentSlot | uint64 | 父 Slot |
Timestamp | Timestamp | 出塊時間 |
Solana 的 shred stream 中,Block Header 可能不完整,僅 Slot 欄位保證正確。
Transaction-Level Data
交易核心欄位
| 欄位 | 類型 | 說明 |
|---|
Signature | bytes | 交易簽名(唯一標識) |
Status | Status | 執行狀態 |
Header | TransactionHeader | 中繼資料(含費用和簽名者) |
Index | uint32 | 區塊內位置 |
Instructions
交易包含多種 Instruction,這是 Solana 執行模型的核心:
| 欄位 | 說明 |
|---|
ProgramAccountIndex | 被呼叫的程式 |
Data | 編碼的指令資料 |
AccountIndexes | 引用的帳戶 |
BalanceUpdates | 該指令引起的 SOL 餘額變動 |
一筆 Solana 交易可以包含多個 Instruction,每個 Instruction 呼叫不同的程式(Program)。
Transfer Data
TransferEvents 提供 Solana 的轉帳資訊(Topic: sol.transfers)。
TransferEvent 結構
message TransferEvent {
Instruction instruction = 1;
Block block = 2;
Transaction transaction = 3;
DApp d_app = 4;
Transfer transfer = 100;
TransferProcessed transfer_processed = 200; // processed topic 包含
}
Transfer 核心欄位
| 欄位 | 類型 | 說明 |
|---|
sender_token_account_address | string | 發送方 Token 帳戶地址 |
sender_account_owner_address | string | 發送方帳戶所有者地址 |
receiver_token_account_address | string | 接收方 Token 帳戶地址 |
receiver_account_owner_address | string | 接收方帳戶所有者地址 |
token_address | string | Token 地址(Mint) |
token_amount | string | 轉帳數量 |
Token 中繼資料(TokenEvent)
Token 事件使用通用的 TokenEvents(Topic: sol.tokens),包含:
| 欄位 | 說明 |
|---|
address | Token 地址 |
name / symbol | 名稱和符號 |
decimals | 精度 |
uri | 中繼資料 URI |
metadata_address | 中繼資料地址 |
creators | 建立者列表(地址、驗證狀態、分成比例) |
Solana Token 包含額外的中繼資料欄位:
| 欄位 | 說明 |
|---|
collection_address | 集合地址(NFT) |
fungible | 是否同質化 |
is_mutable | 是否可修改 |
is_native | 是否原生 SOL |
program_address | 程式地址 |
seller_fee_basis_points | 創作者版稅基點(NFT) |
token_standard | Token 標準 |
mint_authority / freeze_authority / update_authority | 權限帳戶 |
is_verified_collection | 是否已驗證集合 |
Balance Updates 分層機制(Solana 特有)
這是 Solana 流資料的一個重要特性,餘額更新在兩個層級提供:
指令級 Balance Updates
交易級 Balance Updates
每條 instruction 執行後的即時餘額變動,反映單步操作的直接影響。Transaction
└── Instruction 1
└── BalanceUpdate: +100 SOL
└── Instruction 2
└── BalanceUpdate: -50 SOL
└── Instruction 3
└── BalanceUpdate: +25 SOL
整筆交易所有 instruction 執行完畢後的最終餘額狀態。Transaction
└── FinalBalanceUpdate: +75 SOL (淨變動)
這使得開發者既可以追蹤細粒度的資金流向,也可以取得最終狀態。
DEX Data
TradeEvents 提供 DEX 交易資料(Topic: sol.dex.trades),使用通用的 TradeEvent 結構。
Trade 核心欄位
| 欄位 | 說明 |
|---|
token_a_address / token_b_address | 交易對代幣地址 |
user_a_amount / user_b_amount | 使用者交易數量 |
pool_address | 池子地址 |
vault_a / vault_b | 池子 Vault 地址 |
vault_a_amount / vault_b_amount | Vault 數量 |
bonding_curve | Bonding Curve 資訊(如適用) |
DApp 資訊
| 欄位 | 說明 |
|---|
program_address | DEX 程式地址(如 Raydium、Orca) |
inner_program_address | 內部程式地址 |
chain | 鏈標識(CHAIN_SOLANA) |
DexPoolEvent - 流動性池
| 欄位 | 說明 |
|---|
type | 事件類型(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
address | 池子地址 |
token_a_address / token_b_address | 代幣地址 |
token_a_amount / token_b_amount | 代幣數量 |
lp_wallet | LP 錢包地址 |
Solana 鏈特性
Slot 機制
Solana 使用 Slot 而非傳統區塊號作為時間線標識:
| 特性 | 說明 |
|---|
| 出塊間隔 | 約 400ms |
| 訊息量 | 遠高於 EVM 鏈 |
| 主要標識 | Slot 號 |
訊息打包
交易以小批次打包,每條 Kafka 訊息不超過 250 筆交易。
Solana 的 shred stream 中,Block Header 可能不完整,僅 Slot 欄位保證正確。其他欄位可能為空或不準確。
高吞吐量處理建議
由於 Solana 的極高吞吐量,建議:
- 充足的處理能力:確保消費端有足夠的 CPU 和記憶體
- 並行處理:使用多執行緒/協程並行處理訊息
- 高效解析:最佳化 Protobuf 解析程式碼
- 批次寫入:如需持久化,採用批次寫入資料庫
Topic → Message Type 映射表
| Topic | Proto File | Message Type | 說明 |
|---|
sol.dex.trades | trade_event.proto | TradeEvents | DEX 交易事件 |
sol.dex.trades.processed | trade_event.proto | TradeEvents | 含 USD 價格、可疑標記 |
sol.tokens | token_event.proto | TokenEvents | Token 事件 |
sol.tokens.processed | token_event.proto | TokenEvents | 含描述、圖片、社交連結 |
sol.balances | balance_event.proto | BalanceEvents | 餘額變動事件 |
sol.transfers | solana/transfer_event.proto | TransferEvents | Solana 轉帳事件 |
sol.transfers.processed | solana/transfer_processed_event.proto | TransferProcessedEvents | 含 USD 價值 |
sol.dex.pools | dex_pool_event.proto | DexPoolEvents | 流動性池事件 |
sol.candlesticks | candlestick.proto | CandlestickEvents | K線資料 |
程式碼示例
Python 示例:消費 Solana DEX 交易
from kafka import KafkaConsumer
from common import trade_event_pb2 # 從 streaming_protobuf 倉庫取得
# 建立 consumer
consumer = KafkaConsumer(
'sol.dex.trades',
bootstrap_servers=['<your_broker_address>'],
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username='your_username',
sasl_plain_password='your_password',
auto_offset_reset='latest',
enable_auto_commit=False,
group_id='my-solana-consumer'
)
# 消費並解析訊息
for message in consumer:
trade_events = trade_event_pb2.TradeEvents()
trade_events.ParseFromString(message.value)
for event in trade_events.events:
print(f"Pool: {event.trade.pool_address}")
print(f"Token A: {event.trade.token_a_address}")
print(f"Token B: {event.trade.token_b_address}")
print(f"Amount A: {event.trade.user_a_amount}")
print(f"Amount B: {event.trade.user_b_amount}")
print(f"Slot: {event.block.slot}")
print(f"DEX Program: {event.d_app.program_address}")
print("---")
Go 示例:高效能消費
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
"google.golang.org/protobuf/proto"
common "github.com/chainstream-io/streaming_protobuf/common/messages"
)
func main() {
mechanism, _ := scram.Mechanism(scram.SHA512, "your_username", "your_password")
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"<your_broker_address>"},
Topic: "sol.dex.trades",
GroupID: "my-solana-consumer",
Dialer: &kafka.Dialer{
SASLMechanism: mechanism,
TLS: &tls.Config{},
},
})
defer reader.Close()
// 使用 worker pool 並行處理
messages := make(chan kafka.Message, 100)
// 啟動 worker
for i := 0; i < 10; i++ {
go func() {
for msg := range messages {
var tradeEvents common.TradeEvents
if err := proto.Unmarshal(msg.Value, &tradeEvents); err != nil {
log.Printf("Failed to parse: %v", err)
continue
}
// 處理交易
for _, event := range tradeEvents.Events {
log.Printf("Pool: %s, Token A: %s, Token B: %s",
event.Trade.PoolAddress,
event.Trade.TokenAAddress,
event.Trade.TokenBAddress)
log.Printf("Amount A: %s, Amount B: %s, Slot: %d",
event.Trade.UserAAmount,
event.Trade.UserBAmount,
event.Block.Slot)
}
}
}()
}
// 讀取訊息
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
log.Printf("Error reading: %v", err)
continue
}
messages <- msg
}
}
相關文件
概念與接入指南
Kafka Streams 接入基礎
WebSocket 即時資料
WebSocket 接入方式