跳轉到主要內容
ChainStream 提供強大的實時資料流處理能力,讓開發者能夠即時接收鏈上事件、交易和狀態變化。本文件介紹 WebSocket 連線、訂閱機制和最佳實踐。

連線方式

WebSocket 端點

wss://realtime-dex.chainstream.io/connection/websocket

連線認證

建立連線時需要在 URL 中提供 Access Token:
wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN
SDK 已內建連線和認證處理,直接呼叫訂閱方法即可:
import { ChainStreamClient } from '@chainstream-io/sdk';
import { Resolution } from '@chainstream-io/sdk/openapi';

const client = new ChainStreamClient(process.env.CHAINSTREAM_ACCESS_TOKEN);

// 直接订阅,SDK 自动处理连接和认证
client.stream.subscribeTokenCandles({
  chain: 'sol',
  tokenAddress: '6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN',
  resolution: Resolution._1m,
  callback: (data) => {
    console.log('收到数据:', data);
  }
});
SDK 會自動檢測連線狀態,未連線時自動建立連線,無需手動呼叫 connect()

連線響應

認證成功後會收到如下響應:
{
  "id": 1,
  "connect": {
    "client": "0f819f5f-7d8b-4949-9433-0e91bbfe1cdb",
    "version": "0.0.0 OSS",
    "expires": true,
    "ttl": 86002,
    "ping": 25,
    "pong": true
  }
}
欄位說明
client客戶端唯一標識
ttlToken 剩餘有效時間(秒)
ping心跳間隔(秒)
pong是否支援 pong 響應

訂閱型別

ChainStream WebSocket 支援多種資料訂閱型別:
類別訂閱頻道字首說明
K線資料dex-candle:代幣價格 K 線
代幣統計dex-token-stats:代幣市場統計
持有者統計dex-token-holders-stats:代幣持有者分佈
新代幣dex-token-new:新上線代幣
代幣交易dex-token-trade:代幣交易記錄
錢包餘額dex-wallet-balance:錢包資產變化
錢包交易dex-wallet-trade:錢包交易記錄
排名資料dex-ranking-token-stats:代幣排名統計
流動池dex-dex-pool-balance:DEX 流動池資料
完整的訂閱型別、引數說明和響應格式請參考 WebSocket API 文件

訂閱格式示例

// K线数据订阅
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
  },
  id: 2
}));

// 代币统计订阅
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-token-stats:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN'
  },
  id: 3
}));

// 新代币订阅
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-token-new:sol'
  },
  id: 4
}));

取消訂閱

ws.send(JSON.stringify({
  unsubscribe: {
    channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
  },
  id: 5
}));

訊息格式

請求訊息

Connect 訊息(認證):
{
  "connect": {
    "token": "YOUR_ACCESS_TOKEN",
    "name": "client_name"
  },
  "id": 1
}
Subscribe 訊息(訂閱):
{
  "subscribe": {
    "channel": "dex-candle:sol_xxx_1m"
  },
  "id": 2
}
Unsubscribe 訊息(取消訂閱):
{
  "unsubscribe": {
    "channel": "dex-candle:sol_xxx_1m"
  },
  "id": 3
}

響應訊息

訂閱確認:
{
  "id": 2,
  "subscribe": {}
}
資料推送:
{
  "push": {
    "channel": "dex-candle:sol_xxx_1m",
    "pub": {
      "data": {
        "o": 0.001234,
        "c": 0.001256,
        "h": 0.001280,
        "l": 0.001200,
        "v": 1234567,
        "t": 1706745600
      }
    }
  }
}
錯誤訊息:
{
  "id": 2,
  "error": {
    "code": 100,
    "message": "invalid channel"
  }
}

心跳保活

WebSocket 連線需要定期傳送心跳訊息以保持活躍。根據 connect 響應中的 ping 欄位(通常為 25 秒),在此間隔內傳送心跳:
// 心跳消息
ws.send(JSON.stringify({}));

