Skip to main content

Kafka Data Streams

Kafka Streams deliver real-time, high-performance blockchain data. Available formats include Protobuf and JSON, optimized for various latency and payload needs. The schema is available here. The complete code is available here.

Kafka Consumer Implementation

Implementation in Go for subscribing and processing Kafka streams efficiently. This guide walks through the implementation of a Kafka consumer in Go to subscribe to a Kafka topic and process on-chain data streams from ChainStream in real-time. The consumer connects to Kafka brokers securely using SSL and handles incoming messages in Protobuf format.
1

Configuration

config.yml
  bootstrap_servers: "kafka.chainstream.io:9099"
  username: "<your_username_here>"
  password: "<your_password_here>"
  group.id: "<username_group-number>"

  topics:
    - "eth.v1.transfers.proto"
    - "bsc.v1.transfers.proto"
  ssl:
    ca_cert: "server-ca-bundle.pem"
    client_cert: "client.crt"
    client_key: "client.key"
2

Consumer Implementation

Create a new file consumer.go:
package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "io/ioutil"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "gopkg.in/yaml.v3"
)

type Config struct {
    BootstrapServers string   `yaml:"bootstrap_servers"`
    Username         string   `yaml:"username"`
    Password         string   `yaml:"password"`
    GroupID          string   `yaml:"group_id"`
    Topics           []string `yaml:"topics"`
    SSL              struct {
            CACert     string `yaml:"ca_cert"`
            ClientCert string `yaml:"client_cert"`
            ClientKey  string `yaml:"client_key"`
    } `yaml:"ssl"`
}

func LoadConfig(path string) (*Config, error) {
    data, err := ioutil.ReadFile(path)
    if err != nil {
            return nil, err
    }
    var cfg Config
    if err := yaml.Unmarshal(data, &cfg); err != nil {
            return nil, err
    }
    return &cfg, nil
}

func NewConsumer(cfg *Config) (*kafka.Consumer, error) {
    return kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":                   cfg.BootstrapServers,
            "group.id":                            cfg.GroupID,
            "auto.offset.reset":                   "earliest",
            "security.protocol":                   "SASL_SSL",
            "sasl.mechanisms":                     "SCRAM-SHA-512",
            "sasl.username":                       cfg.Username,
            "sasl.password":                       cfg.Password,
            "ssl.ca.location":                     cfg.SSL.CACert,
            "ssl.certificate.location":            cfg.SSL.ClientCert,
            "ssl.key.location":                    cfg.SSL.ClientKey,
            "enable.ssl.certificate.verification": true,
    })
}

func RunConsumer(c *kafka.Consumer, topics []string) {
    if err := c.SubscribeTopics(topics, nil); err != nil {
            fmt.Fprintf(os.Stderr, "Failed to subscribe topics: %v\n", err)
            return
    }

    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    run := true
    for run {
            select {
            case sig := <-sigchan:
                    fmt.Printf("Caught signal %v: terminating\n", sig)
                    run = false
            default:
                    ev := c.Poll(100)
                    switch e := ev.(type) {
                    case *kafka.Message:
                            topic := *e.TopicPartition.Topic
                            ProcessMessage(topic, e.Value)
                    case kafka.Error:
                            fmt.Fprintf(os.Stderr, "Error: %v\n", e)
                    }
            }
    }
    c.Close()
}
3

Processor Implementation

Create a new file processor.go:
package main

import (
    evm "chainstream/evm"
    "fmt"

    "google.golang.org/protobuf/proto"
)

func ProcessMessage(topic string, msg []byte) {
    fmt.Printf("\n[DEBUG] Got message from topic=%s, size=%d bytes\n", topic, len(msg))

    switch topic {
    case "eth.v1.transfers.proto":
            processEvmTransfers(msg, "eth")
    case "bsc.v1.transfers.proto":
            processEvmTransfers(msg, "bsc")
    default:
            fmt.Printf("[WARN] Unknown topic %s, raw head=%x\n", topic, msg[:min(32, len(msg))])
    }
}

func processEvmTransfers(msg []byte, chain string) {
    var transfers evm.TransfersMessage
    if err := proto.Unmarshal(msg, &transfers); err != nil {
            fmt.Println("[ERROR] Failed to decode EVM TransfersMessage:", err)
            return
    }

    for _, t := range transfers.Transfers {
            fmt.Printf("Chain %s Tx %s: %s -> %s, Amount=%s %s (Success=%v)\n",
                    chain,
                    t.TransactionHeader.Hash,
                    t.Sender,
                    t.Receiver,
                    t.Amount,
                    t.Currency.Symbol,
                    t.Success,
            )
    }
}
4

Main Application

Create the main entry point main.go:
package main

import "log"

func main() {
    cfg, err := LoadConfig("config.yml")
    if err != nil {
            log.Fatalf("Failed to load config: %v", err)
    }

    consumer, err := NewConsumer(cfg)
    if err != nil {
            log.Fatalf("Failed to create consumer: %v", err)
    }

    RunConsumer(consumer, cfg.Topics)
}

5

Running the Application

# Setup Configuration
cp config_example.yml config.yml
# Edit config.yml with your credentials and topics list

# Example config.yml
bootstrap_servers: "kafka.chainstream.io:9099"
username: "app_xxx"
password: "xxx"
group_id: "app_xxx-consumer-1"

topics:
  - "eth.v1.transfers.proto"
  - "bsc.v1.transfers.proto"
  - "polygon.v1.transfers.proto"
  - "tron.v1.transfers.proto"

ssl:
  ca_cert: "server-ca-bundle.pem"
  client_cert: "client.crt"
  client_key: "client.key"

# Build and Run
go build -o stream_consumer
./stream_consumer