// IOCPThreadPool.cpp: implementation of the CIOCPThreadPool class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "PortMap1_0.h"
#include "IOCPThreadPool.h"
#include "ADOConn.h"
#include <vector>
#include <algorithm>
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////CIOCPThreadPool::CIOCPThreadPool()
{
m_dwMSeconds=200;
m_EndServer=false;
m_pHThread=new HANDLE[MAX_THREAD_COUNT];
for (WORD wIndex=0; wIndex<MAX_THREAD_COUNT; ++wIndex)
{
m_pHThread[wIndex]=INVALID_HANDLE_VALUE;
}
m_hWorkIO=NULL;
m_pPerSockContextList=NULL;
m_wMaxThreadCount=MAX_THREAD_COUNT;
m_wThreadCount=DefaultThreadCount();
::InitializeCriticalSection(&m_ListCriticalSection);
}CIOCPThreadPool::CIOCPThreadPool(WORD v_wMaxThreadCount)
{
m_dwMSeconds=200;
m_EndServer=false;
m_pHThread=new HANDLE[v_wMaxThreadCount];
for (WORD wIndex=0; wIndex<v_wMaxThreadCount; ++wIndex)
{
m_pHThread[wIndex]=INVALID_HANDLE_VALUE;
}
m_hWorkIO=NULL;
m_pPerSockContextList=NULL;
m_wMaxThreadCount=v_wMaxThreadCount;
m_wThreadCount=DefaultThreadCount();
::InitializeCriticalSection(&m_ListCriticalSection);
}CIOCPThreadPool::~CIOCPThreadPool()
{
if (NULL!=m_pHThread)
{
delete [] m_pHThread;
m_pHThread=NULL;
} if (NULL!=m_hWorkIO)
{
CloseHandle(m_hWorkIO);
m_hWorkIO=NULL;
} m_EndServer=true; if (NULL!=m_pPerSockContextList)
{
FreeSocketContextList();
} ::DeleteCriticalSection(&m_ListCriticalSection);
}void CIOCPThreadPool::FreeSocketContextList()
{
// ::EnterCriticalSection(&m_ListCriticalSection);
PPER_SOCKET_CONTEXT pTemp1, pTemp2;
pTemp1=m_pPerSockContextList;
while (NULL!=pTemp1)
{
pTemp2=pTemp1->pSocketContextForward;
FreeAndCloseSocket(pTemp1);
pTemp1=pTemp2;
}
m_pPerSockContextList=NULL;
// ::LeaveCriticalSection(&m_ListCriticalSection);
}
// Original author's note: needs rework.
/**
 * Closes both sockets of a connection context, unlinks the context from the
 * pool's list and frees it together with its per-I/O context.
 *
 * @param v_pPerSocketContext  Context to destroy; set to NULL on return.
 *                             Passing NULL is a no-op.
 *
 * Fixes over the previous version:
 *  - the critical section is released on every path (the NULL early return
 *    used to leave it held);
 *  - when the freed node is the list head, m_pPerSockContextList is advanced
 *    instead of being left pointing at freed memory;
 *  - the node is unlinked and freed even if it has no per-I/O context.
 *
 * NOTE(review): the context is freed without waiting for outstanding
 * overlapped I/O on it (the wait loop was commented out in the original), so
 * a later completion may still refer to this memory.
 */
