本帖最后由 matrix1984 于 2013-06-17 18:37:44 编辑

解决方案 »

  1.   

    ThreadPool不是重点,重点是要保证异步线程取bean时,如何并发、互斥的获取数据。我们可以把beans放到队列或堆栈中,任意线程来了,只取第一个bean(这个“取”操作要synchronized),然后run方法只做单一bean的处理即可。最后,我们初始化的任意个(LZ这里要求5个就5个吧,其实ThreadPool更适合不定个数个异步线程的处理)线程,使用ThreadPool实例对象(单例)的excute(Runnable r)方法加入到线程池即可。
      

  2.   

    楼主你好,看到你这个需求,我想我们是不是可以换个思路。
    既然那么多数据都要做处理,不如,我们把这些数据用后台,比如shell程序去处理,将数据后的结果集添加到新的数据库(表)中。这样我们只需要在新的数据库(表)中去读就可以了。
    楼主你觉得呢。
      

  3.   


    1. 关于获取bean的互斥,我是那么做的,一个线程获取5000条数据后,将其状态更新为“processing”表示正在被处理,这样下一个thread就不会再取这部分数据;
    2. 你的建议是一个线程只去处理一个bean,这样跟我一个线程处理5000个,效率方面能解释下为什么吗?另外,我现在的处理方法类似如下:        ThreadPoolExecutor executor = new ThreadPoolExecutor(PROCESS_THREADS_COUNT, // core size
                    PROCESS_THREADS_COUNT_MAX, // max size
                    10 * 60, // idle timeout
                    TimeUnit.SECONDS, 
                    new ArrayBlockingQueue<Runnable>(PROCESS_THREADS_COUNT_MAX), 
                    new ThreadPoolExecutor.CallerRunsPolicy());...
    ...
    long start = System.currentTimeMillis();
    while(true) {
      DataBean[] beans = queryData(5000); //每次取5000条
      if(beans.length > 0) {
        
        updateDataStatusToProcessing(beans); //状态更新为正在处理
        ProcessTask subtask = new ProcessTask(result); //具体业务处理Thread
        executor.execute(subtask);
       
      } else break;  if(beans.length < 5000) break; //没有更多的数据
    }executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    long end = System.currentTimeMillis();
    System.out.println("使用时间: " + (end - start) + "ms"); //打印当所有任务都完成时所用时间
    ...
    ...
      

  4.   


    1. 关于获取bean的互斥,我是那么做的,一个线程获取5000条数据后,将其状态更新为“processing”表示正在被处理,这样下一个thread就不会再取这部分数据;
    2. 你的建议是一个线程只去处理一个bean,这样跟我一个线程处理5000个,效率方面能解释下为什么吗?另外,我现在的处理方法类似如下:        ThreadPoolExecutor executor = new ThreadPoolExecutor(PROCESS_THREADS_COUNT, // core size
                    PROCESS_THREADS_COUNT_MAX, // max size
                    10 * 60, // idle timeout
                    TimeUnit.SECONDS, 
                    new ArrayBlockingQueue<Runnable>(PROCESS_THREADS_COUNT_MAX), 
                    new ThreadPoolExecutor.CallerRunsPolicy());...
    ...
    long start = System.currentTimeMillis();
    while(true) {
      DataBean[] beans = queryData(5000); //每次取5000条
      if(beans.length > 0) {
        
        updateDataStatusToProcessing(beans); //状态更新为正在处理
        ProcessTask subtask = new ProcessTask(result); //具体业务处理Thread
        executor.execute(subtask);
       
      } else break;  if(beans.length < 5000) break; //没有更多的数据
    }executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    long end = System.currentTimeMillis();
    System.out.println("使用时间: " + (end - start) + "ms"); //打印当所有任务都完成时所用时间
    ...
    ...
    第一点,如果数据量很大,可以这样用数据库的标志位(这样要注意一点:在你查询出来,再更新的时候,有没有并发的一个查询操作。建议对查询结果的主键做个排序,查询的时候直接first 5000这样查),如果表不大(字段不多)或数据量不大,建议一次性全部查出来,放到beansQueue中(要保证内存足够),再起线程处理。第二点,一个线程处理5000条,个人感觉这是个串行的。一个线程只处理一个,从queue里取出来首节点,新启一个线程放到ThreadPool中处理,单个处理完毕线程自动结束,这样效率会高。你的单个处理5000条的方法没问题,建议更新状态的操作,在每个bean处理成功后更新,失败的bean不更新,下次查询还可以查出来再次处理。
      

  5.   

    m个线程从表中读取,n个线程进行业务处理m和n之间比例值的确定,我觉得应该由单线程读取一条数据和单线程处理一条数据业务的时间比例决定比如一个线程读取所有数据如果快过n个线程处理所有数据业务,那根本无需多线程读取,因为即使多线程读取快过单线程读取,还是会受制于n个线程业务处理时间的瓶颈。n的确定可以单独对数据处理做多线程测试来确定。