Skip to main content
ChainStream provides powerful real-time data streaming capabilities, allowing developers to instantly receive on-chain events, transactions, and state changes. This document covers WebSocket connection, subscription mechanisms, and best practices.

Connection

WebSocket Endpoint

wss://realtime-dex.chainstream.io/connection/websocket

Authentication

Provide the Access Token in the URL when establishing a connection:
wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN

Connection Response

Upon successful authentication, you’ll receive a response like:
{
  "id": 1,
  "connect": {
    "client": "0f819f5f-7d8b-4949-9433-0e91bbfe1cdb",
    "version": "0.0.0 OSS",
    "expires": true,
    "ttl": 86002,
    "ping": 25,
    "pong": true
  }
}
FieldDescription
clientUnique client identifier
ttlToken remaining validity (seconds)
pingHeartbeat interval (seconds)
pongWhether pong response is supported

Subscription Types

ChainStream WebSocket supports multiple data subscription types:
CategoryChannel PrefixDescription
Candlesdex-candle:Token price candles
Token Statsdex-token-stats:Token market statistics
Holder Statsdex-token-holders-stats:Token holder distribution
New Tokensdex-token-new:Newly launched tokens
Token Tradesdex-token-trade:Token trade records
Wallet Balancedex-wallet-balance:Wallet asset changes
Wallet Tradesdex-wallet-trade:Wallet trade records
Rankingsdex-ranking-token-stats:Token ranking statistics
DEX Poolsdex-dex-pool-balance:DEX liquidity pool data
For complete subscription types, parameters, and response formats, see the WebSocket API documentation.

Subscription Format Examples

// Subscribe to candle data
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
  },
  id: 2
}));

// Subscribe to token stats
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-token-stats:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN'
  },
  id: 3
}));

// Subscribe to new tokens
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-token-new:sol'
  },
  id: 4
}));

Unsubscribe

ws.send(JSON.stringify({
  unsubscribe: {
    channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
  },
  id: 5
}));

Message Format

Request Messages

Connect Message (Authentication):
{
  "connect": {
    "token": "YOUR_ACCESS_TOKEN",
    "name": "client_name"
  },
  "id": 1
}
Subscribe Message:
{
  "subscribe": {
    "channel": "dex-candle:sol_xxx_1m"
  },
  "id": 2
}
Unsubscribe Message:
{
  "unsubscribe": {
    "channel": "dex-candle:sol_xxx_1m"
  },
  "id": 3
}

Response Messages

Subscription Confirmation:
{
  "id": 2,
  "subscribe": {}
}
Data Push:
{
  "push": {
    "channel": "dex-candle:sol_xxx_1m",
    "pub": {
      "data": {
        "o": 0.001234,
        "c": 0.001256,
        "h": 0.001280,
        "l": 0.001200,
        "v": 1234567,
        "t": 1706745600
      }
    }
  }
}
Error Message:
{
  "id": 2,
  "error": {
    "code": 100,
    "message": "invalid channel"
  }
}

Heartbeat

WebSocket connections require periodic heartbeat messages to stay active. Based on the ping field in the connect response (usually 25 seconds), send heartbeats within this interval:
// Heartbeat message
ws.send(JSON.stringify({}));

// Or send ping
ws.send(JSON.stringify({ ping: {} }));
If no messages are sent within the specified time (typically 3x the ping interval), the server will disconnect.

Complete Example

const WebSocket = require('ws');

class ChainStreamWebSocket {
  constructor(accessToken) {
    this.accessToken = accessToken;
    this.ws = null;
    this.messageId = 0;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    this.subscriptions = new Set();
    this.pingInterval = null;
  }

  connect() {
    const url = `wss://realtime-dex.chainstream.io/connection/websocket?token=${this.accessToken}`;
    this.ws = new WebSocket(url);

    this.ws.onopen = () => {
      console.log('WebSocket connection established');
      this.reconnectAttempts = 0;
      
      // Send connect message
      this.send({
        connect: {
          token: this.accessToken,
          name: 'nodejs'
        }
      });
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.handleMessage(data);
    };

    this.ws.onclose = (event) => {
      console.log(`Connection closed: ${event.code}`);
      this.stopPing();
      
      if (event.code !== 1000) {
        this.scheduleReconnect();
      }
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error.message);
    };
  }

  handleMessage(data) {
    // Handle connect response
    if (data.connect) {
      console.log('✅ Authentication successful');
      this.startPing(data.connect.ping || 25);
      this.resubscribe();
      return;
    }

    // Handle data push
    if (data.push) {
      console.log(`[${data.push.channel}]`, data.push.pub.data);
      return;
    }

    // Handle error
    if (data.error) {
      console.error('Error:', data.error.message);
      return;
    }
  }

  send(message) {
    message.id = ++this.messageId;
    this.ws.send(JSON.stringify(message));
  }

  subscribe(channel) {
    this.subscriptions.add(channel);
    this.send({ subscribe: { channel } });
  }

  unsubscribe(channel) {
    this.subscriptions.delete(channel);
    this.send({ unsubscribe: { channel } });
  }

  resubscribe() {
    this.subscriptions.forEach(channel => {
      this.send({ subscribe: { channel } });
    });
  }

  startPing(interval) {
    this.pingInterval = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send('{}');
      }
    }, interval * 1000);
  }

  stopPing() {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
  }

  scheduleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('❌ Max reconnect attempts reached');
      return;
    }

    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
    console.log(`⏳ Reconnecting in ${delay}ms... (attempt ${this.reconnectAttempts + 1})`);
    this.reconnectAttempts++;

    setTimeout(() => this.connect(), delay);
  }

  close() {
    this.stopPing();
    if (this.ws) {
      this.ws.close(1000, 'Normal closure');
    }
  }
}

// Usage example
const client = new ChainStreamWebSocket(process.env.CHAINSTREAM_ACCESS_TOKEN);
client.connect();

// Subscribe after connection is established
setTimeout(() => {
  client.subscribe('dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m');
  client.subscribe('dex-token-new:sol');
}, 1000);

Best Practices

Performance Optimization

Use Filters

Subscribe only to needed data to reduce bandwidth. Use CEL expressions to filter data.

Batch Processing

Batch process high-frequency data instead of processing one by one. Use message queues for buffering.

Local Caching

Cache static data like token information to reduce repeated processing.

Connection Reuse

A single connection can subscribe to multiple channels. Avoid creating multiple connections.

Error Handling

  1. Listen for error events — Handle connection and data errors promptly
  2. Implement retry mechanism — Use exponential backoff for reconnection
  3. Log recording — Record key events for troubleshooting
  4. Graceful degradation — Switch to polling when WebSocket is unavailable

Resource Management

// ✅ Unsubscribe from unneeded channels promptly
client.unsubscribe('dex-candle:sol_xxx_1m');

// ✅ Close connection gracefully
function gracefulClose() {
  // 1. Stop heartbeat
  client.stopPing();
  
  // 2. Close connection
  client.close();
}