跳轉到主要內容
Solana 目標出塊時間為 400ms,實際吞吐量約 4,000 TPS,理論峰值可達 65,000 TPS。極高的訊息量對消費端的處理能力要求較高。 Schema 倉庫github.com/chainstream-io/streaming_protobuf/solana

Message Types 總覽

Solana Streams 提供以下訊息類型:
Message Type說明Topic
TradeEventsDEX 交易事件sol.dex.trades
TokenEventsToken 事件sol.tokens
BalanceEvents餘額變動事件sol.balances
DexPoolEvents流動性池事件sol.dex.pools
TransferEvents轉帳事件sol.transfers
CandlestickEventsK線資料sol.candlesticks

Block-Level Data

Solana 使用 Slot 而非傳統區塊號作為時間線標識。
欄位類型說明
Slotuint64Slot 號(主要標識)
BlockHeightuint64區塊高度
BlockHashbytes區塊雜湊
ParentSlotuint64父 Slot
TimestampTimestamp出塊時間
Solana 的 shred stream 中,Block Header 可能不完整,僅 Slot 欄位保證正確。

Transaction-Level Data

交易核心欄位

欄位類型說明
Signaturebytes交易簽名(唯一標識)
StatusStatus執行狀態
HeaderTransactionHeader中繼資料(含費用和簽名者)
Indexuint32區塊內位置

Instructions

交易包含多種 Instruction,這是 Solana 執行模型的核心:
欄位說明
ProgramAccountIndex被呼叫的程式
Data編碼的指令資料
AccountIndexes引用的帳戶
BalanceUpdates該指令引起的 SOL 餘額變動
一筆 Solana 交易可以包含多個 Instruction,每個 Instruction 呼叫不同的程式(Program)。

Transfer Data

TransferEvents 提供 Solana 的轉帳資訊(Topic: sol.transfers)。

TransferEvent 結構

message TransferEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Transfer transfer = 100;
  TransferProcessed transfer_processed = 200;  // processed topic 包含
}

Transfer 核心欄位

欄位類型說明
sender_token_account_addressstring發送方 Token 帳戶地址
sender_account_owner_addressstring發送方帳戶所有者地址
receiver_token_account_addressstring接收方 Token 帳戶地址
receiver_account_owner_addressstring接收方帳戶所有者地址
token_addressstringToken 地址(Mint)
token_amountstring轉帳數量

Token 中繼資料(TokenEvent)

Token 事件使用通用的 TokenEvents(Topic: sol.tokens),包含:
欄位說明
addressToken 地址
name / symbol名稱和符號
decimals精度
uri中繼資料 URI
metadata_address中繼資料地址
creators建立者列表(地址、驗證狀態、分成比例)

SolanaExtra 特有欄位

Solana Token 包含額外的中繼資料欄位:
欄位說明
collection_address集合地址(NFT)
fungible是否同質化
is_mutable是否可修改
is_native是否原生 SOL
program_address程式地址
seller_fee_basis_points創作者版稅基點(NFT)
token_standardToken 標準
mint_authority / freeze_authority / update_authority權限帳戶
is_verified_collection是否已驗證集合

Balance Updates 分層機制(Solana 特有)

這是 Solana 流資料的一個重要特性,餘額更新在兩個層級提供:
每條 instruction 執行後的即時餘額變動,反映單步操作的直接影響。
Transaction
└── Instruction 1
    └── BalanceUpdate: +100 SOL
└── Instruction 2
    └── BalanceUpdate: -50 SOL
└── Instruction 3
    └── BalanceUpdate: +25 SOL
這使得開發者既可以追蹤細粒度的資金流向,也可以取得最終狀態。

DEX Data

TradeEvents 提供 DEX 交易資料(Topic: sol.dex.trades),使用通用的 TradeEvent 結構。

Trade 核心欄位

欄位說明
token_a_address / token_b_address交易對代幣地址
user_a_amount / user_b_amount使用者交易數量
pool_address池子地址
vault_a / vault_b池子 Vault 地址
vault_a_amount / vault_b_amountVault 數量
bonding_curveBonding Curve 資訊(如適用)

DApp 資訊

