Solana targets a 400 ms block time, with real-world throughput of roughly 4,000 TPS and a theoretical peak of 65,000 TPS. This message volume places heavy demands on consumer processing capacity.
Schema Repository: github.com/chainstream-io/streaming_protobuf/solana
Message Types Overview
Solana Streams provides the following message types:
| Message Type | Description | Topic |
| --- | --- | --- |
| TradeEvents | DEX trade events | sol.dex.trades |
| TokenEvents | Token events | sol.tokens |
| BalanceEvents | Balance change events | sol.balances |
| DexPoolEvents | Liquidity pool events | sol.dex.pools |
| TransferEvents | Transfer events | sol.transfers |
| CandlestickEvents | Candlestick data | sol.candlesticks |
Block-Level Data
Solana uses the slot, rather than a traditional block number, as its timeline identifier.
| Field | Type | Description |
| --- | --- | --- |
| Slot | uint64 | Slot number (primary identifier) |
| BlockHeight | uint64 | Block height |
| BlockHash | bytes | Block hash |
| ParentSlot | uint64 | Parent slot |
| Timestamp | Timestamp | Block time |

In Solana's shred stream, the block header may be incomplete. Only the Slot field is guaranteed to be correct.
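Because only the slot is reliable on the shred stream, consumer code may want to key everything on the slot and treat the remaining header fields as optional. The following is a minimal sketch of that idea. `BlockHeader` here is a hypothetical stand-in written for illustration, not the generated Protobuf class, and the helper names are assumptions.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BlockHeader:
    """Hypothetical stand-in for a decoded block header (not the generated class)."""
    slot: int
    block_height: int = 0      # may be unset on the shred stream
    block_hash: bytes = b""    # may be empty on the shred stream


def block_key(header: BlockHeader) -> int:
    """Use the slot as the ordering and deduplication key."""
    return header.slot


def block_hash_hex(header: BlockHeader) -> Optional[str]:
    """Return the hash as hex, or None when the header did not carry one."""
    return header.block_hash.hex() if header.block_hash else None
```

Returning `None` for a missing hash, rather than an empty string, makes it harder for downstream code to mistake an absent value for a real one.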
Transaction-Level Data
Core Transaction Fields
| Field | Type | Description |
| --- | --- | --- |
| Signature | bytes | Transaction signature (unique identifier) |
| Status | Status | Execution status |
| Header | TransactionHeader | Metadata (including fees and signers) |
| Index | uint32 | Position within block |
Instructions
A transaction contains one or more instructions, which are the core of Solana's execution model:
| Field | Description |
| --- | --- |
| ProgramAccountIndex | Called program |
| Data | Encoded instruction data |
| AccountIndexes | Referenced accounts |
| BalanceUpdates | SOL balance changes caused by this instruction |

A single Solana transaction can contain multiple instructions, each of which may call a different program.
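The index-style field names suggest that an instruction refers to accounts by position in a transaction-level account list, as native Solana transactions do. The sketch below assumes that interpretation. The `account_keys` list and the dictionary layout are hypothetical and should be checked against the schema repository before use.

```python
from typing import Dict, List


def resolve_instruction(instruction: Dict, account_keys: List[str]) -> Dict:
    """Map index fields to addresses using the transaction's account list.

    `instruction` is a hypothetical dict mirroring ProgramAccountIndex and
    AccountIndexes; `account_keys` is an assumed ordered list of addresses.
    """
    return {
        "program": account_keys[instruction["program_account_index"]],
        "accounts": [account_keys[i] for i in instruction["account_indexes"]],
    }


keys = ["payer", "recipient", "system_program"]  # placeholder addresses
ix = {"program_account_index": 2, "account_indexes": [0, 1]}
resolved = resolve_instruction(ix, keys)
```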
Transfer Data
TransferEvents provides Solana transfer information (Topic: sol.transfers).
TransferEvent Structure
message TransferEvent {
    Instruction instruction = 1;
    Block block = 2;
    Transaction transaction = 3;
    DApp d_app = 4;
    Transfer transfer = 100;
    TransferProcessed transfer_processed = 200; // included in processed topic
}
Transfer Core Fields
| Field | Type | Description |
| --- | --- | --- |
| sender_token_account_address | string | Sender token account address |
| sender_account_owner_address | string | Sender account owner address |
| receiver_token_account_address | string | Receiver token account address |
| receiver_account_owner_address | string | Receiver account owner address |
| token_address | string | Token address (Mint) |
| token_amount | string | Transfer amount |
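Amounts arrive as strings, which avoids precision loss for large integers. The sketch below converts one to a human-readable value. It assumes that `token_amount` holds raw base units and that the token's `decimals` value is known from the corresponding token event. Both assumptions should be verified against real messages.

```python
from decimal import Decimal


def to_ui_amount(token_amount: str, decimals: int) -> Decimal:
    """Scale a raw amount string by 10**decimals without float rounding."""
    return Decimal(token_amount) / (Decimal(10) ** decimals)


ui = to_ui_amount("1500000", 6)  # a hypothetical 6-decimal token
```

Using `Decimal` rather than `float` keeps the conversion exact for amounts that fit in the default 28-digit context, which covers 64-bit integer amounts.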
Token events use the common TokenEvents message (Topic: sol.tokens), which contains:
| Field | Description |
| --- | --- |
| address | Token address |
| name / symbol | Name and symbol |
| decimals | Decimals |
| uri | Metadata URI |
| metadata_address | Metadata address |
| creators | Creator list (address, verified status, share) |
Solana tokens include additional metadata fields:
| Field | Description |
| --- | --- |
| collection_address | Collection address (NFT) |
| fungible | Is fungible |
| is_mutable | Is mutable |
| is_native | Is native SOL |
| program_address | Program address |
| seller_fee_basis_points | Creator royalty basis points (NFT) |
| token_standard | Token standard |
| mint_authority / freeze_authority / update_authority | Authority accounts |
| is_verified_collection | Is verified collection |
Balance Updates Layered Mechanism (Solana-specific)
This is an important characteristic of Solana stream data: balance updates are provided at two levels.

Instruction level: immediate balance changes after each instruction executes, reflecting the direct impact of each single step.

Transaction
├── Instruction 1
│   └── BalanceUpdate: +100 SOL
├── Instruction 2
│   └── BalanceUpdate: -50 SOL
└── Instruction 3
    └── BalanceUpdate: +25 SOL

Transaction level: final balance state after all instructions in the transaction complete.

Transaction
└── FinalBalanceUpdate: +75 SOL (net change)
This enables developers to track both fine-grained fund flows and final states.
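One practical use of the two levels is a consistency check: the instruction-level deltas for an account should add up to the transaction-level net change. The sketch below shows that arithmetic with the numbers from the diagram above. It is simplified to a single account and plain integers, so the function names and data layout are illustrative assumptions rather than the actual message structure.

```python
from typing import List


def net_change(instruction_updates: List[int]) -> int:
    """Sum the per-instruction balance deltas of one transaction."""
    return sum(instruction_updates)


def is_consistent(instruction_updates: List[int], final_update: int) -> bool:
    """Check that instruction-level deltas add up to the transaction-level change."""
    return net_change(instruction_updates) == final_update


# Values from the diagram above, in whole SOL for readability.
updates = [100, -50, 25]
```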
DEX Data
TradeEvents provides DEX trade data (Topic: sol.dex.trades), using the common TradeEvent structure.
Trade Core Fields
| Field | Description |
| --- | --- |
| token_a_address / token_b_address | Trading pair token addresses |
| user_a_amount / user_b_amount | User trade amounts |
| pool_address | Pool address |
| vault_a / vault_b | Pool vault addresses |
| vault_a_amount / vault_b_amount | Vault amounts |
| bonding_curve | Bonding curve info (if applicable) |
DApp Info
| Field | Description |
| --- | --- |
| program_address | DEX program address (e.g., Raydium, Orca) |
| inner_program_address | Inner program address |
| chain | Chain identifier (CHAIN_SOLANA) |
DexPoolEvent - Liquidity Pools
| Field | Description |
| --- | --- |
| type | Event type (INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
| address | Pool address |
| token_a_address / token_b_address | Token addresses |
| token_a_amount / token_b_amount | Token amounts |
| lp_wallet | LP wallet address |
Solana Chain Characteristics
Slot Mechanism
Solana uses the slot, rather than a traditional block number, as its timeline identifier:
| Characteristic | Description |
| --- | --- |
| Block interval | ~400ms |
| Message volume | Much higher than EVM chains |
| Primary identifier | Slot number |
Message Packaging
Transactions are packaged in small batches: each Kafka message contains no more than 250 transactions.
In Solana's shred stream, the block header may be incomplete. Only the Slot field is guaranteed to be correct; other fields may be empty or inaccurate.
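The packaging rule above gives a rough lower bound for capacity planning. The sketch below computes the minimum number of messages needed to carry a given transaction rate, using the 250-transaction cap and the ~400 ms block time stated in this document. Actual batches may be smaller than the cap and per-topic volumes differ, so these are floor estimates, not measured figures.

```python
import math

MAX_TX_PER_MESSAGE = 250  # from the packaging rule above
BLOCK_TIME_MS = 400       # target block time


def min_messages_per_second(tps: int) -> int:
    """Fewest messages per second that can carry `tps` transactions."""
    return math.ceil(tps / MAX_TX_PER_MESSAGE)


def min_messages_per_slot(tps: int) -> int:
    """Fewest messages per slot, given transactions accumulated over one block time."""
    return math.ceil(tps * BLOCK_TIME_MS / (1000 * MAX_TX_PER_MESSAGE))
```

At roughly 4,000 TPS this works out to at least 16 messages per second, or at least 7 per slot.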
High-Throughput Processing Recommendations
Due to Solana’s extremely high throughput, we recommend:
Sufficient processing capacity: Ensure consumers have adequate CPU and memory
Parallel processing: Use multi-threading/coroutines for parallel message processing
Efficient parsing: Optimize Protobuf parsing code
Batch writes: Use batch database writes if persistence is needed
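The batch-write recommendation can be sketched as a small buffer that flushes once it reaches a size threshold. The class below is an illustrative assumption, not part of any ChainStream library. `flush_fn` stands in for whatever bulk-insert call your database client provides, and the batch size is arbitrary.

```python
from typing import Callable, List


class BatchWriter:
    """Buffer rows and hand them to `flush_fn` in batches of `batch_size`."""

    def __init__(self, flush_fn: Callable[[List[dict]], None], batch_size: int = 500):
        self._flush_fn = flush_fn
        self._batch_size = batch_size
        self._buffer: List[dict] = []

    def add(self, row: dict) -> None:
        """Queue one row and flush automatically when the batch is full."""
        self._buffer.append(row)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Write whatever is buffered; call this on shutdown as well."""
        if self._buffer:
            self._flush_fn(self._buffer)
            self._buffer = []


written: List[List[dict]] = []  # stand-in for a database
writer = BatchWriter(written.append, batch_size=2)
for slot in (1, 2, 3):
    writer.add({"slot": slot})
writer.flush()  # flush the final partial batch
```

A production consumer would usually also flush on a timer, so that a quiet period does not leave rows sitting in the buffer.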
Topic → Message Type Mapping
| Topic | Proto File | Message Type | Description |
| --- | --- | --- | --- |
| sol.dex.trades | trade_event.proto | TradeEvents | DEX trade events |
| sol.dex.trades.processed | trade_event.proto | TradeEvents | With USD price, suspicious flag |
| sol.tokens | token_event.proto | TokenEvents | Token events |
| sol.tokens.processed | token_event.proto | TokenEvents | With description, image, social links |
| sol.balances | balance_event.proto | BalanceEvents | Balance change events |
| sol.transfers | solana/transfer_event.proto | TransferEvents | Solana transfer events |
| sol.transfers.processed | solana/transfer_processed_event.proto | TransferProcessedEvents | With USD value |
| sol.dex.pools | dex_pool_event.proto | DexPoolEvents | Liquidity pool events |
| sol.candlesticks | candlestick.proto | CandlestickEvents | Candlestick data |
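When one process consumes several topics, the table above can be kept as a lookup so each message is decoded with the right type. The sketch below stores only the names copied from the table. Wiring those names to the generated Protobuf classes is left out, and the `TOPIC_SCHEMAS` and `message_type_for` names are hypothetical.

```python
from typing import Dict, Tuple

# Topic -> (proto file, top-level message type), copied from the table above.
TOPIC_SCHEMAS: Dict[str, Tuple[str, str]] = {
    "sol.dex.trades": ("trade_event.proto", "TradeEvents"),
    "sol.dex.trades.processed": ("trade_event.proto", "TradeEvents"),
    "sol.tokens": ("token_event.proto", "TokenEvents"),
    "sol.tokens.processed": ("token_event.proto", "TokenEvents"),
    "sol.balances": ("balance_event.proto", "BalanceEvents"),
    "sol.transfers": ("solana/transfer_event.proto", "TransferEvents"),
    "sol.transfers.processed": (
        "solana/transfer_processed_event.proto",
        "TransferProcessedEvents",
    ),
    "sol.dex.pools": ("dex_pool_event.proto", "DexPoolEvents"),
    "sol.candlesticks": ("candlestick.proto", "CandlestickEvents"),
}


def message_type_for(topic: str) -> str:
    """Return the top-level message type name for a topic, or raise ValueError."""
    try:
        return TOPIC_SCHEMAS[topic][1]
    except KeyError:
        raise ValueError(f"Unknown topic: {topic}") from None
```

Failing loudly on an unknown topic is a deliberate choice here: silently parsing with the wrong message type can produce plausible-looking but meaningless fields.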
Code Examples
Python Example: Consume Solana DEX Trades
from kafka import KafkaConsumer
from common import trade_event_pb2  # Generated from the streaming_protobuf repository

# Create consumer
consumer = KafkaConsumer(
    'sol.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    # Auto-commit is disabled, so offsets are not stored unless you call consumer.commit()
    enable_auto_commit=False,
    group_id='my-solana-consumer'
)

# Consume and parse messages
for message in consumer:
    # Each Kafka message carries one TradeEvents batch
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)

    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Slot: {event.block.slot}")
        print(f"DEX Program: {event.d_app.program_address}")
        print("---")
Go Example: Consume Solana DEX Trades with a Worker Pool
package main

import (
	"context"
	"crypto/tls"
	"log"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
	"google.golang.org/protobuf/proto"

	common "github.com/chainstream-io/streaming_protobuf/common/messages"
)

func main() {
	mechanism, err := scram.Mechanism(scram.SHA512, "your_username", "your_password")
	if err != nil {
		log.Fatalf("Failed to create SASL mechanism: %v", err)
	}

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"<your_broker_address>"},
		Topic:   "sol.dex.trades",
		GroupID: "my-solana-consumer",
		Dialer: &kafka.Dialer{
			SASLMechanism: mechanism,
			TLS:           &tls.Config{},
		},
	})
	defer reader.Close()

	// Use a worker pool for parallel processing.
	// Note: messages handled by different workers may complete out of order.
	messages := make(chan kafka.Message, 100)

	// Start workers
	for i := 0; i < 10; i++ {
		go func() {
			for msg := range messages {
				var tradeEvents common.TradeEvents
				if err := proto.Unmarshal(msg.Value, &tradeEvents); err != nil {
					log.Printf("Failed to parse: %v", err)
					continue
				}

				// Process trades
				for _, event := range tradeEvents.Events {
					log.Printf("Pool: %s, Token A: %s, Token B: %s",
						event.Trade.PoolAddress,
						event.Trade.TokenAAddress,
						event.Trade.TokenBAddress)
					log.Printf("Amount A: %s, Amount B: %s, Slot: %d",
						event.Trade.UserAAmount,
						event.Trade.UserBAmount,
						event.Block.Slot)
				}
			}
		}()
	}

	// Read messages and hand them to the workers
	for {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Printf("Error reading: %v", err)
			continue
		}
		messages <- msg
	}
}