跳转到主要内容
本教程介绍如何构建一个 Smart Money 追踪系统,实时监控聪明钱地址的交易行为,获取有价值的市场信号。
预计时间:45 分钟
难度等级:⭐⭐⭐ 中级

目标

构建完整的 Smart Money 追踪系统: 功能清单
  • ✅ 获取 Smart Money 列表
  • ✅ 订阅 SM 交易实时流
  • ✅ 交易解析和模式识别
  • ✅ 信号通知

方法论回顾

在开始之前,建议先阅读 Smart Money 方法论,了解:
  • Smart Money 的定义和类型
  • 评分模型(Smart Score)
  • 信号强度解读
  • 使用注意事项

Step 1:获取 SM 列表

API 获取 Smart Money 列表

# sm_client.py
import requests
from config import CHAINSTREAM_ACCESS_TOKEN, CHAINSTREAM_BASE_URL

class SmartMoneyClient:
    def __init__(self):
        self.headers = {'Authorization': f'Bearer {CHAINSTREAM_ACCESS_TOKEN}'}
    
    def get_smart_money_list(self, chain: str = 'ethereum', limit: int = 100) -> list:
        """获取 Smart Money 列表"""
        url = f"{CHAINSTREAM_BASE_URL}/discovery/smart-money"
        
        params = {
            'chain': chain,
            'limit': limit,
            'min_score': 70,  # 最低 Smart Score
            'sort': 'score_desc'
        }
        
        response = requests.get(url, headers=self.headers, params=params)
        return response.json().get('wallets', [])
        
        # 返回示例:
        # [
        #   {
        #     "address": "0xabc...",
        #     "smart_score": 92,
        #     "tags": ["whale", "defi_expert"],
        #     "stats": {
        #       "roi_90d": "45.2%",
        #       "win_rate": "72%"
        #     }
        #   },
        #   ...
        # ]
    
    def get_wallet_activities(self, address: str, limit: int = 20) -> list:
        """获取钱包最近交易"""
        url = f"{CHAINSTREAM_BASE_URL}/wallets/{address}/activities"
        
        params = {'limit': limit}
        response = requests.get(url, headers=self.headers, params=params)
        return response.json().get('activities', [])

使用预设列表

# ChainStream 提供预设的 SM 列表
PRESET_LISTS = {
    'ethereum_top_100': 'eth_top_traders',
    'solana_smart_money': 'sol_smart_money',
    'base_early_adopters': 'base_early'
}

def get_preset_list(list_id: str) -> list:
    url = f"{CHAINSTREAM_BASE_URL}/discovery/lists/{list_id}"
    response = requests.get(url, headers=headers)
    return response.json().get('wallets', [])

Step 2:订阅交易

WebSocket 订阅

# tracker.py
import asyncio
import json
import websockets
from config import CHAINSTREAM_ACCESS_TOKEN

class SmartMoneyTracker:
    def __init__(self, wallets: list):
        self.wallets = wallets
        self.ws_url = 'wss://realtime-dex.chainstream.io/connection/websocket'
        self.ws = None
    
    async def connect(self):
        """连接 WebSocket"""
        # 通过 URL 参数传递 token
        ws_url = f"{self.ws_url}?token={CHAINSTREAM_ACCESS_TOKEN}"
        self.ws = await websockets.connect(ws_url)
        print("✅ WebSocket 连接成功")
    
    async def subscribe(self):
        """订阅 Smart Money 交易"""
        # 方式1:订阅特定地址
        for wallet in self.wallets[:50]:  # 限制订阅数量
            msg = {
                'action': 'subscribe',
                'channel': 'wallet_activity',
                'params': {
                    'address': wallet['address'],
                    'events': ['swap', 'transfer']
                }
            }
            await self.ws.send(json.dumps(msg))
        
        # 方式2:订阅 Smart Money 聚合频道(推荐)
        msg = {
            'action': 'subscribe',
            'channel': 'smart_money',
            'params': {
                'chains': ['ethereum', 'arbitrum', 'base'],
                'min_score': 70,
                'min_value_usd': 10000
            }
        }
        await self.ws.send(json.dumps(msg))
        
        print("📡 已订阅 Smart Money 交易流")
    
    async def listen(self, callback):
        """监听交易事件"""
        async for message in self.ws:
            data = json.loads(message)
            
            if data.get('type') == 'smart_money_activity':
                await callback(data)
    
    async def run(self, callback):
        """运行追踪器"""
        while True:
            try:
                await self.connect()
                await self.subscribe()
                await self.listen(callback)
            except websockets.ConnectionClosed:
                print("⚠️ 连接断开,5秒后重连...")
                await asyncio.sleep(5)

或使用 Webhook

# 创建 Webhook 订阅 Smart Money 活动
webhook_config = {
    'url': 'https://your-server.com/sm-webhook',
    'events': ['smartmoney.buy', 'smartmoney.sell', 'smartmoney.new_token'],
    'filters': {
        'min_score': 70,
        'min_value_usd': 10000,
        'chains': ['ethereum', 'arbitrum']
    }
}

Step 3:分析与通知

交易解析

# analyzer.py
from dataclasses import dataclass
from typing import Optional

