ChainStream は強力なリアルタイムデータストリーミング機能を提供し、開発者がオンチェーンイベント、トランザクション、状態変更を即座に受信できるようにします。このドキュメントでは、WebSocket 接続、サブスクリプションメカニズム、ベストプラクティスについて説明します。
WebSocket エンドポイント
wss://realtime-dex.chainstream.io/connection/websocket
接続確立時に URL で Access Token を提供します:
wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN
SDK を使用(推奨)
ネイティブ WebSocket を使用
コマンドラインテスト
SDK は接続と認証を自動的に処理します。subscribe メソッドを呼び出すだけです:import { ChainStreamClient } from '@chainstream-io/sdk';
import { Resolution } from '@chainstream-io/sdk/openapi';
const client = new ChainStreamClient(process.env.CHAINSTREAM_ACCESS_TOKEN);
// 直接サブスクライブ。SDK が接続と認証を自動処理
client.stream.subscribeTokenCandles({
chain: 'sol',
tokenAddress: '6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN',
resolution: Resolution._1m,
callback: (data) => {
console.log('Received data:', data);
}
});
SDK は接続状態を自動検出し、必要に応じて接続を確立します。手動で connect() を呼び出す必要はありません。
ネイティブ WebSocket を使用する場合、接続後に connect メッセージを送信して認証を完了します:const token = process.env.CHAINSTREAM_ACCESS_TOKEN;
const ws = new WebSocket(
`wss://realtime-dex.chainstream.io/connection/websocket?token=${token}`
);
ws.onopen = () => {
console.log('WebSocket connection established');
// connect メッセージを送信して認証を完了
ws.send(JSON.stringify({
connect: {
token: token,
name: 'js'
},
id: 1
}));
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
// connect レスポンスを処理
if (data.connect) {
console.log('✅ Authentication successful, client ID:', data.connect.client);
// 認証後にサブスクリプションを開始
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
}
// サブスクリプションデータを処理
if (data.push) {
console.log('Received data:', data.push.pub.data);
}
};
wscat を使用してテスト:wscat -c "wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN"
接続後に connect メッセージを送信:{"connect":{"token":"YOUR_ACCESS_TOKEN","name":"test"},"id":1}
接続レスポンス
認証成功時、以下のようなレスポンスを受信します:
{
"id": 1,
"connect": {
"client": "0f819f5f-7d8b-4949-9433-0e91bbfe1cdb",
"version": "0.0.0 OSS",
"expires": true,
"ttl": 86002,
"ping": 25,
"pong": true
}
}
| フィールド | 説明 |
|---|
client | ユニークなクライアント識別子 |
ttl | トークンの残り有効期間(秒) |
ping | ハートビート間隔(秒) |
pong | pong レスポンスのサポート有無 |
サブスクリプションタイプ
ChainStream WebSocket は複数のデータサブスクリプションタイプをサポートしています:
| カテゴリ | チャンネルプレフィックス | 説明 |
|---|
| ローソク足 | dex-candle: | トークン価格のローソク足 |
| トークン統計 | dex-token-stats: | トークンマーケット統計 |
| 保有者統計 | dex-token-holders-stats: | トークン保有者の分布 |
| 新規トークン | dex-token-new: | 新しくローンチされたトークン |
| トークン取引 | dex-token-trade: | トークンの取引記録 |
| ウォレット残高 | dex-wallet-balance: | ウォレット資産の変動 |
| ウォレット取引 | dex-wallet-trade: | ウォレットの取引記録 |
| ランキング | dex-ranking-token-stats: | トークンランキング統計 |
| DEX プール | dex-dex-pool-balance: | DEX 流動性プールデータ |
サブスクリプション形式の例
// ローソク足データをサブスクライブ
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
// トークン統計をサブスクライブ
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-token-stats:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN'
},
id: 3
}));
// 新規トークンをサブスクライブ
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-token-new:sol'
},
id: 4
}));
サブスクリプション解除
ws.send(JSON.stringify({
unsubscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 5
}));
メッセージ形式
リクエストメッセージ
接続メッセージ(認証):
{
"connect": {
"token": "YOUR_ACCESS_TOKEN",
"name": "client_name"
},
"id": 1
}
サブスクリプションメッセージ:
{
"subscribe": {
"channel": "dex-candle:sol_xxx_1m"
},
"id": 2
}
サブスクリプション解除メッセージ:
{
"unsubscribe": {
"channel": "dex-candle:sol_xxx_1m"
},
"id": 3
}
レスポンスメッセージ
サブスクリプション確認:
{
"id": 2,
"subscribe": {}
}
データプッシュ:
{
"push": {
"channel": "dex-candle:sol_xxx_1m",
"pub": {
"data": {
"o": 0.001234,
"c": 0.001256,
"h": 0.001280,
"l": 0.001200,
"v": 1234567,
"t": 1706745600
}
}
}
}
エラーメッセージ:
{
"id": 2,
"error": {
"code": 100,
"message": "invalid channel"
}
}
ハートビート
WebSocket 接続を維持するためには、定期的なハートビートメッセージが必要です。connect レスポンスの ping フィールド(通常 25 秒)に基づいて、この間隔内にハートビートを送信してください:
// ハートビートメッセージ
ws.send(JSON.stringify({}));
// または ping を送信
ws.send(JSON.stringify({ ping: {} }));
指定時間内にメッセージが送信されない場合(通常は ping 間隔の 3 倍)、サーバーが接続を切断します。
完全な例
const WebSocket = require('ws');
class ChainStreamWebSocket {
constructor(accessToken) {
this.accessToken = accessToken;
this.ws = null;
this.messageId = 0;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.subscriptions = new Set();
this.pingInterval = null;
}
connect() {
const url = `wss://realtime-dex.chainstream.io/connection/websocket?token=${this.accessToken}`;
this.ws = new WebSocket(url);
this.ws.onopen = () => {
console.log('WebSocket connection established');
this.reconnectAttempts = 0;
// connect メッセージを送信
this.send({
connect: {
token: this.accessToken,
name: 'nodejs'
}
});
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.handleMessage(data);
};
this.ws.onclose = (event) => {
console.log(`Connection closed: ${event.code}`);
this.stopPing();
if (event.code !== 1000) {
this.scheduleReconnect();
}
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error.message);
};
}
handleMessage(data) {
if (data.connect) {
console.log('✅ Authentication successful');
this.startPing(data.connect.ping || 25);
this.resubscribe();
return;
}
if (data.push) {
console.log(`[${data.push.channel}]`, data.push.pub.data);
return;
}
if (data.error) {
console.error('Error:', data.error.message);
return;
}
}
send(message) {
message.id = ++this.messageId;
this.ws.send(JSON.stringify(message));
}
subscribe(channel) {
this.subscriptions.add(channel);
this.send({ subscribe: { channel } });
}
unsubscribe(channel) {
this.subscriptions.delete(channel);
this.send({ unsubscribe: { channel } });
}
resubscribe() {
this.subscriptions.forEach(channel => {
this.send({ subscribe: { channel } });
});
}
startPing(interval) {
this.pingInterval = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send('{}');
}
}, interval * 1000);
}
stopPing() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
scheduleReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('❌ Max reconnect attempts reached');
return;
}
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(`⏳ Reconnecting in ${delay}ms... (attempt ${this.reconnectAttempts + 1})`);
this.reconnectAttempts++;
setTimeout(() => this.connect(), delay);
}
close() {
this.stopPing();
if (this.ws) {
this.ws.close(1000, 'Normal closure');
}
}
}
// 使用例
const client = new ChainStreamWebSocket(process.env.CHAINSTREAM_ACCESS_TOKEN);
client.connect();
// 接続確立後にサブスクライブ
setTimeout(() => {
client.subscribe('dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m');
client.subscribe('dex-token-new:sol');
}, 1000);
ベストプラクティス
パフォーマンス最適化
フィルターの使用
必要なデータのみサブスクライブして帯域幅を削減。CEL 式を使用してデータをフィルタリングできます。
バッチ処理
高頻度データは 1 件ずつ処理するのではなく、バッチで処理。メッセージキューをバッファとして使用してください。
ローカルキャッシュ
トークン情報などの静的データをキャッシュして、重複処理を削減。
接続の再利用
1 つの接続で複数のチャンネルをサブスクライブ可能。複数の接続の作成を避けてください。
エラー処理
- エラーイベントの監視 — 接続やデータのエラーを迅速に処理
- リトライメカニズムの実装 — 指数バックオフによる再接続
- ログ記録 — トラブルシューティングのために重要なイベントを記録
- グレースフルデグラデーション — WebSocket が利用できない場合はポーリングに切り替え
リソース管理
// ✅ 不要なチャンネルはすぐにサブスクリプション解除
client.unsubscribe('dex-candle:sol_xxx_1m');
// ✅ 接続のグレースフルクローズ
function gracefulClose() {
// 1. ハートビートを停止
client.stopPing();
// 2. 接続を閉じる
client.close();
}
関連ドキュメント
WebSocket API リファレンス
完全なサブスクリプションタイプとパラメータ
価格アラートボット
ハンズオン:価格監視ボットの構築