分布式消息服务-RocketMQ

生产消费验证

2024-07-02 01:57:20

背景信息

RocketMQ的生产消费验证是指在使用RocketMQ进行消息生产和消费时的验证过程。具体而言,验证包括以下几个方面:

l  生产者验证:RocketMQ提供了丰富的生产者API,开发人员可以使用这些API将消息发送到RocketMQ的消息队列中。在验证阶段,可以通过发送消息并检查返回结果来确保消息成功发送到Broker节点。此外,生产者还应该验证消息的顺序性、事务性以及可靠性等方面。

l  消费者验证:RocketMQ的消费者可以订阅特定的消息主题,从而消费这些主题下的消息。在验证阶段,消费者应该能够正确地从Broker节点拉取消息并进行消费处理。消费者还可以验证消息的顺序性、重试机制以及消息过滤等功能。

操作步骤

1、 天翼云官网点击控制中心,选择产品分布式消息服务RocketMQ。

2、 登录分布式消息服务RocketMQ控制台,点击右上角地域选择对应资源池。

进入实例列表,点击【管理】按钮进入管理菜单。

3、 进入实例列表,点击【管理】按钮进入管理菜单。

4、 进入主题管理菜单,点击【拨测】按钮,进行生产消费的拨测验证,验证开通的消息实例和主题。

1)生产测试拨测:

l  选择消息类型,默认普通消息。

l  填写需要产生的测试消息数量,以及每条消息的大小,默认每条消息1KB,建议不超过4MB(4096KB)。

l  选择已建的消息主题,若无选项,请新增主题,详见上文创建主题和订阅组

l  点击【测试】按钮,按照已填写规格及数量产生测试消息数据,展示消息数据的信息,包括消息ID(messageID)、发送状态、主题名(topic名)、Broker名、队列ID。

拨测功能涉及消息发送状态码,以下是RocketMQ消息发送状态码及其说明:

² SEND_OK(发送成功):表示消息成功发送到了消息服务器。

² FLUSH_DISK_TIMEOUT(刷新磁盘超时):表示消息已经成功发送到消息服务器,但是刷新到磁盘上超时。这可能会导致消息服务器在宕机后,尚未持久化到磁盘上的数据丢失。

²  FLUSH_SLAVE_TIMEOUT(刷新从服务器超时):表示消息已经成功发送到消息服务器,但是刷新到从服务器上超时。这可能会导致主从同步不一致。

² SLAVE_NOT_AVAILABLE(从服务器不可用):表示消息已经成功发送到消息服务器,但是从服务器不可用。这可能是由于网络故障或从服务器宕机引起的。

² UNKNOWN_ERROR(未知错误):表示发送消息时遇到了未知的错误。一般情况下建议重试发送消息。

² MESSAGE_SIZE_EXCEEDED(消息大小超过限制):表示消息的大小超过了消息服务器的限制。需要检查消息的大小是否合适。

² PRODUCE_THROTTLE(消息生产被限流):表示消息生产者的频率超出了消息服务器的限制。这可能是由于消息发送频率过高引起的。

² SERVICE_NOT_AVAILABLE(服务不可用):表示消息服务器不可用。这可能是由于网络故障或者消息服务器宕机引起的。

请注意,以上状态码仅适用于RocketMQ消息发送阶段,并且并不代表消息是否成功被消费者接收。同时,这些状态码也可能因版本变化而有所不同,建议查阅官方文档获取最新信息。

2)消费测试拨测:

l  选择消息顺序,下拉选择无序/有序,默认选项为无序。

RocketMQ是一种开源的分布式消息中间件,它支持有序消息和无序消息。

² 有序消息是指消息的消费顺序与发送顺序完全一致。在某些业务场景下,消息的处理需要保证顺序性,例如订单的处理或者任务的执行。RocketMQ提供了有序消息的支持,通过指定消息的顺序属性或使用消息队列的分区机制,可以确保消息按照指定的顺序进行消费。

² 无序消息则是指消息的消费顺序与发送顺序无关。无序消息的特点是高吞吐量和低延迟,适用于一些不要求严格顺序的业务场景,如日志收集等。

