/**
 * Demo entry point: several producers and consumers exchange integers
 * through a single shared {@link CubbyHole}.
 */
public class ProducerConsumerT {
    public static void main(String[] args) {
        CubbyHole c = new CubbyHole();
        Producer p1 = new Producer(c, 1);
        Producer p2 = new Producer(c, 2);
        Producer p3 = new Producer(c, 3);
        Consumer c1 = new Consumer(c, 1);
        Consumer c2 = new Consumer(c, 2);
        // Three producers put 10 values each, so three consumers taking 10 each
        // are needed; with only two, the remaining puts would block forever.
        Consumer c3 = new Consumer(c, 3);
        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}

/** Thread that puts the values 0..9 into a shared CubbyHole. */
class Producer extends Thread {
    private CubbyHole cubbyhole;
    private int number;

    /**
     * @param c      the shared slot to write into
     * @param number label printed with every message of this producer
     */
    public Producer(CubbyHole c, int number) {
        cubbyhole = c;
        this.number = number;
    }

    /** Puts 0..9, pausing a random 0-99 ms after each put. */
    public void run() {
        for (int i = 0; i < 10; i++) {
            cubbyhole.put(i);
            System.out.println("Producer No." + this.number + "put: " + i);
            try {
                // Parenthesised so the cast applies to the product; casting
                // Math.random() alone always yields 0.
                sleep((int) (Math.random() * 100));
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers and stop producing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

/** Thread that takes ten values out of a shared CubbyHole. */
class Consumer extends Thread {
    private CubbyHole cubbyhole;
    private int number;

    /**
     * @param c      the shared slot to read from
     * @param number label printed with every message of this consumer
     */
    public Consumer(CubbyHole c, int number) {
        cubbyhole = c;
        this.number = number;
    }

    /** Takes ten values and prints each one. */
    public void run() {
        int value = 0;
        for (int i = 0; i < 10; i++) {
            value = cubbyhole.get();
            System.out.println("Consumber No." + this.number + "got: " + value);
        }
    }
}

/** One-slot hand-off point guarded by this object's monitor. */
class CubbyHole {
    private int contents;
    /** True while contents holds a value that has not been taken yet. */
    private boolean available = false;

    /**
     * Blocks until a value is available, then takes it.
     *
     * @return the value stored by the most recent put
     */
    public synchronized int get() {
        boolean interrupted = false;
        try {
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true; // keep waiting, restore the flag on exit
                }
            }
            available = false;
            notifyAll();
            return contents;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks until the slot is empty, then stores a value.
     *
     * @param value the value to hand to a consumer
     */
    public synchronized void put(int value) {
        boolean interrupted = false;
        try {
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true; // keep waiting, restore the flag on exit
                }
            }
            contents = value;
            available = true;
            // notifyAll, not notify: producers and consumers wait on the same
            // monitor, so a single notify could wake another producer only.
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
public class Consumer extends Thread{ private MyStack stack;
public Consumer(MyStack stack){ this.stack=stack; }
public void run(){ while(true){ stack.pop(); try { Thread.sleep(400); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public class MyStack { private int count=0; private final int MAX_SIZE; public MyStack(int size){ this.MAX_SIZE=size; }
public synchronized void push(){ while(count==MAX_SIZE){ try { this.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName()+"add a data"+count);
this.notifyAll(); }
public synchronized void pop(){ while(count==0){ try { this.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println(Thread.currentThread().getName()+"remove a data"+count); count--; this.notifyAll();
} }public class Producter extends Thread{ private MyStack stack; public Producter(MyStack stack){ this.stack=stack; } public void run(){ while(true){ stack.push(); try { Thread.sleep(300); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public class TestThread { public static void main(String[] args) { MyStack my=new MyStack(30); Producter p1=new Producter(my); Producter p2=new Producter(my); p1.setName("viszl"); p2.setName("visual"); p1.start(); p2.start();
public void run(){ try{ while(!stopFlag){ synchronized(test.ArrC){ while (test.pos==test.ArrC.length-1){ test.ArrC.wait(); } //产生一个字符 int a = (int)'a'; int z = (int)'z'; int interval = z - a;
int i = a + (int)(Math.random() * interval); char c = (char)i;
public void shutdown(){ stopFlag = true; interrupt(); } }public class test{ public static Object o = new Object(); public static int pos = -1; public static char ArrC[] = new char[6];
public static void main(String args[]){
createThread ct = new createThread(); customThread cut = new customThread(); ct.start(); cut.start();
public void shutdown(){ stopFlag = true; } }public class test{ public static Object o = new Object(); public static int pos = -1; public static char ArrC[] = new char[6];
public static void main(String args[]){
createThread ct = new createThread(); customThread cut = new customThread(); ct.start(); cut.start();
public class ProduceConsume { public static void main(String[] args) { SyncStack ss = new SyncStack(); Producer p = new Producer(ss); Consumer c = new Consumer(ss); new Thread(p).start(); new Thread(p).start(); new Thread(p).start();//三个生产者 new Thread(c).start();//一个消费者 } }class WoTou { int id; WoTou(int id) { this.id = id; } public String toString() { return "WoTou : " + id; } }class SyncStack { //存放生产品的类 int index = 0; WoTou[] arrWT = new WoTou[6];
/**
 * Demo entry point: several producers and consumers exchange integers
 * through a single shared CubbyHole.
 */
public class ProducerConsumerT {
    public static void main(String[] args) {
        CubbyHole c = new CubbyHole();
        Producer p1 = new Producer(c,1);
        Producer p2 = new Producer(c,2);
        Producer p3 = new Producer(c,3);
        Consumer c1 = new Consumer(c,1);
        Consumer c2 = new Consumer(c,2);
        // Each producer puts 10 values and each consumer takes 10, so the
        // counts must match: with only two consumers the last ten puts would
        // block forever and the JVM would never exit.
        Consumer c3 = new Consumer(c,3);
        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}
/** Thread that puts the values 0..9 into a shared CubbyHole. */
class Producer extends Thread{
    private CubbyHole cubbyhole;
    private int number;

    /**
     * @param c      the shared slot to write into
     * @param number label printed with every message of this producer
     */
    public Producer(CubbyHole c,int number){
        cubbyhole = c;
        this.number = number;
    }

    /** Puts 0..9, pausing a random 0-99 ms after each put. */
    public void run(){
        for(int i=0;i<10;i++){
            cubbyhole.put(i);
            System.out.println("Producer No."+this.number+"put: "+i);
            try{
                // The cast must cover the whole product: (int)Math.random()
                // is always 0, which made the pause always 0 ms.
                sleep((int)(Math.random()*100));
            }catch(InterruptedException e){
                // Keep the interrupt visible to callers and stop producing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/** Thread that takes ten values out of a shared CubbyHole and prints them. */
class Consumer extends Thread{
    private CubbyHole cubbyhole;
    private int number;

    /**
     * @param c      the shared slot to read from
     * @param number label printed with every message of this consumer
     */
    public Consumer(CubbyHole c,int number){
        this.cubbyhole = c;
        this.number = number;
    }

    /** Performs exactly ten blocking get() calls, printing each result. */
    public void run(){
        int remaining = 10;
        while(remaining > 0){
            int received = cubbyhole.get();
            System.out.println("Consumber No." + this.number +"got: "+received);
            remaining--;
        }
    }
}
/**
 * One-slot hand-off point between producer and consumer threads.
 * All state is guarded by this object's monitor.
 */
class CubbyHole{
    /** Value stored by the most recent put. */
    private int contents;
    /** True while contents holds a value that has not been taken yet. */
    private boolean available = false;

    /**
     * Blocks until a value is available, then takes it and wakes waiters.
     * An interrupt received while waiting does not abort the call; the
     * thread's interrupt status is restored before returning.
     *
     * @return the value stored by the most recent put
     */
    public synchronized int get(){
        boolean interrupted = false;
        try{
            while(!available) {
                try{
                    wait();
                }catch(InterruptedException e){
                    interrupted = true;
                }
            }
            available = false;
            notifyAll();
            return contents;
        }finally{
            if(interrupted){
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks until the slot is empty, then stores a value and wakes waiters.
     * An interrupt received while waiting does not abort the call; the
     * thread's interrupt status is restored before returning.
     *
     * @param value the value to hand to a consumer
     */
    public synchronized void put(int value){
        boolean interrupted = false;
        try{
            while(available){
                try{
                    wait();
                }catch(InterruptedException e){
                    interrupted = true;
                }
            }
            contents = value;
            available = true;
            // notifyAll, not notify: producers and consumers wait on the same
            // monitor, so a single notify may wake only another producer,
            // which goes straight back to waiting while consumers stay asleep.
            notifyAll();
        }finally{
            if(interrupted){
                Thread.currentThread().interrupt();
            }
        }
    }
}
private MyStack stack;
/**
 * Creates a consumer bound to the given shared stack.
 *
 * @param stack the stack this thread pops from
 */
public Consumer(MyStack stack){
    super();
    this.stack = stack;
}
/**
 * Pops one item roughly every 400 ms until the thread is interrupted.
 * Previously the interrupt was only logged and the loop ran forever,
 * so the thread could not be asked to stop.
 */
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        stack.pop();
        try {
            Thread.sleep(400);
        } catch (InterruptedException e) {
            // Restore the flag for anyone inspecting this thread, then stop.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
}
public class MyStack {
private int count=0;
private final int MAX_SIZE;
/**
 * Creates an empty stack.
 *
 * @param size value stored in MAX_SIZE, the count at which push() blocks
 */
public MyStack(int size){
    MAX_SIZE = size;
}
/**
 * Adds one item, waiting on this monitor while count equals MAX_SIZE.
 * Prints the calling thread's name with the new count and wakes all waiters.
 */
public synchronized void push(){
    for(;;){
        if(count!=MAX_SIZE){
            break;
        }
        try {
            wait();
        } catch (InterruptedException interrupted) {
            // Logged only; the loop goes back to waiting.
            interrupted.printStackTrace();
        }
    }
    count += 1;
    String caller = Thread.currentThread().getName();
    System.out.println(caller+"add a data"+count);
    notifyAll();
}
/**
 * Removes one item, waiting on this monitor while count is zero.
 * Prints the calling thread's name with the count before removal,
 * then wakes all waiters.
 */
public synchronized void pop(){
    for(;;){
        if(count!=0){
            break;
        }
        try {
            wait();
        } catch (InterruptedException interrupted) {
            // Logged only; the loop goes back to waiting.
            interrupted.printStackTrace();
        }
    }
    String caller = Thread.currentThread().getName();
    System.out.println(caller+"remove a data"+count);
    count -= 1;
    notifyAll();
}
}public class Producter extends Thread{
private MyStack stack;
/**
 * Creates a producer bound to the given shared stack.
 *
 * @param stack the stack this thread pushes to
 */
public Producter(MyStack stack){
    super();
    this.stack = stack;
}
/**
 * Pushes one item roughly every 300 ms until the thread is interrupted.
 * Previously the interrupt was only logged and the loop ran forever,
 * so the thread could not be asked to stop.
 */
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        stack.push();
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            // Restore the flag for anyone inspecting this thread, then stop.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
}
/**
 * Demo driver: two producers and two consumers share one MyStack
 * constructed with size 30.
 */
public class TestThread {
    public static void main(String[] args) {
        MyStack shared = new MyStack(30);

        // Producers are created, named and started first, as before.
        Producter firstProducer = new Producter(shared);
        Producter secondProducer = new Producter(shared);
        firstProducer.setName("viszl");
        secondProducer.setName("visual");
        firstProducer.start();
        secondProducer.start();

        Consumer firstConsumer = new Consumer(shared);
        Consumer secondConsumer = new Consumer(shared);
        firstConsumer.setName("Consumer-1");
        secondConsumer.setName("Consumer-2");
        firstConsumer.start();
        secondConsumer.start();
    }
}
在其他线程调用此对象的 notify() 方法或 notifyAll() 方法前,wait()导致当前线程等待。
当队列满了后,isPush = false,isPop 仍为true, 生产者们等待,消费者们仍能运行,消费产品后,自然变isPush = true,rscManager自然会notify生产者们。
如果想更简单一点的话,就是两个线程池加一个队列管理器,比如 java.util.concurrent.ArrayBlockingQueue。
顺便说一下什么叫wait():"我去休息了,有事叫我",也就是线程自己暂时停止执行。notify()则是:"那个谁,别睡了,你妈妈叫你回家吃饭呢",说白了就是通知一下别人,意为唤醒其他线程。
1、假设队列已满,生产者p1获得执行权,执行wait(),线程处于等待状态。
2、接着,生产者p2获得执行权,此时队列仍是满的,也执行wait(),线程也处于等待状态。
3、然后消费者c1获得执行权,消费一个,然后调用notify(),唤醒等待中的任意一个线程。
4、假如这时候生产者p1被唤醒,然后生产一个,这时候队列又满了。接着p1也会执行notify(),也会唤醒等待的线程。
5、这时候有可能等待线程p2被唤醒,p2被唤醒的话,也会生产一个,这不是已经超过满的了吗?满上又加了一个。希望可以解释一下我的假设?
/*
模拟生产者和消费者的工作过程,生产者不定时产生字符,放到数组中,消费者不定时地从数组中取字符********
*///生产者线程
/**
 * Producer thread: appends random lowercase letters to the shared array
 * test.ArrC, using test.ArrC itself as the monitor and test.pos as the
 * index of the last filled slot.
 */
class createThread extends Thread{
    // volatile: written by the thread that calls shutdown(), read here in run().
    private volatile boolean stopFlag = false;

    /**
     * Produces characters until shutdown() is called. Waits on test.ArrC
     * while the array is full and notifies waiters after every insertion.
     */
    public void run(){
        try{
            while(!stopFlag){
                synchronized(test.ArrC){
                    while (test.pos==test.ArrC.length-1){
                        test.ArrC.wait();
                    }
                    // Generate one character in 'a'..'z' inclusive. The +1
                    // matters: without it 'z' could never be produced.
                    int a = (int)'a';
                    int z = (int)'z';
                    int interval = z - a + 1;
                    int i = a + (int)(Math.random() * interval);
                    char c = (char)i;
                    // Store the character in the array.
                    test.pos ++;
                    test.ArrC[test.pos] = c;
                    System.out.println("生产者产生了字符"+ c);
                    test.ArrC.notifyAll();
                }
            }
        }catch(InterruptedException e){
            // shutdown() interrupts this thread to break out of wait();
            // fall through to the final message.
        }
        System.out.println("停止生产");
    }

    /** Asks the thread to stop and interrupts it in case it is waiting. */
    public void shutdown(){
        stopFlag = true;
        interrupt();
    }
}// Consumer thread
class customThread extends Thread{
private boolean stopFlag = false;
public void run(){
try{
while(!stopFlag){
synchronized(test.ArrC){
while (test.pos==-1){
test.ArrC.wait();
}
char c = test.ArrC[test.pos];
test.pos--;
System.out.println("消费者消费了字符"+ c);
test.ArrC.notifyAll();
}
}catch(InterruptedException e){}
System.out.println("停止消费");
}
/** Sets the stop flag, then interrupts this thread. */
public void shutdown(){
    this.stopFlag = true;
    this.interrupt();
}
}public class test{
public static Object o = new Object();
public static int pos = -1;
public static char ArrC[] = new char[6];
/**
 * Starts one producer and one consumer, shuts the producer down after
 * 10 seconds and the consumer 6 seconds after that.
 */
public static void main(String args[]){
    createThread producer = new createThread();
    customThread consumer = new customThread();
    producer.start();
    consumer.start();

    try{
        Thread.sleep(10000);
    }catch(InterruptedException ignored){}
    producer.shutdown();

    try{
        Thread.sleep(6000);
    }catch(InterruptedException ignored){}
    consumer.shutdown();
}
}
/*
模拟生产者和消费者的工作过程,生产者不定时产生字符,放到数组中,消费者不定时地从数组中取字符********若消费的速度比生产的速度快,则程序出问题
*///生产者线程
/**
 * Producer thread of the unsynchronized demo: every 100 ms it appends a
 * random lowercase letter to test.ArrC and advances test.pos.
 *
 * NOTE(review): test.pos and test.ArrC are read and written here without
 * holding any lock, and nothing checks whether the array is full, so
 * test.pos can run past the last index of test.ArrC.
 */
class createThread extends Thread{
// Set by shutdown() and polled by run().
// NOTE(review): not volatile, so the write is not guaranteed to become visible.
private boolean stopFlag = false;
public void run(){
while(!stopFlag){
// Generate one character.
// NOTE(review): (int)(Math.random() * interval) is at most interval - 1,
// so 'z' itself is never produced.
int a = (int)'a';
int z = (int)'z';
int interval = z - a;
int i = a + (int)(Math.random() * interval);
char c = (char)i;
// Store the character in the array (no bounds check, no lock).
test.pos ++;
test.ArrC[test.pos] = c;
System.out.println("生产者产生了字符"+ c);
try{
sleep(100);
}catch(InterruptedException e){}
}
}
// Requests that run() stop at its next loop check; does not interrupt the sleep.
public void shutdown(){
stopFlag = true;
}
}// Consumer thread
class customThread extends Thread{
private boolean stopFlag = false;
// Every 150 ms, reads the character at test.pos from test.ArrC and decrements test.pos.
// NOTE(review): no lock is held and there is no check that test.pos is >= 0;
// test.pos starts at -1, so reading before anything was produced indexes out of bounds.
public void run(){
while(!stopFlag){
char c = test.ArrC[test.pos];
test.pos--;
System.out.println("消费者消费了字符"+ c);
try{
sleep(150); // if this is made faster than the producer, the program misbehaves
}catch(InterruptedException e){}
}
}
/** Sets the stop flag that run() checks at the top of each loop iteration. */
public void shutdown(){
    this.stopFlag = true;
}
}public class test{
public static Object o = new Object();
public static int pos = -1;
public static char ArrC[] = new char[6];
/**
 * Starts one producer and one consumer, lets them run for 7 seconds,
 * then asks both to stop.
 */
public static void main(String args[]){
    createThread producer = new createThread();
    customThread consumer = new customThread();
    producer.start();
    consumer.start();

    try{
        Thread.sleep(7000);
    }catch(InterruptedException ignored){}

    producer.shutdown();
    consumer.shutdown();
}
}
/**
 * Runs three producer threads that share one Producer task and a single
 * consumer thread, all working on one SyncStack.
 */
public static void main(String[] args) {
    SyncStack sharedStack = new SyncStack();
    Producer producerTask = new Producer(sharedStack);
    Consumer consumerTask = new Consumer(sharedStack);

    // Three producers.
    for (int started = 0; started < 3; started++) {
        new Thread(producerTask).start();
    }
    // One consumer.
    Thread consumerThread = new Thread(consumerTask);
    consumerThread.start();
}
}class WoTou {
int id;
/**
 * Creates an item carrying the given identifier.
 *
 * @param id the number reported by toString()
 */
WoTou(int id) {
    super();
    this.id = id;
}
/** Returns the text "WoTou : " followed by this item's id. */
public String toString() {
    StringBuilder text = new StringBuilder("WoTou : ");
    text.append(this.id);
    return text.toString();
}
}class SyncStack { //存放生产品的类
int index = 0;
WoTou[] arrWT = new WoTou[6];
/**
 * Stores an item on top of the stack, waiting on this monitor while the
 * backing array is full. Waiters are notified before the item is stored;
 * they cannot proceed until this synchronized method releases the lock.
 *
 * @param wt the item to store
 */
public synchronized void push(WoTou wt) {
    for (;;) {
        if (index != arrWT.length) {
            break;
        }
        try {
            wait();
        } catch (InterruptedException interrupted) {
            // Logged only; the loop goes back to waiting.
            interrupted.printStackTrace();
        }
    }
    notifyAll();
    arrWT[index++] = wt;
}
/**
 * Removes and returns the top item, waiting on this monitor while the
 * stack is empty.
 *
 * @return the most recently pushed item
 */
public synchronized WoTou pop() {
    while(index == 0) {
        try {
            this.wait();
        } catch (InterruptedException e) {
            // Logged only; the loop goes back to waiting.
            e.printStackTrace();
        }
    }
    this.notifyAll();
    index--;
    WoTou top = arrWT[index];
    // Clear the slot so the array no longer holds a reference to an item
    // that has already been handed out.
    arrWT[index] = null;
    return top;
}
}class Producer implements Runnable { //生产者的类
SyncStack ss = null;
/**
 * Creates a producer task bound to the given stack.
 *
 * @param ss the stack this task pushes to
 */
Producer(SyncStack ss) {
    super();
    this.ss = ss;
}
/**
 * Pushes 20 items with ids 0..19, printing each one and then sleeping
 * a random time below 200 ms.
 */
public void run() {
    int id = 0;
    while (id < 20) {
        WoTou item = new WoTou(id);
        ss.push(item);
        System.out.println("生产了:" + item);
        int pauseMillis = (int)(Math.random() * 200);
        try {
            Thread.sleep(pauseMillis);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
        id++;
    }
}
}class Consumer implements Runnable { //消费者的类
SyncStack ss = null;
/**
 * Creates a consumer task bound to the given stack.
 *
 * @param ss the stack this task pops from
 */
Consumer(SyncStack ss) {
    super();
    this.ss = ss;
}
/**
 * Pops 60 items, printing each one and then sleeping a random time
 * below 1000 ms.
 */
public void run() {
    int taken = 0;
    while (taken < 60) {
        WoTou item = ss.pop();
        System.out.println("消费了: " + item);
        int pauseMillis = (int)(Math.random() * 1000);
        try {
            Thread.sleep(pauseMillis);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
        taken++;
    }
}
}
1、假设队列已满,生产者p1获得执行权,执行wait(),线程处于等待状态。
2、接着,生产者p2获得执行权,此时队列仍是满的,也执行wait(),线程也处于等待状态。
3、然后消费者c1获得执行权,消费一个,然后调用notify(),唤醒等待中的任意一个线程。
4、假如这时候生产者p1被唤醒,然后生产一个,这时候队列又满了。接着p1也会执行notify(),也会唤醒等待的线程。
5、这时候有可能等待线程p2被唤醒,p2被唤醒的话,也会生产一个,这不是已经超过满的了吗?满上又加了一个。希望可以解释一下我的假设?
public synchronized void push(WoTou wt) {
while(index == arrWT.length) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notifyAll();
arrWT[index] = wt;
index ++;
}
我用的是notifyAll(),所有wait()的线程都会被唤醒。但只要栈是满的,像你所说的p2即使被唤醒,也会在while循环里重新检查条件,再次执行this.wait(),于是该生产者继续等待。因为push是synchronized的,所以并发访问时是安全的。
生产者和消费者问题很好的体现了线程间的通信。