Kafka Data Streams

ChainStream's Kafka streams deliver blockchain data in real time with high throughput. Messages are available in Protobuf and JSON formats, optimized for different latency and payload-size requirements.

Kafka Consumer Implementation

A Go implementation for subscribing to and efficiently processing Kafka streams.

This guide walks through implementing a Kafka consumer in Go that subscribes to a Kafka topic and processes on-chain data streams from ChainStream in real time. The consumer authenticates to the Kafka brokers over SASL_SSL and handles incoming messages in Protobuf format.

Step 1: Configuration

Create a configuration file config.yml:

kafka:
  bootstrap.servers: "rpk0.chainstream.io:9093,rpk1.chainstream.io:9093,rpk2.chainstream.io:9093"
  security.protocol: "SASL_SSL"
  sasl.mechanism: "SCRAM-SHA-512"
  sasl.username: "<your_username_here>"
  sasl.password: "<your_password_here>"
  group.id: "<username_group-number>"
  ssl.key.location: "ssl/client.key.pem"
  ssl.certificate.location: "ssl/client.cer.pem"
  ssl.ca.location: "ssl/server.cer.pem"
  ssl.endpoint.identification.algorithm: "none"
  enable.auto.commit: false

consumer:
  topic: solana.dextrades.proto
  partitioned: true

processor:
  buffer: 100
  workers: 8

log_level: "debug"
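
The Go files in the following steps refer to a Config struct and a loadConfig helper that this walkthrough doesn't list separately. A minimal config.go sketch that maps the YAML above (assuming the gopkg.in/yaml.v3 package; the struct and field names here are illustrative, not part of the official example) could look like this:

package main

import (
        "log"
        "os"

        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
        "gopkg.in/yaml.v3"
)

// ConsumerConfig selects the topic and the consumption strategy.
type ConsumerConfig struct {
        Topic       string `yaml:"topic"`
        Partitioned bool   `yaml:"partitioned"`
}

// ProcessorConfig sizes the message queue and the worker pool.
type ProcessorConfig struct {
        Buffer  int `yaml:"buffer"`
        Workers int `yaml:"workers"`
}

// Config mirrors config.yml; the kafka section is passed straight
// through to the client as a kafka.ConfigMap.
type Config struct {
        Kafka     kafka.ConfigMap `yaml:"kafka"`
        Consumer  ConsumerConfig  `yaml:"consumer"`
        Processor ProcessorConfig `yaml:"processor"`
        LogLevel  string          `yaml:"log_level"`
}

func loadConfig(path string) Config {
        data, err := os.ReadFile(path)
        if err != nil {
                log.Fatalf("Failed to read config: %v", err)
        }
        var cfg Config
        if err := yaml.Unmarshal(data, &cfg); err != nil {
                log.Fatalf("Failed to parse config: %v", err)
        }
        return cfg
}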

Step 2: Consumer Implementation

Create a new file consumer.go:

package main

import (
        "context"
        "fmt"
        "log"
        "time"

        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

type Consumer interface {
        waitMessages(ctx context.Context, listener Listener)
        close()
}

type SimpleConsumer struct {
        kafkaConsumer *kafka.Consumer
}

func newSimpleConsumer(config *Config) (*SimpleConsumer, error) {
        kafkaConsumer, err := kafka.NewConsumer(&config.Kafka)
        if err != nil {
                return nil, err
        }

        err = kafkaConsumer.Subscribe(config.Consumer.Topic, nil)
        if err != nil {
                return nil, err
        }

        return &SimpleConsumer{kafkaConsumer}, nil
}

func (c *SimpleConsumer) waitMessages(ctx context.Context, listener Listener) {
        for {
                select {
                case <-ctx.Done():
                        return
                default:
                        // Poll with a bounded timeout so the loop can observe
                        // ctx cancellation between reads.
                        msg, err := c.kafkaConsumer.ReadMessage(time.Second)
                        if err != nil {
                                if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
                                        continue
                                }
                                log.Printf("Consumer error: %v (%v)\n", err, msg)
                                continue
                        }
                        // Note: enable.auto.commit is false in config.yml, so
                        // commit offsets explicitly if you need to resume from
                        // the last processed position after a restart.
                        listener.onMessage(msg)
                }
        }
}

func (c *SimpleConsumer) close() {
        c.kafkaConsumer.Close()
}

type PartitionedConsumer struct {
        kafkaConsumer *kafka.Consumer
}

func newPartitionedConsumer(config *Config) (*PartitionedConsumer, error) {
        kafkaConsumer, err := kafka.NewConsumer(&config.Kafka)
        if err != nil {
                return nil, err
        }

        metadata, err := kafkaConsumer.GetMetadata(&config.Consumer.Topic, false, 5000)
        if err != nil {
                return nil, err
        }

        topicMetadata := metadata.Topics[config.Consumer.Topic]
        if topicMetadata == nil {
                return nil, fmt.Errorf("topic %s not found", config.Consumer.Topic)
        }

        partitions := make([]int32, 0, len(topicMetadata.Partitions))
        for _, partition := range topicMetadata.Partitions {
                partitions = append(partitions, partition.ID)
        }

        err = kafkaConsumer.Assign(createTopicPartitions(config.Consumer.Topic, partitions))
        if err != nil {
                return nil, err
        }

        return &PartitionedConsumer{kafkaConsumer: kafkaConsumer}, nil
}

func createTopicPartitions(topic string, partitions []int32) []kafka.TopicPartition {
        topicPartitions := make([]kafka.TopicPartition, len(partitions))
        for i, partition := range partitions {
                topicPartitions[i] = kafka.TopicPartition{
                        Topic:     &topic,
                        Partition: partition,
                        Offset:    kafka.OffsetStored,
                }
        }
        return topicPartitions
}

func (c *PartitionedConsumer) waitMessages(ctx context.Context, listener Listener) {
        for {
                select {
                case <-ctx.Done():
                        return
                default:
                        // Same bounded poll as SimpleConsumer, so cancellation
                        // is observed between reads.
                        msg, err := c.kafkaConsumer.ReadMessage(time.Second)
                        if err != nil {
                                if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
                                        continue
                                }
                                log.Printf("Consumer error: %v (%v)\n", err, msg)
                                continue
                        }
                        listener.onMessage(msg)
                }
        }
}

func (c *PartitionedConsumer) close() {
        c.kafkaConsumer.Close()
}
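
Both consumer types hand messages to a Listener. The walkthrough doesn't show this interface explicitly, but a minimal definition consistent with the Processor's onMessage method in the next step would be:

// Listener receives messages from a consumer. The Processor defined in
// the next step satisfies this interface via its onMessage method.
type Listener interface {
        onMessage(msg *kafka.Message)
}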

Step 3: Processor Implementation

Create a new file processor.go:

package main

import (
        "context"
        "fmt"
        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
        "github.com/jellydator/ttlcache/v3"
        "golang.org/x/sync/errgroup"
        "log"
        "sync/atomic"
        "time"
)

type Processor struct {
        queue     chan *kafka.Message
        wg        errgroup.Group
        config    ProcessorConfig
        processFn processFn
        stat      *Statistics
        dedup     *dedupCache
}

type processFn func(*kafka.Message) error

func newProcessor(config *Config) (*Processor, error) {
        processor := &Processor{
                queue:  make(chan *kafka.Message, config.Processor.Buffer),
                config: config.Processor,
                stat:   newStatistics(),
                dedup: &dedupCache{
                        cache: ttlcache.New[string, bool](
                                ttlcache.WithTTL[string, bool](4 * time.Minute),
                        ),
                },
        }

        // Evict expired dedup entries in the background; without this the
        // cache would grow without bound on a high-volume stream.
        go processor.dedup.cache.Start()

        switch config.Consumer.Topic {
        case "solana.dextrades.proto":
                processor.processFn = processor.dexTradesMessageHandler
        case "solana.transactions.proto":
                processor.processFn = processor.transactionsMessageHandler
        default:
                return nil, fmt.Errorf("unknown topic: %s", config.Consumer.Topic)
        }

        return processor, nil
}

func (p *Processor) start(ctx context.Context) {
        for i := 0; i < p.config.Workers; i++ {
                p.wg.Go(func() error {
                        return p.worker(ctx)
                })
        }

        p.wg.Go(func() error {
                ticker := time.NewTicker(time.Second)
                defer ticker.Stop()

                for {
                        select {
                        case <-ctx.Done():
                                return nil
                        case <-ticker.C:
                                p.stat.report()
                        }
                }
        })
}

func (p *Processor) worker(ctx context.Context) error {
        for {
                select {
                case <-ctx.Done():
                        return nil
                case msg, ok := <-p.queue:
                        // A closed queue signals shutdown; stop the worker.
                        if !ok {
                                return nil
                        }
                        if err := p.processFn(msg); err != nil {
                                log.Printf("Error processing message: %v", err)
                        }
                }
        }
}

func (p *Processor) onMessage(msg *kafka.Message) {
        p.queue <- msg
}

// close drains the workers, then stops the dedup cache's cleanup goroutine.
func (p *Processor) close() error {
        close(p.queue)
        err := p.wg.Wait()
        p.dedup.cache.Stop()
        return err
}

type Statistics struct {
        processed  uint64
        duplicated uint64
        errors     uint64
}

func newStatistics() *Statistics {
        return &Statistics{}
}

func (s *Statistics) report() {
        processed := atomic.LoadUint64(&s.processed)
        duplicated := atomic.LoadUint64(&s.duplicated)
        errors := atomic.LoadUint64(&s.errors)

        if processed > 0 || duplicated > 0 || errors > 0 {
                log.Printf("Statistics - Processed: %d, Duplicated: %d, Errors: %d",
                        processed, duplicated, errors)
        }
}
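
processor.go above references a dedupCache type and two topic-specific message handlers that the listing doesn't include. The sketch below (appended to processor.go) is one hedged interpretation: it deduplicates on the message key, falling back to topic/partition/offset, and leaves the Protobuf decoding itself as a placeholder, since the concrete message types are generated from ChainStream's published schemas. The counters are updated with sync/atomic to match the report method above:

// dedupCache remembers recently seen message keys so redelivered or
// replayed messages within the TTL window are processed only once.
type dedupCache struct {
        cache *ttlcache.Cache[string, bool]
}

// seen records key and reports whether it was already present.
func (d *dedupCache) seen(key string) bool {
        if d.cache.Has(key) {
                return true
        }
        d.cache.Set(key, true, ttlcache.DefaultTTL)
        return false
}

// handle implements the bookkeeping shared by both handlers.
func (p *Processor) handle(msg *kafka.Message, decode func([]byte) error) error {
        key := string(msg.Key)
        if key == "" {
                key = fmt.Sprintf("%s/%d/%d",
                        *msg.TopicPartition.Topic, msg.TopicPartition.Partition, msg.TopicPartition.Offset)
        }
        if p.dedup.seen(key) {
                atomic.AddUint64(&p.stat.duplicated, 1)
                return nil
        }
        if err := decode(msg.Value); err != nil {
                atomic.AddUint64(&p.stat.errors, 1)
                return err
        }
        atomic.AddUint64(&p.stat.processed, 1)
        return nil
}

func (p *Processor) dexTradesMessageHandler(msg *kafka.Message) error {
        return p.handle(msg, func(payload []byte) error {
                // Decode payload with proto.Unmarshal into the DEX-trades
                // message type generated from ChainStream's schema.
                return nil
        })
}

func (p *Processor) transactionsMessageHandler(msg *kafka.Message) error {
        return p.handle(msg, func(payload []byte) error {
                // Decode payload with proto.Unmarshal into the transactions
                // message type generated from ChainStream's schema.
                return nil
        })
}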

Step 4: Main Application

Create the main entry point main.go:

package main

import (
        "context"
        "log"
        "os"
        "os/signal"
        "syscall"
)

func main() {
        config := loadConfig("config.yml")

        var consumer Consumer
        var err error
        if config.Consumer.Partitioned {
                consumer, err = newPartitionedConsumer(&config)
        } else {
                consumer, err = newSimpleConsumer(&config)
        }
        if err != nil {
                log.Fatalf("Failed to create consumer: %v", err)
        }
        defer consumer.close()

        processor, err := newProcessor(&config)
        if err != nil {
                log.Fatalf("Failed to create processor: %v", err)
        }
        defer processor.close()

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

        go func() {
                <-sigChan
                cancel()
        }()

        processor.start(ctx)
        consumer.waitMessages(ctx, processor)
}
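
For reference, a go.mod consistent with the imports used throughout this guide might look like the following; the module name matches the binary built in the next step, and the version numbers are illustrative:

module stream_protobuf_example

go 1.21

require (
        github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
        github.com/jellydator/ttlcache/v3 v3.1.0
        golang.org/x/sync v0.5.0
        gopkg.in/yaml.v3 v3.0.1
)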

Step 5: Running the Application

# Setup Configuration
cp config_example.yml config.yml
# Edit config.yml with your credentials

# Build and Run
go mod tidy
go build
./stream_protobuf_example