ChainStream provides powerful real-time data streaming capabilities, allowing developers to instantly receive on-chain events, transactions, and state changes. This document covers WebSocket connection, subscription mechanisms, and best practices.
Connection
WebSocket Endpoint
wss://realtime-dex.chainstream.io/connection/websocket
Authentication
Provide the Access Token in the URL when establishing a connection:
wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN
Using SDK (Recommended)
Using Native WebSocket
Command Line Testing
The SDK handles connection and authentication automatically. Just call the subscribe method: import { ChainStreamClient } from '@chainstream-io/sdk' ;
import { Resolution } from '@chainstream-io/sdk/openapi' ;
const client = new ChainStreamClient ( process . env . CHAINSTREAM_ACCESS_TOKEN );
// Subscribe directly, SDK handles connection and authentication automatically
client . stream . subscribeTokenCandles ({
chain: 'sol' ,
tokenAddress: '6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN' ,
resolution: Resolution . _1m ,
callback : ( data ) => {
console . log ( 'Received data:' , data );
}
});
The SDK automatically detects connection status and establishes connection when needed. No need to manually call connect().
When using native WebSocket, send a connect message after connection to complete authentication: const token = process . env . CHAINSTREAM_ACCESS_TOKEN ;
const ws = new WebSocket (
`wss://realtime-dex.chainstream.io/connection/websocket?token= ${ token } `
);
ws . onopen = () => {
console . log ( 'WebSocket connection established' );
// Send connect message to complete authentication
ws . send ( JSON . stringify ({
connect: {
token: token ,
name: 'js'
},
id: 1
}));
};
ws . onmessage = ( event ) => {
const data = JSON . parse ( event . data );
// Handle connect response
if ( data . connect ) {
console . log ( '✅ Authentication successful, client ID:' , data . connect . client );
// Start subscribing after authentication
ws . send ( JSON . stringify ({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
}
// Handle subscription data
if ( data . push ) {
console . log ( 'Received data:' , data . push . pub . data );
}
};
Test using wscat: wscat -c "wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN"
Send connect message after connection: { "connect" :{ "token" : "YOUR_ACCESS_TOKEN" , "name" : "test" }, "id" : 1 }
Connection Response
Upon successful authentication, you’ll receive a response like:
{
"id" : 1 ,
"connect" : {
"client" : "0f819f5f-7d8b-4949-9433-0e91bbfe1cdb" ,
"version" : "0.0.0 OSS" ,
"expires" : true ,
"ttl" : 86002 ,
"ping" : 25 ,
"pong" : true
}
}
Field Description clientUnique client identifier ttlToken remaining validity (seconds) pingHeartbeat interval (seconds) pongWhether pong response is supported
Subscription Types
ChainStream WebSocket supports multiple data subscription types:
Category Channel Prefix Description Candles dex-candle:Token price candles Token Stats dex-token-stats:Token market statistics Holder Stats dex-token-holders-stats:Token holder distribution New Tokens dex-token-new:Newly launched tokens Token Trades dex-token-trade:Token trade records Wallet Balance dex-wallet-balance:Wallet asset changes Wallet Trades dex-wallet-trade:Wallet trade records Rankings dex-ranking-token-stats:Token ranking statistics DEX Pools dex-dex-pool-balance:DEX liquidity pool data
// Subscribe to candle data
ws . send ( JSON . stringify ({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
// Subscribe to token stats
ws . send ( JSON . stringify ({
subscribe: {
channel: 'dex-token-stats:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN'
},
id: 3
}));
// Subscribe to new tokens
ws . send ( JSON . stringify ({
subscribe: {
channel: 'dex-token-new:sol'
},
id: 4
}));
Unsubscribe
ws . send ( JSON . stringify ({
unsubscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 5
}));
Request Messages
Connect Message (Authentication):
{
"connect" : {
"token" : "YOUR_ACCESS_TOKEN" ,
"name" : "client_name"
},
"id" : 1
}
Subscribe Message:
{
"subscribe" : {
"channel" : "dex-candle:sol_xxx_1m"
},
"id" : 2
}
Unsubscribe Message:
{
"unsubscribe" : {
"channel" : "dex-candle:sol_xxx_1m"
},
"id" : 3
}
Response Messages
Subscription Confirmation:
{
"id" : 2 ,
"subscribe" : {}
}
Data Push:
{
"push" : {
"channel" : "dex-candle:sol_xxx_1m" ,
"pub" : {
"data" : {
"o" : 0.001234 ,
"c" : 0.001256 ,
"h" : 0.001280 ,
"l" : 0.001200 ,
"v" : 1234567 ,
"t" : 1706745600
}
}
}
}
Error Message:
{
"id" : 2 ,
"error" : {
"code" : 100 ,
"message" : "invalid channel"
}
}
Heartbeat
WebSocket connections require periodic heartbeat messages to stay active. Based on the ping field in the connect response (usually 25 seconds), send heartbeats within this interval:
// Heartbeat message
ws . send ( JSON . stringify ({}));
// Or send ping
ws . send ( JSON . stringify ({ ping: {} }));
If no messages are sent within the specified time (typically 3x the ping interval), the server will disconnect.
Complete Example
const WebSocket = require ( 'ws' );
class ChainStreamWebSocket {
constructor ( accessToken ) {
this . accessToken = accessToken ;
this . ws = null ;
this . messageId = 0 ;
this . reconnectAttempts = 0 ;
this . maxReconnectAttempts = 10 ;
this . subscriptions = new Set ();
this . pingInterval = null ;
}
connect () {
const url = `wss://realtime-dex.chainstream.io/connection/websocket?token= ${ this . accessToken } ` ;
this . ws = new WebSocket ( url );
this . ws . onopen = () => {
console . log ( 'WebSocket connection established' );
this . reconnectAttempts = 0 ;
// Send connect message
this . send ({
connect: {
token: this . accessToken ,
name: 'nodejs'
}
});
};
this . ws . onmessage = ( event ) => {
const data = JSON . parse ( event . data );
this . handleMessage ( data );
};
this . ws . onclose = ( event ) => {
console . log ( `Connection closed: ${ event . code } ` );
this . stopPing ();
if ( event . code !== 1000 ) {
this . scheduleReconnect ();
}
};
this . ws . onerror = ( error ) => {
console . error ( 'WebSocket error:' , error . message );
};
}
handleMessage ( data ) {
// Handle connect response
if ( data . connect ) {
console . log ( '✅ Authentication successful' );
this . startPing ( data . connect . ping || 25 );
this . resubscribe ();
return ;
}
// Handle data push
if ( data . push ) {
console . log ( `[ ${ data . push . channel } ]` , data . push . pub . data );
return ;
}
// Handle error
if ( data . error ) {
console . error ( 'Error:' , data . error . message );
return ;
}
}
send ( message ) {
message . id = ++ this . messageId ;
this . ws . send ( JSON . stringify ( message ));
}
subscribe ( channel ) {
this . subscriptions . add ( channel );
this . send ({ subscribe: { channel } });
}
unsubscribe ( channel ) {
this . subscriptions . delete ( channel );
this . send ({ unsubscribe: { channel } });
}
resubscribe () {
this . subscriptions . forEach ( channel => {
this . send ({ subscribe: { channel } });
});
}
startPing ( interval ) {
this . pingInterval = setInterval (() => {
if ( this . ws . readyState === WebSocket . OPEN ) {
this . ws . send ( '{}' );
}
}, interval * 1000 );
}
stopPing () {
if ( this . pingInterval ) {
clearInterval ( this . pingInterval );
this . pingInterval = null ;
}
}
scheduleReconnect () {
if ( this . reconnectAttempts >= this . maxReconnectAttempts ) {
console . error ( '❌ Max reconnect attempts reached' );
return ;
}
const delay = Math . min ( 1000 * Math . pow ( 2 , this . reconnectAttempts ), 30000 );
console . log ( `⏳ Reconnecting in ${ delay } ms... (attempt ${ this . reconnectAttempts + 1 } )` );
this . reconnectAttempts ++ ;
setTimeout (() => this . connect (), delay );
}
close () {
this . stopPing ();
if ( this . ws ) {
this . ws . close ( 1000 , 'Normal closure' );
}
}
}
// Usage example
const client = new ChainStreamWebSocket ( process . env . CHAINSTREAM_ACCESS_TOKEN );
client . connect ();
// Subscribe after connection is established
setTimeout (() => {
client . subscribe ( 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m' );
client . subscribe ( 'dex-token-new:sol' );
}, 1000 );
Best Practices
Use Filters Subscribe only to needed data to reduce bandwidth. Use CEL expressions to filter data.
Batch Processing Batch process high-frequency data instead of processing one by one. Use message queues for buffering.
Local Caching Cache static data like token information to reduce repeated processing.
Connection Reuse A single connection can subscribe to multiple channels. Avoid creating multiple connections.
Error Handling
Listen for error events — Handle connection and data errors promptly
Implement retry mechanism — Use exponential backoff for reconnection
Log recording — Record key events for troubleshooting
Graceful degradation — Switch to polling when WebSocket is unavailable
Resource Management
// ✅ Unsubscribe from unneeded channels promptly
client . unsubscribe ( 'dex-candle:sol_xxx_1m' );
// ✅ Close connection gracefully
function gracefulClose () {
// 1. Stop heartbeat
client . stopPing ();
// 2. Close connection
client . close ();
}