@dataclass
class SmartMoneySignal:
    wallet_address: str
    wallet_score: int
    wallet_tags: list
    action: str  # buy, sell
    token_symbol: str
    token_address: str
    amount_usd: float
    price: float
    signal_strength: str  # strong, medium, weak
    timestamp: str

class SignalAnalyzer:
    
    def analyze(self, event: dict) -> Optional[SmartMoneySignal]:
        """分析交易事件,生成信号"""
        wallet = event.get('wallet', {})
        transaction = event.get('transaction', {})
        
        # 计算信号强度
        strength = self._calc_signal_strength(wallet, transaction)
        
        if strength == 'ignore':
            return None
        
        return SmartMoneySignal(
            wallet_address=wallet['address'],
            wallet_score=wallet.get('smart_score', 0),
            wallet_tags=wallet.get('tags', []),
            action=transaction.get('type', 'unknown'),
            token_symbol=transaction.get('token', {}).get('symbol', 'Unknown'),
            token_address=transaction.get('token', {}).get('address', ''),
            amount_usd=float(transaction.get('value_usd', 0)),
            price=float(transaction.get('price', 0)),
            signal_strength=strength,
            timestamp=event.get('timestamp', '')
        )
    
    def _calc_signal_strength(self, wallet: dict, tx: dict) -> str:
        """计算信号强度"""
        score = wallet.get('smart_score', 0)
        value_usd = float(tx.get('value_usd', 0))
        is_new_token = tx.get('is_first_buy', False)
        
        # 强信号条件
        if score >= 85 and value_usd >= 100000:
            return 'strong'
        if is_new_token and score >= 80:
            return 'strong'
        
        # 中等信号
        if score >= 70 and value_usd >= 50000:
            return 'medium'
        
        # 弱信号
        if score >= 60 and value_usd >= 10000:
            return 'weak'
        
        return 'ignore'

通知模块

# notifier.py
import aiohttp
from config import TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID

class Notifier:
    
    async def send_signal(self, signal):
        """发送信号通知"""
        
        # 信号强度 emoji
        strength_emoji = {
            'strong': '🔴',
            'medium': '🟡',
            'weak': '🟢'
        }
        
        # 动作 emoji
        action_emoji = '📈' if signal.action == 'buy' else '📉'
        
        message = f"""
{strength_emoji.get(signal.signal_strength, '⚪')} Smart Money 信号

{action_emoji} **{signal.action.upper()}** {signal.token_symbol}

👤 地址: `{signal.wallet_address[:10]}...`
⭐ Smart Score: {signal.wallet_score}
🏷️ 标签: {', '.join(signal.wallet_tags)}

💰 金额: ${signal.amount_usd:,.0f}
💵 价格: ${signal.price:.6f}

{signal.timestamp}
        """
        
        await self._send_telegram(message)
    
    async def _send_telegram(self, message: str):
        """发送 Telegram 消息"""
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
        
        async with aiohttp.ClientSession() as session:
            await session.post(url, json={
                'chat_id': TELEGRAM_CHAT_ID,
                'text': message,
                'parse_mode': 'Markdown'
            })

完整代码

# main.py
import asyncio
from sm_client import SmartMoneyClient
from tracker import SmartMoneyTracker
from analyzer import SignalAnalyzer
from notifier import Notifier

async def main():
    # 初始化
    client = SmartMoneyClient()
    analyzer = SignalAnalyzer()
    notifier = Notifier()
    
    # 获取 Smart Money 列表
    print("📋 获取 Smart Money 列表...")
    wallets = client.get_smart_money_list(chain='ethereum', limit=100)
    print(f"✅ 获取到 {len(wallets)} 个 Smart Money 地址")
    
    # 定义回调处理
    async def on_activity(event):
        signal = analyzer.analyze(event)
        
        if signal:
            print(f"🎯 {signal.signal_strength.upper()} | {signal.action} {signal.token_symbol} | ${signal.amount_usd:,.0f}")
            await notifier.send_signal(signal)
    
    # 启动追踪
    tracker = SmartMoneyTracker(wallets)
    print("🚀 启动 Smart Money 追踪器...")
    await tracker.run(on_activity)

if __name__ == '__main__':
    asyncio.run(main())

应用场景

跟单参考

# 筛选强信号用于参考
def filter_strong_signals(signals: list) -> list:
    return [
        s for s in signals 
        if s.signal_strength == 'strong' 
        and s.action == 'buy'
    ]

市场情绪指标

# 计算 SM 买卖比
def calc_sentiment(signals: list, hours: int = 24) -> dict:
    recent = filter_recent(signals, hours)
    
    buys = sum(1 for s in recent if s.action == 'buy')
    sells = sum(1 for s in recent if s.action == 'sell')
    
    ratio = buys / max(sells, 1)
    
    return {
        'buy_count': buys,
        'sell_count': sells,
        'ratio': ratio,
        'sentiment': 'bullish' if ratio > 1.5 else 'bearish' if ratio < 0.7 else 'neutral'
    }

注意事项

重要提示
  1. 不要盲目跟单 — SM 信号仅供参考
  2. 注意延迟 — 链上确认 + 分析存在延迟
  3. 分散关注 — 多个 SM 共振信号更有价值
  4. 独立研究 — 结合基本面做判断

相关文档