大家好,最近要写个基于爬虫的小工具,主要是爬行单个网站并解析数据保存其中需要的内容。
我的想法是这样的,爬行线程从unsearchedUrl队列中取出url,去请求该url将获取的页面响应及其url存入到unparsedResponse队列中,解析线程从unparsedResponse队列中取出一条响应,分析该响应中包含的超链接,过滤掉重复的链接后存入unsearchedUrl队列中,当然,解析线程还要进行其他内容的提取。我这样做主要是为了提高性能,对每个url只进行一次网络访问。
现在主要的问题是单个的线程实现了,可是爬行线程和解析线程同时多个进行不知该怎么弄,麻烦大家给分析分析,谢谢!
下面是参考流程图。
我的想法是这样的,爬行线程从unsearchedUrl队列中取出url,去请求该url将获取的页面响应及其url存入到unparsedResponse队列中,解析线程从unparsedResponse队列中取出一条响应,分析该响应中包含的超链接,过滤掉重复的链接后存入unsearchedUrl队列中,当然,解析线程还要进行其他内容的提取。我这样做主要是为了提高性能,对每个url只进行一次网络访问。
现在主要的问题是单个的线程实现了,可是爬行线程和解析线程同时多个进行不知该怎么弄,麻烦大家给分析分析,谢谢!
下面是参考流程图。
线程间共享的数据有visitedUrl类,webPageDB和unvisitedUrl(这两都是LinkedBlockingQueue),主要代码如下:
GetPage.java //从unvisitedUrl里取一个url访问,将结果存在webPageDB里
package org.crawler;
import java.util.concurrent.LinkedBlockingQueue;
public class GetPage implements Runnable{
private VisitedUrl visitedUrl;
private LinkedBlockingQueue<String> unvisitedUrl;
private LinkedBlockingQueue<WebPage> webPageDB;
public GetPage(LinkedBlockingQueue<WebPage> _webPageDB,
VisitedUrl _visitedUrl,LinkedBlockingQueue<String> _unvisitedUrl){
this.webPageDB = _webPageDB;
this.visitedUrl = _visitedUrl;
this.unvisitedUrl = _unvisitedUrl;
}
@Override
public void run() {
while(!Thread.interrupted()){
if(!unvisitedUrl.isEmpty()){
try {
WebPage webPage = new WebPage();
String visitingUrl;
visitingUrl = unvisitedUrl.take();
String html = new GetHTML().GetHTML(visitingUrl);
visitedUrl.addVisitedUrl(visitingUrl);
System.out.println(visitingUrl);
webPage.setUrl(visitingUrl);
webPage.setContent(html);
webPageDB.put(webPage);
} catch (Exception e) {
e.printStackTrace();
}
}
else {
visitedUrl.waitFor();
}
}
}
}
Parse.java //解析webPageDB里存放的html,从中获取所有的超链接,并存放到unvisitedUrlpackage org.crawler;import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;public class Parse implements Runnable{
private LinkedBlockingQueue<WebPage> webPageDB;
private VisitedUrl visitedUrl;
private LinkedBlockingQueue<String> unvisitedUrl;
public Parse(LinkedBlockingQueue<WebPage> _webPageDB,
VisitedUrl _visitedUrl,LinkedBlockingQueue<String> _unvisitedUrl){
this.webPageDB = _webPageDB;
this.visitedUrl = _visitedUrl;
this.unvisitedUrl = _unvisitedUrl;
}
@Override
public void run() {
while(!Thread.interrupted()){
if(!webPageDB.isEmpty()){
WebPage visitingPage;
try {
visitingPage = webPageDB.take();
Set<String> links = abc.extractLinks(visitingPage.getUrl(), visitingPage.getContent());
addUnvisitedUrl(links);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
else {
visitedUrl.waitFor();
}
}
}
public synchronized void addUnvisitedUrl(Set<String> links){
for (String link:links)
{
if(! visitedUrl.contains(link) && !unvisitedUrl.contains(link) && link != null && !link.trim().equals("") ){
unvisitedUrl.add(link);
}
}
}
public static Set<String> extractLinks(String url , String html) {
//用htmlparser从html里解析处所有的链接
....
Set<String> links = new HashSet<String>();
return links;
}
}
VisitedUrl.java//存放已经访问过的urlpackage org.crawler;import java.util.HashSet;public class VisitedUrl {
private HashSet<String> visitedUrl = new HashSet<String>(); public synchronized void addVisitedUrl(String url){
synchronized(visitedUrl){
visitedUrl.add(url);
}
notifyAll();
}
public synchronized boolean contains(String url){
boolean contains = true;
synchronized(visitedUrl){
if(visitedUrl.contains(url))
contains = true;
else
contains = false;
}
notifyAll();
return contains;
}
public synchronized void waitFor(){
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}WebPage.java//url及其对应的html内容package org.crawler;public class WebPage { private String url;
private String content;
public synchronized String getUrl() {
return url;
}
public synchronized void setUrl(String url) {
this.url = url;
}
public synchronized String getContent() {
return content;
}
public synchronized void setContent(String content) {
this.content = content;
}
}
Test.java//测试类package org.crawler;import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;public class Test {
public static void main(String[] args){
LinkedBlockingQueue<WebPage> webPageDB = new LinkedBlockingQueue<WebPage>();
VisitedUrl visitedUrl = new VisitedUrl();
LinkedBlockingQueue<String> unvisitedUrl = new LinkedBlockingQueue<String>();
String url = "http://www.test.com";
unvisitedUrl.add(url);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,1,TimeUnit.DAYS,queue);
executor.execute(new GetPage(webPageDB,visitedUrl,unvisitedUrl));
executor.execute(new Parse(webPageDB,visitedUrl,unvisitedUrl));
executor.shutdown();
}
}在重复访问某个出现的时候,我把visitedUrl的内容打印出来看,那个url确实已经在里面了,却重复访问了,很奇怪,麻烦大家看看是什么原因谢谢!!
不知怎么回事,我写的代码有问题,有些已经访问过的url,在多线程时还是会被重复访问。
线程间共享的数据有visitedUrl类,webPageDB和unvisitedUrl(这两都是LinkedBlockingQueue),主要代码如下:
GetPage.java //从unvisitedUrl里取一个url访问,将结果存在webPageDB里package org.crawler;
import java.util.concurrent.LinkedBlockingQueue;
public class GetPage implements Runnable{
private VisitedUrl visitedUrl;
private LinkedBlockingQueue<String> unvisitedUrl;
private LinkedBlockingQueue<WebPage> webPageDB;
public GetPage(LinkedBlockingQueue<WebPage> _webPageDB,
VisitedUrl _visitedUrl,LinkedBlockingQueue<String> _unvisitedUrl){
this.webPageDB = _webPageDB;
this.visitedUrl = _visitedUrl;
this.unvisitedUrl = _unvisitedUrl;
}
@Override
public void run() {
while(!Thread.interrupted()){
if(!unvisitedUrl.isEmpty()){
try {
WebPage webPage = new WebPage();
String visitingUrl;
visitingUrl = unvisitedUrl.take();
String html = new GetHTML().GetHTML(visitingUrl);
visitedUrl.addVisitedUrl(visitingUrl);
System.out.println(visitingUrl);
webPage.setUrl(visitingUrl);
webPage.setContent(html);
webPageDB.put(webPage);
} catch (Exception e) {
e.printStackTrace();
}
}
else {
visitedUrl.waitFor();
}
}
}
}Parse.java //解析webPageDB里存放的html,从中获取所有的超链接,并存放到unvisitedUrl
package org.crawler;import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;public class Parse implements Runnable{
private LinkedBlockingQueue<WebPage> webPageDB;
private VisitedUrl visitedUrl;
private LinkedBlockingQueue<String> unvisitedUrl;
public Parse(LinkedBlockingQueue<WebPage> _webPageDB,
VisitedUrl _visitedUrl,LinkedBlockingQueue<String> _unvisitedUrl){
this.webPageDB = _webPageDB;
this.visitedUrl = _visitedUrl;
this.unvisitedUrl = _unvisitedUrl;
}
@Override
public void run() {
while(!Thread.interrupted()){
if(!webPageDB.isEmpty()){
WebPage visitingPage;
try {
visitingPage = webPageDB.take();
Set<String> links = abc.extractLinks(visitingPage.getUrl(), visitingPage.getContent());
addUnvisitedUrl(links);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
else {
visitedUrl.waitFor();
}
}
}
public synchronized void addUnvisitedUrl(Set<String> links){
for (String link:links)
{
if(! visitedUrl.contains(link) && !unvisitedUrl.contains(link) && link != null && !link.trim().equals("") ){
unvisitedUrl.add(link);
}
}
}
public static Set<String> extractLinks(String url , String html) {
//用htmlparser从html里解析处所有的链接
....
Set<String> links = new HashSet<String>();
return links;
}
}VisitedUrl.java//存放已经访问过的urlpackage org.crawler;import java.util.HashSet;public class VisitedUrl {
private HashSet<String> visitedUrl = new HashSet<String>(); public synchronized void addVisitedUrl(String url){
synchronized(visitedUrl){
visitedUrl.add(url);
}
notifyAll();
}
public synchronized boolean contains(String url){
boolean contains = true;
synchronized(visitedUrl){
if(visitedUrl.contains(url))
contains = true;
else
contains = false;
}
notifyAll();
return contains;
}
public synchronized void waitFor(){
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}WebPage.java//url及其对应的html内容
package org.crawler;public class WebPage { private String url;
private String content;
public synchronized String getUrl() {
return url;
}
public synchronized void setUrl(String url) {
this.url = url;
}
public synchronized String getContent() {
return content;
}
public synchronized void setContent(String content) {
this.content = content;
}
}Test.java//测试类
package org.crawler;import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;public class Test {
public static void main(String[] args){
LinkedBlockingQueue<WebPage> webPageDB = new LinkedBlockingQueue<WebPage>();
VisitedUrl visitedUrl = new VisitedUrl();
LinkedBlockingQueue<String> unvisitedUrl = new LinkedBlockingQueue<String>();
String url = "http://www.test.com";
unvisitedUrl.add(url);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,1,TimeUnit.DAYS,queue);
executor.execute(new GetPage(webPageDB,visitedUrl,unvisitedUrl));
executor.execute(new Parse(webPageDB,visitedUrl,unvisitedUrl));
executor.shutdown();
}
}在重复访问某个出现的时候,我把visitedUrl的内容打印出来看,那个url确实已经在里面了,却重复访问了,很奇怪,麻烦大家看看是什么原因,谢谢!!