
我可以使用confluence cli 连接到confluence kafka 集群,但无法使用segmentio 的kafka-go 库。 我收到以下错误。
with SASL: SASL handshake failed: EOF
这是我在 go 中的函数
package consumer
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)
func Consume(ctx context.Context) {
// create a new logger that outputs to stdout
// and has the `kafka reader` prefix
l := log.New(os.Stdout, "kafka reader: ", 0)
mechanism := plain.Mechanism{
Username: "my-api-key",
Password: "my-api-secret",
}
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{brokerAddress}, // brokerAddress given in confluent cloud cluster settings.
Topic: []string{"steps"}[0],
// assign the logger to the reader
Logger: l,
Dialer: dialer,
})
for {
// the `ReadMessage` method blocks until we receive the next event
msg, err := r.ReadMessage(ctx)
if err != nil {
panic("could not read message " + err.Error())
}
// after receiving the message, log its value
fmt.Println("received: ", string(msg.Value))
}
}我尝试生成新密钥、使用我的帐户用户名和密码、减少分区,但没有任何效果。
看来您的服务器的 TLS 版本不被接受,您可以使用 MinVersion 强制 go-kafka 接受它:
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
TLS: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
以上就是无法使用segmentio的kafka-go连接到Confluence Kafka的详细内容,更多请关注php中文网其它相关文章!
Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号