前提条件
l 已配置正确的安全组。
l 已获取连接Kafka实例的地址。
l 如果是使用内网通过同一个VPC访问,实例端口为8092,实例连接地址从控制台集群信息菜单中获取。
l 如果是公网访问,需要绑定公网弹性IP,实例连接地址获取如下图。
l 确认SASL认证机制为SCRAM-SHA-512。
l 确认启用的安全协议为SASL_PLAINTEXT。
l 如果Kafka实例未开启自动创建Topic功能,在连接实例前,请先创建Topic。
l 已下载Kafka命令行工具2.8.2版本,确保Kafka实例版本与命令行工具版本相同。
l 已创建弹性云服务器,如果使用内网通过同一个VPC访问实例,请设置弹性云服务器的VPC、子网、安全组与Kafka实例的VPC、子网、安全组一致。
命令行模式连接实例
以下操作命令以Linux系统为例进行说明:
1.解压Kafka命令行工具。
进入文件压缩包所在目录,然后执行tar -zxf [kafka_tar]命令解压文件。
其中,[kafka_tar]表示命令行工具的压缩包名称。
例如:tar -zxf kafka_2.13-2.8.2.tgz
2.修改Kafka命令行工具配置文件,在Kafka命令行工具的“/config”目录中找到“consumer.properties”和“producer.properties”文件。
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="**********" \ password="**********"; sasl.mechanism=SCRAM-SHA-512 security.protocol=SASL_PLAINTEXT |
3.进入Kafka命令行工具的“/bin”目录下。
注意
Windows系统下需要进入“/bin/windows”目录下。
4.执行如下命令进行生产消息。
./kafka-console-producer.sh --broker-list ${连接地址} --topic ${Topic名称} --producer.config ../config/producer.properties |
说明
l 连接地址:从前提条件中获取的连接地址。
l Topic名称:Kafka实例下创建的Topic名称。如果Kafka实例开启了自动创建Topic功能,此参数值可以填写已创建的Topic名称,也可以填写未创建的Topic名称。
本文以内网连接为例,获取的Kafka实例内网连接地址为“10.10.3.214:8092,10.10.3.126:8092”。执行完命令后输入内容,按“Enter”发送消息到Kafka实例,输入的每一行内容都将作为一条消息发送到Kafka实例。
[root@ctyun-kafka bin]#./kafka-console-producer.sh --broker-list 10.10.3.214:8092,10.10.3.126:8092 --topic topic-demo --producer.config ../config/producer.properties Hello DMS Kafka! [root@ctyun-kafka bin]# |
如需停止生产使用Ctrl+C命令退出。
5.执行如下命令消费消息。
./kafka-console-consumer.sh --bootstrap-server ${连接地址} --topic ${Topic名称} --group ${消费组名称} --from-beginning --consumer.config ../config/consumer.properties |
说明
l 连接地址:从前提条件中获取的连接地址。
l Topic名称:Kafka实例下创建的Topic名称。
l 消费组名称:根据您的业务需求,设定消费组名称。如果已经在配置文件中指定了消费组名称,请确保命令行中的消费组名称与配置文件中的相同,否则可能消费失败。消费组名称开头包含特殊字符,例如#号“#”时,监控数据无法展示。
示例如下:
[root@ctyun-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 10.10.3.214:8092,10.10.3.126:8092 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties Hello DMS Kafka! Processed a total of 3 messages [root@ctyun-kafka bin]# |
如需停止消费使用Ctrl+C命令退出。