分布式消息Kafka使用SASL 认证协议来实现身份验证的能力,加密的主题需要经过身份校验才能正常地消费和生产消息。
创建用户并配置用户权限
进入应用用户管理界面,新建用户,并给用户添加主题和消费组的权限,并且下载密钥。详细操作可以查看用户指南。
运行生产者客户端
如下是Java客户端代码示例,注意修改内容(用户、密码、接入地址、主题名称)
private static void testPlainSaslProducer() throws Exception {
Properties props = new Properties();
// 填写应用用户密码
String username="user";
String password="password";
//注意!密码需要md5
password = DigestUtils.md5DigestAsHex(password.getBytes());
String template="org.apache.kafka.common.security.scram.ScramLoginModule required " +"username=\"%s\" password=\"%s\";";
String jaasCfg = String.format(template, username, password);
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",jaasCfg);
props.put("security.protocol", "SASL_PLAINTEXT");
// 填写sasl接入地址
props.put("bootstrap.servers", "!sasl_address!");
props.put("acks", "0");
props.put("retries", 3);
props.put("batch.size", 1684);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432); // buffer空间32M
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int index = 0;
TimeUnit.SECONDS.sleep(2);
while (true) {
String dvalue = "hello kafka";
// 填写主题和消息体内容
ProducerRecord record = new ProducerRecord<>("topicName", "pps200" + index++, dvalue);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) {
if (paramRecordMetadata == null) {
System.out.println("paramRecordMetadata is null ");
paramException.printStackTrace();
return;
}
System.out.println("发送的消息信息 " + paramRecordMetadata.topic() + ", partition:" + paramRecordMetadata.partition());
} } );
TimeUnit.SECONDS.sleep(1)
} }
运行消费者客户端
如下是Java客户端代码示例,注意修改内容(用户、密码、接入地址、消费组名称、主题名称)
private static void testPlainSaslConsumer() throws Exception {
Properties props = new Properties();
// 填写应用用户密码
String username="user";
String password="password";
//注意!密码需要md5
password = DigestUtils.md5DigestAsHex(password.getBytes());
String template="org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"%s\" password=\"%s\";";
String jaasCfg = String.format(template, username, password);
// 填写sasl接入地址
props.put("bootstrap.servers", "!sasl_address!");
// 填写消费组名称
props.put("group.id", "!consumerGroup!");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("sasl.jaas.config",jaasCfg);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// // 填写订阅的topic
consumer.subscribe(Arrays.asList("topicName"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.printf("==============>poll size = %d%n", records.count());
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s partition = %s%n", record.offset(), record.key(), record.value(), record.partition());
}
consumer.commitAsync();//手动提交进度
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}