Solana targets a 400 ms block time, with actual throughput of roughly 4,000 TPS and a theoretical peak of 65,000 TPS. This extremely high message volume places heavy demands on consumer processing capacity.
Schema Repository: github.com/chainstream-io/streaming_protobuf/solana
Message Types Overview
Solana Streams provides the following message types:
| Message Type | Description | Topic |
|---|---|---|
| TradeEvents | DEX trade events | sol.dex.trades |
| TokenEvents | Token events | sol.tokens |
| BalanceEvents | Balance change events | sol.balances |
| DexPoolEvents | Liquidity pool events | sol.dex.pools |
| TransferEvents | Transfer events | sol.transfers |
| CandlestickEvents | Candlestick data | sol.candlesticks |
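For reference, a single kafka-python consumer can subscribe to several of these topics at once and dispatch on message.topic when choosing which schema to parse with; a minimal sketch (broker address and credentials are placeholders):
from kafka import KafkaConsumer

# One consumer may subscribe to several Solana topics at once; parse message.value
# with the schema that matches message.topic (see the topic mapping table below).
consumer = KafkaConsumer(
    'sol.dex.trades',
    'sol.transfers',
    'sol.tokens',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    group_id='my-solana-consumer',
)

for message in consumer:
    print(message.topic, len(message.value), "bytes")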
Block-Level Data
Solana uses the slot, rather than a traditional block number, as its timeline identifier.
| Field | Type | Description |
|---|---|---|
| Slot | uint64 | Slot number (primary identifier) |
| BlockHeight | uint64 | Block height |
| BlockHash | bytes | Block hash |
| ParentSlot | uint64 | Parent slot |
| Timestamp | Timestamp | Block time |
In Solana’s shred stream, the block header may be incomplete; only the Slot field is guaranteed to be correct.
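A minimal sketch of reading the block-level fields from a parsed event; the snake_case field names are an assumption (mirroring the Python trade example at the end of this page), and only slot should be trusted in the shred stream:
def describe_block(block):
    # slot is the primary identifier and the only field guaranteed correct in the shred stream
    print("Slot:", block.slot)
    # Remaining fields may be empty or inaccurate in the shred stream; treat them as best-effort.
    if block.block_height:
        print("Block height:", block.block_height)
    if block.block_hash:
        print("Block hash:", block.block_hash.hex())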
Transaction-Level Data
Core Transaction Fields
| Field | Type | Description |
|---|---|---|
| Signature | bytes | Transaction signature (unique identifier) |
| Status | Status | Execution status |
| Header | TransactionHeader | Metadata (including fees and signers) |
| Index | uint32 | Position within block |
Instructions
Transactions contain multiple Instructions, which are the core of Solana’s execution model:
| Field | Description |
|---|---|
| ProgramAccountIndex | Called program |
| Data | Encoded instruction data |
| AccountIndexes | Referenced accounts |
| BalanceUpdates | SOL balance changes caused by this instruction |
A Solana transaction can contain multiple instructions, each calling a different program.
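As an illustration, an instruction carried in an event message could be inspected as follows; the snake_case field names are an assumed Python rendering of the fields in the table above:
def summarize_instruction(ix):
    # Field names assume the snake_case Python rendering of the table above.
    print("Program account index:", ix.program_account_index)
    print("Referenced account indexes:", list(ix.account_indexes))
    print("Raw instruction data:", ix.data.hex())  # data is assumed to be raw bytes
    for update in ix.balance_updates:
        # Each entry reflects a SOL balance change caused by this instruction
        print("Balance update:", update)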
Transfer Data
TransferEvents provides Solana transfer information (Topic: sol.transfers).
TransferEvent Structure
message TransferEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Transfer transfer = 100;
  TransferProcessed transfer_processed = 200; // included in processed topic
}
Transfer Core Fields
| Field | Type | Description |
|---|---|---|
| sender_token_account_address | string | Sender token account address |
| sender_account_owner_address | string | Sender account owner address |
| receiver_token_account_address | string | Receiver token account address |
| receiver_account_owner_address | string | Receiver account owner address |
| token_address | string | Token address (Mint) |
| token_amount | string | Transfer amount |
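A sketch of consuming sol.transfers; it assumes the module generated from solana/transfer_event.proto is importable as transfer_event_pb2 and that the TransferEvents wrapper exposes a repeated events field, mirroring the trade example at the end of this page:
from kafka import KafkaConsumer
from solana import transfer_event_pb2  # assumed module path, generated from solana/transfer_event.proto

consumer = KafkaConsumer(
    'sol.transfers',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    group_id='my-solana-transfers',
)

for message in consumer:
    transfer_events = transfer_event_pb2.TransferEvents()
    transfer_events.ParseFromString(message.value)
    for event in transfer_events.events:
        t = event.transfer
        print(f"{t.sender_account_owner_address} -> {t.receiver_account_owner_address}")
        print(f"  token: {t.token_address}, amount: {t.token_amount}")
        print(f"  slot: {event.block.slot}")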
Token Data
Token events use the common TokenEvents message (Topic: sol.tokens), containing:
| Field | Description |
|---|---|
| address | Token address |
| name / symbol | Name and symbol |
| decimals | Decimals |
| uri | Metadata URI |
| metadata_address | Metadata address |
| creators | Creator list (address, verified status, share) |
Solana tokens include additional metadata fields:
| Field | Description |
|---|---|
| collection_address | Collection address (NFT) |
| fungible | Is fungible |
| is_mutable | Is mutable |
| is_native | Is native SOL |
| program_address | Program address |
| seller_fee_basis_points | Creator royalty basis points (NFT) |
| token_standard | Token standard |
| mint_authority / freeze_authority / update_authority | Authority accounts |
| is_verified_collection | Is verified collection |
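The exact nesting of these fields inside a token event is not shown here; the sketch below assumes a token_event_pb2 module generated from token_event.proto, a TokenEvents wrapper with a repeated events field, and a token sub-message with the snake_case names from the tables above (all assumptions, mirroring how trade events expose trade):
from common import token_event_pb2  # assumed module name, generated from token_event.proto

def handle_token_message(value: bytes):
    token_events = token_event_pb2.TokenEvents()
    token_events.ParseFromString(value)
    for event in token_events.events:
        token = event.token  # assumed field name, mirroring event.trade in TradeEvent
        print(f"{token.symbol} ({token.name}) at {token.address}")
        print(f"  decimals: {token.decimals}, fungible: {token.fungible}")
        print(f"  standard: {token.token_standard}, metadata URI: {token.uri}")
        if not token.fungible:
            print(f"  collection: {token.collection_address}, royalty bps: {token.seller_fee_basis_points}")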
Layered Balance Update Mechanism (Solana-specific)
This is an important characteristic of Solana stream data: balance updates are provided at two levels.
Instruction level: immediate balance changes after each instruction executes, reflecting the direct impact of a single step.
Transaction
└── Instruction 1
    └── BalanceUpdate: +100 SOL
└── Instruction 2
    └── BalanceUpdate: -50 SOL
└── Instruction 3
    └── BalanceUpdate: +25 SOL
Transaction level: the final balance state after all instructions in the transaction complete.
Transaction
└── FinalBalanceUpdate: +75 SOL (net change)
This enables developers to track both fine-grained fund flows and final states.
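As an illustration, instruction-level updates can be summed to cross-check the transaction-level net change; the balance_updates and amount names below are assumptions, so consult balance_event.proto for the actual schema:
def net_sol_change(instructions):
    # Sum per-instruction SOL balance changes to recover the transaction-level net change.
    # `balance_updates` and `amount` are assumed field names; see balance_event.proto for the real schema.
    total = 0
    for ix in instructions:
        for update in ix.balance_updates:
            total += int(update.amount)
    return total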
DEX Data
TradeEvents provides DEX trade data (Topic: sol.dex.trades), using the common TradeEvent structure.
Trade Core Fields
| Field | Description |
|---|---|
| token_a_address / token_b_address | Trading pair token addresses |
| user_a_amount / user_b_amount | User trade amounts |
| pool_address | Pool address |
| vault_a / vault_b | Pool vault addresses |
| vault_a_amount / vault_b_amount | Vault amounts |
| bonding_curve | Bonding curve info (if applicable) |
DApp Info
| Field | Description |
|---|---|
| program_address | DEX program address (e.g., Raydium, Orca) |
| inner_program_address | Inner program address |
| chain | Chain identifier (CHAIN_SOLANA) |
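For example, trades from a particular DEX can be filtered on the DApp fields; the program address below is a placeholder to fill in with the program you care about:
TARGET_PROGRAM = "<dex_program_address>"  # placeholder: the Raydium/Orca/etc. program you care about

def is_from_target_dex(event):
    # Match either the outer DEX program or the inner (CPI) program address.
    return TARGET_PROGRAM in (event.d_app.program_address, event.d_app.inner_program_address)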
DexPoolEvent - Liquidity Pools
| Field | Description |
|---|---|
| type | Event type (INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
| address | Pool address |
| token_a_address / token_b_address | Token addresses |
| token_a_amount / token_b_amount | Token amounts |
| lp_wallet | LP wallet address |
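A sketch of parsing sol.dex.pools messages, assuming a dex_pool_event_pb2 module generated from dex_pool_event.proto, a DexPoolEvents wrapper with a repeated events field, and a pool sub-message carrying the fields above (all assumptions mirroring the trade schema):
from common import dex_pool_event_pb2  # assumed module name, generated from dex_pool_event.proto

def handle_pool_message(value: bytes):
    pool_events = dex_pool_event_pb2.DexPoolEvents()
    pool_events.ParseFromString(value)
    for event in pool_events.events:
        pool = event.pool  # assumed nesting, mirroring event.trade / event.transfer
        print(f"type={pool.type} pool={pool.address}")  # type prints as its enum value
        print(f"  {pool.token_a_address}: {pool.token_a_amount}")
        print(f"  {pool.token_b_address}: {pool.token_b_amount}")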
Solana Chain Characteristics
Slot Mechanism
Solana uses the slot, rather than a traditional block number, as its timeline identifier:
| Characteristic | Description |
|---|---|
| Block interval | ~400ms |
| Message volume | Much higher than EVM chains |
| Primary identifier | Slot number |
Message Packaging
Transactions are packaged in small batches; each Kafka message contains no more than 250 transactions.
In Solana’s shred stream, the block header may be incomplete; only the Slot field is guaranteed to be correct. Other fields may be empty or inaccurate.
High-Throughput Processing Recommendations
Due to Solana’s extremely high throughput, we recommend:
- Sufficient processing capacity: Ensure consumers have adequate CPU and memory
- Parallel processing: Use multi-threading/coroutines for parallel message processing
- Efficient parsing: Optimize Protobuf parsing code
- Batch writes: Use batch database writes if persistence is needed
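A minimal Python sketch of the parallel-processing and batch-write recommendations above, mirroring the worker-pool pattern used in the Go example below; worker count and batch size are illustrative:
import queue
import threading

from kafka import KafkaConsumer
from common import trade_event_pb2  # from the streaming_protobuf repository

BATCH_SIZE = 500   # illustrative; tune for your database
NUM_WORKERS = 8    # illustrative; tune for your hardware

def flush(rows):
    # Placeholder for a real batch write (e.g. executemany / bulk insert).
    print(f"writing {len(rows)} rows")

def worker(messages: queue.Queue):
    buffer = []
    while True:
        raw = messages.get()
        trade_events = trade_event_pb2.TradeEvents()
        trade_events.ParseFromString(raw)
        for e in trade_events.events:
            buffer.append((e.block.slot, e.trade.pool_address, e.trade.user_a_amount))
        if len(buffer) >= BATCH_SIZE:
            flush(buffer)
            buffer.clear()
        messages.task_done()

messages = queue.Queue(maxsize=1000)
for _ in range(NUM_WORKERS):
    threading.Thread(target=worker, args=(messages,), daemon=True).start()

consumer = KafkaConsumer(
    'sol.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    group_id='my-solana-consumer',
)

for message in consumer:
    messages.put(message.value)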
Topic → Message Type Mapping
| Topic | Proto File | Message Type | Description |
|---|---|---|---|
| sol.dex.trades | trade_event.proto | TradeEvents | DEX trade events |
| sol.dex.trades.processed | trade_event.proto | TradeEvents | With USD price, suspicious flag |
| sol.tokens | token_event.proto | TokenEvents | Token events |
| sol.tokens.processed | token_event.proto | TokenEvents | With description, image, social links |
| sol.balances | balance_event.proto | BalanceEvents | Balance change events |
| sol.transfers | solana/transfer_event.proto | TransferEvents | Solana transfer events |
| sol.transfers.processed | solana/transfer_processed_event.proto | TransferProcessedEvents | With USD value |
| sol.dex.pools | dex_pool_event.proto | DexPoolEvents | Liquidity pool events |
| sol.candlesticks | candlestick.proto | CandlestickEvents | Candlestick data |
Code Examples
Python Example: Consume Solana DEX Trades
from kafka import KafkaConsumer
from common import trade_event_pb2 # Get from streaming_protobuf repository
# Create consumer
consumer = KafkaConsumer(
    'sol.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-solana-consumer'
)
# Consume and parse messages
for message in consumer:
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Slot: {event.block.slot}")
        print(f"DEX Program: {event.d_app.program_address}")
        print("---")
    # enable_auto_commit is False, so commit offsets manually after processing
    consumer.commit()
Go Example: Consume Solana DEX Trades with a Worker Pool
package main

import (
    "context"
    "crypto/tls"
    "log"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/scram"
    "google.golang.org/protobuf/proto"

    common "github.com/chainstream-io/streaming_protobuf/common/messages"
)
func main() {
    mechanism, _ := scram.Mechanism(scram.SHA512, "your_username", "your_password")

    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"<your_broker_address>"},
        Topic:   "sol.dex.trades",
        GroupID: "my-solana-consumer",
        Dialer: &kafka.Dialer{
            SASLMechanism: mechanism,
            TLS:           &tls.Config{},
        },
    })
    defer reader.Close()

    // Use worker pool for parallel processing
    messages := make(chan kafka.Message, 100)

    // Start workers
    for i := 0; i < 10; i++ {
        go func() {
            for msg := range messages {
                var tradeEvents common.TradeEvents
                if err := proto.Unmarshal(msg.Value, &tradeEvents); err != nil {
                    log.Printf("Failed to parse: %v", err)
                    continue
                }
                // Process trades
                for _, event := range tradeEvents.Events {
                    log.Printf("Pool: %s, Token A: %s, Token B: %s",
                        event.Trade.PoolAddress,
                        event.Trade.TokenAAddress,
                        event.Trade.TokenBAddress)
                    log.Printf("Amount A: %s, Amount B: %s, Slot: %d",
                        event.Trade.UserAAmount,
                        event.Trade.UserBAmount,
                        event.Block.Slot)
                }
            }
        }()
    }

    // Read messages
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Printf("Error reading: %v", err)
            continue
        }
        messages <- msg
    }
}