Kafka Data Streams

Kafka streams deliver real-time, high-throughput blockchain data. Both Protobuf and JSON formats are available, optimized for different latency and workload requirements.

Kafka Consumer Implementation

An efficient Go implementation for subscribing to and processing Kafka streams.

This guide shows how to implement a Kafka consumer in Go that subscribes to a Kafka topic and processes on-chain data streamed from ChainStream in real time. The consumer connects to the Kafka brokers securely (SASL_SSL with SCRAM-SHA-512 authentication) and handles incoming messages in Protobuf format.

1. Configuration

Create the configuration file config.yml:

kafka:
  bootstrap.servers: "rpk0.chainstream.io:9093,rpk1.chainstream.io:9093,rpk2.chainstream.io:9093"
  security.protocol: "SASL_SSL"
  sasl.mechanism: "SCRAM-SHA-512"
  sasl.username: "<your_username_here>"
  sasl.password: "<your_password_here>"
  group.id: "<username_group-number>"
  ssl.key.location: "ssl/client.key.pem"
  ssl.certificate.location: "ssl/client.cer.pem"
  ssl.ca.location: "ssl/server.cer.pem"
  ssl.endpoint.identification.algorithm: "none"
  enable.auto.commit: false

consumer:
  topic: solana.dextrades.proto
  partitioned: true

processor:
  buffer: 100
  workers: 8

log_level: "debug"
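
main.go below loads these settings through a loadConfig helper that the original snippets do not include. Here is a minimal sketch of a config.go, assuming gopkg.in/yaml.v3 for parsing (the helper and its fatal-on-error behavior are illustrative assumptions):

package main

import (
        "log"
        "os"

        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
        "gopkg.in/yaml.v3"
)

// Config mirrors the layout of config.yml.
type Config struct {
        Kafka     kafka.ConfigMap `yaml:"kafka"` // passed directly to kafka.NewConsumer
        Consumer  ConsumerConfig  `yaml:"consumer"`
        Processor ProcessorConfig `yaml:"processor"`
        LogLevel  string          `yaml:"log_level"`
}

type ConsumerConfig struct {
        Topic       string `yaml:"topic"`
        Partitioned bool   `yaml:"partitioned"`
}

type ProcessorConfig struct {
        Buffer  int `yaml:"buffer"`
        Workers int `yaml:"workers"`
}

func loadConfig(path string) Config {
        data, err := os.ReadFile(path)
        if err != nil {
                log.Fatalf("Failed to read config: %v", err)
        }
        var cfg Config
        if err := yaml.Unmarshal(data, &cfg); err != nil {
                log.Fatalf("Failed to parse config: %v", err)
        }
        return cfg
}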

2. Consumer Implementation

Create a new file consumer.go. It defines two consumer variants, selected by the partitioned flag in config.yml: SimpleConsumer joins a consumer group via Subscribe, while PartitionedConsumer discovers the topic's partitions and assigns all of them to itself explicitly:

package main

import (
        "context"
        "fmt"
        "log"
        "time"

        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// Listener receives each message pulled from Kafka; Processor implements it.
type Listener interface {
        onMessage(msg *kafka.Message)
}

type Consumer interface {
        waitMessages(ctx context.Context, listener Listener)
        close()
}

type SimpleConsumer struct {
        kafkaConsumer *kafka.Consumer
}

func newSimpleConsumer(config *Config) (*SimpleConsumer, error) {
        kafkaConsumer, err := kafka.NewConsumer(&config.Kafka)
        if err != nil {
                return nil, err
        }

        err = kafkaConsumer.Subscribe(config.Consumer.Topic, nil)
        if err != nil {
                return nil, err
        }

        return &SimpleConsumer{kafkaConsumer}, nil
}

func (c *SimpleConsumer) waitMessages(ctx context.Context, listener Listener) {
        for {
                select {
                case <-ctx.Done():
                        return
                default:
                        // Poll with a timeout so ctx cancellation is noticed promptly;
                        // ReadMessage(-1) would block forever and defeat graceful shutdown.
                        msg, err := c.kafkaConsumer.ReadMessage(100 * time.Millisecond)
                        if err != nil {
                                if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.IsTimeout() {
                                        continue
                                }
                                log.Printf("Consumer error: %v (%v)\n", err, msg)
                                continue
                        }
                        listener.onMessage(msg)
                }
        }
}

func (c *SimpleConsumer) close() {
        c.kafkaConsumer.Close()
}

type PartitionedConsumer struct {
        kafkaConsumer *kafka.Consumer
}

func newPartitionedConsumer(config *Config) (*PartitionedConsumer, error) {
        kafkaConsumer, err := kafka.NewConsumer(&config.Kafka)
        if err != nil {
                return nil, err
        }

        metadata, err := kafkaConsumer.GetMetadata(&config.Consumer.Topic, false, 5000)
        if err != nil {
                return nil, err
        }

        // metadata.Topics maps topic names to TopicMetadata values, so use the
        // comma-ok idiom rather than a nil check.
        topicMetadata, ok := metadata.Topics[config.Consumer.Topic]
        if !ok {
                return nil, fmt.Errorf("topic %s not found", config.Consumer.Topic)
        }

        partitions := make([]int32, 0, len(topicMetadata.Partitions))
        for _, partition := range topicMetadata.Partitions {
                partitions = append(partitions, partition.ID)
        }

        err = kafkaConsumer.Assign(createTopicPartitions(config.Consumer.Topic, partitions))
        if err != nil {
                return nil, err
        }

        return &PartitionedConsumer{kafkaConsumer: kafkaConsumer}, nil
}

func createTopicPartitions(topic string, partitions []int32) []kafka.TopicPartition {
        topicPartitions := make([]kafka.TopicPartition, len(partitions))
        for i, partition := range partitions {
                topicPartitions[i] = kafka.TopicPartition{
                        Topic:     &topic,
                        Partition: partition,
                        Offset:    kafka.OffsetStored,
                }
        }
        return topicPartitions
}

func (c *PartitionedConsumer) waitMessages(ctx context.Context, listener Listener) {
        for {
                select {
                case <-ctx.Done():
                        return
                default:
                        // Same timeout-based polling as SimpleConsumer, so shutdown stays responsive.
                        msg, err := c.kafkaConsumer.ReadMessage(100 * time.Millisecond)
                        if err != nil {
                                if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.IsTimeout() {
                                        continue
                                }
                                log.Printf("Consumer error: %v (%v)\n", err, msg)
                                continue
                        }
                        listener.onMessage(msg)
                }
        }
}

func (c *PartitionedConsumer) close() {
        c.kafkaConsumer.Close()
}
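
Note that config.yml sets enable.auto.commit: false, yet neither consumer commits offsets, so restarts resume from whatever was last stored for the group. A minimal sketch of an explicit commit, an addition not shown in the original guide (committing right after onMessage gives at-most-once delivery; committing from a worker after processFn succeeds would give at-least-once):

// commit records the message's offset (offset+1) back to the broker.
// Call it once the message has actually been handled.
func (c *SimpleConsumer) commit(msg *kafka.Message) {
        if _, err := c.kafkaConsumer.CommitMessage(msg); err != nil {
                log.Printf("Offset commit failed: %v", err)
        }
}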

3. Processor Implementation

Create a new file processor.go. The processor fans incoming messages out to a pool of workers through a buffered channel, deduplicates recently seen events with a TTL cache, and logs throughput statistics once per second:

package main

import (
        "context"
        "fmt"
        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
        "github.com/jellydator/ttlcache/v3"
        "golang.org/x/sync/errgroup"
        "log"
        "sync/atomic"
        "time"
)

type Processor struct {
        queue     chan *kafka.Message
        wg        errgroup.Group
        config    ProcessorConfig
        processFn processFn
        stat      *Statistics
        dedup     *dedupCache
}

type processFn func(*kafka.Message) error

// dedupCache wraps a TTL cache used to drop messages already seen recently.
type dedupCache struct {
        cache *ttlcache.Cache[string, bool]
}

func newProcessor(config *Config) (*Processor, error) {
        processor := &Processor{
                queue:  make(chan *kafka.Message, config.Processor.Buffer),
                config: config.Processor,
                stat:   newStatistics(),
                dedup: &dedupCache{
                        cache: ttlcache.New[string, bool](
                                ttlcache.WithTTL[string, bool](240 * time.Second),
                        ),
                },
        }

        switch config.Consumer.Topic {
        case "solana.dextrades.proto":
                processor.processFn = processor.dexTradesMessageHandler
        case "solana.transactions.proto":
                processor.processFn = processor.transactionsMessageHandler
        default:
                return nil, fmt.Errorf("unknown topic: %s", config.Consumer.Topic)
        }

        // Start the cache's cleanup goroutine so expired dedup entries are evicted.
        go processor.dedup.cache.Start()

        return processor, nil
}

func (p *Processor) start(ctx context.Context) {
        for i := 0; i < p.config.Workers; i++ {
                p.wg.Go(func() error {
                        return p.worker(ctx)
                })
        }

        p.wg.Go(func() error {
                ticker := time.NewTicker(time.Second)
                defer ticker.Stop()

                for {
                        select {
                        case <-ctx.Done():
                                return nil
                        case <-ticker.C:
                                p.stat.report()
                        }
                }
        })
}

func (p *Processor) worker(ctx context.Context) error {
        for {
                select {
                case <-ctx.Done():
                        return nil
                case msg, ok := <-p.queue:
                        if !ok {
                                // The queue was closed by close(); nothing left to do.
                                return nil
                        }
                        if err := p.processFn(msg); err != nil {
                                atomic.AddUint64(&p.stat.errors, 1)
                                log.Printf("Error processing message: %v", err)
                        }
                }
        }
}

func (p *Processor) onMessage(msg *kafka.Message) {
        p.queue <- msg
}

func (p *Processor) close() error {
        close(p.queue)
        return p.wg.Wait()
}

type Statistics struct {
        processed  uint64
        duplicated uint64
        errors     uint64
}

func newStatistics() *Statistics {
        return &Statistics{}
}

func (s *Statistics) report() {
        processed := atomic.LoadUint64(&s.processed)
        duplicated := atomic.LoadUint64(&s.duplicated)
        errors := atomic.LoadUint64(&s.errors)

        if processed > 0 || duplicated > 0 || errors > 0 {
                log.Printf("Statistics - Processed: %d, Duplicated: %d, Errors: %d",
                        processed, duplicated, errors)
        }
}
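
newProcessor wires in dexTradesMessageHandler and transactionsMessageHandler, which the original snippets do not define; real implementations would decode msg.Value with the Go types generated from the solana.dextrades.proto and solana.transactions.proto schemas. A minimal sketch of the surrounding dedup-and-count flow (keying on the Kafka message key is an assumption):

// isDuplicate reports whether key was seen within the TTL window,
// recording it as seen when it was not.
func (d *dedupCache) isDuplicate(key string) bool {
        if d.cache.Get(key) != nil {
                return true
        }
        d.cache.Set(key, true, ttlcache.DefaultTTL)
        return false
}

// dexTradesMessageHandler sketches the handler shape: dedupe, decode, count.
func (p *Processor) dexTradesMessageHandler(msg *kafka.Message) error {
        if p.dedup.isDuplicate(string(msg.Key)) {
                atomic.AddUint64(&p.stat.duplicated, 1)
                return nil
        }
        // Here a real handler would proto.Unmarshal(msg.Value, ...) and act on it.
        atomic.AddUint64(&p.stat.processed, 1)
        return nil
}

func (p *Processor) transactionsMessageHandler(msg *kafka.Message) error {
        if p.dedup.isDuplicate(string(msg.Key)) {
                atomic.AddUint64(&p.stat.duplicated, 1)
                return nil
        }
        atomic.AddUint64(&p.stat.processed, 1)
        return nil
}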

4. Main Application

Create the main entry point main.go. It builds whichever consumer variant the config selects, starts the processor's worker pool, and cancels the shared context on SIGINT/SIGTERM so both shut down cleanly:

package main

import (
        "context"
        "log"
        "os"
        "os/signal"
        "syscall"
)

func main() {
        config := loadConfig("config.yml")

        var consumer Consumer
        var err error
        if config.Consumer.Partitioned {
                consumer, err = newPartitionedConsumer(&config)
        } else {
                consumer, err = newSimpleConsumer(&config)
        }
        if err != nil {
                log.Fatalf("Failed to create consumer: %v", err)
        }
        defer consumer.close()

        processor, err := newProcessor(&config)
        if err != nil {
                log.Fatalf("Failed to create processor: %v", err)
        }
        defer processor.close()

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        sigChan := make(chan os.Signal, 1)
        signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

        go func() {
                <-sigChan
                cancel()
        }()

        processor.start(ctx)
        consumer.waitMessages(ctx, processor)
}

5. Running the Application

# Setup Configuration
cp config_example.yml config.yml
# Edit config.yml with your credentials

# Build and Run
go build
./stream_protobuf_example
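
If you are starting from an empty directory, initialize the module and fetch the dependencies first (the module name here is an assumption, chosen to match the binary name above; note that confluent-kafka-go wraps librdkafka via cgo):

# Initialize module and fetch dependencies
go mod init stream_protobuf_example
go get github.com/confluentinc/confluent-kafka-go/v2 \
       github.com/jellydator/ttlcache/v3 \
       golang.org/x/sync/errgroup \
       gopkg.in/yaml.v3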