分布式消息服务Kafka

通过认证生产与消费加密主题的消息

2024-05-08 09:01:50

分布式消息服务Kafka使用SASL认证协议来实现身份验证的能力,加密的主题需要经过身份校验后才能正常地生产和消费消息。

创建用户并配置用户权限

进入应用用户管理界面,新建用户,并给用户添加主题和消费组的权限,并且下载密钥。详细操作可以查看用户指南。

运行生产者客户端

如下是Java客户端代码示例,注意修改其中的内容(用户、密码、接入地址、主题名称)。

 

    private static void testPlainSaslProducer() throws Exception {
    Properties props = new Properties();
    //
填写应用用户密码
    String username="user";
    String password="password";
    //注意!密码需要md5
    password = DigestUtils.md5DigestAsHex(password.getBytes());
    String template="org.apache.kafka.common.security.scram.ScramLoginModule required " +"username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(template, username, password);
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config",jaasCfg);
    props.put("security.protocol", "SASL_PLAINTEXT");
    // 填写sasl接入地址
    props.put("bootstrap.servers", "!sasl_address!");
    props.put("acks", "0");
    props.put("retries", 3);
    props.put("batch.size", 1684);
    props.put("linger.ms", 100);
    props.put("buffer.memory", 33554432); // buffer空间32M
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    int index = 0;
    TimeUnit.SECONDS.sleep(2);
    while (true) {
        String dvalue = "hello kafka";
        // 填写主题和消息体内容
        ProducerRecord record = new ProducerRecord<>("topicName", "pps200" + index++, dvalue);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) {
                if (paramRecordMetadata == null) {
                    System.out.println("paramRecordMetadata is null ");
                    paramException.printStackTrace();
                    return;
                }
                System.out.println("发送的消息信息 " + paramRecordMetadata.topic() + ", partition:" + paramRecordMetadata.partition());
            } } );
        TimeUnit.SECONDS.sleep(1)

      } }

 

                                             

运行消费者客户端

如下是Java客户端代码示例,注意修改其中的内容(用户、密码、接入地址、消费组名称、主题名称)。

 

    private static void testPlainSaslConsumer() throws Exception {
    Properties props = new Properties();
    //
填写应用用户密码
    String username="user";
    String password="password";
    //注意!密码需要md5
    password = DigestUtils.md5DigestAsHex(password.getBytes());
    String template="org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(template, username, password);
    // 填写sasl接入地址
    props.put("bootstrap.servers", "!sasl_address!");
    // 填写消费组名称
    props.put("group.id", "!consumerGroup!");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("sasl.jaas.config",jaasCfg);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // // 填写订阅的topic
    consumer.subscribe(Arrays.asList("topicName"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        System.out.printf("==============>poll size = %d%n", records.count());
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s partition = %s%n", record.offset(), record.key(), record.value(), record.partition());
        }
        consumer.commitAsync();//手动提交进度
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 


8UYwISyXgroJ