分布式消息服务RabbitMQ

消息幂等

2024-06-27 02:56:49

如果消息重复消费会影响您的业务处理,要对消息做幂等处理。本文介绍消息幂等的概念、适用场景以及处理方法。

概念

在消息领域,幂等是指Consumer重复消费某条消息时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响。

例如,在支付场景下,Consumer消费扣款消息,对一笔订单执行扣款操作,扣款金额为500元。如果因网络不稳定等原因导致扣款消息重复投递,Consumer重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费500元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消息幂等。

适用场景

在互联网应用中,尤其在网络不稳定的情况下,分布式消息服务RabbitMQ的消息有可能会出现重复。如果消息重复消费会影响您的业务处理,请对消息做幂等处理。消息重复的可能原因如下:

l  发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时Producer意识到消息发送失败并尝试再次发送消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。

l  投递时消息重复

消息消费的场景下,消息已投递到Consumer并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,分布式消息服务RabbitMQ的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息。

l  负载均衡时消息重复(包括但不限于网络抖动、服务端重启以及Consumer应用重启)

当分布式消息服务RabbitMQ的服务端或客户端重启、扩容或缩容时,会触发Rebalance,此时Consumer可能会收到重复消息。

处理方法

以Message ID为幂等键对消息进行幂等处理的步骤如下:

(1)在数据库中创建一张unique key索引为唯一Message ID的表。

(2)在Producer客户端为每条消息设置唯一Message ID。

设置唯一Message ID的示例代码如下:

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
channel.basicPublish("ExchangeName", "RoutingKey", true, props, ("消息发送" + i).getBytes());

(3)在Consumer客户端根据唯一Message ID对消息进行幂等处理。

根据唯一Message ID进行幂等处理的示例代码如下:

channel.basicConsume(Producer.QueueName, false, "MyConsumerTag",
    new DefaultConsumer(channel) {
    @Override public void handleDelivery(String consumerTag, Envelope env,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 1. 获取业务唯一性索引数据。
        try{
            String messageId = properties.getMessageId();
            // Message ID或者其他作为unique key的信息。
            // 2. 开启数据库事务。
            idempTable.insert(messageId);
            // 3. 对接收到的消息,进行业务逻辑处理。
            // 4. 提交或回滚事务。// 处理成功,则进行ACK,否则不要进行ACK。
            channel.basicAck(env.getDeliveryTag(), false);
        }
        catch (数据库主键冲突异常 e){
            // 重复消息,直接确认掉。
            channel.basicAck(env.getDeliveryTag(), false);
        }
    }
}
);


Fe5Bgk3pSs5V