java 多线程 BlockingQueue 本帖最后由 snailxr 于 2011-08-24 13:32:14 编辑 解决方案 » 免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货 你put的元素,被take掉了BlockingQueue,当queue是空的时候,take会发生等待,刚开始的时候,你的TakeThread有一个会在queue.take处等待,其他的TakeThread在synchronized(lock)处等待,因为TakeThread和PutThread的lock是不同的,所以PutThread执行的时候,有一个线程进入synchronized(lock),执行queue.put,并打印Thread-6 add 8 ......1,此时下一个PutThread还没来得及执行,在queue.take处等待的TakeThread线程先执行了,把add的东西take掉了,所以,下一个PutThread执行的时,再add打印的时候,造成size没有改变,即Thread-6 add 9 ......1 +1, LZ这里上不上锁都无所谓, 反正一个放一个取就对了, 集合为空时所有的take线程等待,一旦有线程put了数据,那么就看哪个线程下手快哪个线程就把数据拿到! 再请教一下,如果我put 或 take后想立即正确的输出queue中剩余的数据该怎么呢? 说的对,就是这个原因。BlockingQueue是线程安全的,不需要synchronized。 这个还真的想一会!这个问题我大概想了下, 貌似你这种要求很难得实现! 因为put和take线程一直都是存在的, 集合是空的时候take线程就一直是阻塞状态, 当刚有put线程放的时候take线程就拿走了, 而拿的同时put又可能会往里面放! 所以LZ,我只是粗略的想了下是实现不了! 除非采取某种特殊的手段处理下!大致给你的思路吧, 你要实现这个就不要用它的阻塞队列了, 你自己写一个线程间的通信, 当put线程put完数据并且打印当前集合中的值以后再唤take线程去取, take那边也是如此! 就是打印完之后再notity对方! //如果你真想在把take方法和size方法同步的话,只能使用ArrayBlockingQueue的内部锁才可以做到。public class BlockingQueueThread { public static ReentrantLock getLock(BlockingQueue queue) throws Exception { Field fields[] = queue.getClass().getDeclaredFields(); for (Field f : fields) { f.setAccessible(true); if (f.getName().equals("lock")) { return (ReentrantLock) f.get(queue); } } return null; } public static void main(String[] args) throws Exception { BlockingQueue queue = new ArrayBlockingQueue(500); ExecutorService pool = Executors.newFixedThreadPool(15); PutThread[] blockThread = new PutThread[10]; TakeThread[] takeThread = new TakeThread[5]; Object lock = new Object(); Object takeLock = new Object(); try { lock = getLock(queue); takeLock = lock; } catch (Exception e) { e.printStackTrace(); } for (int i = 0; i < takeThread.length; i++) { takeThread[i] = new TakeThread(queue, "thread" + i, takeLock); pool.execute(takeThread[i]); } for (int i = 0; i < blockThread.length; i++) { blockThread[i] = new PutThread(queue, "thread" + i, lock); pool.execute(blockThread[i]); } pool.shutdown(); }}class PutThread extends Thread { private BlockingQueue queue; private String name; private Object lock; public PutThread(BlockingQueue queue, String name, Object lock) { this.queue = queue; this.name = name; this.lock = lock; } @Override public void run() { while (true) { try { Thread.sleep(1000); } catch (Exception ex) { ex.printStackTrace(); } // synchronized (lock) { try { ((ReentrantLock) lock).lock(); Integer i = new Random().nextInt(10) + 1; try { queue.put(i);// 此处应用put take 。如果用add时,成功返回true,失败则抛出异常 } catch (InterruptedException ex) { Logger.getLogger(PutThread.class.getName()).log( Level.SEVERE, null, ex); } System.out.println(this.getName() + " add " + i + " ......" + queue.size()); } catch (Exception e) { e.printStackTrace(); } finally { ((ReentrantLock) lock).unlock(); } // } } }}class TakeThread extends Thread { private BlockingQueue queue; private String name; private Object lock; public TakeThread(BlockingQueue queue, String name, Object lock) { this.queue = queue; this.name = name; this.lock = lock; } @Override public void run() { while (true) { try { Thread.sleep(1000); } catch (Exception ex) { ex.printStackTrace(); } // synchronized (lock) { try { ((ReentrantLock) lock).lock(); Integer i = 0; try { i = (Integer) queue.take(); } catch (InterruptedException ex) { Logger.getLogger(PutThread.class.getName()).log( Level.SEVERE, null, ex); } System.out.println(this.getName() + " take " + i + " ......" + queue.size()); } catch (Exception e) { e.printStackTrace(); } finally { ((ReentrantLock) lock).unlock(); } // } } }} 是阿, 但是这样就失去了阻塞队列的意义了! 比如说现在put线程在put, 拿到了锁, put完之后释放锁, 下次又是put拿到锁,那么take就一直处于等待中! 不知道LZ这样搞的目的是啥! - -! 别用put 和 take 方法了,用下面这两个方法 你看行不行。可以自己控制等待时间offer(E o, long timeout, TimeUnit unit) 将指定的元素插入此队列中,如果没有可用空间,将等待指定的等待时间(如果有必要)。poll(long timeout, TimeUnit unit) 检索并移除此队列的头部,如果此队列中没有任何元素,则等待指定等待的时间(如果有必要)。 参考这篇文章的介绍:http://hi.baidu.com/fangpw/blog/item/1df8b8efc2bb18e2cf1b3e11.html 数组初始化 突然又有点不懂了 split (String) 函数 作何用途?? 有谁能给我一个,表单选择时间的 annotation 关于JTable中的Jbutton 在连接本地Oracle时如何配置Struts-config.xml??? 帮忙看看数组问题 如何设置JToolTip显示为多行? 可不可以用多线程思想做一个纯随机数? applet为什么不能够正常显示?? java 多线程 BlockingQueue ??????? Java 运行不了,出现"Error: could not find java.dll" 怎么解决?
BlockingQueue,当queue是空的时候,take会发生等待,刚开始的时候,你的TakeThread有一个会在queue.take处等待,其他的TakeThread在synchronized(lock)处等待,因为TakeThread和PutThread的lock是不同的,所以PutThread执行的时候,有一个线程进入synchronized(lock),执行queue.put,并打印Thread-6 add 8 ......1,此时下一个PutThread还没来得及执行,在queue.take处等待的TakeThread线程先执行了,把add的东西take掉了,所以,下一个PutThread执行的时,再add打印的时候,造成size没有改变,即Thread-6 add 9 ......1
LZ这里上不上锁都无所谓, 反正一个放一个取就对了, 集合为空时所有的take线程等待,一旦有线程put了数据,那么就看哪个线程下手快哪个线程就把数据拿到!
说的对,就是这个原因。BlockingQueue是线程安全的,不需要synchronized。
这个问题我大概想了下, 貌似你这种要求很难得实现! 因为put和take线程一直都是存在的, 集合是空的时候take线程就一直是阻塞状态, 当刚有put线程放的时候take线程就拿走了, 而拿的同时put又可能会往里面放! 所以LZ,我只是粗略的想了下是实现不了! 除非采取某种特殊的手段处理下!大致给你的思路吧, 你要实现这个就不要用它的阻塞队列了, 你自己写一个线程间的通信, 当put线程put完数据并且打印当前集合中的值以后再唤take线程去取, take那边也是如此! 就是打印完之后再notity对方!
//如果你真想在把take方法和size方法同步的话,只能使用ArrayBlockingQueue的内部锁才可以做到。
public class BlockingQueueThread { public static ReentrantLock getLock(BlockingQueue queue) throws Exception {
Field fields[] = queue.getClass().getDeclaredFields();
for (Field f : fields) {
f.setAccessible(true);
if (f.getName().equals("lock")) {
return (ReentrantLock) f.get(queue);
}
}
return null;
}
public static void main(String[] args) throws Exception {
BlockingQueue queue = new ArrayBlockingQueue(500);
ExecutorService pool = Executors.newFixedThreadPool(15);
PutThread[] blockThread = new PutThread[10];
TakeThread[] takeThread = new TakeThread[5];
Object lock = new Object();
Object takeLock = new Object();
try {
lock = getLock(queue);
takeLock = lock;
} catch (Exception e) {
e.printStackTrace();
}
for (int i = 0; i < takeThread.length; i++) {
takeThread[i] = new TakeThread(queue, "thread" + i, takeLock);
pool.execute(takeThread[i]);
}
for (int i = 0; i < blockThread.length; i++) {
blockThread[i] = new PutThread(queue, "thread" + i, lock);
pool.execute(blockThread[i]);
}
pool.shutdown();
}
}class PutThread extends Thread { private BlockingQueue queue;
private String name;
private Object lock; public PutThread(BlockingQueue queue, String name, Object lock) {
this.queue = queue;
this.name = name;
this.lock = lock;
} @Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
}
// synchronized (lock) {
try {
((ReentrantLock) lock).lock();
Integer i = new Random().nextInt(10) + 1;
try {
queue.put(i);// 此处应用put take 。如果用add时,成功返回true,失败则抛出异常
} catch (InterruptedException ex) {
Logger.getLogger(PutThread.class.getName()).log(
Level.SEVERE, null, ex);
}
System.out.println(this.getName() + " add " + i + " ......"
+ queue.size());
} catch (Exception e) {
e.printStackTrace();
} finally {
((ReentrantLock) lock).unlock();
}
// }
} }
}class TakeThread extends Thread { private BlockingQueue queue;
private String name;
private Object lock; public TakeThread(BlockingQueue queue, String name, Object lock) {
this.queue = queue;
this.name = name;
this.lock = lock;
} @Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
}
// synchronized (lock) {
try {
((ReentrantLock) lock).lock();
Integer i = 0;
try {
i = (Integer) queue.take();
} catch (InterruptedException ex) {
Logger.getLogger(PutThread.class.getName()).log(
Level.SEVERE, null, ex);
}
System.out.println(this.getName() + " take " + i + " ......"
+ queue.size());
} catch (Exception e) {
e.printStackTrace();
} finally {
((ReentrantLock) lock).unlock();
}
// }
}
}
}
用下面这两个方法 你看行不行。可以自己控制等待时间
offer(E o, long timeout, TimeUnit unit)
将指定的元素插入此队列中,如果没有可用空间,将等待指定的等待时间(如果有必要)。poll(long timeout, TimeUnit unit)
检索并移除此队列的头部,如果此队列中没有任何元素,则等待指定等待的时间(如果有必要)。