Distributed Message Service RabbitMQ

Message Idempotence

2024-06-27 07:03:58

If repeated consumption of messages affects your business processing, you should perform idempotent processing on messages. This section describes the concept, application scenarios, and solutions of message idempotence.

Concept

In messaging, idempotence means that when a consumer consumes the same message more than once, the result is the same as consuming it once, and the repeated consumption has no negative impact on the business system.

For example, in a payment scenario, a consumer consumes a deduction message and deducts RMB 500 for an order. If the deduction message is delivered more than once because of network instability or other reasons, the consumer consumes it repeatedly, but the amount is deducted only once: the user's deduction records contain a single RMB 500 entry for the order, and the fee is not charged again. The deduction is then correct, and the consumption process as a whole is idempotent.
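The payment example can be sketched in a few lines of Java. This is only an in-memory illustration under stated assumptions, not DMS RabbitMQ client code: the class `DeductionAccount`, its methods, and the use of the order ID as the idempotent key are hypothetical names chosen for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory account, used only to illustrate idempotent deduction.
final class DeductionAccount {
    private long balance;
    // Deduction records keyed by order ID: at most one record per order.
    private final Map<String, Long> deductionsByOrder = new HashMap<>();

    DeductionAccount(long initialBalance) {
        this.balance = initialBalance;
    }

    // Applies the deduction only the first time an order ID is seen.
    // Returns true if money was deducted, false if the message was a duplicate.
    synchronized boolean deduct(String orderId, long amount) {
        if (deductionsByOrder.putIfAbsent(orderId, amount) != null) {
            return false; // duplicate delivery: leave the balance untouched
        }
        balance -= amount;
        return true;
    }

    synchronized long balance() {
        return balance;
    }

    synchronized int recordCount() {
        return deductionsByOrder.size();
    }

    public static void main(String[] args) {
        DeductionAccount account = new DeductionAccount(1_000);
        account.deduct("order-1", 500); // first delivery
        account.deduct("order-1", 500); // redelivery of the same message
        // Prints "500 1": one deduction, one record.
        System.out.println(account.balance() + " " + account.recordCount());
    }
}
```

Consuming the deduction message twice leaves the same balance and the same single record as consuming it once, which is the property described above.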

Application Scenario

In Internet applications, especially when the network is unstable, messages from DMS RabbitMQ may be delivered more than once. If repeated consumption affects your business processing, make message processing idempotent. Possible causes of duplicate messages are:

•  Duplicate messages when sending

A message has been sent to the server and persisted, but a brief network interruption or a client failure prevents the server's response from reaching the client. If the producer concludes that the send failed and sends the message again, the consumer receives two messages with the same content and the same Message ID.

•  Duplicate messages on delivery

A message has been delivered to the consumer and business processing has completed, but a network interruption occurs while the client is sending its acknowledgment to the server. To ensure that the message is consumed at least once, the DMS RabbitMQ server redelivers the already processed message after the network recovers, and the consumer receives two messages with the same content and the same Message ID.

•  Duplicate messages during load balancing (caused by, among other things, network jitter, server restart, and consumer application restart)

When the DMS RabbitMQ server or client restarts, scales up, or scales down, a rebalance is triggered, and the consumer may receive duplicate messages.
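The delivery case can be illustrated with a toy model of at-least-once delivery. The sketch below does not use the RabbitMQ client and does not describe how the server is implemented; `RedeliveryDemo`, `deliver`, and `lostAcks` are hypothetical names, and the loop simply assumes that the server redelivers until an acknowledgment reaches it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of at-least-once delivery: the message is redelivered
// until an acknowledgment actually reaches the server.
final class RedeliveryDemo {

    // Returns every Message ID the consumer sees, duplicates included.
    // lostAcks is the number of acknowledgments lost in transit.
    static List<String> deliver(String messageId, int lostAcks) {
        List<String> received = new ArrayList<>();
        int acksToLose = lostAcks;
        boolean acked = false;
        while (!acked) {
            received.add(messageId); // consumer receives and processes the message
            if (acksToLose > 0) {
                acksToLose--;        // ack lost: the server never sees it
            } else {
                acked = true;        // ack arrives: delivery stops
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // One lost acknowledgment means the consumer sees the message twice.
        System.out.println(deliver("msg-1", 1)); // prints [msg-1, msg-1]
    }
}
```

Both deliveries carry the same Message ID, which is why the Message ID can serve as the idempotent key.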

Solution

The steps for idempotent processing of a message using the Message ID as the idempotent key are as follows:

(1) Create a table in the database and define a unique key index on the column that stores the Message ID.

(2) Set a unique Message ID for each message in the producer client.

The sample code for setting a unique Message ID is as follows:

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish("ExchangeName", "RoutingKey", true, props, ("MessageSending" + i).getBytes());

(3) Perform idempotent processing in the consumer client based on the unique Message ID.

The sample code for idempotent processing based on the unique Message ID is as follows:

channel.basicConsume(Producer.QueueName, false, "MyConsumerTag",
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope env,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
            try {
                // 1. Obtain the business-unique key (the Message ID or other
                //    information that uniquely identifies the message).
                String messageId = properties.getMessageId();
                // 2. Start a database transaction and insert the key.
                idempTable.insert(messageId);
                // 3. Process the business logic for the received message.
                // 4. Commit the transaction on success or roll it back on failure.
                //    Acknowledge only if processing succeeded; otherwise do not acknowledge.
                channel.basicAck(env.getDeliveryTag(), false);
            } catch (java.sql.SQLIntegrityConstraintViolationException e) {
                // Unique-key conflict: the message is a duplicate, so acknowledge it directly.
                // Assumption: idempTable.insert surfaces the JDBC exception. Substitute the
                // duplicate-key exception that your data-access layer actually throws.
                channel.basicAck(env.getDeliveryTag(), false);
            }
        }
    }
);
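The consumer-side steps can also be tried without a broker or a database. The following is a minimal sketch under stated assumptions: `IdempotentHandlerDemo` and `IdempotencyTable` are hypothetical names, a concurrent set stands in for the table with a unique key on the Message ID, removing the key stands in for a transaction rollback, and the boolean return value stands in for the decision to acknowledge.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class IdempotentHandlerDemo {

    // Stand-in for the database table whose unique key is the Message ID.
    static final class IdempotencyTable {
        private final Set<String> ids = ConcurrentHashMap.newKeySet();

        // Mirrors an INSERT into a unique-key column: fails on a duplicate key.
        void insert(String messageId) {
            if (!ids.add(messageId)) {
                throw new IllegalStateException("duplicate key: " + messageId);
            }
        }

        void remove(String messageId) {
            ids.remove(messageId);
        }
    }

    private final IdempotencyTable table = new IdempotencyTable();
    private final AtomicInteger processed = new AtomicInteger();

    // Returns true when the message should be acknowledged.
    boolean handle(String messageId, Runnable businessLogic) {
        try {
            table.insert(messageId);
        } catch (IllegalStateException duplicate) {
            return true; // already processed: acknowledge without re-running
        }
        try {
            businessLogic.run();
            processed.incrementAndGet();
            return true;
        } catch (RuntimeException failure) {
            table.remove(messageId); // emulate a transaction rollback
            return false;            // no acknowledgment, so the message can be redelivered
        }
    }

    int processedCount() {
        return processed.get();
    }

    public static void main(String[] args) {
        IdempotentHandlerDemo handler = new IdempotentHandlerDemo();
        handler.handle("msg-1", () -> System.out.println("deduct RMB 500"));
        handler.handle("msg-1", () -> System.out.println("deduct RMB 500")); // redelivery
        System.out.println(handler.processedCount()); // prints 1
    }
}
```

In a real deployment the key insert and the business update would share one database transaction, so that a failure rolls back both and the redelivered message is processed again.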

