关键数据
关键数据流
ChainStream 支持多种针对实时流式传输优化的结构化数据格式。
Kafka 数据流
Kafka 流提供实时、高性能的区块链数据。可用格式包括 Protobuf 和 JSON,针对不同的延迟和负载需求进行了优化。
Protobuf 流经过高度优化,通过紧凑的二进制格式和严格的模式遵循,实现最小延迟。
可用的 proto 模式:
solana.dextrades.proto
solana.tokens.proto
solana.transactions.proto
Kafka 消费者实现
使用 Go 语言高效订阅和处理 Kafka 流的实现。
本指南介绍了如何在 Go 中实现 Kafka 消费者,以订阅 Kafka 主题并实时处理来自 ChainStream 的链上数据流。消费者通过 SASL_SSL(SCRAM-SHA-512 认证加 TLS 加密)安全连接到 Kafka 代理,并处理 Protobuf 格式的传入消息。
1
配置
创建配置文件 config.yml
:
# Kafka client settings. The consumer code passes config.Kafka straight to
# kafka.NewConsumer, so these are presumably librdkafka property names.
kafka:
bootstrap.servers: "rpk0.chainstream.io:9093,rpk1.chainstream.io:9093,rpk2.chainstream.io:9093"
security.protocol: "SASL_SSL"
sasl.mechanism: "SCRAM-SHA-512"
sasl.username: "<your_username_here>"
sasl.password: "<your_password_here>"
group.id: "<username_group-number>"
ssl.key.location: "ssl/client.key.pem"
ssl.certificate.location: "ssl/client.cer.pem"
ssl.ca.location: "ssl/server.cer.pem"
# NOTE(review): "none" presumably turns off broker hostname verification for
# the TLS connection; confirm this is intended for production use.
ssl.endpoint.identification.algorithm: "none"
# NOTE(review): auto commit is off and no explicit offset commit is visible in
# the consumer code shown; confirm how offsets are persisted across restarts.
enable.auto.commit: false
# Consumer selection: the topic to read and whether main builds the
# partitioned consumer (true) or the simple subscribing consumer (false).
consumer:
topic: solana.dextrades.proto
partitioned: true
# Processor sizing: "buffer" is the capacity of the message queue channel and
# "workers" is the number of worker goroutines started by the processor.
processor:
buffer: 100
workers: 8
log_level: "debug"
2
消费者实现
创建新文件 consumer.go
:
package main
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"golang.org/x/sync/errgroup"
"log"
"sync"
"time"
)
// Consumer is the contract shared by SimpleConsumer and PartitionedConsumer.
type Consumer interface {
	// waitMessages reads messages and hands each one to listener, returning
	// once ctx is done.
	waitMessages(ctx context.Context, listener Listener)
	// close shuts down the underlying Kafka consumer.
	close()
}
// SimpleConsumer wraps a Kafka consumer that is attached to its topic via
// Subscribe (see newSimpleConsumer).
type SimpleConsumer struct {
	kafkaConsumer *kafka.Consumer
}
// newSimpleConsumer creates a Kafka consumer from config.Kafka and subscribes
// it to config.Consumer.Topic.
//
// It returns an error when the client cannot be created or the subscription
// fails. In the latter case the freshly created client is closed before
// returning so that it is not leaked.
func newSimpleConsumer(config *Config) (*SimpleConsumer, error) {
	kafkaConsumer, err := kafka.NewConsumer(&config.Kafka)
	if err != nil {
		return nil, fmt.Errorf("creating kafka consumer: %w", err)
	}

	if err := kafkaConsumer.Subscribe(config.Consumer.Topic, nil); err != nil {
		// Best effort: the subscribe error is the one worth reporting.
		_ = kafkaConsumer.Close()
		return nil, fmt.Errorf("subscribing to topic %q: %w", config.Consumer.Topic, err)
	}

	return &SimpleConsumer{kafkaConsumer: kafkaConsumer}, nil
}
// waitMessages polls Kafka and forwards every successfully read message to
// listener until ctx is cancelled.
//
// Each poll is bounded by pollTimeout instead of blocking indefinitely, so a
// cancelled context is noticed even while the topic is idle. Poll timeouts
// are expected and skipped silently; any other error is logged and the loop
// continues.
func (c *SimpleConsumer) waitMessages(ctx context.Context, listener Listener) {
	const pollTimeout = 100 * time.Millisecond

	for ctx.Err() == nil {
		msg, err := c.kafkaConsumer.ReadMessage(pollTimeout)
		if err != nil {
			// A timeout only means no message arrived within pollTimeout.
			if kerr, ok := err.(kafka.Error); ok && kerr.IsTimeout() {
				continue
			}
			log.Printf("Consumer error: %v (%v)\n", err, msg)
			continue
		}
		listener.onMessage(msg)
	}
}
// close shuts down the underlying Kafka consumer. The method has no return
// value, so a failure reported by Close is logged rather than dropped.
func (c *SimpleConsumer) close() {
	if err := c.kafkaConsumer.Close(); err != nil {
		log.Printf("Error closing consumer: %v", err)
	}
}
// PartitionedConsumer wraps a Kafka consumer that is explicitly assigned the
// partitions of its topic (see newPartitionedConsumer) instead of subscribing.
type PartitionedConsumer struct {
	kafkaConsumer *kafka.Consumer
	// wg is not referenced by any of the methods visible in this file.
	wg sync.WaitGroup
}
// newPartitionedConsumer creates a Kafka consumer from config.Kafka, looks up
// the partitions of config.Consumer.Topic and assigns all of them to the
// consumer.
//
// It returns an error when the client cannot be created, the metadata request
// fails, the topic is missing or reported with an error, the topic has no
// partitions, or the assignment fails. On every failure after the client has
// been created, the client is closed before returning so it is not leaked.
func newPartitionedConsumer(config *Config) (*PartitionedConsumer, error) {
	kafkaConsumer, err := kafka.NewConsumer(&config.Kafka)
	if err != nil {
		return nil, err
	}

	// From here on every failure path must release the client.
	assigned := false
	defer func() {
		if !assigned {
			// Best effort: the original error is the one worth reporting.
			_ = kafkaConsumer.Close()
		}
	}()

	topic := config.Consumer.Topic
	metadata, err := kafkaConsumer.GetMetadata(&topic, false, 5000)
	if err != nil {
		return nil, err
	}

	// Use the comma-ok form to test for the topic's presence in the map.
	topicMetadata, found := metadata.Topics[topic]
	if !found {
		return nil, fmt.Errorf("topic %s not found", topic)
	}
	// The broker can return an entry for the topic that only carries an error.
	if topicMetadata.Error.Code() != kafka.ErrNoError {
		return nil, fmt.Errorf("topic %s: %w", topic, topicMetadata.Error)
	}
	if len(topicMetadata.Partitions) == 0 {
		return nil, fmt.Errorf("topic %s has no partitions", topic)
	}

	partitions := make([]int32, 0, len(topicMetadata.Partitions))
	for _, partition := range topicMetadata.Partitions {
		partitions = append(partitions, partition.ID)
	}

	if err := kafkaConsumer.Assign(createTopicPartitions(topic, partitions)); err != nil {
		return nil, err
	}

	assigned = true
	return &PartitionedConsumer{kafkaConsumer: kafkaConsumer}, nil
}
// createTopicPartitions builds one kafka.TopicPartition per entry of
// partitions, all for the given topic and all starting at kafka.OffsetStored.
// The result has the same length and order as partitions; every element
// shares the same topic pointer.
func createTopicPartitions(topic string, partitions []int32) []kafka.TopicPartition {
	name := &topic
	result := make([]kafka.TopicPartition, 0, len(partitions))
	for _, id := range partitions {
		tp := kafka.TopicPartition{
			Topic:     name,
			Partition: id,
			Offset:    kafka.OffsetStored,
		}
		result = append(result, tp)
	}
	return result
}
// waitMessages polls Kafka and forwards every successfully read message to
// listener until ctx is cancelled.
//
// Each poll is bounded by pollTimeout instead of blocking indefinitely, so a
// cancelled context is noticed even while the assigned partitions are idle.
// Poll timeouts are expected and skipped silently; any other error is logged
// and the loop continues.
func (c *PartitionedConsumer) waitMessages(ctx context.Context, listener Listener) {
	const pollTimeout = 100 * time.Millisecond

	for ctx.Err() == nil {
		msg, err := c.kafkaConsumer.ReadMessage(pollTimeout)
		if err != nil {
			// A timeout only means no message arrived within pollTimeout.
			if kerr, ok := err.(kafka.Error); ok && kerr.IsTimeout() {
				continue
			}
			log.Printf("Consumer error: %v (%v)\n", err, msg)
			continue
		}
		listener.onMessage(msg)
	}
}
// close shuts down the underlying Kafka consumer. The method has no return
// value, so a failure reported by Close is logged rather than dropped.
func (c *PartitionedConsumer) close() {
	if err := c.kafkaConsumer.Close(); err != nil {
		log.Printf("Error closing consumer: %v", err)
	}
}
3
处理器实现
创建新文件 processor.go
:
package main
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/jellydator/ttlcache/v3"
"golang.org/x/sync/errgroup"
"log"
"sync/atomic"
"time"
)
// Processor receives Kafka messages through onMessage, queues them on a
// buffered channel and processes them on a pool of worker goroutines.
type Processor struct {
	// queue carries messages from onMessage to the workers.
	queue chan (*kafka.Message)
	// wg tracks the worker goroutines and the statistics reporter.
	wg errgroup.Group
	// config is a copy of the processor section of the configuration.
	config ProcessorConfig
	// processFn is the topic-specific handler chosen in newProcessor.
	processFn processFn
	// stat holds the counters printed once per second by start.
	stat *Statistics
	// dedup wraps a TTL cache; presumably used by the handlers to drop
	// duplicate messages (the handlers are not visible here).
	dedup *dedupCache
}
// processFn handles a single Kafka message and reports failure via its error.
type processFn func(*kafka.Message) error
// newProcessor builds a Processor for the topic named in config.Consumer.Topic.
//
// The queue capacity comes from config.Processor.Buffer and the handler is
// selected by topic name. It returns an error when the buffer size is
// negative, when fewer than one worker is configured, or when the topic has
// no handler.
func newProcessor(config *Config) (*Processor, error) {
	// A negative capacity would make the channel allocation below panic.
	if config.Processor.Buffer < 0 {
		return nil, fmt.Errorf("invalid processor buffer: %d", config.Processor.Buffer)
	}
	// Without workers nothing drains the queue and onMessage blocks forever.
	if config.Processor.Workers < 1 {
		return nil, fmt.Errorf("invalid processor workers: %d", config.Processor.Workers)
	}

	processor := &Processor{
		queue:  make(chan *kafka.Message, config.Processor.Buffer),
		config: config.Processor,
		stat:   newStatistics(),
		// NOTE(review): ttlcache v3 presumably only evicts expired entries
		// through Start or DeleteExpired; neither call is visible here.
		// Confirm dedupCache takes care of cleanup.
		dedup: &dedupCache{
			cache: ttlcache.New[string, bool](
				ttlcache.WithTTL[string, bool](time.Second * time.Duration(240)),
			),
		},
	}

	// The handlers are methods bound to this processor, so they can only be
	// selected after the value exists.
	switch config.Consumer.Topic {
	case "solana.dextrades.proto":
		processor.processFn = processor.dexTradesMessageHandler
	case "solana.transactions.proto":
		processor.processFn = processor.transactionsMessageHandler
	default:
		return nil, fmt.Errorf("unknown topic: %s", config.Consumer.Topic)
	}
	return processor, nil
}
// start launches config.Workers worker goroutines plus one goroutine that
// prints the statistics once per second. All of them are tracked by p.wg, and
// the reporter returns when ctx is done.
func (p *Processor) start(ctx context.Context) {
	runWorker := func() error {
		return p.worker(ctx)
	}
	for remaining := p.config.Workers; remaining > 0; remaining-- {
		p.wg.Go(runWorker)
	}

	reportLoop := func() error {
		tick := time.NewTicker(time.Second)
		defer tick.Stop()
		for {
			select {
			case <-tick.C:
				p.stat.report()
			case <-ctx.Done():
				return nil
			}
		}
	}
	p.wg.Go(reportLoop)
}
// worker takes messages off the queue and passes them to p.processFn until
// ctx is done or the queue is closed. Handler errors are logged and do not
// stop the worker; the function always returns nil.
func (p *Processor) worker(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case msg, ok := <-p.queue:
			// A closed queue yields a nil message; stop instead of passing
			// nil to the handler.
			if !ok {
				return nil
			}
			if err := p.processFn(msg); err != nil {
				log.Printf("Error processing message: %v", err)
			}
		}
	}
}
// onMessage enqueues msg for the workers. The send blocks while the queue is
// full, so the caller is held back until a worker receives a message.
func (p *Processor) onMessage(msg *kafka.Message) {
	p.queue <- msg
}
// close closes the message queue and then waits for every goroutine started
// by start to return, passing on the result of the errgroup's Wait.
// onMessage must not be called afterwards: sending on the closed queue panics.
func (p *Processor) close() error {
	close(p.queue)
	return p.wg.Wait()
}
// Statistics holds three running counters. report reads them with atomic
// loads, so writers are presumably expected to update them atomically too.
type Statistics struct {
	processed  uint64
	duplicated uint64
	errors     uint64
}

