String pageName = "fileName" + (index++) + ".txt";
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results = new ArrayList<Future<String>>(); if (url != null || url != "") {
results.add(exec.submit(new GetPage(url))); for (Future<String> fi : results) {
try {
String txt = fi.get();
if (txt == null || txt == "") {
logger.info("failed downloading!");
} else {
// 生成文件名及设置编码
TxtCreater txtFile = new TxtCreater(
Crawl.destPath + "/" + pageName, ",", "utf-8");
// 写入文件
txtFile.writeStr(txt);
txtFile.close();
logger.info("finished downloading!");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
exec.shutdown();
}
}
}
我想用多线程去下在网页的内容,在GetPage里有
public Object call() throws Exception {
return GetPage.getPages(url);
}
我对Callable接口不熟悉,照着网上的资料写的,但发现还是单线程,求大婶指教怎么改Callable

解决方案 »

  1.   

    public Object call() throws Exception {
    return GetPage.getPages(url);
    }
    这段代码跟上面什么关系
      

  2.   

    你的意思是说在exec.submit时就应当开启多线程吗?
      

  3.   

    String txt = fi.get();
    你这里在获取返回结果,但是获取返回结果的时候都会阻塞,如果你需要同时一起获取,那么你需要开启多个线程来获取返回结果
      

  4.   

    newFixedThreadPool(10);

    new ThreadPoolExecutor(线程数, 最大线程数,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
      

  5.   

    newFixedThreadPool(10);这种方式也尝试过,但不知与newCachedThreadPool();的区别
      

  6.   

    newFixedThreadPool(10);定义一个有10个线程的线程池,但fi.get()时,不还是发生线程阻塞吗
      

  7.   

    看看这个吧import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;public class Test2 { public static void main(String[] args) throws Exception{
    MyCallableClass task1 = new MyCallableClass(0);     
    MyCallableClass task2 = new MyCallableClass(1);     
    MyCallableClass task3 = new MyCallableClass(2);  
    MyCallableClass task4 = new MyCallableClass(3);  
    MyCallableClass task5 = new MyCallableClass(4);  
    MyCallableClass task6 = new MyCallableClass(5);  
    MyCallableClass task7 = new MyCallableClass(6);  
    MyCallableClass task8 = new MyCallableClass(7);  
    MyCallableClass task9 = new MyCallableClass(8);  
    MyCallableClass task0 = new MyCallableClass(9);  
    List<MyCallableClass> list = new ArrayList<MyCallableClass>();
    list.add(task1);
    list.add(task2);
    list.add(task3);
    list.add(task4);
    list.add(task5);
    list.add(task6);
    list.add(task7);
    list.add(task8);
    list.add(task9);
    list.add(task0);
    List<Future<String>> results = new ArrayList<Future<String>>();
    ExecutorService es = Executors.newFixedThreadPool(list.size());
    for(MyCallableClass task : list){
    results.add(es.submit(task));
    }
    Cache cache = new Cache();
    cache.setWaitSize(results.size());
    for(Future<String> future : results){
    new Thread(new Receiver(future, cache)).start();
    }
    while(!cache.isOver()){
    Thread.sleep(1000);// 目的是为了等待任务获得返回值
    }
    for(String s : cache.getResults()){
    System.out.println(s);
    }
    }

    public static class Cache{
    private int size;
    private List<String> results = Collections.synchronizedList(new ArrayList<String>());
    private Object lock = new Object();
    public void setWaitSize(int size){
    this.size = size;
    }
    public List<String> getResults() {
    return results;
    } public void setResults(List<String> results) {
    this.results = results;
    }

    public void addResult(String result){
    synchronized (lock) {
    results.add(result);
    }
    }
    public boolean isOver(){
    if(results.size() == size){
    return true;
    }
    return false;
    }
    }

    public static class MyCallableClass implements Callable<String>{
    private int index;
    public MyCallableClass(int index){
    this.index = index;
    }

    public String call() throws Exception {
    return "hello Callable -- " + index;
    }
    }

    public static class Receiver implements Runnable{ private Future<String> future;
    private Cache cache;
    public Receiver(Future<String> future, Cache cache){
    this.future = future;
    this.cache = cache;
    }

    public void run() {
    try {
    cache.addResult(future.get());
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }
    }
    }
    }
      

  8.   

    我跟据你上面的代码改了下,发现在get()时还是发生了线程阻塞。前面线程在get值时,后面的线程依然在等待,只有在前面线程get值完成后,后面的线程才依次运行。这是我照着改了之后的代码:
    String pageName = "fileName" + (index++) + ".txt";
    int threadNum = 5;
    ExecutorService exec = Executors.newFixedThreadPool(threadNum);
    ArrayList<Future<String>> futures = new ArrayList<Future<String>>(); if (productUrl != null || productUrl != "") { futures.add(exec.submit(new GetPage(url))); Cache cache = new Cache();
            cache.setWaitSize(futures.size());
    for (Future<String> future : futures) {
    new Thread(new Receiver(future, cache)).start();
    }
    try {
    while(!cache.isOver()){
    Thread.sleep(500);
    }
    for(String txt : cache.getResults()){
                if (txt.length() == 0) {
    logger.info("failed downloading!");
                } else {
    TxtCreater txtFile = new TxtCreater(
    Crawl.destPath + "/" + pageName, ",", "gbk");
    txtFile.writeStr(txt);
    txtFile.close();
    logger.info("finished downloading!");
    }
            }
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally{
    exec.shutdown();
    }
    }
    public class GetPage implements Callable {
    public String url;
    public GetPage(String url){
    this.url = url;
    } public Object call() throws Exception {
    return GetPage.getWebContentByHttp(url);
    }
    //getWebContentByHttp方法...
    }
      

  9.   

    Future#get()本来就阻塞的,你是想线程执行完就掉方法吧,要不在线程中各做各的,要不可以这样
    while(true){
    int cou=ThreadPoolExecutor#getActiveCount() ;
    if(cou==0){
    break;
    }
    Thread.sleep(1000);
    }
    下面就是线程都执行完了
      

  10.   

    最好看懂
    int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler
    api:http://www.ostools.net/apidocs/apidoc?api=jdk-zh
    百度去看ThreadPoolExecutor