分布式消息服务RabbitMQ

编译工程生产消费

2024-06-27 02:10:54

前提条件

完成分布式消息服务RabbitMQ相应环境准备工作并创建好相关资源。

操作步骤

RabbitMQ是一个开源的消息队列中间件,支持生产者和消费者之间的异步通信。在上述资源准备完成后,接下来需要编译工程生产消费,主要分以下几个步骤:

1、编写生产者代码:使用编程语言编写一个生产者程序。该程序将连接到RabbitMQ服务器,并将消息发送到队列中。

2、编写消费者代码:同样使用编程语言编写一个消费者程序。该程序将连接到RabbitMQ服务器,并从队列中接收消息。

3、运行生产者和消费者:运行生产者程序,它将发送消息到队列中。然后运行消费者程序,它将从队列中接收并处理消息。

4、验证结果:检查生产者和消费者程序的输出,确保消息被正确发送和接收。

引入依赖

在使用RabbitMQ时,你需要在你的项目中引入相应的依赖。具体的依赖项可能会因你的项目和需求而有所不同。在使用RabbitMQ之前,请确保查阅官方文档以获取最新的依赖项和使用说明。

以Java编程语言为例,可以使用RabbitMQ的Java客户端库。你可以在Maven或Gradle构建工具中添加以下依赖项:

1.            <dependency>  
2.                <groupId>com.rabbitmq</groupId>  
3.                <artifactId>amqp-client</artifactId>  
4.                <version>5.7.0</version>  
5.            </dependency>

可以通过下载JAR包来引入依赖。

绑定BindingKey

在RabbitMQ中,绑定键(Binding Key)是用于绑定交换机(Exchange)和队列(Queue)的关键字。当一个消息被发送到交换机时,交换机会根据绑定键将消息路由到相应的队列中。

绑定键是在创建绑定(Binding)时指定的,它定义了消息应该如何被路由到队列。绑定键通常与消息的属性或内容进行匹配,以确定消息应该发送到哪个队列。

绑定键可以具有不同的形式,取决于使用的交换机类型。以下是一些常见的绑定键形式:

l  接匹配(Direct Match):绑定键与消息的路由键(Routing Key)完全匹配时,消息会被路由到相应的队列。

l  通配符匹配(Wildcard Match):绑定键可以使用通配符进行模式匹配。常见的通配符有*和#,其中*表示匹配一个单词,#表示匹配零个或多个单词。

l  主题匹配(Topic Match):绑定键可以使用主题模式进行匹配。主题模式使用.分隔的单词,可以包含*和#通配符。例如,stock.#可以匹配stock.price、stock.quantity等。

绑定键的选择取决于你的需求和消息的路由策略。通过正确设置绑定键,你可以确保消息被正确地路由到相应的队列中,以便消费者进行处理。

代码示例:

import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
public class RabbitmqBindingKey {  
    private final static String EXCHANGE_NAME = "exchangeTest";  
    private final static String QUEUE_NAME = "helloMQ";  
    private final static String ROUTING_KEY = "test";  
    public static void main(String[] args) throws IOException,            TimeoutException {  
        // 创建连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
         // 设置主机ip  
        factory.setHost("192.168.3.113");  
        // 设置amqp的端口号  
        factory.setPort(5672);  
        // 设置用户名密码  
        factory.setUsername("rabbitmq");  
        factory.setPassword("r@bb!tMQ#3333323");  
          // 设置Vhost,需要在控制台先创建  
        factory.setVirtualHost("vhost");  
         //基于网络环境合理设置超时时间  
        factory.setConnectionTimeout(30 * 1000);  
        factory.setHandshakeTimeout(30 * 1000);  
        factory.setShutdownTimeout(0);  
          Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        channel.exchangeDeclare(EXCHANGE_NAME,      BuiltinExchangeType.DIRECT, true);  
          // 创建 ${QueueName}。Queue 可以在控制台创建,也可以用API创建  
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
          // Queue 与 Exchange进行绑定,注册 BindingKeyTest  
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);  
          connection.close();  
      }  
 }

完成后,可以在实例列表的交换器选项卡和队列选项卡查看结果。

生产消息

生产者需要创建一个连接到RabbitMQ服务器,然后创建一个通道(Channel)来进行消息的发布。在发布消息之前,生产者通常需要先声明一个队列,以确保消息能够被正确地路由和接收。

一旦连接和通道建立完成,生产者可以使用basicPublish()方法将消息发布到指定的队列。在发布消息时,需要指定目标队列的名称、消息内容以及其他的属性。

发布消息后,RabbitMQ将会将消息存储在队列中,等待消费者来接收。消费者可以使用相同的客户端库来创建连接和通道,并使用basicConsume()方法来订阅队列并接收消息。一旦有消息到达队列,消费者就会收到消息并进行相应的处理。