void CIOCPThreadPool::FreeAndCloseSocket(PPER_SOCKET_CONTEXT &v_pPerSocketContext)
{
    ::EnterCriticalSection(&m_ListCriticalSection);

    // Work on a copy: the reference may alias m_pPerSockContextList itself.
    PPER_SOCKET_CONTEXT pContext=v_pPerSocketContext;
    if (NULL!=pContext)
    {
        const bool bIsHead=(m_pPerSockContextList==pContext);
        v_pPerSocketContext=NULL;

        if (INVALID_SOCKET!=pContext->socketAccept)
        {
            closesocket(pContext->socketAccept);
            pContext->socketAccept=INVALID_SOCKET;
        }

        PPER_IO_CONTEXT pPerIoContext=pContext->pPerIoContext;
        if (NULL!=pPerIoContext)
        {
            if (INVALID_SOCKET!=pPerIoContext->SocketServer)
            {
                closesocket(pPerIoContext->SocketServer);
                pPerIoContext->SocketServer=INVALID_SOCKET;
            }
            free(pPerIoContext);
            pContext->pPerIoContext=NULL;
        }

        // Unlink from the doubly linked list (Forward = next, Back = previous).
        if (NULL!=pContext->pSocketContextBack)
        {
            pContext->pSocketContextBack->pSocketContextForward=pContext->pSocketContextForward;
        }
        if (NULL!=pContext->pSocketContextForward)
        {
            pContext->pSocketContextForward->pSocketContextBack=pContext->pSocketContextBack;
        }
        if (bIsHead)
        {
            m_pPerSockContextList=pContext->pSocketContextForward;
        }

        free(pContext);
    }

    ::LeaveCriticalSection(&m_ListCriticalSection);
}
WORD CIOCPThreadPool::DefaultThreadCount()
{
SYSTEM_INFO systemInfo;
GetSystemInfo(&systemInfo);
return systemInfo.dwNumberOfProcessors*2;
}BOOL CIOCPThreadPool::InitWinSock()
{
WSAData wsaData;
if (WSAStartup(MAKEWORD(2,2),&wsaData)==SOCKET_ERROR)
{
return false;
}
return TRUE;
}BOOL CIOCPThreadPool::Start()
{
DWORD dwThreadID;
HANDLE hThread=NULL;
HWND wnd;
InitWinSock();
LoadMap();
m_hWorkIO=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
if (NULL==m_hWorkIO)
{
Stop();
return false;
}
for (WORD wIndex=0; wIndex<m_wThreadCount; wIndex++)
{
hThread=CreateThread(NULL,0,WorkProc,this,0,&dwThreadID);
if (NULL==hThread)
{
Stop();
return false;
}
m_pHThread[wIndex]=hThread;
} hThread=CreateThread(NULL,0,ManagerProc,this,0,&dwThreadID);
if (NULL==hThread)
{
Stop();
return false;
}
CloseHandle(hThread);
return true;
}//Stop有问题,
void CIOCPThreadPool::Stop()
{
if (NULL!=m_hWorkIO)
{
for (WORD wIndex=0; wIndex<m_wThreadCount; ++wIndex)
{
PostQueuedCompletionStatus(m_hWorkIO,0,0,NULL);
}
} if (WAIT_OBJECT_0!=WaitForMultipleObjects(m_wThreadCount,m_pHThread,true,1000))
{
DWORD dwExitCode;
for (WORD wIndex=0; wIndex<m_wThreadCount; ++wIndex)
{
if (m_pHThread[wIndex])
{
TerminateThread(m_pHThread[wIndex],dwExitCode);
}
}
}
WSACleanup();
}BOOL CIOCPThreadPool::CreateAllListenSocket()
{
map<u_short, DEST_ADDR>::const_iterator it=m_Map.begin();
map<u_short, DEST_ADDR>::const_iterator end=m_Map.end();
SOCKET socket;
while (it!=end)
{
socket=CreateListenSocket(it->first);
if (INVALID_SOCKET!=socket)
{
m_ListenSocketAll.push_back(socket);
m_MapInfo.insert(make_pair(socket,it->second));
}
++it;
}
return true;
}
// NOTE: a second, byte-for-byte copy of the file's opening part (the #include
// block, the _DEBUG THIS_FILE/DEBUG_NEW prologue and every definition from the
// constructors through CIOCPThreadPool::CreateAllListenSocket) used to follow
// here. It was a paste artifact that redefined every one of those symbols, so
// it has been dropped; the definitions earlier in this file are the only ones.
解决方案 »
- CHttpFile 读取网页内容问题
- vc编译出来的release文件改了名字后不能执行了,怎么回事
- VC 画大小可变的矩形
- 如何在对话框指定位置增加菜单(动态加载,非load资源的)********
- 一个结构体使用的问题。该怎么解决?
- 在VC中用ADO操作数据库,执行recordset.open时间超长,在pl/sql里执行同一语句很快,什么原因?
- 怎么做一个程序在windows启动前运行
- 对灰度不同的地方进行定位
- 不会吧, 这么简单的问题没人回答??60分白送了!!!!!!!!!!
- 如何才能获取一个线程窗口的句柄呢?
- 一个long int64类型的数,用转换为十六进制String,如何再将字符串转换回long int64类型呢
- MFC类与WinMain()函数是如何关联在一起的?
/**
 * Creates an overlapped TCP socket listening on the given local port (all
 * interfaces) and registers it with the worker window for FD_ACCEPT.
 *
 * NOTE(review): the signature line was missing from this copy of the file. It
 * is reconstructed from the call CreateListenSocket(it->first) (a u_short map
 * key), the SOCKET return values and the body's use of v_port - confirm the
 * parameter type against IOCPThreadPool.h.
 *
 * @param v_port  Local port in host byte order.
 * @return The listening socket, or INVALID_SOCKET if any step failed (the
 *         socket is closed in that case). A WSAAsyncSelect() failure now
 *         counts as a failure too; it used to be ignored, leaving a socket
 *         that would never report incoming connections.
 */
