IOCP性能优化主要是集中在每个处理接收数据和发送数据的对象锁,如果能降低锁的调用次数和提高锁的效率,对IOCP的整理效率和吞吐量都非常有帮助。有很多开发人员在优化IOCP的时候,对于如何提高锁的效率,有使用原子操作来加锁,这样做的效率比较调用Windows的锁效率,因而吞吐率也更高。我们不讨论提高锁的效率,我们这次的优化是降低锁的调用次数。服务端优化    IOCP是异步线程通知方式,有多个线程在调用GetQueuedCompletionStatus函数等待返回,如果有发送和接收,如果正在处理中,则会导致发送等待,这是因为我们每次处理IOCP返回,我们都对TSocketHandle进行了加锁,如下:              ClientSocket.Lock.Enter;
              try
                if Assigned(ClientSocket.SocketHandle) then
                begin
                  ClientSocket.SocketHandle.ProcessIOComplete(IocpRecord, iWorkCount);
                  if not ClientSocket.SocketHandle.Connected then
                    FreeSocketHandle(ClientSocket.SocketHandle);
                end;
              finally
                ClientSocket.Lock.Leave;
              end;    因此我们优化这个锁的进入次数,则能提高整个系统的吞吐量和传输性能。服务器和客户端在发送数据时,是调用不同的函数分别发送,如服务端下发响应信息函数:    procedure TBaseSocket.DoSendResult;
    var
      wLen: Word;
      cLen: Cardinal;
      utf8Buff: UTF8String;
    begin
      if not Connected then Exit;
      utf8Buff := AnsiToUtf8(FResponse.Text); //转为UTF8
      if LenType = ltWord then
      begin
        wLen := Length(utf8Buff) + SizeOf(Cardinal); //总长度
        WriteWord(wLen, False);
      end
      else if LenType = ltCardinal then
      begin
        cLen := Length(utf8Buff) + SizeOf(Cardinal); //总长度
        WriteCardinal(cLen, False);
      end;
      WriteCardinal(Length(utf8Buff), False); //下发命令长度
      WriteBuffer(PUTF8String(utf8Buff)^, Length(utf8Buff)); //命令内容
    end;    这里调了多次Write函数,每次Write函数最终都会调用函数WriteBuffer,每次调用WriteBuffer都会通过WSASend投递一次IOCP请求,从而造成多次进入TSocketHandle的锁。WriteBuffer函数如下:    procedure TSocketHandle.WriteBuffer(const ABuffer; const ACount: Integer;
      const AIocpOperate: TIocpOperate);
    var
      IocpRec: PIocpRecord;
      iErrCode: Integer;
      dSend, dFlag: DWORD;
    begin
      IocpRec := FSendOverlapped.Allocate(ACount);
      FillChar(IocpRec.Overlapped, SizeOf(IocpRec.Overlapped), 0);
      IocpRec.IocpOperate := AIocpOperate;
      System.Move(ABuffer, IocpRec.WsaBuf.buf^, ACount);
      dFlag := 0;
      if WSASend(FSocket, @IocpRec.WsaBuf, 1, dSend, dFlag, @IocpRec.Overlapped, nil) = SOCKET_ERROR then
      begin
        iErrCode := WSAGetLastError;
        if iErrCode <> ERROR_IO_PENDING then
        begin
          FIocpServer.DoError('WSASend', GetLastWsaErrorStr);
          ProcessNetError(iErrCode);
        end;
      end;  
    end;    解决办法是我们把发送引入缓存机制,把发送的数据缓存在内存中,最后调用函数一次发送,具体实现函数分为打开发送缓存和Flush发送缓存,代码如下:    procedure TSocketHandle.OpenWriteBuffer;
    begin
      if Assigned(FOutputBuf) then
        FreeAndNil(FOutputBuf);
      FOutputBuf := TMemoryStream.Create;
    end;    procedure TSocketHandle.FlushWriteBuffer(const AIocpOperate: TIocpOperate);
    var
      IocpRec: PIocpRecord;
      iErrCode: Integer;
      dSend, dFlag: DWORD;
    begin
      IocpRec := FSendOverlapped.Allocate(FOutputBuf.Size);
      IocpRec.Overlapped.Internal := 0;
      IocpRec.Overlapped.InternalHigh := 0;
      IocpRec.Overlapped.Offset := 0;
      IocpRec.Overlapped.OffsetHigh := 0;
      IocpRec.Overlapped.hEvent := 0;
      IocpRec.IocpOperate := AIocpOperate;
      System.Move(PAnsiChar(FOutputBuf.Memory)[0], IocpRec.WsaBuf.buf^, FOutputBuf.Size);
      dFlag := 0;
      if WSASend(FSocket, @IocpRec.WsaBuf, 1, dSend, dFlag, @IocpRec.Overlapped, nil) = SOCKET_ERROR then
      begin
        iErrCode := WSAGetLastError;
        if iErrCode <> ERROR_IO_PENDING then
        begin
          FIocpServer.DoError('WSASend', GetLastWsaErrorStr);
          ProcessNetError(iErrCode);
        end;
      end;
      FreeAndNil(FOutputBuf);
    end;    调用写入函数就是先写入缓存,具体实现代码如下:    procedure TSocketHandle.WriteBuffer(const ABuffer; const ACount: Integer);
    begin
      FOutputBuf.WriteBuffer(ABuffer, ACount);
    end;    procedure TSocketHandle.WriteWord(AValue: Word; const AConvert: Boolean = True);
    begin
      if AConvert then
        AValue := htons(AValue);
      WriteBuffer(AValue, SizeOf(AValue));
    end;    procedure TSocketHandle.WriteCardinal(AValue: Cardinal;
      const AConvert: Boolean);
    begin
      if AConvert then
        AValue := htonl(AValue);
      WriteBuffer(AValue, SizeOf(AValue));
    end;    procedure TSocketHandle.WriteInteger(AValue: Integer; const AConvert: Boolean);
    begin
      WriteCardinal(Cardinal(AValue), AConvert);
    end;    procedure TSocketHandle.WriteSmallInt(AValue: SmallInt; const AConvert: Boolean);
    begin
      WriteWord(Word(AValue), AConvert);
    end;    procedure TSocketHandle.WriteString(const AValue: string);
    var
      iLen: Integer;
    begin
      iLen := Length(AValue);
      if iLen > 0 then
        WriteBuffer(PChar(AValue)^, iLen);
    end;    procedure TSocketHandle.WriteLn(const AValue: string);
    begin
      WriteString(AValue + EOL);
    end;    改造后的服务器下发响应函数如下:    procedure TBaseSocket.DoSendResult(const AWriteNow: Boolean);
    var
      wLen: Word;
      cLen: Cardinal;
      utf8Buff: UTF8String;
    begin
      if not Connected then Exit;
      if AWriteNow then
        OpenWriteBuffer;
      utf8Buff := AnsiToUtf8(FResponse.Text); //转为UTF8
      if LenType = ltWord then
      begin
        wLen := Length(utf8Buff) + SizeOf(Cardinal); //总长度
        WriteWord(wLen, False);
      end
      else if LenType = ltCardinal then
      begin
        cLen := Length(utf8Buff) + SizeOf(Cardinal); //总长度
        WriteCardinal(cLen, False);
      end;
      WriteCardinal(Length(utf8Buff), False); //下发命令长度
      WriteBuffer(PUTF8String(utf8Buff)^, Length(utf8Buff)); //命令内容
      if AWriteNow then
        FlushWriteBuffer(ioWrite); //直接发送
    end;客户端优化    客户端我们是使用INDY组件中的TIdTCPClient,这个组件也支持使用缓存的方式,具体调用代码如下:          FClient.OpenWriteBuffer(-1); //-1表示外面手动刷新发送
          try
            slBuff.Add('[' + CSRequest + ']');
            slBuff.Add(CSCommand + CSEqualSign + ACommand);
            if Assigned(ARequest) then
              slBuff.AddStrings(ARequest);
            sBuff := slBuff.Text;
            utf8Command := AnsiToUtf8(sBuff); //把ANSI转为UTF-8
            if not IsEmptyStr(AData) then
              utf8Data := AnsiToUtf8(AData); //把ANSI转为UTF-8
            dwPacketLen := SizeOf(Cardinal) + Length(utf8Command) + Length(utf8Data); //总长度
            dwCommandLen := Length(utf8Command); //命令长度
            FClient.WriteCardinal(dwPacketLen, False); //发送整个包长度
            FClient.WriteCardinal(dwCommandLen, False); //发送命令长度
            FClient.WriteBuffer(PUTF8String(utf8Command)^, dwCommandLen, False); //发送命令内容
            if not IsEmptyStr(AData) then
              FClient.WriteBuffer(PUTF8String(utf8Data)^, Length(utf8Data), False); //发送数据
            FClient.CloseWriteBuffer; //发送当前全部数据
          except
            on E: Exception do
            begin
              FLastError := E.Message;
              Result := False;
              FClient.CancelWriteBuffer;
              Exit;
            end;
          end;优化效果    在广域网的时候,优化锁的调用次数对性能提升比较明显,尤其是小数据量的交互响应时间,能降低一半。吞吐量的提升在局域网测试的时候,可以把千兆网卡占完,达到125MB/S(千兆网卡是1Gb/S / 8 = 125MB/S,网卡的工业标准是使用bit)。V1版下载地址:http://download.csdn.net/detail/sqldebug_fan/4510076,需要资源10分,有稳定性问题,可以作为研究稳定性用;
