Solana は 400ms のブロック時間を目標とし、実際のスループットは約 4,000 TPS、理論的なピークは 65,000 TPS です。非常に高いメッセージ量により、コンシューマーの処理能力に高い要求が求められます。
Schema リポジトリ:github.com/chainstream-io/streaming_protobuf/solana
メッセージタイプ一覧
Solana Streams は以下のメッセージタイプを提供します:
| メッセージタイプ | 説明 | Topic |
|---|
| TradeEvents | DEX 取引イベント | sol.dex.trades |
| TokenEvents | トークンイベント | sol.tokens |
| BalanceEvents | 残高変動イベント | sol.balances |
| DexPoolEvents | 流動性プールイベント | sol.dex.pools |
| TransferEvents | 送金イベント | sol.transfers |
| CandlestickEvents | ローソク足データ | sol.candlesticks |
ブロックレベルデータ
Solana は従来のブロック番号ではなく Slot をタイムライン識別子として使用します。
| フィールド | 型 | 説明 |
|---|
Slot | uint64 | Slot 番号(主要識別子) |
BlockHeight | uint64 | ブロック高さ |
BlockHash | bytes | ブロックハッシュ |
ParentSlot | uint64 | 親 Slot |
Timestamp | Timestamp | ブロック時刻 |
Solana の shred stream では、Block Header が不完全な場合があり、Slot フィールドのみが正確であることが保証されています。
トランザクションレベルデータ
トランザクションコアフィールド
| フィールド | 型 | 説明 |
|---|
Signature | bytes | トランザクション署名(一意の識別子) |
Status | Status | 実行ステータス |
Header | TransactionHeader | メタデータ(手数料と署名者を含む) |
Index | uint32 | ブロック内の位置 |
Instructions
トランザクションには複数の Instruction が含まれ、これは Solana の実行モデルのコアです:
| フィールド | 説明 |
|---|
ProgramAccountIndex | 呼び出されたプログラム |
Data | エンコードされた命令データ |
AccountIndexes | 参照されたアカウント |
BalanceUpdates | この命令によって発生した SOL 残高変動 |
1 つの Solana トランザクションには複数の Instruction を含めることができ、各 Instruction は異なるプログラムを呼び出します。
送金データ
TransferEvents は Solana の送金情報を提供します(Topic: sol.transfers)。
TransferEvent 構造
message TransferEvent {
Instruction instruction = 1;
Block block = 2;
Transaction transaction = 3;
DApp d_app = 4;
Transfer transfer = 100;
TransferProcessed transfer_processed = 200; // included in processed topic
}
Transfer コアフィールド
| フィールド | 型 | 説明 |
|---|
sender_token_account_address | string | 送信者トークンアカウントアドレス |
sender_account_owner_address | string | 送信者アカウントオーナーアドレス |
receiver_token_account_address | string | 受信者トークンアカウントアドレス |
receiver_account_owner_address | string | 受信者アカウントオーナーアドレス |
token_address | string | トークンアドレス(Mint) |
token_amount | string | 送金額 |
トークンメタデータ(TokenEvent)
トークンイベントは共通の TokenEvents(Topic: sol.tokens)を使用し、以下を含みます:
| フィールド | 説明 |
|---|
address | トークンアドレス |
name / symbol | 名前とシンボル |
decimals | 精度 |
uri | メタデータ URI |
metadata_address | メタデータアドレス |
creators | 作成者リスト(アドレス、検証ステータス、分配比率) |
Solana トークンには追加のメタデータフィールドが含まれます:
| フィールド | 説明 |
|---|
collection_address | コレクションアドレス(NFT) |
fungible | ファンジブルか |
is_mutable | 変更可能か |
is_native | ネイティブ SOL か |
program_address | プログラムアドレス |
seller_fee_basis_points | クリエイターロイヤリティ基点(NFT) |
token_standard | トークン標準 |
mint_authority / freeze_authority / update_authority | 権限アカウント |
is_verified_collection | 検証済みコレクションか |
Balance Updates 階層メカニズム(Solana 固有)
これは Solana ストリームデータの重要な特徴です。残高更新は 2 つのレベルで提供されます:
各 instruction 実行後の即時残高変動で、単一ステップ操作の直接的な影響を反映します。Transaction
└── Instruction 1
└── BalanceUpdate: +100 SOL
└── Instruction 2
└── BalanceUpdate: -50 SOL
└── Instruction 3
└── BalanceUpdate: +25 SOL
トランザクション内のすべての instruction が完了した後の最終残高状態。Transaction
└── FinalBalanceUpdate: +75 SOL (net change)
これにより、開発者はきめ細かな資金フローの追跡と最終状態の取得の両方が可能になります。
DEX データ
TradeEvents は DEX 取引データを提供します(Topic: sol.dex.trades)。共通の TradeEvent 構造を使用します。
Trade コアフィールド
| フィールド | 説明 |
|---|
token_a_address / token_b_address | 取引ペアトークンアドレス |
user_a_amount / user_b_amount | ユーザー取引数量 |
pool_address | プールアドレス |
vault_a / vault_b | プール Vault アドレス |
vault_a_amount / vault_b_amount | Vault 数量 |
bonding_curve | ボンディングカーブ情報(該当する場合) |
DApp 情報
| フィールド | 説明 |
|---|
program_address | DEX プログラムアドレス(例:Raydium、Orca) |
inner_program_address | 内部プログラムアドレス |
chain | チェーン識別子(CHAIN_SOLANA) |
DexPoolEvent - 流動性プール
| フィールド | 説明 |
|---|
type | イベントタイプ(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
address | プールアドレス |
token_a_address / token_b_address | トークンアドレス |
token_a_amount / token_b_amount | トークン数量 |
lp_wallet | LP ウォレットアドレス |
Solana チェーンの特徴
Slot メカニズム
Solana は従来のブロック番号ではなく Slot をタイムライン識別子として使用します:
| 特徴 | 説明 |
|---|
| ブロック間隔 | 約 400ms |
| メッセージ量 | EVM チェーンよりはるかに多い |
| 主要識別子 | Slot 番号 |
メッセージパッケージング
トランザクションは小バッチにパッケージングされ、各 Kafka メッセージには最大 250 件のトランザクションが含まれます。
Solana の shred stream では、Block Header が不完全な場合があり、Slot フィールドのみが正確であることが保証されています。他のフィールドは空または不正確な場合があります。
高スループット処理の推奨事項
Solana の非常に高いスループットのため、以下を推奨します:
- 十分な処理能力:コンシューマーに十分な CPU とメモリを確保
- 並行処理:マルチスレッド/コルーチンで並行メッセージ処理
- 効率的な解析:Protobuf 解析コードの最適化
- バッチ書き込み:永続化が必要な場合はデータベースへのバッチ書き込み
Topic → メッセージタイプ マッピング
| Topic | Proto File | メッセージタイプ | 説明 |
|---|
sol.dex.trades | trade_event.proto | TradeEvents | DEX 取引イベント |
sol.dex.trades.processed | trade_event.proto | TradeEvents | USD 価格、疑わしいフラグ付き |
sol.tokens | token_event.proto | TokenEvents | トークンイベント |
sol.tokens.processed | token_event.proto | TokenEvents | 説明、画像、SNS リンク付き |
sol.balances | balance_event.proto | BalanceEvents | 残高変動イベント |
sol.transfers | solana/transfer_event.proto | TransferEvents | Solana 送金イベント |
sol.transfers.processed | solana/transfer_processed_event.proto | TransferProcessedEvents | USD 価値付き |
sol.dex.pools | dex_pool_event.proto | DexPoolEvents | 流動性プールイベント |
sol.candlesticks | candlestick.proto | CandlestickEvents | ローソク足データ |
コード例
Python 例:Solana DEX 取引を消費
from kafka import KafkaConsumer
from common import trade_event_pb2 # Get from streaming_protobuf repository
# Create consumer
consumer = KafkaConsumer(
'sol.dex.trades',
bootstrap_servers=['<your_broker_address>'],
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username='your_username',
sasl_plain_password='your_password',
auto_offset_reset='latest',
enable_auto_commit=False,
group_id='my-solana-consumer'
)
# Consume and parse messages
for message in consumer:
trade_events = trade_event_pb2.TradeEvents()
trade_events.ParseFromString(message.value)
for event in trade_events.events:
print(f"Pool: {event.trade.pool_address}")
print(f"Token A: {event.trade.token_a_address}")
print(f"Token B: {event.trade.token_b_address}")
print(f"Amount A: {event.trade.user_a_amount}")
print(f"Amount B: {event.trade.user_b_amount}")
print(f"Slot: {event.block.slot}")
print(f"DEX Program: {event.d_app.program_address}")
print("---")
Go 例:高性能コンシューム
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
"google.golang.org/protobuf/proto"
common "github.com/chainstream-io/streaming_protobuf/common/messages"
)
func main() {
mechanism, _ := scram.Mechanism(scram.SHA512, "your_username", "your_password")
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"<your_broker_address>"},
Topic: "sol.dex.trades",
GroupID: "my-solana-consumer",
Dialer: &kafka.Dialer{
SASLMechanism: mechanism,
TLS: &tls.Config{},
},
})
defer reader.Close()
// Use worker pool for parallel processing
messages := make(chan kafka.Message, 100)
// Start workers
for i := 0; i < 10; i++ {
go func() {
for msg := range messages {
var tradeEvents common.TradeEvents
if err := proto.Unmarshal(msg.Value, &tradeEvents); err != nil {
log.Printf("Failed to parse: %v", err)
continue
}
// Process trades
for _, event := range tradeEvents.Events {
log.Printf("Pool: %s, Token A: %s, Token B: %s",
event.Trade.PoolAddress,
event.Trade.TokenAAddress,
event.Trade.TokenBAddress)
log.Printf("Amount A: %s, Amount B: %s, Slot: %d",
event.Trade.UserAAmount,
event.Trade.UserBAmount,
event.Block.Slot)
}
}
}()
}
// Read messages
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
log.Printf("Error reading: %v", err)
continue
}
messages <- msg
}
}
関連ドキュメント
コンセプトと統合ガイド
Kafka Streams 統合の基礎
EVM Streams
EVM チェーンデータストリーム
TRON Streams
TRON ネットワークデータストリーム
WebSocket リアルタイムデータ
WebSocket 統合