SOCKET CIOCPThreadPool::CreateListenSocket(u_short v_port)
{
    SOCKET socketListen=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
    if (INVALID_SOCKET==socketListen)
    {
        return INVALID_SOCKET;
    }

    SOCKADDR_IN sockAddrLocal;
    ZeroMemory(&sockAddrLocal,sizeof(sockAddrLocal));
    sockAddrLocal.sin_family=AF_INET;
    sockAddrLocal.sin_port=htons(v_port);
    sockAddrLocal.sin_addr.s_addr=htonl(INADDR_ANY);

    int nZero=0;
    LINGER linger;
    linger.l_onoff=1;
    linger.l_linger=0;

    // Same steps, same order as before; && stops at the first one that fails.
    const bool bReady=
           (SOCKET_ERROR!=bind(socketListen,(PSOCKADDR)&sockAddrLocal,sizeof(sockAddrLocal)))
        && (SOCKET_ERROR!=listen(socketListen,5))
        && (SOCKET_ERROR!=setsockopt(socketListen,SOL_SOCKET,SO_SNDBUF,(char *)&nZero,sizeof(nZero)))
        && (SOCKET_ERROR!=setsockopt(socketListen,SOL_SOCKET,SO_RCVBUF,(char *)&nZero,sizeof(nZero)))
        && (SOCKET_ERROR!=setsockopt(socketListen,SOL_SOCKET,SO_LINGER,(char *)&linger,sizeof(linger)))
        && (SOCKET_ERROR!=WSAAsyncSelect(socketListen, m_hWnd, WM_SOCKET, FD_ACCEPT));
    if (!bReady)
    {
        closesocket(socketListen);
        return INVALID_SOCKET;
    }
    return socketListen;
}

/**
 * Opens an overlapped TCP socket and connects it (blocking) to v_IP:v_Port.
 *
 * @param v_IP    Dotted-decimal IPv4 address of the destination.
 * @param v_Port  Destination port in host byte order.
 * @return The connected socket, or INVALID_SOCKET on failure. The socket is
 *         now closed when the address is unusable or connect() fails; it used
 *         to be leaked.
 */
SOCKET CIOCPThreadPool::CreateServerSocket(char *v_IP, u_short v_Port)
{
    if (NULL==v_IP)
    {
        return INVALID_SOCKET;
    }

    SOCKET socketServer=WSASocket(AF_INET,SOCK_STREAM,IPPROTO_IP,NULL,0,WSA_FLAG_OVERLAPPED);
    if (INVALID_SOCKET==socketServer)
    {
        return INVALID_SOCKET;
    }

    SOCKADDR_IN sockAddr;
    ZeroMemory(&sockAddr,sizeof(sockAddr));
    sockAddr.sin_family=AF_INET;
    sockAddr.sin_addr.S_un.S_addr=inet_addr(v_IP);
    sockAddr.sin_port=htons(v_Port);

    if ((INADDR_NONE==sockAddr.sin_addr.S_un.S_addr)
        || (SOCKET_ERROR==connect(socketServer,(struct sockaddr*)&sockAddr,sizeof(sockAddr))))
    {
        closesocket(socketServer);
        return INVALID_SOCKET;
    }
    return socketServer;
}

