メインコンテンツへスキップ
ChainStream は強力なリアルタイムデータストリーミング機能を提供し、開発者がオンチェーンイベント、トランザクション、状態変更を即座に受信できるようにします。このドキュメントでは、WebSocket 接続、サブスクリプションメカニズム、ベストプラクティスについて説明します。

接続

WebSocket エンドポイント

wss://realtime-dex.chainstream.io/connection/websocket

認証

接続確立時に URL で Access Token を提供します:
wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN
SDK は接続と認証を自動的に処理します。subscribe メソッドを呼び出すだけです:
import { ChainStreamClient } from '@chainstream-io/sdk';
import { Resolution } from '@chainstream-io/sdk/openapi';

const client = new ChainStreamClient(process.env.CHAINSTREAM_ACCESS_TOKEN);

// 直接サブスクライブ。SDK が接続と認証を自動処理
client.stream.subscribeTokenCandles({
  chain: 'sol',
  tokenAddress: '6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN',
  resolution: Resolution._1m,
  callback: (data) => {
    console.log('Received data:', data);
  }
});
SDK は接続状態を自動検出し、必要に応じて接続を確立します。手動で connect() を呼び出す必要はありません。

接続レスポンス

認証成功時、以下のようなレスポンスを受信します:
{
  "id": 1,
  "connect": {
    "client": "0f819f5f-7d8b-4949-9433-0e91bbfe1cdb",
    "version": "0.0.0 OSS",
    "expires": true,
    "ttl": 86002,
    "ping": 25,
    "pong": true
  }
}
フィールド説明
clientユニークなクライアント識別子
ttlトークンの残り有効期間(秒)
pingハートビート間隔(秒)
pongpong レスポンスのサポート有無

サブスクリプションタイプ

ChainStream WebSocket は複数のデータサブスクリプションタイプをサポートしています:
カテゴリチャンネルプレフィックス説明
ローソク足dex-candle:トークン価格のローソク足
トークン統計dex-token-stats:トークンマーケット統計
保有者統計dex-token-holders-stats:トークン保有者の分布
新規トークンdex-token-new:新しくローンチされたトークン
トークン取引dex-token-trade:トークンの取引記録
ウォレット残高dex-wallet-balance:ウォレット資産の変動
ウォレット取引dex-wallet-trade:ウォレットの取引記録
ランキングdex-ranking-token-stats:トークンランキング統計
DEX プールdex-dex-pool-balance:DEX 流動性プールデータ
完全なサブスクリプションタイプ、パラメータ、レスポンス形式については WebSocket API ドキュメントを参照してください。

サブスクリプション形式の例

// ローソク足データをサブスクライブ
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
  },
  id: 2
}));

// トークン統計をサブスクライブ
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-token-stats:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN'
  },
  id: 3
}));

// 新規トークンをサブスクライブ
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-token-new:sol'
  },
  id: 4
}));

サブスクリプション解除

ws.send(JSON.stringify({
  unsubscribe: {
    channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
  },
  id: 5
}));

メッセージ形式

リクエストメッセージ

接続メッセージ(認証):
{
  "connect": {
    "token": "YOUR_ACCESS_TOKEN",
    "name": "client_name"
  },
  "id": 1
}
サブスクリプションメッセージ:
{
  "subscribe": {
    "channel": "dex-candle:sol_xxx_1m"
  },
  "id": 2
}
サブスクリプション解除メッセージ:
{
  "unsubscribe": {
    "channel": "dex-candle:sol_xxx_1m"
  },
  "id": 3
}

レスポンスメッセージ

サブスクリプション確認:
{
  "id": 2,
  "subscribe": {}
}
データプッシュ:
{
  "push": {
    "channel": "dex-candle:sol_xxx_1m",
    "pub": {
      "data": {
        "o": 0.001234,
        "c": 0.001256,
        "h": 0.001280,
        "l": 0.001200,
        "v": 1234567,
        "t": 1706745600
      }
    }
  }
}
エラーメッセージ:
{
  "id": 2,
  "error": {
    "code": 100,
    "message": "invalid channel"
  }
}

ハートビート

WebSocket 接続を維持するためには、定期的なハートビートメッセージが必要です。connect レスポンスの ping フィールド(通常 25 秒)に基づいて、この間隔内にハートビートを送信してください:
// ハートビートメッセージ
ws.send(JSON.stringify({}));

