Preparing the Environment
1. Download the demo package kafka-confluent-go-demo.zip.
2. Import the demo using development tools.
Modifying Configurations
1. For an SSL connection, download the certificate from the console. Unzip the package to obtain ssl.client.truststore.jks, then run the following commands to generate the caRoot.pem file.
keytool -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore caRoot.p12 -deststoretype pkcs12
openssl pkcs12 -in caRoot.p12 -out caRoot.pem
2. Modify the kafka.json file. Configure security.protocol only for SSL connections.
{
"topic": "XXX",
"topic2": "XXX",
"group.id": "XXX",
"bootstrap.servers" : "XXX:XX",
"security.protocol" : "SSL"
}
Producing Messages
Run the following command to produce messages.
go run -mod=vendor producer/producer.go
Examples:
package main
import (
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"log"
"os"
"path/filepath"
"strconv"
"time"
)
// INT32_MAX is 1000 less than the largest int32 value (2147483647), i.e.
// 2147482647. NOTE(review): the name suggests the exact int32 maximum, but the
// value is deliberately lower; its use is outside this excerpt — confirm the
// intended headroom with the code that reads it.
const INT32_MAX = (1<<31 - 1) - 1000
// KafkaConfig mirrors the keys of the kafka.json configuration file. Each
// field is filled from the JSON key named in its struct tag; keys absent from
// the file leave the field as an empty string.
type KafkaConfig struct {
	// Topic is read from the "topic" key.
	Topic string `json:"topic"`
	// Topic2 is read from the "topic2" key.
	Topic2 string `json:"topic2"`
	// GroupId is read from the "group.id" key.
	GroupId string `json:"group.id"`
	// BootstrapServers is read from the "bootstrap.servers" key
	// (host:port form in the kafka.json sample).
	BootstrapServers string `json:"bootstrap.servers"`
	// SecurityProtocol is read from the "security.protocol" key; the guide
	// sets it to "SSL" and only for SSL connections.
	SecurityProtocol string `json:"security.protocol"`
	// SslCaLocation is read from the "ssl.ca.location" key.
	// NOTE(review): presumably the path to the generated caRoot.pem — confirm.
	SslCaLocation string `json:"ssl.ca.location"`
}
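// loadJsonConfig locates conf/kafka.json under the current working directory and returns it as a *KafkaConfig; it panics if the working directory cannot be determined.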
func loadJsonConfig() *KafkaConfig {
workPath, err := os.Getwd()
if err != nil {
panic(err)
}
configPath := filepath.Join(workPath, "conf")
fullPath := filepath.Join(configPath, "kafka.json")
file, err := os.Open(fullPath);
if (err != nil) {
msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err)
Consuming Messages
Run the following command to consume messages.
go run -mod=vendor consumer/consumer.go
Examples:
package main
import (
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
"path/filepath"
)
// KafkaConfig mirrors the keys of the kafka.json configuration file. Each
// field is filled from the JSON key named in its struct tag; keys absent from
// the file leave the field as an empty string.
type KafkaConfig struct {
	// Topic is read from the "topic" key.
	Topic string `json:"topic"`
	// Topic2 is read from the "topic2" key.
	Topic2 string `json:"topic2"`
	// GroupId is read from the "group.id" key.
	GroupId string `json:"group.id"`
	// BootstrapServers is read from the "bootstrap.servers" key
	// (host:port form in the kafka.json sample).
	BootstrapServers string `json:"bootstrap.servers"`
	// SecurityProtocol is read from the "security.protocol" key; the guide
	// sets it to "SSL" and only for SSL connections.
	SecurityProtocol string `json:"security.protocol"`
}
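// loadJsonConfig opens conf/kafka.json under the current working directory and decodes it into a *KafkaConfig; it panics if the working directory cannot be determined or the file cannot be opened.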
func loadJsonConfig() *KafkaConfig {
workPath, err := os.Getwd()
if err != nil {
panic(err)
}
configPath := filepath.Join(workPath, "conf")
fullPath := filepath.Join(configPath, "kafka.json")
file, err := os.Open(fullPath);
if (err != nil) {
msg := fmt.Sprintf("Can not load config at %s. Error: %v", fullPath, err)
panic(msg)
}
defer file.Close()
decoder := json.NewDecoder(file)
var config = &KafkaConfig{}
err = decoder.Decode(config);
if (err != nil) {