做出来的部分在下面,相信高手10分钟就能搞定吧.在线等待
public class Queue { private Semaphore sem = new Semaphore(1);
private Object[] ar;
private int SIZE;
private int index_begin = 0;
private int index_end = 0; public Queue(int size) {
SIZE = size;
ar = new Object[SIZE];
} public void q_put(Object o) {
if( SIZE == q_size()) {
try{
sem.sem_wait();
}catch(InterruptedException e){
}
}
System.out.println("PUT:"+o);
index_end = (index_end+1) % SIZE;
ar[index_end] = o;
System.out.println("Size after put:"+ q_size() );
sem.sem_post();
}
public Object q_get() {
return new Object();
}
public int q_size() {
return (index_end + SIZE - index_begin) % SIZE;
} public static void main(String[] args) {
// testing
Queue queue1 = new Queue(1);
}
}
public class Queue { private Semaphore sem = new Semaphore(1);
private Object[] ar;
private int SIZE;
private int index_begin = 0;
private int index_end = 0; public Queue(int size) {
SIZE = size;
ar = new Object[SIZE];
} public void q_put(Object o) {
if( SIZE == q_size()) {
try{
sem.sem_wait();
}catch(InterruptedException e){
}
}
System.out.println("PUT:"+o);
index_end = (index_end+1) % SIZE;
ar[index_end] = o;
System.out.println("Size after put:"+ q_size() );
sem.sem_post();
}
public Object q_get() {
return new Object();
}
public int q_size() {
return (index_end + SIZE - index_begin) % SIZE;
} public static void main(String[] args) {
// testing
Queue queue1 = new Queue(1);
}
}
{
private volatile int value = 0; public Semaphore (int initialValue)
{
if(initialValue >= 0)
value = initialValue;
else
System.out.println("Semaphore defaulted to 0");
} public synchronized void sem_post()
{
if(++value <= 0) notify(); // if some thread was waiting
} public synchronized void sem_wait() throws InterruptedException
{
if(--value < 0) wait(); // if no resource, join the waiting threads
}
}
class queue
{
private static final int MAX = 10; //set the queue length to 10
private String[] string= new String[MAX]; //define the string array
public int count,outs,ins;
Semaphore spaces,items; queue(int initcount)
{
if(0 >= MAX)
System.out.println("WARNING : Buffer size <= 0");
count = initcount;
outs = ins = 0;
spaces = new Semaphore(MAX);
items = new Semaphore(0);
} public synchronized void updatecount(int i)
{
count += i;
if(i >= 0)
{
ins += i;
string[(ins-1)%(MAX)]=("String number " + (ins-1));//make circular
System.out.print("Put: " + string[(ins-1)%(MAX)]);
//System.out.print(" ins="+ins);
q_size();
}
else
{
outs -= i;
System.out.print(" Get: " + string[(outs-1)%(MAX)]);
//System.out.print("outs="+outs);
q_size();
}
}
public synchronized void q_size()
{
System.out.println(" SIZE=" + count);
}
}class q_put extends Thread
{
queue current; public q_put(queue current)
{
this.current = current;
} public void run()
{ while(true)
{
/* Wait for a space to be available */
try
{
current.spaces.sem_wait();
}
catch (InterruptedException e)
{
System.out.println("Unexpected interruption");
}
/* Update the count - n.b. updatecount() is synchronized */
current.updatecount(1);
/* Signal that an item is available */
current.items.sem_post();
}
}
}
class q_get extends Thread
{ queue current; public q_get(queue current)
{
this.current = current;
} public void run()
{
while(true)
{
/* Wait for an item to be available */
try
{
current.items.sem_wait();
}
catch (InterruptedException e)
{
System.out.println("Unexpected interruption");
} /* Update the count - n.b. updatecount() is synchronized */
current.updatecount(-1); /* Signal that a space is available */
current.spaces.sem_post();
}
}
}
class solution2
{
public static void main(String[] args)
{
queue current = new queue(0); q_put put1 = new q_put(current);
q_put put2 = new q_put(current);
q_put put3 = new q_put(current); q_get get1 = new q_get(current);
q_get get2 = new q_get(current); put1.start();
put2.start();
put3.start(); get1.start();
get2.start(); }
}
final Queue queue = new Queue();
Random rand = new Random();
Thread t1 = new Thread(new Runnable() {
public void run() {
try{
Thread.sleep(1000 + System.currentTimeMillis() % 2000);
}catch(Exception e){}
queue.q_put(rand.next...);
}
});
Thread t2 = ...
t1.start();
t2.start();
t3.start();
}
“use a circular array (not Java library queue objects) to implement a queue and the following queue functions :”
原文是让你实现一个队列的实例并提供q_get()、q_put(Object obj)和q_size()这3个方法。
然后起3个典型的线程去调用此3个方法--典型的消费者(调用q_get())、生产者(调用q_put())和观察者(调用q_size())模式,主要可以检查线程间同步是否做得好。
而且不能用synchronized等关键字作同步之用、只能用它提供的Semaphore类来实现线程同步功能。
import java.io.*;class Semaphore {
private volatile int value = 0; public Semaphore (int initialValue){
if(initialValue >= 0) value = initialValue;
else System.out.println("Semaphore defaulted to 0");
} public synchronized void sem_post() {
if(++value <= 0) notify(); // if some thread was waiting
} public synchronized void sem_wait() throws InterruptedException {
if(--value < 0) wait(); // if no resource, join the waiting threads
}
}class Queue
{
private int MaxSize = 0;
private Object[] queue = null;
private int beginIndex = 0;
private int size = 0;
//用于同步多线程访问Queue对象
private Semaphore sem1 = null;
//用于存放等待有数据再get的线程
private Semaphore semGet = null;
//用于存放等待空闲再put数据的线程
private Semaphore semPut = null;
//等待线程计数
private int GetWaitCount = 0;
//存放线程计数
private int PutWaitCount = 0; public Queue()
{
this(10);
} public Queue(int size)
{
this.MaxSize = size;
queue = new Object[size];
beginIndex = 0;
this.size = 0;
sem1 = new Semaphore(1);
semGet = new Semaphore(0);
semPut = new Semaphore(0);
} public void q_put(Object obj)
{
//同步开始
sem1.sem_wait();
if ( this.size == this.MaxSize )
{
PutWaitCount ++;
sem1.sem_post();
semPut.sem_wait();
sem1.sem_wait();
}
/*
写放obj到数组queue的实现
*/
queue[size] = obj;
size++;
if ( GetWaitCount > 0 )
{
GetWaitCount --;
semGet.sem_post();
}
sem1.sem_post();
//同步结束
} public int q_size()
{
return size;
} public Object q_get()
{
Object obj = null;
sem1.sem_wait();
if ( this.size == 0 )
{
GetWaitCount++;
sem1.sem_post();
semGet.sem_wait();
sem1.sem_wait();
}
obj = queue[0];
size--;
System.arraycopy(queue,0,queue,1,size);
if ( PutWaitCount > 0 )
{
PutWaitCount --;
semPut.sem_post();
}
sem1.sem_post();
return obj;
}}class Producer extends Thread
{
private String name = null;
private Queue queue = null;
public Producer(String name,Queue queue)
{
this.name = name;
this.queue = queue;
} public void run()
{
if ( queue == null )
{ }
}}
class Consumer extends Thread
{
private String name = null;
private Queue queue = null;
public Consumer(String name,Queue queue)
{
this.name = name;
this.queue = queue;
} public void run()
{
if ( queue == null )
{ }
}}class Observer extends Thread
{
private String name = null;
private Queue queue = null;
public Observer(String name,Queue queue)
{
this.name = name;
this.queue = queue;
} public void run()
{
if ( queue == null )
{ }
}}public class test
{
public static void main(String[] args)
{ try
{ }
catch(Exception e)
{
e.printStackTrace();
}
}
}
那3个线程还没实现,看你好像很急的样子就先放出来了。那3个线程其实比较简单,就是一个不断地放东西到queue中,另外一个不断地取,还有一个不断地读size
还有是对InterruptException没有捕获,你自己考虑一下改怎么处理那些被异常中断的线程吧---理论上不会出现这种情况:)。
至于system.arraycopy那段主要是没时间去考虑利用beginIndex来识别头、尾。所以采用那个比较简单的方法--当然效率会差点,有时间也自己改了它吧:)希望能帮到你。日后有空我再完成这个程序,或者再讨论其他线程相关的编程技术吧--不局限于java的实现。
{
private volatile int value = 0; public Semaphore (int initialValue)
{
if(initialValue >= 0)
value = initialValue;
else
System.out.println("Semaphore defaulted to 0");
} public synchronized void sem_post()
{
if(++value <= 0) notify(); // if some thread was waiting
} public synchronized void sem_wait() throws InterruptedException
{
if(--value < 0) wait(); // if no resource, join the waiting threads
}
}
class queue
{
private static final int MAX = 10; //set the queue length to 10
private String[] string= new String[MAX]; //define the string array
public int count,outs,ins;
Semaphore spaces,items,mutex; queue(int initcount)
{
if(0 >= MAX)
System.out.println("WARNING : Buffer size <= 0");
count = initcount;
outs = ins = 0;
spaces = new Semaphore(MAX);
items = new Semaphore(0);
mutex= new Semaphore(1); //mutual exclusion
} public void updatecount(int i)
{
count += i;
if(i >= 0)
{
ins += i;
string[(ins-1)%(MAX)]=("String number " + (ins-1));//make circular
System.out.print("Put: " + string[(ins-1)%(MAX)]);
//System.out.print(" ins="+ins);
q_size();
}
else
{
outs -= i;
System.out.print(" Get: " + string[(outs-1)%(MAX)]);
//System.out.print("outs="+outs);
q_size();
}
}
public void q_size()
{
System.out.println(" SIZE=" + count);
}
}class q_put extends Thread
{
queue current; public q_put(queue current)
{
this.current = current;
} public void run()
{ while(true)
{
/* Wait for a space to be available */
try
{
current.spaces.sem_wait();
}
catch (InterruptedException e)
{
System.out.println("Unexpected interruption");
}
try
{
current.mutex.sem_wait();
}
catch (InterruptedException e)
{
System.out.println("Unexpected interruption");
}
/* Update the count - n.b. updatecount() is synchronized */
current.updatecount(1);
current.mutex.sem_post();
/* Signal that an item is available */
current.items.sem_post();
}
}
}
class q_get extends Thread
{ queue current; public q_get(queue current)
{
this.current = current;
} public void run()
{
while(true)
{
/* Wait for an item to be available */
try
{
current.items.sem_wait();
}
catch (InterruptedException e)
{
System.out.println("Unexpected interruption");
}
try
{
current.mutex.sem_wait();
}
catch (InterruptedException e)
{
System.out.println("Unexpected interruption");
}
current.updatecount(-1);
current.mutex.sem_post();
current.spaces.sem_post();
}
}
}
class solution3
{
public static void main(String[] args)
{
queue current = new queue(0); q_put put1 = new q_put(current);
q_put put2 = new q_put(current);
q_get get1 = new q_get(current);
q_get get2 = new q_get(current); put1.start();
put2.start(); get1.start();
get2.start(); }
}
by the way, where are you?