메인 콘텐츠로 건너뛰기
ChainStream은 강력한 실시간 데이터 스트리밍 기능을 제공하여 개발자가 온체인 이벤트, 트랜잭션, 상태 변경을 즉시 수신할 수 있습니다. 이 문서에서는 WebSocket 연결, 구독 메커니즘, 모범 사례를 다룹니다.

연결

WebSocket 엔드포인트

wss://realtime-dex.chainstream.io/connection/websocket

인증

연결 수립 시 URL에 Access Token을 제공합니다:
wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN
SDK가 연결과 인증을 자동으로 처리합니다. subscribe 메서드를 호출하기만 하면 됩니다:
import { ChainStreamClient } from '@chainstream-io/sdk';
import { Resolution } from '@chainstream-io/sdk/openapi';

const client = new ChainStreamClient(process.env.CHAINSTREAM_ACCESS_TOKEN);

// 직접 구독, SDK가 연결과 인증을 자동으로 처리
client.stream.subscribeTokenCandles({
  chain: 'sol',
  tokenAddress: '6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN',
  resolution: Resolution._1m,
  callback: (data) => {
    console.log('Received data:', data);
  }
});
SDK는 연결 상태를 자동으로 감지하고 필요할 때 연결을 수립합니다. 수동으로 connect()를 호출할 필요가 없습니다.

연결 응답

인증 성공 시 다음과 같은 응답을 받게 됩니다:
{
  "id": 1,
  "connect": {
    "client": "0f819f5f-7d8b-4949-9433-0e91bbfe1cdb",
    "version": "0.0.0 OSS",
    "expires": true,
    "ttl": 86002,
    "ping": 25,
    "pong": true
  }
}
필드설명
client고유 클라이언트 식별자
ttl토큰 잔여 유효시간 (초)
ping하트비트 간격 (초)
pongpong 응답 지원 여부

구독 유형

ChainStream WebSocket은 다양한 데이터 구독 유형을 지원합니다:
카테고리채널 접두사설명
캔들dex-candle:토큰 가격 캔들
토큰 통계dex-token-stats:토큰 시장 통계
홀더 통계dex-token-holders-stats:토큰 홀더 분포
새 토큰dex-token-new:새로 출시된 토큰
토큰 거래dex-token-trade:토큰 거래 기록
지갑 잔액dex-wallet-balance:지갑 자산 변동
지갑 거래dex-wallet-trade:지갑 거래 기록
랭킹dex-ranking-token-stats:토큰 랭킹 통계
DEX 풀dex-dex-pool-balance:DEX 유동성 풀 데이터
전체 구독 유형, 파라미터, 응답 형식은 WebSocket API 문서를 참고하세요.

구독 형식 예시

// 캔들 데이터 구독
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
  },
  id: 2
}));

// 토큰 통계 구독
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-token-stats:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN'
  },
  id: 3
}));

// 새 토큰 구독
ws.send(JSON.stringify({
  subscribe: {
    channel: 'dex-token-new:sol'
  },
  id: 4
}));

구독 해제

ws.send(JSON.stringify({
  unsubscribe: {
    channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
  },
  id: 5
}));

메시지 형식

요청 메시지

Connect 메시지 (인증):
{
  "connect": {
    "token": "YOUR_ACCESS_TOKEN",
    "name": "client_name"
  },
  "id": 1
}
Subscribe 메시지:
{
  "subscribe": {
    "channel": "dex-candle:sol_xxx_1m"
  },
  "id": 2
}
Unsubscribe 메시지:
{
  "unsubscribe": {
    "channel": "dex-candle:sol_xxx_1m"
  },
  "id": 3
}

응답 메시지

구독 확인:
{
  "id": 2,
  "subscribe": {}
}
데이터 푸시:
{
  "push": {
    "channel": "dex-candle:sol_xxx_1m",
    "pub": {
      "data": {
        "o": 0.001234,
        "c": 0.001256,
        "h": 0.001280,
        "l": 0.001200,
        "v": 1234567,
        "t": 1706745600
      }
    }
  }
}
오류 메시지:
{
  "id": 2,
  "error": {
    "code": 100,
    "message": "invalid channel"
  }
}

하트비트

WebSocket 연결을 유지하려면 주기적인 하트비트 메시지가 필요합니다. connect 응답의 ping 필드(보통 25초)를 기준으로 이 간격 내에 하트비트를 보내세요:
// 하트비트 메시지
ws.send(JSON.stringify({}));