// または ping を送信
ws.send(JSON.stringify({ ping: {} }));
指定時間内にメッセージが送信されない場合(通常は ping 間隔の 3 倍)、サーバーが接続を切断します。

完全な例

const WebSocket = require('ws');

class ChainStreamWebSocket {
  constructor(accessToken) {
    this.accessToken = accessToken;
    this.ws = null;
    this.messageId = 0;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    this.subscriptions = new Set();
    this.pingInterval = null;
  }

  connect() {
    const url = `wss://realtime-dex.chainstream.io/connection/websocket?token=${this.accessToken}`;
    this.ws = new WebSocket(url);

    this.ws.onopen = () => {
      console.log('WebSocket connection established');
      this.reconnectAttempts = 0;
      
      // connect メッセージを送信
      this.send({
        connect: {
          token: this.accessToken,
          name: 'nodejs'
        }
      });
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.handleMessage(data);
    };

    this.ws.onclose = (event) => {
      console.log(`Connection closed: ${event.code}`);
      this.stopPing();
      
      if (event.code !== 1000) {
        this.scheduleReconnect();
      }
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error.message);
    };
  }

  handleMessage(data) {
    if (data.connect) {
      console.log('✅ Authentication successful');
      this.startPing(data.connect.ping || 25);
      this.resubscribe();
      return;
    }

    if (data.push) {
      console.log(`[${data.push.channel}]`, data.push.pub.data);
      return;
    }

    if (data.error) {
      console.error('Error:', data.error.message);
      return;
    }
  }

  send(message) {
    message.id = ++this.messageId;
    this.ws.send(JSON.stringify(message));
  }

  subscribe(channel) {
    this.subscriptions.add(channel);
    this.send({ subscribe: { channel } });
  }

  unsubscribe(channel) {
    this.subscriptions.delete(channel);
    this.send({ unsubscribe: { channel } });
  }

  resubscribe() {
    this.subscriptions.forEach(channel => {
      this.send({ subscribe: { channel } });
    });
  }

  startPing(interval) {
    this.pingInterval = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send('{}');
      }
    }, interval * 1000);
  }

  stopPing() {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
  }

  scheduleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('❌ Max reconnect attempts reached');
      return;
    }

    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
    console.log(`⏳ Reconnecting in ${delay}ms... (attempt ${this.reconnectAttempts + 1})`);
    this.reconnectAttempts++;

    setTimeout(() => this.connect(), delay);
  }

  close() {
    this.stopPing();
    if (this.ws) {
      this.ws.close(1000, 'Normal closure');
    }
  }
}

// 使用例
const client = new ChainStreamWebSocket(process.env.CHAINSTREAM_ACCESS_TOKEN);
client.connect();

// 接続確立後にサブスクライブ
setTimeout(() => {
  client.subscribe('dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m');
  client.subscribe('dex-token-new:sol');
}, 1000);

ベストプラクティス

パフォーマンス最適化

フィルターの使用

必要なデータのみサブスクライブして帯域幅を削減。CEL 式を使用してデータをフィルタリングできます。

バッチ処理

高頻度データは 1 件ずつ処理するのではなく、バッチで処理。メッセージキューをバッファとして使用してください。

ローカルキャッシュ

トークン情報などの静的データをキャッシュして、重複処理を削減。

接続の再利用

1 つの接続で複数のチャンネルをサブスクライブ可能。複数の接続の作成を避けてください。

エラー処理

  1. エラーイベントの監視 — 接続やデータのエラーを迅速に処理
  2. リトライメカニズムの実装 — 指数バックオフによる再接続
  3. ログ記録 — トラブルシューティングのために重要なイベントを記録
  4. グレースフルデグラデーション — WebSocket が利用できない場合はポーリングに切り替え

リソース管理

// ✅ 不要なチャンネルはすぐにサブスクリプション解除
client.unsubscribe('dex-candle:sol_xxx_1m');

// ✅ 接続のグレースフルクローズ
function gracefulClose() {
  // 1. ハートビートを停止
  client.stopPing();
  
  // 2. 接続を閉じる
  client.close();
}

関連ドキュメント

WebSocket API リファレンス

完全なサブスクリプションタイプとパラメータ

価格アラートボット

ハンズオン:価格監視ボットの構築