扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
public static void main(String[] args) {
创新互联专业为企业提供黄石港网站建设、黄石港做网站、黄石港网站设计、黄石港网站制作等企业网站建设、网页设计与制作、黄石港企业网站模板建站服务,十年黄石港做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。
Buffer buffer=new Buffer(); //创建一个临界区对象
new Producer(buffer,100).start(); //创建一个生产者对象,并启动其线程
new Consumer(buffer,200).start(); //创建一个消费者对象,并启动其线程
new Consumer(buffer,201).start(); //创建第二个消费者对象,并启动其线程
}
这一段代码,多加入
new Consumer(buffer,201).start(); //创建第二个消费者对象,并启动其线程
多加一段代码创建一个消费者
多加入
new Producer(buffer,100).start();
创建一个生产者。
想要很多生产者和消费者?加就是了啊。
第四个文件 Buffer.java
这个是实现同步的主要缓存类。想要实现同步
在每个方法的声明前都加入synchronized 就行
synchronized 线程锁,很好用的。源代码已经加入了
就比如
public synchronized void put(int value) { //同步方法控制临界区内容写入
使用的生产者和消费者模型具有如下特点:
(1)本实验的多个缓冲区不是环形循环的,也不要求按顺序访问。生产者可以把产品放到目前某一个空缓冲区中。
(2)消费者只消费指定生产者的产品。
(3)在测试用例文件中指定了所有的生产和消费的需求,只有当共享缓冲区的数据满足了所有关于它的消费需求后,此共享缓冲区才可以作为空闲空间允许新的生产者使用。
(4)本实验在为生产者分配缓冲区时各生产者间必须互斥,此后各个生产者的具体生产活动可以并发。而消费者之间只有在对同一产品进行消费时才需要互斥,同时它们在消费过程结束时需要判断该消费对象是否已经消费完毕并清除该产品。
Windows
用来实现同步和互斥的实体。在Windows
中,常见的同步对象有:信号量(Semaphore)、
互斥量(Mutex)、临界段(CriticalSection)和事件(Event)等。本程序中用到了前三个。使用这些对象都分
为三个步骤,一是创建或者初始化:接着请求该同步对象,随即进入临界区,这一步对应于互斥量的
上锁;最后释放该同步对象,这对应于互斥量的解锁。这些同步对象在一个线程中创建,在其他线程
中都可以使用,从而实现同步互斥。当然,在进程间使用这些同步对象实现同步的方法是类似的。
1.用锁操作原语实现互斥
为解决进程互斥进人临界区的问题,可为每类临界区设置一把锁,该锁有打开和关闭两种状态,进程执行临界区程序的操作按下列步骤进行:
①关锁。先检查锁的状态,如为关闭状态,则等待其打开;如已打开了,则将其关闭,继续执行步骤②的操作。
②执行临界区程序。
③开锁。将锁打开,退出临界区。
2.信号量及WAIT,SIGNAL操作原语
信号量的初值可以由系统根据资源情况和使用需要来确定。在初始条件下信号量的指针项可以置为0,表示队列为空。信号量在使用过程中它的值是可变的,但只能由WAIT,SIGNAL操作来改变。设信号量为S,对S的WAIT操作记为WAIT(S),对它的SIGNAL操作记为SIGNAL(S)。
WAIT(S):顺序执行以下两个动作:
①信号量的值减1,即S=S-1;
②如果S≥0,则该进程继续执行;
如果
S(0,则把该进程的状态置为阻塞态,把相应的WAITCB连人该信号量队列的末尾,并放弃处理机,进行等待(直至其它进程在S上执行SIGNAL操作,把它释放出来为止)。
SIGNAL(S):顺序执行以下两个动作
①S值加
1,即
S=S+1;
②如果S)0,则该进程继续运行;
如果S(0则释放信号量队列上的第一个PCB(既信号量指针项所指向的PCB)所对应的进程(把阻塞态改为就绪态),执行SIGNAL操作的进程继续运行。
在具体实现时注意,WAIT,SIGNAL操作都应作为一个整体实施,不允许分割或相互穿插执行。也就是说,WAIT,SIGNAL操作各自都好像对应一条指令,需要不间断地做下去,否则会造成混乱。
从物理概念上讲,信号量S)时,S值表示可用资源的数量。执行一次WAIT操作意味着请求分配一个单位资源,因此S值减1;当S0时,表示已无可用资源,请求者必须等待别的进程释放了该类资源,它才能运行下去。所以它要排队。而执行一次SIGNAL操作意味着释放一个单位资源,因此S值加1;若S(0时,表示有某些进程正在等待该资源,因而要把队列头上的进程唤醒,释放资源的进程总是可以运行下去的。
---------------
/**
*
生产者
*
*/
public
class
Producer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Producer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
empty.p();
mutex.p();
System.out.println(name+"
inserts
a
new
product
into
"+buf.nextEmptyIndex);
buf.nextEmptyIndex
=
(buf.nextEmptyIndex+1)%buf.size;
mutex.v();
full.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
---------------
/**
*
消费者
*
*/
public
class
Customer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Customer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
full.p();
mutex.p();
System.out.println(name+"
gets
a
product
from
"+buf.nextFullIndex);
buf.nextFullIndex
=
(buf.nextFullIndex+1)%buf.size;
mutex.v();
empty.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
-------------------------
/**
*
缓冲区
*
*/
public
class
Buffer{
public
Buffer(int
size,int
nextEmpty,int
nextFull){
this.nextEmptyIndex
=
nextEmpty;
this.nextFullIndex
=
nextFull;
this.size
=
size;
}
public
int
size;
public
int
nextEmptyIndex;
public
int
nextFullIndex;
}
-----------------
/**
*
此类用来模拟信号量
*
*/
public
class
Semaphore{
private
int
semValue;
public
Semaphore(int
semValue){
this.semValue
=
semValue;
}
public
synchronized
void
p(){
semValue--;
if(semValue0){
try
{
this.wait();
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
public
synchronized
void
v(){
semValue++;
if(semValue=0){
this.notify();
}
}
}
------------------------
public
class
Test
extends
Thread
{
public
static
void
main(String[]
args)
{
Buffer
bf=new
Buffer(10,0,0);
Semaphore
mutex=new
Semaphore(1);
Semaphore
full=new
Semaphore(0);
Semaphore
empty=new
Semaphore(10);
//new
Thread(new
Producer("p001",mutex,full,empty,bf)).start();
Producer
p=new
Producer("p001",mutex,full,empty,bf);
new
Thread(new
Producer("p002",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p003",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p004",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p005",mutex,full,empty,bf)).start();
try{
sleep(3000);
}
catch(Exception
ex)
{
ex.printStackTrace();
}
new
Thread(new
Customer("c001",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c002",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c003",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c004",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c005",mutex,full,empty,bf)).start();
}
}
--------------------------------------------
span style="font-size:18px;"span style="font-size:18px;"package com.ft.com.nextxteam.test;
import java.util.LinkedList;
/**
* 使用wait/notify处理消费者问题
* Created by drjr on 16-9-19.
*/
public class TestProductConm {
public static class Storage {
// 仓库最大存储量
private final int MAX_SIZE = 100;
// 仓库存储的载体
private LinkedListObject list = new LinkedListObject();
// 生产num个产品
public void produce(int num) {
// 同步代码段
synchronized (list) {
// 如果仓库剩余容量不足
while (list.size() + num MAX_SIZE) {
System.out.println("【要生产的产品数量】:" + num + "/t【库存量】:"
+ list.size() + "/t暂时不能执行生产任务!");
try {
// 由于条件不满足,生产阻塞
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生产条件满足情况下,生产num个产品
for (int i = 1; i = num; ++i) {
list.add(new Object());
}
System.out.println("【已经生产产品数】:" + num + "/t【现仓储量为】:" + list.size());
list.notifyAll();
}
}
// 消费num个产品
public void consume(int num) {
// 同步代码段
synchronized (list) {
// 如果仓库存储量不足
while (list.size() num) {
System.out.println("【要消费的产品数量】:" + num + "/t【库存量】:"
+ list.size() + "/t暂时不能执行生产任务!");
try {
// 由于条件不满足,消费阻塞
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费条件满足情况下,消费num个产品
for (int i = 1; i = num; ++i) {
list.remove();
}
System.out.println("【已经消费产品数】:" + num + "/t【现仓储量为】:" + list.size());
list.notifyAll();
}
}
// get/set方法
public LinkedListObject getList() {
return list;
}
public void setList(LinkedListObject list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
public static class Producer extends Thread {
// 每次生产的产品数量
private int num;
// 所在放置的仓库
private Storage storage;
// 构造函数,设置仓库
public Producer(Storage storage) {
this.storage = storage;
}
// 线程run函数
public void run() {
produce(num);
}
// 调用仓库Storage的生产函数
public void produce(int num) {
storage.produce(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
public static class Consumer extends Thread {
// 每次消费的产品数量
private int num;
// 所在放置的仓库
private Storage storage;
// 构造函数,设置仓库
public Consumer(Storage storage) {
this.storage = storage;
}
// 线程run函数
public void run() {
consume(num);
}
// 调用仓库Storage的生产函数
public void consume(int num) {
storage.consume(num);
}
// get/set方法
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public Storage getStorage() {
return storage;
}
public void setStorage(Storage storage) {
this.storage = storage;
}
}
public static void main(String[] args) {
// 仓库对象
Storage storage = new Storage();
// 生产者对象
Producer p1 = new Producer(storage);
Producer p2 = new Producer(storage);
Producer p3 = new Producer(storage);
Producer p4 = new Producer(storage);
Producer p5 = new Producer(storage);
Producer p6 = new Producer(storage);
Producer p7 = new Producer(storage);
// 消费者对象
Consumer c1 = new Consumer(storage);
Consumer c2 = new Consumer(storage);
Consumer c3 = new Consumer(storage);
// 设置生产者产品生产数量
p1.setNum(10);
p2.setNum(10);
p3.setNum(10);
p4.setNum(10);
p5.setNum(10);
p6.setNum(10);
p7.setNum(80);
// 设置消费者产品消费数量
c1.setNum(50);
c2.setNum(20);
c3.setNum(30);
// 线程开始执行
c1.start();
c2.start();
c3.start();
p1.start();
p2.start();
p3.start();
p4.start();
p5.start();
p6.start();
p7.start();
}
}/span/span
我这里有个消费者问题能跑的,你可以参考下(其中两处Math.random() * 200多修改几次这里的值看效果跟好):
public class ProducerConsumer {
public static void main(String[] args) {
SyncStack ss = new SyncStack();
Producer p = new Producer(ss);
Consumer c = new Consumer(ss);
new Thread(p).start();
new Thread(c).start();
}
}
class WoTou {
int id;
WoTou(int id) {
this.id = id;
}
}
class SyncStack {
int index = 0;
WoTou[] arrWT = new WoTou[6];
public synchronized void push(WoTou wt) {
while(index == arrWT.length) {
try {
System.out.println("生产者说篓子满了,我去休息");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notify();
arrWT[index] = wt;
index++;
System.out.println("生产前后" + index +"个");
}
public synchronized WoTou pop() {
while(index ==0) {
try {
System.out.println("消费者说篓子空了,我去休息");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notify();
index--;
System.out.println("消费后剩余" + index +"个");
return arrWT[index];
}
}
class Producer implements Runnable {
SyncStack ss = null;
Producer(SyncStack ss) {
this.ss = ss;
}
public void run() {
for(int i=0; i20; i++){
WoTou wt = new WoTou(i);
ss.push(wt);
try {
Thread.sleep((int)(Math.random() * 200));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
SyncStack ss = null;
Consumer(SyncStack ss) {
this.ss = ss;
}
public void run() {
for(int i=0; i20; i++) {
WoTou wt = ss.pop();
try {
Thread.sleep((int)(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
1.作业分Producwer,Consumer,Space,Apple,Test5个类编写。
2.Space为中转存储空间,Apple为标记生产物品。
3.所有类都存放于sycnchronized包内。
***********************************************************
package sycnchronized;
//产品标识
public class Apple {
int id;
public Apple(int id){
this.id=id;
}
// public String toString(){
// return ""+id;
// }
}
…………………………………………………………
package sycnchronized;
//产品的存储空间
public class Space {
Apple ap[]=new Apple[10];
int index=0;
//产品存入
public synchronized void push(Apple AP) {
while(index==ap.length){
try {
this.wait();
} catch (InterruptedException e) {
// TODO 自动生成 catch 块
e.printStackTrace();
}
}
ap[index]=AP;
index++;
this.notify();
}
// 产品的取出
public synchronized Apple pop(){
while (index==0){
try {
this.wait();
} catch (InterruptedException e) {
// TODO 自动生成 catch 块
e.printStackTrace();
}
}
index--;
this.notify();
return ap[index];
}
}
……………………………………………………………………
package sycnchronized;
//摘苹果
public class Producer implements Runnable{
Space sp=new Space();
Producer (Space sp){
this.sp=sp;
}
public void run() {
for(int i=0;i100;i++){
Apple Ap=new Apple(i);
sp.push(Ap);
System.out.println("摘了"+(i+1)+"个苹果!");
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO 自动生成 catch 块
e.printStackTrace();
}
}
}
}
………………………………………………………………………
package sycnchronized;
//吃苹果
public class Consumer implements Runnable {
Space sp=new Space();
Consumer(Space sp){
this.sp=sp;
}
public void run() {
for(int i=0;i100;i++){
Apple Ap=sp.pop();
System.out.println("吃掉了"+(i+1)+"个苹果!");
System.out.println();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO 自动生成 catch 块
e.printStackTrace();
}
}
}
}
……………………………………………………………………
package sycnchronized;
//测试
public class Test {
public static void main(String[] args) {
Space sp=new Space();
Producer pd=new Producer(sp);
Consumer cs=new Consumer(sp);
new Thread(pd).start();
new Thread(cs).start();
}
}
生产者消费者问题是多线程的一个经典问题,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。
解决生产者/消费者问题的方法可分为两类:
采用某种机制保护生产者和消费者之间的同步;
在生产者和消费者之间建立一个管道。
第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。
在Java中有四种方法支持同步,其中前三个是同步方法,一个是管道方法。
wait()
/
notify()方法
await()
/
signal()方法
BlockingQueue阻塞队列方法
PipedInputStream
/
PipedOutputStream
通过
wait()
/
notify()方法实现:
wait()
/
nofity()方法是基类Object的两个方法:
wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。
notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
通过await()
/
signal()方法实现:
await()和signal()的功能基本上和wait()
/
nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。
通过BlockingQueue方法实现:
它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()
/
signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法:
put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流