Background
RocketMQ verifies the process for producing and consuming messages. Specifically, the verification includes the following aspects:
l Producer verification: RocketMQ provides diverse producer APIs for developers to send messages to RocketMQ's message queues. During verification, you can ensure that messages are successfully sent to the broker by sending messages and checking the returned result. Furthermore, producers verify the sequence, transaction, and reliability of messages.
l Consumer verification: RocketMQ consumers can subscribe to specific message topics to consume their messages. During verification, consumers can properly pull messages from the broker. Consumers can also verify features such as message sequence, retry mechanisms, and message filtering.
Procedure
1. On the eSurfing Cloud official website, click Control Center and select RocketMQ.
2. Log in to the DMS RocketMQ console and click Region in the upper right corner to select the corresponding resource pool.
Go to the instance list and click Manage to enter the management menu.
3. Go to the instance list and click Manage to enter the management menu.
4. Go to the topic management menu. Click Dialing Test to perform a production and consumption dialing test, and verify the activated message instances and topics.
(1) Production dialing test:
l Select the message type. By default, normal messages are selected.
l Enter the number of test messages to be generated and the size of each message. Each message is 1KB by default. You are not advised to set the size to greater than 4MB (4,096KB).
l Select an existing message topic. If there is no topic, create a new topic. For details, see Creating Topics and Subscription Groups.
l Click Test to generate test message data based on the entered specifications and number. The information of the message data is displayed, including message ID, sending status, topic name, broker name, and queue ID.
The dialing test function involves the status codes for message sending, which are explained as follows:
² SEND_OK: Indicates that messages are successfully sent to the message server.
² FLUSH_DISK_TIMEOUT: Indicates that messages are successfully sent to the message server, but disk flushing timed out. If the message server crashes, data that has not been persisted to disks may be lost.
² FLUSH_SLAVE_TIMEOUT: Indicates that messages are successfully sent to the message server, but slave server flushing timed out. This may cause inconsistency in master/slave synchronization.
² SLAVE_NOT_AVAILABLE: Indicates that messages are successfully sent to the message server, but the slave server is unavailable. The failure may be caused by a network problem or a server crash.
² UNKNOWN_ERROR: Indicates that an unknown error occurred during message sending. In this case, it is recommended to re-send messages.
² MESSAGE_SIZE_EXCEEDED: Indicates that the message size exceeds the limit allowed by the message server. You need to check the message size.
² PRODUCE_THROTTLE: Indicates that the message sending frequency of the producer exceeds the limit allowed by the message server. This may be caused by messages sent too frequently.
² SERVICE_NOT_AVAILABLE: Indicates that the message server is unavailable. The failure may be caused by a network problem or message server crash.
Note: The status codes apply only to the message sending phase of RocketMQ and do not indicate that a message is successfully received by the consumer. Meanwhile, these status codes may vary for different versions. You can consult official documentation for the latest information.
(2) Consumption dialing test:
l Choose unordered/ordered messages from the drop-down menu. Unordered messages are selected by default.
RocketMQ is an open-source distributed message middleware that supports unordered and ordered messages.
² Ordered messages are those that are consumed in the exact order as they are sent. In certain business scenarios, messages must be processed sequentially, such as order processing or task execution. RocketMQ supports ordered messages and ensures that messages are consumed in the specified order by specifying order properties or using partitions of message queues.
² Unordered messages are those that are consumed regardless of the order in which they are sent. Unordered messages are characterized by high throughput and low latency, and apply to service scenarios requiring no strict sequence, such as log collection.
In RocketMQ, ordered and unordered messages are implemented slightly differently. Ordered messages are implemented using the partitioning mechanism of the message queue and the sequential message consumption on consumers. Unordered messages are realized through concurrent sending and receiving of messages.
To sum up, RocketMQ supports both ordered and unordered messages. You can select a proper message type based on business requirements.
l Currently, you can only choose the pull consumption method. RocketMQ also provides the push consumption mode in which the message queue server proactively pushes messages to consumers. Currently, only the pull consumption mode is available.
l Enter the consumption quantity.
l In the drop-down list, select an existing topic and subscription group. If there is no topic or subscription group, create a new one. For details, see Creating Topics and Subscription Groups.
l Click Test to generate consumption data based on the entered specifications and number. The information of the message data is displayed, including the message ID, topic name, generation time, storage time, queue ID, and consumption status.
The dialing test function involves message consumption status codes that are used to identify the consumption results during message consumption. The following shows the common consumption status codes of RocketMQ:
² CONSUME_SUCCESS: Indicates that a message is successfully consumed.
² RECONSUME_LATER: Indicates that a message fails to be consumed and needs to be re-consumed later.
² CONSUME_FAILURE: Indicates that a message has an error in consumption or fails to be consumed.
² SLAVE_NOT_AVAILABLE: Indicates that consumers cannot access the slave node to consume messages.
² NO_MATCHED_MESSAGE: Indicates that no matched messages can be consumed.
² OFFSET_ILLEGAL: Indicates that the offset parameter of consumption is invalid.
² BROKER_TIMEOUT: Indicates that consumption failed due to broker timeout.
5. User can connect applications to RocketMQ as per relevant specifications to send and consume messages.
(1) API examples for producers
For details about how to download the SDK, see Preparing the Environment - Other Tools.
Production code example
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
AclClientRPCHook rpcHook = new AclClientRPCHook(
new SessionCredentials(ACCESS_KEY, SECRET_KEY));
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", rpcHook);
// Enter the metadata address
producer.setNamesrvAddr("192.168.0.1:9876");
//producer.setUseTLS(true); //To enable SSL, add this code line
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();Parameter example:
Namesrv Address
You can view Namesrv addresses on the Console. Multiple addresses are separated by semicolons.
Application User and Password
The application user and password are those created on the Console.
Tenant ID and Cluster Name
You can query the cluster name and tenant ID on the application user management page.
Producer Group
You can configure the name of a producer group when creating a producer or the server will automatically create the name. You are recommended to plan the producer group names according to your business needs, and are not advised to use the names that are generated randomly.
(6) API examples for consumers
For details about how to download the SDK, see Preparing the Environment - Other Tools.
Consumption code example
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
public class PushConsumer {
public static void main(String[] args) throws Exception {
AclClientRPCHook rpcHook = new AclClientRPCHook(
new SessionCredentials(ACCESS_KEY, SECRET_KEY));
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rpcHook);
consumer.setConsumerGroup("ConsumerGroupName");
// Enter the metadata address
consumer.setNamesrvAddr("192.168.0.1:9876");
//consumer.setUseTLS(true); //To enable SSL, add this code line
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}Parameter example:
Namesrv Address
You can view Namesrv addresses on the Console. Multiple addresses are separated by semicolons.
Application User and Password
The application user and password are those created on the Console.
Tenant ID and Cluster Name
You can query the cluster name and tenant ID on the application user management page.
Subscription Group
You need to create the subscription group name in advance on the Console.