分布式消息服务Kafka

Go

2024-05-09 02:49:57

环境准备

1. 下载Demo包kafka-confluent-go-demo.zip。

2. 使用开发工具导入Demo。

配置修改

1. 如果使用SSL连接,需要先在控制台下载证书压缩包,解压后得到ssl.client.truststore.jks,然后执行以下命令生成caRoot.pem文件。


keytool -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore caRoot.p12 -deststoretype pkcs12

openssl pkcs12 -in caRoot.p12 -out caRoot.pem


2. 修改kafka.json文件。security.protocol需与实际连接方式一致(PLAINTEXT、SSL、SASL_PLAINTEXT或SASL_SSL);下面的示例代码在该字段缺失或取值无法识别时会直接panic。使用SSL连接时填写"SSL"。


{

  "topic": "XXX",

  "topic2": "XXX",

  "group.id": "XXX",

  "bootstrap.servers" :     "XXX:XX",

  "security.protocol" :     "SSL"

}


生产消息

执行以下命令发送消息。

 


go run -mod=vendor producer/producer.go


生产消息示例代码如下:

 


package main

import (

        "encoding/json"

        "fmt"

        "github.com/confluentinc/confluent-kafka-go/kafka"

        "log"

        "os"

        "path/filepath"

        "strconv"

        "time"

)

// INT32_MAX is the largest int32 value (2147483647) minus a margin of 1000.
// It is supplied as the producer's "retries" setting.
const INT32_MAX = 1<<31 - 1 - 1000

// KafkaConfig holds the settings decoded from the JSON configuration file.
// Each field is filled from the JSON key named in its struct tag; keys that
// are absent from the file leave the field as the empty string.
type KafkaConfig struct {
	// Topic and Topic2 are the two topics the demo alternates between.
	Topic  string `json:"topic"`
	Topic2 string `json:"topic2"`
	// GroupId is the consumer group id.
	GroupId string `json:"group.id"`
	// BootstrapServers is the broker address list.
	BootstrapServers string `json:"bootstrap.servers"`
	// SecurityProtocol selects the connection mode: PLAINTEXT, SSL,
	// SASL_SSL or SASL_PLAINTEXT.
	SecurityProtocol string `json:"security.protocol"`
	// SslCaLocation is the CA certificate location (JSON key "ssl.ca.location").
	SslCaLocation string `json:"ssl.ca.location"`
	// SaslUsername, SaslPassword and SaslMechanism are read by
	// doInitProducer for the SASL_SSL and SASL_PLAINTEXT protocols. They were
	// referenced there but missing from this struct, which broke compilation.
	SaslUsername  string `json:"sasl.username"`
	SaslPassword  string `json:"sasl.password"`
	SaslMechanism string `json:"sasl.mechanism"`
}

// config should be a     pointer to structure, if not, panic

func loadJsonConfig()     *KafkaConfig {

        workPath, err := os.Getwd()

        if err != nil {

                 panic(err)

        }

        configPath :=     filepath.Join(workPath, "conf")

        fullPath :=     filepath.Join(configPath, "kafka.json")

        file, err := os.Open(fullPath);

        if (err != nil) {

                 msg :=     fmt.Sprintf("Can not load config at %s. Error: %v", fullPath,     err)

                 panic(msg)

        }

        defer file.Close()

        decoder := json.NewDecoder(file)

        var config = &KafkaConfig{}

        err = decoder.Decode(config);

        if (err != nil) {

                 msg :=     fmt.Sprintf("Decode json fail for config file at %s. Error: %v",     fullPath, err)

                 panic(msg)

        }

        json.Marshal(config)

        return  config

}

