Spring-boot Rabbit-MQ Retry Mechanism

We will try to understand the Spring boot Rabbit-MQ retry mechanism using a foreign exchange example. Suppose a Rabbit-MQ Producer needs a Foreign Exchange rate from INR to USD hence it produces a message to get the foreign exchange rate. The message produced will be of the form :

{
“sendCounntry”: “IN”,
“receiveCountry”: “US”,
“sendCurrency”: “INR”,
“receiveCcy”: “USD”
}

A Rabbit-MQ Consumer consumes the above message and makes a Rest API call to some Foreign Exchange ( FX ) API Service. Some third-party server is hosting this Foreign Exchange API. FX API will provide the rates to the Consumer and the Consumer will send those rates back to the Producer.

If the FX server is down then the Consumer will not get any response and there will be a Timeout Exception. Hence the Consumer will not have any exchange rate. The consumer will send a message back to the producer saying it got a Timeout from the FX server, so cannot provide you the exchange rates.

Now Producer receives the message and comes to know that FX Server is down. So the Producer will wait for some time and trigger a message again to the Consumer asking for rates, this time also it gets the same response ( the same response repeats again and again ). In this approach, the Producer has to produce the message multiple times, it’s an overhead for the Producer to produce the message again and again as an exception occurs every time.

Hence to overcome this, Spring Framework provides a retry mechanism that will take out the burden of continuous retry mechanism from the Producer. Now Spring framework on its own will manage the retry mechanism. With this Retry mechanism, Rabbit-Mq will produce the message to the Consumer in a specified time interval if any specified exception occurs. We will try to understand it with the help of an example but before that, we should be aware of some predefined Classes of the Spring framework which will help in the Retry mechanism.

Predefined classes for retry mechanism

Retry Operations Interceptor

– It helps to automatically retry the calls if the method fails with some exceptions
– It also helps to control the number of retries, we have to specify this number of retries in the Rabbit Template.

Retry Template

It helps out to define the retry operations like :
– Setting of Retry Listeners: It helps to choose a callback interceptor during the retry lifecycle.
– Setting of Back Off Policy: It helps to set the initial interval after which a retry can be done. It also helps to set the max interval at which we can stop the retries.

Simple Retry Policy

It helps to retry a message a specific number of times for a particular set of exceptions. It will help to execute the retry mechanism or callback at least once.

Republish Message Recoverer

It helps to recover the message on a specific exchange after a configured number of retries are done. We can call this a dead letter exchange.

Steps to create a sample application

In the Message producer flow, we created an exchange and a queue. We also bound the Queue to Exchange using a binding key ( routing key ). We were successfully able to see the spike in the Queue which meant a message was properly produced on the Queue.

Create a spring boot project

Using the spring initializer, create a spring-boot application with the below dependencies and import it into the editor you are making use of.

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-devtools</artifactId> 
</dependency>

Note * Spring boot creates a SpringBootApplication class like below :

package com.getinputs.retry.retryconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RetryConsumerApplication {
  public static void main(String[] args) {
    SpringApplication.run(RetryConsumerApplication.class, args);
  }
}

Configuration class for Rabbit MQ Connection

Create a Configuration class that will help us to connect to Rabbit-Mq using the required details

package com.getinputs.retry.retryconsumer;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitAdminConfiguration {
	
    @Bean
    @Qualifier("customRaabitAdmin")
    AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }
	
    @Bean
    @Qualifier("customConnectionFactry")
    ConnectionFactory connectionFactory() {
		final CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("localhost:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPort(5672);
        return connectionFactory;
    }
}

Message consumer class

Create a message consumer class that consumes a message produced on “student.queue”

package com.getinputs.retry.retryconsumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;

public class RetryMessageConsumer implements MessageListener {

  Queue queue;

  public RetryMessageConsumer(Queue queue) {
    this.queue = queue;
  }

  Queue getQueue() {
    return queue;
  }

  @Override
  public void onMessage(Message message) {
    System.out.println("Message consumed is : " + message);
    throw new RuntimeException("Hope this is retried by Rabbit-Mq");
  }
}

Note * In this example, we are not making call to any FX Api instead we are throwing the exception from the consumer itself .

Configuration of dead letter queue

Create a configuration class to configure the dead letter queue and exchange. We will also create an interceptor that intercepts the message and move the message to a dead letter queue after a configured number of retries.

package com.getinputs.retry.retryconsumer;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class DeadLetterQueueConfiguration {

  public static String EXCHANGE_NAME = "test.deadLetter.exchange";
  public static String QUEUE_NAME = "test.deadLetter.queue";
  public static String ROUTING_KEY = "test.deadLetter.routing.key";

  @Autowired
  @Qualifier("customRaabitAdmin")
  AmqpAdmin amqpAdmin;

  @Autowired
  @Qualifier("customConnectionFactry")
  ConnectionFactory connectionFactory;

  @Bean
  Queue deadLetterQueue() {
    Queue queue = deadLetterQueue(QUEUE_NAME);
    amqpAdmin.declareQueue(queue);
    amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(deadLetterExchange())
      .with(ROUTING_KEY));
    return queue;
  }

  @Bean
  Queue deadLetterQueue(String queueName) {
    return new Queue(queueName);
  }

  @Bean
  TopicExchange deadLetterExchange() {
    TopicExchange exchange = new TopicExchange(EXCHANGE_NAME);
    amqpAdmin.declareExchange(exchange);
    return exchange;
  }

  @Bean
  public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(exponentialBackOffPolicy());

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryTemplate.setRetryPolicy(retryPolicy);

    return retryTemplate;
  }

  public ExponentialBackOffPolicy exponentialBackOffPolicy() {
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(30000);

    return backOffPolicy;
  }

  @Bean
  public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setRetryTemplate(this.retryTemplate());
    return rabbitTemplate;
  }

  @Bean
  @Qualifier("deadLetterInterceptor")
  RetryOperationsInterceptor deadLetterInterceptor() {
    return RetryInterceptorBuilder.stateless().maxAttempts(5).recoverer(
      new RepublishMessageRecoverer(rabbitTemplate(), EXCHANGE_NAME,
        ROUTING_KEY)).backOffPolicy(exponentialBackOffPolicy()).build();
  }
}

