Solana 目标出块时间为 400ms,实际吞吐量约 4,000 TPS,理论峰值可达 65,000 TPS。极高的消息量对消费端的处理能力要求较高。
Schema 仓库:github.com/chainstream-io/streaming_protobuf/solana
Message Types 总览
Solana Streams 提供以下消息类型:
| Message Type | 说明 | Topic |
|---|
| TradeEvents | DEX 交易事件 | sol.dex.trades |
| TokenEvents | Token 事件 | sol.tokens |
| BalanceEvents | 余额变动事件 | sol.balances |
| DexPoolEvents | 流动性池事件 | sol.dex.pools |
| TransferEvents | 转账事件 | sol.transfers |
| CandlestickEvents | K线数据 | sol.candlesticks |
Block-Level Data
Solana 使用 Slot 而非传统区块号作为时间线标识。
| 字段 | 类型 | 说明 |
|---|
Slot | uint64 | Slot 号(主要标识) |
BlockHeight | uint64 | 区块高度 |
BlockHash | bytes | 区块哈希 |
ParentSlot | uint64 | 父 Slot |
Timestamp | Timestamp | 出块时间 |
Solana 的 shred stream 中,Block Header 可能不完整,仅 Slot 字段保证正确。
Transaction-Level Data
交易核心字段
| 字段 | 类型 | 说明 |
|---|
Signature | bytes | 交易签名(唯一标识) |
Status | Status | 执行状态 |
Header | TransactionHeader | 元数据(含费用和签名者) |
Index | uint32 | 区块内位置 |
Instructions
交易包含多种 Instruction,这是 Solana 执行模型的核心:
| 字段 | 说明 |
|---|
ProgramAccountIndex | 被调用的程序 |
Data | 编码的指令数据 |
AccountIndexes | 引用的账户 |
BalanceUpdates | 该指令引起的 SOL 余额变动 |
一笔 Solana 交易可以包含多个 Instruction,每个 Instruction 调用不同的程序(Program)。
Transfer Data
TransferEvents 提供 Solana 的转账信息(Topic: sol.transfers)。
TransferEvent 结构
message TransferEvent {
Instruction instruction = 1;
Block block = 2;
Transaction transaction = 3;
DApp d_app = 4;
Transfer transfer = 100;
TransferProcessed transfer_processed = 200; // processed topic 包含
}
Transfer 核心字段
| 字段 | 类型 | 说明 |
|---|
sender_token_account_address | string | 发送方 Token 账户地址 |
sender_account_owner_address | string | 发送方账户所有者地址 |
receiver_token_account_address | string | 接收方 Token 账户地址 |
receiver_account_owner_address | string | 接收方账户所有者地址 |
token_address | string | Token 地址(Mint) |
token_amount | string | 转账数量 |
Token 元数据(TokenEvent)
Token 事件使用通用的 TokenEvents(Topic: sol.tokens),包含:
| 字段 | 说明 |
|---|
address | Token 地址 |
name / symbol | 名称和符号 |
decimals | 精度 |
uri | 元数据 URI |
metadata_address | 元数据地址 |
creators | 创建者列表(地址、验证状态、分成比例) |
Solana Token 包含额外的元数据字段:
| 字段 | 说明 |
|---|
collection_address | 集合地址(NFT) |
fungible | 是否同质化 |
is_mutable | 是否可修改 |
is_native | 是否原生 SOL |
program_address | 程序地址 |
seller_fee_basis_points | 创作者版税基点(NFT) |
token_standard | Token 标准 |
mint_authority / freeze_authority / update_authority | 权限账户 |
is_verified_collection | 是否已验证集合 |
Balance Updates 分层机制(Solana 特有)
这是 Solana 流数据的一个重要特性,余额更新在两个层级提供:
指令级 Balance Updates
交易级 Balance Updates
每条 instruction 执行后的即时余额变动,反映单步操作的直接影响。Transaction
└── Instruction 1
└── BalanceUpdate: +100 SOL
└── Instruction 2
└── BalanceUpdate: -50 SOL
└── Instruction 3
└── BalanceUpdate: +25 SOL
整笔交易所有 instruction 执行完毕后的最终余额状态。Transaction
└── FinalBalanceUpdate: +75 SOL (净变动)
这使得开发者既可以追踪细粒度的资金流向,也可以获取最终状态。
DEX Data
TradeEvents 提供 DEX 交易数据(Topic: sol.dex.trades),使用通用的 TradeEvent 结构。
Trade 核心字段
| 字段 | 说明 |
|---|
token_a_address / token_b_address | 交易对代币地址 |
user_a_amount / user_b_amount | 用户交易数量 |
pool_address | 池子地址 |
vault_a / vault_b | 池子 Vault 地址 |
vault_a_amount / vault_b_amount | Vault 数量 |
bonding_curve | Bonding Curve 信息(如适用) |
DApp 信息
| 字段 | 说明 |
|---|
program_address | DEX 程序地址(如 Raydium、Orca) |
inner_program_address | 内部程序地址 |
chain | 链标识(CHAIN_SOLANA) |
DexPoolEvent - 流动性池
| 字段 | 说明 |
|---|
type | 事件类型(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
address | 池子地址 |
token_a_address / token_b_address | 代币地址 |
token_a_amount / token_b_amount | 代币数量 |
lp_wallet | LP 钱包地址 |
Solana 链特性
Slot 机制
Solana 使用 Slot 而非传统区块号作为时间线标识:
| 特性 | 说明 |
|---|
| 出块间隔 | 约 400ms |
| 消息量 | 远高于 EVM 链 |
| 主要标识 | Slot 号 |
消息打包
交易以小批次打包,每条 Kafka 消息不超过 250 笔交易。
Solana 的 shred stream 中,Block Header 可能不完整,仅 Slot 字段保证正确。其他字段可能为空或不准确。
高吞吐量处理建议
由于 Solana 的极高吞吐量,建议:
- 充足的处理能力:确保消费端有足够的 CPU 和内存
- 并行处理:使用多线程/协程并行处理消息
- 高效解析:优化 Protobuf 解析代码
- 批量写入:如需持久化,采用批量写入数据库
Topic → Message Type 映射表
| Topic | Proto File | Message Type | 说明 |
|---|
sol.dex.trades | trade_event.proto | TradeEvents | DEX 交易事件 |
sol.dex.trades.processed | trade_event.proto | TradeEvents | 含 USD 价格、可疑标记 |
sol.tokens | token_event.proto | TokenEvents | Token 事件 |
sol.tokens.processed | token_event.proto | TokenEvents | 含描述、图片、社交链接 |
sol.balances | balance_event.proto | BalanceEvents | 余额变动事件 |
sol.transfers | solana/transfer_event.proto | TransferEvents | Solana 转账事件 |
sol.transfers.processed | solana/transfer_processed_event.proto | TransferProcessedEvents | 含 USD 价值 |
sol.dex.pools | dex_pool_event.proto | DexPoolEvents | 流动性池事件 |
sol.candlesticks | candlestick.proto | CandlestickEvents | K线数据 |
代码示例
Python 示例:消费 Solana DEX 交易
from kafka import KafkaConsumer
from common import trade_event_pb2 # 从 streaming_protobuf 仓库获取
# 创建 consumer
consumer = KafkaConsumer(
'sol.dex.trades',
bootstrap_servers=['<your_broker_address>'],
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username='your_username',
sasl_plain_password='your_password',
auto_offset_reset='latest',
enable_auto_commit=False,
group_id='my-solana-consumer'
)
# 消费并解析消息
for message in consumer:
trade_events = trade_event_pb2.TradeEvents()
trade_events.ParseFromString(message.value)
for event in trade_events.events:
print(f"Pool: {event.trade.pool_address}")
print(f"Token A: {event.trade.token_a_address}")
print(f"Token B: {event.trade.token_b_address}")
print(f"Amount A: {event.trade.user_a_amount}")
print(f"Amount B: {event.trade.user_b_amount}")
print(f"Slot: {event.block.slot}")
print(f"DEX Program: {event.d_app.program_address}")
print("---")
Go 示例:高性能消费
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
"google.golang.org/protobuf/proto"
common "github.com/chainstream-io/streaming_protobuf/common/messages"
)
func main() {
mechanism, _ := scram.Mechanism(scram.SHA512, "your_username", "your_password")
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"<your_broker_address>"},
Topic: "sol.dex.trades",
GroupID: "my-solana-consumer",
Dialer: &kafka.Dialer{
SASLMechanism: mechanism,
TLS: &tls.Config{},
},
})
defer reader.Close()
// 使用 worker pool 并行处理
messages := make(chan kafka.Message, 100)
// 启动 worker
for i := 0; i < 10; i++ {
go func() {
for msg := range messages {
var tradeEvents common.TradeEvents
if err := proto.Unmarshal(msg.Value, &tradeEvents); err != nil {
log.Printf("Failed to parse: %v", err)
continue
}
// 处理交易
for _, event := range tradeEvents.Events {
log.Printf("Pool: %s, Token A: %s, Token B: %s",
event.Trade.PoolAddress,
event.Trade.TokenAAddress,
event.Trade.TokenBAddress)
log.Printf("Amount A: %s, Amount B: %s, Slot: %d",
event.Trade.UserAAmount,
event.Trade.UserBAmount,
event.Block.Slot)
}
}
}()
}
// 读取消息
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
log.Printf("Error reading: %v", err)
continue
}
messages <- msg
}
}
相关文档