This section describes how a consumer reconnects after a node restart, using amqp-client, the RabbitMQ client for Java, as an example.
amqp-client has a built-in reconnection mechanism, but it retries only once and does not try again if that attempt fails. If the consumer has no additional retry mechanism of its own, it then loses the ability to consume entirely.
After a node disconnects, amqp-client reports different errors depending on which node the channel is connected to.
If the channel is connected to the node that hosts the queue, the consumer receives a shutdown signal. The amqp-client reconnection mechanism then takes effect and attempts to reconnect to the server. If the reconnection succeeds, the channel stays connected and consumption continues. If it fails, the channel.close method is called to close the channel.
If the channel is connected to a node other than the one that hosts the queue, the consumer does not receive a shutdown signal. Instead, the server sends a cancellation notification. amqp-client does not treat this as abnormal, so no obvious error appears in the logs, but the connection is eventually closed.
In these two cases, amqp-client calls back the handleShutdownSignal and handleCancel methods respectively. By overriding these two methods and running your own reconnection logic in the callbacks, you can create a new channel so that the consumer continues consuming after the original channel is closed.
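Reconnection logic run from these callbacks usually needs more than one attempt, with a pause between attempts so that a node that is still restarting is not hammered. The sketch below illustrates one way to do this using only the Java standard library. The class name RetryWithBackoff, the retry method, and the 30-second delay cap are hypothetical names and values chosen for illustration; they are not part of amqp-client.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not part of amqp-client): retries an action with exponential backoff.
public class RetryWithBackoff {

    // Runs the action up to maxAttempts times, doubling the pause after each failure.
    public static <T> T retry(Callable<T> action, int maxAttempts, long initialDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException ie) {
                // Do not retry when the thread is asked to stop.
                Thread.currentThread().interrupt();
                throw ie;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    // Assumed cap of 30 seconds between attempts.
                    delay = Math.min(delay * 2, 30_000L);
                }
            }
        }
        // All attempts failed: surface the last error to the caller.
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated reconnect that fails twice before succeeding.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("node unavailable");
            }
            return "channel-ready";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In a consumer, the action passed to retry would be whatever code creates the new channel and registers the consumer again. Bounding the number of attempts is a design choice: it lets the application report a persistent failure instead of retrying silently forever.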
The following simple code example handles both of the errors described above and keeps the consumer consuming.
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class MyRabbitConsumer {

    public static void main(String... args) throws IOException, TimeoutException {
        // Configure the connection to the RabbitMQ node.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.x.x");
        factory.setPort(5672);
        factory.setUsername("name");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        createNewConnection(connection);
    }

    // Creates a new channel on the given connection and starts consuming on it.
    public static void createNewConnection(Connection connection) {
        try {
            Channel channel = connection.createChannel();
            // Limit the number of unacknowledged messages delivered to the consumer to 64.
            channel.basicQos(64);
            // autoAck is false, so messages must be acknowledged manually.
            channel.basicConsume("queue-1", false, new CustomConsumer(channel, connection));
        } catch (Exception e) {
            // Retries immediately; a production consumer would typically pause between attempts.
            createNewConnection(connection);
        }
    }

    static class CustomConsumer implements Consumer {
        private final Channel _channel;
        private final Connection _connection;

public CustomCons