ChainStream provides multi-chain real-time on-chain data streams through Kafka Streams. Compared to GraphQL Subscriptions and WebSocket, Kafka Streams is designed for latency-sensitive, high-reliability server-side application scenarios, offering lower latency and stronger fault tolerance for data consumption.

Protobuf Schema Repository

Official ChainStream Protobuf Schema definitions, supporting Go and Python, including all message types for EVM, Solana, and TRON.

Support Matrix

The matrix covers the dex.trades, tokens, balances, dex.pools, transfers, and candlesticks Topics on the supported chains:
  • Ethereum (eth)
  • BSC (bsc)
  • Solana (sol)
  • TRON (tron)
All chains also support the token-supplies, token-prices, token-holdings, token-market-caps, and trade-stats Topics. See the full Topic list for details.

Kafka Streams vs WebSocket Selection Guide

When to Choose Kafka Streams

Latency Sensitive

Latency is the primary concern and the application is deployed on cloud or dedicated servers

Message Reliability

You cannot afford to lose any messages and need durable, reliable data consumption

Complex Processing

Need to perform complex computation, filtering, or formatting beyond pre-processing capabilities

Horizontal Scaling

Need multi-instance horizontal scaling for consumption capacity

When to Choose WebSocket

Rapid Prototyping

You are building a prototype and development speed is the primary factor

Unified Interface

Application needs both historical and real-time data with unified query and subscription interface

Browser-side

Application consumes data directly in browser (Kafka Streams only supports server-side)

Dynamic Filtering

Need to dynamically filter data based on page content

Comparison Summary

| Feature | Kafka Streams | WebSocket |
| --- | --- | --- |
| Latency | Lowest | Low |
| Reliability | Persistent, no message loss | May lose messages on disconnect |
| Scalability | Native horizontal scaling | Requires additional design |
| Data Filtering | Client-side processing | Server-side pre-filtering |
| Client Support | Server-side only | Server + Browser |
| Integration Complexity | Higher | Lower |

Credential Acquisition

Kafka Streams uses independent authentication credentials and requires contacting the ChainStream team to apply for access.
1. Contact to Apply

Send an email to [email protected] to apply for Kafka Streams access
2. Receive Credentials

After approval, you will receive the following credential information:
  • Username
  • Password
  • Broker address list
3. Configure Connection

Configure your Kafka client connection using the received credentials

Connection Configuration

Broker Address

The Broker address will be provided along with your credentials after your application is approved. Do not use any unauthorized addresses for connection.

SASL_SSL Connection Configuration

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='your_group_id'
)
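
Before wiring up any processing logic, you can verify that the credentials and broker address work by requesting topic metadata. A minimal check with kafka-python, assuming the consumer configured above (the topic name is only an example):

# Ask the broker for partition metadata as a connectivity/authorization check
partitions = consumer.partitions_for_topic('eth.dex.trades')
if partitions is None:
    print('Topic not found or not authorized')
else:
    print(f'eth.dex.trades has {len(partitions)} partitions: {sorted(partitions)}')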

Topic Naming Convention & Complete List

Naming Convention

Topics follow this naming pattern:
{chain}.{message_type}              # Raw event data
{chain}.{message_type}.processed    # Processed data (with price, flags, enrichments)
{chain}.{message_type}.created      # Creation events (e.g., token creation)
Where {chain} includes: sol, bsc, eth, tron
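
As an illustration of the pattern only, a small hypothetical helper (not part of any ChainStream SDK) can assemble topic names:

def build_topic(chain: str, message_type: str, suffix: str = '') -> str:
    # Builds names such as 'eth.dex.trades' or 'sol.tokens.created'
    return f'{chain}.{message_type}.{suffix}' if suffix else f'{chain}.{message_type}'

print(build_topic('eth', 'dex.trades', 'processed'))  # eth.dex.trades.processed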

Message Types

| Type | Description |
| --- | --- |
| dex.trades | DEX trade events |
| dex.pools | Liquidity pool events |
| tokens | Token events |
| balances | Balance change events |
| transfers | Transfer events |
| token-supplies | Token supply events |
| token-prices | Token price events |
| token-holdings | Token holding data |
| token-market-caps | Token market cap events |
| candlesticks | OHLCV candlestick data |
| trade-stats | Trade statistics |

Complete Topic List

The following Topics apply to all supported chains (replace {chain} with sol, bsc, eth):
# DEX Trades
{chain}.dex.trades
{chain}.dex.trades.processed    # Includes USD/native price, suspicious flag

# Token Events
{chain}.tokens
{chain}.tokens.created          # Token creation events
{chain}.tokens.processed        # Includes description, image URL, social links

# Balance Changes
{chain}.balances
{chain}.balances.processed      # Includes USD/native value

# Liquidity Pools
{chain}.dex.pools
{chain}.dex.pools.processed     # Includes liquidity USD/native value

# Token Data
{chain}.token-supplies
{chain}.token-supplies.processed
{chain}.token-prices
{chain}.token-holdings
{chain}.token-market-caps.processed

# Aggregated Data
{chain}.candlesticks            # OHLCV candlestick data
{chain}.trade-stats             # Trade statistics
For complete Protobuf Schema and Topic mappings, refer to the streaming_protobuf repository.

Consumption Modes & Offset Management

Two core configurations to consider when subscribing to topics:

Offset Strategy Selection

Consumers need to decide where to start reading messages after connecting to Kafka. Two common strategies: latest starts from the current end of the topic on each connection, which suits consumers that only care about real-time data (no historical replay on reconnection); earliest starts from the oldest retained message, which suits consumers that need to backfill retained history. The configuration below uses latest:
{
  autoCommit: false,
  fromBeginning: false,
  'auto.offset.reset': 'latest'
}
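
With auto-commit disabled, commit offsets yourself only after a message has been fully processed, so a crashed instance re-reads uncommitted messages instead of losing them. A minimal kafka-python sketch, assuming a consumer configured as above and a hypothetical handle() function:

for message in consumer:
    handle(message.value)  # your processing logic
    # Commit only after successful processing; on failure the offset is not
    # advanced and the message is redelivered after restart or rebalance.
    consumer.commit()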

Group ID Rules

Deploying multiple instances with the same Group ID enables failover and load balancing—messages from the same topic will only be consumed by one instance in the Group, with Kafka automatically distributing partitions among instances.
It’s recommended to have an independent consumer for each topic, as different topics have different message parsing logic.

Quick Start: First Consumer in 5 Minutes

The following example shows how to consume the eth.dex.trades topic and parse DEX trade data.
1. Get Protobuf Schema

Clone the Schema definitions from the official repository:
git clone https://github.com/chainstream-io/streaming_protobuf.git
Or add as a Git submodule to your project:
git submodule add https://github.com/chainstream-io/streaming_protobuf.git
2. Install Dependencies

pip install kafka-python protobuf
3. Configure and Consume

from kafka import KafkaConsumer
from common import trade_event_pb2  # Get from streaming_protobuf repository

# Create consumer
consumer = KafkaConsumer(
    'eth.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-dex-consumer'
)

# Consume messages
for message in consumer:
    # Parse protobuf message
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    # Print DEX trade info
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Block: {event.block.height}")
        print("---")

Core Data Structures

All message types share these base structures (defined in common/common.proto):

Base Structures

Block information:
| Field | Type | Description |
| --- | --- | --- |
| timestamp | int64 | Block timestamp |
| hash | string | Block hash |
| height | uint64 | Block height |
| slot | uint64 | Slot number (Solana) |

Main Message Types

Topic: {chain}.dex.trades
message TradeEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Trade trade = 100;
  BondingCurve bonding_curve = 110;
  TradeProcessed trade_processed = 200;  // included in processed topic
}
Trade Core Fields:
| Field | Description |
| --- | --- |
| token_a_address / token_b_address | Trading pair token addresses |
| user_a_amount / user_b_amount | User trade amounts |
| pool_address | Pool address |
| vault_a / vault_b | Pool vault addresses |
| vault_a_amount / vault_b_amount | Vault amounts |
TradeProcessed Enhanced Fields (processed topic):
| Field | Description |
| --- | --- |
| token_a_price_in_usd / token_b_price_in_usd | USD price |
| token_a_price_in_native / token_b_price_in_native | Native currency price |
| is_token_a_price_in_usd_suspect | Whether the price is suspicious |
| is_token_a_price_in_usd_suspect_reason | Reason the price is flagged as suspicious |
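When consuming {chain}.dex.trades.processed, the enrichment fields above live on the trade_processed sub-message. A sketch of reading them, assuming field names exactly as generated from the repository schema:

trade_events = trade_event_pb2.TradeEvents()
trade_events.ParseFromString(message.value)
for event in trade_events.events:
    processed = event.trade_processed
    # Skip trades whose USD price has been flagged as suspicious
    if processed.is_token_a_price_in_usd_suspect:
        continue
    print(event.trade.pool_address, processed.token_a_price_in_usd)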
Topic: {chain}.tokens, {chain}.tokens.created
message TokenEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  EventType type = 100;        // CREATED, UPDATED
  Token token = 101;
  TokenProcessed token_processed = 200;
}
Token Core Fields:
| Field | Description |
| --- | --- |
| address | Token address |
| name / symbol | Name and symbol |
| decimals | Decimals |
| uri | Metadata URI |
| metadata_address | Metadata address |
| creators | Creator list |
| solana_extra | Solana-specific fields |
| evm_extra | EVM-specific fields (token_standard) |
Topic: {chain}.balances
message BalanceEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Balance balance = 100;
  BalanceProcessed balance_processed = 200;
}
Balance Core Fields:
| Field | Description |
| --- | --- |
| token_account_address | Token account address |
| account_owner_address | Account owner address |
| token_address | Token address |
| pre_amount / post_amount | Balance before/after the change |
| decimals | Decimals |
| lifecycle | Account lifecycle (NEW/EXISTING/CLOSED) |
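A common use of these fields is computing the net change per event. A sketch, assuming pre_amount/post_amount are raw integer amounts that must be scaled by decimals (check the repository schema for the exact types):

def balance_delta(balance) -> float:
    # Net change of the account, scaled to a human-readable amount
    raw_delta = int(balance.post_amount) - int(balance.pre_amount)
    return raw_delta / (10 ** balance.decimals)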
Topic: {chain}.dex.pools
message DexPoolEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  DexPoolEventType type = 100;  // INITIALIZE, INCREASE_LIQUIDITY, DECREASE_LIQUIDITY, SWAP
  DexPool pool = 101;
  DexPoolProcessed pool_processed = 200;
}
DexPool Core Fields:
| Field | Description |
| --- | --- |
| address | Pool address |
| token_a_address / token_b_address | Token addresses |
| token_a_vault_address / token_b_vault_address | Vault addresses |
| token_a_amount / token_b_amount | Token amounts |
| lp_wallet | LP wallet address |
Topic: {chain}.candlesticks
| Field | Description |
| --- | --- |
| token_address | Token address |
| resolution | Time resolution (1m, 5m, 15m, 1h, etc.) |
| timestamp | Timestamp |
| open / high / low / close | OHLC prices (USD) |
| open_in_native / high_in_native / low_in_native / close_in_native | OHLC prices (native) |
| volume / volume_in_usd / volume_in_native | Volume |
| trades | Trade count |
| dimension | Dimension type (TOKEN_ADDRESS/POOL_ADDRESS/PAIR) |
Topic: {chain}.trade-stats
| Field | Description |
| --- | --- |
| token_address | Token address |
| resolution | Time resolution |
| buys / sells | Buy/sell count |
| buyers / sellers | Buyer/seller count |
| buy_volume / sell_volume | Buy/sell volume |
| buy_volume_in_usd / sell_volume_in_usd | USD volume |
| high_in_usd / low_in_usd | High/low price |
Topic: {chain}.token-holdings
| Field Group | Description |
| --- | --- |
| top10_holders / top10_amount / top10_ratio | Top 10 holders stats |
| top50_holders / top50_amount / top50_ratio | Top 50 holders stats |
| top100_holders / top100_amount / top100_ratio | Top 100 holders stats |
| holders | Total holders count |
| creators_count / creators_amount / creators_ratio | Creator holding stats |
| fresh_count / fresh_amount / fresh_ratio | Fresh wallet holding stats |
| smart_count / smart_amount / smart_ratio | Smart Money holding stats |
| sniper_count / sniper_amount / sniper_ratio | Sniper holding stats |
| insider_count / insider_amount / insider_ratio | Insider holding stats |
Topic: {chain}.token-prices
| Field | Description |
| --- | --- |
| token_address | Token address |
| price_in_usd | USD price |
| price_in_native | Native currency price |
Topic: {chain}.token-supplies
| Field | Description |
| --- | --- |
| type | Event type (INITIALIZE_MINT/MINT/BURN) |
| token_address | Token address |
| amount | Amount |
| decimals | Decimals |
| amount_with_decimals | Amount adjusted for decimals |
For complete Protobuf definitions, refer to the streaming_protobuf repository.

Message Characteristics & Considerations

Developers need to be aware of the following message characteristics when consuming Kafka Streams:
The stream is not pre-filtered: each topic contains all messages with complete data. Consumers therefore need sufficient network throughput, server capacity, and efficient parsing code.
Messages for the same token or same account arrive strictly in block order. This means the event stream for a specific token or wallet address is ordered, making it easy to track state changes. However, message arrival order between different tokens/accounts is not guaranteed.
The same message may be delivered multiple times. If duplicate processing would cause issues, consumers need to maintain a cache or store so that messages are processed idempotently, as in the sketch below.
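A lightweight way to achieve idempotency is a bounded in-memory cache of recently seen message keys (for example, the transaction hash plus an event index); anything already in the cache is skipped. A minimal sketch, with the key choice left to your schema and a persistent store required if state must survive restarts:

from collections import OrderedDict

class RecentKeys:
    # Bounded set of recently processed keys for duplicate detection
    def __init__(self, max_size: int = 100_000):
        self._keys = OrderedDict()
        self._max_size = max_size

    def seen(self, key: str) -> bool:
        if key in self._keys:
            return True
        self._keys[key] = None
        if len(self._keys) > self._max_size:
            self._keys.popitem(last=False)  # evict the oldest key
        return False

recent = RecentKeys()
# if recent.seen(event_key): skip it; otherwise process it and move on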
ChainStream guarantees the integrity of each message. Messages will not be split. Regardless of how many transactions a block contains, the message you receive is a complete data unit.
Messages use Protobuf encoding, more compact than JSON. Consumers need to use the corresponding language’s Protobuf library for parsing.

Latency Model

Kafka Streams latency depends on the processing stages data passes through in the pipeline. Different topics from the same chain have different latencies:

Broadcasted vs Committed

| Type | Description | Latency | Data Certainty |
| --- | --- | --- | --- |
| Broadcasted | Transactions are consumable at the broadcast stage, before block confirmation | Lowest | Lower |
| Committed | Transactions enter the stream only after block confirmation | Higher | Highest |

Pipeline Latency

Each transformation layer from blockchain node to Kafka topic (parsing, structuring, enrichment) introduces approximately 100-1000ms latency:
  • raw topic: Lowest latency, closest to raw node data
  • transactions topic: Parsed and structured
  • dextrades topic: Relatively higher latency, but richer data
If latency is your primary concern, prefer topics closest to raw data that you can effectively parse.

Best Practices

Parallel Partition Consumption

Kafka topics are divided into multiple partitions, and each partition should be read in parallel to maximize throughput. Message partition keys are the token address or wallet address (unified across all chains), which ensures:
  • All events for the same token route to the same partition, guaranteeing order
  • All balance changes for the same wallet route to the same partition, facilitating state tracking
We recommend allocating an independent thread to each partition to balance the load, as in the sketch below.
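
One way to do this with kafka-python is to give each partition its own consumer and thread via explicit assignment; a sketch, with the SASL_SSL settings omitted for brevity and a hypothetical process() function (each thread gets its own KafkaConsumer because the client is not thread-safe):

import threading
from kafka import KafkaConsumer, TopicPartition

def consume_partition(topic: str, partition: int):
    consumer = KafkaConsumer(bootstrap_servers=['<your_broker_address>'])  # plus the SASL_SSL options shown earlier
    consumer.assign([TopicPartition(topic, partition)])
    for message in consumer:
        process(message)  # per-partition processing

meta = KafkaConsumer(bootstrap_servers=['<your_broker_address>'])  # plus the SASL_SSL options shown earlier
threads = []
for p in sorted(meta.partitions_for_topic('eth.dex.trades') or []):
    t = threading.Thread(target=consume_partition, args=('eth.dex.trades', p))
    t.start()
    threads.append(t)
for t in threads:
    t.join()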

Continuous Consumption, Don’t Block Main Loop

The consumer's read loop should run continuously; don't let message processing block it and create a backlog. If messages need non-trivial processing, adopt an asynchronous model: the main loop only reads, while processing is delegated to worker threads, as in the sketch below.
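
A minimal sketch of that split, assuming the consumer from the quick start and a hypothetical handle() function: the main loop only reads and enqueues, while worker threads parse and process:

import queue
import threading

messages = queue.Queue(maxsize=10_000)  # bounded queue applies back-pressure to the read loop

def worker():
    while True:
        raw = messages.get()
        try:
            handle(raw)  # parse the protobuf and run your business logic
        finally:
            messages.task_done()

for _ in range(4):  # size the worker pool to your processing cost
    threading.Thread(target=worker, daemon=True).start()

for message in consumer:  # main loop: read only, never block on processing
    messages.put(message.value)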

Message Processing Efficiency

Batch processing can reduce overhead, but batch size must be balanced against latency. In Go, a channel plus a worker group works well for concurrent processing; a Python batching sketch follows below.
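
In Python with kafka-python, consumer.poll returns records in batches, which makes the batch-size/latency trade-off explicit; a minimal sketch with a hypothetical process_batch() function:

while True:
    # Wait up to 500 ms and take at most 500 records per poll
    batch = consumer.poll(timeout_ms=500, max_records=500)
    for tp, records in batch.items():
        process_batch(tp, records)
    # Commit once per batch instead of once per message to reduce commit overhead
    if batch:
        consumer.commit()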

Chain-Specific Documentation