V2版下载地址:http://download.csdn.net/detail/sqldebug_fan/5560185,不需要资源分,解决了稳定性问题和提高性能;免责声明:此代码只是为了演示IOCP编程,仅用于学习和研究,切勿用于商业用途。水平有限,错误在所难免,欢迎指正和指导。邮箱地址:[email protected]性能优化socket高性能iocp并发

解决方案 »

  1.   

    感谢LZ的分享精神.
    友情帮LZ整理一下发送时,数据处理的情况:
    整理如下:
    TMemoryStream.Create 1次,包含GetMem 1次, FillChar 1次
    TMemoryStream.Free   1次,包含FreeMem 2次
    TMemoryStream.Write     包含至少1次GetMem,如果用户数据>$2000,那将增加GetMem 1次, Move 1次,FreeMem 1次
    复制用户数据             2次
    发送结构                     申请结构至少1次GetMem,释放结构至少1次FreeMem总共>=3次GetMem,>=3次FreeMem,复制用户数据2次.
    Delphi自带的内存管理机制也好,FastMM也好,多线程下GetMem,FreeMem都有锁操作.
      

  2.   

    非常感谢楼主的奉献!
    水平有限,这D7下的控件和代码,不知道怎么修改才能在XE2下安装和使用,还有劳大牛弄个XE2下的,先谢谢啦!