Kafka 数据流
Kafka 流提供实时、高性能的区块链数据。可用格式包括 Protobuf 和 JSON,分别针对不同的延迟和负载需求进行了优化。消息模式(Schema)可在此处获取,完整代码可在此处获取。
Kafka 消费者实现
使用 Go 语言高效订阅和处理 Kafka 流的实现。本指南介绍了如何在 Go 中实现 Kafka 消费者,以订阅 Kafka 主题并实时处理来自 ChainStream 的链上数据流。消费者通过 SASL_SSL(TLS 加密连接加 SCRAM-SHA-512 用户名/密码认证)安全连接到 Kafka 代理,并处理 Protobuf 格式的传入消息。
1
配置
config.yml:复制
询问AI
bootstrap_servers: "kafka.chainstream.io:9099"
username: "<your_username_here>"
password: "<your_password_here>"
group_id: "<username_group-number>"
topics:
- "eth.v1.transfers.proto"
- "bsc.v1.transfers.proto"
ssl:
ca_cert: "server-ca-bundle.pem"
client_cert: "client.crt"
client_key: "client.key"
2
消费者实现
创建新文件
consumer.go:复制
询问AI
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"io/ioutil"
"github.com/confluentinc/confluent-kafka-go/kafka"
"gopkg.in/yaml.v3"
)
// Config mirrors the layout of the YAML configuration file read by LoadConfig.
type Config struct {
	// BootstrapServers is passed to the client as "bootstrap.servers".
	BootstrapServers string `yaml:"bootstrap_servers"`

	// Username and Password are passed to the client as the SASL credentials.
	Username string `yaml:"username"`
	Password string `yaml:"password"`

	// GroupID is passed to the client as "group.id"; note that the YAML key
	// is spelled "group_id".
	GroupID string `yaml:"group_id"`

	// Topics is the list of topics the consumer subscribes to.
	Topics []string `yaml:"topics"`

	// SSL holds the file locations handed to the client's ssl.* settings.
	SSL struct {
		CACert     string `yaml:"ca_cert"`
		ClientCert string `yaml:"client_cert"`
		ClientKey  string `yaml:"client_key"`
	} `yaml:"ssl"`
}
// LoadConfig reads the YAML file at path and decodes it into a Config.
//
// On failure it returns a nil Config and an error that wraps the underlying
// cause (so errors.Is / errors.As still work) and states which step failed.
// The parse error also names the file, which the YAML decoder's own message
// does not.
func LoadConfig(path string) (*Config, error) {
	raw, err := ioutil.ReadFile(path)
	if err != nil {
		// The wrapped read error already carries the path.
		return nil, fmt.Errorf("reading config: %w", err)
	}

	var cfg Config
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		return nil, fmt.Errorf("parsing config %q: %w", path, err)
	}
	return &cfg, nil
}
// NewConsumer builds a Kafka consumer from cfg.
//
// The client is configured for SASL_SSL with SCRAM-SHA-512 credentials, uses
// the CA, client certificate and client key locations from cfg.SSL, keeps
// certificate verification enabled, and starts from the earliest offset when
// the group has no committed position. Whatever kafka.NewConsumer returns is
// passed straight back to the caller.
func NewConsumer(cfg *Config) (*kafka.Consumer, error) {
	settings := kafka.ConfigMap{
		// Connection and group membership.
		"bootstrap.servers": cfg.BootstrapServers,
		"group.id":          cfg.GroupID,
		"auto.offset.reset": "earliest",

		// Authentication.
		"security.protocol": "SASL_SSL",
		"sasl.mechanisms":   "SCRAM-SHA-512",
		"sasl.username":     cfg.Username,
		"sasl.password":     cfg.Password,

		// TLS material.
		"ssl.ca.location":                     cfg.SSL.CACert,
		"ssl.certificate.location":            cfg.SSL.ClientCert,
		"ssl.key.location":                    cfg.SSL.ClientKey,
		"enable.ssl.certificate.verification": true,
	}
	return kafka.NewConsumer(&settings)
}
// RunConsumer subscribes c to topics and hands every received message to
// ProcessMessage until SIGINT or SIGTERM is delivered.
//
// RunConsumer takes ownership of c: the consumer is closed on every return
// path, including a failed subscription, and a failure to close is reported
// on stderr. Subscription failures and client errors are also written to
// stderr; client errors do not stop the loop.
func RunConsumer(c *kafka.Consumer, topics []string) {
	// Deferred so the early return on subscription failure no longer leaks
	// the consumer.
	defer func() {
		if err := c.Close(); err != nil {
			fmt.Fprintf(os.Stderr, "Failed to close consumer: %v\n", err)
		}
	}()

	if err := c.SubscribeTopics(topics, nil); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to subscribe topics: %v\n", err)
		return
	}

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	// Release the signal registration once the loop is done.
	defer signal.Stop(sigchan)

	for {
		// Non-blocking check so a pending signal is noticed between polls.
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			return
		default:
		}

		// Poll waits at most 100 ms, which bounds the shutdown latency.
		switch e := c.Poll(100).(type) {
		case *kafka.Message:
			// Topic is a *string; skip instead of panicking if it is unset.
			if e.TopicPartition.Topic == nil {
				fmt.Fprintf(os.Stderr, "Error: message without topic: %v\n", e.TopicPartition.Error)
				continue
			}
			ProcessMessage(*e.TopicPartition.Topic, e.Value)
		case kafka.Error:
			fmt.Fprintf(os.Stderr, "Error: %v\n", e)
		}
	}
}
3
处理器实现
创建新文件
processor.go:复制
询问AI
package main
import (
evm "chainstream/evm"
"fmt"
"google.golang.org/protobuf/proto"
)
// ProcessMessage logs the topic and payload size of a received message and
// routes the payload to the decoder registered for that topic.
//
// The two known transfer topics are decoded as EVM transfers tagged "eth" or
// "bsc". Any other topic only produces a warning that shows up to the first
// 32 bytes of the payload in hex.
func ProcessMessage(topic string, msg []byte) {
	fmt.Printf("\n[DEBUG] Got message from topic=%s, size=%d bytes\n", topic, len(msg))

	if topic == "eth.v1.transfers.proto" {
		processEvmTransfers(msg, "eth")
		return
	}
	if topic == "bsc.v1.transfers.proto" {
		processEvmTransfers(msg, "bsc")
		return
	}

	// Cap the preview so short payloads are not sliced out of range.
	head := msg[:min(32, len(msg))]
	fmt.Printf("[WARN] Unknown topic %s, raw head=%x\n", topic, head)
}
// processEvmTransfers decodes msg as an evm.TransfersMessage and prints one
// line per transfer, prefixed with the given chain label.
//
// A payload that fails to decode is reported and dropped. The nested
// transaction header and currency are read through getters rather than direct
// field access, so a transfer that arrives without either sub-message prints
// empty values instead of panicking on a nil pointer.
//
// NOTE(review): assumes evm.Transfer is standard protoc-gen-go output, which
// generates nil-safe GetTransactionHeader/GetHash/GetCurrency/GetSymbol
// accessors — confirm against the generated package.
func processEvmTransfers(msg []byte, chain string) {
	var transfers evm.TransfersMessage
	if err := proto.Unmarshal(msg, &transfers); err != nil {
		fmt.Println("[ERROR] Failed to decode EVM TransfersMessage:", err)
		return
	}

	for _, t := range transfers.Transfers {
		fmt.Printf("Chain %s Tx %s: %s -> %s, Amount=%s %s (Success=%v)\n",
			chain,
			t.GetTransactionHeader().GetHash(),
			t.Sender,
			t.Receiver,
			t.Amount,
			t.GetCurrency().GetSymbol(),
			t.Success,
		)
	}
}
4
主应用程序
创建新文件
main.go:复制
询问AI
package main
import "log"
// main loads config.yml from the working directory, creates the Kafka
// consumer described by it, and runs the consume loop on the configured
// topics. A failure in either setup step is logged and terminates the process.
func main() {
	const configPath = "config.yml"

	settings, loadErr := LoadConfig(configPath)
	if loadErr != nil {
		log.Fatalf("Failed to load config: %v", loadErr)
	}

	kc, createErr := NewConsumer(settings)
	if createErr != nil {
		log.Fatalf("Failed to create consumer: %v", createErr)
	}

	RunConsumer(kc, settings.Topics)
}
5
运行程序
复制
询问AI
# Setup Configuration
cp config_example.yml config.yml
# Edit config.yml with your credentials and topics list
# Example config.yml
bootstrap_servers: "kafka.chainstream.io:9099"
username: "app_xxx"
password: "xxx"
group_id: "app_xxx-consumer-1"
topics:
- "eth.v1.transfers.proto"
- "bsc.v1.transfers.proto"
- "polygon.v1.transfers.proto"
- "tron.v1.transfers.proto"
ssl:
ca_cert: "server-ca-bundle.pem"
client_cert: "client.crt"
client_key: "client.key"
# Build and Run
go build -o stream_consumer
./stream_consumer

