Distributed Message Service RabbitMQ

Compilation Project Production and Consumption

2024-06-27 06:28:16

Before You Begin

Prepare the DMS RabbitMQ environment and create the related resources.

Procedure

RabbitMQ is an open-source message queue middleware that supports asynchronous communication between producers and consumers. Once the resources above are ready, build a project that produces and consumes messages. The main steps are as follows:

1. Writing producer code: Write a producer program in a programming language of your choice. The program connects to the RabbitMQ server and sends messages to a queue.

2. Writing consumer code: Write a consumer program in a programming language of your choice. The program connects to the RabbitMQ server and receives messages from the queue.

3. Running the producer and consumer: Run the producer program to send messages to the queue, then run the consumer program to receive and process them.

4. Verifying results: Check the output of the producer and consumer programs to confirm that messages are sent and received correctly.

Introduce Dependencies

To use RabbitMQ, add the appropriate client dependency to your project. The exact dependency depends on your language and build tool; consult the official documentation for the latest version and usage instructions.

Taking Java as an example, you can use the RabbitMQ Java client library. For Maven, add the following dependency to your pom.xml (for Gradle, use the same coordinates, com.rabbitmq:amqp-client:5.7.0):

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.0</version>
</dependency>

Alternatively, you can download the JAR package and add it to your project's classpath.

Bind BindingKey

In RabbitMQ, a binding key is the key used to bind a queue to an exchange. When a message is sent to the exchange, the exchange compares the message's routing key with the binding keys of its bindings and routes the message to the matching queues.

The binding key is specified when the binding is created and defines which messages are routed to the queue. It is matched against the routing key that the producer sets when publishing the message, not against the message body.

How the binding key is interpreted depends on the type of exchange. Common forms are:

- Direct match (direct exchange): The message is routed to the queue when its routing key exactly matches the binding key.

- Wildcard match (topic exchange): The binding key is a pattern that can contain the wildcards * and #, where * matches exactly one word and # matches zero or more words.

- Topic match (topic exchange): Keys consist of words separated by periods (.), and the binding key can use * and # in place of words. For example, stock.# matches stock.price and stock.quantity.
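The matching rules above can be sketched in plain Java. This is only an illustration of the * and # semantics, not the broker's actual implementation; the class TopicKeyMatcher and its methods are hypothetical names and are not part of the RabbitMQ client library.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; not part of the RabbitMQ client library.
final class TopicKeyMatcher {

    // Splits a key into its period-separated words; an empty key has no words.
    private static List<String> words(String key) {
        return key.isEmpty()
                ? Collections.<String>emptyList()
                : Arrays.asList(key.split("\\.", -1));
    }

    // Returns true if routingKey matches bindingKey under the topic rules:
    // '*' stands for exactly one word, '#' for zero or more words.
    static boolean matches(String bindingKey, String routingKey) {
        return match(words(bindingKey), 0, words(routingKey), 0);
    }

    private static boolean match(List<String> pattern, int p, List<String> key, int k) {
        if (p == pattern.size()) {
            return k == key.size(); // pattern used up: match only if the key is used up too
        }
        String token = pattern.get(p);
        if (token.equals("#")) {
            // '#' may absorb zero or more of the remaining words
            for (int next = k; next <= key.size(); next++) {
                if (match(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.size()) {
            return false; // '*' or a literal word needs one word, but none is left
        }
        return (token.equals("*") || token.equals(key.get(k)))
                && match(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("stock.#", "stock.price"));     // true
        System.out.println(matches("stock.#", "stock"));           // true
        System.out.println(matches("stock.*", "stock.price.usd")); // false
        System.out.println(matches("test", "test"));               // true
    }
}
```

A binding key without wildcards, such as test, matches only the identical routing key, which is the same behavior as the direct match.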

The choice of binding key depends on your needs and the routing strategy of the message. By setting the binding key correctly, you can ensure that messages are correctly routed to the appropriate queue for processing by the consumer.

Code example:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitmqBindingKey {
    private final static String EXCHANGE_NAME = "exchangeTest";
    private final static String QUEUE_NAME = "helloMQ";
    private final static String ROUTING_KEY = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set host IP
        factory.setHost("192.168.3.113");
        // Set the AMQP port number
        factory.setPort(5672);
        // Set user name and password
        factory.setUsername("rabbitmq");
        factory.setPassword("r@bb!tMQ#3333323");
        // Set the vhost; it must be created in the console first
        factory.setVirtualHost("vhost");
        // Set the timeout periods (in milliseconds) based on the network environment
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare a durable direct exchange
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        // Declare the queue. Queues can be created in the console or through the API
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // Bind the queue to the exchange, using ROUTING_KEY as the binding key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        // Closing the connection also closes its channels
        connection.close();
    }
}

Once completed, the results can be viewed on the Exchanges tab and Queues tab of the instance list.

Producing Messages

The producer creates a connection to the RabbitMQ server and then creates a channel on which to publish messages. Before publishing, the producer usually declares the queue so that the queue exists and messages can be routed to it.

Once the connection and channel are established, the producer calls the basicPublish() method to publish messages. The call takes an exchange name, a routing key, optional message properties, and the message body. When publishing to the default exchange (an empty exchange name), the routing key is the name of the target queue.

After a message is published, RabbitMQ stores it in the queue until a consumer receives it. A consumer can use the same client library to create a connection and a channel, and use the basicConsume() method to subscribe to the queue. When a message reaches the queue, RabbitMQ delivers it to the consumer, which then processes it.

RabbitMQ decouples producers from consumers, so they can be developed and deployed independently. Producers publish messages at their own pace, while consumers receive and process messages according to their own processing capacity and load.
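The decoupling described above can be illustrated with an in-process analogy. In this sketch a java.util.concurrent.BlockingQueue stands in for the RabbitMQ queue, so no broker is involved; the class name DecouplingSketch and the 5 ms processing delay are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-process analogy only: a BlockingQueue stands in for the RabbitMQ queue.
final class DecouplingSketch {

    // Publishes 'count' messages as fast as possible and consumes them at a slower pace.
    // Returns the messages in the order the consumer processed them.
    static List<String> run(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        // The producer never waits for the consumer
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                queue.add("Hello rabbitMQ!_" + i);
            }
        });

        // The consumer takes messages at its own pace
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    String message = queue.take(); // blocks until a message is available
                    Thread.sleep(5);               // simulate slower processing
                    received.add(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the threads", e);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> received = run(5);
        // prints "received 5 messages; first = Hello rabbitMQ!_0"
        System.out.println("received " + received.size() + " messages; first = " + received.get(0));
    }
}
```

The producer finishes almost immediately while the consumer is still working through the backlog, which is the same effect the queue provides between separate producer and consumer processes.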

Code example:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RabbitmqProducer {
    // private final static String EXCHANGE_NAME = "exchangeTest";
    private final static String QUEUE_NAME = "helloMQ";
    // private final static String ROUTING_KEY = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Create connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set host IP
        factory.setHost("192.168.3.113");
        // Set the AMQP port number
        factory.setPort(5672);
        // Set user name and password
        factory.setUsername("username");
        factory.setPassword("password");
        // Set the vhost; it must be created in the console first
        factory.setVirtualHost("test");
        // Set the timeout periods (in milliseconds) based on the network environment
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // To enable publisher confirms, call channel.confirmSelect()
        // To enable transactions on the channel instead, call channel.txSelect()
        // Declare the queue; if it already exists, the arguments must match the existing queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 100; i++) {
            // Message to send
            String message = "Hello rabbitMQ!_" + i;
            // Send the message through the default exchange; the routing key is the queue name
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            // To use a custom exchange, create it in advance on the management console and bind the queue with a routing key
            // channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
            TimeUnit.MILLISECONDS.sleep(100);
        }
        // Close channel and connection
        channel.close();
        connection.close();
    }
}

After the messages are sent, you can log in to the console and check the message status on the Queues tab of the instance list.

Consuming Messages

The consumer creates a connection to the RabbitMQ server and then creates a channel on which to subscribe to messages. Before subscribing, the consumer usually declares the queue so that the queue exists even if the consumer starts before the producer.

Once the connection and channel are established, the consumer can use the basicConsume() method to subscribe to the specified queue and register a callback function to process the received messages. When a message reaches the queue, RabbitMQ will push the message to the consumer, and the consumer's callback function will be called to process the message.

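Consumers can choose the message acknowledgement (ack) mode according to their needs. With automatic acknowledgement (autoAck set to true in basicConsume(), as in the RabbitmqConsumer example in this section), RabbitMQ considers a message acknowledged as soon as it is delivered, whether or not the consumer processes it successfully. With manual acknowledgement (autoAck set to false), the consumer calls basicAck() after it has processed the message. If processing fails, the consumer can reject the message with basicNack() or basicReject() and ask RabbitMQ to requeue it, so that it is delivered again, possibly to another consumer. Unacknowledged messages are also requeued when the consumer's channel or connection closes.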
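The effect of manual acknowledgement can be sketched with a simplified in-memory model. This is not the RabbitMQ client API: the class ManualAckSketch, its methods, and the use of a Deque as the queue are assumptions made for illustration. A handler that returns true stands for an ack, and one that returns false stands for a rejection with requeue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Simplified in-memory model of manual acknowledgement; not the RabbitMQ client API.
final class ManualAckSketch {

    // Delivers messages from the head of the queue to the handler, at most maxDeliveries times.
    // handler returns true  -> ack: the message is removed for good.
    // handler returns false -> no ack: the message goes back to the head and is redelivered.
    // Returns the acknowledged messages in the order they were acknowledged.
    static List<String> consume(Deque<String> queue, Predicate<String> handler, int maxDeliveries) {
        List<String> acked = new ArrayList<>();
        for (int delivered = 0; delivered < maxDeliveries && !queue.isEmpty(); delivered++) {
            String message = queue.pollFirst();
            if (handler.test(message)) {
                acked.add(message);
            } else {
                queue.addFirst(message);
            }
        }
        return acked;
    }

    // Processing of "m2" fails once and succeeds on redelivery.
    static List<String> demo() {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));
        boolean[] failedOnce = {false};
        Predicate<String> handler = message -> {
            if (message.equals("m2") && !failedOnce[0]) {
                failedOnce[0] = true;
                return false; // simulated processing error
            }
            return true;
        };
        return consume(queue, handler, 10);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [m1, m2, m3]
    }
}
```

No message is lost even though one delivery failed. With automatic acknowledgement, the failed delivery of m2 would not have been retried.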

Because consumers are decoupled from producers, they can be developed and deployed independently. Multiple consumers can subscribe to the same queue and process messages according to their own capacity and load, which provides load balancing and horizontal scaling.

Code example:

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class RabbitmqConsumer {
    // Queue name
    private final static String QUEUE_NAME = "helloMQ";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set host IP
        factory.setHost("192.168.3.113");
        // Set the AMQP port number
        factory.setPort(5672);
        // Set user name and password
        factory.setUsername("username");
        factory.setPassword("password");
        // Set the vhost; it must be created in the console first
        factory.setVirtualHost("test");
        // Set the timeout periods (in milliseconds) based on the network environment
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the queue so that it exists even if the consumer starts before the producer
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // Callback that is invoked for every message delivered to this consumer
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        // Subscribe with autoAck = true: messages are acknowledged as soon as they are delivered
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

After completing the above steps, you can check whether the consumer started successfully on the console.
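To verify the results more systematically than by reading the program output, you can compare what the consumer received with what the producer sent. The sketch below assumes the message format used by the RabbitmqProducer example ("Hello rabbitMQ!_" followed by an index) and assumes that the received message bodies have been collected into a list; the class DeliveryCheck is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for checking that every message sent by the producer example arrived.
final class DeliveryCheck {

    // Returns the indices in [0, expectedCount) whose message is absent from 'received'.
    static List<Integer> missing(List<String> received, int expectedCount) {
        Set<String> seen = new HashSet<>(received);
        List<Integer> missing = new ArrayList<>();
        for (int i = 0; i < expectedCount; i++) {
            if (!seen.contains("Hello rabbitMQ!_" + i)) {
                missing.add(i);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Simulate a run in which message 42 out of 100 never arrived
        List<String> received = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            if (i != 42) {
                received.add("Hello rabbitMQ!_" + i);
            }
        }
        System.out.println(missing(received, 100)); // prints [42]
    }
}
```

An empty result means that every expected message was received at least once.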

After completing the above steps, you have successfully connected to the RabbitMQ service and can send and receive messages using message queues.

