bool CCommunication::NEAccept(void)
{
fd_set  readfds; struct timeval RevTimeOut;           // 超时时间
RevTimeOut.tv_sec = 1;             
RevTimeOut.tv_usec = 0; while (!m_Exit)
{
FD_ZERO(&readfds);            
FD_SET(m_NEUpdateListenSocket, &readfds);
FD_SET(m_NEManaListenSocket, &readfds); int nRet = select(0, &readfds, NULL, NULL, &RevTimeOut); if (0 == nRet) // 如果超时,继续这个循环
{
continue;
}
else if(nRet < 0)
{
printf("Select error, errorcode = %d\n", WSAGetLastError());
continue;
} SOCKET newsock = 0;
ULONG newip = 0;
struct sockaddr_in addr;
int len = sizeof(addr); // 检查ManageListensocket, 接受一个新socket,同时放入连接池
if (FD_ISSET(m_NEManaListenSocket, &readfds) != 0)
{
if ((newsock = WSAAccept(m_NEManaListenSocket, (struct sockaddr *)&addr, &len, NULL, 0)) != INVALID_SOCKET)
{
// 为新接受的连接socket分配内存,若分配失败,结束
if ((PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA))) == NULL)
{
printf("GlobalAlloc failed with error %d\n", GetLastError());
return;
} newip = ntohl(addr.sin_addr.S_un.S_addr); // 是否超过 server 允许同一个IP的最大client的连接数
if (m_NELinkPool.ExceedNumberOfSameIp(newip))
{
// 替换掉此IP的最晚的连接
SOCKET lastsocket = m_NELinkPool.GetLastSocket(newip);
closesocket(lastsocket);
m_NELinkPool.DeleteSocket(lastsocket);
m_BeapTable.DeleteSocket(lastsocket);
DoNEExceedNumberEvent(lastsocket, newip);
} // 增加连接
if (m_NELinkPool.IsNotFullAndAddSocket(newsock, newip))
{
SetSocketPara(newsock);
DoCreateNEMangeConnEvent(newsock, newip); // 增加连接成功后设置socket对应的PerHandleData,并将socket与完成端口绑定以便接收数据
PerHandleData->Socket = newsock;
PerHandleData->ip = newip; CreateIoCompletionPort((HANDLE)newsock, CompletionPort, (DWORD)PerHandleData, 0); if ((PerIOData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA))) == NULL)
{
// 分配内存失败,则释放先前分配的内存,并关闭socket
closesocket(newsock);
m_NELinkPool.DeleteSocket(newsock);
m_BeapTable.DeleteSocket(newsock);
goto CHECK_UP_SOCKET;
} ZeroMemory(&(PerIOData->OVerlapped), sizeof(OVERLAPPED)); PerIOData->BytesRecv = 0;
PerIOData->BytesSend = 0;
PerIOData->DATABuf.len = DATA_BUFSIZE;
PerIOData->DATABuf.buf = PerIOData->Buffer; DWORD Flags = 0;
DWORD RecvBytes;
if (WSARecv(newsock, &(PerIOData->DATABuf), 1, &RecvBytes, &Flags, &(PerIOData->OVerlapped), NULL) == SOCKET_ERROR)
{
// socket出错时,将其关闭,并从连接队列中删除
// 这时产生完成通知,由工作线程负责释放为其分配的PerHandleData和PerIOData
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecv() failed, Error_Code = %d\n", WSAGetLastError());
closesocket(newsock);
m_NELinkPool.DeleteSocket(newsock);
m_BeapTable.DeleteSocket(newsock);
}
} }
else
{
DoNEAcceptFullEvent(newsock, newip);
closesocket(newsock);
GlobalFree(PerHandleData);
}
}
         }//while
return true;
}下面是工作线程处理:
DWORD WINAPI CCommunication::ServerWorkerThread(LPVOID lpParam)
{
// 提取传递过来的参数,包括this指针和完成端口
PMY_PARA_STRU my_para;
my_para = (PMY_PARA_STRU)lpParam; CCommunication* pthis = reinterpret_cast<CCommunication*>(my_para->pointer);
HANDLE CompletionPort = my_para->CompletionPortID; unsigned long  BytesTransferred = 9999;
unsigned long  RecvBytes;
unsigned long  Flags; time_t CheckLinkTime = 0;
time_t KeepAliveTime = 0;
time_t WriteLogTime = time(0); // 定义对应的单句柄数据指针
LPPER_HANDLE_DATA PerHandleData = NULL;
LPPER_IO_OPERATION_DATA PerIOData = NULL; int actlen = 0;                      // 接收长度
SOCKET CurSock = 0;                  // 当前的socket
int proxy = NO_PROXY;                // 代理标志
SockPacket CurSockInfo;              // 当前的socket所有信息
char buf[4096]; while(1)
{
time_t NowTime = time(0);
BOOL ret; ret = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED*)&PerIOData, 1000); int error_code = WSAGetLastError(); // 超时,则继续等待
if (ret == 0 && PerIOData == NULL)
{
if (error_code == WAIT_TIMEOUT)
{
continue;
}
} // 获得保存的socket相关信息,包括socket、ip、上次收到数据时间以及未处理的数据
CurSock = PerHandleData->Socket;
pthis->m_NELinkPool.GetSocketInfo(CurSock, &CurSockInfo);

