可以使用线程池来根据应用程序的需要更为有效地利用多个线程。许多应用程序使用多个线程,但这些线程经常在休眠状态中耗费大量的时间来等待事件发生。其他线程可能进入休眠状态,并且仅定期被唤醒以轮询更改或更新状态信息,然后再次进入休眠状态。使用线程池就可以为应用程序提供一个由系统管理的辅助线程池,从而使您可以集中精力于应用程序任务而不是线程管理。实际上,如果要执行一些需要多个线程的较短任务,则使用 ThreadPool 类是利用多个线程的最方便且最好的方法。使用线程池使系统能够不仅针对此进程而且针对计算机上的其他进程(您的应用程序对其一无所知)对此情况进行优化以达到更好的吞吐量。使用线程池使系统能够在考虑到计算机上的所有当前进程后对线程时间片进行优化。.NET 框架出于以下几个目的使用线程池:异步调用、System.Net 套接字连接、异步 I/O 完成以及计时器与注册的等待操作等等。通过从托管代码调用 ThreadPool.QueueUserWorkItem(或者从非托管代码调用 CorQueueUserWorkItem)并传递用来包装要添加到队列中的方法的 WaitCallback 委托来使用线程池。也可以通过使用 ThreadPool.RegisterWaitForSingleObject 并传递 WaitHandle(在向其发出信号或超时时,它将引发对由 WaitOrTimerCallback 委托包装的方法的调用)来将与等待操作相关的工作项排队到线程池中。在这两种情况下,线程池都使用或创建一个后台线程来调用回调方法。如果您知道调用方的堆栈与在排队任务执行期间执行的所有安全检查不相关,则还可以使用不安全的方法 ThreadPool.UnsafeQueueUserWorkItem 和 ThreadPool.UnsafeRegisterWaitForSingleObject。QueueUserWorkItem 和 RegisterWaitForSingleObject 都会捕获调用方的堆栈,此堆栈将在线程池线程开始执行任务时合并到线程池线程的堆栈中。如果需要进行安全检查,则必须检查整个堆栈。尽管此检查提供了安全,但它还具有一定的性能开销。使用“不安全的”方法调用并不会提供绝对的安全,但它会提供更好的性能。在应用程序域中只有一个 ThreadPool 对象,并且在您首次调用 ThreadPool.QueueUserWorkItem 时或者当计时器或已注册的等待操作将某一回调方法排队时,将创建线程池。一个线程监视所有已排队到线程池中的任务。当某项任务完成后,线程池中的线程将执行相应的回调方法。在对一个工作项进行排队之后将无法取消它。可以排队到线程池中的操作的数目仅受可用内存的限制;但是,线程池将对允许在进程中同时处于活动状态的线程数目强制实施限制(这取决于 CPU 的数目和其他因素)。每个线程都使用默认堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间之后创建另一个辅助线程。但线程的数目永远不会超过最大值。在执行 ThreadPool 回调时,ThreadPool 还切换到正确的 AppDomain。在若干种情况下,适合于创建并管理自己的线程而不是使用 ThreadPool。您应在以下几种情况下创建并管理自己的线程: 如果您需要使一个任务具有特定的优先级。
如果您具有可能会长时间运行(并因此阻塞其他任务)的任务。
如果您需要将线程放置到单线程单元中(所有 ThreadPool 线程均处于多线程单元中)。
如果您需要与该线程关联的稳定标识。例如,您可能想使用专用线程来中止该线程、将其挂起或按名称发现它。
下面的代码示例同时使用了 QueueUserWorkItem 和 RegisterWaitForSingleObject 方法。前者用于在每次循环时两次请求对 ThreadFunc 方法的调用。后者用于将 AutoResetEvent 注册到 ThreadPool。然后,就会在某次循环中向 AutoResetEvent 发出信号,而 ThreadPool 将分配一个线程来调用 WaitThreadFunc。[Visual Basic]
Imports System
Imports System.ThreadingPublic Class Sensor
Private Shared A As Integer = 32
Private Shared S As [String] = "Initial string"
Public Shared Sub Main()
Dim ev As New AutoResetEvent(False)
ThreadPool.RegisterWaitForSingleObject(ev, New WaitOrTimerCallback(AddressOf WaitThreadFunc), Nothing, 20000, False)
Dim count As Integer
For count = 1 To 5
Console.WriteLine("Hello everyone!!")
Console.WriteLine("{0}, {1}", A, S)
ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf ThreadFunc), count)
ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf ThreadFunc), count + 1)
' Signal the AutoResetEvent, which raises the ThreadPool to
' invoke the WaitThreadFunc.
If count = 2 Then
ev.Set()
End If
Thread.Sleep(1000)
Next count
End Sub 'Main
Public Shared Sub ThreadFunc(O As Object)
Console.WriteLine("ThreadPool: WorkItem number: {0}, Values: {1}, {2}", O, A, S)
SyncLock S
S = "Replacement string"
End SyncLock
Console.WriteLine("ThreadPool done. {0}", S)
End Sub 'ThreadFunc
Public Shared Sub WaitThreadFunc(O As Object, signaled As Boolean)
If Not (O Is Nothing) Then
Console.WriteLine("****ThreadPool: WaitWorkItem Type: {0}, Values: {1}", O.GetType().Name, S)
Else
Console.WriteLine("****ThreadPool: WaitWorkItem value is: {0}", S)
End If
SyncLock S
S = "WaitOrTimer: Special timerstring."
End SyncLock
' Demonstrates use of Interlocked to show locking on an Int32.
Interlocked.Increment(A)
End Sub 'WaitThreadFunc
End Class 'Sensor
[C#]
using System;
using System.Threading;public class Sensor
{
static int A = 32;
static String S = "Initial string";
public static int Main(string[] args){ AutoResetEvent ev = new AutoResetEvent(false);
ThreadPool.RegisterWaitForSingleObject(
ev,
new WaitOrTimerCallback(WaitThreadFunc),
null,
20000,
false
); for(int count = 0; count < 5; ++count){
Console.WriteLine("Hello everyone!!");
Console.WriteLine("{0}, {1}", A, S); ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadFunc), 5);
ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadFunc), 6); // Signal the AutoResetEvent, which raises the ThreadPool to
// invoke the WaitThreadFunc.
if (count == 2)
ev.Set(); Thread.Sleep(1000);
}
return 0;
}
public static void ThreadFunc(Object O){ Console.WriteLine("ThreadPool: WorkItem number: {0}, Values: {1}, {2}", O, A, S); lock(S){
S = "Replacement string";
}
Console.WriteLine("ThreadPool done. {0}", S);
} public static void WaitThreadFunc(object O, bool signaled){
if (O != null){
Console.WriteLine("****ThreadPool: WaitWorkItem Type: {0}, Values: {1}", O.GetType().Name, S);
}
else
Console.WriteLine("****ThreadPool: WaitWorkItem value is: {0}", S); lock(S)
{
S = "WaitOrTimer: Special timerstring.";
}
// Demonstrates use of Interlocked to show locking on an Int32.
Interlocked.Increment(ref A);
}
}
如果您具有可能会长时间运行(并因此阻塞其他任务)的任务。
如果您需要将线程放置到单线程单元中(所有 ThreadPool 线程均处于多线程单元中)。
如果您需要与该线程关联的稳定标识。例如,您可能想使用专用线程来中止该线程、将其挂起或按名称发现它。
下面的代码示例同时使用了 QueueUserWorkItem 和 RegisterWaitForSingleObject 方法。前者用于在每次循环时两次请求对 ThreadFunc 方法的调用。后者用于将 AutoResetEvent 注册到 ThreadPool。然后,就会在某次循环中向 AutoResetEvent 发出信号,而 ThreadPool 将分配一个线程来调用 WaitThreadFunc。[Visual Basic]
Imports System
Imports System.ThreadingPublic Class Sensor
Private Shared A As Integer = 32
Private Shared S As [String] = "Initial string"
Public Shared Sub Main()
Dim ev As New AutoResetEvent(False)
ThreadPool.RegisterWaitForSingleObject(ev, New WaitOrTimerCallback(AddressOf WaitThreadFunc), Nothing, 20000, False)
Dim count As Integer
For count = 1 To 5
Console.WriteLine("Hello everyone!!")
Console.WriteLine("{0}, {1}", A, S)
ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf ThreadFunc), count)
ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf ThreadFunc), count + 1)
' Signal the AutoResetEvent, which raises the ThreadPool to
' invoke the WaitThreadFunc.
If count = 2 Then
ev.Set()
End If
Thread.Sleep(1000)
Next count
End Sub 'Main
Public Shared Sub ThreadFunc(O As Object)
Console.WriteLine("ThreadPool: WorkItem number: {0}, Values: {1}, {2}", O, A, S)
SyncLock S
S = "Replacement string"
End SyncLock
Console.WriteLine("ThreadPool done. {0}", S)
End Sub 'ThreadFunc
Public Shared Sub WaitThreadFunc(O As Object, signaled As Boolean)
If Not (O Is Nothing) Then
Console.WriteLine("****ThreadPool: WaitWorkItem Type: {0}, Values: {1}", O.GetType().Name, S)
Else
Console.WriteLine("****ThreadPool: WaitWorkItem value is: {0}", S)
End If
SyncLock S
S = "WaitOrTimer: Special timerstring."
End SyncLock
' Demonstrates use of Interlocked to show locking on an Int32.
Interlocked.Increment(A)
End Sub 'WaitThreadFunc
End Class 'Sensor
[C#]
using System;
using System.Threading;public class Sensor
{
static int A = 32;
static String S = "Initial string";
public static int Main(string[] args){ AutoResetEvent ev = new AutoResetEvent(false);
ThreadPool.RegisterWaitForSingleObject(
ev,
new WaitOrTimerCallback(WaitThreadFunc),
null,
20000,
false
); for(int count = 0; count < 5; ++count){
Console.WriteLine("Hello everyone!!");
Console.WriteLine("{0}, {1}", A, S); ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadFunc), 5);
ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadFunc), 6); // Signal the AutoResetEvent, which raises the ThreadPool to
// invoke the WaitThreadFunc.
if (count == 2)
ev.Set(); Thread.Sleep(1000);
}
return 0;
}
public static void ThreadFunc(Object O){ Console.WriteLine("ThreadPool: WorkItem number: {0}, Values: {1}, {2}", O, A, S); lock(S){
S = "Replacement string";
}
Console.WriteLine("ThreadPool done. {0}", S);
} public static void WaitThreadFunc(object O, bool signaled){
if (O != null){
Console.WriteLine("****ThreadPool: WaitWorkItem Type: {0}, Values: {1}", O.GetType().Name, S);
}
else
Console.WriteLine("****ThreadPool: WaitWorkItem value is: {0}", S); lock(S)
{
S = "WaitOrTimer: Special timerstring.";
}
// Demonstrates use of Interlocked to show locking on an Int32.
Interlocked.Increment(ref A);
}
}
// Simple thread pool example
using System;
using System.Collections;
using System.Threading;// Useful way to store info that can be passed as a state on a work item
public class SomeState
{
public int Cookie;
public SomeState(int iCookie)
{
Cookie = iCookie;
}
}public class Alpha
{
public Hashtable HashCount;
public ManualResetEvent eventX;
public static int iCount = 0;
public static int iMaxCount = 0;
public Alpha(int MaxCount)
{
HashCount = new Hashtable(MaxCount);
iMaxCount = MaxCount;
} // Beta is the method that will be called when the work item is
// serviced on the thread pool.
// That means this method will be called when the thread pool has
// an available thread for the work item.
public void Beta(Object state)
{
// Write out the hashcode and cookie for the current thread
Console.WriteLine(" {0} {1} :", Thread.CurrentThread.GetHashCode(),
((SomeState)state).Cookie);
// The lock keyword allows thread-safe modification
// of variables accessible across multiple threads.
Console.WriteLine(
"HashCount.Count=={0}, Thread.CurrentThread.GetHashCode()=={1}",
HashCount.Count,
Thread.CurrentThread.GetHashCode());
lock (HashCount)
{
if (!HashCount.ContainsKey(Thread.CurrentThread.GetHashCode()))
HashCount.Add (Thread.CurrentThread.GetHashCode(), 0);
HashCount[Thread.CurrentThread.GetHashCode()] =
((int)HashCount[Thread.CurrentThread.GetHashCode()])+1;
} // Do some busy work.
// Note: Depending on the speed of your machine, if you
// increase this number, the dispersement of the thread
// loads should be wider.
int iX = 2000;
Thread.Sleep(iX);
// The Interlocked.Increment method allows thread-safe modification
// of variables accessible across multiple threads.
Interlocked.Increment(ref iCount);
if (iCount == iMaxCount)
{
Console.WriteLine();
Console.WriteLine("Setting eventX ");
eventX.Set();
}
}
}public class SimplePool
{
public static int Main(string[] args)
{
Console.WriteLine("Thread Pool Sample:");
bool W2K = false;
int MaxCount = 10; // Allow a total of 10 threads in the pool
// Mark the event as unsignaled.
ManualResetEvent eventX = new ManualResetEvent(false);
Console.WriteLine("Queuing {0} items to Thread Pool", MaxCount);
Alpha oAlpha = new Alpha(MaxCount); // Create the work items.
// Make sure the work items have a reference to the signaling event.
oAlpha.eventX = eventX;
Console.WriteLine("Queue to Thread Pool 0");
try
{
// Queue the work items, which has the added effect of checking
// which OS is running.
ThreadPool.QueueUserWorkItem(new WaitCallback(oAlpha.Beta),
new SomeState(0));
W2K = true;
}
catch (NotSupportedException)
{
Console.WriteLine("These API's may fail when called on a non-Windows 2000 system.");
W2K = false;
}
if (W2K) // If running on an OS which supports the ThreadPool methods.
{
for (int iItem=1;iItem < MaxCount;iItem++)
{
// Queue the work items:
Console.WriteLine("Queue to Thread Pool {0}", iItem);
ThreadPool.QueueUserWorkItem(new WaitCallback(oAlpha.Beta),new SomeState(iItem));
}
Console.WriteLine("Waiting for Thread Pool to drain");
// The call to exventX.WaitOne sets the event to wait until
// eventX.Set() occurs.
// (See oAlpha.Beta).
// Wait until event is fired, meaning eventX.Set() was called:
eventX.WaitOne(Timeout.Infinite,true);
// The WaitOne won't return until the event has been signaled.
Console.WriteLine("Thread Pool has been drained (Event fired)");
Console.WriteLine();
Console.WriteLine("Load across threads");
foreach(object o in oAlpha.HashCount.Keys)
Console.WriteLine("{0} {1}", o, oAlpha.HashCount[o]);
}
return 0;
}
}
public static void Connect(EndPoint remoteEP, Socket client) {
client.BeginConnect(remoteEP,
new AsyncCallback(ConnectCallback), client ); connectDone.WaitOne();
}
连接回调方法 ConnectCallback 实现 AsyncCallback 委托。它在远程设备可用时连接到远程设备,然后通过设置 ManualResetEvent connectDone 向应用程序线程发出连接完成的信号。下面的代码实现 ConnectCallback 方法。private static void ConnectCallback(IAsyncResult ar) {
try {
// Retrieve the socket from the state object.
Socket client = (Socket) ar.AsyncState; // Complete the connection.
client.EndConnect(ar); Console.WriteLine("Socket connected to {0}",
client.RemoteEndPoint.ToString()); // Signal that the connection has been made.
connectDone.Set();
} catch (Exception e) {
Console.WriteLine(e.ToString());
}
}
Send 示例方法以 ASCII 格式对指定的字符串数据进行编码,并将其异步发送到指定的套接字所表示的网络设备。下面的示例实现 Send 方法。private static void Send(Socket client, String data) {
// Convert the string data to byte data using ASCII encoding.
byte[] byteData = Encoding.ASCII.GetBytes(data); // Begin sending the data to the remote device.
client.BeginSend(byteData, 0, byteData.Length, SocketFlags.None,
new AsyncCallback(SendCallback), client);
}
发送回调方法 SendCallback 实现 AsyncCallback 委托。它在网络设备准备接收时发送数据。下面的示例显示 SendCallback 方法的实现。它假定存在一个名为 sendDone 的全局 ManualResetEvent 实例。private static void SendCallback(IAsyncResult ar) {
try {
// Retrieve the socket from the state object.
Socket client = (Socket) ar.AsyncState; // Complete sending the data to the remote device.
int bytesSent = client.EndSend(ar);
Console.WriteLine("Sent {0} bytes to server.", bytesSent); // Signal that all bytes have been sent.
sendDone.Set();
} catch (Exception e) {
Console.WriteLine(e.ToString());
}
}
从客户端套接字读取数据需要一个在异步调用之间传递值的状态对象。下面的类是用于从客户端套接字接收数据的示例状态对象。它包含以下各项的字段:客户端套接字,用于接收数据的缓冲区,以及用于保存传入数据字符串的 StringBuilder。将这些字段放在该状态对象中,使这些字段的值在多个调用之间得以保留,以便从客户端套接字读取数据。public class StateObject {
public Socket workSocket = null; // Client socket.
public const int BufferSize = 256; // Size of receive buffer.
public byte[] buffer = new byte[BufferSize]; // Receive buffer.
public StringBuilder sb = new StringBuilder();// Received data string.
}
Receive 方法示例设置状态对象,然后调用 BeginReceive 方法从客户端套接字异步读取数据。下面的示例实现 Receive 方法。private static void Receive(Socket client) {
try {
// Create the state object.
StateObject state = new StateObject();
state.workSocket = client; // Begin receiving the data from the remote device.
client.BeginReceive( state.buffer, 0, StateObject.BufferSize, 0,
new AsyncCallback(ReceiveCallback), state);
} catch (Exception e) {
Console.WriteLine(e.ToString());
}
}
接收回调方法 ReceiveCallback 实现 AsyncCallback 委托。它接收来自网络设备的数据并生成消息字符串。它将来自网络的一个或多个数据字节读入数据缓冲区,然后再次调用 BeginReceive 方法,直到客户端发送的数据完成为止。从客户端读取所有数据后,ReceiveCallback 通过设置 ManualResetEvent sendDone 向应用程序线程发出数据完成的信号。下面的示例代码实现 ReceiveCallback 方法。它假定存在一个名为 response 的全局字符串(该字符串保存接收的字符串)和一个名为 receiveDone 的全局 ManualResetEvent 实例。服务器必须正常关闭客户端套接字以结束网络会话。private static void ReceiveCallback( IAsyncResult ar ) {
try {
// Retrieve the state object and the client socket
// from the async state object.
StateObject state = (StateObject) ar.AsyncState;
Socket client = state.workSocket;
// Read data from the remote device.
int bytesRead = client.EndReceive(ar);
if (bytesRead > 0) {
// There might be more data, so store the data received so far.
state.sb.Append(Encoding.ASCII.GetString(state.buffer,0,bytesRead));
// Get the rest of the data.
client.BeginReceive(state.buffer,0,StateObject.BufferSize,0,
new AsyncCallback(ReceiveCallback), state);
} else {
// All the data has arrived; put it in response.
if (state.sb.Length > 1) {
response = state.sb.ToString();
}
// Signal that all bytes have been received.
receiveDone.Set();
}
} catch (Exception e) {
Console.WriteLine(e.ToString());
}
}