本帖最后由 snailxr 于 2011-08-24 13:32:14 编辑

解决方案 »

  1.   

    你put的元素,被take掉了
    BlockingQueue,当queue是空的时候,take会发生等待,刚开始的时候,你的TakeThread有一个会在queue.take处等待,其他的TakeThread在synchronized(lock)处等待,因为TakeThread和PutThread的lock是不同的,所以PutThread执行的时候,有一个线程进入synchronized(lock),执行queue.put,并打印Thread-6 add 8 ......1,此时下一个PutThread还没来得及执行,在queue.take处等待的TakeThread线程先执行了,把add的东西take掉了,所以,下一个PutThread执行的时,再add打印的时候,造成size没有改变,即Thread-6 add 9 ......1
      

  2.   

    +1, 
    LZ这里上不上锁都无所谓, 反正一个放一个取就对了, 集合为空时所有的take线程等待,一旦有线程put了数据,那么就看哪个线程下手快哪个线程就把数据拿到!
      

  3.   

    再请教一下,如果我put 或 take后想立即正确的输出queue中剩余的数据该怎么呢?
      

  4.   


    说的对,就是这个原因。BlockingQueue是线程安全的,不需要synchronized。
      

  5.   

    这个还真的想一会!
    这个问题我大概想了下, 貌似你这种要求很难得实现! 因为put和take线程一直都是存在的, 集合是空的时候take线程就一直是阻塞状态, 当刚有put线程放的时候take线程就拿走了,  而拿的同时put又可能会往里面放!  所以LZ,我只是粗略的想了下是实现不了!  除非采取某种特殊的手段处理下!大致给你的思路吧, 你要实现这个就不要用它的阻塞队列了, 你自己写一个线程间的通信, 当put线程put完数据并且打印当前集合中的值以后再唤take线程去取,  take那边也是如此! 就是打印完之后再notity对方!
      

  6.   


    //如果你真想在把take方法和size方法同步的话,只能使用ArrayBlockingQueue的内部锁才可以做到。
    public class BlockingQueueThread { public static ReentrantLock getLock(BlockingQueue queue) throws Exception {
    Field fields[] = queue.getClass().getDeclaredFields();
    for (Field f : fields) {
    f.setAccessible(true);
    if (f.getName().equals("lock")) {
    return (ReentrantLock) f.get(queue);
    }
    }
    return null;
    }
    public static void main(String[] args) throws Exception {
    BlockingQueue queue = new ArrayBlockingQueue(500);
    ExecutorService pool = Executors.newFixedThreadPool(15);
    PutThread[] blockThread = new PutThread[10];
    TakeThread[] takeThread = new TakeThread[5];
    Object lock = new Object();
    Object takeLock = new Object();
    try {
    lock = getLock(queue);
    takeLock = lock;
    } catch (Exception e) {
    e.printStackTrace();
    }
    for (int i = 0; i < takeThread.length; i++) {
    takeThread[i] = new TakeThread(queue, "thread" + i, takeLock);
    pool.execute(takeThread[i]);
    }
    for (int i = 0; i < blockThread.length; i++) {
    blockThread[i] = new PutThread(queue, "thread" + i, lock);
    pool.execute(blockThread[i]);
    }
    pool.shutdown();
    }
    }class PutThread extends Thread { private BlockingQueue queue;
    private String name;
    private Object lock; public PutThread(BlockingQueue queue, String name, Object lock) {
    this.queue = queue;
    this.name = name;
    this.lock = lock;
    } @Override
    public void run() {
    while (true) {
    try {
    Thread.sleep(1000);
    } catch (Exception ex) {
    ex.printStackTrace();
    }
    // synchronized (lock) {
    try {
    ((ReentrantLock) lock).lock();
    Integer i = new Random().nextInt(10) + 1;
    try {
    queue.put(i);// 此处应用put take 。如果用add时,成功返回true,失败则抛出异常
    } catch (InterruptedException ex) {
    Logger.getLogger(PutThread.class.getName()).log(
    Level.SEVERE, null, ex);
    }
    System.out.println(this.getName() + " add " + i + " ......"
    + queue.size());
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    ((ReentrantLock) lock).unlock();
    }
    // }
    } }
    }class TakeThread extends Thread { private BlockingQueue queue;
    private String name;
    private Object lock; public TakeThread(BlockingQueue queue, String name, Object lock) {
    this.queue = queue;
    this.name = name;
    this.lock = lock;
    } @Override
    public void run() {
    while (true) {
    try {
    Thread.sleep(1000);
    } catch (Exception ex) {
    ex.printStackTrace();
    }
    // synchronized (lock) {
    try {
    ((ReentrantLock) lock).lock();
    Integer i = 0;
    try {
    i = (Integer) queue.take();
    } catch (InterruptedException ex) {
    Logger.getLogger(PutThread.class.getName()).log(
    Level.SEVERE, null, ex);
    }
    System.out.println(this.getName() + " take " + i + " ......"
    + queue.size());
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    ((ReentrantLock) lock).unlock();
    }
    // }
    }
    }
    }
      

  7.   

    是阿, 但是这样就失去了阻塞队列的意义了!   比如说现在put线程在put, 拿到了锁, put完之后释放锁, 下次又是put拿到锁,那么take就一直处于等待中!  不知道LZ这样搞的目的是啥! - -!
      

  8.   

    别用put 和 take 方法了,
    用下面这两个方法 你看行不行。可以自己控制等待时间
    offer(E o, long timeout, TimeUnit unit) 
    将指定的元素插入此队列中,如果没有可用空间,将等待指定的等待时间(如果有必要)。poll(long timeout, TimeUnit unit) 
              检索并移除此队列的头部,如果此队列中没有任何元素,则等待指定等待的时间(如果有必要)。 
      

  9.   

    参考这篇文章的介绍:http://hi.baidu.com/fangpw/blog/item/1df8b8efc2bb18e2cf1b3e11.html