ChainStream은 강력한 실시간 데이터 스트리밍 기능을 제공하여 개발자가 온체인 이벤트, 트랜잭션, 상태 변경을 즉시 수신할 수 있습니다. 이 문서에서는 WebSocket 연결, 구독 메커니즘, 모범 사례를 다룹니다.
WebSocket 엔드포인트
wss://realtime-dex.chainstream.io/connection/websocket
연결 수립 시 URL에 Access Token을 제공합니다:
wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN
SDK 사용 (추천)
네이티브 WebSocket 사용
커맨드 라인 테스트
SDK가 연결과 인증을 자동으로 처리합니다. subscribe 메서드를 호출하기만 하면 됩니다:import { ChainStreamClient } from '@chainstream-io/sdk';
import { Resolution } from '@chainstream-io/sdk/openapi';
const client = new ChainStreamClient(process.env.CHAINSTREAM_ACCESS_TOKEN);
// 직접 구독, SDK가 연결과 인증을 자동으로 처리
client.stream.subscribeTokenCandles({
chain: 'sol',
tokenAddress: '6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN',
resolution: Resolution._1m,
callback: (data) => {
console.log('Received data:', data);
}
});
SDK는 연결 상태를 자동으로 감지하고 필요할 때 연결을 수립합니다. 수동으로 connect()를 호출할 필요가 없습니다.
네이티브 WebSocket 사용 시 연결 후 connect 메시지를 보내 인증을 완료합니다:const token = process.env.CHAINSTREAM_ACCESS_TOKEN;
const ws = new WebSocket(
`wss://realtime-dex.chainstream.io/connection/websocket?token=${token}`
);
ws.onopen = () => {
console.log('WebSocket connection established');
// connect 메시지를 보내 인증 완료
ws.send(JSON.stringify({
connect: {
token: token,
name: 'js'
},
id: 1
}));
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
// connect 응답 처리
if (data.connect) {
console.log('✅ Authentication successful, client ID:', data.connect.client);
// 인증 후 구독 시작
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
}
// 구독 데이터 처리
if (data.push) {
console.log('Received data:', data.push.pub.data);
}
};
wscat으로 테스트:wscat -c "wss://realtime-dex.chainstream.io/connection/websocket?token=YOUR_ACCESS_TOKEN"
연결 후 connect 메시지 전송:{"connect":{"token":"YOUR_ACCESS_TOKEN","name":"test"},"id":1}
연결 응답
인증 성공 시 다음과 같은 응답을 받게 됩니다:
{
"id": 1,
"connect": {
"client": "0f819f5f-7d8b-4949-9433-0e91bbfe1cdb",
"version": "0.0.0 OSS",
"expires": true,
"ttl": 86002,
"ping": 25,
"pong": true
}
}
| 필드 | 설명 |
|---|
client | 고유 클라이언트 식별자 |
ttl | 토큰 잔여 유효시간 (초) |
ping | 하트비트 간격 (초) |
pong | pong 응답 지원 여부 |
구독 유형
ChainStream WebSocket은 다양한 데이터 구독 유형을 지원합니다:
| 카테고리 | 채널 접두사 | 설명 |
|---|
| 캔들 | dex-candle: | 토큰 가격 캔들 |
| 토큰 통계 | dex-token-stats: | 토큰 시장 통계 |
| 홀더 통계 | dex-token-holders-stats: | 토큰 홀더 분포 |
| 새 토큰 | dex-token-new: | 새로 출시된 토큰 |
| 토큰 거래 | dex-token-trade: | 토큰 거래 기록 |
| 지갑 잔액 | dex-wallet-balance: | 지갑 자산 변동 |
| 지갑 거래 | dex-wallet-trade: | 지갑 거래 기록 |
| 랭킹 | dex-ranking-token-stats: | 토큰 랭킹 통계 |
| DEX 풀 | dex-dex-pool-balance: | DEX 유동성 풀 데이터 |
구독 형식 예시
// 캔들 데이터 구독
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 2
}));
// 토큰 통계 구독
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-token-stats:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN'
},
id: 3
}));
// 새 토큰 구독
ws.send(JSON.stringify({
subscribe: {
channel: 'dex-token-new:sol'
},
id: 4
}));
구독 해제
ws.send(JSON.stringify({
unsubscribe: {
channel: 'dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m'
},
id: 5
}));
메시지 형식
요청 메시지
Connect 메시지 (인증):
{
"connect": {
"token": "YOUR_ACCESS_TOKEN",
"name": "client_name"
},
"id": 1
}
Subscribe 메시지:
{
"subscribe": {
"channel": "dex-candle:sol_xxx_1m"
},
"id": 2
}
Unsubscribe 메시지:
{
"unsubscribe": {
"channel": "dex-candle:sol_xxx_1m"
},
"id": 3
}
응답 메시지
구독 확인:
{
"id": 2,
"subscribe": {}
}
데이터 푸시:
{
"push": {
"channel": "dex-candle:sol_xxx_1m",
"pub": {
"data": {
"o": 0.001234,
"c": 0.001256,
"h": 0.001280,
"l": 0.001200,
"v": 1234567,
"t": 1706745600
}
}
}
}
오류 메시지:
{
"id": 2,
"error": {
"code": 100,
"message": "invalid channel"
}
}
하트비트
WebSocket 연결을 유지하려면 주기적인 하트비트 메시지가 필요합니다. connect 응답의 ping 필드(보통 25초)를 기준으로 이 간격 내에 하트비트를 보내세요:
// 하트비트 메시지
ws.send(JSON.stringify({}));
// 또는 ping 전송
ws.send(JSON.stringify({ ping: {} }));
지정된 시간 내(보통 ping 간격의 3배)에 메시지가 전송되지 않으면 서버가 연결을 끊습니다.
전체 예시
const WebSocket = require('ws');
class ChainStreamWebSocket {
constructor(accessToken) {
this.accessToken = accessToken;
this.ws = null;
this.messageId = 0;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.subscriptions = new Set();
this.pingInterval = null;
}
connect() {
const url = `wss://realtime-dex.chainstream.io/connection/websocket?token=${this.accessToken}`;
this.ws = new WebSocket(url);
this.ws.onopen = () => {
console.log('WebSocket connection established');
this.reconnectAttempts = 0;
// connect 메시지 전송
this.send({
connect: {
token: this.accessToken,
name: 'nodejs'
}
});
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.handleMessage(data);
};
this.ws.onclose = (event) => {
console.log(`Connection closed: ${event.code}`);
this.stopPing();
if (event.code !== 1000) {
this.scheduleReconnect();
}
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error.message);
};
}
handleMessage(data) {
// connect 응답 처리
if (data.connect) {
console.log('✅ Authentication successful');
this.startPing(data.connect.ping || 25);
this.resubscribe();
return;
}
// 데이터 푸시 처리
if (data.push) {
console.log(`[${data.push.channel}]`, data.push.pub.data);
return;
}
// 오류 처리
if (data.error) {
console.error('Error:', data.error.message);
return;
}
}
send(message) {
message.id = ++this.messageId;
this.ws.send(JSON.stringify(message));
}
subscribe(channel) {
this.subscriptions.add(channel);
this.send({ subscribe: { channel } });
}
unsubscribe(channel) {
this.subscriptions.delete(channel);
this.send({ unsubscribe: { channel } });
}
resubscribe() {
this.subscriptions.forEach(channel => {
this.send({ subscribe: { channel } });
});
}
startPing(interval) {
this.pingInterval = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send('{}');
}
}, interval * 1000);
}
stopPing() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
scheduleReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('❌ Max reconnect attempts reached');
return;
}
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(`⏳ Reconnecting in ${delay}ms... (attempt ${this.reconnectAttempts + 1})`);
this.reconnectAttempts++;
setTimeout(() => this.connect(), delay);
}
close() {
this.stopPing();
if (this.ws) {
this.ws.close(1000, 'Normal closure');
}
}
}
// 사용 예시
const client = new ChainStreamWebSocket(process.env.CHAINSTREAM_ACCESS_TOKEN);
client.connect();
// 연결 수립 후 구독
setTimeout(() => {
client.subscribe('dex-candle:sol_6p6xgHyF7AeE6TZkSmFsko444wqoP15icUSqi2jfGiPN_1m');
client.subscribe('dex-token-new:sol');
}, 1000);
모범 사례
성능 최적화
필터 사용
필요한 데이터만 구독하여 대역폭을 줄이세요. CEL 표현식으로 데이터를 필터링하세요.
배치 처리
고빈도 데이터는 하나씩 처리하지 말고 배치로 처리하세요. 메시지 큐를 사용하여 버퍼링하세요.
로컬 캐싱
토큰 정보와 같은 정적 데이터를 캐시하여 반복 처리를 줄이세요.
연결 재사용
하나의 연결로 여러 채널을 구독할 수 있습니다. 여러 연결을 생성하지 마세요.
오류 처리
- 오류 이벤트 리스닝 — 연결 및 데이터 오류를 즉시 처리
- 재시도 메커니즘 구현 — 재연결 시 지수 백오프 사용
- 로그 기록 — 문제 해결을 위한 주요 이벤트 기록
- 그레이스풀 디그레이데이션 — WebSocket 사용 불가 시 폴링으로 전환
리소스 관리
// ✅ 불필요한 채널은 즉시 구독 해제
client.unsubscribe('dex-candle:sol_xxx_1m');
// ✅ 연결을 우아하게 종료
function gracefulClose() {
// 1. 하트비트 중지
client.stopPing();
// 2. 연결 종료
client.close();
}
관련 문서
WebSocket API 레퍼런스
전체 구독 유형 및 파라미터