Kafka Data Streams
Kafka Streams deliver real-time, high-performance blockchain data. Available formats include Protobuf and JSON, optimized for various latency and payload needs. The schema is available here. The complete code is available here.

Kafka Consumer Implementation
Implementation in Go for subscribing and processing Kafka streams efficiently. This guide walks through the implementation of a Kafka consumer in Go to subscribe to a Kafka topic and process on-chain data streams from ChainStream in real time. The consumer connects to Kafka brokers securely using SSL and handles incoming messages in Protobuf format.

1. Configuration
config.yml:Copy
Ask AI
bootstrap_servers: "kafka.chainstream.io:9099"
username: "<your_username_here>"
password: "<your_password_here>"
group_id: "<username_group-number>"
topics:
- "eth.v1.transfers.proto"
- "bsc.v1.transfers.proto"
ssl:
ca_cert: "server-ca-bundle.pem"
client_cert: "client.crt"
client_key: "client.key"
2. Consumer Implementation
Create a new file
consumer.go:Copy
Ask AI
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"io/ioutil"
"github.com/confluentinc/confluent-kafka-go/kafka"
"gopkg.in/yaml.v3"
)
// Config holds the consumer settings loaded from config.yml.
// YAML keys are bound via the struct tags below; note the file must
// use "group_id" (underscore) for the GroupID field to be populated.
type Config struct {
	BootstrapServers string   `yaml:"bootstrap_servers"` // broker host:port list, comma-separated
	Username         string   `yaml:"username"`          // SASL/SCRAM username
	Password         string   `yaml:"password"`          // SASL/SCRAM password
	GroupID          string   `yaml:"group_id"`          // Kafka consumer group id
	Topics           []string `yaml:"topics"`            // topics to subscribe to
	// SSL groups the file paths of the TLS material used for the
	// broker connection (CA bundle plus client cert/key for mTLS).
	SSL struct {
		CACert     string `yaml:"ca_cert"`
		ClientCert string `yaml:"client_cert"`
		ClientKey  string `yaml:"client_key"`
	} `yaml:"ssl"`
}
// LoadConfig reads the YAML configuration file at path and decodes it
// into a Config. Errors are wrapped with the failing stage and path so
// callers can distinguish a missing/unreadable file from malformed YAML.
func LoadConfig(path string) (*Config, error) {
	data, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("reading config %q: %w", path, err)
	}
	var cfg Config
	if err := yaml.Unmarshal(data, &cfg); err != nil {
		return nil, fmt.Errorf("parsing config %q: %w", path, err)
	}
	return &cfg, nil
}
// NewConsumer builds a confluent-kafka-go consumer from cfg.
// It authenticates with SASL/SCRAM-SHA-512 over TLS, presenting the
// client certificate and key referenced in cfg.SSL for mutual TLS.
func NewConsumer(cfg *Config) (*kafka.Consumer, error) {
	return kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": cfg.BootstrapServers,
		"group.id":          cfg.GroupID,
		// Start from the earliest retained offset when this group
		// has no committed offset yet.
		"auto.offset.reset": "earliest",
		"security.protocol": "SASL_SSL",
		"sasl.mechanisms":   "SCRAM-SHA-512",
		"sasl.username":     cfg.Username,
		"sasl.password":     cfg.Password,
		// Trust anchors for the broker certificate, plus our client
		// cert/key pair for mutual TLS.
		"ssl.ca.location":          cfg.SSL.CACert,
		"ssl.certificate.location": cfg.SSL.ClientCert,
		"ssl.key.location":         cfg.SSL.ClientKey,
		// Explicitly keep broker certificate verification on.
		"enable.ssl.certificate.verification": true,
	})
}
// RunConsumer subscribes c to topics and polls for messages until a
// SIGINT/SIGTERM arrives or a fatal client error occurs. RunConsumer
// takes ownership of c: it is always closed before returning, including
// on the subscription-failure path (the original leaked it there).
func RunConsumer(c *kafka.Consumer, topics []string) {
	defer c.Close()

	if err := c.SubscribeTopics(topics, nil); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to subscribe topics: %v\n", err)
		return
	}

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	for {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			return
		default:
			// Poll returns nil on a 100ms timeout; the type switch
			// simply falls through in that case.
			switch e := c.Poll(100).(type) {
			case *kafka.Message:
				ProcessMessage(*e.TopicPartition.Topic, e.Value)
			case kafka.Error:
				fmt.Fprintf(os.Stderr, "Error: %v\n", e)
				// Fatal errors are unrecoverable for this client
				// instance; keeping the loop alive would just spin.
				if e.IsFatal() {
					return
				}
			}
		}
	}
}
3. Processor Implementation
Create a new file
processor.go:Copy
Ask AI
package main
import (
evm "chainstream/evm"
"fmt"
"google.golang.org/protobuf/proto"
)
// ProcessMessage routes a raw Kafka payload to the decoder for its
// topic; unrecognized topics are logged with a hex dump of the first
// bytes to aid debugging.
func ProcessMessage(topic string, msg []byte) {
	fmt.Printf("\n[DEBUG] Got message from topic=%s, size=%d bytes\n", topic, len(msg))

	// Topic → chain-tag dispatch table.
	chainByTopic := map[string]string{
		"eth.v1.transfers.proto": "eth",
		"bsc.v1.transfers.proto": "bsc",
	}

	chain, known := chainByTopic[topic]
	if !known {
		fmt.Printf("[WARN] Unknown topic %s, raw head=%x\n", topic, msg[:min(32, len(msg))])
		return
	}
	processEvmTransfers(msg, chain)
}
// processEvmTransfers decodes a Protobuf-encoded EVM TransfersMessage
// and prints one line per transfer, tagged with the source chain.
// Decode failures are logged and the message is dropped.
func processEvmTransfers(msg []byte, chain string) {
	var transfers evm.TransfersMessage
	if err := proto.Unmarshal(msg, &transfers); err != nil {
		fmt.Println("[ERROR] Failed to decode EVM TransfersMessage:", err)
		return
	}
	for _, t := range transfers.Transfers {
		// Use the generated nil-safe getters: direct field access like
		// t.TransactionHeader.Hash panics when the submessage is absent
		// from the wire payload. (Assumes standard protoc-gen-go output
		// for the evm package — confirm against the generated code.)
		fmt.Printf("Chain %s Tx %s: %s -> %s, Amount=%s %s (Success=%v)\n",
			chain,
			t.GetTransactionHeader().GetHash(),
			t.GetSender(),
			t.GetReceiver(),
			t.GetAmount(),
			t.GetCurrency().GetSymbol(),
			t.GetSuccess(),
		)
	}
}
4. Main Application
Create the main entry point
main.go:Copy
Ask AI
package main

import "log"

// main wires the pieces together: load the YAML config, build the
// Kafka consumer from it, then block in the poll loop until a
// termination signal arrives.
func main() {
	config, err := LoadConfig("config.yml")
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}

	kafkaConsumer, err := NewConsumer(config)
	if err != nil {
		log.Fatalf("Failed to create consumer: %v", err)
	}

	RunConsumer(kafkaConsumer, config.Topics)
}
5. Running the Application
Copy
Ask AI
# Setup Configuration
cp config_example.yml config.yml
# Edit config.yml with your credentials and topics list
# Example config.yml
bootstrap_servers: "kafka.chainstream.io:9099"
username: "app_xxx"
password: "xxx"
group_id: "app_xxx-consumer-1"
topics:
- "eth.v1.transfers.proto"
- "bsc.v1.transfers.proto"
- "polygon.v1.transfers.proto"
- "tron.v1.transfers.proto"
ssl:
ca_cert: "server-ca-bundle.pem"
client_cert: "client.crt"
client_key: "client.key"
# Build and Run
go build -o stream_consumer
./stream_consumer

