Get Started
ChainStream Products
ChainStream Data Services (CDS)
Documentation
- Market Data
- Alternative Data
- Third-party Swap Aggregation
Legal & Policies
Key Data Streams
ChainStream supports multiple structured data formats tailored for real-time streaming.
Kafka Data Streams
Kafka Streams deliver real-time, high-performance blockchain data. Available formats include Protobuf and JSON, optimized for various latency and payload needs.
Protobuf streams are highly optimized, delivering minimal latency with compact binary formats and strict schema adherence.
Available proto schemas:
solana.dextrades.proto
solana.tokens.proto
solana.transactions.proto
Kafka Consumer Implementation
Implementation in Go for subscribing and processing Kafka streams efficiently.
This guide walks through the implementation of a Kafka consumer in Go to subscribe to a Kafka topic and process on-chain data streams from ChainStream in real-time. The consumer connects to Kafka brokers securely using SSL and handles incoming messages in Protobuf format.
Configuration
Create a configuration file `config.yml`:
kafka:
bootstrap.servers: "rpk0.chainstream.io:9093,rpk1.chainstream.io:9093,rpk2.chainstream.io:9093"
security.protocol: "SASL_SSL"
sasl.mechanism: "SCRAM-SHA-512"
sasl.username: "<your_username_here>"
sasl.password: "<your_password_here>"
group.id: "<username_group-number>"
ssl.key.location: "ssl/client.key.pem"
ssl.certificate.location: "ssl/client.cer.pem"
ssl.ca.location: "ssl/server.cer.pem"
ssl.endpoint.identification.algorithm: "none"
enable.auto.commit: false
consumer:
topic: solana.dextrades.proto
partitioned: true
processor:
buffer: 100
workers: 8
log_level: "debug"
Consumer Implementation
Create a new file `consumer.go`:
package main
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"golang.org/x/sync/errgroup"
"log"
"sync"
"time"
)
type Consumer interface {
waitMessages(ctx context.Context, listener Listener)
close()
}
type SimpleConsumer struct {
kafkaConsumer *kafka.Consumer
}
func newSimpleConsumer(config *Config) (*SimpleConsumer, error) {
kafkaConsumer, err := kafka.NewConsumer(&config.Kafka)
if err != nil {
return nil, err
}
err = kafkaConsumer.Subscribe(config.Consumer.Topic, nil)
if err != nil {
return nil, err
}
return &SimpleConsumer{kafkaConsumer}, nil
}
func (c *SimpleConsumer) waitMessages(ctx context.Context, listener Listener) {
for {
select {
case <-ctx.Done():
return
default:
msg, err := c.kafkaConsumer.ReadMessage(-1)
if err != nil {
log.Printf("Consumer error: %v (%v)\n", err, msg)
continue
}
listener.onMessage(msg)
}
}
}
func (c *SimpleConsumer) close() {
c.kafkaConsumer.Close()
}
type PartitionedConsumer struct {
kafkaConsumer *kafka.Consumer
wg sync.WaitGroup
}
func newPartitionedConsumer(config *Config) (*PartitionedConsumer, error) {
kafkaConsumer, err := kafka.NewConsumer(&config.Kafka)
if err != nil {
return nil, err
}
metadata, err := kafkaConsumer.GetMetadata(&config.Consumer.Topic, false, 5000)
if err != nil {
return nil, err
}
topicMetadata := metadata.Topics[config.Consumer.Topic]
if topicMetadata == nil {
return nil, fmt.Errorf("topic %s not found", config.Consumer.Topic)
}
partitions := make([]int32, 0, len(topicMetadata.Partitions))
for _, partition := range topicMetadata.Partitions {
partitions = append(partitions, partition.ID)
}
err = kafkaConsumer.Assign(createTopicPartitions(config.Consumer.Topic, partitions))
if err != nil {
return nil, err
}
return &PartitionedConsumer{kafkaConsumer: kafkaConsumer}, nil
}
func createTopicPartitions(topic string, partitions []int32) []kafka.TopicPartition {
topicPartitions := make([]kafka.TopicPartition, len(partitions))
for i, partition := range partitions {
topicPartitions[i] = kafka.TopicPartition{
Topic: &topic,
Partition: partition,
Offset: kafka.OffsetStored,
}
}
return topicPartitions
}
func (c *PartitionedConsumer) waitMessages(ctx context.Context, listener Listener) {
for {
select {
case <-ctx.Done():
return
default:
msg, err := c.kafkaConsumer.ReadMessage(-1)
if err != nil {
log.Printf("Consumer error: %v (%v)\n", err, msg)
continue
}
listener.onMessage(msg)
}
}
}
func (c *PartitionedConsumer) close() {
c.kafkaConsumer.Close()
}
Processor Implementation
Create a new file `processor.go`:
package main
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/jellydator/ttlcache/v3"
"golang.org/x/sync/errgroup"
"log"
"sync/atomic"
"time"
)
type Processor struct {
queue chan (*kafka.Message)
wg errgroup.Group
config ProcessorConfig
processFn processFn
stat *Statistics
dedup *dedupCache
}
type processFn func(*kafka.Message) error
func newProcessor(config *Config) (*Processor, error) {
processor := &Processor{
queue: make(chan *kafka.Message, config.Processor.Buffer),
config: config.Processor,
stat: newStatistics(),
dedup: &dedupCache{
cache: ttlcache.New[string, bool](
ttlcache.WithTTL[string, bool](time.Second * time.Duration(240)),
),
},
}
switch config.Consumer.Topic {
case "solana.dextrades.proto":
processor.processFn = processor.dexTradesMessageHandler
case "solana.transactions.proto":
processor.processFn = processor.transactionsMessageHandler
default:
return nil, fmt.Errorf("unknown topic: %s", config.Consumer.Topic)
}
return processor, nil
}
func (p *Processor) start(ctx context.Context) {
for i := 0; i < p.config.Workers; i++ {
p.wg.Go(func() error {
return p.worker(ctx)
})
}
p.wg.Go(func() error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
p.stat.report()
}
}
})
}
func (p *Processor) worker(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case msg := <-p.queue:
if err := p.processFn(msg); err != nil {
log.Printf("Error processing message: %v", err)
}
}
}
}
func (p *Processor) onMessage(msg *kafka.Message) {
p.queue <- msg
}
func (p *Processor) close() error {
close(p.queue)
return p.wg.Wait()
}
type Statistics struct {
processed uint64
duplicated uint64
errors uint64
}
func newStatistics() *Statistics {
return &Statistics{}
}
func (s *Statistics) report() {
processed := atomic.LoadUint64(&s.processed)
duplicated := atomic.LoadUint64(&s.duplicated)
errors := atomic.LoadUint64(&s.errors)
if processed > 0 || duplicated > 0 || errors > 0 {
log.Printf("Statistics - Processed: %d, Duplicated: %d, Errors: %d",
processed, duplicated, errors)
}
}
Main Application
Create the main entry point `main.go`:
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
)
func main() {
config := loadConfig("config.yml")
var consumer Consumer
var err error
if config.Consumer.Partitioned {
consumer, err = newPartitionedConsumer(&config)
} else {
consumer, err = newSimpleConsumer(&config)
}
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
defer consumer.close()
processor, err := newProcessor(&config)
if err != nil {
log.Fatalf("Failed to create processor: %v", err)
}
defer processor.close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
cancel()
}()
processor.start(ctx)
consumer.waitMessages(ctx, processor)
}
Running the Application
# Setup Configuration
cp config_example.yml config.yml
# Edit config.yml with your credentials
# Build and Run
go build
./stream_protobuf_example