分布式消息服务Kafka

编译运行Demo Java工程

2024-05-08 03:36:48

(一) 需要kafka-clients引入依赖

在使用Kafka时,你需要在你的项目中引入相应的依赖。具体的依赖项可能会因你的项目和需求而有所不同。在使用Kafka之前,请确保查阅官方文档以获取最新的依赖项和使用说明。

以Java编程语言为例,可以使用Kafka的Java客户端库。你可以在Maven或Gradle构建工具中添加以下依赖项:

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
    </dependency>


(二)示例代码

(1)从控制台获取以下信息

连接地址

Topic名称

消费组名称

(2)在实例代码中替换以上信息即可实现消息。


private static     void testPlainProducer() throws InterruptedExceptionthrows {

      Properties props = new Properties();

          props.put("bootstrap.servers","192.168.90.139;8090");  //连接地址

          props.put('acks","all");

      props.put('retries", 1);

      props.put("batch.size",     1684);

      props.put("linger.ms", 0) ;

          props.put("buffer.memory",33554432);       // buffer空间32M

      props.put('request.timeout.ms",     1000);

      props.put("key.serializer",     "org.apache. kafka. common. serialization. StringSerializer");

          props.put("value.serializer", "org.apache.kafka.     common. serialization. StringSerializer");

 

      Producer<tring, String>     producer = new KafkaProducer<~>(props);

       int index = 0;

       while(true) {

              String dvalue =     "hello"; //消息内容

              ProducerRecord record = new     ProducerRecord<>( topic:"pps", key:"pps"+index++,     dvalue); //主题名称

              producer.send(record, new     Callback() {

                    @Override

                    public void     onCompletion(RecordMetadata paramRecordMetadata, Exception paramException)     {

                            if     (paramRecordMetadata == null) {

                                    System.out.println("paramRecordMetadata is null ");

                                    paramException.printStackTrace() ;

                                return;

                            }

                           System.     out.println("发送的消息信息 " +     aramRecordMetadata. topic() +",partition:+     paramRecordMetadata.partition();

                     }

               });

               TimeUnit.SECONDS.sleep(     timeout: 1) ;

        }

            producer.close() ;

 }

点击启动后成功发送消息。

(3)同样在实例代码中替换以上信息即可消费消息。


private static void     testPlainConsumer() throws InterruptedException {

      Properties properties = new     Properties();

          properties.put("bootstrap.servers","10.142.233.65:9092"); //连接地址

          properties.put("group.id","ppsgroup"); //消费组名称

          properties.put("enable.auto.commit","true");

          properties.put("auto.offset.reset","earliest");

          properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

          properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<Object, Object>     consumer = new KafkaConsumer<>(properties);

          consumer.subscribe(Arrays.asList("pps")); //主题名称

      while (true) {

            ConsumerRecords<0bject,     Object> records = consumer.poll( timeout: 100);

            records.forEach(record->{

                   String format = String.     format(offset = %d, key = %s, value = %s, record.offset ), record. key0,     record. value();

                   System.     out.println(format);

            });

            TimeUnit.SECONSsleep( timeout:     1);

        }

}


 

 


ddWV_akBn7wP