String pageName = "fileName" + (index++) + ".txt";
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results = new ArrayList<Future<String>>(); if (url != null || url != "") {
results.add(exec.submit(new GetPage(url))); for (Future<String> fi : results) {
try {
String txt = fi.get();
if (txt == null || txt == "") {
logger.info("failed downloading!");
} else {
// 生成文件名及设置编码
TxtCreater txtFile = new TxtCreater(
Crawl.destPath + "/" + pageName, ",", "utf-8");
// 写入文件
txtFile.writeStr(txt);
txtFile.close();
logger.info("finished downloading!");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
exec.shutdown();
}
}
}
我想用多线程去下在网页的内容,在GetPage里有
public Object call() throws Exception {
return GetPage.getPages(url);
}
我对Callable接口不熟悉,照着网上的资料写的,但发现还是单线程,求大婶指教怎么改Callable
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results = new ArrayList<Future<String>>(); if (url != null || url != "") {
results.add(exec.submit(new GetPage(url))); for (Future<String> fi : results) {
try {
String txt = fi.get();
if (txt == null || txt == "") {
logger.info("failed downloading!");
} else {
// 生成文件名及设置编码
TxtCreater txtFile = new TxtCreater(
Crawl.destPath + "/" + pageName, ",", "utf-8");
// 写入文件
txtFile.writeStr(txt);
txtFile.close();
logger.info("finished downloading!");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
exec.shutdown();
}
}
}
我想用多线程去下在网页的内容,在GetPage里有
public Object call() throws Exception {
return GetPage.getPages(url);
}
我对Callable接口不熟悉,照着网上的资料写的,但发现还是单线程,求大婶指教怎么改Callable
return GetPage.getPages(url);
}
这段代码跟上面什么关系
你这里在获取返回结果,但是获取返回结果的时候都会阻塞,如果你需要同时一起获取,那么你需要开启多个线程来获取返回结果
或
new ThreadPoolExecutor(线程数, 最大线程数,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;public class Test2 { public static void main(String[] args) throws Exception{
MyCallableClass task1 = new MyCallableClass(0);
MyCallableClass task2 = new MyCallableClass(1);
MyCallableClass task3 = new MyCallableClass(2);
MyCallableClass task4 = new MyCallableClass(3);
MyCallableClass task5 = new MyCallableClass(4);
MyCallableClass task6 = new MyCallableClass(5);
MyCallableClass task7 = new MyCallableClass(6);
MyCallableClass task8 = new MyCallableClass(7);
MyCallableClass task9 = new MyCallableClass(8);
MyCallableClass task0 = new MyCallableClass(9);
List<MyCallableClass> list = new ArrayList<MyCallableClass>();
list.add(task1);
list.add(task2);
list.add(task3);
list.add(task4);
list.add(task5);
list.add(task6);
list.add(task7);
list.add(task8);
list.add(task9);
list.add(task0);
List<Future<String>> results = new ArrayList<Future<String>>();
ExecutorService es = Executors.newFixedThreadPool(list.size());
for(MyCallableClass task : list){
results.add(es.submit(task));
}
Cache cache = new Cache();
cache.setWaitSize(results.size());
for(Future<String> future : results){
new Thread(new Receiver(future, cache)).start();
}
while(!cache.isOver()){
Thread.sleep(1000);// 目的是为了等待任务获得返回值
}
for(String s : cache.getResults()){
System.out.println(s);
}
}
public static class Cache{
private int size;
private List<String> results = Collections.synchronizedList(new ArrayList<String>());
private Object lock = new Object();
public void setWaitSize(int size){
this.size = size;
}
public List<String> getResults() {
return results;
} public void setResults(List<String> results) {
this.results = results;
}
public void addResult(String result){
synchronized (lock) {
results.add(result);
}
}
public boolean isOver(){
if(results.size() == size){
return true;
}
return false;
}
}
public static class MyCallableClass implements Callable<String>{
private int index;
public MyCallableClass(int index){
this.index = index;
}
public String call() throws Exception {
return "hello Callable -- " + index;
}
}
public static class Receiver implements Runnable{ private Future<String> future;
private Cache cache;
public Receiver(Future<String> future, Cache cache){
this.future = future;
this.cache = cache;
}
public void run() {
try {
cache.addResult(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
String pageName = "fileName" + (index++) + ".txt";
int threadNum = 5;
ExecutorService exec = Executors.newFixedThreadPool(threadNum);
ArrayList<Future<String>> futures = new ArrayList<Future<String>>(); if (productUrl != null || productUrl != "") { futures.add(exec.submit(new GetPage(url))); Cache cache = new Cache();
cache.setWaitSize(futures.size());
for (Future<String> future : futures) {
new Thread(new Receiver(future, cache)).start();
}
try {
while(!cache.isOver()){
Thread.sleep(500);
}
for(String txt : cache.getResults()){
if (txt.length() == 0) {
logger.info("failed downloading!");
} else {
TxtCreater txtFile = new TxtCreater(
Crawl.destPath + "/" + pageName, ",", "gbk");
txtFile.writeStr(txt);
txtFile.close();
logger.info("finished downloading!");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
exec.shutdown();
}
}
public class GetPage implements Callable {
public String url;
public GetPage(String url){
this.url = url;
} public Object call() throws Exception {
return GetPage.getWebContentByHttp(url);
}
//getWebContentByHttp方法...
}
while(true){
int cou=ThreadPoolExecutor#getActiveCount() ;
if(cou==0){
break;
}
Thread.sleep(1000);
}
下面就是线程都执行完了
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
api:http://www.ostools.net/apidocs/apidoc?api=jdk-zh
百度去看ThreadPoolExecutor