一个生产者,n个消费者(比如n=20),
中间用一个大小为k的缓冲区连接(比如k=40)生产者不断的产生任务,放入缓冲区;当缓冲区满了之后,生产者等待缓冲区有空位时继续生产
消费者从缓冲区中取任务,并完成任务;当缓冲区没有任务时,则等待我用线程池来做这个,把消费者放入线程池但是遇到几个问题
1.生产者生产的任务,如果缓冲区满了,它不能丢弃这个任务;但是如果用sleep,然后再不断探测缓冲区大小,好像资源消耗会比较多,如何让缓冲区主动来通知生产者?
2.线程池遇到异常,其中执行的某个线程会消亡,然后线程就会减少……
3.如何退出这个程序?反正比较郁闷
谁能给段示例代码?

解决方案 »

  1.   

    1.生产者生产的任务,如果缓冲区满了,它不能丢弃这个任务;但是如果用sleep,然后再不断探测缓冲区大小,好像资源消耗会比较多,如何让缓冲区主动来通知生产者?
    ------------------
    re:这个需要wait,不是sleep,当然不能丢弃任务2.线程池遇到异常,其中执行的某个线程会消亡,然后线程就会减少……
    ------------------
    re:你没理解池,池的大小是有boss控制的,池遇到异常?什么异常,我见过的池,除了VM的异常没有任何异常。因为她本身不做操作3.如何退出这个程序?
    -------------------------
    re:boolean控制 加 捕捉异常。这是经典的退出。
      

  2.   

    //这是我自己写的两个线程的例子,一个生产者,一个消费者
    public class ThreadTest 
    {
    public static void main(String[] args) 
    {
    StudentInfo stu = new StudentInfo();
    Thread t1 = new Thread(new Producer(stu));
    Thread t2 = new Thread(new Customer(stu));
    t1.start();
    t2.start(); }
    }
    class StudentInfo
    {
    String name = "";
    boolean bFull = false;
    public synchronized void pushData(String name)
    {
    //如果有数据,那么该线程等待,至到消费者线程取完数据以后才被唤醒
    if (bFull)
    {
    try {this.wait();} catch(Exception ex){};
    }
    this.name = name;
    bFull = true;
    System.out.println(name);
    this.notify();
    }
    public synchronized String popData()   
    {
    //如果没有数据,那么该线程等待,至到生产者线程放入数据以后才被唤醒
    if (!bFull)
    {
    try {this.wait();} catch(Exception ex){};
    }
    bFull = false;
    System.out.println("My name is "+this.name);
    this.notify();  
    return this.name;
    }
    }
    class Producer implements Runnable
    {
    StudentInfo stu;
    public Producer(StudentInfo stu)
    {
    this.stu = stu;
    }
    public void run()
    {
    while (true)
    {
    stu.pushData("ysy");
    }
    }
    }
    class Customer implements Runnable
    {
    StudentInfo stu;
    public Customer(StudentInfo stu)
    {
    this.stu = stu;
    }
    public void run()
    {
    while (true)
    {
    stu.popData();
    }
    }
    };
      

  3.   

    修正一下上面的例子
    //这是我自己写的两个线程的例子,一个生产者,一个消费者
    //生产什么,消费什么
    import java.io.*;
    public class ThreadTest 
    {
    public static void main(String[] args) 
    {
    StudentInfo stu = new StudentInfo();
    Thread t1 = new Thread(new Producer(stu));
    Thread t2 = new Thread(new Customer(stu));
    t1.start();
    t2.start(); }
    }
    class StudentInfo
    {
    String name = "";
    boolean bFull = false;
    public synchronized void pushData(String name)
    {
    //如果有数据,那么该线程等待,至到消费者线程取完数据以后才被唤醒
    if (bFull)
    {
    try {this.wait();} catch(Exception ex){System.out.println(ex.toString());};
    }
    this.name = name;
    bFull = true;
    this.notify();
    }
    public synchronized String popData()   
    {
    //如果没有数据,那么该线程等待,至到生产者线程放入数据以后才被唤醒
    if (!bFull)
    {
    try {this.wait();} catch(Exception ex){System.out.println(ex.toString());};
    }
    bFull = false;
    System.out.println("My name is "+this.name);
    this.notify();  
    return this.name;
    }
    }
    class Producer implements Runnable
    {
    StudentInfo stu;
    BufferedReader read = new BufferedReader(new InputStreamReader(System.in));
    String str;
    public Producer(StudentInfo stu)
    {
    this.stu = stu;
    }
    public void run()
    {
    while (true)
    {
    try
    {
    str = read.readLine();
    }
    catch (Exception ex)
    {
    System.out.println(ex.toString());
    }
    stu.pushData(str);
    }
    }
    }
    class Customer implements Runnable
    {
    StudentInfo stu;
    public Customer(StudentInfo stu)
    {
    this.stu = stu;
    }
    public void run()
    {
    while (true)
    {
    stu.popData();
    }
    }
    };
      

  4.   

    谢谢,但是如果我再加一个消费者线程,就会出错
    为什么啊Thread t1 = new Thread(new Producer(stu));
    Thread t2 = new Thread(new Customer(stu));
    Thread t3 = new Thread(new Customer(stu));
    t1.start();
    t2.start();
    t3.start();  // 如果不启动这个线程,就是对的,只要启动,运行一段时间就会出错了
      

  5.   

    /*
     * ThreadTest.java
     *
     * Created on 2005年11月18日, 上午9:49
     *
     * To change this template, choose Tools | Template Manager
     * and open the template in the editor.
     */package http.test.thread;import java.io.BufferedReader;
    import java.io.InputStreamReader;public class ThreadTest {
        
        public static void main(String[] args) {
            StudentInfo stu = new StudentInfo();
            Thread t1 = new Thread(new Producer(stu));
            Thread t2 = new Thread(new Customer(stu));
            Thread t3 = new Thread(new Customer(stu));
            t1.start();
            t2.start();
            
            t3.start();  // 如果这个线程不启动,就能正常运行
                          // 一旦启动,运行一段时间就会出错了
        }
    }class StudentInfo {
        String name = "";
        boolean bFull = false;
        public synchronized void pushData(String name) {
            //如果有数据,那么该线程等待,至到消费者线程取完数据以后才被唤醒
            while (bFull) {
                try {
                    this.wait();
                } catch(Exception ex){
                    System.out.println(ex.toString());
                };
            }
            this.name = name;
            bFull = true;
            System.out.println("生产者线程:"+this.name);
            this.notify();
        }
        public synchronized String popData() {
            //如果没有数据,那么该线程等待,至到生产者线程放入数据以后才被唤醒
            while (!bFull) {
                try {
                    this.wait();
                } catch(Exception ex){
                    System.out.println(ex.toString());
                };
            }
            bFull = false;
            System.out.println("消费者线程:"+this.name);
            this.notify();
            return this.name;
        }
    }class Producer implements Runnable {
        StudentInfo stu;
        public Producer(StudentInfo stu) {
            this.stu = stu;
        }
        static int i=1;
        public void run() {
            while(true){
                // 生产者如何停下来?
                // 比如这里的数据是从数据库里面取,当已经取完的时候,
                // 如何停下来,并通知消费者?
                // if(没有数据){停止生产,通知消费者}
                stu.pushData("Name " + i);
            }
        }
    }class Customer implements Runnable {
        StudentInfo stu;
        public Customer(StudentInfo stu) {
            this.stu = stu;
        }
        public void run() {
            while(true){
                // 如果生产者已经停止生产了,
                // 如何结束程序?
                stu.popData();
            }
        }
    };
      

  6.   

    ysycrazy(风中狂)你好,
    pop和push中都加了synchronized,为什么多个线程不行呢?
      

  7.   

    //多个生产者,多个消费者的例子
    import java.util.*;
    public class ThreadTest
    {
    public static void main(String [] args)
    {
    MyStack st = new MyStack();
    Thread t1 = new Thread(new Producer(st));
    Thread t2 = new Thread(new Producer(st));
    Thread t3 = new Thread(new Customer(st));
    Thread t4 = new Thread(new Customer(st));
    t1.start();
    t2.start();
    t3.start();
    t4.start();
    }
    }
    class MyStack
    {
    //使用Vector类创建一个栈,它有无限大的空间。所以我只考虑栈为空的情况
    private Vector buffer = new Vector(400,200);
    public synchronized void push(char c,int num) 
    {
    Character charObj = new Character(c);
    buffer.addElement(charObj);
    System.out.println("Producer" + num + ": " + c);
    this.notify();
    }
    public synchronized char pop(int num) 
    {
    char c;
    while (buffer.size() == 0)
    {
    try {this.wait();} catch (InterruptedException e) {}
    }
    c = ((Character)buffer.remove(buffer.size()-1)).charValue();
    System.out.println(" Customer" + num + ": " + c);
    return c; }};
    class Producer implements Runnable
    {
    MyStack st;
    private static int counter = 1;
    private int num;
    public Producer(MyStack st)
    {
    this.st = st;
    this.num=counter++;
    }
    public void run()
    {
    char c;
    for (int i = 0; i < 200; i++) 
    {
    c = (char)(Math.random() * 26 + 'A');
    st.push(c,num);
    try 
    {
    Thread.sleep((int)(Math.random() * 300));
    } catch (InterruptedException e) 
    {
    }
    } }
    }
    class Customer implements Runnable
    {
    MyStack st;
    private static int counter = 1;
    private int num;
    public Customer(MyStack st)
    {
    this.st = st;
    num = counter++;
    }
    public void run()
    {
    char c;
    for (int i = 0; i < 200; i++) 
    {
    c = st.pop(num);
    try 
    {
    Thread.sleep((int)(Math.random() * 300));
    }
    catch (InterruptedException e) 
    {
    }
    } }
    };
      

  8.   

    //多个生产者,多个消费者的例子
    //多个生产者生产完以后,停止生产,消费者开始消费,直到消费完毕。
    //Writed by ysy 2005-11-19 16:41:00
    import java.util.*;
    import java.io.*;
    public class ThreadTest
    {
    public static void main(String [] args)
    {
    int i=0;
    try
    {
    System.out.println("请输入生产的数量:");
    BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
    i = Integer.parseInt(br.readLine());
    }
    catch (IOException ex)

    System.out.println(ex.getMessage());

    MyStack st = new MyStack(i);
    Thread t1 = new Thread(new Producer(st));
    Thread t2 = new Thread(new Producer(st));
    Thread t3 = new Thread(new Customer(st));
    Thread t4 = new Thread(new Customer(st));
    t1.start();
    t2.start();
    t3.start();
    t4.start();
    }
    }
    class MyStack
    {
    //使用Vector类创建一个栈,它有无限大的空间。所以我只考虑栈为空的情况
    public MyStack(int initialCapacity)
    {
    this.initialCapacity = initialCapacity;
    }
    private int initialCapacity=0;
    private Vector buffer = new Vector(initialCapacity);
    boolean bFull = false; //判断空间是否为满
    boolean bCustomer = false;  //中止消费者的标志
    public synchronized void push(char c,int num) 
    {
    Character charObj = new Character(c);
    buffer.addElement(charObj);
    System.out.println("Producer" + num + ": " + c);
    if (buffer.size()  == initialCapacity)
    {
    System.out.println("生产完毕,开始消费");
    bFull = true;
    this.notifyAll();
    }
    }
    public synchronized char pop(int num) 
    {
    char c;
    if (buffer.size() == 0 || !bFull )
    {
    try {this.wait();} catch (InterruptedException e) {}
    }
    c = ((Character)buffer.remove(buffer.size()-1)).charValue();
    System.out.println(" Customer" + num + ": " + c);
    if (buffer.size() == 0  )
    {
    System.out.println("消费结束。");
    bCustomer = true;
    }

    return c; }
    };
    //生产者
    class Producer implements Runnable
    {
    MyStack st;
    private static int counter = 1;
    private int num;
    public Producer(MyStack st)
    {
    this.st = st;
    this.num=counter++;
    }
    public void run()
    {
    char c;
    while (!st.bFull)
    {
    c = (char)(Math.random() * 26 + 'A');
    st.push(c,num);
    try 
    {
    Thread.sleep((int)(Math.random() * 300));
    } catch (InterruptedException e) 
    {
    }
    } }
    }
    //消费者
    class Customer implements Runnable
    {
    MyStack st;
    private static int counter = 1;
    private int num;
    public Customer(MyStack st)
    {
    this.st = st;
    num = counter++;
    }
    public void run()
    {
    char c;
    while (!st.bCustomer)
    {
    c = st.pop(num);
    try 
    {
    Thread.sleep((int)(Math.random() * 300));
    }
    catch (InterruptedException e) 
    {
    }
    } }
    };
      

  9.   

    你这程序真是怪,这么经典的思路,竟然把Vector整进来。这程序是不是你在别人的程序的基础上改的?要不怎么看着这么别扭。