跳转到主要内容
ChainStream 提供强大的实时数据流处理能力,让开发者能够即时接收链上事件、交易和状态变化。本文档介绍 WebSocket 连接、订阅机制和最佳实践。

连接方式

WebSocket 端点

wss://realtime-dex.chainstream.io/connection/websocket

连接认证

建立连接时需要在 URL 中提供 Access Token:
wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN
SDK 已内置连接和认证处理,直接调用订阅方法即可:
import { ChainStreamClient } from '@chainstream-io/sdk';
import { Resolution } from '@chainstream-io/sdk/openapi';

const client = new ChainStreamClient(process.env.CHAINSTREAM_ACCESS_TOKEN);

// 直接订阅,SDK 自动处理连接和认证
client.stream.subscribeTokenCandles({
  chain: 'sol',
  tokenAddress: '6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN',
  resolution: Resolution._1m,
  callback: (data) => {
    console.log('收到数据:', data);
  }
});
SDK 会自动检测连接状态,未连接时自动建立连接,无需手动调用 connect()

连接响应

认证成功后会收到如下响应:
{
  "id": 1,
  "connect": {
    "client": "0f819f5f-7d8b-4949-9433-0e91bbfe1cdb",
    "version": "0.0.0 OSS",
    "expires": true,
    "ttl": 86002,
    "ping": 25,
    "pong": true
  }
}
字段说明
client客户端唯一标识
ttlToken 剩余有效时间(秒)
ping心跳间隔(秒)
pong是否支持 pong 响应

订阅类型

ChainStream WebSocket 支持多种数据订阅类型:
类别订阅频道前缀说明
K线数据dex-candle:代币价格 K 线
代币统计dex-token-stats:代币市场统计
持有者统计dex-token-holders-stats:代币持有者分布
新代币dex-token-new:新上线代币
代币交易dex-token-trade:代币交易记录
钱包余额dex-wallet-balance:钱包资产变化
钱包交易dex-wallet-trade:钱包交易记录
排名数据dex-ranking-token-stats:代币排名统计
流动池dex-dex-pool-balance:DEX 流动池数据
完整的订阅类型、参数说明和响应格式请参考 WebSocket API 文档

订阅格式示例

// K线数据订阅
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
  },
  id: 2
}));

// 代币统计订阅
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-token-stats:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN'
  },
  id: 3
}));

// 新代币订阅
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-token-new:sol'
  },
  id: 4
}));

取消订阅

ws.send(JSON.stringify({
  unsubscribe: {
    channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
  },
  id: 5
}));

消息格式

请求消息

Connect 消息(认证):
{
  "connect": {
    "token": "YOUR_ACCESS_TOKEN",
    "name": "client_name"
  },
  "id": 1
}
Subscribe 消息(订阅):
{
  "subscribe": {
    "channel": "dex-candle:sol_xxx_1m"
  },
  "id": 2
}
Unsubscribe 消息(取消订阅):
{
  "unsubscribe": {
    "channel": "dex-candle:sol_xxx_1m"
  },
  "id": 3
}

响应消息

订阅确认:
{
  "id": 2,
  "subscribe": {}
}
数据推送:
{
  "push": {
    "channel": "dex-candle:sol_xxx_1m",
    "pub": {
      "data": {
        "o": 0.001234,
        "c": 0.001256,
        "h": 0.001280,
        "l": 0.001200,
        "v": 1234567,
        "t": 1706745600
      }
    }
  }
}
错误消息:
{
  "id": 2,
  "error": {
    "code": 100,
    "message": "invalid channel"
  }
}

心跳保活

WebSocket 连接需要定期发送心跳消息以保持活跃。根据 connect 响应中的 ping 字段(通常为 25 秒),在此间隔内发送心跳:
// 心跳消息
ws.send(JSON.stringify({}));

// 或者发送 ping
ws.send(JSON.stringify({ ping: {} }));
如果在指定时间内(通常为 ping 间隔的 3 倍)未发送任何消息,服务器将主动断开连接。

完整示例

const WebSocket = require('ws');

class ChainStreamWebSocket {
  constructor(accessToken) {
    this.accessToken = accessToken;
    this.ws = null;
    this.messageId = 0;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    this.subscriptions = new Set();
    this.pingInterval = null;
  }

  connect() {
    const url = `wss://realtime-dex.chainstream.io/connection/websocket?token=${this.accessToken}`;
    this.ws = new WebSocket(url);

    this.ws.onopen = () => {
      console.log('WebSocket 连接已建立');
      this.reconnectAttempts = 0;
      
      // 发送 connect 消息
      this.send({
        connect: {
          token: this.accessToken,
          name: 'nodejs'
        }
      });
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.handleMessage(data);
    };

    this.ws.onclose = (event) => {
      console.log(`连接关闭: ${event.code}`);
      this.stopPing();
      
      if (event.code !== 1000) {
        this.scheduleReconnect();
      }
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket 错误:', error.message);
    };
  }

  handleMessage(data) {
    // 处理 connect 响应
    if (data.connect) {
      console.log('✅ 认证成功');
      this.startPing(data.connect.ping || 25);
      this.resubscribe();
      return;
    }

    // 处理数据推送
    if (data.push) {
      console.log(`[${data.push.channel}]`, data.push.pub.data);
      return;
    }

    // 处理错误
    if (data.error) {
      console.error('错误:', data.error.message);
      return;
    }
  }

  send(message) {
    message.id = ++this.messageId;
    this.ws.send(JSON.stringify(message));
  }

  subscribe(channel) {
    this.subscriptions.add(channel);
    this.send({ subscribe: { channel } });
  }

  unsubscribe(channel) {
    this.subscriptions.delete(channel);
    this.send({ unsubscribe: { channel } });
  }

  resubscribe() {
    this.subscriptions.forEach(channel => {
      this.send({ subscribe: { channel } });
    });
  }

  startPing(interval) {
    this.pingInterval = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send('{}');
      }
    }, interval * 1000);
  }

  stopPing() {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
  }

  scheduleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('❌ 达到最大重连次数');
      return;
    }

    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
    console.log(`⏳ ${delay}ms 后重连... (第 ${this.reconnectAttempts + 1} 次)`);
    this.reconnectAttempts++;

    setTimeout(() => this.connect(), delay);
  }

  close() {
    this.stopPing();
    if (this.ws) {
      this.ws.close(1000, 'Normal closure');
    }
  }
}

// 使用示例
const client = new ChainStreamWebSocket(process.env.CHAINSTREAM_ACCESS_TOKEN);
client.connect();

// 等待连接成功后订阅
setTimeout(() => {
  client.subscribe('dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m');
  client.subscribe('dex-token-new:sol');
}, 1000);

最佳实践

性能优化

使用过滤条件

只订阅需要的数据,减少带宽消耗。使用 CEL 表达式过滤数据。

批量处理

对高频数据进行批量处理而非逐条处理,使用消息队列缓冲。

本地缓存

缓存 Token 信息等静态数据,减少重复处理。

连接复用

单个连接可订阅多个频道,避免创建多个连接。

错误处理

  1. 监听错误事件 — 及时处理连接错误和数据错误
  2. 实现重试机制 — 使用指数退避策略进行重连
  3. 日志记录 — 记录关键事件便于问题排查
  4. 优雅降级 — WebSocket 不可用时切换到轮询

资源管理

// ✅ 及时取消不需要的订阅
client.unsubscribe('dex-candle:sol_xxx_1m');

// ✅ 优雅关闭连接
function gracefulClose() {
  // 1. 停止心跳
  client.stopPing();
  
  // 2. 关闭连接
  client.close();
}

相关文档