哥哥姐姐你们好!
    我现在想写个处理N个客户连接的线程池服务器,由于以前没接触过线程池.所以对线程池的原理不是很清楚,我在google或者baidu里看了下,但是都讲的不是很清楚,很多细节都是一笔带过,我希望路过的哥哥姐姐能给我详细地讲述一下线程池的原理和创建步骤,最好有完整的源代码,源代码和介绍文章同步的最好.小弟在此谢过.

解决方案 »

  1.   

        线程池使用 Windows NT 和 Windows 2000 操作系统系列下的可用完成端口。这种线程池的基本原理是创建一个单独的完成端口,使所有的工作线程在此完成端口上等待,当新请求进入时,将该请求发送至此完成端口。完成端口机制可确保只有一个控制线程获得这个新请求的通知。     如果不使用完成端口而使用其他同步对象(如信号灯),也可以创建并管理线程池。但是,使用完成端口时,操作系统会以最有效的方式调度和释放线程(即“后进先出”)。   其实可以把完成端口看成系统维护的一个队列,操作系统把重叠IO操作完成的事件通知放到该队列里,由于是暴露 “操作完成”的事件通知,所以命名为“完成端口”(COmpletion Ports)。一个socket被创建后,可以在任何时刻和一个完成端口联系起来。
      一般来说,一个应用程序可以创建多个工作线程来处理完成端口上的通知事件。工作线程的数量依赖于程序的具体需要。但是在理想的情况下,应该对应一个CPU创建一个线程。因为在完成端口理想模型中,每个线程都可以从系统获得一个“原子”性的时间片,轮番运行并检查完成端口,线程的切换是额外的开销。在实际开发的时候,还要考虑这些线程是否牵涉到其他堵塞操作的情况。如果某线程进行堵塞操作,系统则将其挂起,让别的线程获得运行时间。因此,如果有这样的情况,可以多创建几个线程来尽量利用时间。
      总之,开发一个可扩展的Winsock服务器并非十分困难的。主要是开始一个监听socket,接收连接,并且进行重叠发送和接收的IO操作。最大的挑战就是管理系统资源,限制重叠Io的数量,避免内存危机。遵循这几个原则,就能帮助你开发高性能,可扩展的服务程序。
        
      

  2.   

    #ifndef _ThreadPool_H_
    #define _ThreadPool_H_
    #pragma warning(disable: 4530)
    #pragma warning(disable: 4786)
    #include <cassert>
    #include <vector>
    #include <queue>
    #include <windows.h>using namespace std;class ThreadJob  //工作基类
    {
    public:
    //供线程池调用的虚函数
    virtual void DoJob(void *pPara) = 0;
    };
    class ThreadPool
    {
    public:
    //dwNum 线程池规模
    ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0) 
    {
    InitializeCriticalSection(&_csThreadVector);
    InitializeCriticalSection(&_csWorkQueue);
    //手动还是自动
    _EventComplete = CreateEvent(0, false, false, NULL);
    _EventEnd = CreateEvent(0, true, false, NULL);
    _SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
    _SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
    assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
    assert(_EventComplete != INVALID_HANDLE_VALUE);
    assert(_EventEnd != INVALID_HANDLE_VALUE);
    assert(_SemaphoreDel != INVALID_HANDLE_VALUE);
    AdjustSize(dwNum <= 0 ? 4 : dwNum);
    }
    ~ThreadPool()
    {
    DeleteCriticalSection(&_csWorkQueue);
    CloseHandle(_EventEnd);
    CloseHandle(_EventComplete);
    CloseHandle(_SemaphoreCall);
    CloseHandle(_SemaphoreDel); vector<ThreadItem*>::iterator iter;
    for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
    {
    if(*iter)
    delete *iter;
    }
    DeleteCriticalSection(&_csThreadVector);
    }
    //调整线程池规模
    int AdjustSize(int iNum)
    {
    if(iNum > 0)
    {
    ThreadItem *pNew;
    EnterCriticalSection(&_csThreadVector);
    for(int _i=0; _i<iNum; _i++)
    {
    _ThreadVector.push_back(pNew = new ThreadItem(this)); 
    assert(pNew);
    pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
    // set priority
    SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL);
    assert(pNew->_Handle);
    }
    LeaveCriticalSection(&_csThreadVector);
    }
    else
    {
    iNum *= -1;
    ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
    }
    return (int)_lThreadNum;
    }
    //调用线程池
    void Call(void (*pFunc)(void  *), void *pPara = NULL)
    {
    assert(pFunc);
    EnterCriticalSection(&_csWorkQueue);
    _JobQueue.push(new JobItem(pFunc, pPara));
    LeaveCriticalSection(&_csWorkQueue);
    ReleaseSemaphore(_SemaphoreCall, 1, NULL);//通知
    }
    //调用线程池
    inline void Call(ThreadJob * p, void *pPara = NULL)
    {
    Call(CallProc, new CallProcPara(p, pPara));
    }
    //结束线程池, 并同步等待
    bool EndAndWait(DWORD dwWaitTime = INFINITE)
    {
    SetEvent(_EventEnd);
    return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
    }
    //结束线程池
    inline void End()
    {
    SetEvent(_EventEnd);
    }
    inline DWORD Size()
    {
    return (DWORD)_lThreadNum;
    }
    inline DWORD GetRunningSize()
    {
    return (DWORD)_lRunningNum;
    }
    bool IsRunning()
    {
    return _lRunningNum > 0;
    }
    protected:
    //工作线程
    static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
    {
    ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
    assert(pThread);
    ThreadPool *pThreadPoolObj = pThread->_pThis;
    assert(pThreadPoolObj);
    InterlockedIncrement(&pThreadPoolObj->_lThreadNum);
    HANDLE hWaitHandle[3];
    hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
    hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
    hWaitHandle[2] = pThreadPoolObj->_EventEnd;
    JobItem *pJob;
    bool fHasJob; for(;;)
    {
    DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
    //响应删除线程信号
    if(wr == WAIT_OBJECT_0 + 1)  
    break; //从队列里取得用户作业
    EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
    if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
    {
    pJob = pThreadPoolObj->_JobQueue.front();
    pThreadPoolObj->_JobQueue.pop();
    assert(pJob);
    }
    LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
    //受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作)
    if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)  
    break;
    if(fHasJob && pJob)
    {
    InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
    pThread->_dwLastBeginTime = GetTickCount();
    pThread->_dwCount++;
    pThread->_fIsRunning = true;
    pJob->_pFunc(pJob->_pPara); //运行用户作业
    delete pJob; 
    pThread->_fIsRunning = false;
    InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
    }
    }
    //删除自身结构
    EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
    pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
    LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
    delete pThread;
    InterlockedDecrement(&pThreadPoolObj->_lThreadNum);
    if(!pThreadPoolObj->_lThreadNum)  //所有线程结束
    SetEvent(pThreadPoolObj->_EventComplete);
    return 0;
    }
    //调用用户对象虚函数
    static void CallProc(void *pPara) 
    {
    CallProcPara *cp = static_cast<CallProcPara *>(pPara);
    assert(cp);
    if(cp)
    {
    cp->_pObj->DoJob(cp->_pPara);
    delete cp;
    }
    }
    //用户对象结构
    struct CallProcPara  
    {
    ThreadJob* _pObj;//用户对象 
    void *_pPara;//用户参数
    CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
    };
    //用户函数结构
    struct JobItem 
    {
    void (*_pFunc)(void  *);//函数
    void *_pPara; //参数
    JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
    };
    //线程池中的线程结构
    struct ThreadItem
    {
    HANDLE _Handle; //线程句柄
    ThreadPool *_pThis;  //线程池的指针
    DWORD _dwLastBeginTime; //最后一次运行开始时间
    DWORD _dwCount; //运行次数
    bool _fIsRunning;
    ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
    ~ThreadItem()
    {
    if(_Handle)
    {
    CloseHandle(_Handle);
    _Handle = NULL;
    }
    }
    }; std::queue<JobItem *> _JobQueue;  //工作队列
    std::vector<ThreadItem *>  _ThreadVector; //线程数据
    CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界
    HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号
    long _lThreadNum, _lRunningNum; //线程数, 运行的线程数
    };
    #endif //_ThreadPool_H_
    线程池思相: 有两个队列:线程队列与工作队列.线程队列里是开启的线程,工作队列是客户提交要完成的工作.线程队列里的线程会向工作队列里找工作做,如果有工作则去执行工作,否则阻塞.当有新工作加入,会唤醒阻塞线程,干活.给公司上班一样,几个人,有活干活,没活等活.
    不知道你听明白没?
      

  3.   

    #ifndef _ThreadPool_H_
    #define _ThreadPool_H_
    #pragma warning(disable: 4530)
    #pragma warning(disable: 4786)
    #include <cassert>
    #include <vector>
    #include <queue>
    #include <windows.h>using namespace std;class ThreadJob  //工作基类
    {
    public:
    //供线程池调用的虚函数
    virtual void DoJob(void *pPara) = 0;
    };
    class ThreadPool
    {
    public:
    //dwNum 线程池规模
    ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0) 
    {
    InitializeCriticalSection(&_csThreadVector);
    InitializeCriticalSection(&_csWorkQueue);
    //手动还是自动
    _EventComplete = CreateEvent(0, false, false, NULL);
    _EventEnd = CreateEvent(0, true, false, NULL);
    _SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
    _SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
    assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
    assert(_EventComplete != INVALID_HANDLE_VALUE);
    assert(_EventEnd != INVALID_HANDLE_VALUE);
    assert(_SemaphoreDel != INVALID_HANDLE_VALUE);
    AdjustSize(dwNum <= 0 ? 4 : dwNum);
    }
    ~ThreadPool()
    {
    DeleteCriticalSection(&_csWorkQueue);
    CloseHandle(_EventEnd);
    CloseHandle(_EventComplete);
    CloseHandle(_SemaphoreCall);
    CloseHandle(_SemaphoreDel); vector<ThreadItem*>::iterator iter;
    for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
    {
    if(*iter)
    delete *iter;
    }
    DeleteCriticalSection(&_csThreadVector);
    }
    //调整线程池规模
    int AdjustSize(int iNum)
    {
    if(iNum > 0)
    {
    ThreadItem *pNew;
    EnterCriticalSection(&_csThreadVector);
    for(int _i=0; _i<iNum; _i++)
    {
    _ThreadVector.push_back(pNew = new ThreadItem(this)); 
    assert(pNew);
    pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
    // set priority
    SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL);
    assert(pNew->_Handle);
    }
    LeaveCriticalSection(&_csThreadVector);
    }
    else
    {
    iNum *= -1;
    ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
    }
    return (int)_lThreadNum;
    }
    //调用线程池
    void Call(void (*pFunc)(void  *), void *pPara = NULL)
    {
    assert(pFunc);
    EnterCriticalSection(&_csWorkQueue);
    _JobQueue.push(new JobItem(pFunc, pPara));
    LeaveCriticalSection(&_csWorkQueue);
    ReleaseSemaphore(_SemaphoreCall, 1, NULL);//通知
    }
    //调用线程池
    inline void Call(ThreadJob * p, void *pPara = NULL)
    {
    Call(CallProc, new CallProcPara(p, pPara));
    }
    //结束线程池, 并同步等待
    bool EndAndWait(DWORD dwWaitTime = INFINITE)
    {
    SetEvent(_EventEnd);
    return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
    }
    //结束线程池
    inline void End()
    {
    SetEvent(_EventEnd);
    }
    inline DWORD Size()
    {
    return (DWORD)_lThreadNum;
    }
    inline DWORD GetRunningSize()
    {
    return (DWORD)_lRunningNum;
    }
    bool IsRunning()
    {
    return _lRunningNum > 0;
    }
    protected:
    //工作线程
    static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
    {
    ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
    assert(pThread);
    ThreadPool *pThreadPoolObj = pThread->_pThis;
    assert(pThreadPoolObj);
    InterlockedIncrement(&pThreadPoolObj->_lThreadNum);
    HANDLE hWaitHandle[3];
    hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
    hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
    hWaitHandle[2] = pThreadPoolObj->_EventEnd;
    JobItem *pJob;
    bool fHasJob; for(;;)
    {
    DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
    //响应删除线程信号
    if(wr == WAIT_OBJECT_0 + 1)  
    break; //从队列里取得用户作业
    EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
    if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
    {
    pJob = pThreadPoolObj->_JobQueue.front();
    pThreadPoolObj->_JobQueue.pop();
    assert(pJob);
    }
    LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
    //受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作)
    if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)  
    break;
    if(fHasJob && pJob)
    {
    InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
    pThread->_dwLastBeginTime = GetTickCount();
    pThread->_dwCount++;
    pThread->_fIsRunning = true;
    pJob->_pFunc(pJob->_pPara); //运行用户作业
    delete pJob; 
    pThread->_fIsRunning = false;
    InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
    }
    }
    //删除自身结构
    EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
    pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
    LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
    delete pThread;
    InterlockedDecrement(&pThreadPoolObj->_lThreadNum);
    if(!pThreadPoolObj->_lThreadNum)  //所有线程结束
    SetEvent(pThreadPoolObj->_EventComplete);
    return 0;
    }
    //调用用户对象虚函数
    static void CallProc(void *pPara) 
    {
    CallProcPara *cp = static_cast<CallProcPara *>(pPara);
    assert(cp);
    if(cp)
    {
    cp->_pObj->DoJob(cp->_pPara);
    delete cp;
    }
    }
    //用户对象结构
    struct CallProcPara  
    {
    ThreadJob* _pObj;//用户对象 
    void *_pPara;//用户参数
    CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
    };
    //用户函数结构
    struct JobItem 
    {
    void (*_pFunc)(void  *);//函数
    void *_pPara; //参数
    JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
    };
    //线程池中的线程结构
    struct ThreadItem
    {
    HANDLE _Handle; //线程句柄
    ThreadPool *_pThis;  //线程池的指针
    DWORD _dwLastBeginTime; //最后一次运行开始时间
    DWORD _dwCount; //运行次数
    bool _fIsRunning;
    ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
    ~ThreadItem()
    {
    if(_Handle)
    {
    CloseHandle(_Handle);
    _Handle = NULL;
    }
    }
    }; std::queue<JobItem *> _JobQueue;  //工作队列
    std::vector<ThreadItem *>  _ThreadVector; //线程数据
    CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界
    HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号
    long _lThreadNum, _lRunningNum; //线程数, 运行的线程数
    };
    #endif //_ThreadPool_H_
    线程池思相: 有两个队列:线程队列与工作队列.线程队列里是开启的线程,工作队列是客户提交要完成的工作.线程队列里的线程会向工作队列里找工作做,如果有工作则去执行工作,否则阻塞.当有新工作加入,会唤醒阻塞线程,干活.给公司上班一样,几个人,有活干活,没活等活.
    不知道你听明白没?
      

  4.   

    #ifndef _ThreadPool_H_
    #define _ThreadPool_H_
    #pragma warning(disable: 4530)
    #pragma warning(disable: 4786)
    #include <cassert>
    #include <vector>
    #include <queue>
    #include <windows.h>using namespace std;class ThreadJob  //工作基类
    {
    public:
    //供线程池调用的虚函数
    virtual void DoJob(void *pPara) = 0;
    };
    class ThreadPool
    {
    public:
    //dwNum 线程池规模
    ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0) 
    {
    InitializeCriticalSection(&_csThreadVector);
    InitializeCriticalSection(&_csWorkQueue);
    //手动还是自动
    _EventComplete = CreateEvent(0, false, false, NULL);
    _EventEnd = CreateEvent(0, true, false, NULL);
    _SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
    _SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
    assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
    assert(_EventComplete != INVALID_HANDLE_VALUE);
    assert(_EventEnd != INVALID_HANDLE_VALUE);
    assert(_SemaphoreDel != INVALID_HANDLE_VALUE);
    AdjustSize(dwNum <= 0 ? 4 : dwNum);
    }
    ~ThreadPool()
    {
    DeleteCriticalSection(&_csWorkQueue);
    CloseHandle(_EventEnd);
    CloseHandle(_EventComplete);
    CloseHandle(_SemaphoreCall);
    CloseHandle(_SemaphoreDel); vector<ThreadItem*>::iterator iter;
    for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
    {
    if(*iter)
    delete *iter;
    }
    DeleteCriticalSection(&_csThreadVector);
    }
    //调整线程池规模
    int AdjustSize(int iNum)
    {
    if(iNum > 0)
    {
    ThreadItem *pNew;
    EnterCriticalSection(&_csThreadVector);
    for(int _i=0; _i<iNum; _i++)
    {
    _ThreadVector.push_back(pNew = new ThreadItem(this)); 
    assert(pNew);
    pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
    // set priority
    SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL);
    assert(pNew->_Handle);
    }
    LeaveCriticalSection(&_csThreadVector);
    }
    else
    {
    iNum *= -1;
    ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
    }
    return (int)_lThreadNum;
    }
    //调用线程池
    void Call(void (*pFunc)(void  *), void *pPara = NULL)
    {
    assert(pFunc);
    EnterCriticalSection(&_csWorkQueue);
    _JobQueue.push(new JobItem(pFunc, pPara));
    LeaveCriticalSection(&_csWorkQueue);
    ReleaseSemaphore(_SemaphoreCall, 1, NULL);//通知
    }
    //调用线程池
    inline void Call(ThreadJob * p, void *pPara = NULL)
    {
    Call(CallProc, new CallProcPara(p, pPara));
    }
    //结束线程池, 并同步等待
    bool EndAndWait(DWORD dwWaitTime = INFINITE)
    {
    SetEvent(_EventEnd);
    return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
    }
    //结束线程池
    inline void End()
    {
    SetEvent(_EventEnd);
    }
    inline DWORD Size()
    {
    return (DWORD)_lThreadNum;
    }
    inline DWORD GetRunningSize()
    {
    return (DWORD)_lRunningNum;
    }
    bool IsRunning()
    {
    return _lRunningNum > 0;
    }
    protected:
    //工作线程
    static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
    {
    ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
    assert(pThread);
    ThreadPool *pThreadPoolObj = pThread->_pThis;
    assert(pThreadPoolObj);
    InterlockedIncrement(&pThreadPoolObj->_lThreadNum);
    HANDLE hWaitHandle[3];
    hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
    hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
    hWaitHandle[2] = pThreadPoolObj->_EventEnd;
    JobItem *pJob;
    bool fHasJob; for(;;)
    {
    DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
    //响应删除线程信号
    if(wr == WAIT_OBJECT_0 + 1)  
    break; //从队列里取得用户作业
    EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
    if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
    {
    pJob = pThreadPoolObj->_JobQueue.front();
    pThreadPoolObj->_JobQueue.pop();
    assert(pJob);
    }
    LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
    //受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作)
    if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)  
    break;
    if(fHasJob && pJob)
    {
    InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
    pThread->_dwLastBeginTime = GetTickCount();
    pThread->_dwCount++;
    pThread->_fIsRunning = true;
    pJob->_pFunc(pJob->_pPara); //运行用户作业
    delete pJob; 
    pThread->_fIsRunning = false;
    InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
    }
    }
    //删除自身结构
    EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
    pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
    LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
    delete pThread;
    InterlockedDecrement(&pThreadPoolObj->_lThreadNum);
    if(!pThreadPoolObj->_lThreadNum)  //所有线程结束
    SetEvent(pThreadPoolObj->_EventComplete);
    return 0;
    }
    //调用用户对象虚函数
    static void CallProc(void *pPara) 
    {
    CallProcPara *cp = static_cast<CallProcPara *>(pPara);
    assert(cp);
    if(cp)
    {
    cp->_pObj->DoJob(cp->_pPara);
    delete cp;
    }
    }
    //用户对象结构
    struct CallProcPara  
    {
    ThreadJob* _pObj;//用户对象 
    void *_pPara;//用户参数
    CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
    };
    //用户函数结构
    struct JobItem 
    {
    void (*_pFunc)(void  *);//函数
    void *_pPara; //参数
    JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
    };
    //线程池中的线程结构
    struct ThreadItem
    {
    HANDLE _Handle; //线程句柄
    ThreadPool *_pThis;  //线程池的指针
    DWORD _dwLastBeginTime; //最后一次运行开始时间
    DWORD _dwCount; //运行次数
    bool _fIsRunning;
    ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
    ~ThreadItem()
    {
    if(_Handle)
    {
    CloseHandle(_Handle);
    _Handle = NULL;
    }
    }
    }; std::queue<JobItem *> _JobQueue;  //工作队列
    std::vector<ThreadItem *>  _ThreadVector; //线程数据
    CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界
    HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号
    long _lThreadNum, _lRunningNum; //线程数, 运行的线程数
    };
    #endif //_ThreadPool_H_
    线程池思相: 有两个队列:线程队列与工作队列.线程队列里是开启的线程,工作队列是客户提交要完成的工作.线程队列里的线程会向工作队列里找工作做,如果有工作则去执行工作,否则阻塞.当有新工作加入,会唤醒阻塞线程,干活.给公司上班一样,几个人,有活干活,没活等活.
    不知道你听明白没?
      

  5.   

    支持一下写过几个线程池, 简单的用MFC, 复杂的用完成端口