/**
 * Allocates a socket context plus its per-I/O context and initialises both.
 *
 * @param v_socketAccept  Socket stored as the context's own socket.
 * @param v_socketServer  Socket stored in the I/O context as the peer.
 * @param v_IoOperation   Initial operation recorded in the I/O context.
 * @return The new, unlinked context, or NULL if either allocation failed.
 *
 * The critical section is now released on every path; both failure returns
 * used to leave it held.
 */
PPER_SOCKET_CONTEXT CIOCPThreadPool::SocketConTextAllocate(SOCKET v_socketAccept, SOCKET v_socketServer, IO_OPERATION v_IoOperation)
{
    EnterCriticalSection(&m_ListCriticalSection);

    PPER_SOCKET_CONTEXT pPerSocketContext=(PPER_SOCKET_CONTEXT)malloc(sizeof(PER_SOCKET_CONTEXT));
    PPER_IO_CONTEXT pIo=NULL;
    if (NULL!=pPerSocketContext)
    {
        pIo=(PPER_IO_CONTEXT)malloc(sizeof(PER_IO_CONTEXT));
        if (NULL==pIo)
        {
            free(pPerSocketContext);
            pPerSocketContext=NULL;
        }
    }

    if (NULL!=pPerSocketContext)
    {
        pPerSocketContext->pPerIoContext=pIo;
        pPerSocketContext->socketAccept=v_socketAccept;
        pPerSocketContext->pSocketContextForward=NULL;
        pPerSocketContext->pSocketContextBack=NULL;

        pIo->IoOperation=v_IoOperation;
        pIo->SocketServer=v_socketServer;
        pIo->nSendBytes=0;
        pIo->nTotalBytes=0;
        pIo->OverLapped.hEvent=NULL;
        pIo->OverLapped.Internal=0;
        pIo->OverLapped.InternalHigh=0;
        pIo->OverLapped.Offset=0;
        pIo->OverLapped.OffsetHigh=0;
        pIo->wsabuf.buf=pIo->Buffer;
        pIo->wsabuf.len=sizeof(pIo->Buffer);
        ZeroMemory(pIo->Buffer,sizeof(pIo->Buffer));
    }

    LeaveCriticalSection(&m_ListCriticalSection);
    return pPerSocketContext;
}

/**
 * Pushes a context onto the front of the pool's context list.
 *
 * @param v_pPerSocketContext  Context to link in; NULL is ignored.
 *
 * The critical section is now released when the list was empty as well; that
 * path used to return with the lock held, which would block every other
 * thread that later needed the list.
 */
void CIOCPThreadPool::AddToContextList(PPER_SOCKET_CONTEXT v_pPerSocketContext)
{
    if (NULL==v_pPerSocketContext)
    {
        return;
    }

    EnterCriticalSection(&m_ListCriticalSection);
    PPER_SOCKET_CONTEXT pOldHead=m_pPerSockContextList;
    if (NULL!=pOldHead)
    {
        pOldHead->pSocketContextBack=v_pPerSocketContext;
        v_pPerSocketContext->pSocketContextForward=pOldHead;
    }
    m_pPerSockContextList=v_pPerSocketContext;
    LeaveCriticalSection(&m_ListCriticalSection);
}

/**
 * Allocates a context for v_SocketAccept, associates the socket with the
 * pool's completion port using the context pointer as completion key, and
 * optionally links the context into the list.
 *
 * @param v_SocketAccept  Socket to associate with the completion port.
 * @param v_SocketServer  Peer socket recorded in the context.
 * @param v_IoOperation   Initial operation recorded in the context.
 * @param v_bAddToList    When true the context is added to the context list.
 * @return The context, or NULL on failure. When the association fails the
 *         context is destroyed with FreeAndCloseSocket(), which also closes
 *         both sockets; when the allocation fails the sockets are untouched.
 *
 * The key is passed as ULONG_PTR so the pointer is not truncated on 64-bit
 * builds. A NULL m_hWorkIO is treated as a failure because
 * CreateIoCompletionPort() would otherwise create a brand-new port.
 */