// 或者发送 ping
ws.send(JSON.stringify({ ping: {} }));
如果在指定時間內(通常為 ping 間隔的 3 倍)未傳送任何訊息,伺服器將主動斷開連線。

完整示例

const WebSocket = require('ws');

class ChainStreamWebSocket {
  constructor(accessToken) {
    this.accessToken = accessToken;
    this.ws = null;
    this.messageId = 0;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    this.subscriptions = new Set();
    this.pingInterval = null;
  }

  connect() {
    const url = `wss://realtime-dex.chainstream.io/connection/websocket?token=${this.accessToken}`;
    this.ws = new WebSocket(url);

    this.ws.onopen = () => {
      console.log('WebSocket 连接已建立');
      this.reconnectAttempts = 0;
      
      // 发送 connect 消息
      this.send({
        connect: {
          token: this.accessToken,
          name: 'nodejs'
        }
      });
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.handleMessage(data);
    };

    this.ws.onclose = (event) => {
      console.log(`连接关闭: ${event.code}`);
      this.stopPing();
      
      if (event.code !== 1000) {
        this.scheduleReconnect();
      }
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket 错误:', error.message);
    };
  }

  handleMessage(data) {
    // 处理 connect 响应
    if (data.connect) {
      console.log('✅ 认证成功');
      this.startPing(data.connect.ping || 25);
      this.resubscribe();
      return;
    }

    // 处理数据推送
    if (data.push) {
      console.log(`[${data.push.channel}]`, data.push.pub.data);
      return;
    }

    // 处理错误
    if (data.error) {
      console.error('错误:', data.error.message);
      return;
    }
  }

  send(message) {
    message.id = ++this.messageId;
    this.ws.send(JSON.stringify(message));
  }

  subscribe(channel) {
    this.subscriptions.add(channel);
    this.send({ subscribe: { channel } });
  }

  unsubscribe(channel) {
    this.subscriptions.delete(channel);
    this.send({ unsubscribe: { channel } });
  }

  resubscribe() {
    this.subscriptions.forEach(channel => {
      this.send({ subscribe: { channel } });
    });
  }

  startPing(interval) {
    this.pingInterval = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send('{}');
      }
    }, interval * 1000);
  }

  stopPing() {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
  }

  scheduleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('❌ 达到最大重连次数');
      return;
    }

    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
    console.log(`⏳ ${delay}ms 后重连... (第 ${this.reconnectAttempts + 1} 次)`);
    this.reconnectAttempts++;

    setTimeout(() => this.connect(), delay);
  }

  close() {
    this.stopPing();
    if (this.ws) {
      this.ws.close(1000, 'Normal closure');
    }
  }
}

// 使用示例
const client = new ChainStreamWebSocket(process.env.CHAINSTREAM_ACCESS_TOKEN);
client.connect();

// 等待连接成功后订阅
setTimeout(() => {
  client.subscribe('dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m');
  client.subscribe('dex-token-new:sol');
}, 1000);

最佳實踐

效能最佳化

使用過濾條件

只訂閱需要的資料,減少頻寬消耗。使用 CEL 表示式過濾資料。

批次處理

對高頻資料進行批次處理而非逐條處理,使用訊息佇列緩衝。

本地快取

快取 Token 資訊等靜態資料,減少重複處理。

連線複用

單個連線可訂閱多個頻道,避免建立多個連線。

錯誤處理

  1. 監聽錯誤事件 — 及時處理連線錯誤和資料錯誤
  2. 實現重試機制 — 使用指數退避策略進行重連
  3. 日誌記錄 — 記錄關鍵事件便於問題排查
  4. 優雅降級 — WebSocket 不可用時切換到輪詢

資源管理

// ✅ 及时取消不需要的订阅
client.unsubscribe('dex-candle:sol_xxx_1m');

// ✅ 优雅关闭连接
function gracefulClose() {
  // 1. 停止心跳
  client.stopPing();
  
  // 2. 关闭连接
  client.close();
}

相關文件

WebSocket API 參考

完整的訂閱型別和引數說明

價格預警機器人

實戰:構建價格監控 Bot