近日接触完成端口和线程池技术,看了侯捷多线程的书,对完成端口有个初步的理解。做了(参照网上的资料)一个socket+完成端口的例子,成功了。但是,对网上帖的线程池的实例,始终没能理解透。希望那位高手做过的,帮忙加些注释:
1)ThreadPool类各个函数的作用和实现思想。
2)类的函数ProcessJob中调用PostQueuedCompletionStatus的意图是什么?
3)StopThreadPool函数中调用GetQueuedCompletionStatus的意图是什么?
4)ManagerProc函数中调用GetQueuedCompletionStatus的意图又是什么,个人认为管理用的线程不用调用GetQueuedCompletionStatus函数,管理用线程不必“放入”线程池中。
5)给个测试的main函数。
6)或者还有关键点是我没有发现的,也请指点。如果能全面的讲解一下基于完成端口的线程池实现思想(总体思路,实现步骤),然后对应代码给说明一下最好了。
最初看到这些代码时,以为很简单,但是细细分析后~~~各位,拜托了。
部分代码如下。
如能通过Email联系,就太谢谢了:[email protected]
bool CThreadPool::Start(WORD wStatic, WORD wMax)
{
if (!(wStatic && wMax))
{
return false;
} m_wStaticThreadNum = wStatic;
m_wMaxThreadNum = wMax; ::EnterCriticalSection(&m_csLock); if (m_pThreadInfo)
{
delete [] m_pThreadInfo;
m_pThreadInfo = NULL;
}
m_pThreadInfo = new THREADINFO[wMax](); //Array for ThreadPool
if (m_pThreadInfo == NULL)
{
return false;
} if (m_hManagerIO)
{
::CloseHandle(m_hManagerIO);
m_hManagerIO = NULL;
}
m_hManagerIO = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 
if (m_hManagerIO == NULL)
{
return false;
} if (m_hWorkerIO)
{
::CloseHandle(m_hWorkerIO);
m_hManagerIO = NULL;
}
m_hWorkerIO = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 
if (m_hWorkerIO == NULL)
{
return false;
} // create thread for manager.
m_bQuitManager = false;
if (m_hManagerThread)
{
::TerminateThread(m_hManagerThread, 0);
m_hManagerThread = NULL;
}
unsigned ManagerThreadID;
m_hManagerThread = (HANDLE)_beginthreadex(NULL, 0, &ManagerProc, this, 0, &ManagerThreadID); //this: the class
if (m_hManagerThread == NULL)
{
return false;
} // create wStatic threads of pool
for (WORD i = 0; i < wStatic; ++i)
{
m_pThreadInfo[i].hThreadHandle = (HANDLE)_beginthreadex(NULL, 0, &WorkerProc, this, 0, &m_pThreadInfo[i].dwThreadID);
} ::LeaveCriticalSection(&m_csLock); return true;
}void CThreadPool::ProcessJob(CWorkDesc* pJob, CWork* pWorker) const
{
::PostQueuedCompletionStatus(
m_hWorkerIO,
reinterpret_cast<DWORD>(pWorker),
reinterpret_cast<DWORD>(pJob),
NULL ); 
}
void CThreadPool::Stop(void)
{
::EnterCriticalSection(&m_csLock); m_bQuitManager = true;
DWORD dwRes; // end the manager thread.
for (int i = 0; i < 10; i++)
{
::GetExitCodeThread(m_hManagerThread, &dwRes);
if (dwRes == CThreadPool::MANAGERPROC_RETURN_VALUE)
{
break;
}
if (i == 9)
{
::TerminateThread(m_hManagerThread, 0);
}
else
{
::Sleep(1000);
}
} // close the CompletionPort
::CloseHandle(m_hManagerIO);
m_hManagerIO = NULL; // end the thread in the pool
for (int i = 0; i < m_wMaxThreadNum; i++)
{
if (m_pThreadInfo[i].dwThreadID == 0)
{
continue;
}
m_pThreadInfo[i].bIsQuit = true; for (int j = 0; j < 10; j++)
{
::GetExitCodeThread(m_pThreadInfo[i].hThreadHandle, &dwRes);
if (dwRes == CThreadPool::WORKERPROC_RETURN_VALUE)
{
break;
}
if (j == 9)
{
::TerminateThread(m_pThreadInfo[i].hThreadHandle, 0);
}
else
{
::Sleep(500);
}
}
} ::CloseHandle(m_hWorkerIO);
m_hWorkerIO = NULL; if (m_pThreadInfo)
{
delete [] m_pThreadInfo;
m_pThreadInfo = NULL;
} unsigned long pN1;
unsigned long pN2;
OVERLAPPED* pOverLapped; while (::GetQueuedCompletionStatus(m_hWorkerIO, &pN1, &pN2, &pOverLapped, 0)) //m_hManagerIO
{
CWork* pWork = reinterpret_cast<CWork*>(pN1);
CWorkDesc* pWorkDesc = reinterpret_cast<CWorkDesc*>(pN2);
delete pWorkDesc;
}
::LeaveCriticalSection(&m_csLock);
}
unsigned __stdcall CThreadPool::ManagerProc(void* pThread)
{
unsigned long pN1;
unsigned long pN2;
OVERLAPPED* pOverLapped; CThreadPool* pServer = reinterpret_cast<CThreadPool*>(pThread); while (!pServer->m_bQuitManager)
{
if (::GetQueuedCompletionStatus(
pServer->m_hManagerIO, 
&pN1, 
&pN2, 
&pOverLapped, 
pServer->m_dwMSeconds) 
== TRUE)
{
if (pOverLapped == (OVERLAPPED*)0xFFFFFFFFF)
{
break;
}
}
else
{
EThreadStatus stat = pServer->GetWorkThreadStatus(); 
if (stat == CThreadPool::BUSY)
{
puts("Add thread");
pServer->AddThread();
}
else if (stat == CThreadPool::IDLE)
{
puts("Del thread");
pServer->DelThread();
}
else
{}
}
}
_endthreadex(CThreadPool::MANAGERPROC_RETURN_VALUE); return CThreadPool::MANAGERPROC_RETURN_VALUE; 
}
unsigned __stdcall CThreadPool::WorkerProc(void* pThread)
{
unsigned long pN1;
unsigned long pN2;
OVERLAPPED* pOverLapped;
CThreadPool* pServer = reinterpret_cast<CThreadPool*>(pThread); DWORD threadID = ::GetCurrentThreadId();
int nSeq = pServer->GetThreadbyID(threadID); if (nSeq < 0)
{
return 0;
} while (!pServer->m_pThreadInfo[nSeq].bIsQuit)
{
if (::GetQueuedCompletionStatus(
pServer->m_hWorkerIO, 
&pN1, 
&pN2, 
&pOverLapped, 
pServer->m_dwMSeconds))
{
CWork* pWork = reinterpret_cast<CWork*>(pN1);
CWorkDesc* pWorkDesc = reinterpret_cast<CWorkDesc*>(pN2); printf("do work. \n");
pServer->m_pThreadInfo[nSeq].bIsBusy = true; pWork->ThreadJob(pWorkDesc);
delete pWorkDesc; pServer->m_pThreadInfo[nSeq].bIsBusy = false;
printf("do work over. \n");
}
}
printf("worker thread down. \n"); pServer->m_pThreadInfo[nSeq].dwThreadID = 0; _endthreadex(CThreadPool::WORKERPROC_RETURN_VALUE); return CThreadPool::WORKERPROC_RETURN_VALUE;
}
CThreadPool::EThreadStatus CThreadPool::GetWorkThreadStatus(void)
{
float fAll = 0.0;
float fRun = 0.0;
for (WORD wi = 0; wi < m_wMaxThreadNum; ++wi)
{
if (m_pThreadInfo[wi].dwThreadID)
{
fAll++;
if (m_pThreadInfo[wi].bIsBusy)
{
fRun++;
}
}
} if (fAll == 0)
{
return CThreadPool::IDLE; 
}
if (fRun / (1.0 * fAll) > 0.8)
{
return CThreadPool::BUSY;
}
if (fRun / (1.0 * fAll) < 0.2)
{
return CThreadPool::IDLE;
}
return CThreadPool::NORMAL;
}

