一个生产者,n个消费者(比如n=20),
中间用一个大小为k的缓冲区连接(比如k=40)生产者不断的产生任务,放入缓冲区;当缓冲区满了之后,生产者等待缓冲区有空位时继续生产
消费者从缓冲区中取任务,并完成任务;当缓冲区没有任务时,则等待我用线程池来做这个,把消费者放入线程池但是遇到几个问题
1.生产者生产的任务,如果缓冲区满了,它不能丢弃这个任务;但是如果用sleep,然后再不断探测缓冲区大小,好像资源消耗会比较多,如何让缓冲区主动来通知生产者?
2.线程池遇到异常,其中执行的某个线程会消亡,然后线程就会减少……
3.如何退出这个程序?反正比较郁闷
谁能给段示例代码?
中间用一个大小为k的缓冲区连接(比如k=40)生产者不断的产生任务,放入缓冲区;当缓冲区满了之后,生产者等待缓冲区有空位时继续生产
消费者从缓冲区中取任务,并完成任务;当缓冲区没有任务时,则等待我用线程池来做这个,把消费者放入线程池但是遇到几个问题
1.生产者生产的任务,如果缓冲区满了,它不能丢弃这个任务;但是如果用sleep,然后再不断探测缓冲区大小,好像资源消耗会比较多,如何让缓冲区主动来通知生产者?
2.线程池遇到异常,其中执行的某个线程会消亡,然后线程就会减少……
3.如何退出这个程序?反正比较郁闷
谁能给段示例代码?
------------------
re:这个需要wait,不是sleep,当然不能丢弃任务2.线程池遇到异常,其中执行的某个线程会消亡,然后线程就会减少……
------------------
re:你没理解池,池的大小是有boss控制的,池遇到异常?什么异常,我见过的池,除了VM的异常没有任何异常。因为她本身不做操作3.如何退出这个程序?
-------------------------
re:boolean控制 加 捕捉异常。这是经典的退出。
public class ThreadTest
{
public static void main(String[] args)
{
StudentInfo stu = new StudentInfo();
Thread t1 = new Thread(new Producer(stu));
Thread t2 = new Thread(new Customer(stu));
t1.start();
t2.start(); }
}
class StudentInfo
{
String name = "";
boolean bFull = false;
public synchronized void pushData(String name)
{
//如果有数据,那么该线程等待,至到消费者线程取完数据以后才被唤醒
if (bFull)
{
try {this.wait();} catch(Exception ex){};
}
this.name = name;
bFull = true;
System.out.println(name);
this.notify();
}
public synchronized String popData()
{
//如果没有数据,那么该线程等待,至到生产者线程放入数据以后才被唤醒
if (!bFull)
{
try {this.wait();} catch(Exception ex){};
}
bFull = false;
System.out.println("My name is "+this.name);
this.notify();
return this.name;
}
}
class Producer implements Runnable
{
StudentInfo stu;
public Producer(StudentInfo stu)
{
this.stu = stu;
}
public void run()
{
while (true)
{
stu.pushData("ysy");
}
}
}
class Customer implements Runnable
{
StudentInfo stu;
public Customer(StudentInfo stu)
{
this.stu = stu;
}
public void run()
{
while (true)
{
stu.popData();
}
}
};
//这是我自己写的两个线程的例子,一个生产者,一个消费者
//生产什么,消费什么
import java.io.*;
public class ThreadTest
{
public static void main(String[] args)
{
StudentInfo stu = new StudentInfo();
Thread t1 = new Thread(new Producer(stu));
Thread t2 = new Thread(new Customer(stu));
t1.start();
t2.start(); }
}
class StudentInfo
{
String name = "";
boolean bFull = false;
public synchronized void pushData(String name)
{
//如果有数据,那么该线程等待,至到消费者线程取完数据以后才被唤醒
if (bFull)
{
try {this.wait();} catch(Exception ex){System.out.println(ex.toString());};
}
this.name = name;
bFull = true;
this.notify();
}
public synchronized String popData()
{
//如果没有数据,那么该线程等待,至到生产者线程放入数据以后才被唤醒
if (!bFull)
{
try {this.wait();} catch(Exception ex){System.out.println(ex.toString());};
}
bFull = false;
System.out.println("My name is "+this.name);
this.notify();
return this.name;
}
}
class Producer implements Runnable
{
StudentInfo stu;
BufferedReader read = new BufferedReader(new InputStreamReader(System.in));
String str;
public Producer(StudentInfo stu)
{
this.stu = stu;
}
public void run()
{
while (true)
{
try
{
str = read.readLine();
}
catch (Exception ex)
{
System.out.println(ex.toString());
}
stu.pushData(str);
}
}
}
class Customer implements Runnable
{
StudentInfo stu;
public Customer(StudentInfo stu)
{
this.stu = stu;
}
public void run()
{
while (true)
{
stu.popData();
}
}
};
为什么啊Thread t1 = new Thread(new Producer(stu));
Thread t2 = new Thread(new Customer(stu));
Thread t3 = new Thread(new Customer(stu));
t1.start();
t2.start();
t3.start(); // 如果不启动这个线程,就是对的,只要启动,运行一段时间就会出错了
* ThreadTest.java
*
* Created on 2005年11月18日, 上午9:49
*
* To change this template, choose Tools | Template Manager
* and open the template in the editor.
*/package http.test.thread;import java.io.BufferedReader;
import java.io.InputStreamReader;public class ThreadTest {
public static void main(String[] args) {
StudentInfo stu = new StudentInfo();
Thread t1 = new Thread(new Producer(stu));
Thread t2 = new Thread(new Customer(stu));
Thread t3 = new Thread(new Customer(stu));
t1.start();
t2.start();
t3.start(); // 如果这个线程不启动,就能正常运行
// 一旦启动,运行一段时间就会出错了
}
}class StudentInfo {
String name = "";
boolean bFull = false;
public synchronized void pushData(String name) {
//如果有数据,那么该线程等待,至到消费者线程取完数据以后才被唤醒
while (bFull) {
try {
this.wait();
} catch(Exception ex){
System.out.println(ex.toString());
};
}
this.name = name;
bFull = true;
System.out.println("生产者线程:"+this.name);
this.notify();
}
public synchronized String popData() {
//如果没有数据,那么该线程等待,至到生产者线程放入数据以后才被唤醒
while (!bFull) {
try {
this.wait();
} catch(Exception ex){
System.out.println(ex.toString());
};
}
bFull = false;
System.out.println("消费者线程:"+this.name);
this.notify();
return this.name;
}
}class Producer implements Runnable {
StudentInfo stu;
public Producer(StudentInfo stu) {
this.stu = stu;
}
static int i=1;
public void run() {
while(true){
// 生产者如何停下来?
// 比如这里的数据是从数据库里面取,当已经取完的时候,
// 如何停下来,并通知消费者?
// if(没有数据){停止生产,通知消费者}
stu.pushData("Name " + i);
}
}
}class Customer implements Runnable {
StudentInfo stu;
public Customer(StudentInfo stu) {
this.stu = stu;
}
public void run() {
while(true){
// 如果生产者已经停止生产了,
// 如何结束程序?
stu.popData();
}
}
};
pop和push中都加了synchronized,为什么多个线程不行呢?
import java.util.*;
public class ThreadTest
{
public static void main(String [] args)
{
MyStack st = new MyStack();
Thread t1 = new Thread(new Producer(st));
Thread t2 = new Thread(new Producer(st));
Thread t3 = new Thread(new Customer(st));
Thread t4 = new Thread(new Customer(st));
t1.start();
t2.start();
t3.start();
t4.start();
}
}
class MyStack
{
//使用Vector类创建一个栈,它有无限大的空间。所以我只考虑栈为空的情况
private Vector buffer = new Vector(400,200);
public synchronized void push(char c,int num)
{
Character charObj = new Character(c);
buffer.addElement(charObj);
System.out.println("Producer" + num + ": " + c);
this.notify();
}
public synchronized char pop(int num)
{
char c;
while (buffer.size() == 0)
{
try {this.wait();} catch (InterruptedException e) {}
}
c = ((Character)buffer.remove(buffer.size()-1)).charValue();
System.out.println(" Customer" + num + ": " + c);
return c; }};
class Producer implements Runnable
{
MyStack st;
private static int counter = 1;
private int num;
public Producer(MyStack st)
{
this.st = st;
this.num=counter++;
}
public void run()
{
char c;
for (int i = 0; i < 200; i++)
{
c = (char)(Math.random() * 26 + 'A');
st.push(c,num);
try
{
Thread.sleep((int)(Math.random() * 300));
} catch (InterruptedException e)
{
}
} }
}
class Customer implements Runnable
{
MyStack st;
private static int counter = 1;
private int num;
public Customer(MyStack st)
{
this.st = st;
num = counter++;
}
public void run()
{
char c;
for (int i = 0; i < 200; i++)
{
c = st.pop(num);
try
{
Thread.sleep((int)(Math.random() * 300));
}
catch (InterruptedException e)
{
}
} }
};
//多个生产者生产完以后,停止生产,消费者开始消费,直到消费完毕。
//Writed by ysy 2005-11-19 16:41:00
import java.util.*;
import java.io.*;
public class ThreadTest
{
public static void main(String [] args)
{
int i=0;
try
{
System.out.println("请输入生产的数量:");
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
i = Integer.parseInt(br.readLine());
}
catch (IOException ex)
{
System.out.println(ex.getMessage());
}
MyStack st = new MyStack(i);
Thread t1 = new Thread(new Producer(st));
Thread t2 = new Thread(new Producer(st));
Thread t3 = new Thread(new Customer(st));
Thread t4 = new Thread(new Customer(st));
t1.start();
t2.start();
t3.start();
t4.start();
}
}
class MyStack
{
//使用Vector类创建一个栈,它有无限大的空间。所以我只考虑栈为空的情况
public MyStack(int initialCapacity)
{
this.initialCapacity = initialCapacity;
}
private int initialCapacity=0;
private Vector buffer = new Vector(initialCapacity);
boolean bFull = false; //判断空间是否为满
boolean bCustomer = false; //中止消费者的标志
public synchronized void push(char c,int num)
{
Character charObj = new Character(c);
buffer.addElement(charObj);
System.out.println("Producer" + num + ": " + c);
if (buffer.size() == initialCapacity)
{
System.out.println("生产完毕,开始消费");
bFull = true;
this.notifyAll();
}
}
public synchronized char pop(int num)
{
char c;
if (buffer.size() == 0 || !bFull )
{
try {this.wait();} catch (InterruptedException e) {}
}
c = ((Character)buffer.remove(buffer.size()-1)).charValue();
System.out.println(" Customer" + num + ": " + c);
if (buffer.size() == 0 )
{
System.out.println("消费结束。");
bCustomer = true;
}
return c; }
};
//生产者
class Producer implements Runnable
{
MyStack st;
private static int counter = 1;
private int num;
public Producer(MyStack st)
{
this.st = st;
this.num=counter++;
}
public void run()
{
char c;
while (!st.bFull)
{
c = (char)(Math.random() * 26 + 'A');
st.push(c,num);
try
{
Thread.sleep((int)(Math.random() * 300));
} catch (InterruptedException e)
{
}
} }
}
//消费者
class Customer implements Runnable
{
MyStack st;
private static int counter = 1;
private int num;
public Customer(MyStack st)
{
this.st = st;
num = counter++;
}
public void run()
{
char c;
while (!st.bCustomer)
{
c = st.pop(num);
try
{
Thread.sleep((int)(Math.random() * 300));
}
catch (InterruptedException e)
{
}
} }
};