本帖最后由 mynewcsdn 于 2014-12-19 17:13:22 编辑

解决方案 »

  1.   

    写一个A类实现Runnable接口,data作为A的成员变量,四个线程在new的时候都用A作为参数去构造,这样子四个线程就共享data了
      

  2.   

    共享是共享了,但是这样会出现很大的问题,假如我有10w条数据 可会有2w条数据没有得到处理,而有2w条数据处理了两次,明白吧
      

  3.   

    设计模式有点问题。
    楼主可以参考我提出方案:
    首先,程序启动时,就要有5个线程以及1个阻塞队列和1个特殊对象;
        (5个线程中,1个读取线程从数据库中读取数据,放入阻塞队列,4个处理线程从阻塞队列中读取数据进行处理。)
        (当读取线程将所有数据都读取完毕后,向阻塞队列连续放入4次特殊对象,
          4个线程在处理数据前先判断是否是特殊对象,如果是则直接返回,不做处理。
          这个特殊对象被称为致.命.毒.药,用于杀死处理线程。)
    其次, 阻塞队列要具有大小限制的那种,读取线程最好单独写一个线程类,处理线程写一个类,然后创建4个线程对象。
        (推荐使用LinkedBlockingQueue,注意:构造器要带最大值这个参数,用来限制内存大小。)
    第三,这种模式的适用场景最好是:
        1. 单个数据处理耗时长(处理时间大于数据传输时间);
        2. 单个数据处理逻辑复杂(每个数据都是一个数据库事务);
        3. 对统计型的数据,允许少量误差。等等。
        (以上3点满足任意1个即可)
      

  4.   

    嗯,很对,但是这样代码写起来比较麻烦,而且这种会有产生死锁的风险(哈哈,本人小菜,技术拙劣,自己写的代码经常出问题),其实我上面的思路也正确的,之所以不能正确处理的是因为我少了一个break;
    看这段代码               
    boolean flag = true;
                   while( flag){
                        try {
                            Thread.sleep(500l);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                     for(Future<Integer> future : dataHdlFutures){
                         if( !future.isDone()){
                             flag = true;
                             break;
                         }else{
                             flag = false;
                         }
                     }
    我在原帖中的代码是没有break的所以导致某个线程还没处理完数据,我的主线程就更新data了,从而发生数据丢失的问题
    看一段代码dataHdlFutures.add(FixedThreadPool.getService().submit(linkDataHdl));
    这里面dataHdlFutures是一个List<Future<Integer>>, 而FixedThreadPool.getService()是我封装的一个类用来创建一个固定大小的线程池,Future有个方法isDone()是用来判断线程是否终止的。之所以采用这种方案主要有两个原因:
       一、用线程池创建线程的消耗要比直接创建线程要小一点
       二、可以用Future来很好的判断线程的生命周期,(而不用强制终止线程之类)
    最后谢谢两位的支持
      

  5.   

    最后补充一点,上面有个方法写错了
     
    public synchronized Object getElement(){
                   if( data.size() > 0){
                    return linkList.remove(0);
                }else{
                    return null;
                }
        }
    这应该改成这样,哈哈,不过其实并不影响代码的理解:   public synchronized Object getElement(){
                   if( data.size() > 0){
                    return data.remove(0);
                }else{
                    return null;
                }
        }
      

  6.   

    不好意思还有个地方忘了更正 就是//这个是用来创建线程的一个内部类      
     protected class linkDataHdlThread implements Callable<Integer>{
        public linkDataHdlThread(    ){
        }
        @Override
        public Integer call() throws Exception {
            if(logger.isInfoEnabled()) logger.info("线程类"+this.hashCode()+"......");
              //线程栈上的一个临时变量,这个线程之间不会共享吧??
              List<Object> linkContainer = new ArrayList<Object>();
              while(data.size() > 0){//在这个地方调用data.size()是否会有问题
                    linkContainer.add(getElement());
            }
            while( linkContainer.size() > 0 ){
                linkProcessorUnit(linkContainer.remove(0));
            }
            return null;
        }
    }
    应该改为//这个是用来创建线程的一个内部类      
     protected class linkDataHdlThread implements Callable<Integer>{
        public linkDataHdlThread(    ){
        }
        @Override
        public Integer call() throws Exception {
            if(logger.isInfoEnabled()) logger.info("线程类"+this.hashCode()+"......");
              //线程栈上的一个临时变量,这个线程之间不会共享吧??
              List<Object> linkContainer = new ArrayList<Object>();
              Object link = getElement();
              while( link != null ){//在这个地方调用data.size()是否会有问题
                    linkContainer.add(link);
                    link  = getElement();
            }
           //其实下面这段是画蛇添足了,可以直接在上面的循环里面直接处理 link了
            while( linkContainer.size() > 0 ){
                linkProcessorUnit(linkContainer.remove(0));
            }
            return null;
        }
    }