ChainStream 提供强大的实时数据流处理能力,让开发者能够即时接收链上事件、交易和状态变化。本文档介绍 WebSocket 连接、订阅机制和最佳实践。
连接方式
WebSocket 端点
wss://realtime-dex.chainstream.io/connection/websocket
连接认证
建立连接时需要在 URL 中提供 Access Token:
wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN
使用 SDK(推荐)
使用原生 WebSocket
命令行测试
SDK 已内置连接和认证处理,直接调用订阅方法即可:import { ChainStreamClient } from '@chainstream-io/sdk';
import { Resolution } from '@chainstream-io/sdk/openapi';
const client = new ChainStreamClient(process.env.CHAINSTREAM_ACCESS_TOKEN);
// 直接订阅,SDK 自动处理连接和认证
client.stream.subscribeTokenCandles({
chain: 'sol',
tokenAddress: '6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN',
resolution: Resolution._1m,
callback: (data) => {
console.log('收到数据:', data);
}
});
SDK 会自动检测连接状态,未连接时自动建立连接,无需手动调用 connect()。
使用原生 WebSocket 时,连接后需要发送 connect 消息完成认证:const token = process.env.CHAINSTREAM_ACCESS_TOKEN;
const ws = new WebSocket(
`wss://realtime-dex.chainstream.io/connection/websocket?token=${token}`
);
ws.onopen = () => {
console.log('WebSocket 连接已建立');
// 发送 connect 消息完成认证
ws.send(JSON.stringify({
connect: {
token: token,
name: 'js'
},
id: 1
}));
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
// 处理 connect 响应
if (data.connect) {
console.log('✅ 认证成功,客户端 ID:', data.connect.client);
// 认证成功后开始订阅
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
}
// 处理订阅数据
if (data.push) {
console.log('收到数据:', data.push.pub.data);
}
};
使用 wscat 进行测试:wscat -c "wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN"
连接后发送 connect 消息:{"connect":{"token":"YOUR_ACCESS_TOKEN","name":"test"},"id":1}
连接响应
认证成功后会收到如下响应:
{
"id": 1,
"connect": {
"client": "0f819f5f-7d8b-4949-9433-0e91bbfe1cdb",
"version": "0.0.0 OSS",
"expires": true,
"ttl": 86002,
"ping": 25,
"pong": true
}
}
| 字段 | 说明 |
|---|
client | 客户端唯一标识 |
ttl | Token 剩余有效时间(秒) |
ping | 心跳间隔(秒) |
pong | 是否支持 pong 响应 |
订阅类型
ChainStream WebSocket 支持多种数据订阅类型:
| 类别 | 订阅频道前缀 | 说明 |
|---|
| K线数据 | dex-candle: | 代币价格 K 线 |
| 代币统计 | dex-token-stats: | 代币市场统计 |
| 持有者统计 | dex-token-holders-stats: | 代币持有者分布 |
| 新代币 | dex-token-new: | 新上线代币 |
| 代币交易 | dex-token-trade: | 代币交易记录 |
| 钱包余额 | dex-wallet-balance: | 钱包资产变化 |
| 钱包交易 | dex-wallet-trade: | 钱包交易记录 |
| 排名数据 | dex-ranking-token-stats: | 代币排名统计 |
| 流动池 | dex-dex-pool-balance: | DEX 流动池数据 |
订阅格式示例
// K线数据订阅
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
// 代币统计订阅
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-token-stats:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN'
},
id: 3
}));
// 新代币订阅
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-token-new:sol'
},
id: 4
}));
取消订阅
ws.send(JSON.stringify({
unsubscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 5
}));
消息格式
请求消息
Connect 消息(认证):
{
"connect": {
"token": "YOUR_ACCESS_TOKEN",
"name": "client_name"
},
"id": 1
}
Subscribe 消息(订阅):
{
"subscribe": {
"channel": "dex-candle:sol_xxx_1m"
},
"id": 2
}
Unsubscribe 消息(取消订阅):
{
"unsubscribe": {
"channel": "dex-candle:sol_xxx_1m"
},
"id": 3
}
响应消息
订阅确认:
{
"id": 2,
"subscribe": {}
}
数据推送:
{
"push": {
"channel": "dex-candle:sol_xxx_1m",
"pub": {
"data": {
"o": 0.001234,
"c": 0.001256,
"h": 0.001280,
"l": 0.001200,
"v": 1234567,
"t": 1706745600
}
}
}
}
错误消息:
{
"id": 2,
"error": {
"code": 100,
"message": "invalid channel"
}
}
心跳保活
WebSocket 连接需要定期发送心跳消息以保持活跃。根据 connect 响应中的 ping 字段(通常为 25 秒),在此间隔内发送心跳:
// 心跳消息
ws.send(JSON.stringify({}));
// 或者发送 ping
ws.send(JSON.stringify({ ping: {} }));
如果在指定时间内(通常为 ping 间隔的 3 倍)未发送任何消息,服务器将主动断开连接。
完整示例
const WebSocket = require('ws');
class ChainStreamWebSocket {
constructor(accessToken) {
this.accessToken = accessToken;
this.ws = null;
this.messageId = 0;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.subscriptions = new Set();
this.pingInterval = null;
}
connect() {
const url = `wss://realtime-dex.chainstream.io/connection/websocket?token=${this.accessToken}`;
this.ws = new WebSocket(url);
this.ws.onopen = () => {
console.log('WebSocket 连接已建立');
this.reconnectAttempts = 0;
// 发送 connect 消息
this.send({
connect: {
token: this.accessToken,
name: 'nodejs'
}
});
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.handleMessage(data);
};
this.ws.onclose = (event) => {
console.log(`连接关闭: ${event.code}`);
this.stopPing();
if (event.code !== 1000) {
this.scheduleReconnect();
}
};
this.ws.onerror = (error) => {
console.error('WebSocket 错误:', error.message);
};
}
handleMessage(data) {
// 处理 connect 响应
if (data.connect) {
console.log('✅ 认证成功');
this.startPing(data.connect.ping || 25);
this.resubscribe();
return;
}
// 处理数据推送
if (data.push) {
console.log(`[${data.push.channel}]`, data.push.pub.data);
return;
}
// 处理错误
if (data.error) {
console.error('错误:', data.error.message);
return;
}
}
send(message) {
message.id = ++this.messageId;
this.ws.send(JSON.stringify(message));
}
subscribe(channel) {
this.subscriptions.add(channel);
this.send({ subscribe: { channel } });
}
unsubscribe(channel) {
this.subscriptions.delete(channel);
this.send({ unsubscribe: { channel } });
}
resubscribe() {
this.subscriptions.forEach(channel => {
this.send({ subscribe: { channel } });
});
}
startPing(interval) {
this.pingInterval = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send('{}');
}
}, interval * 1000);
}
stopPing() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
scheduleReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('❌ 达到最大重连次数');
return;
}
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(`⏳ ${delay}ms 后重连... (第 ${this.reconnectAttempts + 1} 次)`);
this.reconnectAttempts++;
setTimeout(() => this.connect(), delay);
}
close() {
this.stopPing();
if (this.ws) {
this.ws.close(1000, 'Normal closure');
}
}
}
// 使用示例
const client = new ChainStreamWebSocket(process.env.CHAINSTREAM_ACCESS_TOKEN);
client.connect();
// 等待连接成功后订阅
setTimeout(() => {
client.subscribe('dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m');
client.subscribe('dex-token-new:sol');
}, 1000);
最佳实践
性能优化
使用过滤条件
只订阅需要的数据,减少带宽消耗。使用 CEL 表达式过滤数据。
批量处理
对高频数据进行批量处理而非逐条处理,使用消息队列缓冲。
本地缓存
缓存 Token 信息等静态数据,减少重复处理。
连接复用
单个连接可订阅多个频道,避免创建多个连接。
错误处理
- 监听错误事件 — 及时处理连接错误和数据错误
- 实现重试机制 — 使用指数退避策略进行重连
- 日志记录 — 记录关键事件便于问题排查
- 优雅降级 — WebSocket 不可用时切换到轮询
资源管理
// ✅ 及时取消不需要的订阅
client.unsubscribe('dex-candle:sol_xxx_1m');
// ✅ 优雅关闭连接
function gracefulClose() {
// 1. 停止心跳
client.stopPing();
// 2. 关闭连接
client.close();
}
相关文档