环境准备
1. 下载Demo包kafka-confluent-go-demo.zip。
2. 使用开发工具导入Demo。
配置修改
1. 如果是ssl连接,需要在控制台下载证书。并且解压压缩包得到ssl.client.truststore.jks,执行以下命令生成caRoot.pem文件。
keytool -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore caRoot.p12 -deststoretype pkcs12 openssl pkcs12 -in caRoot.p12 -out caRoot.pem |
2. 修改kafka.json文件。(security.protocol仅在ssl连接时需要配置)
{ "topic": "XXX", "topic2": "XXX", "group.id": "XXX", "bootstrap.servers" : "XXX:XX", "security.protocol" : "SSL" } |
生产消息
发送以下命令发送消息。
go run -mod=vendor producer/producer.go |
生产消息示例代码如下:
package main import ( "encoding/json" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "path/filepath" "strconv" "time" ) const ( INT32_MAX = 2147483647 - 1000 ) type KafkaConfig struct { Topic string `json:"topic"` Topic2 string `json:"topic2"` GroupId string `json:"group.id"` BootstrapServers string `json:"bootstrap.servers"` SecurityProtocol string `json:"security.protocol"` SslCaLocation string `json:"ssl.ca.location"` } // config should be a pointer to structure, if not, panic func loadJsonConfig() *KafkaConfig { workPath, err := os.Getwd() if err != nil { panic(err) } configPath := filepath.Join(workPath, "conf") fullPath := filepath.Join(configPath, "kafka.json") file, err := os.Open(fullPath); if (err != nil) { msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err) panic(msg) } defer file.Close() decoder := json.NewDecoder(file) var config = &KafkaConfig{} err = decoder.Decode(config); if (err != nil) { msg := fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err) panic(msg) } json.Marshal(config) return config } func doInitProducer(cfg *KafkaConfig) *kafka.Producer { fmt.Print("init kafka producer, it may take a few seconds to init the connection\n") //common arguments var kafkaconf = &kafka.ConfigMap{ "api.version.request": "true", "message.max.bytes": 1000000, "linger.ms": 500, "sticky.partitioning.linger.ms" : 1000, "retries": INT32_MAX, "retry.backoff.ms": 1000, "acks": "1"} kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers) switch cfg.SecurityProtocol { case "PLAINTEXT" : kafkaconf.SetKey("security.protocol", "plaintext"); case "SSL": kafkaconf.SetKey("security.protocol", "ssl"); kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem") case "SASL_SSL": kafkaconf.SetKey("security.protocol", "sasl_ssl"); kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem"); kafkaconf.SetKey("sasl.username", cfg.SaslUsername); kafkaconf.SetKey("sasl.password", cfg.SaslPassword); kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism); kafkaconf.SetKey("enable.ssl.certificate.verification", "false") case "SASL_PLAINTEXT": kafkaconf.SetKey("security.protocol", "sasl_plaintext"); kafkaconf.SetKey("sasl.username", cfg.SaslUsername); kafkaconf.SetKey("sasl.password", cfg.SaslPassword); kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism) default: panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true)) } producer, err := kafka.NewProducer(kafkaconf) if err != nil { panic(err) } fmt.Print("init kafka producer success\n") return producer } func main() { // Choose the correct protocol cfg := loadJsonConfig(); producer := doInitProducer(cfg) defer producer.Close() // Delivery report handler for produced messages go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { log.Printf("Failed to write access log entry:%v", ev.TopicPartition.Error) } else { log.Printf("Send OK topic:%v partition:%v offset:%v content:%s\n", *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset, ev.Value) } } } }() // Produce messages to topic (asynchronously) i := 0 for { i = i + 1 value := "this is a kafka message from confluent go " + strconv.Itoa(i) var msg *kafka.Message = nil if i % 2 == 0 { msg = &kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &cfg.Topic2, Partition: kafka.PartitionAny}, Value: []byte(value), } } else { msg = &kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &cfg.Topic, Partition: kafka.PartitionAny}, Value: []byte(value), } } producer.Produce(msg, nil) time.Sleep(time.Duration(1) * time.Millisecond) } // Wait for message deliveries before shutting down producer.Flush(15 * 1000) }
|
消费消息
发送以下命令消费消息。
go run -mod=vendor consumer/consumer.go |
消费消息示例代码如下:
package main import ( "encoding/json" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "os" "path/filepath" ) type KafkaConfig struct { Topic string `json:"topic"` Topic2 string `json:"topic2"` GroupId string `json:"group.id"` BootstrapServers string `json:"bootstrap.servers"` SecurityProtocol string `json:"security.protocol"` } // config should be a pointer to structure, if not, panic func loadJsonConfig() *KafkaConfig { workPath, err := os.Getwd() if err != nil { panic(err) } configPath := filepath.Join(workPath, "conf") fullPath := filepath.Join(configPath, "kafka.json") file, err := os.Open(fullPath); if (err != nil) { msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err) panic(msg) } defer file.Close() decoder := json.NewDecoder(file) var config = &KafkaConfig{} err = decoder.Decode(config); if (err != nil) { msg := fmt.Sprintf("Decode json fail for config file at %s. Error: %v", fullPath, err) panic(msg) } json.Marshal(config) return config } func doInitConsumer(cfg *KafkaConfig) *kafka.Consumer { fmt.Print("init kafka consumer, it may take a few seconds to init the connection\n") //common arguments var kafkaconf = &kafka.ConfigMap{ "api.version.request": "true", "auto.offset.reset": "latest", "heartbeat.interval.ms": 3000, "session.timeout.ms": 30000, "max.poll.interval.ms": 120000, "fetch.max.bytes": 1024000, "max.partition.fetch.bytes": 256000} kafkaconf.SetKey("bootstrap.servers", cfg.BootstrapServers); kafkaconf.SetKey("group.id", cfg.GroupId) switch cfg.SecurityProtocol { case "PLAINTEXT" : kafkaconf.SetKey("security.protocol", "plaintext"); case "SSL": kafkaconf.SetKey("security.protocol", "ssl"); kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem") case "SASL_SSL": kafkaconf.SetKey("security.protocol", "sasl_ssl"); kafkaconf.SetKey("ssl.ca.location", "/XXX/caRoot.pem"); kafkaconf.SetKey("sasl.username", cfg.SaslUsername); kafkaconf.SetKey("sasl.password", cfg.SaslPassword); kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism) case "SASL_PLAINTEXT": kafkaconf.SetKey("security.protocol", "sasl_plaintext"); kafkaconf.SetKey("sasl.username", cfg.SaslUsername); kafkaconf.SetKey("sasl.password", cfg.SaslPassword); kafkaconf.SetKey("sasl.mechanism", cfg.SaslMechanism) default: panic(kafka.NewError(kafka.ErrUnknownProtocol, "unknown protocol", true)) } consumer, err := kafka.NewConsumer(kafkaconf) if err != nil { panic(err) } fmt.Print("init kafka consumer success\n") return consumer; } func main() { // Choose the correct protocol cfg := loadJsonConfig(); consumer := doInitConsumer(cfg) consumer.SubscribeTopics([]string{cfg.Topic, cfg.Topic2}, nil) for { msg, err := consumer.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { // The client will //automatically try to recover from all errors. fmt.Printf("Consumer error: %v (%v)\n", err, msg) } } consumer.Close() } |