一个Thread缓冲池可以设计成以下这样:缓冲池由几个工作Thread和一个Queue组成,Client负责把任务放到Queue里面(put方法),而工作Thread就依次取出这些任务并执行它们(get方法)。Queue的一个经典实现是使用一个循环数组(这个实现在很多数据结构的书上都有介绍),如下图所示:
图示是一个大小为size的数组,这个循环数组可以被想象成首尾相连的一个环。oldest指向Queue中最老的数据所在的位置,next指向下一个可以放新数据的位置。放入一个新数据到next的位置后,需要更新next:next = (next + 1) % size;
从oldest位置取出一个数据后,需要更新oldest:oldest = (oldest + 1) % size;
当oldest == next的时候,Queue为空,
当(next + 1) % size == oldest的时候,Queue为满。
(注意:为了区分Queue为空和为满的情况,实际上Queue里面最多能放size-1个数据。)因为这个Queue会同时被多个线程访问,需要考虑在这种情况下Queue如何工作。首先,Queue需要是线程安全的,可以用Java里的synchronized关键字来确保同时只有一个Thread在访问Queue.我们还可以注意到当Queue为空的时候,get操作是无法进行的;当Queue为满的时候,put操作又是无法进行的。在多线程访问遇到这种情况时,一般希望执行操作的线程可以等待(block)直到该操作可以进行下去。比如,但一个Thread在一个空Queue上执行get方法的时候,这个Thread应当等待(block),直到另外的Thread执行该Queue的put方法后,再继续执行下去。在Java里面,Object对象的wait(),notify()方法提供了这样的功能。把上面的内容结合起来,就是一个SyncQueue的类:public class SyncQueue { public SyncQueue(int size) { _array = new Object[size]; _size = size; _oldest = 0; _next = 0; } public synchronized void put(Object o) { while (full()) { try { wait(); } catch (InterruptedException ex) { throw new ExceptionAdapter(ex); } } _array[_next] = o; _next = (_next + 1) % _size; notify(); } public synchronized Object get() { while (empty()) { try { wait(); } catch (InterruptedException ex) { throw new ExceptionAdapter(ex); } } Object ret = _array[_oldest]; _oldest = (_oldest + 1) % _size; notify(); return ret; } protected boolean empty() { return _next == _oldest; }
protected boolean full() { return (_next + 1) % _size == _oldest; } protected Object [] _array; protected int _next; protected int _oldest; protected int _size;}
可以注意一下get和put方法中while的使用,如果换成if是会有问题的。这是个很容易犯的错误。;-)
在以上代码中使用了ExceptionAdapter这个类,它的作用是把一个checked Exception包装成RuntimeException。详细的说明可以参考我的避免在Java中使用Checked Exception一文。接下来我们需要一个对象来表现Thread缓冲池所要执行的任务。可以发现JDK中的Runnable interface非常合适这个角色。最后,剩下工作线程的实现就很简单了:从SyncQueue里取出一个Runnable对象并执行它。public class Worker implements Runnable { public Worker(SyncQueue queue) { _queue = queue; }
public void run() { while (true) { Runnable task = (Runnable) _queue.get(); task.run(); } }
protected SyncQueue _queue = null;}我的问题就是这里他使用了
public class Worker implements Runnable {
里面的
Runnable task = (Runnable) _queue.get(); task.run();
为什么要把取出来的东西付给Runnable task?这样写的意义是什么?
我怎样才能把取出来的东西付给我想给的Object?
图示是一个大小为size的数组,这个循环数组可以被想象成首尾相连的一个环。oldest指向Queue中最老的数据所在的位置,next指向下一个可以放新数据的位置。放入一个新数据到next的位置后,需要更新next:next = (next + 1) % size;
从oldest位置取出一个数据后,需要更新oldest:oldest = (oldest + 1) % size;
当oldest == next的时候,Queue为空,
当(next + 1) % size == oldest的时候,Queue为满。
(注意:为了区分Queue为空和为满的情况,实际上Queue里面最多能放size-1个数据。)因为这个Queue会同时被多个线程访问,需要考虑在这种情况下Queue如何工作。首先,Queue需要是线程安全的,可以用Java里的synchronized关键字来确保同时只有一个Thread在访问Queue.我们还可以注意到当Queue为空的时候,get操作是无法进行的;当Queue为满的时候,put操作又是无法进行的。在多线程访问遇到这种情况时,一般希望执行操作的线程可以等待(block)直到该操作可以进行下去。比如,但一个Thread在一个空Queue上执行get方法的时候,这个Thread应当等待(block),直到另外的Thread执行该Queue的put方法后,再继续执行下去。在Java里面,Object对象的wait(),notify()方法提供了这样的功能。把上面的内容结合起来,就是一个SyncQueue的类:public class SyncQueue { public SyncQueue(int size) { _array = new Object[size]; _size = size; _oldest = 0; _next = 0; } public synchronized void put(Object o) { while (full()) { try { wait(); } catch (InterruptedException ex) { throw new ExceptionAdapter(ex); } } _array[_next] = o; _next = (_next + 1) % _size; notify(); } public synchronized Object get() { while (empty()) { try { wait(); } catch (InterruptedException ex) { throw new ExceptionAdapter(ex); } } Object ret = _array[_oldest]; _oldest = (_oldest + 1) % _size; notify(); return ret; } protected boolean empty() { return _next == _oldest; }
protected boolean full() { return (_next + 1) % _size == _oldest; } protected Object [] _array; protected int _next; protected int _oldest; protected int _size;}
可以注意一下get和put方法中while的使用,如果换成if是会有问题的。这是个很容易犯的错误。;-)
在以上代码中使用了ExceptionAdapter这个类,它的作用是把一个checked Exception包装成RuntimeException。详细的说明可以参考我的避免在Java中使用Checked Exception一文。接下来我们需要一个对象来表现Thread缓冲池所要执行的任务。可以发现JDK中的Runnable interface非常合适这个角色。最后,剩下工作线程的实现就很简单了:从SyncQueue里取出一个Runnable对象并执行它。public class Worker implements Runnable { public Worker(SyncQueue queue) { _queue = queue; }
public void run() { while (true) { Runnable task = (Runnable) _queue.get(); task.run(); } }
protected SyncQueue _queue = null;}我的问题就是这里他使用了
public class Worker implements Runnable {
里面的
Runnable task = (Runnable) _queue.get(); task.run();
为什么要把取出来的东西付给Runnable task?这样写的意义是什么?
我怎样才能把取出来的东西付给我想给的Object?
解决方案 »
免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货