在RocketMQ中,有序消息和无序消息的实现方式略有不同。有序消息需要借助MessageQueue的分区机制和消费者端的顺序消息消费来实现。而无序消息则是通过消息的发送和接收的并发处理来实现的。

总的来说,RocketMQ既支持有序消息也支持无序消息,根据业务需求选择合适的消息类型来满足业务的要求。

l  选择消费方式,目前仅提供pull方式。值得注意的是,RocketMQ还提供了推送(push)方式的消费模式,其中消息队列服务器会主动将消息推送给消费者。但在当前仅限于pull方式的消费模式。

l  填写消费数量。

l  下拉选择选择已建的消息主题和订阅组,若无选项,请新增主题和订阅组,详见上文创建主题和订阅组

l  点击【测试】按钮,按照已填写规格及数量产生消费数据,展示消息数据的信息,包括消息ID(messageID)、主题名称(topicName)、生成时间、存储时间、队列ID、消费状态。

拨测功能涉及消息消费状态码,RocketMQ消费状态码是指在消息消费过程中,对消费结果进行标识的状态码。以下是常见的RocketMQ消费状态码:

² CONSUME_SUCCESS(消费成功):表示消息成功被消费。

² RECONSUME_LATER(稍后重试):表示消费失败,需要稍后再次进行消费。

² CONSUME_FAILURE(消费失败):表示消息消费出现异常或失败。

² SLAVE_NOT_AVAILABLE(从节点不可用):表示消费者无法访问从节点来消费消息。

² NO_MATCHED_MESSAGE(无匹配的消息):表示当前没有匹配的消息需要消费。

² OFFSET_ILLEGAL(偏移量非法):表示消费的偏移量参数不合法。

² BROKER_TIMEOUT(Broker超时):表示由于Broker超时导致消费失败。

5、 用户应用按照规范接入RocketMQ,发送、消费消息。

1)生产者示例API

SDK下载方式详见环境准备-其他工具章节。

生产示例代码

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;


public class Producer {
   public static void main(String[] args) throws MQClientException, InterruptedException {

AclClientRPCHook rpcHook = new AclClientRPCHook(
           new SessionCredentials(ACCESS_KEY, SECRET_KEY));
       DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", rpcHook);
       // 填入元数据地址
       producer.setNamesrvAddr("192.168.0.1:9876");
       //producer.setUseTLS(true);    //如果需要开启SSL,请增加此行代码
       producer.start();

       for (int i = 0; i < 128; i++)
           try {
               {
                   Message msg = new Message("TopicTest",
                       "TagA",
                       "OrderID188",
                       "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                   SendResult sendResult = producer.send(msg);
                   System.out.printf("%s%n", sendResult);
               }

           } catch (Exception e) {
               e.printStackTrace();
           }

       producer.shutdown();
   }
}

示例参数说明:

Namesrv地址

Namesrv地址可从控制台查看,多个地址按分号分隔:

应用用户和密码

应用用户和密码就是控制台创建的应用用户和密码。

租户id和集群名

集群名和租户id可以从应用用户管理查询:

生产组

生产组名不需要提前创建,只需创建生产者时候配置,服务端会自动创建。建议按业务规划好生产组名,严禁按随机方式生成生产组名。

6)消费者示例API

SDK下载方式详见环境准备-其他工具章节。

消费示例代码

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;

public class PushConsumer {

   public static void main(String[] args) throws Exception {
   AclClientRPCHook rpcHook = new AclClientRPCHook(
           new SessionCredentials(ACCESS_KEY, SECRET_KEY));
   DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rpcHook);
   consumer.setConsumerGroup("ConsumerGroupName");
       // 填入元数据地址
       consumer.setNamesrvAddr("192.168.0.1:9876");
       //consumer.setUseTLS(true);    //如果需要开启SSL,请增加此行代码
       consumer.subscribe("TopicTest", "*");
       consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
           System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       });
       consumer.start();
       System.out.printf("Consumer Started.%n");
   }
}

示例参数说明:

Namesrv地址

Namesrv地址可从控制台查看,多个地址按分号分隔。

应用用户和密码

应用用户和密码就是控制台创建的应用用户和密码。

租户id和集群名

集群名和租户id可以从应用用户管理查询。

订阅组

订阅组名需要在控制台提前创建好。


i84CKery24fv