ChainStream は強力なリアルタイムデータストリーミング機能を提供し、開発者がオンチェーンイベント、トランザクション、状態変更を即座に受信できるようにします。このドキュメントでは、WebSocket 接続、サブスクリプションメカニズム、ベストプラクティスについて説明します。
WebSocket エンドポイント
wss://realtime-dex.chainstream.io/connection/websocket
接続確立時に URL で Access Token を提供します:
wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN
SDK を使用(推奨)
ネイティブ WebSocket を使用
コマンドラインテスト
SDK は接続と認証を自動的に処理します。subscribe メソッドを呼び出すだけです: import { ChainStreamClient } from '@chainstream-io/sdk' ;
import { Resolution } from '@chainstream-io/sdk/openapi' ;
const client = new ChainStreamClient ( process . env . CHAINSTREAM_ACCESS_TOKEN );
// 直接サブスクライブ。SDK が接続と認証を自動処理
client . stream . subscribeTokenCandles ({
chain: 'sol' ,
tokenAddress: '6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN' ,
resolution: Resolution . _1m ,
callback : ( data ) => {
console . log ( 'Received data:' , data );
}
});
SDK は接続状態を自動検出し、必要に応じて接続を確立します。手動で connect() を呼び出す必要はありません。
ネイティブ WebSocket を使用する場合、接続後に connect メッセージを送信して認証を完了します: const token = process . env . CHAINSTREAM_ACCESS_TOKEN ;
const ws = new WebSocket (
`wss://realtime-dex.chainstream.io/connection/websocket?token= ${ token } `
);
ws . onopen = () => {
console . log ( 'WebSocket connection established' );
// connect メッセージを送信して認証を完了
ws . send ( JSON . stringify ({
connect: {
token: token ,
name: 'js'
},
id: 1
}));
};
ws . onmessage = ( event ) => {
const data = JSON . parse ( event . data );
// connect レスポンスを処理
if ( data . connect ) {
console . log ( '✅ Authentication successful, client ID:' , data . connect . client );
// 認証後にサブスクリプションを開始
ws . send ( JSON . stringify ({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
}
// サブスクリプションデータを処理
if ( data . push ) {
console . log ( 'Received data:' , data . push . pub . data );
}
};
wscat を使用してテスト:wscat -c "wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN"
接続後に connect メッセージを送信: { "connect" :{ "token" : "YOUR_ACCESS_TOKEN" , "name" : "test" }, "id" : 1 }
接続レスポンス
認証成功時、以下のようなレスポンスを受信します:
{
"id" : 1 ,
"connect" : {
"client" : "0f819f5f-7d8b-4949-9433-0e91bbfe1cdb" ,
"version" : "0.0.0 OSS" ,
"expires" : true ,
"ttl" : 86002 ,
"ping" : 25 ,
"pong" : true
}
}
フィールド 説明 clientユニークなクライアント識別子 ttlトークンの残り有効期間(秒) pingハートビート間隔(秒) pongpong レスポンスのサポート有無
サブスクリプションタイプ
ChainStream WebSocket は複数のデータサブスクリプションタイプをサポートしています:
カテゴリ チャンネルプレフィックス 説明 ローソク足(USD) dex-candle:トークンのローソク足(トークン/プール/ペア単位) ローソク足(ネイティブ) dex-candle-in-native:上記と同じだがチェーンのネイティブ資産建ての価格 プールローソク足 dex-pool-candle: / dex-pair-candle:プール単位またはペア単位のローソク足 トークン統計 dex-token-stats:複数時間枠の取引統計(1m、5m、… 1W、1M) 保有者統計 dex-token-holding:保有者の分布と残高タグ トークンサプライ dex-token-supply:供給量・時価総額の更新 流動性 dex-token-liquidity: / dex-token-total-liquidity:最大プール/合計流動性 新規トークン dex-new-token: / dex-new-tokens: / dex-new-tokens-metadata:新規上場(単発イベント/バッチ/メタデータ) トークン取引 dex-trade:トークンアドレスでフィルタした取引 ウォレット残高 dex-wallet-balance:ウォレットのトークン残高変動 ウォレット取引 dex-wallet-trade:ウォレットアドレスでフィルタした取引 ウォレット PnL dex-wallet-token-pnl: / dex-wallet-pnl-list:トークン別/集計ウォレット PnL ランキング dex-ranking-list: / dex-ranking-token-stats-list: / dex-ranking-token-holding-list: / dex-ranking-token-supply-list: / dex-ranking-token-bounding-curve-list:ランキングメンバーシップ+トークン別統計 DEX プール dex-pool-balance:プール流動性スナップショット
完全なサブスクリプションタイプ、パラメータ、レスポンス形式については WebSocket API リファレンス を参照してください。SDK(client.stream.subscribeTokenCandles、subscribeTokenStats、subscribeTokenTrade、…)はこれらのチャンネル文字列をラップしているため、手動で組み立てる必要はありません。
サブスクリプション形式の例
// ローソク足データをサブスクライブ
ws . send ( JSON . stringify ({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
// トークン統計をサブスクライブ
ws . send ( JSON . stringify ({
subscribe: {
channel: 'dex-token-stats:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN'
},
id: 3
}));
// 新規トークンをサブスクライブ
ws . send ( JSON . stringify ({
subscribe: {
channel: 'dex-new-token:sol'
},
id: 4
}));
サブスクリプション解除
ws . send ( JSON . stringify ({
unsubscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 5
}));
メッセージ形式
リクエストメッセージ
接続メッセージ(認証):
{
"connect" : {
"token" : "YOUR_ACCESS_TOKEN" ,
"name" : "client_name"
},
"id" : 1
}
サブスクリプションメッセージ:
{
"subscribe" : {
"channel" : "dex-candle:sol_xxx_1m"
},
"id" : 2
}
サブスクリプション解除メッセージ:
{
"unsubscribe" : {
"channel" : "dex-candle:sol_xxx_1m"
},
"id" : 3
}
レスポンスメッセージ
サブスクリプション確認:
{
"id" : 2 ,
"subscribe" : {}
}
データプッシュ:
{
"push" : {
"channel" : "dex-candle:sol_xxx_1m" ,
"pub" : {
"data" : {
"o" : 0.001234 ,
"c" : 0.001256 ,
"h" : 0.001280 ,
"l" : 0.001200 ,
"v" : 1234567 ,
"t" : 1706745600
}
}
}
}
エラーメッセージ:
{
"id" : 2 ,
"error" : {
"code" : 100 ,
"message" : "invalid channel"
}
}
ハートビート
WebSocket 接続を維持するためには、定期的なハートビートメッセージが必要です。connect レスポンスの ping フィールド(通常 25 秒)に基づいて、この間隔内にハートビートを送信してください:
// ハートビートメッセージ
ws . send ( JSON . stringify ({}));
// または ping を送信
ws . send ( JSON . stringify ({ ping: {} }));
指定時間内にメッセージが送信されない場合(通常は ping 間隔の 3 倍)、サーバーが接続を切断します。
完全な例
const WebSocket = require ( 'ws' );
class ChainStreamWebSocket {
constructor ( accessToken ) {
this . accessToken = accessToken ;
this . ws = null ;
this . messageId = 0 ;
this . reconnectAttempts = 0 ;
this . maxReconnectAttempts = 10 ;
this . subscriptions = new Set ();
this . pingInterval = null ;
}
connect () {
const url = `wss://realtime-dex.chainstream.io/connection/websocket?token= ${ this . accessToken } ` ;
this . ws = new WebSocket ( url );
this . ws . onopen = () => {
console . log ( 'WebSocket connection established' );
this . reconnectAttempts = 0 ;
// connect メッセージを送信
this . send ({
connect: {
token: this . accessToken ,
name: 'nodejs'
}
});
};
this . ws . onmessage = ( event ) => {
const data = JSON . parse ( event . data );
this . handleMessage ( data );
};
this . ws . onclose = ( event ) => {
console . log ( `Connection closed: ${ event . code } ` );
this . stopPing ();
if ( event . code !== 1000 ) {
this . scheduleReconnect ();
}
};
this . ws . onerror = ( error ) => {
console . error ( 'WebSocket error:' , error . message );
};
}
handleMessage ( data ) {
if ( data . connect ) {
console . log ( '✅ Authentication successful' );
this . startPing ( data . connect . ping || 25 );
this . resubscribe ();
return ;
}
if ( data . push ) {
console . log ( `[ ${ data . push . channel } ]` , data . push . pub . data );
return ;
}
if ( data . error ) {
console . error ( 'Error:' , data . error . message );
return ;
}
}
send ( message ) {
message . id = ++ this . messageId ;
this . ws . send ( JSON . stringify ( message ));
}
subscribe ( channel ) {
this . subscriptions . add ( channel );
this . send ({ subscribe: { channel } });
}
unsubscribe ( channel ) {
this . subscriptions . delete ( channel );
this . send ({ unsubscribe: { channel } });
}
resubscribe () {
this . subscriptions . forEach ( channel => {
this . send ({ subscribe: { channel } });
});
}
startPing ( interval ) {
this . pingInterval = setInterval (() => {
if ( this . ws . readyState === WebSocket . OPEN ) {
this . ws . send ( '{}' );
}
}, interval * 1000 );
}
stopPing () {
if ( this . pingInterval ) {
clearInterval ( this . pingInterval );
this . pingInterval = null ;
}
}
scheduleReconnect () {
if ( this . reconnectAttempts >= this . maxReconnectAttempts ) {
console . error ( '❌ Max reconnect attempts reached' );
return ;
}
const delay = Math . min ( 1000 * Math . pow ( 2 , this . reconnectAttempts ), 30000 );
console . log ( `⏳ Reconnecting in ${ delay } ms... (attempt ${ this . reconnectAttempts + 1 } )` );
this . reconnectAttempts ++ ;
setTimeout (() => this . connect (), delay );
}
close () {
this . stopPing ();
if ( this . ws ) {
this . ws . close ( 1000 , 'Normal closure' );
}
}
}
// 使用例
const client = new ChainStreamWebSocket ( process . env . CHAINSTREAM_ACCESS_TOKEN );
client . connect ();
// 接続確立後にサブスクライブ
setTimeout (() => {
client . subscribe ( 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m' );
client . subscribe ( 'dex-new-token:sol' );
}, 1000 );
ベストプラクティス
パフォーマンス最適化
フィルターの使用 必要なデータのみサブスクライブして帯域幅を削減。CEL 式を使用してデータをフィルタリングできます。
バッチ処理 高頻度データは 1 件ずつ処理するのではなく、バッチで処理。メッセージキューをバッファとして使用してください。
ローカルキャッシュ トークン情報などの静的データをキャッシュして、重複処理を削減。
接続の再利用 1 つの接続で複数のチャンネルをサブスクライブ可能。複数の接続の作成を避けてください。
エラー処理
エラーイベントの監視 — 接続やデータのエラーを迅速に処理
リトライメカニズムの実装 — 指数バックオフによる再接続
ログ記録 — トラブルシューティングのために重要なイベントを記録
グレースフルデグラデーション — WebSocket が利用できない場合はポーリングに切り替え
リソース管理
// ✅ 不要なチャンネルはすぐにサブスクリプション解除
client . unsubscribe ( 'dex-candle:sol_xxx_1m' );
// ✅ 接続のグレースフルクローズ
function gracefulClose () {
// 1. ハートビートを停止
client . stopPing ();
// 2. 接続を閉じる
client . close ();
}
関連ドキュメント
WebSocket API リファレンス 完全なサブスクリプションタイプとパラメータ
価格アラートボット ハンズオン:価格監視ボットの構築