跳转到主要内容

安装

pip install chainstream-sdk

快速开始

from chainstream import ChainStreamClient

# 'YOUR_ACCESS_TOKEN' is a placeholder; substitute a real access token.
client = ChainStreamClient(access_token='YOUR_ACCESS_TOKEN')

REST API 示例

查询代币元数据:
import asyncio
from chainstream.openapi_client import ApiClient, Configuration
from chainstream.openapi_client.api.token_api import TokenApi
from chainstream.openapi_client.models.chain_symbol import ChainSymbol

# Placeholder credential; substitute a real access token before running.
ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN'
# Passed to Configuration(host=...) in main() below.
BASE_URL = 'https://api.chainstream.io'

async def main():
    """Look up metadata for one token through the REST API and print it.

    Builds a Configuration from the module-level BASE_URL and ACCESS_TOKEN,
    opens an ApiClient for the duration of the request, and prints the
    name, symbol and decimals fields of the returned metadata object.
    """
    settings = Configuration(host=BASE_URL, access_token=ACCESS_TOKEN)

    # The client is an async context manager, so it is released on exit.
    async with ApiClient(settings) as session:
        tokens = TokenApi(session)

        # Target token: the chain enum member plus the token address.
        chain = ChainSymbol.SOL
        token_address = 'So11111111111111111111111111111111111111112'  # SOL

        print(f'查询代币: {chain.value}/{token_address}')

        result = await tokens.get_metadata(
            chain=chain, token_address=token_address
        )

        # Report the three fields this example cares about.
        print('代币元数据:')
        print(f'  名称: {result.name}')
        print(f'  符号: {result.symbol}')
        print(f'  精度: {result.decimals}')


if __name__ == '__main__':
    asyncio.run(main())

WebSocket 示例

订阅实时代币 K 线数据:
import asyncio
import signal
from chainstream import ChainStreamClient
from chainstream.stream import Resolution, TokenCandle

# Placeholder credential; substitute a real access token before running.
ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN'

async def main():
    """Subscribe to live candles for one token and print them until Ctrl+C.

    Creates a ChainStreamClient from the module-level ACCESS_TOKEN,
    subscribes to candle updates for the SOL token address at
    Resolution.S1, and prints every candle passed to the callback. On
    SIGINT the message total is printed, the subscription is cancelled and
    the client is closed. Cleanup also runs if subscribing or waiting
    raises.
    """
    client = ChainStreamClient(ACCESS_TOKEN)

    chain = 'sol'
    token_address = 'So11111111111111111111111111111111111111112'  # SOL
    resolution = Resolution.S1

    print(f'订阅代币 K 线: {chain}/{token_address}')
    print('监听中... (按 Ctrl+C 停止)\n')

    message_count = 0

    def on_candle(candle: TokenCandle) -> None:
        """Count and print one received candle."""
        nonlocal message_count
        message_count += 1
        print(
            f'[{message_count}] open={candle.open}, close={candle.close}, '
            f'high={candle.high}, low={candle.low}, volume={candle.volume}'
        )

    loop = asyncio.get_running_loop()
    # Set once Ctrl+C has been received.
    stop_event = asyncio.Event()

    def signal_handler(sig, frame):
        """Report the message total and request shutdown."""
        print(f'\n收到 {message_count} 条消息')
        print('关闭连接...')
        # Handlers installed with signal.signal() must not touch the event
        # loop directly; call_soon_threadsafe() schedules the set() on the
        # loop and wakes it, so shutdown does not wait for the next I/O event.
        loop.call_soon_threadsafe(stop_event.set)

    try:
        unsub = await client.stream.subscribe_token_candles(
            chain=chain,
            token_address=token_address,
            resolution=resolution,
            callback=on_candle,
        )
        try:
            signal.signal(signal.SIGINT, signal_handler)
            await stop_event.wait()
        finally:
            # Cancel the subscription even if waiting was interrupted.
            unsub.unsubscribe()
    finally:
        # Always release the client, including when subscribing fails.
        await client.close()


if __name__ == '__main__':
    asyncio.run(main())

Pandas 集成

import pandas as pd

# NOTE: top-level `await` only works inside an async function or an
# async-aware REPL/notebook. `client` is not defined in this snippet;
# presumably it is a ChainStreamClient instance — confirm against the SDK.
candles = await client.token.get_candles('sol', 'TOKEN_ADDRESS', resolution='1h', limit=100)
# Assumes `candles` is a list of records that pandas can turn into columns,
# including a 'timestamp' column — TODO confirm the return type.
df = pd.DataFrame(candles)
# unit='s' treats the values as seconds since the Unix epoch; verify the API
# does not return milliseconds.
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')

相关资源