Skip to main content

Installation

pip install chainstream-sdk

Quick Start

from chainstream import ChainStreamClient

client = ChainStreamClient(access_token='YOUR_ACCESS_TOKEN')

REST API Example

Query token metadata:
import asyncio
from chainstream.openapi_client import ApiClient, Configuration
from chainstream.openapi_client.api.token_api import TokenApi
from chainstream.openapi_client.models.chain_symbol import ChainSymbol

ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN'
BASE_URL = 'https://api-dex.chainstream.io'

async def main():
    # Configure API client
    config = Configuration(
        host=BASE_URL,
        access_token=ACCESS_TOKEN,
    )
    
    async with ApiClient(config) as api_client:
        # Create token API instance
        token_api = TokenApi(api_client)
        
        chain = ChainSymbol.SOL
        token_address = 'So11111111111111111111111111111111111111112'  # SOL
        
        print(f'Querying Token: {chain.value}/{token_address}')
        
        # Get token metadata
        result = await token_api.get_metadata(
            chain=chain,
            token_address=token_address,
        )
        
        print('Token Metadata:')
        print(f'  Name: {result.name}')
        print(f'  Symbol: {result.symbol}')
        print(f'  Decimals: {result.decimals}')

if __name__ == '__main__':
    asyncio.run(main())

WebSocket Example

Subscribe to real-time token candle data:
import asyncio
import signal
from chainstream import ChainStreamClient
from chainstream.stream import Resolution, TokenCandle

ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN'

async def main():
    client = ChainStreamClient(ACCESS_TOKEN)

    chain = 'sol'
    token_address = 'So11111111111111111111111111111111111111112'  # SOL
    resolution = Resolution.S1

    print(f'Subscribing to Token Candles: {chain}/{token_address}')
    print('Listening... (Press Ctrl+C to stop)\n')

    message_count = 0

    def on_candle(candle: TokenCandle) -> None:
        nonlocal message_count
        message_count += 1
        print(
            f'[{message_count}] open={candle.open}, close={candle.close}, '
            f'high={candle.high}, low={candle.low}, volume={candle.volume}'
        )

    unsub = await client.stream.subscribe_token_candles(
        chain=chain,
        token_address=token_address,
        resolution=resolution,
        callback=on_candle,
    )

    # Create an event to wait for Ctrl+C
    stop_event = asyncio.Event()

    def signal_handler(sig, frame):
        print(f'\nReceived {message_count} messages')
        print('Closing connection...')
        stop_event.set()

    signal.signal(signal.SIGINT, signal_handler)

    await stop_event.wait()

    # Cleanup
    unsub.unsubscribe()
    await client.close()

if __name__ == '__main__':
    asyncio.run(main())

Pandas Integration

import pandas as pd

candles = await client.token.get_candles('sol', 'TOKEN_ADDRESS', resolution='1h', limit=100)
df = pd.DataFrame(candles)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')

Resources