Distributed Message Service (Kafka)

Java

2024-05-09 09:12:42

1.1.1         Setting Up the Java Development Environment

Development Environment

l  Maven

l  Apache Maven 2.5 or later can be downloaded from the Maven official website. JDK

Java Development Kit 1.8 or later can be downloaded from the Oracle official website.

l  After the installation, configure the Java environment variables. IntelliJ IDEA

IntelliJ IDEA can be downloaded from the IntelliJ IDEA official website and be installed.

Procedure

1.         Download the demo package kafka-java-demo.zip.

Decompress the package to obtain the following files.

Table 1 Files in the demo package

File name

Directory

Description

JavaKafkaConfigurer.java

.\src\main\java\javaDemo

Read Kafka configuration   files.

KafkaConsumerDemo.java

.\src\main\java\javaDemo

Consume messages.

KafkaMultiConsumerDemo.java

.\src\main\java\javaDemo

Consume messages in bulk.

KafkaProducerDemo.java

.\src\main\java\javaDemo

Produce messages.

    kafka.properties

.\src\main\resources

Kafka configuration parameter

pom.xml

.\

Maven configuration file,   containing the Kafka client dependencies.

2.         In IntelliJ IDEA, import the demo project. The demo project is a Java project built in Maven. Therefore, you need the JDK and the Maven plugin in IDEA.

3.         Modify Kafka configuration information.

Modify kafka.properties in the demo project.

1.1.2         Configuring Kafka Clients in Java

This section describes how to add Kafka clients in Maven, and use the clients to access Kafka instances and produce and consume messages. To check how the demo project runs in IDE, see Setting Up the Java Development Environment.

The Kafka instance connection addresses, topic name, and user information used in the following examples are available in Collecting Connection Information.

Adding Kafka Clients in Maven

Install the Java dependency library and add the following dependencies to pom.xml.

 

<dependency>

    <groupId>org.apache.kafka</groupId>

    <artifactId>kafka-clients</artifactId>

    <version>2.4.0</version>

</dependency>

<dependency>

    <groupId>org.slf4j</groupId>

    <artifactId>slf4j-log4j12</artifactId>

    <version>1.7.6</version>

</dependency>


Modifying Configurations

1.         If it is an SSL endpoint, download the certificate from the console.

2.         Modify the kafka.properties file.

 

##==============================Common configuration parameters==============================

bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx

topic=xxx

group.id=xxx

##=======================Configure the following parameters based on your business requirements========================

##Configure the certificate path of the SSL endpoint

ssl.truststore.location=/xxxx/ssl.client.truststore.jks


3.         If it is not an SSL endpoint, comment out the SSL part in the code. If it is an SSL endpoint, change the certificate password (ssl.client.pw.key) in the code.

 

// The password of the root certificate store. Use the default value. props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "xxxxxxx");


Reading Configuration Information

 

import java.util.Properties;

public class JavaKafkaConfigurer {

    private static Properties properties;

    public synchronized static Properties getKafkaProperties() {

        if (null != properties) {

            return properties;

        }

        //Obtain the content of the kafka.properties file.

        Properties kafkaProperties = new Properties();

        try {

            kafkaProperties.load(KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));

        } catch (Exception e) {

            // If the file cannot be loaded, exit the program.

            e.printStackTrace();

        }

        properties = kafkaProperties;

        return kafkaProperties;

    }

}



Producing Messages

 

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

import java.util.concurrent.Future;

import org.apache.kafka.clients.CommonClientConfigs;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

import org.apache.kafka.common.config.SslConfigs;

 

public class KafkaProducerDemo {

    public static void main(String args[]) {

        //Load kafka.properties

        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();

        //Set the endpoint. Obtain the endpoint of the corresponding topic in the console.

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //Set the path of the SSL root certificate. Replace XXX with your path.

        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));

        //Specify the password of the root certificate store. Use the default value.

        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "c24f5210");

        //Specify the access protocol. Set the value to SSL.

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

        //How Kafka messages are serialized

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");




Consuming Messages


import org.apache.kafka.clients.CommonClientConfigs;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.common.config.SslConfigs;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

 

public class KafkaConsumerDemo {

    public static void main(String args[]) {

        //Load kafka.properties

        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();

        //Set the endpoint. Obtain the endpoint of the corresponding topic in the console.

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //Set the path of the SSL root certificate. Replace XXX with your path.

        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));

        //Specify the password of the root certificate store. Use the default value.

        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "c24f5210");

        //Specify the access protocol. Set the value to SSL.

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

        //Set the maximum interval between two polling cycles.

        //Set the value based on actual conditions and your version. The default value is 30 s.

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

        //Specify the maximum message size allowed for a single poll. If data is transmitted over a public network, this parameter may significantly influe


_ILHYYFdIGDu