// 또는 ping 전송
ws.send(JSON.stringify({ ping: {} }));
지정된 시간 내(보통 ping 간격의 3배)에 메시지가 전송되지 않으면 서버가 연결을 끊습니다.

전체 예시

const WebSocket = require('ws');

class ChainStreamWebSocket {
  constructor(accessToken) {
    this.accessToken = accessToken;
    this.ws = null;
    this.messageId = 0;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    this.subscriptions = new Set();
    this.pingInterval = null;
  }

  connect() {
    const url = `wss://realtime-dex.chainstream.io/connection/websocket?token=${this.accessToken}`;
    this.ws = new WebSocket(url);

    this.ws.onopen = () => {
      console.log('WebSocket connection established');
      this.reconnectAttempts = 0;
      
      // connect 메시지 전송
      this.send({
        connect: {
          token: this.accessToken,
          name: 'nodejs'
        }
      });
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.handleMessage(data);
    };

    this.ws.onclose = (event) => {
      console.log(`Connection closed: ${event.code}`);
      this.stopPing();
      
      if (event.code !== 1000) {
        this.scheduleReconnect();
      }
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error.message);
    };
  }

  handleMessage(data) {
    // connect 응답 처리
    if (data.connect) {
      console.log('✅ Authentication successful');
      this.startPing(data.connect.ping || 25);
      this.resubscribe();
      return;
    }

    // 데이터 푸시 처리
    if (data.push) {
      console.log(`[${data.push.channel}]`, data.push.pub.data);
      return;
    }

    // 오류 처리
    if (data.error) {
      console.error('Error:', data.error.message);
      return;
    }
  }

  send(message) {
    message.id = ++this.messageId;
    this.ws.send(JSON.stringify(message));
  }

  subscribe(channel) {
    this.subscriptions.add(channel);
    this.send({ subscribe: { channel } });
  }

  unsubscribe(channel) {
    this.subscriptions.delete(channel);
    this.send({ unsubscribe: { channel } });
  }

  resubscribe() {
    this.subscriptions.forEach(channel => {
      this.send({ subscribe: { channel } });
    });
  }

  startPing(interval) {
    this.pingInterval = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send('{}');
      }
    }, interval * 1000);
  }

  stopPing() {
    if (this.pingInterval) {
      clearInterval(this.pingInterval);
      this.pingInterval = null;
    }
  }

  scheduleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('❌ Max reconnect attempts reached');
      return;
    }

    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
    console.log(`⏳ Reconnecting in ${delay}ms... (attempt ${this.reconnectAttempts + 1})`);
    this.reconnectAttempts++;

    setTimeout(() => this.connect(), delay);
  }

  close() {
    this.stopPing();
    if (this.ws) {
      this.ws.close(1000, 'Normal closure');
    }
  }
}

// 사용 예시
const client = new ChainStreamWebSocket(process.env.CHAINSTREAM_ACCESS_TOKEN);
client.connect();

// 연결 수립 후 구독
setTimeout(() => {
  client.subscribe('dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m');
  client.subscribe('dex-token-new:sol');
}, 1000);

모범 사례

성능 최적화

필터 사용

필요한 데이터만 구독하여 대역폭을 줄이세요. CEL 표현식으로 데이터를 필터링하세요.

배치 처리

고빈도 데이터는 하나씩 처리하지 말고 배치로 처리하세요. 메시지 큐를 사용하여 버퍼링하세요.

로컬 캐싱

토큰 정보와 같은 정적 데이터를 캐시하여 반복 처리를 줄이세요.

연결 재사용

하나의 연결로 여러 채널을 구독할 수 있습니다. 여러 연결을 생성하지 마세요.

오류 처리

  1. 오류 이벤트 리스닝 — 연결 및 데이터 오류를 즉시 처리
  2. 재시도 메커니즘 구현 — 재연결 시 지수 백오프 사용
  3. 로그 기록 — 문제 해결을 위한 주요 이벤트 기록
  4. 그레이스풀 디그레이데이션 — WebSocket 사용 불가 시 폴링으로 전환

리소스 관리

// ✅ 불필요한 채널은 즉시 구독 해제
client.unsubscribe('dex-candle:sol_xxx_1m');

// ✅ 연결을 우아하게 종료
function gracefulClose() {
  // 1. 하트비트 중지
  client.stopPing();
  
  // 2. 연결 종료
  client.close();
}

관련 문서

WebSocket API 레퍼런스

전체 구독 유형 및 파라미터

가격 알림 봇

실습: 가격 모니터링 봇 만들기