扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章主要讲解了“DelayQueue使用方式是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“DelayQueue使用方式是什么”吧!
网站建设哪家好,找成都创新互联公司!专注于网页设计、网站建设、微信开发、小程序开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了奎屯免费建站欢迎大家使用!
假设我们生产者提交一个任务,消费者5秒钟之后才可以执行,那么我们可以把任务定义为如下格式,并实现Delayed接口,其中data是任务存储的信息。
/** * 具体的任务 * @author wangshixiang */ public class Task implements Delayed { /** * 数据 */ private final String data; /** * 任务执行时间 */ private final long time; public Task(String data,TimeUnit timeUnit,long time){ this.data=data; this.time=System.currentTimeMillis()+timeUnit.toMillis(time); } @Override public long getDelay(TimeUnit unit) { long res= time-System.currentTimeMillis(); return unit.convert(res,TimeUnit.MILLISECONDS); } public String getData() { return data; } @Override public int compareTo(Delayed o) { if (o instanceof Task ){ Task task= (Task) o; return (int) (this.time-task.time); } return 0; } }
定义好任务后,我们需要定义一个任务队列 QUEUE_TASK,来存储消息,实现效果为程序运行后 五秒钟后输出Hello...
private static final DelayQueueQUEUE_TASK =new DelayQueue<>(); public static void main(String[] args) throws InterruptedException { QUEUE_TASK .add(new Task("Hello ... ", TimeUnit.SECONDS,5)); System.out.println(QUEUE_TASK .take().getData()); }
Delayed 接口定义:
public interface DeDlayed extends Comparable{ long getDelay(TimeUnit unit); }
我们发现Delayed接口继承了Comparable接口,并且有一个getDelay方法,在程序运行的过程中,会调用头部任务的这个方法,来返回该任务具体还有多长时间可以执行。当我们任务实现这个接口时 可以存储任务的执行时间,通过执行时间-当前时间 计算出距离执行时间的差值,因此我们Task定义了一个任务的变量,在创建对象时设置任务的执行时间。
2. DelayQueue 延时队列
首先我们看一下DelayQueue类继承实现结构图
可以理解为 DelayQueue 是一个带延迟执行功能的阻塞队列
为什么Delayed接口继承了Comparable接口 ?
DelayQueue是怎么实现只有到预定时间才能取出任务 ?
向队列里放入一个任务时 发生了什么事情 ?
带着这几个问题,我们来看一下DelayQueeu的源码 首先看一下主要的参数:
//锁 private final transient ReentrantLock lock = new ReentrantLock(); //优先级队列 执行时间最早的排在第一个 private final PriorityQueueq = new PriorityQueue (); //是否有线程在等待任务到执行时间 private Thread leader; //条件唤醒 private final Condition available = lock.newCondition();
那么我们先看add(E e)方法 ,任务入队列时做了哪些操作
public boolean add(E e) { return offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
入队列时做了一下步骤:
获取锁
放入元素 (放入优先级队列)
如果自己排在第一个 则原来标记的leader线程已经失效 直接设置为null,并唤醒消费者
释放锁
接下来在看出队列时take()方法做了哪些操作
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) //我拿到元素了 唤醒其他的线程 available.signal(); lock.unlock(); } }
出队列做了如下步骤:
获取锁(可中断的锁 获取这种锁允许其他线程中断此线程)
取出第一个元素 如果第一个元素为空 则直接 await(),等待被唤醒(如放队列时的唤醒)
如果第一个元素不为空,查看是否到执行时间,如果没有到执行时间 查看是否有leader已经注意到这个任务 如果他注意到这个任务 我直接await()。如果没人注意,那么我就把自己设置为leader然后设置带时间的await()。
睡眠到执行时间后 醒来后查看leader是否还是自己 如果是的话 取消自己的leader身份。然后在尝试获取任务。
如果我获取到了符合要求的元素,那么我应该唤醒大家 来一块竞争获取下一个元素。
带时间的出队列方法 E poll(long timeout, TimeUnit unit) 的实现逻辑与take()方法的唯一区别就是。只有当自己剩余等待时间大于第一个元素剩余执行时间时 才允许把自己设置为leader
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0L) return null; else //睡眠等待时间 有可能提前返回 那么返回的是剩余等待时间 nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L) return q.poll(); if (nanos <= 0L) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) //如果剩余等待时间比第一个元素剩余执行时间还短 那么应该睡剩余等待时间 nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); //计算剩余等待时间 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
在大多数业务场景中,我们会利用中间件提供的延时消息的功能。比如利用redis zset实现 ,kafka rabbit mq 的延时队列。我们需要根据我们的业务场景,来选择合适的中间件。
订单超时未支付取消.
调用其他系统时失败间隔重试.
调用第三方接口时,过段时间异步获取结果。
感谢各位的阅读,以上就是“DelayQueue使用方式是什么”的内容了,经过本文的学习后,相信大家对DelayQueue使用方式是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流