func     doInitProducer(cfg *KafkaConfig) *kafka.Producer {

        fmt.Print("init kafka producer,     it may take a few seconds to init the connection\n")

        //common arguments

        var kafkaconf =     &kafka.ConfigMap{

                 "api.version.request":     "true",

                 "message.max.bytes":     1000000,

                 "linger.ms": 500,

                 "sticky.partitioning.linger.ms"     : 1000,

                 "retries":     INT32_MAX,

                 "retry.backoff.ms":     1000,

                 "acks":     "1"}

        kafkaconf.SetKey("bootstrap.servers",     cfg.BootstrapServers)

        switch cfg.SecurityProtocol {

        case "PLAINTEXT" :

                 kafkaconf.SetKey("security.protocol",     "plaintext");

    case "SSL":

            kafkaconf.SetKey("security.protocol", "ssl");

            kafkaconf.SetKey("ssl.ca.location",     "/XXX/caRoot.pem")

        case "SASL_SSL":

                 kafkaconf.SetKey("security.protocol",     "sasl_ssl");

                 kafkaconf.SetKey("ssl.ca.location",     "/XXX/caRoot.pem");

                 kafkaconf.SetKey("sasl.username",     cfg.SaslUsername);

                 kafkaconf.SetKey("sasl.password",     cfg.SaslPassword);

                 kafkaconf.SetKey("sasl.mechanism",     cfg.SaslMechanism);

              kafkaconf.SetKey("enable.ssl.certificate.verification",     "false")

        case "SASL_PLAINTEXT":

                 kafkaconf.SetKey("security.protocol",     "sasl_plaintext");

                 kafkaconf.SetKey("sasl.username",     cfg.SaslUsername);

                 kafkaconf.SetKey("sasl.password",     cfg.SaslPassword);

                 kafkaconf.SetKey("sasl.mechanism",     cfg.SaslMechanism)

        default:

                 panic(kafka.NewError(kafka.ErrUnknownProtocol,     "unknown protocol", true))

        }

        producer, err :=     kafka.NewProducer(kafkaconf)

        if err != nil {

                 panic(err)

        }

        fmt.Print("init kafka producer     success\n")

        return producer

}

func main() {

        // Choose the correct protocol

        cfg := loadJsonConfig();

        producer := doInitProducer(cfg)

    defer producer.Close()

        // Delivery report handler for     produced messages

        go func() {

                 for e := range     producer.Events() {

                         switch ev :=     e.(type) {

                         case     *kafka.Message:

                                  if     ev.TopicPartition.Error != nil {

                                          log.Printf("Failed     to write access log entry:%v", ev.TopicPartition.Error)

                                  } else {

                                          log.Printf("Send     OK topic:%v partition:%v offset:%v content:%s\n",     *ev.TopicPartition.Topic,      ev.TopicPartition.Partition, ev.TopicPartition.Offset, ev.Value)

                                  }

                         }

                 }

        }()

    // Produce messages to topic     (asynchronously)

        i := 0

        for {

                 i = i + 1

                 value := "this is a     kafka message from confluent go " + strconv.Itoa(i)

                 var msg *kafka.Message =     nil

                 if i % 2 == 0 {

                         msg =     &kafka.Message{

                                  TopicPartition:     kafka.TopicPartition{Topic: &cfg.Topic2, Partition:     kafka.PartitionAny},

                                  Value:          []byte(value),

                         }

                 } else {

                         msg =     &kafka.Message{

                                  TopicPartition:     kafka.TopicPartition{Topic: &cfg.Topic, Partition: kafka.PartitionAny},

                                  Value:          []byte(value),

                         }

                 }

                 producer.Produce(msg, nil)

                 time.Sleep(time.Duration(1)     * time.Millisecond)

        }

        // Wait for message deliveries     before shutting down

        producer.Flush(15 * 1000)

}

 


消费消息

执行以下命令消费消息。

 


go run -mod=vendor consumer/consumer.go


消费消息示例代码如下:

 


package main

import (

        "encoding/json"

        "fmt"

        "github.com/confluentinc/confluent-kafka-go/kafka"

        "os"

        "path/filepath"

)

// KafkaConfig holds the settings decoded from the JSON configuration file.
// Each field is filled from the JSON key named in its struct tag; keys that
// are absent from the file leave the field as the empty string.
type KafkaConfig struct {
	// Topic and Topic2 are the two topics the consumer subscribes to.
	Topic  string `json:"topic"`
	Topic2 string `json:"topic2"`
	// GroupId is the consumer group id.
	GroupId string `json:"group.id"`
	// BootstrapServers is the broker address list.
	BootstrapServers string `json:"bootstrap.servers"`
	// SecurityProtocol selects the connection mode: PLAINTEXT, SSL,
	// SASL_SSL or SASL_PLAINTEXT.
	SecurityProtocol string `json:"security.protocol"`
	// SslCaLocation is the CA certificate location (JSON key
	// "ssl.ca.location"), declared for parity with the producer demo's struct.
	SslCaLocation string `json:"ssl.ca.location"`
	// SaslUsername, SaslPassword and SaslMechanism are read by
	// doInitConsumer for the SASL_SSL and SASL_PLAINTEXT protocols. They were
	// referenced there but missing from this struct, which broke compilation.
	SaslUsername  string `json:"sasl.username"`
	SaslPassword  string `json:"sasl.password"`
	SaslMechanism string `json:"sasl.mechanism"`
}

