Redis中的分布式Java队列的应用-创新互联

Redis是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。从2010年3月15日起,Redis的开发工作由VMware主持。

创新互联建站长期为上千家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为尧都企业提供专业的成都网站制作、成都网站设计、外贸营销网站建设尧都网站改版等技术服务。拥有十多年丰富建站经验和众多成功案例,为您定制开发。

在Redis中使用队列

像任何消息代理一样,Redis需要以正确的顺序发送消息。 可以根据消息的年龄或某些其他预定义的优先级等级发送消息。

为了存储这些未决消息,Redis开发人员需要队列数据结构。 Redisson是使用Redis和Java进行分布式编程的框架,它提供了许多分布式数据结构(包括队列)的实现。

Redisson通过提供Java API使Redis开发更加容易。 Redisson不需要开发人员学习Redis命令,而是包括所有众所周知的Java接口,例如Queue和BlockingQueue。 Redisson还处理Redis中繁琐的幕后工作,例如连接管理,故障转移处理和数据序列化。

基于Redis的分布式Java队列

Redisson提供了Java中基本队列数据结构的多个基于Redis的实现,每种实现都有不同的功能。 这使你可以选择最适合你目的的队列类型。

下面,我们将使用Redisson Java框架讨论六种不同类型的基于Redis的分布式队列。

队列
Redisson中的RQueue对象实现了java.util.Queue接口。 队列用于需要从最早的最早的元素开始处理(也称为“先进先出”或FIFO)的情况。

与普通Java一样,可以使用peek()方法检查RQueue的第一个元素,或者使用poll()方法检查和删除RQueue的第一个元素:

RQueue queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();

阻塞队列

Redisson中的RBlockingQueue对象实现了java.util.BlockingQueue接口。

BlockingQueues是阻塞线程的队列,这些线程试图从空队列中进行轮询,或者试图在已满的队列中插入元素。 该线程将被阻塞,直到另一个线程将一个元素插入到空队列中,或从完整队列中轮询第一个元素为止。

下面的示例代码演示了RBlockingQueue的正确实例化和使用。 特别是,你可以使用参数指定对象将等待线程变得可用的时间来调用poll()方法:

RBlockingQueue queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

在故障转移或重新连接到Redis服务器的过程中,将自动重新预订poll(),pollFromAny(),pollLastAndOfferFirstTo()和take()Java方法。

BoundedBlockingQueue
Redisson中的RBoundedBlockingQueue

对象实现了有界的阻塞队列结构。 有界阻塞队列是容量已受限制(即有限)的阻塞队列。

以下代码演示了如何在Redisson中实例化和使用RBoundedBlockingQueue。 trySetCapacity()方法用于尝试设置阻塞队列的容量。 trySetCapacity()返回布尔值“ true”或“ false”,这取决于是否成功设置了容量或是否已经设置了容量:

RBoundedBlockingQueue queue = redisson.getBoundedBlockingQueue("anyQueue");
queue.trySetCapacity(2);
queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// will be blocked until free space available in queue
queue.put(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

延迟排队

Redisson中的RDelayedQueue对象允许你在Redis中实现延迟队列。 当使用诸如指数补偿的策略将消息传递给消费者时,这可能会很有用。 每次尝试发送邮件失败后,重试之间的时间将成倍增加。

在与元素一起指定的延迟之后,延迟队列中的每个元素将被转移到目标队列。 此目标队列可以是实现RQueue接口的任何队列,例如RBlockingQueue或RBoundedBlockingQueue。

RQueue destinationQueue = redisson.getQueue("anyQueue");
RDelayedQueue delayedQueue = getDelayedQueue(destinationQueue);
// move object to destinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// move object to destinationQueue in 1 minute
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在不再需要队列之后,通过使用destroy()方法销毁延迟的队列是一个好主意。 但是,如果要关闭Redisson,则没有必要。
PriorityQueue

Redisson中的RPriorityQueue对象实现了java.util.Queue接口。 优先级队列是不是按元素的使用期限而是按照与每个元素相关联的优先级排序的队列。
如下面的示例代码所示,RPriorityQueue使用比较器对队列中的元素进行排序:

RPriorityQueue queue = redisson.getPriorityQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // set object comparator
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.poll();

PriorityBlockingQueue
Redisson中的RPriorityBlockingQueue对象结合了RPriorityQueue和RBlockingQueue的功能。 与RPriorityQueue一样,RPriorityBlockingQueue也使用Comparator对队列中的元素进行排序。

RPriorityBlockingQueue queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // set object comparator
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.take();

在故障转移或重新连接到Redis服务器的过程中,将自动重新预订poll(),pollLastAndOfferFirstTo()和take()Java方法。

创新互联www.cdcxhl.cn,专业提供香港、美国云服务器,动态BGP最优骨干路由自动选择,持续稳定高效的网络助力业务部署。公司持有工信部办法的idc、isp许可证, 机房独有T级流量清洗系统配攻击溯源,准确进行流量调度,确保服务器高可用性。佳节活动现已开启,新人活动云服务器买多久送多久。


当前名称:Redis中的分布式Java队列的应用-创新互联
分享URL:http://csdahua.cn/article/dccscd.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流