Distributed Message Service (Kafka)

Go

2024-05-09 09:20:35

Preparing the Environment

1.         Download the demo package kafka-confluent-go-demo.zip.

2.         Import the demo using development tools.

Modifying Configurations

1.         If it is an SSL connection, download the certificate from the console. Unzip the compressed package to get ssl.client.truststore.jks, and execute the following command to generate the caRoot.pem file.


keytool -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore caRoot.p12 -deststoretype pkcs12

openssl pkcs12 -in caRoot.p12 -out caRoot.pem


2.         Modify the kafka.json file. (configure security.protocol only for SSL connections)


{

  "topic": "XXX",

  "topic2": "XXX",

  "group.id": "XXX",

  "bootstrap.servers" : "XXX:XX",

  "security.protocol" : "SSL"

}


Producing Messages

Send the following command to produce messages.


go run -mod=vendor producer/producer.go


Examples:

 

package main

import (

        "encoding/json"

        "fmt"

        "github.com/confluentinc/confluent-kafka-go/kafka"

        "log"

        "os"

        "path/filepath"

        "strconv"

        "time"

)

const (

        INT32_MAX = 2147483647 - 1000

)

type KafkaConfig struct {

        Topic      string `json:"topic"`

        Topic2      string `json:"topic2"`

        GroupId    string `json:"group.id"`

        BootstrapServers    string `json:"bootstrap.servers"`

        SecurityProtocol string `json:"security.protocol"`

        SslCaLocation string `json:"ssl.ca.location"`

}

// config should be a pointer to structure, if not, panic

func loadJsonConfig() *KafkaConfig {

        workPath, err := os.Getwd()

        if err != nil {

                 panic(err)

        }

        configPath := filepath.Join(workPath, "conf")

        fullPath := filepath.Join(configPath, "kafka.json")

        file, err := os.Open(fullPath);

        if (err != nil) {

                 msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err)




Consuming Messages

Send the following command to consume messages.


go run -mod=vendor consumer/consumer.go 


Examples:

 

package main

import (

        "encoding/json"

        "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"

        "os"

        "path/filepath"

)

type KafkaConfig struct {

        Topic      string `json:"topic"`

        Topic2      string `json:"topic2"`

        GroupId    string `json:"group.id"`

        BootstrapServers    string `json:"bootstrap.servers"`

        SecurityProtocol string `json:"security.protocol"`

}

// config should be a pointer to structure, if not, panic

func loadJsonConfig() *KafkaConfig {

        workPath, err := os.Getwd()

        if err != nil {

                 panic(err)

        }

        configPath := filepath.Join(workPath, "conf")

        fullPath := filepath.Join(configPath, "kafka.json")

        file, err := os.Open(fullPath);

        if (err != nil) {

                 msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err)

                 panic(msg)

        }

        defer file.Close()

        decoder := json.NewDecoder(file)

        var config = &KafkaConfig{}

        err = decoder.Decode(config);

        if (err != nil) {


VpYcivGC0ASM