Configuration of message listener

Create a configuration class that helps to configure the message listener along with the exchange and queues. This configuration is for the actual Rabbit-Mq Consumer which in our case is RetryMessageConsumer class. We tell the listener which interceptor to call in case an exception rises in the Consumer application ( in our case it is deadLetterInterceptor )

package com.getinputs.retry.retryconsumer;

import org.aopalliance.aop.Advice;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

@Configuration
public class RetryRabbitConfiguration {
	
String queueName = "student.queue";
String exchangeName = "student.exchange";
String bindingKey = "student.routing.key";
	
@Autowired
@Qualifier("customRaabitAdmin")
AmqpAdmin amqpAdmin;
	
@Autowired
@Qualifier("customConnectionFactry")
ConnectionFactory connectionFactory;
	
@Autowired
@Qualifier("deadLetterInterceptor")
private RetryOperationsInterceptor retryOperationsInterceptor;
	
@Bean
RetryMessageConsumer retryMessageConsumer() {
	return new RetryMessageConsumer(studentQueue());
}
	
@Bean
Queue studentQueue() {
        Queue queue = studentQueue(queueName);
        amqpAdmin.declareQueue(queue);
        amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(studentExchange())
                .with(bindingKey));
        return queue;
}
	
@Bean
Queue studentQueue(String queueName) {
	return new Queue(queueName);
}
	
@Bean
TopicExchange studentExchange() {
        TopicExchange exchange = new TopicExchange(exchangeName);
        amqpAdmin.declareExchange(exchange);
        return exchange;
}
		
@Bean
SimpleMessageListenerContainer messageListener() {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
             simpleMessageListenerContainer.setQueues(retryMessageConsumer().getQueue());
        simpleMessageListenerContainer.setMessageListener(new MessageListenerAdapter(retryMessageConsumer()));
        simpleMessageListenerContainer.setAdviceChain(new Advice[] { retryOperationsInterceptor });
        return simpleMessageListenerContainer;
    }
}

Executing the application

Once the application starts consumer will consume the message from the “student.queue” and throws a runtime exception. This message was produced earlier as part of the Message producer example. Now since an exception has been encountered in the consumer application and an interceptor is already defined so the message gets retried automatically for a specified interval. Once all the retries are completed and if the exception still exists then the message will be moved to DeadLetter Queue.

Output :

Tomcat started on port(s): 8080 (http) with context path ''

2020-11-27 13:10:56.564  INFO 21296 --- [  restartedMain] c.g.r.r.RetryConsumerApplication         : Started RetryConsumerApplication in 2.561 seconds (JVM running for 3.018)

Message consumed is : (Body:'{"rollNumber":101,"name":"Suraj"}' MessageProperties [headers={__TypeId__=com.getinputs.producer.Student}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=student.exchange, receivedRoutingKey=student.routing.key, deliveryTag=1, consumerTag=amq.ctag-dSGoRF2ybRC6xwNXE8C85A, consumerQueue=student.queue])

Message consumed is : (Body:'{"rollNumber":101,"name":"Suraj"}' MessageProperties [headers={__TypeId__=com.getinputs.producer.Student}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=student.exchange, receivedRoutingKey=student.routing.key, deliveryTag=1, consumerTag=amq.ctag-dSGoRF2ybRC6xwNXE8C85A, consumerQueue=student.queue])

Message consumed is : (Body:'{"rollNumber":101,"name":"Suraj"}' MessageProperties [headers={__TypeId__=com.getinputs.producer.Student}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=student.exchange, receivedRoutingKey=student.routing.key, deliveryTag=1, consumerTag=amq.ctag-dSGoRF2ybRC6xwNXE8C85A, consumerQueue=student.queue])

Message consumed is : (Body:'{"rollNumber":101,"name":"Suraj"}' MessageProperties [headers={__TypeId__=com.getinputs.producer.Student}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=student.exchange, receivedRoutingKey=student.routing.key, deliveryTag=1, consumerTag=amq.ctag-dSGoRF2ybRC6xwNXE8C85A, consumerQueue=student.queue])

Message consumed is : (Body:'{"rollNumber":101,"name":"Suraj"}' MessageProperties [headers={__TypeId__=com.getinputs.producer.Student}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=student.exchange, receivedRoutingKey=student.routing.key, deliveryTag=1, consumerTag=amq.ctag-dSGoRF2ybRC6xwNXE8C85A, consumerQueue=student.queue])

2020-11-27 13:12:07.579  WARN 21296 --- [ssageListener-1] o.s.a.r.retry.RepublishMessageRecoverer  : Republishing failed message to exchange 'test.deadLetter.exchange' with routing key test.deadLetter.routing.key

Note * Based on the time interval the message gets retried 5 times after that it is moved to dead letter Queue.

This is all about the Rabbit-MQ retry mechanism. Hope you did like this article. If you want to add anything extra please comment below and notify me if I have gone wrong anywhere. Also, you can write to me if you want to understand any other concept.

Related Links

Leave a Comment