RabbitMQ中怎么实现延迟队列

这篇文章给大家介绍RabbitMQ中怎么实现延迟队列,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

成都创新互联,为您提供网站建设公司成都网站制作公司、网站营销推广、网站开发设计,对服务三维植被网等多个行业拥有丰富的网站建设及推广经验。成都创新互联网站建设公司成立于2013年,提供专业网站制作报价服务,我们深知市场的竞争激烈,认真对待每位客户,为客户提供赏心悦目的作品。 与客户共同发展进步,是我们永远的责任!

在 RabbitMQ 3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列,我们这里不做过多介绍,可以参考之前文章来了解:TTL、死信队列

在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件,可以下载放置到 RabbitMQ 根目录下的 plugins 下。延迟队列插件下载

首先我们创建交换机和消息队列

import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MQConfig {  public static final String LAZY_EXCHANGE = "Ex.LazyExchange";  public static final String LAZY_QUEUE = "MQ.LazyQueue";  public static final String LAZY_KEY = "lazy.#";  @Bean  public TopicExchange lazyExchange(){    //Map pros = new HashMap<>();    //设置交换机支持延迟消息推送    //pros.put("x-delayed-message", "topic");    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);    exchange.setDelayed(true);    return exchange;  }  @Bean  public Queue lazyQueue(){    return new Queue(LAZY_QUEUE, true);  }  @Bean  public Binding lazyBinding(){    return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);  }}

我们在 Exchange 的声明中可以设置exchange.setDelayed(true)来开启延迟队列,也可以设置为以下内容传入交换机声明的方法中,因为第一种方式的底层就是通过这种方式来实现的。

//Map pros = new HashMap<>();    //设置交换机支持延迟消息推送    //pros.put("x-delayed-message", "topic");    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

发送消息时我们需要指定延迟推送的时间,我们这里在发送消息的方法中传入参数 new MessagePostProcessor() 是为了获得 Message对象,因为需要借助 Message对象的api 来设置延迟时间。

import com.anqi.mq.config.MQConfig;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageDeliveryMode;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.Date;@Componentpublic class MQSender {  @Autowired  private RabbitTemplate rabbitTemplate;  //confirmCallback returnCallback 代码省略,请参照上一篇   public void sendLazy(Object message){    rabbitTemplate.setMandatory(true);    rabbitTemplate.setConfirmCallback(confirmCallback);    rabbitTemplate.setReturnCallback(returnCallback);    //id + 时间戳 全局唯一    CorrelationData correlationData = new CorrelationData("12345678909"+new Date());    //发送消息时指定 header 延迟时间    rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,        new MessagePostProcessor() {      @Override      public Message postProcessMessage(Message message) throws AmqpException {        //设置消息持久化        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);        //message.getMessageProperties().setHeader("x-delay", "6000");        message.getMessageProperties().setDelay(6000);        return message;      }    }, correlationData);  }}

我们可以观察 setDelay(Integer i)底层代码,也是在 header 中设置 x-delay。等同于我们手动设置 header

message.getMessageProperties().setHeader("x-delay", "6000");

/** * Set the x-delay header. * @param delay the delay. * @since 1.6 */public void setDelay(Integer delay) {  if (delay == null || delay < 0) {    this.headers.remove(X_DELAY);  }  else {    this.headers.put(X_DELAY, delay);  }}

消费端进行消费

import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.Map;@Componentpublic class MQReceiver {  @RabbitListener(queues = "MQ.LazyQueue")  @RabbitHandler  public void onLazyMessage(Message msg, Channel channel) throws IOException{    long deliveryTag = msg.getMessageProperties().getDeliveryTag();    channel.basicAck(deliveryTag, true);    System.out.println("lazy receive " + new String(msg.getBody()));  }

测试结果

import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest@RunWith(SpringRunner.class)public class MQSenderTest {  @Autowired  private MQSender mqSender;  @Test  public void sendLazy() throws Exception {    String msg = "hello spring boot";    mqSender.sendLazy(msg + ":");  }}

果然在 6 秒后收到了消息 lazy receive hello spring boot:

关于RabbitMQ中怎么实现延迟队列就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


本文名称:RabbitMQ中怎么实现延迟队列
网站链接:http://csdahua.cn/article/pechje.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流