解决方案 »

  1.   

    自己先顶一下:
    侯捷书中关于完成端口使用步骤的讲述
    1) 产生一个I/O completion port
    2) 让它和一个文件handle产生关联
    3) 产生一堆线程
    4) 让每个线程都在completion port上等待
    5) 开始对着那个文件handle发出一些overlapped I/O请求。
    我们实现线程池的思想(我的设想):
    1)产程一个管理线程,来负责线程池的增加,减少等工作。
    2)产生几个线程,负责完成具体的任务,这几个线程由完成端口来调度。这几个线程加完成端口就构成“池”的概念,对吗?
    3)线程具体的工作在类中不定义,具体使用线程池时再定义?怎么定义具体工作,或者说怎么用这个线程池我还不清楚。
    线程池的问题困扰快一周了,急得不行(!!-_-!!)。
    拜托了,各位。
      

  2.   

    你这里少代码啊?没有调用ProcessJob的过程,这个就是将当前线程放入线程池的处理函数
      

  3.   

    sorry,刚看到,在这里看代码真的很费劲
    我目前也在做类似的东西,可以采用系统提供的线程池,自己来写工作线程处理,线程池与完成端口类似,有一些共同的特性,线程池只要写好回调函数以及同步对象就可以了
      

  4.   

    我用IOCP做服务器,运行几天出现的.
    我的情况是:使用完成端口模型作服务器,当GetQueuedCompletionStatus的时候   
      偶尔会出现   
              ERROR_CONNECTION_ABORTED   由本地系统终止网络连接   
              ERROR_SEM_TIMEOUT       信号灯超时时间已到。   
      样的错误,同时服务器能够接受连接但是不能write/read数据,除了线程被阻塞意外还有什么可能呢?
      请各位大大帮忙提点提点 ..
      

  5.   

    哦,先谢谢了。我在Win核心编程中也有看到QueueUserWorkItem,不过没有认真的研究。我这里贴出的代码只是关键部分,其他的我没有贴出来。我感觉这套设计方案应该是可取的,但是我不是很明白其中的细节。还有就是怎么使用这个线程池还不知道。所以希望有人能指点一下。
      

  6.   

    toxyboy: 你的线程池的模型和我帖的这个是一样的吗?那能摆脱你给我帖的代码讲解一下,在写个简单的main,来使用这个线程池???先谢谢了。