メインコンテンツへスキップ
Solana は 400ms のブロック時間を目標とし、実際のスループットは約 4,000 TPS、理論的なピークは 65,000 TPS です。非常に高いメッセージ量により、コンシューマーの処理能力に高い要求が求められます。 Schema リポジトリgithub.com/chainstream-io/streaming_protobuf/solana

メッセージタイプ一覧

Solana Streams は以下のメッセージタイプを提供します:
メッセージタイプ説明Topic
TradeEventsDEX 取引イベントsol.dex.trades
TokenEventsトークンイベントsol.tokens
BalanceEvents残高変動イベントsol.balances
DexPoolEvents流動性プールイベントsol.dex.pools
TransferEvents送金イベントsol.transfers
CandlestickEventsローソク足データsol.candlesticks

ブロックレベルデータ

Solana は従来のブロック番号ではなく Slot をタイムライン識別子として使用します。
フィールド説明
Slotuint64Slot 番号(主要識別子)
BlockHeightuint64ブロック高さ
BlockHashbytesブロックハッシュ
ParentSlotuint64親 Slot
TimestampTimestampブロック時刻
Solana の shred stream では、Block Header が不完全な場合があり、Slot フィールドのみが正確であることが保証されています。

トランザクションレベルデータ

トランザクションコアフィールド

フィールド説明
Signaturebytesトランザクション署名(一意の識別子)
StatusStatus実行ステータス
HeaderTransactionHeaderメタデータ(手数料と署名者を含む)
Indexuint32ブロック内の位置

Instructions

トランザクションには複数の Instruction が含まれ、これは Solana の実行モデルのコアです:
フィールド説明
ProgramAccountIndex呼び出されたプログラム
Dataエンコードされた命令データ
AccountIndexes参照されたアカウント
BalanceUpdatesこの命令によって発生した SOL 残高変動
1 つの Solana トランザクションには複数の Instruction を含めることができ、各 Instruction は異なるプログラムを呼び出します。

送金データ

TransferEvents は Solana の送金情報を提供します(Topic: sol.transfers)。

TransferEvent 構造

message TransferEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Transfer transfer = 100;
  TransferProcessed transfer_processed = 200;  // included in processed topic
}

Transfer コアフィールド

フィールド説明
sender_token_account_addressstring送信者トークンアカウントアドレス
sender_account_owner_addressstring送信者アカウントオーナーアドレス
receiver_token_account_addressstring受信者トークンアカウントアドレス
receiver_account_owner_addressstring受信者アカウントオーナーアドレス
token_addressstringトークンアドレス(Mint)
token_amountstring送金額

トークンメタデータ(TokenEvent)

トークンイベントは共通の TokenEvents(Topic: sol.tokens)を使用し、以下を含みます:
フィールド説明
addressトークンアドレス
name / symbol名前とシンボル
decimals精度
uriメタデータ URI
metadata_addressメタデータアドレス
creators作成者リスト(アドレス、検証ステータス、分配比率)

SolanaExtra 固有フィールド

Solana トークンには追加のメタデータフィールドが含まれます:
フィールド説明
collection_addressコレクションアドレス(NFT)
fungibleファンジブルか
is_mutable変更可能か
is_nativeネイティブ SOL か
program_addressプログラムアドレス
seller_fee_basis_pointsクリエイターロイヤリティ基点(NFT)
token_standardトークン標準
mint_authority / freeze_authority / update_authority権限アカウント
is_verified_collection検証済みコレクションか

Balance Updates 階層メカニズム(Solana 固有)

これは Solana ストリームデータの重要な特徴です。残高更新は 2 つのレベルで提供されます:
各 instruction 実行後の即時残高変動で、単一ステップ操作の直接的な影響を反映します。
Transaction
└── Instruction 1
    └── BalanceUpdate: +100 SOL
└── Instruction 2
    └── BalanceUpdate: -50 SOL
└── Instruction 3
    └── BalanceUpdate: +25 SOL
これにより、開発者はきめ細かな資金フローの追跡と最終状態の取得の両方が可能になります。

DEX データ

TradeEvents は DEX 取引データを提供します(Topic: sol.dex.trades)。共通の TradeEvent 構造を使用します。

Trade コアフィールド

フィールド説明
token_a_address / token_b_address取引ペアトークンアドレス
user_a_amount / user_b_amountユーザー取引数量
pool_addressプールアドレス
vault_a / vault_bプール Vault アドレス
vault_a_amount / vault_b_amountVault 数量
bonding_curveボンディングカーブ情報(該当する場合)

DApp 情報

