问题是这样的:我有一大批的任务需要多线程处理,我使用了Executors.newFixedThreadPool()来创建线程池,并使用ExecutorService.submit()方法提交Callable对象用于执行,在Callable对象中存储了一些程序运行所需的对象。由于需要提交的任务数量太大,而且submit方法又是不阻塞的,很快就把内存空间用完了。我想请问,ExecutorService是在什么时候释放Callable对象的空间的?是在Callable被执行完以后吗?我该如何进行内存的管理?我想控制提交任务的速率,比如当内存空间不足的时候就阻塞submit,直到一些任务被执行完以后释放出足够的空间。能做到吗?

解决方案 »

  1.   

    Callable对象是靠GC去释放的,只要已经没有任何线程引用了该对象,GC就能够释放了。我想问题应该不是出在Callable对象上的,除非你对Callable做了其它引用导致GC不认为其可以被释放。建议用控制线程池容量的方式来限制,可参见这里:
    http://bjyzxxds.iteye.com/blog/1267833
      

  2.   


    主要是我一下submit了太多的Callable对象,这些Callable至少在被执行之前是不会被释放的吧,而执行的速度肯定赶不上submit的速度,所以导致了内存耗尽。我是这么认为的。那篇文章说的是控制线程的数量吧,我已经控制线程的数量,但是问题是,正在等待执行的任务肯定还要占据空间,如何管理这部分空间呢?
      

  3.   

    哦,两个建议:
    1、先用队列管理(比如数据库表)待执行任务,企图执行的任务数据先放入数据库中,这样避免一下子全把Callable对象都创建出来;用一个线程去检查ExecutorService的情况,有空闲了再从数据库中拿一条任务。
    2、用一个通用的很小的Callable对象,待其被启动后再去初始化其执行任务所需各类数据和对象等。如果能的话建议用第二种,简化自己的开发。
      

  4.   


    也就是说这件事情还得自己来做,现有的ExecutorService的实现都没提供这样的能力?第二种方法对我的情况可能不大适用,因为我这里是顺序地去读一个文件取得原始数据,然后存入Callable中多线程地执行,如果将读文件的过程也放入每个线程的话,也不是不能实现,但是担心出问题,至少要对文件流还有一个同步的控制,效率也会有影响。我打算不行的话就定期检测ExecutorSerice中等待队列的长度,控制队列长度。但是个人觉得这个方案也不是很完美,因为这个列队长度的上限取决于你的Callable对象的大小,如果能根据队列占用的空间来控制就更好了。
      

  5.   

    你所期望控制的资源,超出了ExecutorService的能力,它只能管理线程规模。如果你一口气把整个文件的数据都变成Callable对象,相当于你把整个文件都已经装入内存了。对于大规模数据处理而言,最好还是采用流式处理,所以你说的方案基本上也就是这样了。至于更为精确的规模控制,只能你自己来处理,也就是每次分配Callable的时候,就记录下它所处理数据量的规模;处理完毕的就扣除这个规模并通知主线程;然后你的主线程负责控制总体规模及读取下一个数据库并分配任务就行了。
      

  6.   

    找到一种相对简单的解决方案,ThreadPoolExecutor本身是允许控制等待队列的长度的,只要不使用Executors.newFixedThreadPool(),自己构造ThreadPoolExecutor对象可以更加灵活。代码如下:ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    最后一个参数是等待队列,传递一个大小有限的队列,并且在下一句将队列满时的处理策略设为CallerRunsPolicy,这种策略会减慢submit的速率(默认的策略是抛一个异常)。Executors.newFixedThreadPool()里的源代码是这样的:new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());这样创建的等待队列是无限长的,所以后导致内存耗尽(个人认为Executors不应该这么干)。参见JDK中对ThreadPoolExecutor的介绍
      

  7.   

    相当于你借助ThreadPoolExecutor来阻塞自己的主线程,仍然是靠任务容量来控制规模,并不能达到你最开始希望的“如果能根据队列占用的空间来控制就更好了”。我5楼提的是按空间来控制。但其实我认为你这个做法就足够好了,没必要精确的按空间来控制。