Solana targets a 400 ms block time, with real-world throughput of roughly 4,000 TPS and a theoretical peak of 65,000 TPS. This very high message volume places heavy demands on consumer processing capacity.

Schema Repository: github.com/chainstream-io/streaming_protobuf/solana

Message Types Overview

Solana Streams provides the following message types:
| Message Type | Description | Topic |
| --- | --- | --- |
| TradeEvents | DEX trade events | sol.dex.trades |
| TokenEvents | Token events | sol.tokens |
| BalanceEvents | Balance change events | sol.balances |
| DexPoolEvents | Liquidity pool events | sol.dex.pools |
| TransferEvents | Transfer events | sol.transfers |
| CandlestickEvents | Candlestick data | sol.candlesticks |

Block-Level Data

Solana uses the Slot rather than a traditional block number as the timeline identifier.

| Field | Type | Description |
| --- | --- | --- |
| Slot | uint64 | Slot number (primary identifier) |
| BlockHeight | uint64 | Block height |
| BlockHash | bytes | Block hash |
| ParentSlot | uint64 | Parent slot |
| Timestamp | Timestamp | Block time |
Note: In Solana’s shred stream, the Block Header may be incomplete. Only the Slot field is guaranteed to be correct.

Transaction-Level Data

Core Transaction Fields

| Field | Type | Description |
| --- | --- | --- |
| Signature | bytes | Transaction signature (unique identifier) |
| Status | Status | Execution status |
| Header | TransactionHeader | Metadata (including fees and signers) |
| Index | uint32 | Position within block |

Instructions

A transaction contains one or more Instructions, which form the core of Solana’s execution model:

| Field | Description |
| --- | --- |
| ProgramAccountIndex | Called program |
| Data | Encoded instruction data |
| AccountIndexes | Referenced accounts |
| BalanceUpdates | SOL balance changes caused by this instruction |

Each Instruction in a transaction can call a different program.
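
The field names ProgramAccountIndex and AccountIndexes suggest that an instruction refers to accounts by position rather than by address. The following is a minimal sketch of resolving those positions, assuming they index into a per-transaction account list. That list (`account_keys`), the `Instruction` dataclass, and the placeholder addresses are hypothetical stand-ins for illustration, not the generated Protobuf classes.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Instruction:
    """Hypothetical stand-in mirroring the instruction fields in the table."""
    program_account_index: int   # mirrors ProgramAccountIndex
    account_indexes: List[int]   # mirrors AccountIndexes
    data: bytes = b""            # mirrors Data


def resolve_instruction(instruction: Instruction, account_keys: List[str]) -> Dict:
    """Translate index fields into addresses using the assumed account list."""
    return {
        "program": account_keys[instruction.program_account_index],
        "accounts": [account_keys[i] for i in instruction.account_indexes],
    }


# Placeholder addresses, not real Solana accounts
account_keys = ["wallet_a", "wallet_b", "token_program"]
ix = Instruction(program_account_index=2, account_indexes=[0, 1])
resolved = resolve_instruction(ix, account_keys)
print(resolved)
```

Check the schema repository for where the account list actually lives before relying on this layout.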

Transfer Data

TransferEvents provides Solana transfer information (Topic: sol.transfers).

TransferEvent Structure

message TransferEvent {
  Instruction instruction = 1;
  Block block = 2;
  Transaction transaction = 3;
  DApp d_app = 4;
  Transfer transfer = 100;
  TransferProcessed transfer_processed = 200;  // included in processed topic
}

Transfer Core Fields

| Field | Type | Description |
| --- | --- | --- |
| sender_token_account_address | string | Sender token account address |
| sender_account_owner_address | string | Sender account owner address |
| receiver_token_account_address | string | Receiver token account address |
| receiver_account_owner_address | string | Receiver account owner address |
| token_address | string | Token address (Mint) |
| token_amount | string | Transfer amount |
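
Because token_amount is delivered as a string, it is safer to parse it with a decimal type than with a float. The sketch below assumes the string holds an integer amount in the token's smallest unit and that the token's decimals value is known from its metadata. Both points are assumptions to verify against real messages, and `to_ui_amount` is a hypothetical helper name.

```python
from decimal import Decimal


def to_ui_amount(token_amount: str, decimals: int) -> Decimal:
    """Shift a base-unit amount string by `decimals` places without using floats.

    Assumes `token_amount` is expressed in the token's smallest unit.
    """
    return Decimal(token_amount).scaleb(-decimals)


# 1,500,000 base units of a token with 6 decimals
amount = to_ui_amount("1500000", 6)
print(amount)
```

If the stream already delivers decimal-adjusted amounts, skip the shift and parse the string with `Decimal` directly.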

Token Metadata (TokenEvent)

Token events use the common TokenEvents message (Topic: sol.tokens), which contains:

| Field | Description |
| --- | --- |
| address | Token address |
| name / symbol | Name and symbol |
| decimals | Decimals |
| uri | Metadata URI |
| metadata_address | Metadata address |
| creators | Creator list (address, verified status, share) |

SolanaExtra Specific Fields

Solana tokens include additional metadata fields:
| Field | Description |
| --- | --- |
| collection_address | Collection address (NFT) |
| fungible | Is fungible |
| is_mutable | Is mutable |
| is_native | Is native SOL |
| program_address | Program address |
| seller_fee_basis_points | Creator royalty basis points (NFT) |
| token_standard | Token standard |
| mint_authority / freeze_authority / update_authority | Authority accounts |
| is_verified_collection | Is verified collection |

Balance Updates Layered Mechanism (Solana-specific)

This is an important characteristic of Solana stream data: balance updates are provided at two levels.
At the instruction level, each instruction carries the immediate balance changes caused by its execution, reflecting the direct impact of a single step:
Transaction
└── Instruction 1
    └── BalanceUpdate: +100 SOL
└── Instruction 2
    └── BalanceUpdate: -50 SOL
└── Instruction 3
    └── BalanceUpdate: +25 SOL
This enables developers to track both fine-grained fund flows and final states.
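
To make the relationship between the two views concrete, the sketch below replays the three instruction-level updates from the tree above and accumulates them into a net change. The values come from the diagram, while the tuple layout is a hypothetical simplification and not the BalanceUpdate message itself.

```python
# Instruction-level deltas taken from the diagram above, in SOL
instruction_updates = [
    ("Instruction 1", 100),
    ("Instruction 2", -50),
    ("Instruction 3", 25),
]

running_total = 0
for name, delta in instruction_updates:
    running_total += delta
    print(f"{name}: {delta:+d} SOL, running total {running_total} SOL")

# Net effect of all three instructions on the account
net_change = running_total
print(f"Net change: {net_change:+d} SOL")
```

The per-instruction lines show the fine-grained flow, and the net change of +75 SOL is the final effect of the whole transaction on the account.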

DEX Data

TradeEvents provides DEX trade data (Topic: sol.dex.trades) using the common TradeEvent structure.

Trade Core Fields

| Field | Description |
| --- | --- |
| token_a_address / token_b_address | Trading pair token addresses |
| user_a_amount / user_b_amount | User trade amounts |
| pool_address | Pool address |
| vault_a / vault_b | Pool vault addresses |
| vault_a_amount / vault_b_amount | Vault amounts |
| bonding_curve | Bonding curve info (if applicable) |
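
A common use of user_a_amount and user_b_amount is deriving the execution price of a trade. The sketch below assumes both amounts arrive as numeric strings and are already expressed in comparable, decimal-adjusted units. Neither point is stated by the schema table, so treat `execution_price` as a hypothetical helper to adapt once you have inspected real messages.

```python
from decimal import Decimal


def execution_price(user_a_amount: str, user_b_amount: str) -> Decimal:
    """Return the price of token A quoted in token B (units of B per unit of A)."""
    amount_a = Decimal(user_a_amount)
    amount_b = Decimal(user_b_amount)
    if amount_a == 0:
        raise ValueError("user_a_amount is zero, price is undefined")
    return amount_b / amount_a


# Example: 2 units of token A traded for 300 units of token B
price = execution_price("2", "300")
print(price)
```

Using `Decimal` avoids the rounding drift that floats introduce when amounts span many orders of magnitude.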

DApp Info

| Field | Description |
| --- | --- |
| program_address | DEX program address (e.g., Raydium, Orca) |
| inner_program_address | Inner program address |
| chain | Chain identifier (CHAIN_SOLANA) |

DexPoolEvent - Liquidity Pools

| Field | Description |
| --- | --- |
| type | Event type (INITIALIZE/INCREASE_LIQUIDITY/DECREASE_LIQUIDITY/SWAP) |
| address | Pool address |
| token_a_address / token_b_address | Token addresses |
| token_a_amount / token_b_amount | Token amounts |
| lp_wallet | LP wallet address |
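
Consumers often care about only some pool event types, for example liquidity changes but not swaps. The following sketch groups events by the type values listed above. Events are modeled as plain dictionaries with placeholder pool addresses, which is a simplification of the generated DexPoolEvent class, and `summarize` is a hypothetical helper.

```python
from collections import Counter

# Event types that change pool liquidity, taken from the table above
LIQUIDITY_TYPES = {"INCREASE_LIQUIDITY", "DECREASE_LIQUIDITY"}


def summarize(events):
    """Count events per type and list the pools that saw liquidity changes."""
    counts = Counter(event["type"] for event in events)
    liquidity_pools = sorted(
        {event["address"] for event in events if event["type"] in LIQUIDITY_TYPES}
    )
    return counts, liquidity_pools


# Placeholder pool addresses, not real accounts
events = [
    {"type": "INITIALIZE", "address": "pool_1"},
    {"type": "SWAP", "address": "pool_1"},
    {"type": "INCREASE_LIQUIDITY", "address": "pool_1"},
    {"type": "DECREASE_LIQUIDITY", "address": "pool_2"},
    {"type": "SWAP", "address": "pool_2"},
]
counts, liquidity_pools = summarize(events)
print(dict(counts), liquidity_pools)
```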

Solana Chain Characteristics

Slot Mechanism

Solana uses the Slot rather than a traditional block number as the timeline identifier:

| Characteristic | Description |
| --- | --- |
| Block interval | ~400 ms |
| Message volume | Much higher than EVM chains |
| Primary identifier | Slot number |

Message Packaging

Transactions are packaged in small batches: each Kafka message contains no more than 250 transactions.
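
The 250-transaction cap gives a rough lower bound on message rate for capacity planning. The sketch below combines it with the throughput and block-time figures quoted at the top of this page. It assumes a topic that carries every transaction and full messages, so real rates for a given topic may differ. The function names are illustrative.

```python
import math

MAX_TX_PER_MESSAGE = 250  # cap stated above
SLOT_MS = 400             # target block time


def min_messages_per_second(tps: int) -> int:
    """Fewest messages per second needed to carry `tps` transactions."""
    return math.ceil(tps / MAX_TX_PER_MESSAGE)


def transactions_per_slot(tps: int) -> int:
    """Approximate transactions produced during one slot."""
    return tps * SLOT_MS // 1000


print(min_messages_per_second(4_000))   # typical throughput
print(min_messages_per_second(65_000))  # theoretical peak
print(transactions_per_slot(4_000))
```

At roughly 4,000 TPS this works out to at least 16 messages per second and about 1,600 transactions per slot. At the theoretical peak, the floor rises to 260 messages per second.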

Block Header Completeness

In Solana’s shred stream, the Block Header may be incomplete. Only the Slot field is guaranteed to be correct; other fields may be empty or inaccurate.
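
One defensive pattern is to key all downstream records by Slot alone and treat every other header field as optional. The sketch below models the header as a plain dictionary and maps empty values to `None`. The dictionary layout and the `normalize_block` helper are hypothetical, and the zero-means-missing rule relies on proto3 leaving unset scalar fields at their zero or empty defaults.

```python
from typing import Optional


def normalize_block(block: dict) -> dict:
    """Keep the slot as the key and mark empty header fields as missing."""
    slot = block.get("slot")
    if slot is None:
        raise ValueError("block header has no slot")

    def optional(value) -> Optional[object]:
        # Unset proto3 scalars arrive as 0, "" or b"", so treat those as missing
        return value if value else None

    return {
        "slot": slot,
        "block_height": optional(block.get("block_height")),
        "block_hash": optional(block.get("block_hash")),
        "parent_slot": optional(block.get("parent_slot")),
    }


# Header with only the slot populated, as may happen in the shred stream
partial = normalize_block({"slot": 123, "block_height": 0, "block_hash": b""})
print(partial)
```

Storing `None` instead of a zero value keeps later queries from mistaking an unset field for real data.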

High-Throughput Processing Recommendations

Due to Solana’s extremely high throughput, we recommend:
  1. Sufficient processing capacity: Ensure consumers have adequate CPU and memory
  2. Parallel processing: Use multi-threading/coroutines for parallel message processing
  3. Efficient parsing: Optimize Protobuf parsing code
  4. Batch writes: Use batch database writes if persistence is needed
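
The recommendations above can be sketched as a small pipeline that parses messages in a worker pool and writes results in batches. Everything here is a stand-in: `parse` replaces real Protobuf parsing, `write_batch` replaces a bulk database insert, and the worker count and batch size are arbitrary illustration values.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def batched(iterable, size):
    """Yield lists of up to `size` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def parse(raw: bytes) -> dict:
    # Stand-in for Protobuf parsing of one Kafka message payload
    return {"size": len(raw)}


def write_batch(rows, sink):
    # Stand-in for a single bulk insert into a database
    sink.append(list(rows))


def process(raw_messages, sink, workers=4, batch_size=3):
    with ThreadPoolExecutor(max_workers=workers) as pool:
        parsed = pool.map(parse, raw_messages)  # results keep input order
        for chunk in batched(parsed, batch_size):
            write_batch(chunk, sink)


sink = []
process([b"a", b"bb", b"ccc", b"dddd", b"eeeee", b"ffffff", b"ggggggg"], sink)
print([len(batch) for batch in sink])
```

In CPython, threads mainly help with I/O-bound work such as database writes. For CPU-bound parsing, a process pool or several consumer instances in the same consumer group may scale better.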

Topic → Message Type Mapping

| Topic | Proto File | Message Type | Description |
| --- | --- | --- | --- |
| sol.dex.trades | trade_event.proto | TradeEvents | DEX trade events |
| sol.dex.trades.processed | trade_event.proto | TradeEvents | With USD price, suspicious flag |
| sol.tokens | token_event.proto | TokenEvents | Token events |
| sol.tokens.processed | token_event.proto | TokenEvents | With description, image, social links |
| sol.balances | balance_event.proto | BalanceEvents | Balance change events |
| sol.transfers | solana/transfer_event.proto | TransferEvents | Solana transfer events |
| sol.transfers.processed | solana/transfer_processed_event.proto | TransferProcessedEvents | With USD value |
| sol.dex.pools | dex_pool_event.proto | DexPoolEvents | Liquidity pool events |
| sol.candlesticks | candlestick.proto | CandlestickEvents | Candlestick data |
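
When one consumer subscribes to several topics, the mapping above can be encoded as a lookup table so each message is decoded with the right type. The sketch below stores only the names from the table. Wiring each name to its generated Protobuf class is left out, and `message_type_for` is a hypothetical helper.

```python
# Topic -> (proto file, message type), copied from the mapping table above
TOPIC_MESSAGE_TYPES = {
    "sol.dex.trades": ("trade_event.proto", "TradeEvents"),
    "sol.dex.trades.processed": ("trade_event.proto", "TradeEvents"),
    "sol.tokens": ("token_event.proto", "TokenEvents"),
    "sol.tokens.processed": ("token_event.proto", "TokenEvents"),
    "sol.balances": ("balance_event.proto", "BalanceEvents"),
    "sol.transfers": ("solana/transfer_event.proto", "TransferEvents"),
    "sol.transfers.processed": (
        "solana/transfer_processed_event.proto",
        "TransferProcessedEvents",
    ),
    "sol.dex.pools": ("dex_pool_event.proto", "DexPoolEvents"),
    "sol.candlesticks": ("candlestick.proto", "CandlestickEvents"),
}


def message_type_for(topic: str) -> str:
    """Return the message type name for a topic, or raise for unknown topics."""
    try:
        return TOPIC_MESSAGE_TYPES[topic][1]
    except KeyError:
        raise ValueError(f"unknown topic: {topic}") from None


print(message_type_for("sol.transfers.processed"))
```

Failing loudly on an unknown topic is deliberate, since silently parsing with the wrong message type can yield garbage fields instead of an error.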

Code Examples

Python Example: Consume Solana DEX Trades

from kafka import KafkaConsumer
from common import trade_event_pb2  # Get from streaming_protobuf repository

# Create consumer
consumer = KafkaConsumer(
    'sol.dex.trades',
    bootstrap_servers=['<your_broker_address>'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your_username',
    sasl_plain_password='your_password',
    auto_offset_reset='latest',
    enable_auto_commit=False,
    group_id='my-solana-consumer'
)

# Consume and parse messages
for message in consumer:
    trade_events = trade_event_pb2.TradeEvents()
    trade_events.ParseFromString(message.value)
    
    for event in trade_events.events:
        print(f"Pool: {event.trade.pool_address}")
        print(f"Token A: {event.trade.token_a_address}")
        print(f"Token B: {event.trade.token_b_address}")
        print(f"Amount A: {event.trade.user_a_amount}")
        print(f"Amount B: {event.trade.user_b_amount}")
        print(f"Slot: {event.block.slot}")
        print(f"DEX Program: {event.d_app.program_address}")
        print("---")

    # Auto-commit is disabled above, so commit offsets once the message is processed
    consumer.commit()

Go Example: High-Performance Consumption

package main

import (
    "context"
    "crypto/tls" // required for the tls.Config used by the dialer
    "log"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/scram"
    "google.golang.org/protobuf/proto"

    common "github.com/chainstream-io/streaming_protobuf/common/messages"
)

func main() {
    mechanism, err := scram.Mechanism(scram.SHA512, "your_username", "your_password")
    if err != nil {
        log.Fatalf("Failed to create SASL mechanism: %v", err)
    }
    
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"<your_broker_address>"},
        Topic:   "sol.dex.trades",
        GroupID: "my-solana-consumer",
        Dialer: &kafka.Dialer{
            SASLMechanism: mechanism,
            TLS:           &tls.Config{},
        },
    })
    defer reader.Close()

    // Use worker pool for parallel processing
    messages := make(chan kafka.Message, 100)
    
    // Start workers
    for i := 0; i < 10; i++ {
        go func() {
            for msg := range messages {
                var tradeEvents common.TradeEvents
                if err := proto.Unmarshal(msg.Value, &tradeEvents); err != nil {
                    log.Printf("Failed to parse: %v", err)
                    continue
                }
                
                // Process trades
                for _, event := range tradeEvents.Events {
                    log.Printf("Pool: %s, Token A: %s, Token B: %s",
                        event.Trade.PoolAddress,
                        event.Trade.TokenAAddress,
                        event.Trade.TokenBAddress)
                    log.Printf("Amount A: %s, Amount B: %s, Slot: %d",
                        event.Trade.UserAAmount,
                        event.Trade.UserBAmount,
                        event.Block.Slot)
                }
            }
        }()
    }

    // Read messages
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Printf("Error reading: %v", err)
            continue
        }
        messages <- msg
    }
}