PPER_SOCKET_CONTEXT CIOCPThreadPool::UpdateCompletionPort(SOCKET v_SocketAccept, SOCKET v_SocketServer, IO_OPERATION v_IoOperation, BOOL v_bAddToList)
{
    PPER_SOCKET_CONTEXT pPerSocketContext=SocketConTextAllocate(v_SocketAccept, v_SocketServer,v_IoOperation);
    if (NULL==pPerSocketContext)
    {
        return NULL;
    }

    if ((NULL==m_hWorkIO)
        || (NULL==CreateIoCompletionPort((HANDLE)v_SocketAccept,m_hWorkIO,(ULONG_PTR)pPerSocketContext,0)))
    {
        FreeAndCloseSocket(pPerSocketContext);
        return NULL;
    }

    if (v_bAddToList)
    {
        AddToContextList(pPerSocketContext);
    }
    return pPerSocketContext;
}

/**
 * Thread entry point of the manager thread started by
 * CIOCPThreadPool::Start(): creates all listening sockets and exits.
 *
 * @param v_lpvoid  The CIOCPThreadPool instance.
 * @return 0 on success; (DWORD)-1 after stopping the pool if the listening
 *         sockets could not be created.
 */
DWORD _stdcall ManagerProc(LPVOID v_lpvoid)
{
    CIOCPThreadPool *pThis=(CIOCPThreadPool *)v_lpvoid;
    if (!pThis->CreateAllListenSocket())
    {
        pThis->Stop();
        return (DWORD)-1;
    }
    return 0;
}
{
CIOCPThreadPool *pThis=(CIOCPThreadPool*)v_lpvoid;
HANDLE hIOCP=pThis->m_hWorkIO;
BOOL bSuccess;
PPER_SOCKET_CONTEXT pPerSocketContext=NULL;
PPER_IO_CONTEXT pPerIoContext=NULL;
LPOVERLAPPED lpOverlapped;
DWORD dwIOBytes=0;
DWORD dwFlag=0;
DWORD dwNumOfBytesSend=0;
DWORD dwRecvNumBytes;
int nRet=0;
HANDLE hRet;
while (TRUE)
{
bSuccess=GetQueuedCompletionStatus(hIOCP,&dwIOBytes,(DWORD*)&pPerSocketContext,&lpOverlapped,INFINITE);
if (NULL==pPerSocketContext)
{
return 0;
} if ((!bSuccess) || (bSuccess && 0==dwIOBytes))
{
pThis->FreeAndCloseSocket(pPerSocketContext);
continue;
}
pPerIoContext=(PPER_IO_CONTEXT)lpOverlapped;
switch(pPerIoContext->IoOperation)
{
case IoRead:
{
pPerIoContext->IoOperation=IoWrite;
pPerIoContext->nTotalBytes=dwIOBytes;
pPerIoContext->nSendBytes=0;
pPerIoContext->wsabuf.len=dwIOBytes;
dwFlag=0; nRet=WSASend(pPerSocketContext->pPerIoContext->SocketServer,&(pPerSocketContext->pPerIoContext->wsabuf),1 \
,&dwNumOfBytesSend,dwFlag,&(pPerSocketContext->pPerIoContext->OverLapped),NULL);
if ((SOCKET_ERROR==nRet) && (ERROR_IO_PENDING!=WSAGetLastError()))
{
pThis->FreeAndCloseSocket(pPerSocketContext);
break; }
break;
}
case IoWrite:
{
dwFlag=0;
pPerIoContext->nSendBytes+=dwIOBytes;
pPerIoContext->IoOperation=IoWrite; if (pPerIoContext->nSendBytes<pPerIoContext->nTotalBytes)
{
pPerIoContext->wsabuf.buf=pPerIoContext->wsabuf.buf+dwIOBytes;
pPerIoContext->wsabuf.len=pPerIoContext->wsabuf.len-dwIOBytes;
nRet=WSASend(pPerIoContext->SocketServer,&(pPerIoContext->wsabuf),1 \
,&dwNumOfBytesSend,dwFlag,&(pPerIoContext->OverLapped),NULL);
if ((SOCKET_ERROR==nRet) && (ERROR_IO_PENDING!=WSAGetLastError()))
{
pThis->FreeAndCloseSocket(pPerSocketContext);
}
}
else
{
dwFlag=0;
dwRecvNumBytes=0; pPerSocketContext->pPerIoContext->IoOperation=IoRead;
pPerSocketContext->pPerIoContext->nSendBytes=0;
pPerSocketContext->pPerIoContext->nTotalBytes=0;
pPerSocketContext->pPerIoContext->OverLapped.hEvent=NULL;
pPerSocketContext->pPerIoContext->OverLapped.Internal=0;
pPerSocketContext->pPerIoContext->OverLapped.InternalHigh=0;
pPerSocketContext->pPerIoContext->OverLapped.Offset=0;
pPerSocketContext->pPerIoContext->OverLapped.OffsetHigh=0;
pPerSocketContext->pPerIoContext->wsabuf.len=sizeof(pPerSocketContext->pPerIoContext->Buffer);
ZeroMemory(pPerSocketContext->pPerIoContext->Buffer,sizeof(pPerSocketContext->pPerIoContext->Buffer)); nRet=WSARecv(pPerSocketContext->socketAccept,&(pPerSocketContext->pPerIoContext->wsabuf),1,&dwRecvNumBytes, \
&dwFlag,&(pPerSocketContext->pPerIoContext->OverLapped),NULL);
if ((SOCKET_ERROR==nRet) && (ERROR_IO_PENDING!=WSAGetLastError()))
{
pThis->FreeAndCloseSocket(pPerSocketContext);
break;
}
}
break; }
default:
break; }
}
return 0;
}void CIOCPThreadPool::LoadMap()
{
try
{
ADOConn adoconn;
CString strSql;
strSql="select LocalPort,DestIP,DestPort from map";
adoconn.GetRecordset(strSql);
while(!adoconn.m_pRecordset->adoEOF)
{
_variant_t LocalPort,DestIP,DestPort;
LocalPort=adoconn.m_pRecordset->GetCollect("LocalPort");
DestIP=adoconn.m_pRecordset->GetCollect("DestIP");
DestPort=adoconn.m_pRecordset->GetCollect("DestPort");
if ((VT_NULL!=LocalPort.vt) && (VT_NULL!=DestIP.vt) && (VT_NULL!=DestPort.vt))
{
_DEST_ADDR destAddr;
strcpy(destAddr.DestIP,(char*)(_bstr_t)DestIP);
destAddr.DestPort=DestPort.intVal;
m_Map.insert(make_pair(LocalPort.intVal,destAddr));
}
adoconn.m_pRecordset->MoveNext();
}
}catch(_com_error &e)
{
AfxMessageBox(e.Description());
}
}PPER_SOCKET_CONTEXT CIOCPThreadPool::GetSocketInfo(SOCKET v_Socket)
{
PPER_SOCKET_CONTEXT pPerSocketContext=m_pPerSockContextList;
while (pPerSocketContext)
{
if (v_Socket==pPerSocketContext->socketAccept)
{
return pPerSocketContext;
}
pPerSocketContext=pPerSocketContext->pSocketContextForward;
}
return NULL;
}
{
SOCKET socketAccept;
SOCKET socketServer;
PPER_SOCKET_CONTEXT pPerSocketContextClient;
PPER_SOCKET_CONTEXT pPerSocketContextServer;
int nRet=0;
int nRet2=0;
DWORD dwFlag=0;
DWORD dwFlag2=0;
DWORD dwRecvNumBytes=0;
DWORD dwRecvNumBytes2=0; if (uMsg == WM_SOCKET)
{
if (WSAGETSELECTERROR(lParam))
{
return 0;
}
else
{
switch(WSAGETSELECTEVENT(lParam))
{
case FD_ACCEPT:
{
if ((socketAccept=WSAAccept(wParam,NULL,NULL,NULL,0)) == INVALID_SOCKET)
{
break;
}
socketServer=g_ThreadPool.CreateServerSocket( \
g_ThreadPool.m_MapInfo[wParam].DestIP, g_ThreadPool.m_MapInfo[wParam].DestPort);
if (INVALID_SOCKET==socketServer)
{ closesocket(socketAccept);
socketAccept=INVALID_SOCKET;
break;
} pPerSocketContextClient=g_ThreadPool.UpdateCompletionPort(socketAccept,socketServer,IoRead,true);
if (NULL==pPerSocketContextClient)
{
break;
} pPerSocketContextServer=g_ThreadPool.UpdateCompletionPort(socketServer,socketAccept,IoRead,true);
if (NULL==pPerSocketContextServer)
{
break;
}
nRet=WSARecv(socketAccept,&(pPerSocketContextClient->pPerIoContext->wsabuf),1,&dwRecvNumBytes, \
&dwFlag,&(pPerSocketContextClient->pPerIoContext->OverLapped),NULL);
if ((SOCKET_ERROR==nRet) && (ERROR_IO_PENDING!=WSAGetLastError()))
{
g_ThreadPool.FreeAndCloseSocket(pPerSocketContextClient);
break;
} nRet2=WSARecv(socketServer,&(pPerSocketContextServer->pPerIoContext->wsabuf),1,&dwRecvNumBytes2, \
&dwFlag2,&(pPerSocketContextServer->pPerIoContext->OverLapped),NULL);
if ((SOCKET_ERROR==nRet2) && (ERROR_IO_PENDING!=WSAGetLastError()))
{
g_ThreadPool.FreeAndCloseSocket(pPerSocketContextServer);
break;
}
break;
}
// case FD_READ:
// {
// PPER_SOCKET_CONTEXT pPerSocketContext;
// pPerSocketContext=g_ThreadPool.GetSocketInfo(wParam);
// if (NULL==pPerSocketContext)
// {
// break;
// }
// if (0!=pPerSocketContext->pPerIoContext->nTotalBytes)
// {
// break;
// }
// dwFlag=0;
// nRet=WSARecv(socketAccept,&(pPerSocketContext->pPerIoContext->wsabuf),1,&dwRecvNumBytes, \
// &dwFlag,&(pPerSocketContext->pPerIoContext->OverLapped),NULL);
// if ((SOCKET_ERROR==nRet) && (ERROR_IO_PENDING!=WSAGetLastError()))
// {
// g_ThreadPool.FreeAndCloseSocket(pPerSocketContext);
// break;
// }
// break;
// }
// case FD_CLOSE:
// {
// PPER_SOCKET_CONTEXT pPerSocketContext;
// SOCKET socket;
// pPerSocketContext=g_ThreadPool.GetSocketInfo(wParam);
// if (NULL==pPerSocketContext)
// {
// break;
// }
// socket=pPerSocketContext->pPerIoContext->SocketServer;
// g_ThreadPool.FreeAndCloseSocket(pPerSocketContext);
// pPerSocketContext=g_ThreadPool.GetSocketInfo(socket);
// if (NULL==pPerSocketContext)
// {
// break;
// }
// g_ThreadPool.FreeAndCloseSocket(pPerSocketContext);
// break;
// }
default:
break;
}
}
return 0;
}
return DefWindowProc(hwnd, uMsg, wParam, lParam);
}HWND MakeWorkerWindow()
{
WNDCLASS wndclass;
CHAR *ProviderClass = "AsyncSelect";
HWND Window;
wndclass.style = CS_HREDRAW | CS_VREDRAW;
wndclass.lpfnWndProc = (WNDPROC)WindowProc;
wndclass.cbClsExtra = 0;
wndclass.cbWndExtra = 0;
wndclass.hInstance = NULL;
wndclass.hIcon = LoadIcon(NULL, IDI_APPLICATION);
wndclass.hCursor = LoadCursor(NULL, IDC_ARROW);
wndclass.hbrBackground = (HBRUSH) GetStockObject(WHITE_BRUSH);
wndclass.lpszMenuName = NULL;
wndclass.lpszClassName = ProviderClass;
if (RegisterClass(&wndclass) == 0)
{
printf("RegisterClass() failed with error %d\n", GetLastError());
return NULL;
}
// Create a window.
if ((Window = CreateWindow(
ProviderClass,
"",
WS_OVERLAPPEDWINDOW,
CW_USEDEFAULT,
CW_USEDEFAULT,
CW_USEDEFAULT,
CW_USEDEFAULT,
NULL,
NULL,
NULL,
NULL)) == NULL)
{
return NULL;
}
return Window;
}唉,太乱了,欢迎加我QQ要源码
源码我传到csdn了,欢迎下载,不吝赐教,谢谢!
谢谢,希望你能提出更多的问题