近日接触完成端口和线程池技术,看了侯捷多线程的书,对完成端口有个初步的理解。做了(参照网上的资料)一个socket+完成端口的例子,成功了。但是,对网上帖的线程池的实例,始终没能理解透。希望那位高手做过的,帮忙加些注释:
1)ThreadPool类各个函数的作用和实现思想。
2)类的函数ProcessJob中调用PostQueuedCompletionStatus的意图是什么?
3)StopThreadPool函数中调用GetQueuedCompletionStatus的意图是什么?
4)ManagerProc函数中调用GetQueuedCompletionStatus的意图又是什么,个人认为管理用的线程不用调用GetQueuedCompletionStatus函数,管理用线程不必“放入”线程池中。
5)给个测试的main函数。
6)或者还有关键点是我没有发现的,也请指点。如果能全面的讲解一下基于完成端口的线程池实现思想(总体思路,实现步骤),然后对应代码给说明一下最好了。
最初看到这些代码时,以为很简单,但是细细分析后~~~各位,拜托了。
部分代码如下。
如能通过Email联系,就太谢谢了:[email protected]
bool CThreadPool::Start(WORD wStatic, WORD wMax)
{
if (!(wStatic && wMax))
{
return false;
} m_wStaticThreadNum = wStatic;
m_wMaxThreadNum = wMax; ::EnterCriticalSection(&m_csLock); if (m_pThreadInfo)
{
delete [] m_pThreadInfo;
m_pThreadInfo = NULL;
}
m_pThreadInfo = new THREADINFO[wMax](); //Array for ThreadPool
if (m_pThreadInfo == NULL)
{
return false;
} if (m_hManagerIO)
{
::CloseHandle(m_hManagerIO);
m_hManagerIO = NULL;
}
m_hManagerIO = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (m_hManagerIO == NULL)
{
return false;
} if (m_hWorkerIO)
{
::CloseHandle(m_hWorkerIO);
m_hManagerIO = NULL;
}
m_hWorkerIO = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (m_hWorkerIO == NULL)
{
return false;
} // create thread for manager.
m_bQuitManager = false;
if (m_hManagerThread)
{
::TerminateThread(m_hManagerThread, 0);
m_hManagerThread = NULL;
}
unsigned ManagerThreadID;
m_hManagerThread = (HANDLE)_beginthreadex(NULL, 0, &ManagerProc, this, 0, &ManagerThreadID); //this: the class
if (m_hManagerThread == NULL)
{
return false;
} // create wStatic threads of pool
for (WORD i = 0; i < wStatic; ++i)
{
m_pThreadInfo[i].hThreadHandle = (HANDLE)_beginthreadex(NULL, 0, &WorkerProc, this, 0, &m_pThreadInfo[i].dwThreadID);
} ::LeaveCriticalSection(&m_csLock); return true;
}void CThreadPool::ProcessJob(CWorkDesc* pJob, CWork* pWorker) const
{
::PostQueuedCompletionStatus(
m_hWorkerIO,
reinterpret_cast<DWORD>(pWorker),
reinterpret_cast<DWORD>(pJob),
NULL );
}
void CThreadPool::Stop(void)
{
::EnterCriticalSection(&m_csLock); m_bQuitManager = true;
DWORD dwRes; // end the manager thread.
for (int i = 0; i < 10; i++)
{
::GetExitCodeThread(m_hManagerThread, &dwRes);
if (dwRes == CThreadPool::MANAGERPROC_RETURN_VALUE)
{
break;
}
if (i == 9)
{
::TerminateThread(m_hManagerThread, 0);
}
else
{
::Sleep(1000);
}
} // close the CompletionPort
::CloseHandle(m_hManagerIO);
m_hManagerIO = NULL; // end the thread in the pool
for (int i = 0; i < m_wMaxThreadNum; i++)
{
if (m_pThreadInfo[i].dwThreadID == 0)
{
continue;
}
m_pThreadInfo[i].bIsQuit = true; for (int j = 0; j < 10; j++)
{
::GetExitCodeThread(m_pThreadInfo[i].hThreadHandle, &dwRes);
if (dwRes == CThreadPool::WORKERPROC_RETURN_VALUE)
{
break;
}
if (j == 9)
{
::TerminateThread(m_pThreadInfo[i].hThreadHandle, 0);
}
else
{
::Sleep(500);
}
}
} ::CloseHandle(m_hWorkerIO);
m_hWorkerIO = NULL; if (m_pThreadInfo)
{
delete [] m_pThreadInfo;
m_pThreadInfo = NULL;
} unsigned long pN1;
unsigned long pN2;
OVERLAPPED* pOverLapped; while (::GetQueuedCompletionStatus(m_hWorkerIO, &pN1, &pN2, &pOverLapped, 0)) //m_hManagerIO
{
CWork* pWork = reinterpret_cast<CWork*>(pN1);
CWorkDesc* pWorkDesc = reinterpret_cast<CWorkDesc*>(pN2);
delete pWorkDesc;
}
::LeaveCriticalSection(&m_csLock);
}
unsigned __stdcall CThreadPool::ManagerProc(void* pThread)
{
unsigned long pN1;
unsigned long pN2;
OVERLAPPED* pOverLapped; CThreadPool* pServer = reinterpret_cast<CThreadPool*>(pThread); while (!pServer->m_bQuitManager)
{
if (::GetQueuedCompletionStatus(
pServer->m_hManagerIO,
&pN1,
&pN2,
&pOverLapped,
pServer->m_dwMSeconds)
== TRUE)
{
if (pOverLapped == (OVERLAPPED*)0xFFFFFFFFF)
{
break;
}
}
else
{
EThreadStatus stat = pServer->GetWorkThreadStatus();
if (stat == CThreadPool::BUSY)
{
puts("Add thread");
pServer->AddThread();
}
else if (stat == CThreadPool::IDLE)
{
puts("Del thread");
pServer->DelThread();
}
else
{}
}
}
_endthreadex(CThreadPool::MANAGERPROC_RETURN_VALUE); return CThreadPool::MANAGERPROC_RETURN_VALUE;
}
unsigned __stdcall CThreadPool::WorkerProc(void* pThread)
{
unsigned long pN1;
unsigned long pN2;
OVERLAPPED* pOverLapped;
CThreadPool* pServer = reinterpret_cast<CThreadPool*>(pThread); DWORD threadID = ::GetCurrentThreadId();
int nSeq = pServer->GetThreadbyID(threadID); if (nSeq < 0)
{
return 0;
} while (!pServer->m_pThreadInfo[nSeq].bIsQuit)
{
if (::GetQueuedCompletionStatus(
pServer->m_hWorkerIO,
&pN1,
&pN2,
&pOverLapped,
pServer->m_dwMSeconds))
{
CWork* pWork = reinterpret_cast<CWork*>(pN1);
CWorkDesc* pWorkDesc = reinterpret_cast<CWorkDesc*>(pN2); printf("do work. \n");
pServer->m_pThreadInfo[nSeq].bIsBusy = true; pWork->ThreadJob(pWorkDesc);
delete pWorkDesc; pServer->m_pThreadInfo[nSeq].bIsBusy = false;
printf("do work over. \n");
}
}
printf("worker thread down. \n"); pServer->m_pThreadInfo[nSeq].dwThreadID = 0; _endthreadex(CThreadPool::WORKERPROC_RETURN_VALUE); return CThreadPool::WORKERPROC_RETURN_VALUE;
}
CThreadPool::EThreadStatus CThreadPool::GetWorkThreadStatus(void)
{
float fAll = 0.0;
float fRun = 0.0;
for (WORD wi = 0; wi < m_wMaxThreadNum; ++wi)
{
if (m_pThreadInfo[wi].dwThreadID)
{
fAll++;
if (m_pThreadInfo[wi].bIsBusy)
{
fRun++;
}
}
} if (fAll == 0)
{
return CThreadPool::IDLE;
}
if (fRun / (1.0 * fAll) > 0.8)
{
return CThreadPool::BUSY;
}
if (fRun / (1.0 * fAll) < 0.2)
{
return CThreadPool::IDLE;
}
return CThreadPool::NORMAL;
}
1)ThreadPool类各个函数的作用和实现思想。
2)类的函数ProcessJob中调用PostQueuedCompletionStatus的意图是什么?
3)StopThreadPool函数中调用GetQueuedCompletionStatus的意图是什么?
4)ManagerProc函数中调用GetQueuedCompletionStatus的意图又是什么,个人认为管理用的线程不用调用GetQueuedCompletionStatus函数,管理用线程不必“放入”线程池中。
5)给个测试的main函数。
6)或者还有关键点是我没有发现的,也请指点。如果能全面的讲解一下基于完成端口的线程池实现思想(总体思路,实现步骤),然后对应代码给说明一下最好了。
最初看到这些代码时,以为很简单,但是细细分析后~~~各位,拜托了。
部分代码如下。
如能通过Email联系,就太谢谢了:[email protected]
bool CThreadPool::Start(WORD wStatic, WORD wMax)
{
if (!(wStatic && wMax))
{
return false;
} m_wStaticThreadNum = wStatic;
m_wMaxThreadNum = wMax; ::EnterCriticalSection(&m_csLock); if (m_pThreadInfo)
{
delete [] m_pThreadInfo;
m_pThreadInfo = NULL;
}
m_pThreadInfo = new THREADINFO[wMax](); //Array for ThreadPool
if (m_pThreadInfo == NULL)
{
return false;
} if (m_hManagerIO)
{
::CloseHandle(m_hManagerIO);
m_hManagerIO = NULL;
}
m_hManagerIO = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (m_hManagerIO == NULL)
{
return false;
} if (m_hWorkerIO)
{
::CloseHandle(m_hWorkerIO);
m_hManagerIO = NULL;
}
m_hWorkerIO = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (m_hWorkerIO == NULL)
{
return false;
} // create thread for manager.
m_bQuitManager = false;
if (m_hManagerThread)
{
::TerminateThread(m_hManagerThread, 0);
m_hManagerThread = NULL;
}
unsigned ManagerThreadID;
m_hManagerThread = (HANDLE)_beginthreadex(NULL, 0, &ManagerProc, this, 0, &ManagerThreadID); //this: the class
if (m_hManagerThread == NULL)
{
return false;
} // create wStatic threads of pool
for (WORD i = 0; i < wStatic; ++i)
{
m_pThreadInfo[i].hThreadHandle = (HANDLE)_beginthreadex(NULL, 0, &WorkerProc, this, 0, &m_pThreadInfo[i].dwThreadID);
} ::LeaveCriticalSection(&m_csLock); return true;
}void CThreadPool::ProcessJob(CWorkDesc* pJob, CWork* pWorker) const
{
::PostQueuedCompletionStatus(
m_hWorkerIO,
reinterpret_cast<DWORD>(pWorker),
reinterpret_cast<DWORD>(pJob),
NULL );
}
void CThreadPool::Stop(void)
{
::EnterCriticalSection(&m_csLock); m_bQuitManager = true;
DWORD dwRes; // end the manager thread.
for (int i = 0; i < 10; i++)
{
::GetExitCodeThread(m_hManagerThread, &dwRes);
if (dwRes == CThreadPool::MANAGERPROC_RETURN_VALUE)
{
break;
}
if (i == 9)
{
::TerminateThread(m_hManagerThread, 0);
}
else
{
::Sleep(1000);
}
} // close the CompletionPort
::CloseHandle(m_hManagerIO);
m_hManagerIO = NULL; // end the thread in the pool
for (int i = 0; i < m_wMaxThreadNum; i++)
{
if (m_pThreadInfo[i].dwThreadID == 0)
{
continue;
}
m_pThreadInfo[i].bIsQuit = true; for (int j = 0; j < 10; j++)
{
::GetExitCodeThread(m_pThreadInfo[i].hThreadHandle, &dwRes);
if (dwRes == CThreadPool::WORKERPROC_RETURN_VALUE)
{
break;
}
if (j == 9)
{
::TerminateThread(m_pThreadInfo[i].hThreadHandle, 0);
}
else
{
::Sleep(500);
}
}
} ::CloseHandle(m_hWorkerIO);
m_hWorkerIO = NULL; if (m_pThreadInfo)
{
delete [] m_pThreadInfo;
m_pThreadInfo = NULL;
} unsigned long pN1;
unsigned long pN2;
OVERLAPPED* pOverLapped; while (::GetQueuedCompletionStatus(m_hWorkerIO, &pN1, &pN2, &pOverLapped, 0)) //m_hManagerIO
{
CWork* pWork = reinterpret_cast<CWork*>(pN1);
CWorkDesc* pWorkDesc = reinterpret_cast<CWorkDesc*>(pN2);
delete pWorkDesc;
}
::LeaveCriticalSection(&m_csLock);
}
unsigned __stdcall CThreadPool::ManagerProc(void* pThread)
{
unsigned long pN1;
unsigned long pN2;
OVERLAPPED* pOverLapped; CThreadPool* pServer = reinterpret_cast<CThreadPool*>(pThread); while (!pServer->m_bQuitManager)
{
if (::GetQueuedCompletionStatus(
pServer->m_hManagerIO,
&pN1,
&pN2,
&pOverLapped,
pServer->m_dwMSeconds)
== TRUE)
{
if (pOverLapped == (OVERLAPPED*)0xFFFFFFFFF)
{
break;
}
}
else
{
EThreadStatus stat = pServer->GetWorkThreadStatus();
if (stat == CThreadPool::BUSY)
{
puts("Add thread");
pServer->AddThread();
}
else if (stat == CThreadPool::IDLE)
{
puts("Del thread");
pServer->DelThread();
}
else
{}
}
}
_endthreadex(CThreadPool::MANAGERPROC_RETURN_VALUE); return CThreadPool::MANAGERPROC_RETURN_VALUE;
}
unsigned __stdcall CThreadPool::WorkerProc(void* pThread)
{
unsigned long pN1;
unsigned long pN2;
OVERLAPPED* pOverLapped;
CThreadPool* pServer = reinterpret_cast<CThreadPool*>(pThread); DWORD threadID = ::GetCurrentThreadId();
int nSeq = pServer->GetThreadbyID(threadID); if (nSeq < 0)
{
return 0;
} while (!pServer->m_pThreadInfo[nSeq].bIsQuit)
{
if (::GetQueuedCompletionStatus(
pServer->m_hWorkerIO,
&pN1,
&pN2,
&pOverLapped,
pServer->m_dwMSeconds))
{
CWork* pWork = reinterpret_cast<CWork*>(pN1);
CWorkDesc* pWorkDesc = reinterpret_cast<CWorkDesc*>(pN2); printf("do work. \n");
pServer->m_pThreadInfo[nSeq].bIsBusy = true; pWork->ThreadJob(pWorkDesc);
delete pWorkDesc; pServer->m_pThreadInfo[nSeq].bIsBusy = false;
printf("do work over. \n");
}
}
printf("worker thread down. \n"); pServer->m_pThreadInfo[nSeq].dwThreadID = 0; _endthreadex(CThreadPool::WORKERPROC_RETURN_VALUE); return CThreadPool::WORKERPROC_RETURN_VALUE;
}
CThreadPool::EThreadStatus CThreadPool::GetWorkThreadStatus(void)
{
float fAll = 0.0;
float fRun = 0.0;
for (WORD wi = 0; wi < m_wMaxThreadNum; ++wi)
{
if (m_pThreadInfo[wi].dwThreadID)
{
fAll++;
if (m_pThreadInfo[wi].bIsBusy)
{
fRun++;
}
}
} if (fAll == 0)
{
return CThreadPool::IDLE;
}
if (fRun / (1.0 * fAll) > 0.8)
{
return CThreadPool::BUSY;
}
if (fRun / (1.0 * fAll) < 0.2)
{
return CThreadPool::IDLE;
}
return CThreadPool::NORMAL;
}
侯捷书中关于完成端口使用步骤的讲述
1) 产生一个I/O completion port
2) 让它和一个文件handle产生关联
3) 产生一堆线程
4) 让每个线程都在completion port上等待
5) 开始对着那个文件handle发出一些overlapped I/O请求。
我们实现线程池的思想(我的设想):
1)产程一个管理线程,来负责线程池的增加,减少等工作。
2)产生几个线程,负责完成具体的任务,这几个线程由完成端口来调度。这几个线程加完成端口就构成“池”的概念,对吗?
3)线程具体的工作在类中不定义,具体使用线程池时再定义?怎么定义具体工作,或者说怎么用这个线程池我还不清楚。
线程池的问题困扰快一周了,急得不行(!!-_-!!)。
拜托了,各位。
我目前也在做类似的东西,可以采用系统提供的线程池,自己来写工作线程处理,线程池与完成端口类似,有一些共同的特性,线程池只要写好回调函数以及同步对象就可以了
我的情况是:使用完成端口模型作服务器,当GetQueuedCompletionStatus的时候
偶尔会出现
ERROR_CONNECTION_ABORTED 由本地系统终止网络连接
ERROR_SEM_TIMEOUT 信号灯超时时间已到。
样的错误,同时服务器能够接受连接但是不能write/read数据,除了线程被阻塞意外还有什么可能呢?
请各位大大帮忙提点提点 ..