大家好,最近要写个基于爬虫的小工具,主要是爬行单个网站并解析数据保存其中需要的内容。
我的想法是这样的,爬行线程从unsearchedUrl队列中取出url,去请求该url将获取的页面响应及其url存入到unparsedResponse队列中,解析线程从unparsedResponse队列中取出一条响应,分析该响应中包含的超链接,过滤掉重复的链接后存入unsearchedUrl队列中,当然,解析线程还要进行其他内容的提取。我这样做主要是为了提高性能,对每个url只进行一次网络访问。
现在主要的问题是单个的线程实现了,可是爬行线程和解析线程同时多个进行不知该怎么弄,麻烦大家给分析分析,谢谢!
下面是参考流程图。

解决方案 »

  1.   

    爬行线程不管如何,每次获得返回结果做解析的时候 new个runnabel丢到线程池里做。
      

  2.   

    不知怎么回事,我写的代码有问题,有些已经访问过的url,在多线程时还是会被重复访问。
    线程间共享的数据有visitedUrl类,webPageDB和unvisitedUrl(这两都是LinkedBlockingQueue),主要代码如下:
    GetPage.java //从unvisitedUrl里取一个url访问,将结果存在webPageDB里
    package org.crawler;
    import java.util.concurrent.LinkedBlockingQueue;
    public class GetPage implements Runnable{
    private VisitedUrl visitedUrl;
    private LinkedBlockingQueue<String> unvisitedUrl;
    private LinkedBlockingQueue<WebPage> webPageDB;
    public GetPage(LinkedBlockingQueue<WebPage> _webPageDB,
    VisitedUrl _visitedUrl,LinkedBlockingQueue<String> _unvisitedUrl){
    this.webPageDB = _webPageDB;
    this.visitedUrl = _visitedUrl;
    this.unvisitedUrl = _unvisitedUrl;
    }

    @Override
    public void run() {
    while(!Thread.interrupted()){
    if(!unvisitedUrl.isEmpty()){
    try {
    WebPage webPage = new WebPage();
    String visitingUrl;
    visitingUrl = unvisitedUrl.take();
    String html = new GetHTML().GetHTML(visitingUrl);
    visitedUrl.addVisitedUrl(visitingUrl);
    System.out.println(visitingUrl);
    webPage.setUrl(visitingUrl);
    webPage.setContent(html);
    webPageDB.put(webPage);
    } catch (Exception e) {
    e.printStackTrace();

    }
    else {
    visitedUrl.waitFor();
    }
    }
    }
    }
    Parse.java //解析webPageDB里存放的html,从中获取所有的超链接,并存放到unvisitedUrlpackage org.crawler;import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.LinkedBlockingQueue;public class Parse implements Runnable{
    private LinkedBlockingQueue<WebPage> webPageDB;
    private VisitedUrl  visitedUrl;
    private LinkedBlockingQueue<String> unvisitedUrl;
    public Parse(LinkedBlockingQueue<WebPage> _webPageDB,
    VisitedUrl _visitedUrl,LinkedBlockingQueue<String> _unvisitedUrl){
    this.webPageDB = _webPageDB;
    this.visitedUrl = _visitedUrl;
    this.unvisitedUrl = _unvisitedUrl;
    }

    @Override
    public void run() {
    while(!Thread.interrupted()){
    if(!webPageDB.isEmpty()){
    WebPage visitingPage;
    try {
    visitingPage = webPageDB.take();
    Set<String> links = abc.extractLinks(visitingPage.getUrl(), visitingPage.getContent());
    addUnvisitedUrl(links);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    else {
    visitedUrl.waitFor();
    }
    }
    }

    public synchronized void addUnvisitedUrl(Set<String> links){
    for (String link:links)
    {
    if(! visitedUrl.contains(link) && !unvisitedUrl.contains(link) && link != null && !link.trim().equals("") ){
    unvisitedUrl.add(link);
    }
    }
    }

    public static Set<String> extractLinks(String url , String html)  {
    //用htmlparser从html里解析处所有的链接
    ....
    Set<String> links = new HashSet<String>();
    return links;
    }
    }
    VisitedUrl.java//存放已经访问过的urlpackage org.crawler;import java.util.HashSet;public class VisitedUrl {
    private HashSet<String> visitedUrl = new HashSet<String>(); public synchronized void addVisitedUrl(String url){
    synchronized(visitedUrl){
    visitedUrl.add(url);
    }
    notifyAll();
    }

    public synchronized boolean contains(String url){
    boolean contains = true;
    synchronized(visitedUrl){
    if(visitedUrl.contains(url))
    contains = true;
    else
    contains = false;
    }
    notifyAll();
    return contains;
    }

    public synchronized void waitFor(){
    try {
    wait();
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    }WebPage.java//url及其对应的html内容package org.crawler;public class WebPage { private String url;
    private String content;

    public synchronized String getUrl() {
    return url;
    }
    public synchronized void setUrl(String url) {
    this.url = url;
    }
    public synchronized String getContent() {
    return content;
    }
    public synchronized void setContent(String content) {
    this.content = content;
    }
    }
    Test.java//测试类package org.crawler;import java.util.HashSet;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;public class Test {

    public static void main(String[] args){
    LinkedBlockingQueue<WebPage> webPageDB = new LinkedBlockingQueue<WebPage>();
    VisitedUrl visitedUrl = new VisitedUrl();
    LinkedBlockingQueue<String> unvisitedUrl = new LinkedBlockingQueue<String>();
    String url = "http://www.test.com";
    unvisitedUrl.add(url);
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,1,TimeUnit.DAYS,queue);
    executor.execute(new GetPage(webPageDB,visitedUrl,unvisitedUrl));
    executor.execute(new Parse(webPageDB,visitedUrl,unvisitedUrl));
    executor.shutdown();
    }
    }在重复访问某个出现的时候,我把visitedUrl的内容打印出来看,那个url确实已经在里面了,却重复访问了,很奇怪,麻烦大家看看是什么原因谢谢!!
      

  3.   

    不好意思,重发一遍,刚发现还有源代码显示这功能。
    不知怎么回事,我写的代码有问题,有些已经访问过的url,在多线程时还是会被重复访问。
    线程间共享的数据有visitedUrl类,webPageDB和unvisitedUrl(这两都是LinkedBlockingQueue),主要代码如下:
    GetPage.java //从unvisitedUrl里取一个url访问,将结果存在webPageDB里package org.crawler;
    import java.util.concurrent.LinkedBlockingQueue;
    public class GetPage implements Runnable{
    private VisitedUrl visitedUrl;
    private LinkedBlockingQueue<String> unvisitedUrl;
    private LinkedBlockingQueue<WebPage> webPageDB;
    public GetPage(LinkedBlockingQueue<WebPage> _webPageDB,
    VisitedUrl _visitedUrl,LinkedBlockingQueue<String> _unvisitedUrl){
    this.webPageDB = _webPageDB;
    this.visitedUrl = _visitedUrl;
    this.unvisitedUrl = _unvisitedUrl;
    }

    @Override
    public void run() {
    while(!Thread.interrupted()){
    if(!unvisitedUrl.isEmpty()){
    try {
    WebPage webPage = new WebPage();
    String visitingUrl;
    visitingUrl = unvisitedUrl.take();
    String html = new GetHTML().GetHTML(visitingUrl);
    visitedUrl.addVisitedUrl(visitingUrl);
    System.out.println(visitingUrl);
    webPage.setUrl(visitingUrl);
    webPage.setContent(html);
    webPageDB.put(webPage);
    } catch (Exception e) {
    e.printStackTrace();

    }
    else {
    visitedUrl.waitFor();
    }
    }
    }
    }Parse.java //解析webPageDB里存放的html,从中获取所有的超链接,并存放到unvisitedUrl
    package org.crawler;import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.LinkedBlockingQueue;public class Parse implements Runnable{
    private LinkedBlockingQueue<WebPage> webPageDB;
    private VisitedUrl  visitedUrl;
    private LinkedBlockingQueue<String> unvisitedUrl;
    public Parse(LinkedBlockingQueue<WebPage> _webPageDB,
    VisitedUrl _visitedUrl,LinkedBlockingQueue<String> _unvisitedUrl){
    this.webPageDB = _webPageDB;
    this.visitedUrl = _visitedUrl;
    this.unvisitedUrl = _unvisitedUrl;
    }

    @Override
    public void run() {
    while(!Thread.interrupted()){
    if(!webPageDB.isEmpty()){
    WebPage visitingPage;
    try {
    visitingPage = webPageDB.take();
    Set<String> links = abc.extractLinks(visitingPage.getUrl(), visitingPage.getContent());
    addUnvisitedUrl(links);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    else {
    visitedUrl.waitFor();
    }
    }
    }

    public synchronized void addUnvisitedUrl(Set<String> links){
    for (String link:links)
    {
    if(! visitedUrl.contains(link) && !unvisitedUrl.contains(link) && link != null && !link.trim().equals("") ){
    unvisitedUrl.add(link);
    }
    }
    }

    public static Set<String> extractLinks(String url , String html)  {
    //用htmlparser从html里解析处所有的链接
    ....
    Set<String> links = new HashSet<String>();
    return links;
    }
    }VisitedUrl.java//存放已经访问过的urlpackage org.crawler;import java.util.HashSet;public class VisitedUrl {
    private HashSet<String> visitedUrl = new HashSet<String>(); public synchronized void addVisitedUrl(String url){
    synchronized(visitedUrl){
    visitedUrl.add(url);
    }
    notifyAll();
    }

    public synchronized boolean contains(String url){
    boolean contains = true;
    synchronized(visitedUrl){
    if(visitedUrl.contains(url))
    contains = true;
    else
    contains = false;
    }
    notifyAll();
    return contains;
    }

    public synchronized void waitFor(){
    try {
    wait();
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    }WebPage.java//url及其对应的html内容
    package org.crawler;public class WebPage { private String url;
    private String content;

    public synchronized String getUrl() {
    return url;
    }
    public synchronized void setUrl(String url) {
    this.url = url;
    }
    public synchronized String getContent() {
    return content;
    }
    public synchronized void setContent(String content) {
    this.content = content;
    }
    }Test.java//测试类
    package org.crawler;import java.util.HashSet;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;public class Test {

    public static void main(String[] args){
    LinkedBlockingQueue<WebPage> webPageDB = new LinkedBlockingQueue<WebPage>();
    VisitedUrl visitedUrl = new VisitedUrl();
    LinkedBlockingQueue<String> unvisitedUrl = new LinkedBlockingQueue<String>();
    String url = "http://www.test.com";
    unvisitedUrl.add(url);
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,1,TimeUnit.DAYS,queue);
    executor.execute(new GetPage(webPageDB,visitedUrl,unvisitedUrl));
    executor.execute(new Parse(webPageDB,visitedUrl,unvisitedUrl));
    executor.shutdown();
    }
    }在重复访问某个出现的时候,我把visitedUrl的内容打印出来看,那个url确实已经在里面了,却重复访问了,很奇怪,麻烦大家看看是什么原因,谢谢!!
      

  4.   

    发现在Parse.java的addUnvisitedUrl(links);这里设置一个断点手动运行后,到本该出现重复访问的那个url时却没有了,正常。而把那个断点去掉后,在getPage的System.out.println(visitingUrl);这句设置断点执行后,仍然会访问那个已经访问过的url...