RabbitMQ-创新互联

什么是消息的TTL?

TTL = time to live,消息的TTL = 消息的存活时间或过期时间

成都创新互联专注于蓬安企业网站建设,响应式网站设计,成都商城网站开发。蓬安网站建设公司,为蓬安等地区提供建站服务。全流程按需网站建设,专业设计,全程项目跟踪,成都创新互联专业和态度为您提供的服务什么是死信队列?

当队列中的消息到达存活时间或过期时间后,若未设置死信队列,则该消息将被抛弃,反之则转入死信队列

死信队列

配置类

死信队列仅需配置类,其与消息队列的绑定在消息队列的配置类中完成

// 死信队列
@Configuration
public class DeadLetterConfiguration {

    //订单交换机
    @Bean
    public DirectExchange deadLetterDirectExchange(){
        return new DirectExchange("dead_letter_direct_exchange_order", true, false);
    }

    //消息通知队列
    @Bean
    public Queue deadLetterQueue(){
        return new Queue("dead_letter_queue", true, false, false);
    }

    //绑定交换机与队列
    @Bean
    public Binding deadLetterDirectBinding(){
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterDirectExchange()).with("dead_letter_order");
    }
}

消息的TTL

消息的TTL分为消息队列的TTL与消息本身的TTL

消息队列的TTL:被转发至该队列中的所有消息均遵循该队列的TTL规则

消息本身的TTL:消息遵循本身携带的TTL规则,不影响所处队列中的其他消息

问:若同时存在消息队列的TTL与消息本身的TTL,优先级如何?

答:取时间短者

消息队列的TTL

配置类

关于示例代码中的参数,可于RabbitMQ Management---Queue---Add a new queue---Arguments中查看

@Configuration
public class QueueTTLConfiguration {

    //订单交换机
    @Bean
    public DirectExchange queueTTLDirectExchange(){
        return new DirectExchange("ttl_direct_exchange_order", true, false);
    }

    //消息通知队列
    @Bean
    public Queue TTLQueue(){
        //TTL参数设置
        Mapargs = new HashMap<>();
        args.put("x-message-ttl", 5000);                                        //time to live 5000ms
        args.put("x-max-length", 5);                                            //队列大消息数(溢出的消息移至死信队列)
        args.put("x-dead-letter-exchange", "dead_letter_direct_exchange_order");//死信队列交换机
        args.put("x-dead-letter-routing-key", "dead_letter_order");             //死信队列routingKey(当然 fanout交换机无需配置routingKey)
        return new Queue("ttl_queue", true, false, false, args);
    }

    //绑定交换机与队列
    @Bean
    public Binding TTLDirectBinding(){
        return BindingBuilder.bind(TTLQueue()).to(queueTTLDirectExchange()).with("queue_ttl");
    }
}

生产者

@Service
public class QueueTTLOrderService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void makeOrder(String userID, String producerID, int num){
        // 1.根据需求查询仓库 判断是否能满足需求

        // 2.若能满足则生成订单
        String orderID = UUID.randomUUID().toString();
        System.out.println("成功生成订单");

        // 3.通过RabbitMQ发送消息
        String exchangeName = "ttl_direct_exchange_order";
        String routingKey = "queue_ttl";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderID + " queue_ttl");
        System.out.println("订单发送成功");
    }
}

消费者

此处仅演示消息队列的TTL与死信队列,故不添加消费者

消息本身的TTL

配置类

此处演示不与死信队列绑定情况下的TTL

@Configuration
public class MessageTTLConfiguration {

    //订单交换机
    @Bean
    public DirectExchange messageTTLDirectExchange(){
        return new DirectExchange("ttl_direct_exchange_order", true, false);
    }

    //消息通知队列
    @Bean
    public Queue messageTTLQueue(){
        return new Queue("message_ttl_queue", true, false, false);
    }

    //绑定交换机与队列
    @Bean
    public Binding MessageTTLDirectBinding(){
        return BindingBuilder.bind(messageTTLQueue()).to(messageTTLDirectExchange()).with("message_ttl");
    }
}

生产者

@Service
public class MessageTTLOrderService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void makeOrder(String userID, String producerID, int num){
        // 1.根据需求查询仓库 判断是否能满足需求

        // 2.若能满足则生成订单
        String orderID = UUID.randomUUID().toString();
        System.out.println("成功生成订单");

        // 3.通过RabbitMQ发送消息
        String exchangeName = "ttl_direct_exchange_order";
        String routingKey = "message_ttl";
        //消息传输加工机 此处用于为消息设置TTL时间
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");//Expiration 到期
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        // 将messagePostProcessor对象同消息一起传递
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderID + " message_ttl", messagePostProcessor);
        System.out.println("订单发送成功");
    }
}

消费者

此处仅演示消息本身的TTL,故不添加消费者

测试

消息队列的TTL:此处发送十条消息,溢出的五条会马上被送至死信队列,五秒后余下的五条也会被送至死信队列

@Autowired
    QueueTTLOrderService queueTtlOrderService;
    @Test
    void ttlOrders(){
        for (int i = 0; i< 10; i++) {
            queueTtlOrderService.makeOrder("1", "1", 1);
        }
    }

消息本身的TTL:消息队列并未设置上限,也未绑定死信队列,故消息失效后将被直接抛弃

@Autowired
    MessageTTLOrderService messageTTLOrderService;
    @Test
    void messageTTLOrder() {
        messageTTLOrderService.makeOrder("1", "1", 1);
    }

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


本文题目:RabbitMQ-创新互联
本文地址:http://csdahua.cn/article/dhsiec.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流