.NET多线程编程(14)——用C#实现蜘蛛/爬虫程序的多线程控制
在爬虫/蜘蛛制作(C#语言)文中已经介绍了爬虫实现基本思路方法可以说已经实现了爬虫功能只是它存在个效率问题下载速度可能很慢这是两方面原因造成:
1.分析和下载不能同步进行在爬虫/蜘蛛制作
(C#语言)中已经介绍了爬虫两个步骤:分析和下载在单线程中两者是无法同时进行也就是说分析时会造成网络空闲分析时间越长下载效率越低反的也是样下载时无法同时进行分析只有停下下载后才能进行下步分析问题浮出水面我想大家都会想到:把分析和下载用区别线程进行问题不就解决了吗?
2.只是单线程下载相信大家都有用过网际快车等下载资源经历它里面是可以设置线程数(近年版本默认是10曾经默认是5)它会将文件分成和线程数相同部分然后每个线程下载自己那部分这样下载效率就有可能提高相信大家都有加多线程数提升下载效率经历但细心用户会发现在带宽定情况下并不是线程越多速度越快而是在某
点达到峰值爬虫作为特殊下载工具不具备多线程能力何以有效率可谈?爬虫在信息时代目难道不是快速获取信息吗?所以爬虫需要有多线程(可控数量)同时下载网页
好了认识、分析完问题就是解决问题了:
多线程在C#中并不难实现它有个命名空间:.Threading提供了多线程支持
要开启个新线程需要以下化:
ThreadStart startDownload = ThreadStart( DownLoad );
//线程起始设置:即每个线程都执行DownLoad注意:DownLoad必须为不带有参数思路方法
Thread downloadThread = Thread( startDownload ); //例子化要开启新类
downloadThread.Start;//开启线程 由于线程起始时启动思路方法不能带有参数这就为多线程共享资源添加
了麻烦不过我们可以用类级变量(当然也可以使用其它思路方法笔者认为此思路方法最简单易用)来解决这个问题
知道开启多线程下载思路方法后大家可能会产生几个疑问:
1.如何控制线程数量?
2.如何防止多线程下载同网页?
3.如何判断线程结束?
4.如何控制线程结束?
下面就这几个问题提出解决思路方法:
1.线程数量我们可以通过for循环来实现就如同当年初学编程打点样
比如已知用户指定了n(它是个型变量)个线程吧可以用如下思路方法开启 5个线程
Thread downloadThread;
//声名下载线程这是C#优势即化时不需要指定其长度可以在使用时才指定
这个声名应为类级这样也就为其它思路方法Control控件它们提供了可能
ThreadStart startDownload = ThreadStart( DownLoad );
//线程起始设置:即每个线程都执行DownLoad
downloadThread = Thread[ n ];//为线程申请资源确定线程总数
for( i = 0; i < n; i )//开启指定数量线程数
{
downloadThread[i] = Thread( startDownload );//指定线程起始设置
downloadThread[i].Start;//逐个开启线程
} 好了实现控制开启线程数是不是很简单啊?
2.下面出现个问题:所有线程都DonwLoad思路方法这样如何避免它们同时下载同个网页呢?
这个问题也好解决只要建立下Url地址表表中每个地址只允许被个线程申请即可具体实现:
可以利用数据库建立个表表中有 4列其中列专门用于存储Url地址另外两列分别存放地址对应线程以及该地
址被申请次数最后列存放下载内容(当然对应线程列不是必要)当有线程申请后将对应线程列设定为当前线程编号
并将是否申请过列设置为申请次这样别线程就无法申请该页如果下载成功则将内容存入内容列如果不成功内容
列仍为空作为是否再次下载依据的如果反复不成功则进程将于达到重试次数(对应该地址被申请次数用户可设)后
申请下个Url地址主要代码如下(以VFP为例):
<建立表>
CREATE TABLE (ctablename) ( curl M , ctext M , ldowned I , threadNum I )
&&建立个表ctablename.dbf含有地址、文本内容、已经尝试下载次数、
线程标志(初值为-1线程标志是从0开始整数) 4个字段
<提取Url地址>
cfullname = (ctablename) + '.dbf'&&为表添加扩展名
USE (cfullname)
GO TOP
LOCATE FOR (EMPTY( ALLTRIM( ctext ) ) AND ldowned < 2 AND
( threadNum = thisNum OR threadNum = - 1) )
&&查找尚未下载成功且应下载属于本线程权限Url地址thisNum是当前线程编号
可以通过参数传递得到
gotUrl = curl
recNum = RECNO
IF recNum <= RECCOUNT THEN &&如果在列表中找到这样Url地址
UPDATE (cfullname) SET ldowned = ( ldowned + 1 ) , threadNum =
thisNum WHERE RECNO = recNum &&更新表将此记录更新为已申请即下载次数加1
线程标志列设为本线程编号
<下载内容>
cfulltablename = (ctablename) + '.dbf'
USE (cfulltablename)
SET EXACT _disibledevent= 0; i < n; i )//关闭指定数量n线程数
{
downloadThread[i].Abort;//逐个关闭线程
} 好了个蜘蛛就这样完成了在C#面前它实现原来如此简单
这里笔者还想提醒读者:笔者只是提供了个思路及个可以实现解决方案但它并不是最佳即使这个方案本身也
有好多可以改进地方留给读者研究
.NET多线程编程——一个简单的C#多线程间同步的例子
在开发中经常会遇到线程例子如果某个后台操作比较费时间我们就可以启动个线程去执行那个费时操作同时继续执行在某些情况下可能会出现多个线程同步协同问题下面例子就展示了在两个线程的间如何协同工作
这个思路是共同做件事情(从个ArrayList中删除元素),如果执行完成了两个线程都停止执行
代码如下:
using;
using.Collections;
using.Collections.Generic;
using.Threading;
///<summary>
///在开发中经常会遇到线程例子如果某个后台操作比较费时间我们就可以启动个线程去执行那个费时操作同时继续执行在某些情况下可能会出现多个线程同步协同问题下面例子就展示了在两个线程的间如何协同工作
///
///这个思路是共同做件事情(从个ArrayList中删除元素),如果执行完成了两个线程都停止执行
///作者:周公
///时间:2008-5-17
///原发地址:http://blog.csdn.net/zhoufoxcn
///</summary>
publicThreadDemo
{
privateThreadthreadOne;
privateThreadthreadTwo;
privateArrayListList;
privateeventEventHandlerOnNumberClear;//数据删除完成引发事件
publicvoidMain
{
ThreadDemodemo=ThreadDemo(1000);
demo.Action;
}
publicThreadDemo(number)
{
Randomrandom=Random(1000000);
List=ArrayList(number);
for(i=0;i<number;i)
{
List.Add(random.Next.);
}
threadOne=Thread(ThreadStart(Run));//两个线程共同做件事情
threadTwo=Thread(ThreadStart(Run));//两个线程共同做件事情
threadOne.Name="线程1";
threadTwo.Name="线程2";
OnNumberClearEventHandler(ThreadDemo_OnNumberClear);
}
///<summary>
///开始工作
///</summary>
publicvoidAction
{
threadOne.Start;
threadTwo.Start;
}
///<summary>
///共同做工作
///</summary>
privatevoidRun
{
Value=null;
while(true)
{
Monitor.Enter(this);//锁定保持同步
Value=()List[0];
Console.WriteLine(Thread.CurrentThread.Name+"删除了"+Value);
List.RemoveAt(0);//删除ArrayList中元素
(List.Count0)
{
OnNumberClear(this,EventArgs);//引发完成事件
}
Monitor.Exit(this);//取消锁定
Thread.Sleep(5);
}
}
//执行完成的后停止所有线程
voidThreadDemo_OnNumberClear(objectsender,EventArgse)
{
Console.WriteLine("执行完了停止了所有线程执行");
threadTwo.Abort;
threadOne.Abort;
}
}
.NET多线程编程——多线程的自动管理
在多线程中经常会出现两种情况:
一种情况: 应用中线程把大部分时间花费在等待状态等待某个事件发生然后才能给予响应,这般使用ThreadPool(线程池)来解决;
另种情况:线程平时都处于休眠状态只是周期性地被唤醒,这一般使用Timer(定时器)来解决;
ThreadPool类提供个由系统维护线程池(可以看作个线程容器)该容器需要 Windows 2000 以上系统支持其中某些思路方法了只有高版本Windows才有API
将线程安放在线程池里需使用ThreadPool.QueueUserWorkItem思路方法该思路方法原型如下:
//将个线程放进线程池该线程Start思路方法将WaitCallback代理对象代表
public bool QueueUserWorkItem(WaitCallback);
//重载思路方法如下参数object将传递给WaitCallback所代表思路方法
public bool QueueUserWorkItem(WaitCallback, object);
注意:
ThreadPool类是个静态类你不能也不必要生成它对象而且旦使用该思路方法在线程池中添加了个项目那么该项目将是无法取消
在这里你无需自己建立线程只需把你要做工作写成然后作为参数传递给
ThreadPool.QueueUserWorkItem思路方法就行了传递思路方法就是依靠WaitCallback代理对象而线程建立、管理、运行等工作都是由系统自动完成你无须考虑那些复杂细节问题
ThreadPool 使用方法:
首先创建了个ManualReEvent对象该对象就像个信号灯可以利用它信号来通知其它线程
本例中当线程池中所有线程工作都完成以后ManualReEvent对象将被设置为有信号从而通知主线程继续运行
ManualReEvent对象有几个重要思路方法:
化该对象时用户可以指定其默认状态(有信号/无信号);
在化以后该对象将保持原来状态不变直到它Re或者Set思路方法被:
Re思路方法:将其设置为无信号状态;
Set思路方法:将其设置为有信号状态
WaitOne思路方法:使当前线程挂起直到ManualReEvent对象处于有信号状态此时该线程将被激活然后将向线程池中添加工作项这些以形式提供工作项被系统用来化自动建立线程当所有线程都运行完了以后ManualReEvent.Set思路方法被了ManualReEvent.WaitOne思路方法而处在等待状态主线程将接收到这个信号于是它接着往下执行完成后边工作
ThreadPool 使用方法举例:
using;
using.Collections;
using.Threading;
ThreadExample
{
//这是用来保存信息数据结构将作为参数被传递
publicSomeState
{
publicCookie;
publicSomeState(iCookie)
{
Cookie=iCookie;
}
}
publicAlpha
{
publicHashtableHashCount;
publicManualReEventeventX;
publiciCount=0;
publiciMaxCount=0;
publicAlpha(MaxCount)
{
HashCount=Hashtable(MaxCount);
iMaxCount=MaxCount;
}
//线程池里线程将Beta思路方法
publicvoidBeta(Objectstate)
{
//输出当前线程hash编码值和Cookie值
Console.WriteLine("{0}{1}:",Thread.CurrentThread.GetHashCode,((SomeState)state).Cookie);
Console.WriteLine("HashCount.Count{0},Thread.CurrentThread.GetHashCode{1}",HashCount.Count,Th
read.CurrentThread.GetHashCode);
lock(HashCount)
{
//如果当前Hash表中没有当前线程Hash值则添加的
(!HashCount.ContainsKey(Thread.CurrentThread.GetHashCode))
HashCount.Add(Thread.CurrentThread.GetHashCode,0);
HashCount[Thread.CurrentThread.GetHashCode]=
(()HashCount[Thread.CurrentThread.GetHashCode])+1;
}
iX=2000;
Thread.Sleep(iX);
//Interlocked.Increment操作是个原子操作具体请看下面介绍说明
Interlocked.Increment(refiCount);
(iCountiMaxCount)
{
Console.WriteLine;
Console.WriteLine("SettingeventX");
eventX.Set;
}
}
}
publicSimplePool
{
publicMain(args)
{
Console.WriteLine("ThreadPoolSample:");
boolW2K=false;
MaxCount=10;//允许线程池中运行最多10个线程
//新建ManualReEvent对象并且化为无信号状态
ManualReEventeventX=ManualReEvent(false);
Console.WriteLine("Queuing{0}itemstoThreadPool",MaxCount);
AlphaoAlpha=Alpha(MaxCount);
//创建工作项
//注意化oAlpha对象eventX属性
oAlpha.eventX=eventX;
Console.WriteLine("QueuetoThreadPool0");
try
{
//将工作项装入线程池
//这里要用到Windows2000以上版本才有API所以可能出现NotSupportException异常
ThreadPool.QueueUserWorkItem(WaitCallback(oAlpha.Beta),SomeState(0));
W2K=true;
}
catch(NotSupportedException)
{
Console.WriteLine("TheseAPI'smayfailwhencalledonanon-
Windows2000system.");
W2K=false;
}
(W2K)//如果当前系统支持ThreadPool思路方法.
{
for(iItem=1;iItem<MaxCount;iItem)
{
//插入队列元素
Console.WriteLine("QueuetoThreadPool{0}",iItem);
ThreadPool.QueueUserWorkItem(WaitCallback(oAlpha.Beta),SomeState(iItem));
}
Console.WriteLine("WaitingforThreadPooltodrain");
//等待事件完成即线程ManualReEvent.Set思路方法
eventX.WaitOne(Timeout.Infinite,true);
//WaitOne思路方法使它线程等待直到eventX.Set思路方法被
Console.WriteLine("ThreadPoolhasbeendrained(Eventfired)");
Console.WriteLine;
Console.WriteLine("Loadacrossthreads");
foreach(objectoinoAlpha.HashCount.Keys)
Console.WriteLine("{0}{1}",o,oAlpha.HashCount[o]);
}
Console.ReadLine;
0;
}
}
}
}
中应该引起注意地方:
SomeState类是个保存信息数据结构它在中作为参数被传递给每个线程你需要把些有用信息封装起来提供
给线程而这种方式是非常有效
出现InterLocked类也是专为多线程而存在它提供了些有用原子操作
原子操作:就是在多线程中如果这个线程这个操作修改个变量那么其他线程就不能修改这个变量了这跟lock关键字在本质上是样
输出结果:
ThreadPoolSample:
Queuing10itemstoThreadPool
QueuetoThreadPool0
QueuetoThreadPool1
QueuetoThreadPool2
QueuetoThreadPool3
QueuetoThreadPool4
QueuetoThreadPool5
20:
HashCount.Count0,Thread.CurrentThread.GetHashCode2
QueuetoThreadPool6
QueuetoThreadPool7
QueuetoThreadPool8
QueuetoThreadPool9
WaitingforThreadPooltodrain
41:
HashCount.Count1,Thread.CurrentThread.GetHashCode4
62:
HashCount.Count1,Thread.CurrentThread.GetHashCode6
73:
HashCount.Count1,Thread.CurrentThread.GetHashCode7
24:
HashCount.Count1,Thread.CurrentThread.GetHashCode2
85:
HashCount.Count2,Thread.CurrentThread.GetHashCode8
96:
HashCount.Count2,Thread.CurrentThread.GetHashCode9
107:
HashCount.Count2,Thread.CurrentThread.GetHashCode10
118:
HashCount.Count2,Thread.CurrentThread.GetHashCode11
49:
HashCount.Count2,Thread.CurrentThread.GetHashCode4
SettingeventX
ThreadPoolhasbeendrained(Eventfired)
Loadacrossthreads
111
101
91
81
71
61
42
22
我拼凑了一段可以打印出结果来的代码总感觉,请高手给看看
class Program
{
static void Main(string[] args)
{
MyLetter.contentLength = 70;//字符串长
MyLetter.lineCount = 10;//行数
MyLetter.isNewLine = true;
Printer printer = new Printer(); int letterCount = Enum.GetValues(typeof(Letters)).Length;
MyLetter.letterCount = letterCount; Thread[] threads = new Thread[letterCount];
for (int i = 0; i < letterCount; i++)
{
MyLetter myLetter = new MyLetter(i, Enum.GetName(typeof(Letters), i), printer);
Thread thread = new Thread(myLetter.Print);
threads[i] = thread;
} for (int i = 0; i < letterCount; i++)
{
threads[i].Start();
} for (int i = 0; i < letterCount; i++)
{
threads[i].Join();
}
//MyLetter a = new MyLetter(0, "A", printer);
//MyLetter b = new MyLetter(1, "B", printer);
//MyLetter c = new MyLetter(2, "C", printer);
//MyLetter d = new MyLetter(3, "D", printer);
//Thread ta = new Thread(a.Print);
//Thread tb = new Thread(b.Print);
//Thread tc = new Thread(c.Print);
//Thread td = new Thread(d.Print); //ta.Start();
//tb.Start();
//tc.Start();
//td.Start();
//ta.Join();
//tb.Join();
//tc.Join();
//td.Join();
Console.ReadLine();
}
} public enum Letters
{
A,
B,
C,
D,
E
} public class MyLetter
{
private readonly int id; private readonly string name; public static int letterCount; public static int contentLength; public static int stopNum; public static int letterIndex; public static int lineCount; public static int lineNum; public static bool isNewLine; private readonly Printer printer; public MyLetter(int id, string name, Printer printer)
{
this.id = id;
this.printer = printer;
this.name = name;
} public void Print()
{
while (lineNum < lineCount)
{
while (isNewLine)
{
printer.SetHeader(id, name);
} while (letterIndex < stopNum)
{
printer.PrintBody(id, name);
} if (!isNewLine)
{
isNewLine = true;
}
}
}
} public class Printer
{
public void SetHeader(int id, string name)
{
if (MyLetter.lineNum < MyLetter.lineCount)
{
lock (this)
{
if (MyLetter.lineNum % MyLetter.letterCount == id && MyLetter.isNewLine)
{
if (MyLetter.lineNum != 0)
{
Console.WriteLine("");
}
Console.Write(name);
Console.Write(MyLetter.lineNum + ":");
MyLetter.isNewLine = false;
MyLetter.letterIndex = MyLetter.lineNum % MyLetter.letterCount;
MyLetter.stopNum = MyLetter.contentLength + MyLetter.lineNum % MyLetter.letterCount;
MyLetter.lineNum++;
}
Monitor.Pulse(this);
}
}
} public void PrintBody(int id, string name)
{
lock (this)
{
if (MyLetter.letterIndex < MyLetter.stopNum)
{
if (MyLetter.letterIndex % MyLetter.letterCount != id)
{
Monitor.Wait(this);
}
else
{
Console.Write(name);
MyLetter.letterIndex++;
}
Monitor.Pulse(this);
}
}
}
}