通过使用RabbitMQ,生产者和消费者可以实现解耦,即它们可以独立地进行开发和部署。生产者可以按照自己的节奏和需求发布消息,而消费者可以根据自己的处理能力和负载来接收和处理消息

代码示例:

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import java.io.IOException;  
import java.nio.charset.StandardCharsets;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.TimeoutException;  
public class RabbitmqProducer {  
    // private final static String EXCHANGE_NAME = "exchangeTest";  
    private final static String QUEUE_NAME = "helloMQ";  
    // private final static String ROUTING_KEY = "test";  
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  
        // 创建连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
          // 设置主机ip  
        factory.setHost("192.168.3.113");  
        // 设置amqp的端口号  
        factory.setPort(5672);  
        // 设置用户名密码  
        factory.setUsername("username");  
        factory.setPassword("password");  
          // 设置Vhost,需要在控制台先创建  
        factory.setVirtualHost("test");  
          //基于网络环境合理设置超时时间  
        factory.setConnectionTimeout(30 * 1000);  
        factory.setHandshakeTimeout(30 * 1000);  
        factory.setShutdownTimeout(0);  
          // 创建一个连接  
        Connection connection = factory.newConnection();  
          // 创建一个频道  
        Channel channel = connection.createChannel();  
         // 发送方消息确认,channel.confirmSelect();  
        // 启用发送方事务机制,channel.txSelect();   
        // 指定一个队列  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        for (int i = 0; i < 100; i++) {  
          // 发送的消息  
          String message = "Hello rabbitMQ!_" + i;  
          // 往队列中发送一条消息,使用默认的交换器  
          channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));  
          // 使用自定义交换器,需要在管理台预先建好,并设置routing key  
          // channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes(StandardCharsets.UTF_8));  
          System.out.println(" [x] Sent '" + message + "'");  
          TimeUnit.MILLISECONDS.sleep(100);  
        }  
          //关闭频道和连接  
        channel.close();  
        connection.close();  
      }  
}

消息发送后,可以进入控制台,在实例列表的队列选项卡查看消息发送状态。

消费消息

消费者需要创建一个连接到RabbitMQ服务器,然后创建一个通道(Channel)来进行消息的订阅。在订阅消息之前,消费者通常需要先声明一个队列,以确保能够正确地接收和处理消息。

一旦连接和通道建立完成,消费者可以使用basicConsume()方法来订阅指定的队列,并注册一个回调函数来处理接收到的消息。当有消息到达队列时,RabbitMQ会将消息推送给消费者,消费者的回调函数将被调用,从而可以对消息进行处理。

消费者可以根据自己的需求设置消息的确认机制。在默认情况下,消费者在接收到消息后,会自动向RabbitMQ发送一个确认(ack)消息,表示已成功接收并处理该消息。如果消费者在处理消息时发生错误,可以选择不发送确认消息,从而使消息重新进入队列,以便其他消费者重新处理。

通过使用RabbitMQ,消费者可以实现解耦,即它们可以独立地进行开发和部署。消费者可以根据自己的处理能力和负载来接收和处理消息,从而实现负载均衡和水平扩展。

代码示例:

import com.rabbitmq.client.*;  
import java.io.IOException;  
import java.nio.charset.StandardCharsets;  
import java.util.concurrent.TimeoutException;  
public class RabbitmqConsumer {  
    //队列名称  
    private final static String QUEUE_NAME = "helloMQ";  
    public static void main(String[] args) throws IOException, TimeoutException {  
     //创建连接工厂  
      ConnectionFactory factory = new ConnectionFactory();  
     //设置主机ip  
     factory.setHost("192.168.3.113");  
     //设置amqp的端口号  
     factory.setPort(5672);  
     //设置用户名密码  
     factory.setUsername("username");  
     factory.setPassword("password");  
     //设置Vhost,需要在控制台先创建  
     factory.setVirtualHost("test");  
     //基于网络环境合理设置超时时间  
     factory.setConnectionTimeout(30 * 1000);  
     factory.setHandshakeTimeout(30 * 1000);  
     factory.setShutdownTimeout(0);  
     Connection connection = factory.newConnection();  
     Channel channel = connection.createChannel();  
      //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。  
     channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
     Consumer consumer = new DefaultConsumer(channel) {  
      @Override  
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
        String message = new String(body, StandardCharsets.UTF_8);  
        System.out.println(" [x] Received '" + message + "'");  
         }  
     };  
     channel.basicConsume(QUEUE_NAME, true, consumer);  
   }  
 }

完成上述步骤后,可以在控制台查看消费者是否启动成功。

完成以上所有步骤后,就成功接入了RabbitMQ服务,可以用消息队列进行消息发送和订阅了。


xc3eOZOGXp6B