当有多个生产者、多个消费者时,自己写的总是有错误。
谁能给个多个生产者、多个消费者的多线程例子。谢谢!

解决方案 »

  1.   

    可以发到我邮箱:[email protected]
      

  2.   

    对wait()、notify(),总是不明白,谁解析一下
      

  3.   


    public class ProducerConsumerT {
       public static void main(String[] args) {
          CubbyHole c = new CubbyHole();  
          Producer p1 = new Producer(c,1);
      Producer p2 = new Producer(c,2);
      Producer p3 = new Producer(c,3);
          Consumer c1 = new Consumer(c,1);
      Consumer c2 = new Consumer(c,2);
          p1.start();
      p2.start();
      p3.start();
          c1.start();
      c2.start();
       }
    }
    class Producer extends Thread{
       private CubbyHole cubbyhole;
       private int number;
       public Producer(CubbyHole c,int number){
          cubbyhole = c;
          this.number = number;
       }
       public void run(){
          for(int i=0;i<10;i++){
             cubbyhole.put(i);
             System.out.println("Producer No."+this.number+"put: "+i);
             try{
                sleep((int)Math.random()*100);
             }catch(InterruptedException e){}
          }
       }}
    class Consumer extends Thread{
       private CubbyHole cubbyhole;
       private int number;
       public Consumer(CubbyHole c,int number){
          cubbyhole = c;
          this.number = number;
       }
       public void run(){
          int value = 0;
          for(int i=0;i<10;i++){
             value = cubbyhole.get();
             System.out.println("Consumber No." + this.number +"got: "+value);
          }
       }
    }
    class CubbyHole{
       private int contents;
       private boolean available = false;  
       public synchronized int get(){
          while(available == false) {
             try{
                wait();
             }catch(InterruptedException e){}
          }
       available = false;
       notifyAll();             
       return contents;
       }
       public synchronized void put(int value){
          while(available == true){
             try{
                wait();
             }catch(InterruptedException e){}
          }
          contents = value;
          available = true;
          notify();
       }
    }
      

  4.   

    public class Consumer extends Thread{
    private MyStack stack;

    public Consumer(MyStack stack){
    this.stack=stack;
    }

    public void run(){
    while(true){
    stack.pop();
    try {
    Thread.sleep(400);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    }
    }
    public class MyStack {
    private int count=0;
    private final int MAX_SIZE;
    public MyStack(int size){
    this.MAX_SIZE=size;
    }

    public synchronized void push(){
    while(count==MAX_SIZE){
    try {
    this.wait();
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    count++;
    System.out.println(Thread.currentThread().getName()+"add a data"+count);

    this.notifyAll();
    }

    public synchronized void pop(){
    while(count==0){
    try {
    this.wait();
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    System.out.println(Thread.currentThread().getName()+"remove a data"+count);
    count--;
    this.notifyAll();

    }
    }public class Producter extends Thread{
    private MyStack stack;
    public Producter(MyStack stack){
    this.stack=stack;
    }
    public void run(){
    while(true){
    stack.push();
    try {
    Thread.sleep(300);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    }
    }
    public class TestThread {
    public static void main(String[] args) {
    MyStack my=new MyStack(30);
    Producter p1=new Producter(my);
    Producter p2=new Producter(my);
    p1.setName("viszl");
    p2.setName("visual");
    p1.start();
    p2.start();

    Consumer c1=new Consumer(my);
    Consumer c2=new Consumer(my);
    c1.setName("Consumer-1");
    c2.setName("Consumer-2");
    c1.start();
    c2.start();
    }}
    在其他线程调用此对象的 notify() 方法或 notifyAll() 方法前,wait()导致当前线程等待。
      

  5.   

        简单说一下吧,这里需要加入一个角色,资源管理者---rscManager,由它来统一管理资源,它有两个标记:isPush,isPop:刚开始时,isPush = true, isPop = false;这样生产者们可以生产,消费者们不可以消费(wait),当有生产者们生产完产品后,通过rscManager入库,rscManager将isPop改为true,同时notify一下(所有wait的消费者),消费者们通过调用rscManager的出库方法得到产品。
         当队列满了后,isPush = false,isPop 仍为true, 生产者们等待,消费者们仍能运行,消费产品后,自然变isPush = true,rscManager自然会notify生产者们。
         如果想更简单一点的话,就是两个线程池加一个队列管理器,比如java.util.concurrent.ArrayBlockingQueue;]
         
        顺便说一个什么叫wait():我去休息了,有事叫我,这就叫wait()是线程自己暂时停下执行。Notify就是:那个谁,别睡了,你妈妈叫你回家吃饭呢,说白了就是告诉别人一下,意为唤醒别人。
      

  6.   

    回复7楼的viszl:生产满的时候,
    1、假设生成者p1获得执行权,执行wati(),线程处于等待状态。
    2、同时,生产者p2获得执行权,同时是满的,也执行wait(),线程也处于等待状态。
    3、然后消费者c1获得执行权,消费一个,然后调用notify(),唤醒线程。其中是唤醒等待的任一个线程。
    4、假如这时候生产者p1被唤醒,然后生产一个,这时候也是的了。接着p1也会执行notif(),也会唤醒等待的线程
    5、这时候有可能等待线程p2被唤醒,p2被唤醒的话,也会生产一个,这不是已经超过满的了吗???满上加一个了。希望可以解释一下我的假设?
      

  7.   

    /*
    模拟生产者和消费者的工作过程,生产者不定时产生字符,放到数组中,消费者不定时地从数组中取字符********
    *///生产者线程
    class createThread extends Thread{
    private boolean stopFlag = false;

    public  void run(){
    try{
       while(!stopFlag){
         synchronized(test.ArrC){
          while (test.pos==test.ArrC.length-1){
         test.ArrC.wait();
        }        //产生一个字符
           int a = (int)'a';
           int z = (int)'z';
           int interval = z - a;

           int i = a + (int)(Math.random() * interval);
           char c = (char)i;

           //将字符放到数组中
           test.pos ++;
           test.ArrC[test.pos] = c;
           System.out.println("生产者产生了字符"+ c);
           test.ArrC.notifyAll();
        }
       }
     }catch(InterruptedException e){}   System.out.println("停止生产");
    }

    public void shutdown(){
    stopFlag = true;
      interrupt();
    }
    }//消费者线程
    class customThread extends Thread{

    private boolean stopFlag = false;

    public  void run(){
    try{
         while(!stopFlag){
          synchronized(test.ArrC){
      
     while (test.pos==-1){
    test.ArrC.wait();
    }
        
        char c = test.ArrC[test.pos];
          test.pos--;
            System.out.println("消费者消费了字符"+ c);
        test.ArrC.notifyAll();       
      }
      
      }catch(InterruptedException e){}
    System.out.println("停止消费");
    }

    public void shutdown(){
    stopFlag = true;
    interrupt();
    }
    }public class test{
    public static Object o = new Object();
    public static int pos = -1;
    public static char ArrC[] = new char[6];

    public static void main(String args[]){

    createThread ct = new createThread();
    customThread cut = new customThread();
    ct.start();
    cut.start();

    try{
    Thread.currentThread().sleep(10000);
    }catch(InterruptedException e){}

    ct.shutdown();
    try{
    Thread.currentThread().sleep(6000);
    }catch(InterruptedException e){}

    cut.shutdown();

    }
    }
    /*
    模拟生产者和消费者的工作过程,生产者不定时产生字符,放到数组中,消费者不定时地从数组中取字符********若消费的速度比生产的速度快,则程序出问题
    *///生产者线程
    class createThread extends Thread{
    private boolean stopFlag = false;

    public  void run(){
    while(!stopFlag){

       //产生一个字符
       int a = (int)'a';
       int z = (int)'z';
       int interval = z - a;

       int i = a + (int)(Math.random() * interval);
       char c = (char)i;

       //将字符放到数组中
       test.pos ++;
       test.ArrC[test.pos] = c;
       System.out.println("生产者产生了字符"+ c);

       try{
       sleep(100);
       }catch(InterruptedException e){}
      }
    }

    public void shutdown(){
    stopFlag = true;
    }
    }//消费者线程
    class customThread extends Thread{

    private boolean stopFlag = false;

    public  void run(){
      while(!stopFlag){
       char c = test.ArrC[test.pos];
         test.pos--;
           System.out.println("消费者消费了字符"+ c);

       try{
         sleep(150); //当把它改的比生产者速度快时,程序要出问题
       }catch(InterruptedException e){}

    }

    }

    public void shutdown(){
    stopFlag = true;
    }
    }public class test{
    public static Object o = new Object();
    public static int pos = -1;
    public static char ArrC[] = new char[6];

    public static void main(String args[]){

    createThread ct = new createThread();
    customThread cut = new customThread();
    ct.start();
    cut.start();

    try{
    Thread.currentThread().sleep(7000);
    }catch(InterruptedException e){}

    ct.shutdown();
    cut.shutdown();

    }
    }
      

  8.   

    public class ProduceConsume {
    public static void main(String[] args) {
    SyncStack ss = new SyncStack();
    Producer p = new Producer(ss);
    Consumer c = new Consumer(ss);
    new Thread(p).start();
    new Thread(p).start();
    new Thread(p).start();//三个生产者
    new Thread(c).start();//一个消费者
    }
    }class WoTou {
    int id; 
    WoTou(int id) {
    this.id = id;
    }
    public String toString() {
    return "WoTou : " + id;
    }
    }class SyncStack {         //存放生产品的类
    int index = 0;
    WoTou[] arrWT = new WoTou[6];

    public synchronized void push(WoTou wt) {
    while(index == arrWT.length) {
    try {
    this.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    this.notifyAll();
    arrWT[index] = wt;
    index ++;
    }

    public synchronized WoTou pop() {
    while(index == 0) {
    try {
    this.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    this.notifyAll();
    index--;
    return arrWT[index];
    }
    }class Producer implements Runnable {      //生产者的类
    SyncStack ss = null;
    Producer(SyncStack ss) {
    this.ss = ss;
    }

    public void run() {
    for(int i=0; i<20; i++) {
    WoTou wt = new WoTou(i);
    ss.push(wt);
    System.out.println("生产了:" + wt);
    try {
    Thread.sleep((int)(Math.random() * 200));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }class Consumer implements Runnable {          //消费者的类
    SyncStack ss = null;
    Consumer(SyncStack ss) {
    this.ss = ss;
    }

    public void run() {
    for(int i=0; i<60; i++) {
    WoTou wt = ss.pop();
    System.out.println("消费了: " + wt);
    try {
    Thread.sleep((int)(Math.random() * 1000));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }
      

  9.   

    大家写得都差不多。不过我觉得有个问题:生产满的时候, 
    1、假设生成者p1获得执行权,执行wati(),线程处于等待状态。 
    2、同时,生产者p2获得执行权,同时是满的,也执行wait(),线程也处于等待状态。 
    3、然后消费者c1获得执行权,消费一个,然后调用notify(),唤醒线程。其中是唤醒等待的任一个线程。 
    4、假如这时候生产者p1被唤醒,然后生产一个,这时候也是满的了。接着p1也会执行notif(),也会唤醒等待的线程 
    5、这时候有可能等待线程p2被唤醒,p2被唤醒的话,也会生产一个,这不是已经超过满的了吗???满上加一个了。 希望可以解释一下我的假设?
      

  10.   

    生产者消费者模型可以参考一下这个帖子http://topic.csdn.net/u/20090910/10/1bad6c51-565d-4935-bf4d-a25163649f5a.html?72814
      

  11.   

    看我的代码
    public synchronized void push(WoTou wt) {
            while(index == arrWT.length) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            this.notifyAll();        
            arrWT[index] = wt;
            index ++;
        }我用的是notifyAll(); 所有wait()的线程都会被唤醒,但只要栈是满的,像你所说的p2即使被唤醒执行push,也是进入里面的while循环,执行了 this.wait();然后生产者继续等待。因为push是synchronized的,所以并发访问时是安全的。
    生产者和消费者问题很好的体现了线程间的通信。 
      

  12.   

    http://download.csdn.net/source/428187