场景说明
RocketMQ提供了两种连接:
l 生产者连接:生产者通过调用RocketMQ提供的API,与RocketMQ代理(Broker)建立连接。生产者连接主要用于发送消息到RocketMQ。
l 消费者连接:消费者通过调用RocketMQ提供的API,与RocketMQ代理(Broker)建立连接。消费者连接主要用于从RocketMQ订阅消息并进行消费。
RocketMQ采用基于TCP/IP的通信协议,使用长连接方式进行连接。在建立连接之前,需要配置正确的RocketMQ服务端地址和端口,并使用相应的身份验证信息(如AccessKey和SecretKey)进行认证。
建立连接后,RocketMQ客户端可以通过连接发送消息到RocketMQ集群,并从集群中接收消息进行消费。连接的建立和维护是RocketMQ消息传递的基础,确保了消息的可靠传递和高效处理。
操作步骤
首先需要在RocketMQ控制台实例详情记录下集群的接入点信息,目前支持namesrv的内网访问,将该地址作为客户端程序namesrv地址的参数,具体生产消费消息的客户端示例代码请参考快速入门-生产消费验证。
1、 天翼云官网点击控制中心,选择产品分布式消息服务RocketMQ。
2、 登录分布式消息服务RocketMQ控制台,点击右上角地域选择对应资源池。
进入实例列表,点击【管理】按钮进入管理菜单。
3、 进入实例列表,点击【管理】按钮进入管理菜单。
4、 进入主题管理菜单,点击【拨测】按钮,进行生产消费的拨测验证,验证开通的消息实例和主题。
5、 用户应用按照规范接入RocketMQ,发送、消费消息。
连接未开启SSL的RocketMQ实例
在连接未开启SSL的RocketMQ实例时,使用如下代码进行消息的收发。
生产消息:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 填入元数据地址
producer.setNamesrvAddr("192.168.0.1:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();消费消息:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
// 填入元数据地址
consumer.setNamesrvAddr("192.168.0.1:9876");
consumer.subscribe("TopicTest", "*") consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();连接已开启SSL的RocketMQ实例
在连接已开启SSL的RocketMQ实例时,使用如下代码进行消息的收发。
生产消息:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 填入元数据地址
producer.setNamesrvAddr("192.168.0.1:9876");
producer.setUseTLS(true);
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();消费消息:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
// 填入元数据地址
consumer.setNamesrvAddr("192.168.0.1:9876");
consumer.setUseTLS(true);
consumer.subscribe("TopicTest", "*") consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();