メインコンテンツへスキップ
ChainStream は Kafka Streams を通じてマルチチェーンのリアルタイムオンチェーンデータストリームを提供します。GraphQL Subscriptions や WebSocket と比較して、Kafka Streams はレイテンシに敏感で高信頼性が求められるサーバーサイドアプリケーションシナリオ向けに設計されており、より低いレイテンシと高い耐障害性を備えたデータ消費を提供します。

Protobuf スキーマリポジトリ

公式 ChainStream Protobuf スキーマ定義。Go と Python をサポートし、EVM、Solana、TRON のすべてのメッセージタイプを含みます。

サポートマトリクス

チェーンdex.tradestokensbalancesdex.poolstransferscandlesticks
Ethereum (eth)
BSC (bsc)
Solana (sol)
TRON (tron)
すべてのチェーンで token-suppliestoken-pricestoken-holdingstoken-market-capstrade-stats Topic もサポートしています。詳細は完全な Topic リストを参照してください。

Kafka Streams vs WebSocket 選択ガイド

Kafka Streams を選ぶべきとき

レイテンシ重視

レイテンシが最重要課題で、アプリケーションがクラウドまたは専用サーバーにデプロイされている場合

メッセージの信頼性

メッセージの欠落が許容できず、耐久性があり信頼性の高いデータ消費が必要な場合

複雑な処理

前処理機能を超える複雑な計算、フィルタリング、フォーマットが必要な場合

水平スケーリング

消費能力のためにマルチインスタンスの水平スケーリングが必要な場合

WebSocket を選ぶべきとき

高速プロトタイピング

プロトタイプの構築で、開発速度が最優先の場合

統一インターフェース

アプリケーションが履歴データとリアルタイムデータの両方を統一されたクエリ・サブスクリプションインターフェースで必要とする場合

ブラウザサイド

アプリケーションがブラウザで直接データを消費する場合(Kafka Streams はサーバーサイドのみサポート)

動的フィルタリング

ページコンテンツに基づいてデータを動的にフィルタリングする必要がある場合

比較まとめ

機能Kafka StreamsWebSocket
レイテンシ最低
信頼性永続的、メッセージ欠落なし切断時に欠落の可能性
スケーラビリティネイティブな水平スケーリング追加設計が必要
データフィルタリングクライアント側処理サーバー側事前フィルタリング
クライアントサポートサーバーサイドのみサーバー + ブラウザ
統合の複雑さより高いより低い

認証情報の取得

Kafka Streams は独立した認証情報を使用し、ChainStream チームに連絡してアクセスを申請する必要があります。
1

申請連絡

support@chainstream.io にメールを送信して Kafka Streams へのアクセスを申請
2

認証情報の受領

承認後、以下の認証情報を受け取ります:
  • ユーザー名
  • パスワード
  • ブローカーアドレスリスト
3

接続の設定

受け取った認証情報を使用して Kafka クライアントの接続を設定

接続設定

ブローカーアドレス

ブローカーアドレスは申請承認後に認証情報と一緒に提供されます。未許可のアドレスでの接続は行わないでください。

SASL_SSL 接続設定

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='your_group_id'
)

Topic 命名規則と完全リスト

命名規則

Topic は以下の命名パターンに従います:
{chain}.{message_type}              # 生イベントデータ
{chain}.{message_type}.processed    # 処理済みデータ(価格、フラグ、エンリッチメント付き)
{chain}.{message_type}.created      # 作成イベント(例:トークン作成)
{chain} には solbscethtron が含まれます。

メッセージタイプ

タイプ説明
dex.tradesDEX 取引イベント
dex.pools流動性プールイベント
tokensトークンイベント
balances残高変更イベント
transfers送金イベント
token-suppliesトークン供給イベント
token-pricesトークン価格イベント
token-holdingsトークン保有データ
token-market-capsトークン時価総額イベント
candlesticksOHLCV ローソク足データ
trade-stats取引統計

完全な Topic リスト

以下の Topic はすべてのサポートチェーンに適用されます({chain}solbsceth に置き換え):
# DEX 取引
{chain}.dex.trades
{chain}.dex.trades.processed    # USD/ネイティブ価格、不審フラグ付き

# トークンイベント
{chain}.tokens
{chain}.tokens.created          # トークン作成イベント
{chain}.tokens.processed        # 説明、画像 URL、SNS リンク付き

# 残高変更
{chain}.balances
{chain}.balances.processed      # USD/ネイティブ価値付き

# 流動性プール
{chain}.dex.pools
{chain}.dex.pools.processed     # 流動性 USD/ネイティブ価値付き

# トークンデータ
{chain}.token-supplies
{chain}.token-supplies.processed
{chain}.token-prices
{chain}.token-holdings
{chain}.token-market-caps.processed

# 集計データ
{chain}.candlesticks            # OHLCV ローソク足データ
{chain}.trade-stats             # 取引統計
完全な Protobuf スキーマと Topic マッピングについては、streaming_protobuf リポジトリを参照してください。