// 首先检查套接字上是否发生错误,如果发生了则关闭套接字并清楚同套接字相关的SOCKET_INFORATION结构体
if (BytesTransferred == 0 && (PerIOData->OperationType == RECV_POSTED || PerIOData->OperationType == SEND_POSTED))
{
printf("Closing Socket %d, code = %d\n", PerHandleData->Socket, WSAGetLastError());
closesocket(PerHandleData->Socket);
pthis->m_NELinkPool.DeleteSocket(PerHandleData->Socket);
pthis->m_BeapTable.DeleteSocket(PerHandleData->Socket); GlobalFree(PerHandleData);
GlobalFree(PerIOData); continue;
} // 接收到数据时
if (PerIOData->OperationType == RECV_POSTED)
{
actlen = BytesTransferred;
memset(buf, 0, sizeof(buf));
strncpy(buf, PerIOData->Buffer, actlen);
pthis->m_NELinkPool.SetCurTime(CurSock, time(NULL));
//pthis->DoNERecvDataEvent(CurSock, CurSockInfo.ip, buf, actlen); // 获取完整包
proxy = NO_PROXY;
string pack = "";
BEAP_FACTORY factoryVer = UNKNOWN;
// 这里的GetFirstPacket和GetNextPacket内处理了报文粘连
for (bool b=pthis->m_NELinkPool.GetFirstPacket(CurSock, buf, actlen, pack, &proxy, &factoryVer);
b;
b=pthis->m_NELinkPool.GetNextPacket(CurSock, pack, &proxy, &factoryVer))
{
// 注意:curSockInfo中laststr少了刚刚接收的信息, 如果以下要使用laststr字段,需修改代码
pthis->DealNEEvent(pack.c_str(), static_cast<int>(pack.size()), proxy, CurSockInfo, factoryVer);

}
} Flags = 0;
ZeroMemory(&(PerIOData->OVerlapped), sizeof(OVERLAPPED)); PerIOData->DATABuf.buf = PerIOData->Buffer;
PerIOData->DATABuf.len = DATA_BUFSIZE; if (WSARecv(PerHandleData->Socket, &(PerIOData->DATABuf), 1, &RecvBytes, &Flags, &(PerIOData->OVerlapped), NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecv() failed with error %d\n", WSAGetLastError()); closesocket(PerHandleData->Socket);
pthis->m_NELinkPool.DeleteSocket(PerHandleData->Socket);
pthis->m_BeapTable.DeleteSocket(PerHandleData->Socket);
}
}
}
return 1;
}
另外还有一个线程用来判断socket是否超时,我测试过,当socket超时时我直接调用closesocket,这时GetQueuedCompletionStatus就会返回,然后释放
为这个socket分配的单句柄数据和单IO数据。我不明白的是GetQueuedCompletionStatus都在哪些情况下会接收到完成通知消息呢?
另外,请僵哥帮忙看一下我这个程序处理有没有可能发生丢包现象呢?

解决方案 »

  1.   

    当你对与相应完成端口绑定的句柄(包括文件和SOCKET描述符等)调用了AcceptEx,ConnectEx,WSASend,WSARecv,WSASendTo,WSARecvFrom,ReadFile,WriteFile等支持Overlap的API,并带上合法的Overlapped结构,调用成功(result = true or GetLastError = ERROR_IO_PENDING)之后,当这些操作终止之时(可能是成功返回,也可以是由于其它异常导致返回)就会产生完成通知。
      

  2.   

    如果调用如WSASend返回的是false,并且WSAGetLastError(或GetLastError)返回的不是ERROR_IO_PENDING,那么此时应用层就已经得到了一个调用失败的结果,从而就不会被放置到请求队列当中,也就不会有完成通知。
      

  3.   

    哦,原来是这样,明白了!另外,能不能麻烦僵哥帮我大体检查一下上面的程序哪里有错啊,还有,不知道这个问题你有没有碰到过,我已经用了2天时间了,也没找出原因在哪
    http://topic.csdn.net/u/20081211/10/31dba266-533b-4758-b0c6-5519d5c04000.html
      

  4.   

    之所以为让僵哥帮我检查上面的程序是因为看到有些人说“WSARecv并不保证一次读完你要读的数据”这是不是意味着,假设我客户端发送一个“1234567898765432123456789”这样一个消息的话WSARecv一次调用后只能接收到“1234567898765432123456”呢?如果这样的话那如果客户端又发送了一个“abcdefghijklmn”消息的话,在调用WSARecv就会收到
    “789abcdefghijklmn”呢?
      

  5.   

    有都可能。TCP流协议,需要自己处理消息边界。
      

  6.   


    发送端可以加一个延迟,如sleep(1),发送频率高的间隔时间要稍微长一些
      

  7.   

    在Windows下Sleep的精确度在15~50毫秒之间,对于一个1G带宽的网络,如此的延迟对单个连接上的数据流转会带来什么样的影响?
      

  8.   

    呵呵 延迟其实是不用的,我做了一个实验,让客户端不停地发送1000条数据,
    在每次调用WSARecv()之后将收到的内容打印出来,发现粘连很严重,但是总条数的确
    没有少,粘连的情况我已经在for (bool b=pthis->m_NELinkPool.GetFirstPacket(CurSock, buf, actlen, pack, &proxy, &factoryVer);
                    b;
                    b=pthis->m_NELinkPool.GetNextPacket(CurSock, pack, &proxy, &factoryVer))
                {
                    // 注意:curSockInfo中laststr少了刚刚接收的信息, 如果以下要使用laststr字段,需修改代码
                    pthis->DealNEEvent(pack.c_str(), static_cast<int>(pack.size()), proxy, CurSockInfo, factoryVer);
                        
                }处理过了的。
      

  9.   

    对了,僵哥,如果我对一个socket做了closesocket操作,是不是一定也会使得
    GetQueuedCompletionStatus()函数得到完成通知呢?
      

  10.   

    未处理的请求?那对于socket都有哪些操作会使得有请求放到队列里面呢?