背景信息
RocketMQ的生产消费验证是指在使用RocketMQ进行消息生产和消费时的验证过程。具体而言,验证包括以下几个方面:
l 生产者验证:RocketMQ提供了丰富的生产者API,开发人员可以使用这些API将消息发送到RocketMQ的消息队列中。在验证阶段,可以通过发送消息并检查返回结果来确保消息成功发送到Broker节点。此外,生产者还应该验证消息的顺序性、事务性以及可靠性等方面。
l 消费者验证:RocketMQ的消费者可以订阅特定的消息主题,从而消费这些主题下的消息。在验证阶段,消费者应该能够正确地从Broker节点拉取消息并进行消费处理。消费者还可以验证消息的顺序性、重试机制以及消息过滤等功能。
操作步骤
1、 天翼云官网点击控制中心,选择产品分布式消息服务RocketMQ。
2、 登录分布式消息服务RocketMQ控制台,点击右上角地域选择对应资源池。
进入实例列表,点击【管理】按钮进入管理菜单。
3、 进入实例列表,点击【管理】按钮进入管理菜单。
4、 进入主题管理菜单,点击【拨测】按钮,进行生产消费的拨测验证,验证开通的消息实例和主题。
1)生产测试拨测:
l 选择消息类型,默认普通消息。
l 填写需要产生的测试消息数量,以及每条消息的大小,默认每条消息1KB,建议不超过4MB(4096KB)。
l 选择已建的消息主题,若无选项,请新增主题,详见上文创建主题和订阅组。
l 点击【测试】按钮,按照已填写规格及数量产生测试消息数据,展示消息数据的信息,包括消息ID(messageID)、发送状态、主题名(topic名)、Broker名、队列ID。
拨测功能涉及消息发送状态码,以下是RocketMQ消息发送状态码及其说明:
² SEND_OK(发送成功):表示消息成功发送到了消息服务器。
² FLUSH_DISK_TIMEOUT(刷新磁盘超时):表示消息已经成功发送到消息服务器,但是刷新到磁盘上超时。这可能会导致消息服务器在宕机后,尚未持久化到磁盘上的数据丢失。
² FLUSH_SLAVE_TIMEOUT(刷新从服务器超时):表示消息已经成功发送到消息服务器,但是刷新到从服务器上超时。这可能会导致主从同步不一致。
² SLAVE_NOT_AVAILABLE(从服务器不可用):表示消息已经成功发送到消息服务器,但是从服务器不可用。这可能是由于网络故障或从服务器宕机引起的。
² UNKNOWN_ERROR(未知错误):表示发送消息时遇到了未知的错误。一般情况下建议重试发送消息。
² MESSAGE_SIZE_EXCEEDED(消息大小超过限制):表示消息的大小超过了消息服务器的限制。需要检查消息的大小是否合适。
² PRODUCE_THROTTLE(消息生产被限流):表示消息生产者的频率超出了消息服务器的限制。这可能是由于消息发送频率过高引起的。
² SERVICE_NOT_AVAILABLE(服务不可用):表示消息服务器不可用。这可能是由于网络故障或者消息服务器宕机引起的。
请注意,以上状态码仅适用于RocketMQ消息发送阶段,并且并不代表消息是否成功被消费者接收。同时,这些状态码也可能因版本变化而有所不同,建议查阅官方文档获取最新信息。
2)消费测试拨测:
l 选择消息顺序,下拉选择无序/有序,默认选项为无序。
RocketMQ是一种开源的分布式消息中间件,它支持有序消息和无序消息。
² 有序消息是指消息的消费顺序与发送顺序完全一致。在某些业务场景下,消息的处理需要保证顺序性,例如订单的处理或者任务的执行。RocketMQ提供了有序消息的支持,通过指定消息的顺序属性或使用消息队列的分区机制,可以确保消息按照指定的顺序进行消费。
² 无序消息则是指消息的消费顺序与发送顺序无关。无序消息的特点是高吞吐量和低延迟,适用于一些不要求严格顺序的业务场景,如日志收集等。
在RocketMQ中,有序消息和无序消息的实现方式略有不同。有序消息需要借助MessageQueue的分区机制和消费者端的顺序消息消费来实现。而无序消息则是通过消息的发送和接收的并发处理来实现的。
总的来说,RocketMQ既支持有序消息也支持无序消息,根据业务需求选择合适的消息类型来满足业务的要求。
l 选择消费方式,目前仅提供pull方式。值得注意的是,RocketMQ还提供了推送(push)方式的消费模式,其中消息队列服务器会主动将消息推送给消费者。但在当前仅限于pull方式的消费模式。
l 填写消费数量。
l 下拉选择选择已建的消息主题和订阅组,若无选项,请新增主题和订阅组,详见上文创建主题和订阅组。
l 点击【测试】按钮,按照已填写规格及数量产生消费数据,展示消息数据的信息,包括消息ID(messageID)、主题名称(topicName)、生成时间、存储时间、队列ID、消费状态。
拨测功能涉及消息消费状态码,RocketMQ消费状态码是指在消息消费过程中,对消费结果进行标识的状态码。以下是常见的RocketMQ消费状态码:
² CONSUME_SUCCESS(消费成功):表示消息成功被消费。
² RECONSUME_LATER(稍后重试):表示消费失败,需要稍后再次进行消费。
² CONSUME_FAILURE(消费失败):表示消息消费出现异常或失败。
² SLAVE_NOT_AVAILABLE(从节点不可用):表示消费者无法访问从节点来消费消息。
² NO_MATCHED_MESSAGE(无匹配的消息):表示当前没有匹配的消息需要消费。
² OFFSET_ILLEGAL(偏移量非法):表示消费的偏移量参数不合法。
² BROKER_TIMEOUT(Broker超时):表示由于Broker超时导致消费失败。
5、 用户应用按照规范接入RocketMQ,发送、消费消息。
1)生产者示例API
SDK下载方式详见环境准备-其他工具章节。
生产示例代码
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
AclClientRPCHook rpcHook = new AclClientRPCHook(
new SessionCredentials(ACCESS_KEY, SECRET_KEY));
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", rpcHook);
// 填入元数据地址
producer.setNamesrvAddr("192.168.0.1:9876");
//producer.setUseTLS(true); //如果需要开启SSL,请增加此行代码
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
示例参数说明:
Namesrv地址
Namesrv地址可从控制台查看,多个地址按分号分隔:
应用用户和密码
应用用户和密码就是控制台创建的应用用户和密码。
租户id和集群名
集群名和租户id可以从应用用户管理查询:
生产组
生产组名不需要提前创建,只需创建生产者时候配置,服务端会自动创建。建议按业务规划好生产组名,严禁按随机方式生成生产组名。
6)消费者示例API
SDK下载方式详见环境准备-其他工具章节。
消费示例代码
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
public class PushConsumer {
public static void main(String[] args) throws Exception {
AclClientRPCHook rpcHook = new AclClientRPCHook(
new SessionCredentials(ACCESS_KEY, SECRET_KEY));
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rpcHook);
consumer.setConsumerGroup("ConsumerGroupName");
// 填入元数据地址
consumer.setNamesrvAddr("192.168.0.1:9876");
//consumer.setUseTLS(true); //如果需要开启SSL,请增加此行代码
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
示例参数说明:
Namesrv地址
Namesrv地址可从控制台查看,多个地址按分号分隔。
应用用户和密码
应用用户和密码就是控制台创建的应用用户和密码。
租户id和集群名
集群名和租户id可以从应用用户管理查询。
订阅组
订阅组名需要在控制台提前创建好。