分布式消息服务-RocketMQ

连接实例

2024-07-02 02:15:49

场景说明

RocketMQ提供了两种连接:

l  生产者连接:生产者通过调用RocketMQ提供的API,与RocketMQ代理(Broker)建立连接。生产者连接主要用于发送消息到RocketMQ。

l  消费者连接:消费者通过调用RocketMQ提供的API,与RocketMQ代理(Broker)建立连接。消费者连接主要用于从RocketMQ订阅消息并进行消费。

RocketMQ采用基于TCP/IP的通信协议,使用长连接方式进行连接。在建立连接之前,需要配置正确的RocketMQ服务端地址和端口,并使用相应的身份验证信息(如AccessKey和SecretKey)进行认证。

建立连接后,RocketMQ客户端可以通过连接发送消息到RocketMQ集群,并从集群中接收消息进行消费。连接的建立和维护是RocketMQ消息传递的基础,确保了消息的可靠传递和高效处理。

操作步骤

首先需要在RocketMQ控制台实例详情记录下集群的接入点信息,目前支持namesrv的内网访问,将该地址作为客户端程序namesrv地址的参数,具体生产消费消息的客户端示例代码请参考快速入门-生产消费验证

1、 天翼云官网点击控制中心,选择产品分布式消息服务RocketMQ。

2、 登录分布式消息服务RocketMQ控制台,点击右上角地域选择对应资源池。

进入实例列表,点击【管理】按钮进入管理菜单。

3、 进入实例列表,点击【管理】按钮进入管理菜单。

4、 进入主题管理菜单,点击【拨测】按钮,进行生产消费的拨测验证,验证开通的消息实例和主题。

5、 用户应用按照规范接入RocketMQ,发送、消费消息。

连接未开启SSL的RocketMQ实例

在连接未开启SSL的RocketMQ实例时,使用如下代码进行消息的收发。

生产消息:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 填入元数据地址
producer.setNamesrvAddr("192.168.0.1:9876");
producer.start();
for (int i = 0; i < 128; i++)
    try {
        {
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
 
    } catch (Exception e) {
        e.printStackTrace();
    }
producer.shutdown();

消费消息:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
// 填入元数据地址
consumer.setNamesrvAddr("192.168.0.1:9876");                            
consumer.subscribe("TopicTest", "*")                                                       consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
consumer.start();

连接已开启SSL的RocketMQ实例

在连接已开启SSL的RocketMQ实例时,使用如下代码进行消息的收发。

生产消息:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 填入元数据地址
producer.setNamesrvAddr("192.168.0.1:9876");
producer.setUseTLS(true);
producer.start();
for (int i = 0; i < 128; i++)
    try {
        {
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
 
    } catch (Exception e) {
        e.printStackTrace();
    }
producer.shutdown();

消费消息:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
// 填入元数据地址
consumer.setNamesrvAddr("192.168.0.1:9876");
consumer.setUseTLS(true);                          
consumer.subscribe("TopicTest", "*")                                                       consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
consumer.start();


1pW14ZS0R8sM