// newStatistics returns a Statistics with all counters at zero.
func newStatistics() *Statistics {
	return new(Statistics)
}

// report logs the current counter values. Nothing is logged while all three
// counters are zero. The counters are only read, never reset.
func (s *Statistics) report() {
	var (
		nProcessed  = atomic.LoadUint64(&s.processed)
		nDuplicated = atomic.LoadUint64(&s.duplicated)
		nErrors     = atomic.LoadUint64(&s.errors)
	)
	if nProcessed == 0 && nDuplicated == 0 && nErrors == 0 {
		return
	}
	log.Printf("Statistics - Processed: %d, Duplicated: %d, Errors: %d",
		nProcessed, nDuplicated, nErrors)
}
4
主程序
创建主入口文件 main.go
:
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
)
// main runs the application and exits with the status returned by run.
// Keeping the logic in run lets its deferred cleanup execute before the
// process exits, which calling log.Fatalf directly would skip.
func main() {
	os.Exit(run())
}

// run loads config.yml, creates the consumer selected by
// config.Consumer.Partitioned together with the processor, and feeds messages
// to the processor until SIGINT or SIGTERM is received.
//
// It returns 0 on a clean shutdown and 1 when the consumer or processor
// cannot be created.
func run() int {
	config := loadConfig("config.yml")

	var consumer Consumer
	var err error
	if config.Consumer.Partitioned {
		consumer, err = newPartitionedConsumer(&config)
	} else {
		consumer, err = newSimpleConsumer(&config)
	}
	if err != nil {
		log.Printf("Failed to create consumer: %v", err)
		return 1
	}
	defer consumer.close()

	processor, err := newProcessor(&config)
	if err != nil {
		log.Printf("Failed to create processor: %v", err)
		return 1
	}
	defer func() {
		if err := processor.close(); err != nil {
			log.Printf("Failed to close processor: %v", err)
		}
	}()

	// ctx is cancelled on the first SIGINT/SIGTERM. stop is deferred last so
	// it runs first on return, before the processor and consumer are closed.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	processor.start(ctx)
	consumer.waitMessages(ctx, processor)
	return 0
}
5
运行应用程序
# Setup Configuration: start from the example config file
cp config_example.yml config.yml
# Edit config.yml with your credentials
# Build and Run
# NOTE(review): the binary name produced by `go build` presumably follows the
# module or directory name; confirm it matches the name used below.
go build
./stream_protobuf_example
此页面对您有帮助吗?