安装
pip install chainstream-sdk
快速开始
from chainstream import ChainStreamClient
# Create the SDK client; replace the 'YOUR_ACCESS_TOKEN' placeholder with a real token.
client = ChainStreamClient(access_token='YOUR_ACCESS_TOKEN')
REST API 示例
查询代币元数据:
import asyncio
from chainstream.openapi_client import ApiClient, Configuration
from chainstream.openapi_client.api.token_api import TokenApi
from chainstream.openapi_client.models.chain_symbol import ChainSymbol
# Placeholder credentials and endpoint used by the example below.
ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN'
BASE_URL = 'https://api.chainstream.io'


async def main():
    """Fetch the metadata of one token over the REST API and print it."""
    # Token to look up.
    chain = ChainSymbol.SOL
    token_address = 'So11111111111111111111111111111111111111112'  # SOL

    # Configure the API client.
    api_config = Configuration(host=BASE_URL, access_token=ACCESS_TOKEN)

    async with ApiClient(api_config) as session:
        # Create the token API instance on top of the shared client.
        tokens = TokenApi(session)

        print(f'查询代币: {chain.value}/{token_address}')

        # Fetch the token metadata.
        result = await tokens.get_metadata(
            chain=chain,
            token_address=token_address,
        )

        # Emit the header followed by one line per metadata field.
        report = (
            '代币元数据:',
            f' 名称: {result.name}',
            f' 符号: {result.symbol}',
            f' 精度: {result.decimals}',
        )
        for line in report:
            print(line)


if __name__ == '__main__':
    asyncio.run(main())
WebSocket 示例
订阅实时代币 K 线数据:
import asyncio
import signal
from chainstream import ChainStreamClient
from chainstream.stream import Resolution, TokenCandle
# Placeholder credential used by the example below.
ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN'


async def main():
    """Subscribe to live token candles and print each one until Ctrl+C.

    Every received candle is printed with a running message counter. On
    SIGINT the total count is reported, the subscription is cancelled and
    the client is closed. Cleanup also runs if an error interrupts the
    wait, so the connection is not leaked.
    """
    client = ChainStreamClient(access_token=ACCESS_TOKEN)

    chain = 'sol'
    token_address = 'So11111111111111111111111111111111111111112'  # SOL
    resolution = Resolution.S1

    print(f'订阅代币 K 线: {chain}/{token_address}')
    print('监听中... (按 Ctrl+C 停止)\n')

    message_count = 0

    def on_candle(candle: TokenCandle) -> None:
        """Print one candle and bump the received-message counter."""
        nonlocal message_count
        message_count += 1
        print(
            f'[{message_count}] open={candle.open}, close={candle.close}, '
            f'high={candle.high}, low={candle.low}, volume={candle.volume}'
        )

    try:
        unsub = await client.stream.subscribe_token_candles(
            chain=chain,
            token_address=token_address,
            resolution=resolution,
            callback=on_candle,
        )
        try:
            # Event that is set once Ctrl+C has been received.
            stop_event = asyncio.Event()
            loop = asyncio.get_running_loop()

            def signal_handler(sig, frame):
                """Report the message count and ask the loop to stop waiting."""
                print(f'\n收到 {message_count} 条消息')
                print('关闭连接...')
                # Handlers installed with signal.signal() must not touch
                # asyncio objects directly; hand the set() over to the loop,
                # which also wakes it up if it is idle.
                loop.call_soon_threadsafe(stop_event.set)

            signal.signal(signal.SIGINT, signal_handler)
            await stop_event.wait()
        finally:
            # Always cancel the subscription, even when the wait failed.
            unsub.unsubscribe()
    finally:
        # Always release the client's resources.
        await client.close()


if __name__ == '__main__':
    asyncio.run(main())
Pandas 集成
import pandas as pd
# NOTE(review): `client` is not defined in this snippet — presumably the
# ChainStreamClient created in the quick start; confirm. The `await` below
# also needs to run inside a coroutine (or an async-aware REPL/notebook).
candles = await client.token.get_candles('sol', 'TOKEN_ADDRESS', resolution='1h', limit=100)
# Build a DataFrame from the returned candles.
# Assumes each item is a dict-like record — TODO confirm against the SDK.
df = pd.DataFrame(candles)
# Convert the 'timestamp' column to datetimes, reading the values as seconds.
# NOTE(review): if the API returns milliseconds, unit='ms' is needed — confirm.
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')

