Kafka 优势

更低延迟

  • 更短的数据管道
  • 与 GraphQL 订阅相比处理开销更小
  • 无需自定义数据库或额外的格式化逻辑

更好的可靠性

  • 比 WebSocket 更稳定的连接协议
  • 针对持久连接优化
  • 可以从最新偏移量读取消息

更强的可扩展性

  • 多个消费者可以分担负载
  • 自动负载重分配
  • 更好地处理高容量数据

Kafka 限制

浏览器不兼容

  • 仅支持服务器端实现
  • 无法直接在浏览器中使用

有限的过滤功能

  • 无预过滤功能
  • 预定义的模式
  • 需要客户端后处理

开发复杂性

  • 无 IDE 对流的支持
  • 调试仅限于消费者端

技术考虑因素

使用 Kafka 流时,需要考虑消息保留策略、延迟影响和分区策略。关键数据流的消息默认保留 24 小时。

实现指南

消息保留

保留策略

  • Protobuf 流:保留 24 小时
  • DEX 交易(JSON):保留 24 小时
  • 其他 JSON 流:保留 4 小时

连接设置

const sasl_conf = {
  'bootstrap.servers': 'kfk0.chainstream.io:9093,kfk1.chainstream.io:9093,kfk2.chainstream.io:9093',
  'security.protocol': 'SASL_SSL',
  'sasl.mechanism': 'SCRAM-SHA-512',
  'sasl.username': '<YOUR USERNAME>',
  'sasl.password': '<YOUR PASSWORD>',
  'ssl.key.location': 'client.key.pem',
  'ssl.ca.location': 'server.cer.pem',
  'ssl.certificate.location': 'client.cer.pem',
  'ssl.endpoint.identification.algorithm': 'none'
}

主题结构

<BLOCKCHAIN_NAME>.<MESSAGE_TYPE>
<BLOCKCHAIN_NAME>.broadcasted.<MESSAGE_TYPE>

消息类型:

  • dextrades - DEX 交易
  • dexorders - DEX 订单
  • dexpools - DEX 池
  • transactions - 交易
  • transfers - 转账
  • instructions - 指令
  • raw - 原始数据

Kafka 流处理最佳实践

高吞吐量的并行处理

  • 在多个分区之间分配消费者以并行处理数据
  • 为每个分区分配一个消费者线程以实现高效的负载均衡

持续流处理以保持稳定性

  • 保持 Kafka 消费者运行以确保数据流的无缝衔接
  • 异步处理消息以防止瓶颈
  • 避免阻塞主消费循环以维持最佳吞吐量

优化的消息处理

  • 在适用的情况下实现批处理以减少开销
  • 使用工作组以提高并发性
  • 持续监控处理延迟以确保实时性能

其他代码示例

前提条件

ChainStream Kafka 服务器访问

访问 ChainStream Kafka 代理

认证凭据

Kafka 代理的用户名和密码

主题订阅

要订阅的主题名称

Go 安装

Go 版本 >= 1.16

Kafka 客户端

Confluent Kafka Go 客户端库

依赖设置

go mod init kafka-consumer

实现

运行应用程序

go mod init kafka-consumer

执行工作流

Kafka 客户端初始化

  • 使用 SSL 和 SASL 配置进行初始化
  • 设置唯一的 group.id 以实现独立的消息消费

消费者设置

  • 创建 Kafka 消费者
  • 订阅指定主题

消息处理

  • 每 100ms 轮询新消息
  • 解析 JSON 消息
  • 记录处理后的数据

优雅关闭

  • 处理终止信号
  • 清理关闭消费者