跳转到主要内容

Kafka 数据流

Kafka 流提供实时、高性能的区块链数据。可用格式包括 Protobuf 和 JSON,针对不同的延迟和负载需求进行了优化。 该模式可在此处获得。 完整代码可在此处获取。

Kafka 消费者实现

使用 Go 语言高效订阅和处理 Kafka 流的实现。 本指南介绍了如何在 Go 中实现 Kafka 消费者,以订阅 Kafka 主题并实时处理来自 ChainStream 的链上数据流。消费者使用 SSL 安全连接到 Kafka 代理,并处理 Protobuf 格式的传入消息。
1

配置

config.yml
  bootstrap_servers: "kafka.chainstream.io:9099"
  username: "<your_username_here>"
  password: "<your_password_here>"
  group.id: "<username_group-number>"

  topics:
    - "eth.v1.transfers.proto"
    - "bsc.v1.transfers.proto"
  ssl:
    ca_cert: "server-ca-bundle.pem"
    client_cert: "client.crt"
    client_key: "client.key"
2

消费者实现

创建新文件 consumer.go
package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "io/ioutil"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "gopkg.in/yaml.v3"
)

type Config struct {
    BootstrapServers string   `yaml:"bootstrap_servers"`
    Username         string   `yaml:"username"`
    Password         string   `yaml:"password"`
    GroupID          string   `yaml:"group_id"`
    Topics           []string `yaml:"topics"`
    SSL              struct {
            CACert     string `yaml:"ca_cert"`
            ClientCert string `yaml:"client_cert"`
            ClientKey  string `yaml:"client_key"`
    } `yaml:"ssl"`
}

func LoadConfig(path string) (*Config, error) {
    data, err := ioutil.ReadFile(path)
    if err != nil {
            return nil, err
    }
    var cfg Config
    if err := yaml.Unmarshal(data, &cfg); err != nil {
            return nil, err
    }
    return &cfg, nil
}

func NewConsumer(cfg *Config) (*kafka.Consumer, error) {
    return kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":                   cfg.BootstrapServers,
            "group.id":                            cfg.GroupID,
            "auto.offset.reset":                   "earliest",
            "security.protocol":                   "SASL_SSL",
            "sasl.mechanisms":                     "SCRAM-SHA-512",
            "sasl.username":                       cfg.Username,
            "sasl.password":                       cfg.Password,
            "ssl.ca.location":                     cfg.SSL.CACert,
            "ssl.certificate.location":            cfg.SSL.ClientCert,
            "ssl.key.location":                    cfg.SSL.ClientKey,
            "enable.ssl.certificate.verification": true,
    })
}

func RunConsumer(c *kafka.Consumer, topics []string) {
    if err := c.SubscribeTopics(topics, nil); err != nil {
            fmt.Fprintf(os.Stderr, "Failed to subscribe topics: %v\n", err)
            return
    }

    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    run := true
    for run {
            select {
            case sig := <-sigchan:
                    fmt.Printf("Caught signal %v: terminating\n", sig)
                    run = false
            default:
                    ev := c.Poll(100)
                    switch e := ev.(type) {
                    case *kafka.Message:
                            topic := *e.TopicPartition.Topic
                            ProcessMessage(topic, e.Value)
                    case kafka.Error:
                            fmt.Fprintf(os.Stderr, "Error: %v\n", e)
                    }
            }
    }
    c.Close()
}
3

处理器实现

创建新文件 processor.go
package main

import (
    evm "chainstream/evm"
    "fmt"

    "google.golang.org/protobuf/proto"
)

func ProcessMessage(topic string, msg []byte) {
    fmt.Printf("\n[DEBUG] Got message from topic=%s, size=%d bytes\n", topic, len(msg))

    switch topic {
    case "eth.v1.transfers.proto":
            processEvmTransfers(msg, "eth")
    case "bsc.v1.transfers.proto":
            processEvmTransfers(msg, "bsc")
    default:
            fmt.Printf("[WARN] Unknown topic %s, raw head=%x\n", topic, msg[:min(32, len(msg))])
    }
}

func processEvmTransfers(msg []byte, chain string) {
    var transfers evm.TransfersMessage
    if err := proto.Unmarshal(msg, &transfers); err != nil {
            fmt.Println("[ERROR] Failed to decode EVM TransfersMessage:", err)
            return
    }

    for _, t := range transfers.Transfers {
            fmt.Printf("Chain %s Tx %s: %s -> %s, Amount=%s %s (Success=%v)\n",
                    chain,
                    t.TransactionHeader.Hash,
                    t.Sender,
                    t.Receiver,
                    t.Amount,
                    t.Currency.Symbol,
                    t.Success,
            )
    }
}
4

主应用程序

创建新文件 main.go:
package main

import "log"

func main() {
    cfg, err := LoadConfig("config.yml")
    if err != nil {
            log.Fatalf("Failed to load config: %v", err)
    }

    consumer, err := NewConsumer(cfg)
    if err != nil {
            log.Fatalf("Failed to create consumer: %v", err)
    }

    RunConsumer(consumer, cfg.Topics)
}

5

运行程序

# Setup Configuration
cp config_example.yml config.yml
# Edit config.yml with your credentials and topics list

# Example config.yml
bootstrap_servers: "kafka.chainstream.io:9099"
username: "app_xxx"
password: "xxx"
group_id: "app_xxx-consumer-1"

topics:
  - "eth.v1.transfers.proto"
  - "bsc.v1.transfers.proto"
  - "polygon.v1.transfers.proto"
  - "tron.v1.transfers.proto"

ssl:
  ca_cert: "server-ca-bundle.pem"
  client_cert: "client.crt"
  client_key: "client.key"

# Build and Run
go build -o stream_consumer
./stream_consumer