public Node(Object pair) { this.obj = pair; next=null; }
public Object getObj() { return obj; }
public void setObj(Object obj) { this.obj = obj; }
public Node getNext() { return next; }
public void setNext(Node next) { this.next = next; }
}
public ObjectQueue() { size=0; head=null; tail=null; }
/** * Append o to me. * * @param o */ public synchronized void enqueue(Object o) { Node n=new Node(o); if(empty()){ head=n; tail=n; size=1; notifyAll(); } else {//size should not be little than 0 tail.setNext(n); tail=n; size++; } }
ArrayList类中所有方法都是非同步的。
首先实现一个FIFO的队列
下面是一个接口
package info.bioz.util;
/**
* <p>File: Queue.java</p>
* <p>Description: A Queue, i.e., a simple FIFO list. </p>
* <p>BIOZ.info Copyright (c) 2004</p>
*
* @author <a href="mailto:[email protected]">Chance</a>
*/
public interface Queue { /**
* Append o to me.
* @param o
*/
void enqueue(Object o); /**
* Remove and return my front element. Requires: size() != 0.
* @return
*/
Object dequeue(); /**
* Return my front element. Requires: size() != 0.
* @return
*/
Object head(); /**
* Return the number of elements I contain.
* @return
*/
int size();} // Queue
package info.bioz.util;/**
* <p>File: ObjectQueue.java</p>
* <p>Description: </p>
* <p><a href="http://www.bioz.info/">BIOZ.info</a> Copyright (c) 2004</p>
*
* @author <a href="mailto:[email protected]">Chance</a>
*/
public class ObjectQueue implements Queue {
private int size;
private Node head,tail;
private class Node{
Object obj;
Node next;
public Node(Object pair) {
this.obj = pair;
next=null;
}
public Object getObj() {
return obj;
}
public void setObj(Object obj) {
this.obj = obj;
}
public Node getNext() {
return next;
}
public void setNext(Node next) {
this.next = next;
}
}
public ObjectQueue() {
size=0;
head=null;
tail=null;
}
/**
* Append o to me.
*
* @param o
*/
public synchronized void enqueue(Object o) {
Node n=new Node(o);
if(empty()){
head=n;
tail=n;
size=1;
notifyAll();
} else {//size should not be little than 0
tail.setNext(n);
tail=n;
size++;
}
}
/**
* Remove and return my front element. Requires: size() != 0.
*
* @return
*/
public synchronized Object dequeue() {
while(empty()){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object p;
p=head.getObj();
head=head.getNext();
size--;
return p;
}
/**
* Return my front element. Requires: size() != 0.
*
* @return
*/
public synchronized Object head() {
if(empty()) return null;
return this.head.getObj();
}
/**
* Return the number of elements I contain.
*
* @return
*/
public synchronized int size() {
return this.size;
}
public synchronized boolean empty() {
if(size()==0) return true;
return false;
}
}
package info.bioz.util;import java.util.Random;/**
* <p>File: ProducerThread.java</p>
* <p>Description: </p>
* <p><a href="http://www.bioz.info/">BIOZ.info</a> Copyright (c) 2004</p>
*
* @author <a href="mailto:[email protected]">Chance</a>
*/
public class ProducerThread extends Thread {
ObjectQueue queue;
int total;
public ProducerThread(ObjectQueue queue,String s){
this(queue,null,s);
} public ProducerThread(ObjectQueue queue, ThreadGroup group, String s) {
super(group,s);
this.queue=queue;
total=0;
} public synchronized void run(){
while(true){
System.out.println(getName()+">>producing ...");
produce();
try {
double l=Math.random();
System.out.println(getName()+">>tea break ...");
sleep((long) (l*1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} private void produce(){
Object o=new Object();
queue.enqueue(o);
total++;
System.out.println(getName()+">>product number: "+total);
} public synchronized ObjectQueue getQueue() {
return queue;
} public synchronized void setQueue(ObjectQueue queue) {
this.queue = queue;
}}
多谢了,请继续写,我给你分!!
private Elem head = new Elem(null);
private Elem tail; public Queue() {
//head.setNext(tail);
tail = head;
} public void write(Elem e) {
e.setNext(null);
tail.setNext(e);
synchronized (tail) {
tail = e;
}
if (head.getNext() == tail) {
this.notify();
}
} public Elem read() throws InterruptedException {
//链表为空的时候,阻塞
Elem tmp;
if (tail == head) {
this.wait();
}
tmp = head.getNext();
synchronized (tail) {
if (tmp == tail) {
//当为最后一个元素,设置尾结点
tail = head;
}
} head.setNext(tmp.getNext());
return tmp;
}
}public class Elem {
private Elem next;
private Object data;
public Elem(Elem e) {
this.next = null;
}
public Elem getNext() {
return next;
}
public void setNext(Elem next) {
this.next = next;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}}
用这个就够了,比较高效。
消费者线程
package info.bioz.util;/**
* <p>File: ConsumerThread.java</p>
* <p>Description: </p>
* <p><a href="http://www.bioz.info/">BIOZ.info</a> Copyright (c) 2004</p>
*
* @author <a href="mailto:[email protected]">Qiang Wang</a>
*/
public class ConsumerThread extends Thread {
ObjectQueue queue;
int total;
public ConsumerThread(ObjectQueue queue,String s){
this(queue,null,s);
} public ConsumerThread(ObjectQueue queue, ThreadGroup group, String s) {
super(group,s);
this.queue=queue;
total=0;
} public synchronized void run(){
while(true){
consume();
}
} private void consume(){
queue.dequeue();
System.out.println(getName()+">>consuming ...");
total++;
System.out.println(getName()+">>product number: "+total);
} public synchronized ObjectQueue getQueue() {
return queue;
} public synchronized void setQueue(ObjectQueue queue) {
this.queue = queue;
}}
package info.bioz.util;/**
* <p>File: Society.java</p>
* <p>Description: </p>
* <p><a href="http://www.bioz.info/">BIOZ.info</a> Copyright (c) 2004</p>
*
* @author <a href="mailto:[email protected]">Qiang Wang</a>
*/
public class Society {
public static void main(String[] args) {
ObjectQueue oq=new ObjectQueue();
ProducerThread pt1=new ProducerThread(oq,"PRODUCER1");
ProducerThread pt2=new ProducerThread(oq,"PRODUCER2");
ConsumerThread ct1=new ConsumerThread(oq,"CONSUMER1");
ConsumerThread ct2=new ConsumerThread(oq,"CONSUMER2");
pt1.start();
ct1.start();
ct2.start();
}
}
另外修改一下ConsumerThread.java
public synchronized void run(){
while(true){
consume();
}
} private void consume(){
queue.dequeue();
System.out.println(getName()+">>consuming ...");
total++;
System.out.println(getName()+">>product number: "+total);
}某一次的运行结果是:
PRODUCER1>>producing ...
PRODUCER2>>producing ...
PRODUCER1>>product number: 1
PRODUCER2>>product number: 1
CONSUMER1>>consuming ...
CONSUMER1>>product number: 1
CONSUMER2>>consuming ...
CONSUMER2>>product number: 1
PRODUCER1>>tea break ...
PRODUCER2>>tea break ...
PRODUCER2>>producing ...
PRODUCER2>>product number: 2
PRODUCER2>>tea break ...
CONSUMER1>>consuming ...
CONSUMER1>>product number: 2
PRODUCER1>>producing ...
PRODUCER1>>product number: 2
PRODUCER1>>tea break ...
CONSUMER1>>consuming ...
CONSUMER1>>product number: 3
PRODUCER2>>producing ...
PRODUCER2>>product number: 3
PRODUCER2>>tea break ...
CONSUMER1>>consuming ...
CONSUMER1>>product number: 4
PRODUCER1>>producing ...
PRODUCER1>>product number: 3
PRODUCER1>>tea break ...
CONSUMER1>>consuming ...
CONSUMER1>>product number: 5
PRODUCER1>>producing ...
PRODUCER1>>product number: 4
PRODUCER1>>tea break ...
CONSUMER1>>consuming ...
CONSUMER1>>product number: 6
PRODUCER1>>producing ...
PRODUCER1>>product number: 5
PRODUCER1>>tea break ...
CONSUMER1>>consuming ...
CONSUMER1>>product number: 7
PRODUCER2>>producing ...
PRODUCER2>>product number: 4
PRODUCER2>>tea break ...
CONSUMER1>>consuming ...
CONSUMER1>>product number: 8
PRODUCER2>>producing ...
PRODUCER2>>product number: 5
PRODUCER2>>tea break ...
CONSUMER1>>consuming ...
CONSUMER1>>product number: 9
PRODUCER2>>producing ...
PRODUCER2>>product number: 6
PRODUCER2>>tea break ...
CONSUMER1>>consuming ...
CONSUMER1>>product number: 10
PRODUCER1>>producing ...
PRODUCER1>>product number: 6
PRODUCER1>>tea break ...
CONSUMER1>>consuming ...
CONSUMER1>>product number: 11
PRODUCER1>>producing ...
PRODUCER1>>product number: 7
PRODUCER1>>tea break ...
...呵呵,这个程序没有提供终止的方法,另外消费者线程比较贪婪,所以consumer1吃的比较饱,可以试试一个生产者多个消费者的情况。
自己在线程代码中控制同步。
public static List data=new LinkedList();
private Data(){}
private static Data instance;
public static Data getInstance(){
if (null==instance){
instance=new Data();
}
return instance;
} public static Object getData(int i){
return data.get(i);
}
public static void addData(Object o){
data.add(o);
}
public static void removeData(Object o){
data.remove(o);
}
public static int size(){
return data.size();
}
}class Provider extends Thread{
static Integer value=new Integer(0);
private String name;
public String getName(String name){
return name;
}
public Provider(String name){
this.name=name;
}
public void run(){
for (int i=0;i<10;i++){
synchronized(value){
int val=value.intValue();
val++;
value=new Integer(val);
System.out.println(name+": create "+val);
synchronized(Data.data){
System.out.println("In "+name+" : now size="+Data.size());
Data.addData(String.valueOf(val));
}
}
try{
Thread.sleep(50);
}catch(Exception ex){
ex.printStackTrace();
}
}
}
}class Consumer extends Thread{
public int count=0;
public void run(){
while (count<20){
synchronized (Data.data) {
System.out.println("In Consumer: now size="+Data.size());
while(Data.size()>0){
String str=(String)Data.getData(0);
Data.removeData(str);
System.out.println("Process : "+str);
count++;
}
}
try{
Thread.sleep(100);
}
catch(Exception ex){
ex.printStackTrace();
}
}
}
}
public class LinedListThreadTest {
public LinedListThreadTest() {
}
public static void main(String args[]){
Data.getInstance();
Provider provider1=new Provider("provider1");
provider1.start();
Provider provider2=new Provider("provider2");
provider2.start();
Consumer consumer=new Consumer();
consumer.start();
}
}