1.1.1 Setting Up the Java Development Environment
Development Environment
• Maven
  Apache Maven 2.5 or later can be downloaded from the Maven official website.
• JDK
  Java Development Kit 1.8 or later can be downloaded from the Oracle official website.
  After the installation, configure the Java environment variables.
• IntelliJ IDEA
  IntelliJ IDEA can be downloaded from the IntelliJ IDEA official website and installed.
Procedure
1. Download the demo package kafka-java-demo.zip.
Decompress the package to obtain the following files.
Table 1 Files in the demo package
File name | Directory | Description |
JavaKafkaConfigurer.java | .\src\main\java\javaDemo | Reads the Kafka configuration file. |
KafkaConsumerDemo.java | .\src\main\java\javaDemo | Consumes messages. |
KafkaMultiConsumerDemo.java | .\src\main\java\javaDemo | Consumes messages in bulk. |
KafkaProducerDemo.java | .\src\main\java\javaDemo | Produces messages. |
kafka.properties | .\src\main\resources | Kafka configuration parameters. |
pom.xml | .\ | Maven configuration file, containing the Kafka client dependencies. |
2. In IntelliJ IDEA, import the demo project. The demo project is a Java project built with Maven, so the JDK and the Maven plugin must be configured in IDEA.
3. Modify the Kafka configuration information.
Edit kafka.properties in the demo project as described in Modifying Configurations.
1.1.2 Configuring Kafka Clients in Java
This section describes how to add the Kafka client dependencies in Maven and how to use the client to access Kafka instances and to produce and consume messages. To check how the demo project runs in an IDE, see Setting Up the Java Development Environment.
The Kafka instance connection addresses, topic name, and user information used in the following examples are available in Collecting Connection Information.
Adding Kafka Clients in Maven
Install the Java dependency library and add the following dependencies to pom.xml.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>
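After the dependencies are added, it can help to confirm that the client library is actually visible to the project. The following is a minimal sketch of such a check, not part of the demo package; the class name ClasspathCheck and the method isOnClasspath are hypothetical names chosen for illustration. It only asks the class loader whether a class can be found and does not connect to any Kafka instance.

```java
// Hypothetical helper, not part of the demo package.
class ClasspathCheck {

    // Returns true if the named class can be located by this class's class loader.
    // The class is looked up without being initialized.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // KafkaProducer is provided by the kafka-clients dependency declared in pom.xml.
        System.out.println("kafka-clients on classpath: "
                + isOnClasspath("org.apache.kafka.clients.producer.KafkaProducer"));
    }
}
```

If the check reports false when run inside the project, the usual cause is that Maven has not yet re-imported pom.xml.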
Modifying Configurations
1. If the instance is accessed through an SSL endpoint, download the certificate from the console.
2. Modify the kafka.properties file.
##==============================Common configuration parameters==============================
bootstrap.servers=xxxxxxxxxxxxxxxxxxxxx
topic=xxx
group.id=xxx
##=======================Configure the following parameters based on your business requirements========================
##Configure the certificate path of the SSL endpoint
ssl.truststore.location=/xxxx/ssl.client.truststore.jks
3. If the endpoint is not an SSL endpoint, comment out the SSL settings in the code. If it is an SSL endpoint, change the certificate password (ssl.client.pw.key) in the code.
// The password of the root certificate store. Use the default value.
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "xxxxxxx");
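As an alternative to commenting the SSL lines in and out by hand, the choice could be driven by kafka.properties. The sketch below is one possible approach under stated assumptions, not the demo's own code: the class SecuritySettingsSketch and the method withSecurity are hypothetical, a missing or blank ssl.truststore.location is taken to mean "not an SSL endpoint", and the ssl.truststore.password entry in kafka.properties is an assumed addition that the demo file does not contain. The client keys are written as plain strings; they are the values behind CommonClientConfigs.SECURITY_PROTOCOL_CONFIG and the SslConfigs truststore constants.

```java
import java.util.Properties;

// Hypothetical helper, not part of the demo package.
class SecuritySettingsSketch {

    // Adds SSL settings to clientProps only when kafka.properties names a truststore.
    // A missing or blank path is treated as a non-SSL endpoint (assumption).
    static Properties withSecurity(Properties kafkaProperties, Properties clientProps) {
        String truststore = kafkaProperties.getProperty("ssl.truststore.location");
        if (truststore == null || truststore.trim().isEmpty()) {
            return clientProps; // leave the client on its default (non-SSL) protocol
        }
        clientProps.setProperty("security.protocol", "SSL");
        clientProps.setProperty("ssl.truststore.location", truststore.trim());

        // Assumed extra entry in kafka.properties; keeps the password out of the source code.
        String password = kafkaProperties.getProperty("ssl.truststore.password");
        if (password != null) {
            clientProps.setProperty("ssl.truststore.password", password);
        }
        return clientProps;
    }

    public static void main(String[] args) {
        Properties fileProps = new Properties();
        fileProps.setProperty("ssl.truststore.location", "/xxxx/ssl.client.truststore.jks");
        Properties clientProps = withSecurity(fileProps, new Properties());
        System.out.println(clientProps.getProperty("security.protocol"));
    }
}
```

In the demo classes, the returned properties would take the place of the three SSL-related props.put calls.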
Reading Configuration Information
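import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class JavaKafkaConfigurer {

    private static Properties properties;

    public static synchronized Properties getKafkaProperties() {
        if (null != properties) {
            return properties;
        }
        // Obtain the content of the kafka.properties file from the classpath.
        Properties kafkaProperties = new Properties();
        try (InputStream in = JavaKafkaConfigurer.class.getClassLoader().getResourceAsStream("kafka.properties")) {
            if (in == null) {
                throw new IllegalStateException("kafka.properties was not found on the classpath");
            }
            kafkaProperties.load(in);
        } catch (IOException e) {
            // If the file cannot be loaded, stop instead of continuing with empty properties.
            throw new IllegalStateException("Failed to load kafka.properties", e);
        }
        properties = kafkaProperties;
        return kafkaProperties;
    }
}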
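Because kafka.properties ships with placeholder values, a quick check that the common parameters are present can catch a forgotten entry before a client is created. The following is a minimal sketch of such a check; the class KafkaPropertiesCheck, the method missingKeys, and the sample values host1:9093 and demo-topic are hypothetical and only serve the illustration. The three keys are the common parameters listed in kafka.properties above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the demo package.
class KafkaPropertiesCheck {

    // The common parameters from kafka.properties.
    static final String[] REQUIRED_KEYS = {"bootstrap.servers", "topic", "group.id"};

    // Returns the required keys that are absent or blank, in declaration order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Sample content with made-up values; group.id is left out on purpose.
        Properties props = new Properties();
        props.load(new StringReader("bootstrap.servers=host1:9093\ntopic=demo-topic\n"));
        System.out.println("Missing keys: " + missingKeys(props)); // prints "Missing keys: [group.id]"
    }
}
```

In the demo project, the same method could be called on the result of JavaKafkaConfigurer.getKafkaProperties() instead of the in-memory sample.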
Producing Messages
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SslConfigs;
public class KafkaProducerDemo {
    public static void main(String[] args) {
        // Load kafka.properties.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
        Properties props = new Properties();
        // Set the endpoint. Obtain the endpoint of the corresponding topic in the console.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        // Set the path of the SSL root certificate. Replace XXX with your path.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        // Specify the password of the root certificate store. Use the default value.
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "c24f5210");
        // Specify the access protocol. Set the value to SSL.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // Specify how Kafka message keys and values are serialized.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Consuming Messages
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        // Load kafka.properties.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
        Properties props = new Properties();
        // Set the endpoint. Obtain the endpoint of the corresponding topic in the console.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        // Set the path of the SSL root certificate. Replace XXX with your path.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        // Specify the password of the root certificate store. Use the default value.
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "c24f5210");
        // Specify the access protocol. Set the value to SSL.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // Set the session timeout. If the broker receives no heartbeat from the consumer
        // within this period, the consumer is removed from the group and a rebalance starts.
        // Set the value based on actual conditions and your client version. This example uses 30 s.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//Specify the maximum message size allowed for a single poll. If data is transmitted over a public network, this parameter may significantly influe