フィールド説明
program_addressDEX プログラムアドレス(例:Raydium、Orca)
inner_program_address内部プログラムアドレス
chainチェーン識別子(CHAIN_SOLANA)

DexPoolEvent - 流動性プール

フィールド説明
typeイベントタイプ(INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP)
addressプールアドレス
token_a_address / token_b_addressトークンアドレス
token_a_amount / token_b_amountトークン数量
lp_walletLP ウォレットアドレス

Solana チェーンの特徴

Slot メカニズム

Solana は従来のブロック番号ではなく Slot をタイムライン識別子として使用します:
特徴説明
ブロック間隔約 400ms
メッセージ量EVM チェーンよりはるかに多い
主要識別子Slot 番号

メッセージパッケージング

トランザクションは小バッチにパッケージングされ、各 Kafka メッセージには最大 250 件のトランザクションが含まれます。

Block Header の完全性

Solana の shred stream では、Block Header が不完全な場合があり、Slot フィールドのみが正確であることが保証されています。他のフィールドは空または不正確な場合があります。

高スループット処理の推奨事項

Solana の非常に高いスループットのため、以下を推奨します:
  1. 十分な処理能力:コンシューマーに十分な CPU とメモリを確保
  2. 並行処理:マルチスレッド/コルーチンで並行メッセージ処理
  3. 効率的な解析:Protobuf 解析コードの最適化
  4. バッチ書き込み:永続化が必要な場合はデータベースへのバッチ書き込み

Topic → メッセージタイプ マッピング

TopicProto Fileメッセージタイプ説明
sol.dex.tradestrade_event.protoTradeEventsDEX 取引イベント
sol.dex.trades.processedtrade_event.protoTradeEventsUSD 価格、疑わしいフラグ付き
sol.tokenstoken_event.protoTokenEventsトークンイベント
sol.tokens.processedtoken_event.protoTokenEvents説明、画像、SNS リンク付き
sol.balancesbalance_event.protoBalanceEvents残高変動イベント
sol.transferssolana/transfer_event.protoTransferEventsSolana 送金イベント
sol.transfers.processedsolana/transfer_processed_event.protoTransferProcessedEventsUSD 価値付き
sol.dex.poolsdex_pool_event.protoDexPoolEvents流動性プールイベント
sol.candlestickscandlestick.protoCandlestickEventsローソク足データ

コード例

Python 例:Solana DEX 取引を消費

from kafka import KafkaConsumer
from common import trade_event_pb2  # Get from streaming_protobuf repository

# Create consumer
consumer = KafkaConsumer(
    'sol.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-solana-consumer'
)

# Consume and parse messages
for message in consumer:
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Slot: {event.block.slot}")
        print(f"DEX Program: {event.d_app.program_address}")
        print("---")

Go 例:高性能コンシューム

package main

import (
    "context"
    "log"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/scram"
    "google.golang.org/protobuf/proto"
    
    common "github.com/chainstream-io/streaming_protobuf/common/messages"
)

func main() {
    mechanism, _ := scram.Mechanism(scram.SHA512, "your_username", "your_password")
    
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"<your_broker_address>"},
        Topic:   "sol.dex.trades",
        GroupID: "my-solana-consumer",
        Dialer: &kafka.Dialer{
            SASLMechanism: mechanism,
            TLS:           &tls.Config{},
        },
    })
    defer reader.Close()

    // Use worker pool for parallel processing
    messages := make(chan kafka.Message, 100)
    
    // Start workers
    for i := 0; i < 10; i++ {
        go func() {
            for msg := range messages {
                var tradeEvents common.TradeEvents
                if err := proto.Unmarshal(msg.Value, &tradeEvents); err != nil {
                    log.Printf("Failed to parse: %v", err)
                    continue
                }
                
                // Process trades
                for _, event := range tradeEvents.Events {
                    log.Printf("Pool: %s, Token A: %s, Token B: %s",
                        event.Trade.PoolAddress,
                        event.Trade.TokenAAddress,
                        event.Trade.TokenBAddress)
                    log.Printf("Amount A: %s, Amount B: %s, Slot: %d",
                        event.Trade.UserAAmount,
                        event.Trade.UserBAmount,
                        event.Block.Slot)
                }
            }
        }()
    }

    // Read messages
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Printf("Error reading: %v", err)
            continue
        }
        messages <- msg
    }
}

関連ドキュメント

コンセプトと統合ガイド

Kafka Streams 統合の基礎

EVM Streams

EVM チェーンデータストリーム

TRON Streams

TRON ネットワークデータストリーム

WebSocket リアルタイムデータ

WebSocket 統合