Scenario Description
RocketMQ provides two connection methods:
- Producer connection: A producer connects to the broker by calling the RocketMQ API and uses the connection to send messages to RocketMQ.
- Consumer connection: A consumer connects to the broker by calling the RocketMQ API and uses the connection to subscribe to and consume messages from RocketMQ.
RocketMQ clients communicate with the server over persistent TCP/IP connections. Before establishing a connection, configure the correct address and port of the RocketMQ server and provide the authentication information, such as the AccessKey and SecretKey.
Once the connection is established, the RocketMQ client uses it to send messages to the RocketMQ cluster and to receive and consume messages from the cluster. These connections are the foundation of RocketMQ message delivery and underpin the reliable delivery and efficient processing of messages.
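Because a wrong address or port is a common reason a client fails to connect, it can help to validate the address string before handing it to the client. The following is a minimal sketch, not part of the RocketMQ API: the class name NamesrvAddressCheck and its parse method are hypothetical, and the sketch assumes addresses are written as host:port, with multiple addresses separated by semicolons.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a RocketMQ class) that validates a namesrv address list. */
final class NamesrvAddressCheck {

    /**
     * Splits a string such as "host1:port1;host2:port2" and checks every entry.
     * Throws IllegalArgumentException if an entry is not in host:port form.
     */
    static List<String> parse(String namesrvAddr) {
        if (namesrvAddr == null || namesrvAddr.trim().isEmpty()) {
            throw new IllegalArgumentException("namesrv address is empty");
        }
        List<String> result = new ArrayList<>();
        for (String entry : namesrvAddr.split(";")) {
            String addr = entry.trim();
            if (addr.isEmpty()) {
                continue; // tolerate stray separators such as "a:1;;b:2"
            }
            int colon = addr.lastIndexOf(':');
            if (colon <= 0 || colon == addr.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + addr);
            }
            int port;
            try {
                port = Integer.parseInt(addr.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Port is not a number in: " + addr, e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in: " + addr);
            }
            result.add(addr);
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("No namesrv address found");
        }
        return result;
    }

    public static void main(String[] args) {
        // Example addresses only; replace them with the endpoint of your instance.
        System.out.println(parse("192.168.0.1:9876;192.168.0.2:9876"));
    }
}
```

The validated string can then be passed unchanged to setNamesrvAddr on the producer or consumer.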
Procedure
First, record the cluster endpoint shown on the instance details page of the RocketMQ console. Currently, the namesrv can be accessed over the intranet; use this endpoint as the namesrv address parameter in your client program. For client example code for message production and consumption, refer to Quick Start - Production and Consumption Verification.
1. On the eSurfing Cloud official website, click Control Center and select RocketMQ.
2. Log in to the DMS RocketMQ console and click Region in the upper right corner to select the corresponding resource pool.
3. Go to the instance list and click Manage to enter the management menu.
4. Go to the topic management menu. Click Dialing Test to perform a production and consumption dialing test, and verify the activated message instances and topics.
5. You can then connect your applications to RocketMQ according to the relevant specifications to send and consume messages.
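Since the namesrv endpoint is reached over the intranet, it can be useful to confirm that the application host can open a TCP connection to it before starting a producer or consumer. The sketch below uses only the Java standard library; the class name NamesrvReachability and the method canConnect are hypothetical, and the address in main is the same placeholder used in the examples in this document.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical pre-flight check (not a RocketMQ class) for network reachability. */
final class NamesrvReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolved host: treat all as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder endpoint; use the endpoint recorded from the instance details.
        boolean ok = canConnect("192.168.0.1", 9876, 3000);
        System.out.println(ok ? "namesrv reachable" : "namesrv not reachable");
    }
}
```

A successful check only shows that the port accepts TCP connections; it does not verify authentication or that the topic exists, which the Dialing Test in the console covers.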
Accessing a RocketMQ Instance Without SSL Enabled
When connecting to a RocketMQ instance without SSL enabled, use the following code to send and receive messages.
Producing Messages:
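import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// Enter the metadata (namesrv) address
producer.setNamesrvAddr("192.168.0.1:9876");
producer.start();
for (int i = 0; i < 128; i++) {
    try {
        // Arguments: topic, tag, key, message body
        Message msg = new Message("TopicTest",
            "TagA",
            "OrderID188",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
// Release client resources after all messages are sent
producer.shutdown();
Consuming Messages: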
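import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
// Enter the metadata (namesrv) address
consumer.setNamesrvAddr("192.168.0.1:9876");
// Subscribe to all tags ("*") of TopicTest
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
Accessing a RocketMQ Instance With SSL Enabled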
When connecting to a RocketMQ instance with SSL enabled, use the following code to send and receive messages.
Producing Messages:
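import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// Enter the metadata (namesrv) address
producer.setNamesrvAddr("192.168.0.1:9876");
// Enable TLS because the instance has SSL enabled
producer.setUseTLS(true);
producer.start();
for (int i = 0; i < 128; i++) {
    try {
        // Arguments: topic, tag, key, message body
        Message msg = new Message("TopicTest",
            "TagA",
            "OrderID188",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
// Release client resources after all messages are sent
producer.shutdown();
Consuming Messages: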
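import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
// Enter the metadata (namesrv) address
consumer.setNamesrvAddr("192.168.0.1:9876");
// Enable TLS because the instance has SSL enabled
consumer.setUseTLS(true);
// Subscribe to all tags ("*") of TopicTest
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();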