跳转到主要内容
Solana 目标出块时间为 400ms,实际吞吐量约 4,000 TPS,理论峰值可达 65,000 TPS。极高的消息量对消费端的处理能力要求较高。 Schema 仓库github.com/chainstream-io/streaming_protobuf/solana

Message Types 总览

Solana Streams 提供以下消息类型:
Message Type说明Topic
TradeEventsDEX 交易事件sol.dex.trades
TokenEventsToken 事件sol.tokens
BalanceEvents余额变动事件sol.balances
DexPoolEvents流动性池事件sol.dex.pools
TransferEvents转账事件sol.transfers
CandlestickEventsK线数据sol.candlesticks

Block-Level Data

Solana 使用 Slot 而非传统区块号作为时间线标识。
字段类型说明
Slotuint64Slot 号(主要标识)
BlockHeightuint64区块高度
BlockHashbytes区块哈希
ParentSlotuint64父 Slot
TimestampTimestamp出块时间
Solana 的 shred stream 中,Block Header 可能不完整,仅 Slot 字段保证正确。

Transaction-Level Data

交易核心字段

字段类型说明
Signaturebytes交易签名(唯一标识)
StatusStatus执行状态
HeaderTransactionHeader元数据(含费用和签名者)
Indexuint32区块内位置

Instructions

交易包含多种 Instruction,这是 Solana 执行模型的核心:
字段说明
ProgramAccountIndex被调用的程序
Data编码的指令数据
AccountIndexes引用的账户
BalanceUpdates该指令引起的 SOL 余额变动
一笔 Solana 交易可以包含多个 Instruction,每个 Instruction 调用不同的程序(Program)。

Transfer Data

TransferEvents 提供 Solana 的转账信息(Topic: sol.transfers)。

TransferEvent 结构

message TransferEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Transfer transfer = 100;
  TransferProcessed transfer_processed = 200;  // processed topic 包含
}

Transfer 核心字段

字段类型说明
sender_token_account_addressstring发送方 Token 账户地址
sender_account_owner_addressstring发送方账户所有者地址
receiver_token_account_addressstring接收方 Token 账户地址
receiver_account_owner_addressstring接收方账户所有者地址
token_addressstringToken 地址(Mint)
token_amountstring转账数量

Token 元数据(TokenEvent)

Token 事件使用通用的 TokenEvents(Topic: sol.tokens),包含:
字段说明
addressToken 地址
name / symbol名称和符号
decimals精度
uri元数据 URI
metadata_address元数据地址
creators创建者列表(地址、验证状态、分成比例)

SolanaExtra 特有字段

Solana Token 包含额外的元数据字段:
字段说明
collection_address集合地址(NFT)
fungible是否同质化
is_mutable是否可修改
is_native是否原生 SOL
program_address程序地址
seller_fee_basis_points创作者版税基点(NFT)
token_standardToken 标准
mint_authority / freeze_authority / update_authority权限账户
is_verified_collection是否已验证集合

Balance Updates 分层机制(Solana 特有)

这是 Solana 流数据的一个重要特性,余额更新在两个层级提供:
每条 instruction 执行后的即时余额变动,反映单步操作的直接影响。
Transaction
└── Instruction 1
    └── BalanceUpdate: +100 SOL
└── Instruction 2
    └── BalanceUpdate: -50 SOL
└── Instruction 3
    └── BalanceUpdate: +25 SOL
这使得开发者既可以追踪细粒度的资金流向,也可以获取最终状态。

DEX Data

TradeEvents 提供 DEX 交易数据(Topic: sol.dex.trades),使用通用的 TradeEvent 结构。

Trade 核心字段

字段说明
token_a_address / token_b_address交易对代币地址
user_a_amount / user_b_amount用户交易数量
pool_address池子地址
vault_a / vault_b池子 Vault 地址
vault_a_amount / vault_b_amountVault 数量
bonding_curveBonding Curve 信息(如适用)

DApp 信息

