const unsigned short SIZE_OF_BUFFER = 10; //缓冲区长度 unsigned short ProductID = 0; //产品号 unsigned short ConsumeID = 0; //将被消耗的产品号 unsigned short in = 0; //产品进缓冲区时的缓冲区下标 unsigned short out = 0; //产品出缓冲区时的缓冲区下标
//创建生产者线程 for (int i=0;i<PRODUCERS_COUNT;++i){ hThreads[i]=CreateThread(NULL,0,Producer,NULL,0,&producerID[i]); if (hThreads[i]==NULL) return -1; } //创建消费者线程 for (int i=0;i<CONSUMERS_COUNT;++i){ hThreads[PRODUCERS_COUNT+i]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i]); if (hThreads[i]==NULL) return -1; }
class Producter extends Thread { Queue q; Producter (Queue q) { this.q=q; } public void run() { for(int i=0;i<10;i++) { q.put(i); System.out.println("producter :"+i); } } } class Consumer extends Thread { Queue q; Consumer(Queue q) { this.q=q; } public void run() { while(true) { System.out.println("Consumer:"+q.get()); } } } class Queue //key { int value; boolean bFull=false; public synchronized void put(int i) { if(!bFull) { value=i; bFull=true; notify();//必须用在synchronized } try { wait();//必须捕获异常 // 等待消费者取走数据 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public synchronized int get() { if(!bFull) try { wait(); // 等待生产者写入数据 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } bFull=false; // 通知生产者数据已经被取走,可以再次写入数据 notify(); return value; } } public class test //测试类 { public static void main(String[] args) { Queue q=new Queue(); Producter p=new Producter(q); Consumer c=new Consumer(q); p.start(); c.start(); } } 问题补充:System.out.println("Consumer:"+q.get()); } } } class Queue //key { int value; boolean bFull=false; public synchronized void put(int i) { if(!bFull) { value=i; bFull=true; notify();//必须用在synchronized } try { wait();//必须捕获异常 // 等待消费者取走数据 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public synchronized int get() { if(!bFull) try { wait(); // 等待生产者写入数据 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } bFull=false; // 通知生产者数据已经被取走,可以再次写入数据 notify(); return value; } } public class test //测试类 { public static void main(String[] args) { Queue q=new Queue(); Producter p=new Producter(q); Consumer c=new Consumer(q); p.start(); c.start(); } }
import java.util.Random; import java.util.concurrent.BlockingQueue;public class Producer implements Runnable { private BlockingQueue<String> drop; public Producer(BlockingQueue<String> drop) { this.drop = drop; } public void run() { String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" }; Random random = new Random(); try { for (int i = 0; i < importantInfo.length; i++) { drop.put(importantInfo[i]); Thread.sleep(random.nextInt(5000)); } drop.put("DONE"); } catch (InterruptedException e) {} } }import java.util.Random; import java.util.concurrent.BlockingQueue;public class Consumer implements Runnable { private BlockingQueue<String> drop; public Consumer(BlockingQueue<String> drop) { this.drop = drop; } public void run() { Random random = new Random(); try { for (String message = drop.take(); ! message.equals("DONE"); message = drop.take()) { System.out.format("MESSAGE RECEIVED: %s%n", message); Thread.sleep(random.nextInt(5000)); } } catch (InterruptedException e) {} } }import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue;public class ProducerConsumerExample { public static void main(String[] args) { BlockingQueue<String> drop = new SynchronousQueue<String> (); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); } }
厨师和侍者 import java.util.concurrent.*; class Meal{ private final int orderNum; public Meal(int orderNum){this.orderNum=orderNum;} public String toString(){return "meal "+orderNum;} }class WaitPerson implements Runnable{ private Restaurant restaurant; public WaitPerson(Restaurant restaurant){this.restaurant=restaurant;} public void run(){ try{ while(!Thread.interrupted()){ synchronized(this){ while(restaurant.meal==null) wait(); } System.out.println("Waitperson got "+restaurant.meal); synchronized(restaurant.chef){ restaurant.meal = null; restaurant.chef.notifyAll();//ready for another } } }catch(InterruptedException e){ System.out.println("waitPerson interrupted"); } } } class Chef implements Runnable{ private Restaurant restaurant; private int count = 0; public Chef(Restaurant r){restaurant=r;} public void run(){ try{ while(!Thread.interrupted()){ synchronized(this){ while(restaurant.meal!=null){ wait();//wait for the meal to be taken } if(++count==10){ System.out.println("out of food,closing"); restaurant.exec.shutdownNow(); } System.out.println("Order up!"); synchronized(restaurant.waitPerson){ restaurant.meal = new Meal(count); restaurant.waitPerson.notifyAll(); } TimeUnit.MILLISECONDS.sleep(100); } } }catch(InterruptedException e){ System.out.println("chef interrupted"); } } }public class Restaurant { Meal meal; ExecutorService exec = Executors.newCachedThreadPool(); WaitPerson waitPerson = new WaitPerson(this); Chef chef = new Chef(this); public Restaurant(){ exec.execute(chef); exec.execute(waitPerson); } public static void main(String[] args) { new Restaurant(); } }
共有5个类 分别是 1)生产者 2)消费者 3)窝头 4) 筐(用栈实现的) 5) Main/** * * @author */ public class Producer implements Runnable //生产者线程类 { SnycStack ss = null; public Producer(SnycStack ss) //因为生产者生产完,放入栈中,所以给构造方法传递 //一个栈的引用 { this.ss = ss; } public void run() { for(int i=0;i<20;i++) { WoTou wt = new WoTou(i); ss.push(wt); System.out.println("生产了: "+wt); try { Thread.sleep(1000); }catch(InterruptedException e) { e.printStackTrace(); } } } } /*******************************************/ /** * * @author */ public class Consumer implements Runnable //消费者线程类 { SnycStack ss = null; public Consumer(SnycStack ss) { this.ss = ss; } public void run() { for(int i=0;i<20;i++) { WoTou wt = ss.pop(); System.out.println("消费了: "+wt); try { Thread.sleep(1000); }catch(InterruptedException e) { e.printStackTrace(); } } } } /************************************/ /** * * @author */ public class WoTou { int ID; WoTou(int ID) { this.ID = ID; } public String toString() { return "Wotou "+ID; } } /*********************************/ /** * * @author */ public class SnycStack { int index = 0; WoTou []arrwt = new WoTou[6]; public synchronized void push(WoTou wt) //向栈里放 { while(index == arrwt.length) try { this.wait(); }catch(InterruptedException e) { e.printStackTrace(); } this.notify(); arrwt[index++] = wt; } public synchronized WoTou pop() //从栈里取 { while(index==0) try { this.wait(); }catch(InterruptedException e) { e.printStackTrace(); } this.notify(); index--; return arrwt[index]; } } /************************************/ /** * * @author */ public class ProducerConsumer { public static void main(String []args) { SnycStack ss = new SnycStack(); Producer p = new Producer(ss); Consumer c = new Consumer(ss); Thread pro = new Thread(p); Thread con = new Thread(c); pro.start(); con.start(); } } 共三个线程 Main 、Producer 和 Consumer 注意要用 synchronized 使其同步
#include <windows.h>
#include <cstdio>
#include <iostream>
// Shared state for the producer/consumer demo. Every function below reads or
// writes these file-level variables directly.
const unsigned short SIZE_OF_BUFFER = 10; // number of slots in the buffer
unsigned short ProductID = 0; // ID of the most recently produced product
unsigned short ConsumeID = 0; // ID of the product that is about to be consumed
unsigned short in = 0; // buffer index where the next product is stored
unsigned short out = 0; // buffer index the next product is taken from
int g_buffer[SIZE_OF_BUFFER]; // the buffer, used as a circular queue
// NOTE(review): plain bool written by main() and read by the worker threads
// without any synchronization -- confirm this is acceptable for the target compiler.
bool g_continue = true; // loop flag; cleared to end the program
HANDLE g_hMutex; // mutual exclusion between threads
HANDLE g_hFullSemaphore; // makes producers wait when the buffer is full
HANDLE g_hEmptySemaphore; // makes consumers wait when the buffer is empty
DWORD WINAPI Producer(LPVOID); // producer thread entry point
DWORD WINAPI Consumer(LPVOID); // consumer thread entry point
// Entry point: creates the synchronization objects, starts the producer and
// consumer threads, then blocks until a key is read from stdin.
// Returns 0 on normal termination and -1 if any handle or thread cannot be created.
int main()
{
    // Create the synchronization objects and fail fast if any of them is missing;
    // the worker threads wait on all three.
    g_hMutex = CreateMutex(NULL,FALSE,NULL);
    g_hFullSemaphore = CreateSemaphore(NULL,SIZE_OF_BUFFER-1,SIZE_OF_BUFFER-1,NULL);
    g_hEmptySemaphore = CreateSemaphore(NULL,0,SIZE_OF_BUFFER-1,NULL);
    if (g_hMutex==NULL || g_hFullSemaphore==NULL || g_hEmptySemaphore==NULL) return -1;

    // Tune the two counts below: with more producers than consumers the producers
    // spend most of their time waiting, and vice versa.
    const unsigned short PRODUCERS_COUNT = 3; // number of producer threads
    const unsigned short CONSUMERS_COUNT = 1; // number of consumer threads
    // Total number of worker threads.
    const unsigned short THREADS_COUNT = PRODUCERS_COUNT+CONSUMERS_COUNT;

    // Each array is sized for what it actually stores: one handle per thread,
    // one ID per producer, one ID per consumer.
    HANDLE hThreads[THREADS_COUNT];
    DWORD producerID[PRODUCERS_COUNT];
    DWORD consumerID[CONSUMERS_COUNT];

    // Start the producer threads.
    for (int i=0;i<PRODUCERS_COUNT;++i){
        hThreads[i]=CreateThread(NULL,0,Producer,NULL,0,&producerID[i]);
        if (hThreads[i]==NULL) return -1;
    }
    // Start the consumer threads; their handles follow the producers' handles.
    for (int i=0;i<CONSUMERS_COUNT;++i){
        const int slot = PRODUCERS_COUNT+i;
        hThreads[slot]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i]);
        if (hThreads[slot]==NULL) return -1; // check the handle that was just created
    }

    while(g_continue){
        if(getchar()){ // pressing Enter ends the program
            g_continue = false;
        }
    }
    return 0;
}
// Simulates producing one product: bumps the global product ID and reports it.
void Produce()
{
    ++ProductID;
    std::cerr << "Producing " << ProductID << " ... ";
    std::cerr << "Succeed" << std::endl;
}
// Stores the newest product in the buffer, advances the write index and
// prints the whole buffer with the current write/read positions marked.
void Append()
{
    std::cerr << "Appending a product ... ";
    const unsigned short slot = in;      // slot that receives the product
    g_buffer[slot] = ProductID;
    in = (slot+1)%SIZE_OF_BUFFER;        // wrap around at the end of the buffer
    std::cerr << "Succeed" << std::endl;

    // Dump the current state of every slot.
    int idx = 0;
    while (idx < SIZE_OF_BUFFER) {
        std::cout << idx <<": " << g_buffer[idx];
        if (in==idx) std::cout << " <-- 生产";
        if (out==idx) std::cout << " <-- 消费";
        std::cout << std::endl;
        ++idx;
    }
}
//从缓冲区中取出一个产品
void Take()
{
std::cerr << "Taking a product ... ";
ConsumeID = g_buffer[out];
out = (out+1)%SIZE_OF_BUFFER;
std::cerr << "Succeed" << std::endl;
//输出缓冲区当前的状态
for (int i=0;i<SIZE_OF_BUFFER;++i){
std::cout << i <<": " << g_buffer[i];
if (i==in) std::cout << " <-- 生产";
if (i==out) std::cout << " <-- 消费";
std::cout << std::endl;
}
}
// Simulates consuming one product: reports the ID stored in ConsumeID.
void Consume()
{
    std::cerr << "Consuming ";
    std::cerr << ConsumeID;
    std::cerr << " ... ";
    std::cerr << "Succeed" << std::endl;
}
// Producer thread body. Each round waits on g_hFullSemaphore, takes the mutex,
// produces and appends one product, sleeps, then releases the mutex and
// signals g_hEmptySemaphore. The loop ends once g_continue is false.
DWORD WINAPI Producer(LPVOID lpPara)
{
    for (;;) {
        if (!g_continue) break;
        WaitForSingleObject(g_hFullSemaphore,INFINITE);
        WaitForSingleObject(g_hMutex,INFINITE);
        Produce();
        Append();
        Sleep(1500);   // the pause happens while the mutex is still held
        ReleaseMutex(g_hMutex);
        ReleaseSemaphore(g_hEmptySemaphore,1,NULL);
    }
    return 0;
}
// Consumer thread body. Each round waits on g_hEmptySemaphore, takes the mutex,
// takes and consumes one product, sleeps, then releases the mutex and
// signals g_hFullSemaphore. The loop ends once g_continue is false.
DWORD WINAPI Consumer(LPVOID lpPara)
{
    for (;;) {
        if (!g_continue) break;
        WaitForSingleObject(g_hEmptySemaphore,INFINITE);
        WaitForSingleObject(g_hMutex,INFINITE);
        Take();
        Consume();
        Sleep(1500);   // the pause happens while the mutex is still held
        ReleaseMutex(g_hMutex);
        ReleaseSemaphore(g_hFullSemaphore,1,NULL);
    }
    return 0;
}
{
Queue q;
Producter (Queue q)
{
this.q=q;
}
public void run()
{
for(int i=0;i<10;i++)
{
q.put(i);
System.out.println("producter :"+i);
}
}
}
/**
 * Consumer thread: repeatedly takes a value from the shared {@code Queue}
 * and prints it. The loop has no exit condition.
 */
class Consumer extends Thread
{
    /** The hand-off buffer shared with the producer. */
    Queue q;

    /**
     * @param q the buffer values are taken from
     */
    Consumer(Queue q)
    {
        this.q = q;
    }

    /** Takes and prints values forever. */
    @Override
    public void run()
    {
        for (;;)
        {
            int taken = q.get();
            System.out.println("Consumer:" + taken);
        }
    }
}
/**
 * Single-slot hand-off buffer guarded by this object's monitor.
 *
 * <p>{@link #put(int)} stores a value and returns only after the slot has been emptied
 * again; {@link #get()} blocks until a value is present. Both methods re-check their
 * condition in a loop after every wake-up, so a spurious wake-up or a notification meant
 * for another thread can neither return a stale value nor drop a new one.
 */
class Queue
{
    /** The stored value; meaningful only while {@code bFull} is true. */
    int value;

    /** True while the slot holds a value that has not been taken yet. */
    boolean bFull = false;

    /**
     * Stores {@code i} in the slot and waits until the slot has been emptied again.
     * If the slot is already occupied, waits for it to become free first instead of
     * discarding {@code i}. An interrupt received while waiting does not abort the
     * hand-off; the thread's interrupt status is restored before returning.
     *
     * @param i the value to hand over
     */
    public synchronized void put(int i)
    {
        boolean interrupted = awaitFullIs(false); // wait for a free slot
        value = i;
        bFull = true;
        notifyAll();                              // wake a consumer blocked in get()
        interrupted |= awaitFullIs(false);        // wait until the slot is empty again
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks until a value is present, removes it and returns it. An interrupt received
     * while waiting does not abort the call; the thread's interrupt status is restored
     * before returning.
     *
     * @return the value most recently stored by {@link #put(int)}
     */
    public synchronized int get()
    {
        boolean interrupted = awaitFullIs(true);  // wait for a producer to fill the slot
        bFull = false;
        notifyAll();                              // tell the producer the slot is free again
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    /**
     * Waits on this monitor until {@code bFull == wanted}. Must be called with the
     * monitor held.
     *
     * @return true if the thread was interrupted at least once while waiting
     */
    private boolean awaitFullIs(boolean wanted)
    {
        boolean interrupted = false;
        while (bFull != wanted)
        {
            try
            {
                wait();
            }
            catch (InterruptedException e)
            {
                interrupted = true; // remember it and keep waiting for the condition
            }
        }
        return interrupted;
    }
}
/**
 * Test driver: wires one {@code Producter} and one {@code Consumer} to a shared
 * {@code Queue} and starts both threads, producer first.
 */
public class test
{
    public static void main(String[] args)
    {
        final Queue shared = new Queue();
        final Producter producer = new Producter(shared);
        final Consumer consumer = new Consumer(shared);

        producer.start();
        consumer.start();
    }
}
问题补充:System.out.println("Consumer:"+q.get());
}
}
}
/**
 * Single-slot hand-off buffer guarded by this object's monitor.
 *
 * <p>{@link #put(int)} stores a value and returns only after the slot has been emptied
 * again; {@link #get()} blocks until a value is present. Both methods re-check their
 * condition in a loop after every wake-up, so a spurious wake-up or a notification meant
 * for another thread can neither return a stale value nor drop a new one.
 */
class Queue
{
    /** The stored value; meaningful only while {@code bFull} is true. */
    int value;

    /** True while the slot holds a value that has not been taken yet. */
    boolean bFull = false;

    /**
     * Stores {@code i} in the slot and waits until the slot has been emptied again.
     * If the slot is already occupied, waits for it to become free first instead of
     * discarding {@code i}. An interrupt received while waiting does not abort the
     * hand-off; the thread's interrupt status is restored before returning.
     *
     * @param i the value to hand over
     */
    public synchronized void put(int i)
    {
        boolean interrupted = awaitFullIs(false); // wait for a free slot
        value = i;
        bFull = true;
        notifyAll();                              // wake a consumer blocked in get()
        interrupted |= awaitFullIs(false);        // wait until the slot is empty again
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks until a value is present, removes it and returns it. An interrupt received
     * while waiting does not abort the call; the thread's interrupt status is restored
     * before returning.
     *
     * @return the value most recently stored by {@link #put(int)}
     */
    public synchronized int get()
    {
        boolean interrupted = awaitFullIs(true);  // wait for a producer to fill the slot
        bFull = false;
        notifyAll();                              // tell the producer the slot is free again
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    /**
     * Waits on this monitor until {@code bFull == wanted}. Must be called with the
     * monitor held.
     *
     * @return true if the thread was interrupted at least once while waiting
     */
    private boolean awaitFullIs(boolean wanted)
    {
        boolean interrupted = false;
        while (bFull != wanted)
        {
            try
            {
                wait();
            }
            catch (InterruptedException e)
            {
                interrupted = true; // remember it and keep waiting for the condition
            }
        }
        return interrupted;
    }
}
/**
 * Test driver: wires one {@code Producter} and one {@code Consumer} to a shared
 * {@code Queue} and starts both threads, producer first.
 */
public class test
{
    public static void main(String[] args)
    {
        final Queue shared = new Queue();
        final Producter producer = new Producter(shared);
        final Consumer consumer = new Consumer(shared);

        producer.start();
        consumer.start();
    }
}
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

/**
 * Sends a fixed list of messages through a {@link BlockingQueue}, pausing a random
 * time (under five seconds) after each one, and finishes with the sentinel {@code "DONE"}.
 *
 * <p>Package-private because this listing keeps all three classes in one compilation
 * unit, which allows only one public top-level class.
 */
class Producer implements Runnable {
    private final BlockingQueue<String> drop;

    /**
     * @param drop the queue the messages are put into
     */
    public Producer(BlockingQueue<String> drop) {
        this.drop = drop;
    }

    /**
     * Puts every message followed by {@code "DONE"}. If the thread is interrupted the
     * method stops early and restores the interrupt status so the caller can see it.
     */
    @Override
    public void run() {
        String[] importantInfo = {
            "Mares eat oats",
            "Does eat oats",
            "Little lambs eat ivy",
            "A kid will eat ivy too"
        };
        Random random = new Random();
        try {
            for (String info : importantInfo) {
                drop.put(info);
                Thread.sleep(random.nextInt(5000));
            }
            drop.put("DONE");
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag for the thread's owner.
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Takes messages from a {@link BlockingQueue} and prints each one, pausing a random
 * time (under five seconds) after each, until the sentinel {@code "DONE"} arrives.
 *
 * <p>Package-private for the same single-compilation-unit reason as {@code Producer}.
 */
class Consumer implements Runnable {
    private final BlockingQueue<String> drop;

    /**
     * @param drop the queue the messages are taken from
     */
    public Consumer(BlockingQueue<String> drop) {
        this.drop = drop;
    }

    /**
     * Prints every received message until {@code "DONE"} is taken. If the thread is
     * interrupted the method stops early and restores the interrupt status.
     */
    @Override
    public void run() {
        Random random = new Random();
        try {
            for (String message = drop.take(); !message.equals("DONE");
                    message = drop.take()) {
                System.out.format("MESSAGE RECEIVED: %s%n", message);
                Thread.sleep(random.nextInt(5000));
            }
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag for the thread's owner.
            Thread.currentThread().interrupt();
        }
    }
}

/** Starts one producer thread and one consumer thread joined by a {@link SynchronousQueue}. */
public class ProducerConsumerExample {
    public static void main(String[] args) {
        BlockingQueue<String> drop = new SynchronousQueue<String>();
        (new Thread(new Producer(drop))).start();
        (new Thread(new Consumer(drop))).start();
    }
}
import java.util.concurrent.*;
// Value object for one prepared meal, identified by its order number.
class Meal{
private final int orderNum;
public Meal(int orderNum){this.orderNum=orderNum;}
public String toString(){return "meal "+orderNum;}
}class WaitPerson implements Runnable{
// Consumer side of the hand-off: waits on its own monitor until restaurant.meal is
// non-null, prints the meal, then clears it and notifies the chef's monitor.
private Restaurant restaurant;
public WaitPerson(Restaurant restaurant){this.restaurant=restaurant;}
// Loops until the thread's interrupt flag is seen or wait() throws InterruptedException.
public void run(){
try{
while(!Thread.interrupted()){
// Wait on this object's monitor; Chef calls notifyAll() on it after setting the meal.
synchronized(this){
while(restaurant.meal==null)
wait();
}
System.out.println("Waitperson got "+restaurant.meal);
// Clearing the meal under the chef's monitor pairs with the chef's wait loop,
// which checks restaurant.meal while holding that same monitor.
synchronized(restaurant.chef){
restaurant.meal = null;
restaurant.chef.notifyAll();//ready for another
}
}
}catch(InterruptedException e){
System.out.println("waitPerson interrupted");
}
}
}
// Producer side of the hand-off: waits on its own monitor until restaurant.meal is
// null, then publishes a new Meal and notifies the wait person's monitor.
class Chef implements Runnable{
private Restaurant restaurant;
// Number of meals started so far; incremented once per loop iteration.
private int count = 0;
public Chef(Restaurant r){restaurant=r;}
// Loops until the thread's interrupt flag is seen or wait()/sleep() throws
// InterruptedException.
public void run(){
try{
while(!Thread.interrupted()){
// Everything up to and including the sleep below runs while holding this
// object's monitor; it is released only inside wait() and at the end of the block.
synchronized(this){
while(restaurant.meal!=null){
wait();//wait for the meal to be taken
}
// On the tenth iteration the executor is told to shut down. Execution continues
// past this point: "Order up!" is still printed and a meal is still published.
// NOTE(review): presumably the resulting interrupt surfaces at the sleep below -- confirm.
if(++count==10){
System.out.println("out of food,closing");
restaurant.exec.shutdownNow();
}
System.out.println("Order up!");
// Publish the meal under the wait person's monitor and wake it; this pairs with
// the wait loop in WaitPerson.run().
synchronized(restaurant.waitPerson){
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
}
}catch(InterruptedException e){
System.out.println("chef interrupted");
}
}
}public class Restaurant
{
// Shared state: the single meal slot, the executor running both tasks, and the two tasks.
// meal is null when no meal is waiting to be picked up.
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);
// Submits the chef first, then the wait person, to the executor.
public Restaurant(){
exec.execute(chef);
exec.execute(waitPerson);
}
// Entry point: constructing the Restaurant starts both tasks.
public static void main(String[] args)
{
new Restaurant();
}
}
*
* @author
*/
public class Producer implements Runnable //生产者线程类
{
SnycStack ss = null;
public Producer(SnycStack ss) //因为生产者生产完,放入栈中,所以给构造方法传递 //一个栈的引用
{
this.ss = ss;
} public void run()
{
for(int i=0;i<20;i++)
{
WoTou wt = new WoTou(i);
ss.push(wt);
System.out.println("生产了: "+wt);
try
{
Thread.sleep(1000);
}catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
}
/*******************************************/
/**
 * Consumer task: pops twenty {@code WoTou} items from the shared {@code SnycStack},
 * printing each one and then pausing for one second.
 */
public class Consumer implements Runnable
{
    /** The stack items are taken from. */
    SnycStack ss = null;

    /**
     * @param ss the shared stack; consumed items are popped from it
     */
    public Consumer(SnycStack ss)
    {
        this.ss = ss;
    }

    /**
     * Consumes twenty items. If the thread is interrupted during the pause, the
     * interrupt status is restored and consumption stops.
     */
    @Override
    public void run()
    {
        for (int i = 0; i < 20; i++)
        {
            WoTou wt = ss.pop();
            System.out.println("消费了: " + wt);
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
                // An interrupt is a request to stop: keep the flag set and leave.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/************************************/
/**
 * The item passed from producer to consumer; it carries nothing but a numeric ID.
 */
public class WoTou
{
    /** Identifier assigned at construction. */
    int ID;

    /**
     * @param ID the identifier of this item
     */
    WoTou(int ID)
    {
        this.ID = ID;
    }

    /**
     * @return the text {@code "Wotou "} followed by the ID
     */
    @Override
    public String toString()
    {
        StringBuilder text = new StringBuilder("Wotou ");
        text.append(ID);
        return text.toString();
    }
}
/*********************************/
/**
 * Fixed-capacity (six slots) blocking stack guarded by this object's monitor.
 * {@link #push(WoTou)} blocks while the stack is full and {@link #pop()} blocks while
 * it is empty.
 *
 * <p>Both methods wait on the same monitor for different conditions, so every state
 * change wakes all waiters with {@code notifyAll()}; each waiter re-checks its own
 * condition in a loop.
 */
public class SnycStack
{
    /** Number of items currently stored; also the index of the next free slot. */
    int index = 0;

    /** Backing storage; slots at {@code index} and above hold no item. */
    WoTou[] arrwt = new WoTou[6];

    /**
     * Pushes an item, waiting while the stack is full. An interrupt received while
     * waiting does not abort the push; the interrupt status is restored before returning.
     *
     * @param wt the item to store
     */
    public synchronized void push(WoTou wt)
    {
        boolean interrupted = false;
        while (index == arrwt.length)
        {
            try
            {
                this.wait();
            }
            catch (InterruptedException e)
            {
                interrupted = true; // keep waiting, report the interrupt afterwards
            }
        }
        arrwt[index++] = wt;
        this.notifyAll();
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Pops the most recently pushed item, waiting while the stack is empty. An interrupt
     * received while waiting does not abort the pop; the interrupt status is restored
     * before returning.
     *
     * @return the item that was on top of the stack
     */
    public synchronized WoTou pop()
    {
        boolean interrupted = false;
        while (index == 0)
        {
            try
            {
                this.wait();
            }
            catch (InterruptedException e)
            {
                interrupted = true; // keep waiting, report the interrupt afterwards
            }
        }
        index--;
        WoTou top = arrwt[index];
        arrwt[index] = null; // drop the reference so the stack does not retain popped items
        this.notifyAll();
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }
        return top;
    }
}
/************************************/
/**
 * Entry point: runs one {@code Producer} and one {@code Consumer}, each on its own
 * thread, against a single shared {@code SnycStack}.
 */
public class ProducerConsumer
{
    public static void main(String []args)
    {
        SnycStack basket = new SnycStack();

        Thread pro = new Thread(new Producer(basket));
        Thread con = new Thread(new Consumer(basket));

        // The producer thread is started first, then the consumer.
        pro.start();
        con.start();
    }
}
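// 共三个线程 Main 、Producer 和 Consumer (three threads in total: main, Producer and Consumer)
// 注意要用 synchronized 使其同步 (note: synchronized is required to keep them in step)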
/** Thread that buys the product; it runs this same object. */
private Thread customer;
/** Thread that makes the product; it runs this same object. */
private Thread maker;
/** The product, assigned by the maker thread. */
private Products pd;

/** Creates both threads around this object and names them "customer" and "maker". */
public CustomerAndMaker() {
    this.customer = new Thread(this);
    this.maker = new Thread(this);
    this.customer.setName("customer");
    this.maker.setName("maker");
}
/** Thread body for both threads; all the work happens in {@code buy()}. */
public void run() {
    this.buy();
}
public void buy() {
if(Thread.currentThread()==customer) {
maker.join();
//maker线程调用join()方法,则customer线程立即停止执行,把CPU使用权交给maker线程,
//必须等maker线程执行完毕才会继续执行。如果maker线程已经死亡,则执行maker.join()语句不会有任何效果;
System.out.println("buy Sucess!"+pd.getName());
}else (Thread.currentThread()==maker) {
System.out.println("produce ...");
try {
this.sleep(3000); //假设用时3秒生产产品
}catch(InterrputedException e) {
//处理异常的语句
}
pd=new Products("中南海");
}
}
/**
 * Entry point: starts the maker thread first and the customer thread second.
 *
 * <p>The order matters because the customer waits with {@code maker.join()}, which
 * returns immediately for a thread that has not been started yet; starting the
 * customer first could let it run before any product exists.
 */
public static void main(String[] args) {
    CustomerAndMaker cam = new CustomerAndMaker();
    cam.maker.start();
    cam.customer.start();
}
}
/**
 * Product value object holding only a name; adapt it to the real use case.
 */
public class Products {
    private String name;

    /**
     * Creates a product with the given name. The constructor is public so that other
     * classes can instantiate products with {@code new Products(...)}.
     *
     * @param name the product name
     */
    public Products(String name) {
        // Assign directly rather than through the overridable setter.
        this.name = name;
    }

    /**
     * @param name the new product name
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the product name
     */
    public String getName() {
        return name;
    }
}