欄位說明
program_addressDEX 程式地址(如 Raydium、Orca)
inner_program_address內部程式地址
chain鏈標識(CHAIN_SOLANA)

DexPoolEvent - 流動性池

欄位說明
type事件類型(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP)
address池子地址
token_a_address / token_b_address代幣地址
token_a_amount / token_b_amount代幣數量
lp_walletLP 錢包地址

Solana 鏈特性

Slot 機制

Solana 使用 Slot 而非傳統區塊號作為時間線標識:
特性說明
出塊間隔約 400ms
訊息量遠高於 EVM 鏈
主要標識Slot 號

訊息打包

交易以小批次打包,每條 Kafka 訊息不超過 250 筆交易。

Block Header 完整性

Solana 的 shred stream 中,Block Header 可能不完整,僅 Slot 欄位保證正確。其他欄位可能為空或不準確。

高吞吐量處理建議

由於 Solana 的極高吞吐量,建議:
  1. 充足的處理能力:確保消費端有足夠的 CPU 和記憶體
  2. 並行處理:使用多執行緒/協程並行處理訊息
  3. 高效解析:最佳化 Protobuf 解析程式碼
  4. 批次寫入:如需持久化,採用批次寫入資料庫

Topic → Message Type 映射表

TopicProto FileMessage Type說明
sol.dex.tradestrade_event.protoTradeEventsDEX 交易事件
sol.dex.trades.processedtrade_event.protoTradeEvents含 USD 價格、可疑標記
sol.tokenstoken_event.protoTokenEventsToken 事件
sol.tokens.processedtoken_event.protoTokenEvents含描述、圖片、社交連結
sol.balancesbalance_event.protoBalanceEvents餘額變動事件
sol.transferssolana/transfer_event.protoTransferEventsSolana 轉帳事件
sol.transfers.processedsolana/transfer_processed_event.protoTransferProcessedEvents含 USD 價值
sol.dex.poolsdex_pool_event.protoDexPoolEvents流動性池事件
sol.candlestickscandlestick.protoCandlestickEventsK線資料

程式碼示例

Python 示例:消費 Solana DEX 交易

from kafka import KafkaConsumer
from common import trade_event_pb2  # 從 streaming_protobuf 倉庫取得

# 建立 consumer
consumer = KafkaConsumer(
    'sol.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-solana-consumer'
)

# 消費並解析訊息
for message in consumer:
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Slot: {event.block.slot}")
        print(f"DEX Program: {event.d_app.program_address}")
        print("---")

Go 示例:高效能消費

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/scram"
    "google.golang.org/protobuf/proto"
    
    common "github.com/chainstream-io/streaming_protobuf/common/messages"
)

func main() {
    mechanism, _ := scram.Mechanism(scram.SHA512, "your_username", "your_password")
    
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"<your_broker_address>"},
        Topic:   "sol.dex.trades",
        GroupID: "my-solana-consumer",
        Dialer: &kafka.Dialer{
            SASLMechanism: mechanism,
            TLS:           &tls.Config{},
        },
    })
    defer reader.Close()

    // 使用 worker pool 並行處理
    messages := make(chan kafka.Message, 100)
    
    // 啟動 worker
    for i := 0; i < 10; i++ {
        go func() {
            for msg := range messages {
                var tradeEvents common.TradeEvents
                if err := proto.Unmarshal(msg.Value, &tradeEvents); err != nil {
                    log.Printf("Failed to parse: %v", err)
                    continue
                }
                
                // 處理交易
                for _, event := range tradeEvents.Events {
                    log.Printf("Pool: %s, Token A: %s, Token B: %s",
                        event.Trade.PoolAddress,
                        event.Trade.TokenAAddress,
                        event.Trade.TokenBAddress)
                    log.Printf("Amount A: %s, Amount B: %s, Slot: %d",
                        event.Trade.UserAAmount,
                        event.Trade.UserBAmount,
                        event.Block.Slot)
                }
            }
        }()
    }

    // 讀取訊息
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Printf("Error reading: %v", err)
            continue
        }
        messages <- msg
    }
}

相關文件

概念與接入指南

Kafka Streams 接入基礎

EVM Streams

EVM 鏈資料流

TRON Streams

TRON 網路資料流

WebSocket 即時資料

WebSocket 接入方式