字段说明
program_addressDEX 程序地址(如 Raydium、Orca)
inner_program_address内部程序地址
chain链标识(CHAIN_SOLANA)

DexPoolEvent - 流动性池

字段说明
type事件类型(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP)
address池子地址
token_a_address / token_b_address代币地址
token_a_amount / token_b_amount代币数量
lp_walletLP 钱包地址

Solana 链特性

Slot 机制

Solana 使用 Slot 而非传统区块号作为时间线标识:
特性说明
出块间隔约 400ms
消息量远高于 EVM 链
主要标识Slot 号

消息打包

交易以小批次打包,每条 Kafka 消息不超过 250 笔交易。

Block Header 完整性

Solana 的 shred stream 中,Block Header 可能不完整,仅 Slot 字段保证正确。其他字段可能为空或不准确。

高吞吐量处理建议

由于 Solana 的极高吞吐量,建议:
  1. 充足的处理能力:确保消费端有足够的 CPU 和内存
  2. 并行处理:使用多线程/协程并行处理消息
  3. 高效解析:优化 Protobuf 解析代码
  4. 批量写入:如需持久化,采用批量写入数据库

Topic → Message Type 映射表

TopicProto FileMessage Type说明
sol.dex.tradestrade_event.protoTradeEventsDEX 交易事件
sol.dex.trades.processedtrade_event.protoTradeEvents含 USD 价格、可疑标记
sol.tokenstoken_event.protoTokenEventsToken 事件
sol.tokens.processedtoken_event.protoTokenEvents含描述、图片、社交链接
sol.balancesbalance_event.protoBalanceEvents余额变动事件
sol.transferssolana/transfer_event.protoTransferEventsSolana 转账事件
sol.transfers.processedsolana/transfer_processed_event.protoTransferProcessedEvents含 USD 价值
sol.dex.poolsdex_pool_event.protoDexPoolEvents流动性池事件
sol.candlestickscandlestick.protoCandlestickEventsK线数据

代码示例

Python 示例:消费 Solana DEX 交易

from kafka import KafkaConsumer
from common import trade_event_pb2  # 从 streaming_protobuf 仓库获取

# 创建 consumer
consumer = KafkaConsumer(
    'sol.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-solana-consumer'
)

# 消费并解析消息
for message in consumer:
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Slot: {event.block.slot}")
        print(f"DEX Program: {event.d_app.program_address}")
        print("---")

Go 示例:高性能消费

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/scram"
    "google.golang.org/protobuf/proto"
    
    common "github.com/chainstream-io/streaming_protobuf/common/messages"
)

func main() {
    mechanism, _ := scram.Mechanism(scram.SHA512, "your_username", "your_password")
    
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"<your_broker_address>"},
        Topic:   "sol.dex.trades",
        GroupID: "my-solana-consumer",
        Dialer: &kafka.Dialer{
            SASLMechanism: mechanism,
            TLS:           &tls.Config{},
        },
    })
    defer reader.Close()

    // 使用 worker pool 并行处理
    messages := make(chan kafka.Message, 100)
    
    // 启动 worker
    for i := 0; i < 10; i++ {
        go func() {
            for msg := range messages {
                var tradeEvents common.TradeEvents
                if err := proto.Unmarshal(msg.Value, &tradeEvents); err != nil {
                    log.Printf("Failed to parse: %v", err)
                    continue
                }
                
                // 处理交易
                for _, event := range tradeEvents.Events {
                    log.Printf("Pool: %s, Token A: %s, Token B: %s",
                        event.Trade.PoolAddress,
                        event.Trade.TokenAAddress,
                        event.Trade.TokenBAddress)
                    log.Printf("Amount A: %s, Amount B: %s, Slot: %d",
                        event.Trade.UserAAmount,
                        event.Trade.UserBAmount,
                        event.Block.Slot)
                }
            }
        }()
    }

    // 读取消息
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Printf("Error reading: %v", err)
            continue
        }
        messages <- msg
    }
}

相关文档