在从队列中取数据时,经常会堵着不动,线程开的越多,堵的概率越大。
各位大虾能给点意见吗???

解决方案 »

  1.   

    “线程开的越多,堵的概率越大”——既然用了IOCP,为什么还要开很多线程?一般开到CPU数目的2-3倍就可以了。
    至于“用信号量了吗?”:用了IOCP,应该就不必再使用信号量了吧,每一个工作者线程都是一个可等待的内核对象。
    我倒建议gujianfei1979 (谷谷) 仔细检查一下内部代码,看是不是有“死锁”导致线程被“堵”住。这很可能就是原因,因为“堵的概率与线程数目成正比”。
      

  2.   

    /*
     * ServerWorkerThread - worker loop that drains one I/O completion port.
     *
     * CompletionPortID is cast to the HANDLE handed to GetQueuedCompletionStatus.
     * Every dequeued completion carries a WSAOVERLAPPEDEX context (lpOvlpEx) and is
     * dispatched on lpOvlpEx->sockflag. The states visible in this listing are
     * INIT, SEND_AUTH, RECV_ADDR, SEND_KEY and WORKING; each handler posts the next
     * overlapped WSASend/WSARecv and then `continue`s back to the dequeue call.
     * The loop is left (and 0 returned) when the completion key is 0 or the
     * overlapped pointer is NULL.
     *
     * NOTE(review): this listing went through a forum paste that merged lines.
     * Several statements now sit inside // comments and are therefore dead as
     * written; those places are flagged individually below.
     */
    DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID)
    {
    HANDLE hCompletionPort = (HANDLE) CompletionPortID;
    DWORD  dwBytesTransferred = 0;
    SOCKET hSocket; 
    LPWSAOVERLAPPEDEX lpOvlpEx;
    int nErr;
    char host[16];
    int port;int threadseq =0;
    char writein[ 1024 ]; /* Md5Key added */
    int sign = 0;
    unsigned char signtemp[2];
    // Work buffers for the key exchange done in the SEND_KEY state.
    // Only signtemp, RanNumModPOther and TheLastKeyWeNeed are zeroed here.
    unsigned char RanNumModP[MAX_RANDOM_NUM_LEN / 8];
    unsigned char TheRanNumWeNeedSave[MAX_RANDOM_NUM_LEN / 8];
    unsigned char TheBigPrime[MAX_RANDOM_NUM_LEN / 8];
    unsigned char TheLastKeyWeNeed[MAX_RANDOM_NUM_LEN / 8];
    unsigned char RanNumModPOther[MAX_RANDOM_NUM_LEN / 8]; memset((void *)signtemp ,0, 2);
    memset((void *)RanNumModPOther, 0, MAX_RANDOM_NUM_LEN / 8);
    memset((void *)TheLastKeyWeNeed, 0, MAX_RANDOM_NUM_LEN / 8);
    /* Md5Key added */

    while (1)
    {
    // Blocks (INFINITE) until a completion is available. The completion key is
    // written into hSocket and the overlapped pointer into lpOvlpEx.
    BOOL r = GetQueuedCompletionStatus(hCompletionPort,&dwBytesTransferred,(LPDWORD)&hSocket,(LPWSAOVERLAPPED *) &lpOvlpEx,INFINITE );
    printf( "GetLastError =%d\n", GetLastError() );
    // printf( "lpOvlpEx->hSocket =%d\n", lpOvlpEx->hSocket );
    // printf( "hSocket =%d\n", hSocket );
    // printf( "lpOvlpEx->hSocketPair =%d\n", lpOvlpEx->hSocketPair );
    //  printf( "lpOvlpEx->sockflag =%d\n", lpOvlpEx->sockflag );
    // printf( "lpOvlpEx->hConnect =%d\n", lpOvlpEx->hConnect );
    // NOTE(review): lpOvlpEx is dereferenced here, before the NULL check further down.
    sprintf(lpOvlpEx->filename, "e:/2222222.txt");
    // 997 is the numeric value of ERROR_IO_PENDING.
    // NOTE(review): the value tested here is read after the printf/sprintf calls above,
    // so it may no longer be the one left by GetQueuedCompletionStatus - confirm.
    if(GetLastError() == 997)
    {
    memset( writein, 0, 1024 );
    // NOTE(review): threadid is not declared in this function; presumably a global - confirm.
    sprintf( writein, "!!!!!!!!!从队列中取出的__%d__%d__squid的编号%d__hsock的编号%d__lphsock的编号%d__lphsockPair的编号%d__状态%d\n", 
    threadid, threadseq, lpOvlpEx->hConnect, hSocket, lpOvlpEx->hSocket, lpOvlpEx->hSocketPair, lpOvlpEx->sockflag );
    WriteInFile1( writein, lpOvlpEx->filename );
    }
    if (!r)
    {
    // The error code is captured but never read anywhere in this listing.
    nErr = WSAGetLastError();
    //printf("GetQueuedCompletionStatus = %d\n",nErr);
    }

    // A zero key or a NULL overlapped pointer ends the worker loop.
    if (hSocket == 0 || lpOvlpEx == NULL)
    {
    // printf( "break!!!!!!!!!!!!! \n" );
    // SafeClose(lpOvlpEx);
    break;
    } //printf("%d bytes transed...\n",dwBytesTransferred);
    // Zero-byte completion: the context is closed only when the key is the
    // context's own socket; otherwise the completion is simply dropped.
    if (dwBytesTransferred == 0)
    {
    if (hSocket == lpOvlpEx->hSocket)
    {
    SafeClose(lpOvlpEx);
    printf(" .....................%d..........closed & freed........\n",hSocket);

    }
    continue;
    } if (lpOvlpEx->sockflag == IODATA_SOCK_FLAG_INIT)
    {
    //printf( "lpOvlpEx->sockflag == IODATA_SOCK_FLAG_INIT\n" );
    int method;
    sock5ans1 *m_proxyans5;
    sock4ans1 *m_proxyans4;
    sock5req1 *m_proxyreq5;
    sock4req1 *m_proxyreq4;
    bool jianyan;
    SOCKET sClient = lpOvlpEx->hSocket;
    char buffer5[1024];           
    int bufferlen5=1023;
    char buffer4[1024];           
    int bufferlen4=1023;
    // The received bytes in Buffer are viewed both as a v5 and as a v4 request;
    // the Vn test below picks which view is used.
    jianyan = false; m_proxyreq5 = (sock5req1 *) lpOvlpEx->Buffer;
    m_proxyreq4 = (sock4req1 *) lpOvlpEx->Buffer;
    method = m_proxyreq5->nMethods; if ( m_proxyreq4->Vn == SOCKS_VN )
    {
    if ( m_proxyreq4->Cd == SOCKS_CONNECT )
    {
    lpOvlpEx->sockflag = IODATA_SOCK_FLAG_RECV_ADDR; ////////
    // Target port is assembled high byte first from other[0] and other[1].
    lpOvlpEx->targetport = (0xff & m_proxyreq4->other[0]) * 0x100
    + (0xff & m_proxyreq4->other[1]);
    //lpOvlpEx->targetaddr = ; // construct reply packet
    m_proxyans4 = ( sock4ans1 *)buffer4;
    m_proxyans4->Vn = SOCKS_4_OUT;
    m_proxyans4->Cd = SOCKS_OUT;
    // Clears the first sizeof(WSAOVERLAPPED) bytes of the context; presumably the
    // WSAOVERLAPPED is the first member of WSAOVERLAPPEDEX - confirm.
    ZeroMemory( lpOvlpEx, sizeof(WSAOVERLAPPED) );
    // NOTE(review): m_proxyans4 is a pointer, so sizeof(m_proxyans4) is the pointer
    // size, not the size of the sock4ans1 reply.
    memcpy(lpOvlpEx->Buffer, m_proxyans4, sizeof(m_proxyans4));
    lpOvlpEx->WSABuf.len = sizeof(m_proxyans4);
    lpOvlpEx->WSABuf.buf = lpOvlpEx->Buffer; }else if ( m_proxyreq4->Cd == SOCKS_BIND )
    {
    // NOTE(review): no reply is built on this path (nor when Cd matches neither
    // constant), yet control still reaches the WSASend below.
    printf( "Socks v4 Bind option" );
    }
    }
    else
    {
    // Scans the offered methods for METHOD_AUTH_NO.
    // NOTE(review): the index runs from nMethods down to 1, so Methods[0] is never
    // examined and Methods[nMethods] is read; looks off by one if Methods is
    // 0-based - confirm against the sock5req1 layout.
    for ( method; method>0; method-- )
    {
    if ( m_proxyreq5->Methods[ method ] == METHOD_AUTH_NO )// only the no-authentication method is required for now
    {
    jianyan = true;
    }
    }
    if ( m_proxyreq5->Ver == SOCKS_VER && jianyan )
    {
    // authentication accepted, build the reply
    m_proxyans5 = (sock5ans1 *)buffer5;
    m_proxyans5->Ver = SOCKS_VER;
    m_proxyans5->Method = METHOD_AUTH_NO;
    ZeroMemory(lpOvlpEx,sizeof(WSAOVERLAPPED));
    // NOTE(review): sizeof(m_proxyans5) is the size of a pointer here as well.
    memcpy(lpOvlpEx->Buffer, m_proxyans5, sizeof(m_proxyans5));
    lpOvlpEx->WSABuf.len = sizeof(m_proxyans5);
    lpOvlpEx->WSABuf.buf = lpOvlpEx->Buffer;
    }    
        // advance the state
    // NOTE(review): the state is advanced and the send below is issued even when the
    // version/method check above failed and no reply was built.
    lpOvlpEx->sockflag = IODATA_SOCK_FLAG_SEND_AUTH;
    } DWORD dwReadBytes = 0,dwFlags = 0;
    // Post the reply on the client socket; WSA_IO_PENDING is the normal overlapped case.
    if ( WSASend(lpOvlpEx->hSocket,&lpOvlpEx->WSABuf,1,&dwReadBytes,dwFlags,(LPWSAOVERLAPPED)lpOvlpEx,NULL) == SOCKET_ERROR)
    {
    if ( WSAGetLastError() != WSA_IO_PENDING)
    {
    printf("%d Send err = %d",lpOvlpEx->hSocketPair,WSAGetLastError());
    SafeClose(lpOvlpEx);
    continue;
    }
    }
    continue;
    // NOTE(review): on the next line the `if (... == IODATA_SOCK_FLAG_SEND_AUTH)` test sits
    // inside the // comment (lines were merged in the paste), so as written the block
    // that follows is not guarded by it.
    } //  IODATA_SOCK_FLAG_INIT if (lpOvlpEx->sockflag == IODATA_SOCK_FLAG_SEND_AUTH)
    {
    // NOTE(review): the declaration of m_proxyans2 on the next line is also inside the
    // comment, so m_proxyans2 is undeclared as written.
    //printf( "lpOvlpEx->sockflag == IODATA_SOCK_SEND_AUTH\n" ); sock5ans2 *m_proxyans2;
    char buffer2[1024];           
    int bufferlen2=1023; lpOvlpEx->sockflag = IODATA_SOCK_FLAG_RECV_ADDR;
    m_proxyans2 = (sock5ans2 *)buffer2;
    m_proxyans2->Ver = SOCKS_VER;
    m_proxyans2->Rep = REP_SUCCESS;
    ZeroMemory(lpOvlpEx,sizeof(WSAOVERLAPPED));
    // NOTE(review): sizeof(m_proxyans2) is the size of a pointer, not of sock5ans2.
    memcpy(lpOvlpEx->Buffer, m_proxyans2, sizeof(m_proxyans2));
    lpOvlpEx->WSABuf.len = sizeof(m_proxyans2);
    lpOvlpEx->WSABuf.buf = lpOvlpEx->Buffer;
    DWORD dwReadBytes = 0,dwFlags = 0;
    if ( WSASend(lpOvlpEx->hSocket,&lpOvlpEx->WSABuf,1,&dwReadBytes,dwFlags,(LPWSAOVERLAPPED)lpOvlpEx,NULL) == SOCKET_ERROR)
    {
    if ( WSAGetLastError() != WSA_IO_PENDING)
    {
    printf("%d Send err = %d",lpOvlpEx->hSocketPair,WSAGetLastError());
    SafeClose(lpOvlpEx);
    continue;
    }
    }
    continue;
    // NOTE(review): the `if( ... == IODATA_SOCK_FLAG_RECV_ADDR )` test on the next line
    // is inside the // comment as well.
    } // IODATA_SOCK_FLAG_SEND_AUTH if( lpOvlpEx->sockflag == IODATA_SOCK_FLAG_RECV_ADDR )
    {
    //printf( "lpOvlpEx->sockflag == IODATA_SOCK_RECV_ADDR\n" );
    DWORD dwRecvBytes = 0,dwFlags = 0;
    DWORD dwReadBytes = 0;

    // Post a receive of up to 1024 bytes on the client socket.
    // NOTE(review): the OVERLAPPED part is not re-zeroed before this call, unlike the
    // sends above.
    lpOvlpEx->WSABuf.len = 1024;
    if ( WSARecv(lpOvlpEx->hSocket,&lpOvlpEx->WSABuf,1,&dwRecvBytes,&dwFlags,(LPWSAOVERLAPPED)lpOvlpEx,NULL) == SOCKET_ERROR)
    {
    if ( WSAGetLastError() != WSA_IO_PENDING)
    {
    printf("%d conn recv err = %d",lpOvlpEx->hSocket,WSAGetLastError());
    SafeClose(lpOvlpEx);
    continue;
    }
    }
    // NOTE(review): the state is advanced only after the receive has been posted; with
    // several workers its completion could presumably be dequeued before this
    // assignment runs - confirm.
    lpOvlpEx->sockflag = IODATA_SOCK_FLAG_SEND_KEY;
    continue;
    } //IODATA_SOCK_FLAG_RECV_ADDR
      

  3.   

    // SEND_KEY state (continuation of ServerWorkerThread's dispatch loop): connect to the
    // upstream host, exchange key material over that connection, create and register a
    // second context for the upstream socket, then forward the first (encrypted) request.
    if (lpOvlpEx->sockflag == IODATA_SOCK_FLAG_SEND_KEY)
    {
    //printf( "lpOvlpEx->sockflag == IODATA_SOCK_SEND_KEY\n" );
    // cryptflag is forced to IDEA here.
    // NOTE(review): the `if (... == IODATA_CRYPT_FLAG_IDEA)` test on the next line is inside
    // the // comment (lines merged by the paste), so the block below is unguarded as written.
    lpOvlpEx->cryptflag = IODATA_CRYPT_FLAG_IDEA; // TODO: make this UI-configurable if (lpOvlpEx->cryptflag == IODATA_CRYPT_FLAG_IDEA)
    {
    int x;
    unsigned char TEMP[258];
    // pProxy is not used again in this listing.
    CMProxy3 *pProxy = (CMProxy3 *)lpOvlpEx->pProxy3;
    DWORD dwRecvBytes = 0,dwFlags = 0;
    // Upstream address and port are hard-coded.
    DWORD dwReadBytes = 0; strcpy( host, "10.1.25.100" ); // TODO: make this UI-configurable
    port = 3128; //
    SOCKADDR_IN remoteAddr;
    ZeroMemory ( &remoteAddr, sizeof (remoteAddr) );
    remoteAddr.sin_family = AF_INET;
    remoteAddr.sin_port = htons ( port );
    remoteAddr.sin_addr.s_addr = inet_addr( host );

    // Fall back to a name lookup when host is not a dotted-quad literal.
    if( remoteAddr.sin_addr.s_addr == INADDR_NONE )
    {
    hostent * hosts = gethostbyname( host );
    if( hosts == NULL )
    {
    // NOTE(review): the failure paths in this state release the context with
    // closesocket + GlobalFree directly, while other states call SafeClose.
    closesocket( lpOvlpEx->hSocket );
    GlobalFree( lpOvlpEx );
    printf( "err address\n" );
    continue;
    }
    memcpy( &remoteAddr.sin_addr, hosts->h_addr_list[0], hosts->h_length );
    } SOCKET hConnect;
    hConnect = WSASocket ( AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED );
    if( hConnect == INVALID_SOCKET )
    {
    printf( "Socket Create Failed\n" );
    closesocket( lpOvlpEx->hSocket );
    GlobalFree( lpOvlpEx );
    continue;
    }

    // NOTE(review): hConnect is never switched to non-blocking mode in this listing, so
    // this connect() and the send()/recv() below presumably block the worker thread
    // until the upstream answers - a likely contributor to the stalls described in the
    // thread; confirm.
    int rr = connect( hConnect, (PSOCKADDR)&remoteAddr, sizeof(remoteAddr) );
    if ( rr )
    {
    printf( "Socket Connect Failed\n" );
    closesocket( hConnect );
    closesocket( lpOvlpEx->hSocket );
    GlobalFree( lpOvlpEx );

    continue;
    }//threadseq ++;
    // NOTE(review): the GenTheNumSendTo(...) call on the next line is inside the comment,
    // so as written sign stays 0 and RanNumModP, TheRanNumWeNeedSave and TheBigPrime are
    // never filled in before being used below.
    ///// exchange key arrays ///// sign = GenTheNumSendTo(RanNumModP, TheRanNumWeNeedSave, TheBigPrime, sign);
    signtemp[0] = sign;
    // Copies 256 bytes out of RanNumModP, which is declared with MAX_RANDOM_NUM_LEN / 8
    // elements; in bounds only if MAX_RANDOM_NUM_LEN >= 2048 - confirm.
    for(x=0; x < 256; x++)
    {
    TEMP[x] = RanNumModP[x];
    }
    // x is 256 here, so the two signtemp bytes land in TEMP[256] and TEMP[257].
    memcpy(&TEMP[x], signtemp, 2); int ret1;
    int ret2;
    char keyarry[ 256 ];

    // Synchronous exchange on the upstream socket; ret1 and ret2 are never checked.
    ret1 = send( hConnect, (char*)TEMP, sizeof(TEMP), 0 );
    ret2 = recv( hConnect,  keyarry, 256, 0 );
    ///// exchange done ///////// ///// derive the KEY //////////
    memcpy(RanNumModPOther, keyarry, sizeof(RanNumModPOther));
    set_precision(bits2units(MAX_RANDOM_NUM_LEN));
    // The modexp result goes into TheLastKeyWeNeed, which is then digested into Md5Key.
    mp_modexp((unit *)TheLastKeyWeNeed, (unit *)RanNumModPOther, (unit *)TheRanNumWeNeedSave, (unit *)TheBigPrime); Md5DegistFunction(TheLastKeyWeNeed, sizeof(TheLastKeyWeNeed), lpOvlpEx->Md5Key);
    ///// KEY derivation done ////////
    ideaCfbInit(&lpOvlpEx->cfb_De, lpOvlpEx->Md5Key);
    // iSize is not used afterwards.
    int iSize = sizeof( WSAOVERLAPPEDEX );

    // Second context, used for I/O on the upstream socket.
    // NOTE(review): the GlobalAlloc result is dereferenced without a NULL check.
    WSAOVERLAPPEDEX * pConnOvlEx = ( WSAOVERLAPPEDEX * ) GlobalAlloc( GPTR, sizeof(WSAOVERLAPPEDEX) );
    lpOvlpEx->pPair = pConnOvlEx;// link the two contexts
    pConnOvlEx->pPair = lpOvlpEx;// to each other
    lpOvlpEx->hSocketPair = hConnect;
    strcpy( pConnOvlEx->filename, lpOvlpEx->filename );
    lpOvlpEx->hConnect = hConnect; // direction flag
    lpOvlpEx->sockflag = IODATA_SOCK_FLAG_WORKING; //////
    pConnOvlEx->hSocketPair = lpOvlpEx->hSocket;
    pConnOvlEx->hSocket = hConnect;
    pConnOvlEx->WSABuf.buf = pConnOvlEx->Buffer;
    pConnOvlEx->WSABuf.len = DATA_BUFSIZE;
    // NOTE(review): self-assignment; presumably lpOvlpEx->pProxy3 was intended - confirm.
    pConnOvlEx->pProxy3 = pConnOvlEx->pProxy3;
    pConnOvlEx->sockflag = lpOvlpEx->sockflag; //////
    memcpy( pConnOvlEx->Md5Key, lpOvlpEx->Md5Key, sizeof(lpOvlpEx->Md5Key) );
    memcpy( &(pConnOvlEx->cfb_De), &(lpOvlpEx->cfb_De), sizeof(lpOvlpEx->cfb_De) );
    pConnOvlEx->hConnect = lpOvlpEx->hConnect; // direction flag
    // Register the upstream socket with the same port, using the socket value as the
    // completion key. The return value is not checked.
    CreateIoCompletionPort( (HANDLE)hConnect, hCompletionPort, (DWORD)hConnect, 0 ); dwRecvBytes = 0; 
    dwFlags = 0;
    ZeroMemory( pConnOvlEx, sizeof(WSAOVERLAPPED) );
    pConnOvlEx->WSABuf.len = DATA_BUFSIZE;
    // Post the first receive on the upstream socket.
    if( WSARecv( hConnect, &pConnOvlEx->WSABuf, 1, &dwRecvBytes, &dwFlags, (LPWSAOVERLAPPED)pConnOvlEx, NULL ) == SOCKET_ERROR )
    {
    if( WSAGetLastError() != WSA_IO_PENDING )
    {
    printf( "%d conn recv err = %d", pConnOvlEx->hSocket, WSAGetLastError() );
    // NOTE(review): only lpOvlpEx is handed to SafeClose here; whether pConnOvlEx and
    // hConnect get released depends on SafeClose, which is not shown.
    SafeClose( lpOvlpEx );
    continue;
    }
    } //// process the request header
    lpOvlpEx->requestState = HTTP_HEAD_DEAL_HEAD;
    // DealWithHead is given Buffer as both of its first two arguments.
    DealWithHead( lpOvlpEx->Buffer, 
      lpOvlpEx->Buffer, 
      lpOvlpEx->targetport, 
      &lpOvlpEx->requestState
    );
    //// compute the length
    // Stops at the first '\0' in Buffer, i.e. yields strlen(Buffer); strlen is
    // re-evaluated in the loop condition on every iteration.
    for( dwBytesTransferred = 0; dwBytesTransferred <= strlen( lpOvlpEx->Buffer ); dwBytesTransferred++ )
    {
    if( lpOvlpEx->Buffer[ dwBytesTransferred ] == '\0' )
    {
    break;
    }
    }
    lpOvlpEx->WSABuf.len = dwBytesTransferred;
    memset( writein, 0, 1024 );
    // NOTE(review): writein is formatted but never written out; the WriteInFile1 call
    // below logs lpOvlpEx->Buffer instead.
    sprintf( writein, "发给squid的__%d__%d__squid的编号%d__hsock的编号%d__hsockPair的编号%d__状态SENDKEY\n", 
    threadid, threadseq, lpOvlpEx->hConnect, lpOvlpEx->hSocket, lpOvlpEx->hSocketPair );
    WriteInFile1( lpOvlpEx->Buffer, lpOvlpEx->filename );
    ///// encrypt in place
    ideaCfbEncrypt( &(lpOvlpEx->cfb_De),
    (unsigned char *)lpOvlpEx->Buffer, 
    (unsigned char *)lpOvlpEx->Buffer, 
    lpOvlpEx->WSABuf.len
      );
    ////// send to the upstream socket
    // NOTE(review): the OVERLAPPED part of lpOvlpEx is not re-zeroed before this WSASend,
    // unlike the sends in the other states.
    if( WSASend( lpOvlpEx->hSocketPair, &lpOvlpEx->WSABuf, 1, &dwReadBytes, dwFlags, (LPWSAOVERLAPPED)lpOvlpEx, NULL ) == SOCKET_ERROR )
    {
    if( WSAGetLastError() != WSA_IO_PENDING )
    {
    printf( "%d Send err = %d", lpOvlpEx->hSocketPair, WSAGetLastError() );
    SafeClose( lpOvlpEx );
    continue;
    }
    }
    continue;
    } // lpOvlpEx->sockflag == IODATA_SOCK_FLAG_SEND_KEY
      

  4.   

    // NOTE(review): the next two lines repeat the two lines that close the SEND_KEY handler
    // at the end of the previous post; presumably an overlap introduced when the listing
    // was split across posts - confirm before merging the fragments.
    continue;
    } // lpOvlpEx->sockflag == IODATA_SOCK_FLAG_SEND_KEY

    // WORKING state: relay data between the two sockets of a pair. The completion key
    // (hSocket) is compared with the context's own socket and with its pair socket to
    // decide what to do next.
    if( lpOvlpEx->sockflag == IODATA_SOCK_FLAG_WORKING )
    {
    printf( "lpOvlpEx->sockflag == IODATA_SOCK_FLAG_WORKING\n" );
    if( hSocket == lpOvlpEx->hSocket )
    {
    // forwarding
    lpOvlpEx->WSABuf.len = dwBytesTransferred;
    if( lpOvlpEx->cryptflag == IODATA_CRYPT_FLAG_IDEA )
    {
    if( lpOvlpEx->hSocketPair == lpOvlpEx->hConnect/* heading to squid */ )
    {
    lpOvlpEx->requestState = HTTP_HEAD_DEAL_HEAD;
    DealWithHead( lpOvlpEx->Buffer, 
      lpOvlpEx->Buffer, 
      lpOvlpEx->targetport, 
      &lpOvlpEx->requestState
    );
    // Recompute the payload length as the offset of the first '\0' in Buffer.
    for( dwBytesTransferred = 0; dwBytesTransferred <= strlen( lpOvlpEx->Buffer ); dwBytesTransferred++ )
    {
    if( lpOvlpEx->Buffer[ dwBytesTransferred ] == '\0' )
    {
    break;
    }
    }

    lpOvlpEx->WSABuf.len = dwBytesTransferred;
    memset( writein, 0, 1024 );
    // NOTE(review): writein is formatted but the log call below writes Buffer instead.
    sprintf( writein, "发给squid的__%d__%d__squid的编号%d__hsock的编号%d__hsockPair的编号%d__状态working\n", 
    threadid, threadseq, lpOvlpEx->hConnect, lpOvlpEx->hSocket, lpOvlpEx->hSocketPair );
    WriteInFile1( lpOvlpEx->Buffer, lpOvlpEx->filename );
    // NOTE(review): this path encrypts with IdeaEncryptFunction(Md5Key, ...), whereas the
    // SEND_KEY handler uses ideaCfbEncrypt(&cfb_De, ...); confirm that the difference
    // is intended.
    IdeaEncryptFunction( lpOvlpEx->Md5Key, 
     (unsigned char *)lpOvlpEx->Buffer, 
     (unsigned char *)lpOvlpEx->Buffer, 
     lpOvlpEx->WSABuf.len
       );
    }// encrypt
    else
    {
    /* heading to the browser */
    ideaCfbDecrypt( &(lpOvlpEx->cfb_De),
    (unsigned char *)lpOvlpEx->Buffer, 
    (unsigned char *)lpOvlpEx->Buffer, 
    lpOvlpEx->WSABuf.len 
      );
    memset( writein, 0, 1024 );
    sprintf( writein, "发给IE的__%d__%d__squid的编号%d__hsock的编号%d__hsockPair的编号%d__状态working\n", 
    threadid, threadseq, lpOvlpEx->hConnect, lpOvlpEx->hSocket, lpOvlpEx->hSocketPair );
    WriteInFile1( lpOvlpEx->Buffer, lpOvlpEx->filename );
    // NOTE(review): the second `}` on the next line is inside the // comment, so the
    // cryptflag `if` is not closed before the `else` as written.
    }// decrypt }
    else
    {
    if( lpOvlpEx->hSocketPair == lpOvlpEx->hConnect/* heading to squid */ )
    {
    lpOvlpEx->requestState = HTTP_HEAD_DEAL_HEAD;
    DealWithHead( lpOvlpEx->Buffer, 
      lpOvlpEx->Buffer, 
      lpOvlpEx->targetport, 
      &lpOvlpEx->requestState
    );
    }
    } DWORD dwReadBytes = 0,dwFlags = 0;

    // Reset the OVERLAPPED part and send the (possibly transformed) payload to the pair socket.
    ZeroMemory( lpOvlpEx, sizeof(WSAOVERLAPPED) );
    lpOvlpEx->WSABuf.len = dwBytesTransferred;

    if( WSASend( lpOvlpEx->hSocketPair, &lpOvlpEx->WSABuf, 1, &dwReadBytes, dwFlags, (LPWSAOVERLAPPED)lpOvlpEx, NULL ) == SOCKET_ERROR )
    {
    if( WSAGetLastError() != WSA_IO_PENDING )
    {
    //printf( "%d Send err = %d", lpOvlpEx->hSocketPair, WSAGetLastError() );
    SafeClose( lpOvlpEx );
    continue;
    }
    }
    }
    else if( hSocket == lpOvlpEx->hSocketPair )
    {
    // listerning
    // the send has completed, switch back to receiving
    DWORD dwRecvBytes = 0, dwFlags = 0;
    lpOvlpEx->WSABuf.len = DATA_BUFSIZE;
    ZeroMemory( lpOvlpEx, sizeof(WSAOVERLAPPED) );
    // Buffer is cleared before the next receive; the length loops above rely on finding a '\0'.
    ZeroMemory( lpOvlpEx->Buffer, sizeof(lpOvlpEx->Buffer) );
    if( WSARecv( lpOvlpEx->hSocket, &lpOvlpEx->WSABuf, 1, &dwRecvBytes, &dwFlags, (LPWSAOVERLAPPED)lpOvlpEx, NULL ) == SOCKET_ERROR )
    {
    if( WSAGetLastError() != WSA_IO_PENDING)
    {
    //printf( "%d recv err = %d\n", lpOvlpEx->hSocket, WSAGetLastError() );
    SafeClose( lpOvlpEx );
    continue;
    }
    }
    }
    else // fail
    {
    // The completion key matches neither socket of this context: close it.
    SafeClose( lpOvlpEx );
    }
    }
    }
    // Reached only through the `break` taken on a zero key or NULL overlapped pointer.
    return 0;
    }
      

  5.   

    应该使用一个管理线程来创建线程池,
    通过信号量控制工作线程:用时打开线程,不用时关闭,始终只有一个线程在等待。
    不过你的编程风格实在不敢恭维,
    比如不写注释;还有你那些注释掉的printf,为什么不用TRACE宏输出调试信息?
    搞不定的话可以问我要例子。
      

  6.   

    请给我一个例子谢谢!  [email protected]