扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
public interface ThreadPool{ // 执行一个 Job,这个 Job 需要实现 Runnable
void execute(Job job);
// 关闭线程池
void shutdown();
// 增加工作者线程
void addWorkers(int num);
// 减少工作者线程
void removeWorker(int num);
// 得到正在等待执行的任务数量
int getJobSize();
}
客户端可以通过 execute(Job)方法将 Job 提交入线程池执行,而客户端自身不用等待
Job 的执行完成。除了 execute(Job)方法以外,线程池接口提供了增大/减少工作者线程以
及关闭线程池的方法。这里工作者线程代表着一个重复执行 Job 的线程,而每个由客户
端提交的 Job 都将进入到一个工作队列中等待工作者线程的处理。接下来是线程池接口
的默认实现
public class DefaultThreadPoolimplements ThreadPool{ // 线程池大限制数
private static final int MAX_WORKER_NUMBERS = 10;
// 线程池默认的数量
private static final int DEFAULT_WORKER_NUMBERS = 5;
// 线程池最小的数量
private static final int MIN_WORKER_NUMBERS = 1;
// 这是一个工作列表,将会向里面插入工作
private final LinkedListjobs = new LinkedList();
// 工作者列表
private final Listworkers = Collections.synchronizedList(new
ArrayList());
// 工作者线程的数量
private int workerNum = DEFAULT_WORKER_NUMBERS;
// 线程编号生成
private AtomicLong threadNum = new AtomicLong();
public DefaultThreadPool() { initializeWokers(DEFAULT_WORKER_NUMBERS);
}
public DefaultThreadPool(int num) { workerNum = num >MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : Math.max(num,
MIN_WORKER_NUMBERS);
initializeWokers(workerNum);
}
public void execute(Job job) { if (job != null) { // 添加一个工作,然后进行通知
synchronized (jobs) { jobs.addLast(job);
jobs.notify();
}
}
}
public void shutdown() { for (Worker worker : workers) { worker.shutdown();
}
}
public void addWorkers(int num) { synchronized (jobs) { // 限制新增的 Worker 数量不能超过大值
if (num + this.workerNum >MAX_WORKER_NUMBERS) { num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWokers(num);
this.workerNum += num;
}
}
public void removeWorker(int num) { synchronized (jobs) { if (num >= this.workerNum) { throw new IllegalArgumentException("beyond workNum");
}
// 按照给定的数量停止 Worker
int count = 0;
while (count< num) { Worker worker = workers.get(count);
if (workers.remove(worker)) {worker.shutdown();
count++;
}
}
this.workerNum -= count;
}
}
public int getJobSize() { return jobs.size();
}
// 初始化线程工作者
private void initializeWokers(int num) { for (int i = 0; i< num; i++) { Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.
incrementAndGet());
thread.start();
}
}
// 工作者,负责消费任务
class Worker implements Runnable { // 是否工作
private volatile boolean running = true;
public void run() { while (running) { Job job = null;
synchronized (jobs) { // 如果工作者列表是空的,那么就 wait
while (jobs.isEmpty()) { try { jobs.wait();
} catch (InterruptedException ex) { // 感知到外部对 WorkerThread 的中断操作,返回
Thread.currentThread().interrupt();
return;
}
}
// 取出一个 Job
job = jobs.removeFirst();
}
if (job != null) { try { job.run();
} catch (Exception ex) { // 忽略 Job 执行中的 Exception
}
}
}
}
public void shutdown() { running = false;
}
}
}
从线程池的实现可以看到,当客户端调用 execute(Job)方法时,会不断地向任务列表
jobs 中添加 Job,而每个工作者线程会不断地从 jobs 上取出一个 Job 进行执行,当 jobs
为空时,工作者线程进入等待状态。
添加一个 Job 后,对工作队列 jobs 调用了其 notify()方法,而不是 notifyAll()方法,
因为能够确定有工作者线程被唤醒,这时使用 notify()方法将会比 notifyAll()方法获得更
小的开销(避免将等待队列中的线程全部移动到阻塞队列中)。
可以看到,线程池的本质就是使用了一个线程安全的工作队列连接工作者线程和客
户端线程,客户端线程将任务放入工作队列后便返回,而工作者线程则不断地从工作队
列上取出工作并执行。当工作队列为空时,所有的工作者线程均等待在工作队列上,当
有客户端提交了一个任务之后会通知任意一个工作者线程,随着大量的任务被提交,更
多的工作者线程会被唤醒。
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流