bool CCommunication::NEAccept(void)
{
fd_set readfds; struct timeval RevTimeOut; // 超时时间
RevTimeOut.tv_sec = 1;
RevTimeOut.tv_usec = 0; while (!m_Exit)
{
FD_ZERO(&readfds);
FD_SET(m_NEUpdateListenSocket, &readfds);
FD_SET(m_NEManaListenSocket, &readfds); int nRet = select(0, &readfds, NULL, NULL, &RevTimeOut); if (0 == nRet) // 如果超时,继续这个循环
{
continue;
}
else if(nRet < 0)
{
printf("Select error, errorcode = %d\n", WSAGetLastError());
continue;
} SOCKET newsock = 0;
ULONG newip = 0;
struct sockaddr_in addr;
int len = sizeof(addr); // 检查ManageListensocket, 接受一个新socket,同时放入连接池
if (FD_ISSET(m_NEManaListenSocket, &readfds) != 0)
{
if ((newsock = WSAAccept(m_NEManaListenSocket, (struct sockaddr *)&addr, &len, NULL, 0)) != INVALID_SOCKET)
{
// 为新接受的连接socket分配内存,若分配失败,结束
if ((PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA))) == NULL)
{
printf("GlobalAlloc failed with error %d\n", GetLastError());
return;
} newip = ntohl(addr.sin_addr.S_un.S_addr); // 是否超过 server 允许同一个IP的最大client的连接数
if (m_NELinkPool.ExceedNumberOfSameIp(newip))
{
// 替换掉此IP的最晚的连接
SOCKET lastsocket = m_NELinkPool.GetLastSocket(newip);
closesocket(lastsocket);
m_NELinkPool.DeleteSocket(lastsocket);
m_BeapTable.DeleteSocket(lastsocket);
DoNEExceedNumberEvent(lastsocket, newip);
} // 增加连接
if (m_NELinkPool.IsNotFullAndAddSocket(newsock, newip))
{
SetSocketPara(newsock);
DoCreateNEMangeConnEvent(newsock, newip); // 增加连接成功后设置socket对应的PerHandleData,并将socket与完成端口绑定以便接收数据
PerHandleData->Socket = newsock;
PerHandleData->ip = newip; CreateIoCompletionPort((HANDLE)newsock, CompletionPort, (DWORD)PerHandleData, 0); if ((PerIOData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA))) == NULL)
{
// 分配内存失败,则释放先前分配的内存,并关闭socket
closesocket(newsock);
m_NELinkPool.DeleteSocket(newsock);
m_BeapTable.DeleteSocket(newsock);
goto CHECK_UP_SOCKET;
} ZeroMemory(&(PerIOData->OVerlapped), sizeof(OVERLAPPED)); PerIOData->BytesRecv = 0;
PerIOData->BytesSend = 0;
PerIOData->DATABuf.len = DATA_BUFSIZE;
PerIOData->DATABuf.buf = PerIOData->Buffer; DWORD Flags = 0;
DWORD RecvBytes;
if (WSARecv(newsock, &(PerIOData->DATABuf), 1, &RecvBytes, &Flags, &(PerIOData->OVerlapped), NULL) == SOCKET_ERROR)
{
// socket出错时,将其关闭,并从连接队列中删除
// 这时产生完成通知,由工作线程负责释放为其分配的PerHandleData和PerIOData
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecv() failed, Error_Code = %d\n", WSAGetLastError());
closesocket(newsock);
m_NELinkPool.DeleteSocket(newsock);
m_BeapTable.DeleteSocket(newsock);
}
} }
else
{
DoNEAcceptFullEvent(newsock, newip);
closesocket(newsock);
GlobalFree(PerHandleData);
}
}
}//while
return true;
}下面是工作线程处理:
DWORD WINAPI CCommunication::ServerWorkerThread(LPVOID lpParam)
{
// 提取传递过来的参数,包括this指针和完成端口
PMY_PARA_STRU my_para;
my_para = (PMY_PARA_STRU)lpParam; CCommunication* pthis = reinterpret_cast<CCommunication*>(my_para->pointer);
HANDLE CompletionPort = my_para->CompletionPortID; unsigned long BytesTransferred = 9999;
unsigned long RecvBytes;
unsigned long Flags; time_t CheckLinkTime = 0;
time_t KeepAliveTime = 0;
time_t WriteLogTime = time(0); // 定义对应的单句柄数据指针
LPPER_HANDLE_DATA PerHandleData = NULL;
LPPER_IO_OPERATION_DATA PerIOData = NULL; int actlen = 0; // 接收长度
SOCKET CurSock = 0; // 当前的socket
int proxy = NO_PROXY; // 代理标志
SockPacket CurSockInfo; // 当前的socket所有信息
char buf[4096]; while(1)
{
time_t NowTime = time(0);
BOOL ret; ret = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED*)&PerIOData, 1000); int error_code = WSAGetLastError(); // 超时,则继续等待
if (ret == 0 && PerIOData == NULL)
{
if (error_code == WAIT_TIMEOUT)
{
continue;
}
} // 获得保存的socket相关信息,包括socket、ip、上次收到数据时间以及未处理的数据
CurSock = PerHandleData->Socket;
pthis->m_NELinkPool.GetSocketInfo(CurSock, &CurSockInfo);
// 首先检查套接字上是否发生错误,如果发生了则关闭套接字并清楚同套接字相关的SOCKET_INFORATION结构体
if (BytesTransferred == 0 && (PerIOData->OperationType == RECV_POSTED || PerIOData->OperationType == SEND_POSTED))
{
printf("Closing Socket %d, code = %d\n", PerHandleData->Socket, WSAGetLastError());
closesocket(PerHandleData->Socket);
pthis->m_NELinkPool.DeleteSocket(PerHandleData->Socket);
pthis->m_BeapTable.DeleteSocket(PerHandleData->Socket); GlobalFree(PerHandleData);
GlobalFree(PerIOData); continue;
} // 接收到数据时
if (PerIOData->OperationType == RECV_POSTED)
{
actlen = BytesTransferred;
memset(buf, 0, sizeof(buf));
strncpy(buf, PerIOData->Buffer, actlen);
pthis->m_NELinkPool.SetCurTime(CurSock, time(NULL));
//pthis->DoNERecvDataEvent(CurSock, CurSockInfo.ip, buf, actlen); // 获取完整包
proxy = NO_PROXY;
string pack = "";
BEAP_FACTORY factoryVer = UNKNOWN;
// 这里的GetFirstPacket和GetNextPacket内处理了报文粘连
for (bool b=pthis->m_NELinkPool.GetFirstPacket(CurSock, buf, actlen, pack, &proxy, &factoryVer);
b;
b=pthis->m_NELinkPool.GetNextPacket(CurSock, pack, &proxy, &factoryVer))
{
// 注意:curSockInfo中laststr少了刚刚接收的信息, 如果以下要使用laststr字段,需修改代码
pthis->DealNEEvent(pack.c_str(), static_cast<int>(pack.size()), proxy, CurSockInfo, factoryVer);
}
} Flags = 0;
ZeroMemory(&(PerIOData->OVerlapped), sizeof(OVERLAPPED)); PerIOData->DATABuf.buf = PerIOData->Buffer;
PerIOData->DATABuf.len = DATA_BUFSIZE; if (WSARecv(PerHandleData->Socket, &(PerIOData->DATABuf), 1, &RecvBytes, &Flags, &(PerIOData->OVerlapped), NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecv() failed with error %d\n", WSAGetLastError()); closesocket(PerHandleData->Socket);
pthis->m_NELinkPool.DeleteSocket(PerHandleData->Socket);
pthis->m_BeapTable.DeleteSocket(PerHandleData->Socket);
}
}
}
return 1;
}
另外还有一个线程用来判断socket是否超时,我测试过,当socket超时时我直接调用closesocket,这时GetQueuedCompletionStatus就会返回,然后释放
为这个socket分配的单句柄数据和单IO数据。我不明白的是GetQueuedCompletionStatus都在哪些情况下会接收到完成通知消息呢?
另外,请僵哥帮忙看一下我这个程序处理有没有可能发生丢包现象呢?
http://topic.csdn.net/u/20081211/10/31dba266-533b-4758-b0c6-5519d5c04000.html
“789abcdefghijklmn”呢?
发送端可以加一个延迟,如sleep(1),发送频率高的间隔时间要稍微长一些
在每次调用WSARecv()之后将收到的内容打印出来,发现粘连很严重,但是总条数的确
没有少,粘连的情况我已经在for (bool b=pthis->m_NELinkPool.GetFirstPacket(CurSock, buf, actlen, pack, &proxy, &factoryVer);
b;
b=pthis->m_NELinkPool.GetNextPacket(CurSock, pack, &proxy, &factoryVer))
{
// 注意:curSockInfo中laststr少了刚刚接收的信息, 如果以下要使用laststr字段,需修改代码
pthis->DealNEEvent(pack.c_str(), static_cast<int>(pack.size()), proxy, CurSockInfo, factoryVer);
}处理过了的。
GetQueuedCompletionStatus()函数得到完成通知呢?