ChainStream 提供強大的實時資料流處理能力,讓開發者能夠即時接收鏈上事件、交易和狀態變化。本文件介紹 WebSocket 連線、訂閱機制和最佳實踐。
連線方式
WebSocket 端點
wss://realtime-dex.chainstream.io/connection/websocket
連線認證
建立連線時需要在 URL 中提供 Access Token:
wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN
使用 SDK(推薦)
使用原生 WebSocket
命令列測試
SDK 已內建連線和認證處理,直接呼叫訂閱方法即可:import { ChainStreamClient } from '@chainstream-io/sdk';
import { Resolution } from '@chainstream-io/sdk/openapi';
const client = new ChainStreamClient(process.env.CHAINSTREAM_ACCESS_TOKEN);
// 直接订阅,SDK 自动处理连接和认证
client.stream.subscribeTokenCandles({
chain: 'sol',
tokenAddress: '6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN',
resolution: Resolution._1m,
callback: (data) => {
console.log('收到数据:', data);
}
});
SDK 會自動檢測連線狀態,未連線時自動建立連線,無需手動呼叫 connect()。
使用原生 WebSocket 時,連線後需要傳送 connect 訊息完成認證:const token = process.env.CHAINSTREAM_ACCESS_TOKEN;
const ws = new WebSocket(
`wss://realtime-dex.chainstream.io/connection/websocket?token=${token}`
);
ws.onopen = () => {
console.log('WebSocket 连接已建立');
// 发送 connect 消息完成认证
ws.send(JSON.stringify({
connect: {
token: token,
name: 'js'
},
id: 1
}));
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
// 处理 connect 响应
if (data.connect) {
console.log('✅ 认证成功,客户端 ID:', data.connect.client);
// 认证成功后开始订阅
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
}
// 处理订阅数据
if (data.push) {
console.log('收到数据:', data.push.pub.data);
}
};
使用 wscat 進行測試:wscat -c "wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN"
連線後傳送 connect 訊息:{"connect":{"token":"YOUR_ACCESS_TOKEN","name":"test"},"id":1}
連線響應
認證成功後會收到如下響應:
{
"id": 1,
"connect": {
"client": "0f819f5f-7d8b-4949-9433-0e91bbfe1cdb",
"version": "0.0.0 OSS",
"expires": true,
"ttl": 86002,
"ping": 25,
"pong": true
}
}
| 欄位 | 說明 |
|---|
client | 客戶端唯一標識 |
ttl | Token 剩餘有效時間(秒) |
ping | 心跳間隔(秒) |
pong | 是否支援 pong 響應 |
訂閱型別
ChainStream WebSocket 支援多種資料訂閱型別:
| 類別 | 訂閱頻道字首 | 說明 |
|---|
| K線資料 | dex-candle: | 代幣價格 K 線 |
| 代幣統計 | dex-token-stats: | 代幣市場統計 |
| 持有者統計 | dex-token-holders-stats: | 代幣持有者分佈 |
| 新代幣 | dex-token-new: | 新上線代幣 |
| 代幣交易 | dex-token-trade: | 代幣交易記錄 |
| 錢包餘額 | dex-wallet-balance: | 錢包資產變化 |
| 錢包交易 | dex-wallet-trade: | 錢包交易記錄 |
| 排名資料 | dex-ranking-token-stats: | 代幣排名統計 |
| 流動池 | dex-dex-pool-balance: | DEX 流動池資料 |
訂閱格式示例
// K线数据订阅
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
// 代币统计订阅
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-token-stats:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN'
},
id: 3
}));
// 新代币订阅
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-token-new:sol'
},
id: 4
}));
取消訂閱
ws.send(JSON.stringify({
unsubscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 5
}));
訊息格式
請求訊息
Connect 訊息(認證):
{
"connect": {
"token": "YOUR_ACCESS_TOKEN",
"name": "client_name"
},
"id": 1
}
Subscribe 訊息(訂閱):
{
"subscribe": {
"channel": "dex-candle:sol_xxx_1m"
},
"id": 2
}
Unsubscribe 訊息(取消訂閱):
{
"unsubscribe": {
"channel": "dex-candle:sol_xxx_1m"
},
"id": 3
}
響應訊息
訂閱確認:
{
"id": 2,
"subscribe": {}
}
資料推送:
{
"push": {
"channel": "dex-candle:sol_xxx_1m",
"pub": {
"data": {
"o": 0.001234,
"c": 0.001256,
"h": 0.001280,
"l": 0.001200,
"v": 1234567,
"t": 1706745600
}
}
}
}
錯誤訊息:
{
"id": 2,
"error": {
"code": 100,
"message": "invalid channel"
}
}
心跳保活
WebSocket 連線需要定期傳送心跳訊息以保持活躍。根據 connect 響應中的 ping 欄位(通常為 25 秒),在此間隔內傳送心跳:
// 心跳消息
ws.send(JSON.stringify({}));
// 或者发送 ping
ws.send(JSON.stringify({ ping: {} }));
如果在指定時間內(通常為 ping 間隔的 3 倍)未傳送任何訊息,伺服器將主動斷開連線。
完整示例
const WebSocket = require('ws');
class ChainStreamWebSocket {
constructor(accessToken) {
this.accessToken = accessToken;
this.ws = null;
this.messageId = 0;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.subscriptions = new Set();
this.pingInterval = null;
}
connect() {
const url = `wss://realtime-dex.chainstream.io/connection/websocket?token=${this.accessToken}`;
this.ws = new WebSocket(url);
this.ws.onopen = () => {
console.log('WebSocket 连接已建立');
this.reconnectAttempts = 0;
// 发送 connect 消息
this.send({
connect: {
token: this.accessToken,
name: 'nodejs'
}
});
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.handleMessage(data);
};
this.ws.onclose = (event) => {
console.log(`连接关闭: ${event.code}`);
this.stopPing();
if (event.code !== 1000) {
this.scheduleReconnect();
}
};
this.ws.onerror = (error) => {
console.error('WebSocket 错误:', error.message);
};
}
handleMessage(data) {
// 处理 connect 响应
if (data.connect) {
console.log('✅ 认证成功');
this.startPing(data.connect.ping || 25);
this.resubscribe();
return;
}
// 处理数据推送
if (data.push) {
console.log(`[${data.push.channel}]`, data.push.pub.data);
return;
}
// 处理错误
if (data.error) {
console.error('错误:', data.error.message);
return;
}
}
send(message) {
message.id = ++this.messageId;
this.ws.send(JSON.stringify(message));
}
subscribe(channel) {
this.subscriptions.add(channel);
this.send({ subscribe: { channel } });
}
unsubscribe(channel) {
this.subscriptions.delete(channel);
this.send({ unsubscribe: { channel } });
}
resubscribe() {
this.subscriptions.forEach(channel => {
this.send({ subscribe: { channel } });
});
}
startPing(interval) {
this.pingInterval = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send('{}');
}
}, interval * 1000);
}
stopPing() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
scheduleReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('❌ 达到最大重连次数');
return;
}
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(`⏳ ${delay}ms 后重连... (第 ${this.reconnectAttempts + 1} 次)`);
this.reconnectAttempts++;
setTimeout(() => this.connect(), delay);
}
close() {
this.stopPing();
if (this.ws) {
this.ws.close(1000, 'Normal closure');
}
}
}
// 使用示例
const client = new ChainStreamWebSocket(process.env.CHAINSTREAM_ACCESS_TOKEN);
client.connect();
// 等待连接成功后订阅
setTimeout(() => {
client.subscribe('dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m');
client.subscribe('dex-token-new:sol');
}, 1000);
最佳實踐
效能最佳化
使用過濾條件
只訂閱需要的資料,減少頻寬消耗。使用 CEL 表示式過濾資料。
批次處理
對高頻資料進行批次處理而非逐條處理,使用訊息佇列緩衝。
本地快取
快取 Token 資訊等靜態資料,減少重複處理。
連線複用
單個連線可訂閱多個頻道,避免建立多個連線。
錯誤處理
- 監聽錯誤事件 — 及時處理連線錯誤和資料錯誤
- 實現重試機制 — 使用指數退避策略進行重連
- 日誌記錄 — 記錄關鍵事件便於問題排查
- 優雅降級 — WebSocket 不可用時切換到輪詢
資源管理
// ✅ 及时取消不需要的订阅
client.unsubscribe('dex-candle:sol_xxx_1m');
// ✅ 优雅关闭连接
function gracefulClose() {
// 1. 停止心跳
client.stopPing();
// 2. 关闭连接
client.close();
}
相關文件
WebSocket API 參考
完整的訂閱型別和引數說明