消費モードとオフセット管理

Topic をサブスクライブする際に考慮すべき 2 つのコア設定:

オフセット戦略の選択

コンシューマーは Kafka に接続後、どこからメッセージの読み取りを開始するかを決定する必要があります。2 つの一般的な戦略:
各接続時に現在の最新位置から開始。リアルタイムデータのみを気にするシナリオに適しています。再接続時に過去のメッセージのリプレイはありません。
{
  autoCommit: false,
  fromBeginning: false,
  'auto.offset.reset': 'latest'
}

Group ID ルール

同じ Group ID で複数インスタンスをデプロイすると、フェイルオーバーと負荷分散が可能になります — 同じ Topic のメッセージは Group 内の 1 つのインスタンスのみが消費し、Kafka がインスタンス間でパーティションを自動分配します。
異なる Topic は異なるメッセージパースロジックを持つため、各 Topic に独立したコンシューマーを持つことを推奨します。

クイックスタート:5 分で最初のコンシューマー

以下の例は、eth.dex.trades Topic を消費して DEX 取引データをパースする方法を示しています。
1

Protobuf スキーマの取得

公式リポジトリからスキーマ定義をクローン:
git clone https://github.com/chainstream-io/streaming_protobuf.git
またはプロジェクトに Git サブモジュールとして追加:
git submodule add https://github.com/chainstream-io/streaming_protobuf.git
2

依存関係のインストール

pip install kafka-python protobuf
3

設定と消費

from kafka import KafkaConsumer
from common import trade_event_pb2  # streaming_protobuf リポジトリから取得

# コンシューマーを作成
consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-dex-consumer'
)

# メッセージを消費
for message in consumer:
    # protobuf メッセージをパース
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    # DEX 取引情報を表示
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Block: {event.block.height}")
        print("---")

コアデータ構造

すべてのメッセージタイプは以下のベース構造を共有しています(common/common.proto で定義):

ベース構造

ブロック情報:
フィールド説明
timestampint64ブロックタイムスタンプ
hashstringブロックハッシュ
heightuint64ブロック高さ
slotuint64スロット番号(Solana)

主要メッセージタイプ

Topic: {chain}.dex.trades
message TradeEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Trade trade = 100;
  BondingCurve bonding_curve = 110;
  TradeProcessed trade_processed = 200;  // processed topic に含まれる
}
Trade コアフィールド:
フィールド説明
token_a_address / token_b_address取引ペアのトークンアドレス
user_a_amount / user_b_amountユーザー取引量
pool_addressプールアドレス
vault_a / vault_bプールのボールトアドレス
vault_a_amount / vault_b_amountボールト量
TradeProcessed 拡張フィールド(processed topic):
フィールド説明
token_a_price_in_usd / token_b_price_in_usdUSD 価格
token_a_price_in_native / token_b_price_in_nativeネイティブ通貨価格
is_token_a_price_in_usd_suspect価格が不審かどうか
is_token_a_price_in_usd_suspect_reason不審な理由
Topic: {chain}.tokens{chain}.tokens.created
message TokenEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  EventType type = 100;        // CREATED, UPDATED
  Token token = 101;
  TokenProcessed token_processed = 200;
}
Token コアフィールド:
フィールド説明
addressトークンアドレス
name / symbol名前とシンボル
decimals小数桁数
uriメタデータ URI
metadata_addressメタデータアドレス
creators作成者リスト
solana_extraSolana 固有フィールド
evm_extraEVM 固有フィールド (token_standard)
Topic: {chain}.balances
message BalanceEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Balance balance = 100;
  BalanceProcessed balance_processed = 200;
}
Balance コアフィールド:
フィールド説明
token_account_addressトークンアカウントアドレス
account_owner_addressアカウントオーナーアドレス
token_addressトークンアドレス
pre_amount / post_amount変更前/後の残高
decimals小数桁数
lifecycleアカウントライフサイクル (NEW/EXISTING/CLOSED)
Topic: {chain}.dex.pools
message DexPoolEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  DexPoolEventType type = 100;  // INITIALIZE, INCREASE_LIQUIDITY, DECREASE_LIQUIDITY, SWAP
  DexPool pool = 101;
  DexPoolProcessed pool_processed = 200;
}
DexPool コアフィールド:
フィールド説明
addressプールアドレス
token_a_address / token_b_addressトークンアドレス
token_a_vault_address / token_b_vault_addressボールトアドレス
token_a_amount / token_b_amountトークン量
lp_walletLP ウォレットアドレス
Topic: {chain}.candlesticks
フィールド説明
token_addressトークンアドレス
resolution時間解像度 (1m, 5m, 15m, 1h 等)
timestampタイムスタンプ
open / high / low / closeOHLC 価格 (USD)
open_in_native / high_in_native / low_in_native / close_in_nativeOHLC 価格 (ネイティブ)
volume / volume_in_usd / volume_in_native取引量
trades取引件数
dimensionディメンションタイプ (TOKEN_ADDRESS/POOL_ADDRESS/PAIR)
Topic: {chain}.trade-stats
フィールド説明
token_addressトークンアドレス
resolution時間解像度
buys / sells買い/売り件数
buyers / sellers買い手/売り手数
buy_volume / sell_volume買い/売り出来高
buy_volume_in_usd / sell_volume_in_usdUSD 出来高
high_in_usd / low_in_usd高値/安値
Topic: {chain}.token-holdings
フィールドグループ説明
top10_holders / top10_amount / top10_ratioトップ 10 保有者統計
top50_holders / top50_amount / top50_ratioトップ 50 保有者統計
top100_holders / top100_amount / top100_ratioトップ 100 保有者統計
holders総保有者数
creators_count / creators_amount / creators_ratio作成者保有統計
fresh_count / fresh_amount / fresh_ratioフレッシュウォレット保有統計
smart_count / smart_amount / smart_ratioスマートマネー保有統計
sniper_count / sniper_amount / sniper_ratioスナイパー保有統計
insider_count / insider_amount / insider_ratioインサイダー保有統計
Topic: {chain}.token-prices
フィールド説明
token_addressトークンアドレス
price_in_usdUSD 価格
price_in_nativeネイティブ通貨価格
Topic: {chain}.token-supplies
フィールド説明
typeイベントタイプ (INITIALIZE_MINT/MINT/BURN)
token_addressトークンアドレス
amount
decimals小数桁数
amount_with_decimals小数桁数付きの量
完全な Protobuf 定義については streaming_protobuf リポジトリを参照してください。