// config should be a     pointer to structure, if not, panic

func loadJsonConfig()     *KafkaConfig {

        workPath, err := os.Getwd()

        if err != nil {

                 panic(err)

        }

        configPath :=     filepath.Join(workPath, "conf")

        fullPath :=     filepath.Join(configPath, "kafka.json")

        file, err := os.Open(fullPath);

        if (err != nil) {

                 msg := fmt.Sprintf("Can     not load config at %s. Error: %v", fullPath, err)

                 panic(msg)

        }

        defer file.Close()

        decoder := json.NewDecoder(file)

        var config = &KafkaConfig{}

        err = decoder.Decode(config);

        if (err != nil) {

                 msg :=     fmt.Sprintf("Decode json fail for config file at %s. Error: %v",     fullPath, err)

                 panic(msg)

        }

        json.Marshal(config)

        return  config

}

func     doInitConsumer(cfg *KafkaConfig) *kafka.Consumer {

        fmt.Print("init kafka consumer,     it may take a few seconds to init the connection\n")

        //common arguments

        var kafkaconf =     &kafka.ConfigMap{

                 "api.version.request":     "true",

                 "auto.offset.reset":     "latest",

                 "heartbeat.interval.ms":     3000,

                 "session.timeout.ms":     30000,

                 "max.poll.interval.ms":     120000,

                 "fetch.max.bytes":     1024000,

                 "max.partition.fetch.bytes":     256000}

        kafkaconf.SetKey("bootstrap.servers",     cfg.BootstrapServers);

        kafkaconf.SetKey("group.id",     cfg.GroupId)

        switch cfg.SecurityProtocol {

        case "PLAINTEXT" :

                 kafkaconf.SetKey("security.protocol",     "plaintext");

        case "SSL":

            kafkaconf.SetKey("security.protocol",     "ssl");

            kafkaconf.SetKey("ssl.ca.location",     "/XXX/caRoot.pem")

        case "SASL_SSL":

                 kafkaconf.SetKey("security.protocol",     "sasl_ssl");

                 kafkaconf.SetKey("ssl.ca.location",     "/XXX/caRoot.pem");

                 kafkaconf.SetKey("sasl.username",     cfg.SaslUsername);

                 kafkaconf.SetKey("sasl.password",     cfg.SaslPassword);

                 kafkaconf.SetKey("sasl.mechanism",     cfg.SaslMechanism)

        case "SASL_PLAINTEXT":

                 kafkaconf.SetKey("security.protocol",     "sasl_plaintext");

                 kafkaconf.SetKey("sasl.username",     cfg.SaslUsername);

                 kafkaconf.SetKey("sasl.password",     cfg.SaslPassword);

                 kafkaconf.SetKey("sasl.mechanism",     cfg.SaslMechanism)

        default:

                 panic(kafka.NewError(kafka.ErrUnknownProtocol,     "unknown protocol", true))

        }

        consumer, err :=     kafka.NewConsumer(kafkaconf)

        if err != nil {

                 panic(err)

        }

        fmt.Print("init kafka consumer     success\n")

        return consumer;

}

func main() {

        // Choose the correct protocol

        cfg := loadJsonConfig();

        consumer := doInitConsumer(cfg)

        consumer.SubscribeTopics([]string{cfg.Topic,     cfg.Topic2}, nil)

        for {

                 msg, err :=     consumer.ReadMessage(-1)

                 if err == nil {

                         fmt.Printf("Message     on %s: %s\n", msg.TopicPartition, string(msg.Value))

                 } else {

                         // The client will

                         //automatically try     to recover from all errors.

                         fmt.Printf("Consumer     error: %v (%v)\n", err, msg)

                 }

        }

        consumer.Close()

}