メッセージ特性と注意事項

Kafka Streams を消費する際、開発者は以下のメッセージ特性を認識しておく必要があります:
ストリームは事前フィルタリングを行わず、Topic 内のすべてのメッセージと完全なデータを含みます。コンシューマーには十分なネットワークスループット、サーバー性能、効率的なパースコードが必要です。
同じトークンまたは同じアカウントのメッセージは厳密にブロック順序で到着します。特定のトークンやウォレットアドレスのイベントストリームは順序が保証され、状態変更の追跡が容易です。ただし、異なるトークン/アカウント間のメッセージ到着順序は保証されません。
同じメッセージが複数回配信される場合があります。重複処理が問題を引き起こす場合、コンシューマーはキャッシュやストレージを使用した冪等処理を実装する必要があります。
ChainStream は各メッセージの完全性を保証します。メッセージが分割されることはありません。ブロックに含まれるトランザクション数に関係なく、受信するメッセージは完全なデータ単位です。
メッセージは Protobuf エンコーディングを使用し、JSON よりコンパクトです。コンシューマーは対応する言語の Protobuf ライブラリを使用してパースする必要があります。

レイテンシモデル

Kafka Streams のレイテンシは、データがパイプライン内で通過する処理ステージに依存します。同じチェーンの異なる Topic は異なるレイテンシを持ちます:

Broadcasted vs Committed

タイプ説明レイテンシデータ確実性
Broadcastedブロードキャスト段階で消費可能、ブロック確認不要最低低め
Committedブロック確認後にストリームに入る高め最高

パイプラインレイテンシ

ブロックチェーンノードから Kafka Topic までの各変換レイヤー(パーシング、構造化、エンリッチメント)がおよそ 100〜1000ms のレイテンシを追加します:
  • raw topic: 最低レイテンシ、生のノードデータに最も近い
  • transactions topic: パース・構造化済み
  • dextrades topic: 相対的に高いレイテンシだが、よりリッチなデータ
レイテンシが最優先の場合は、効果的にパースできる生データに最も近い Topic を選択してください。

ベストプラクティス

パーティション並列消費

Kafka Topic は複数のパーティションに分割されており、各パーティションの並列読み取りがスループットを最大化します。 メッセージパーティションキーはトークンアドレスまたはウォレットアドレス(全チェーンで統一)に設定されており、以下が保証されます:
  • 同じトークンのすべてのイベントが同じパーティションにルーティングされ、順序を保証
  • 同じウォレットのすべての残高変更が同じパーティションにルーティングされ、状態追跡が容易
各パーティションに独立したスレッドを割り当てて負荷分散することを推奨します。

継続的消費、メインループをブロックしない

コンシューマーの読み取りループは継続的に実行し、メッセージ処理のブロックによるバックログを避けてください。メッセージの処理が必要な場合は非同期処理モードを採用:メインループは読み取りを担当し、処理ロジックはワーカースレッドに委任します。

メッセージ処理効率

バッチ処理はオーバーヘッドを削減できますが、バッチサイズとレイテンシのバランスが必要です。Go では channel + ワーカーグループを使用した並行処理が効果的です。

チェーン別ドキュメント

EVM Streams

Ethereum、BSC、Base、Polygon、Optimism

Solana Streams

Solana 高スループットデータストリーム

TRON Streams

TRON ネットワークデータストリーム

関連ドキュメント

リアルタイムストリーミング

WebSocket リアルタイムデータ統合ガイド

認証ガイド

Access Token の取得