SOCKET通信中用到,收到数据时要立即写入,另一个处理线程读取数据处理.现在写入和读取是用临界区保护,一个在读的时候另一个不能写,但因为读取后的处理会比较慢,所以想能不能实现一个写入可以立即完成,但读取允许等待的队列??

解决方案 »

  1.   

    这是符合要求,不过是bcb做的,对着改改,看能不能用。
    //---------------------------------------------------------------------------
    // Que.h
    //---------------------------------------------------------------------------
    #ifndef QueH
    #define QueH#include <vcl.h>
    #pragma hdrstop//---------------------------------------------------------------------------
    class TWaitableQueue
    {
    private:
       TList             *_List;
       DWORD              _MaxSize;
       HANDLE             _MutSem[2];
    public:
       // 队列的最大项个数为 65535 项
       __fastcall TWaitableQueue(const DWORD MaxSize = 0x0000FFFF);
       __fastcall ~TWaitableQueue(void);   // 返回 Push 后,队列项的个数
       // -1 : 参数错误, aItem == NULL
       // -2 : 超时
       // -3 : 队列满,无法 Push
       int  __fastcall PushItem(void*  aItem, const DWORD aTimeout = 3000);
       // 返回 Take 后,队列项的个数,如果队列为空,则一直等待
       // -1 : 参数错误, aItem == NULL
       // -2 : 超时
       // -3 : 失败
       int  __fastcall TakeItem(void** aItem, const DWORD aTimeout = 3000);};
    //---------------------------------------------------------------------------
    #endif
    //---------------------------------------------------------------------------
    // Que.cpp
    //---------------------------------------------------------------------------#pragma hdrstop#include "Que.h"//---------------------------------------------------------------------------
    #pragma package(smart_init)
    //---------------------------------------------------------------------------
    __fastcall TWaitableQueue::TWaitableQueue(const DWORD MaxSize)
    {
      _List = new TList();
      _MaxSize = MaxSize;
      _MutSem[0] = ::CreateMutex(NULL, FALSE, NULL);
      _MutSem[1] = ::CreateSemaphore(NULL, 0, _MaxSize, NULL);
    }
    //---------------------------------------------------------------------------
    __fastcall TWaitableQueue::~TWaitableQueue(void)
    {
      ::CloseHandle(_MutSem[0]);
      ::CloseHandle(_MutSem[1]);
      delete _List;
    }
    //---------------------------------------------------------------------------
    int __fastcall TWaitableQueue::PushItem(void* aItem, const DWORD aTimeout)
    {
      if(aItem == NULL) return -1;  DWORD   dwRet;
      dwRet = ::WaitForSingleObject(_MutSem[0], aTimeout);
      if(dwRet == WAIT_TIMEOUT)
        return -2;  long    prev = -3;  if(::ReleaseSemaphore(_MutSem[1], 1, &prev)) {
        _List->Add(aItem);
        prev = prev + 1;
      }
      ::ReleaseMutex(_MutSem[0]);  return prev;}
    //---------------------------------------------------------------------------
    int __fastcall TWaitableQueue::TakeItem(void** aItem, const DWORD aTimeout)
    {
      if(aItem == NULL) return -1;
      *aItem = NULL;  DWORD   dwRet;
      dwRet = ::WaitForMultipleObjects(2, _MutSem, TRUE, aTimeout);
      if(dwRet == WAIT_TIMEOUT)
        return -2;  long    prev = -3;
      int i = _List->Count;
      if(i>0) {
        *aItem = _List->Items[0];
        _List->Delete(0);
        prev = i - 1;
      }  ::ReleaseMutex(_MutSem[0]);
      return prev;
    }
    //---------------------------------------------------------------------------
      

  2.   

    另外说明一下,上面的队列内部用 TList ,Delphi 的TList其实也是用数组实现的,效率不必担心
      

  3.   

    昨天回去翻了一下书,找到一个解决的办法,在弹出的时候建立一个PopIndex 的快照,就是用另一个变量保存PopIndex的值function Pop
    var 
     TempPopIndex:integer;
    begin
      第一步
      EnterCriticalSection(FSection);
      TempPopIndex:=PopIndex;
      LeaveCriticalSection(FSection);
      第二步
      if Assigned(OnStrPopChange) then//如果有弹出处理方法,则执行
       OnStrPopChange(FBufArr[TempPopIndex],APopLen);
      第三步
      EnterCriticalSection(FSection);
      根据返回的长度确定PopIndex的下一个值
      LeaveCriticalSection(FSection);
    end;因为第一步和第二步的操作时间很短,所以不会造成写入线程的等待.当然这只能是适应于多个写入,一个读取的情况,如果是多个读取就不行.这主要是用在SOCKET接收时,收到数据后要立刻保存起来,而读取处理线程则允许慢一些.
      

  4.   

    个人感觉从楼主的描述当中,实现存在一定的问题,实际上读取之后的处理比较长,这个不应该跟队列有关系,读取的时候并不是加锁进去就一直锁定,直到处理完成,而应该是取走之后,马上释放,这个时候从理论上来讲,你的写入是立即完成的。首先在读取后的处理过程当中两两之间间隔比较长,大部分写入并不受阻,而读取的过程也相对写入要短得多,所以这个短时间长间隔的读取对写入应该不够成影响。但是这个样子的设计,如果写入是一个频繁事件的话,可能会导致队列的不断增长,而处理不过来,导致堆积。方便的话可以考虑多线程处理。另外,如果处理的是无序数据的话,并且操作系统平台为较新版系统,可以参考一下:Using Singly Linked Lists
      

  5.   

    上面的那个Singly Linked List,我有实现过,其他都不是问题,关键点就是那几个内存函数。unit MemFunctionUnit;interface
    uses
      windows;
    const
      MAX_NATURAL_ALIGNMENT               = sizeof(DWORD);
      MEMORY_ALLOCATION_ALIGNMENT         = 8;
      PTR_SZ                      : UINT  = sizeof(Pointer);
    type
      SIZE_T      = UINT;
      UINTPTR_T   = UINT;
      PUINTPTR_T  = ^UINTPTR_T;
      ERRNO_T     = Integer;
      function _aligned_offset_malloc_base(
            size
          , align
          , offset  : size_t
        ):  Pointer;
      function _aligned_malloc_base(
            size
          , alignment : size_t
        ):  Pointer;  function _aligned_realloc_base(
          memblock  : Pointer
        ; size
        , alignment : SIZE_T
        ):  Pointer;  function _aligned_recalloc_base(
          memblock  : Pointer
        ; count
        , size
        , alignment : SIZE_T
        ):  Pointer;
      function _aligned_offset_realloc_base(
          memblock  : Pointer
        ; size
        , align
        , offset  : SIZE_T
        ):  Pointer;
      function _aligned_offset_recalloc_base(
          memblock  : Pointer
        ; count
        , size
        , align
        , offset  : SIZE_T
        ):  Pointer;
      procedure  _aligned_free_base(
          memblock  : Pointer
        );implementation
    function IS_2_POW_N(
        X : SIZE_T
      ):  Boolean;inline;
    begin
      Result :=(((X) and (X-1)) = 0);
    end;function memmove (
              dst   : Pointer
      ; const src   : Pointer
      ;       count : SIZE_T
      ):  Pointer;
    var
      tmpSrc  : Pointer;
    begin
      result := dst;
      tmpSrc := Src;
      if    ((SIZE_T(dst) <= SIZE_T(tmpSrc))
        or  (Pchar(dst) >= (PChar(tmpSrc) + count))) then
      begin
        (*
         * Non-Overlapping Buffers
         * copy from lower addresses to higher addresses
         *)
        while (count>0) do
          begin
            dec(count);
            PChar(dst)^ := PChar(tmpSrc)^;
            Inc(Pchar(dst));
            Inc(PChar(tmpSrc));
          end;
      end
      else
      begin
        (*
         * Overlapping Buffers
         * copy from higher addresses to lower addresses
         *)
        dst     := Pchar(dst) + count - 1;
        tmpSrc  := Pchar(tmpSrc) + count - 1;    while (count>0) do
          begin
            Dec(Count);
            PChar(dst)^ := PChar(tmpSrc)^;
            dst         := PChar(dst) - 1;
            tmpSrc      := Pchar(tmpSrc) - 1;
          end;
      end;
    end;function _msize_base (
        pblock  : Pointer
      ):  size_t;
    begin
      Result :=$ffffffff;
      (* validation section *)
      if Not(pblock <> Nil) then
        Exit;
      if Not HeapLock(ProcessHeap) then
        Exit;
      try
        Result := size_t(HeapSize(ProcessHeap,0,pblock));
      finally
        HeapUnlock(ProcessHeap);
      end;
    end;function _aligned_offset_malloc_base(
        size
      , align
      , offset  : size_t
      ):  Pointer;
    var
        ptr
      , retptr
      , gap   : UINTPTR_T;
      tmpPtr  : PUINTPTR_T;
    begin
      Result := Nil;
      if Not IS_2_POW_N(align) then
        Exit;
      if Not ((offset = 0) or (offset < size)) then
        Exit;
      if align < PTR_SZ then
        align :=  PTR_SZ - 1
      else
        align := align -1;
        (* gap = number of bytes needed to round up offset to align with PTR_SZ*)
      gap     :=  (0 - offset) and (PTR_SZ -1);
      ptr     :=  uintptr_t(HeapAllocEx(PTR_SZ +gap +align +size));
      if ( ptr = uintptr_t(0)) then
        Exit;  retptr  :=  ((ptr +PTR_SZ +gap +align +offset) and (Not align))- offset;  tmpPtr  :=  Puintptr_t(retptr - gap);
      Dec(tmpPtr);
      tmpPtr^ :=  ptr;  result  := Pointer(retptr);
    end;function _aligned_malloc_base(
        size
      , alignment : size_t
      ):  Pointer;
    begin
        Result  :=  _aligned_offset_malloc_base(size, alignment, 0);
    end;function _aligned_realloc_base(
        memblock  : Pointer
      ; size
      , alignment : SIZE_T
      ):  Pointer;
    begin
        result  :=  _aligned_offset_realloc_base(memblock, size, alignment, 0);
    end;function _aligned_recalloc_base(
        memblock  : Pointer
      ; count
      , size
      , alignment : SIZE_T
      ):  Pointer;
    begin
        result  :=  _aligned_offset_recalloc_base(memblock, count, size, alignment, 0);
    end;function _aligned_offset_realloc_base(
        memblock  : Pointer
      ; size
      , align
      , offset    : SIZE_T
      ):  Pointer;
    var
        ptr
      , retptr
      , gap
      , stptr
      , diff      : uintptr_t;
        movsz
      , reqsz     : uintptr_t;
      bFree       : boolean;
      save_errno  : errno_t;
      tmpPtr      : PUINTPTR_T;
    begin
      bFree   :=  false;
      Result  :=  Nil;
      (* special cases *)
      if (memblock = Nil) then
      begin
        Result := _aligned_offset_malloc_base(size, align, offset);
        Exit;
      end;
      if (size = 0) then
      begin
        _aligned_free_base(memblock);
        Exit;
      end;  (* validation section *)
      if Not IS_2_POW_N(align) then
        Exit;
      if Not ((offset = 0) or (offset < size)) then
        Exit;  stptr := uintptr_t(memblock);  (* ptr points to the pointer to starting of the memory block *)
      stptr := (stptr or Not(PTR_SZ -1)) - PTR_SZ;  (* ptr is the pointer to the start of memory block*)
      stptr := puintptr_t (stptr)^;
      if (align < PTR_SZ ) then
         align := PTR_SZ;
      align := align -1;
      (* gap = number of bytes needed to round up offset to align with PTR_SZ*)
      gap := (0 -offset) and (PTR_SZ -1);  diff := uintptr_t(memblock) - stptr;
      (* Mov size is min of the size of data available and sizw requested.
      *)
      movsz := _msize_base(Pointer(stptr)) - (uintptr_t(memblock) - stptr);
      if movsz>size then
        movsz := size;
      reqsz := PTR_SZ +gap +align +size;  (* First check if we can expand(reducing or expanding using expand) data
       * safely, ie no data is lost. eg, reducing alignment and keeping size
       * same might result in loss of data at the tail of data block while
       * expanding.
       *
       * If no, use malloc to allocate the new data and move data.
       *
       * If yes, expand and then check if we need to move the data.
       *)
      if ((stptr +align +PTR_SZ +gap)<uintptr_t(memblock)) then
      begin
        ptr := uintptr_t(HeapAllocEx(reqsz));
        if ((ptr) = uintptr_t( 0)) then
          Exit;
        bFree := true;
      end
      else
      begin
        (* we need to save errno, which can be modified by _expand *)
        save_errno:= GetLastError;
        ptr:=uintptr_t(HeapReAllocEx(Pointer(stptr), reqsz));
        if (ptr = uintptr_t(0)) then
        begin
          SetLastError(save_errno);
          ptr := uintptr_t(HeapAllocEx(reqsz));
          if (ptr = uintptr_t( 0)) then
            Exit;
          bFree := true;
        end
        else
          stptr := ptr;
      end;    if ( (ptr = (uintptr_t(memblock) - diff))
             and ((Not( (uintptr_t(memblock) + gap +offset) and (Not align) ))<>0)) then
        //if  (ptr = (uintptr_t(memblock) - diff) )
        //     and (Not( (size_t(memblock) + gap +offset)) and (Not(align)) ) then
        begin
            Result:= memblock;
            Exit;
        end;    retptr :=((ptr +PTR_SZ +gap +align +offset) and Not align)- offset;
        memmove(Pointer(retptr), Pointer(stptr + diff), movsz);
        if ( bFree) then
            HeapfreeEx (Pointer(stptr));    //((uintptr_t *)(retptr - gap))[-1] = ptr;
        tmpPtr := Puintptr_t(retptr - gap);
        Dec(tmpPtr);
        tmpPtr^:=ptr;    Result:= Pointer(retptr);
    end;function _aligned_offset_recalloc_base(
        memblock:Pointer;
        count,
        size,
        align,
        offset:SIZE_T
        ):Pointer;
    var
      size_orig:  SIZE_T;
    begin
      //size_orig:=0;  (* ensure that (size * num) does not overflow *)  size_orig := size * count;  Result:= _aligned_offset_realloc_base(memblock, size_orig, align, offset);
    end;procedure  _aligned_free_base(memblock:Pointer);
    var
      ptr:uintptr_t ;
    begin
      if (memblock = Nil) then
        Exit;  ptr := uintptr_t(memblock);  (* ptr points to the pointer to starting of the memory block *)
      ptr := (ptr and Not (PTR_SZ -1)) - PTR_SZ;  (* ptr is the pointer to the start of memory block*)
      ptr := (Puintptr_t(ptr))^;
      HeapFreeEx(Pointer(ptr));
    end;end.
      

  6.   

      因为SOCKET在接收的时候每次RECEIVE都是将SOCKET缓冲区中的数据全部接收了,而些数据肯定会是包括了自己定义的多个包.读取的时候就是要进行分隔这些包,还有就是可能会有将包截断的情况,这里POP就不能全部弹出,TZStrPopChangeEvent=procedure (const S:String;var APopLen: Integer) of object;if Assigned(OnStrPopChange) then
       OnStrPopChange(FBufArr[TempPopIndex],APopLen);线程执行POP ,POP里调用OnStrPopChange来处理内容,OnStrPopChange是业务程序提供的,如文件处理,将收到的包合并成文件;表处理,将收到的数据组合成一个表格的形式.当然,每个业务也有自己的缓冲区.
    OnStrPopChange主要进行包业务归属判断和包分隔.但相比来说,SOCKET的RECEIVE事件收到后,总是能立刻放入缓冲区的话,对性能来说会更好一些.
    如果SOCKET的RECEIVE的速度太快,会有造成堆积的可能性,但只要分配较大的缓冲空间,或者将客户端的发送间隔调整一下,或者让客户端有个策略,如在连续发送超过10M的情况下暂停几秒.
    但是作为服务器来说,只要有数据就应该尽量快的接收下来.现在有问题是,如何才能测试比较出各种实现方式的性能差异和BUG,因为很少能达到极点,BUG会很隐晦
      

  7.   

    添加和弹出时,其实最主要的是对POPIndex,PushIndex两个值的修改,如果将对这两个值的操作全部改为原子操作,仅在写入时用临界区保护一下FBufArr[FPopIndex]:=Text;读取时就不用临界区保护,这样应该也能正确运行吧.
      

  8.   

    12楼:因为要根据处理结果来确定下一下弹出的位置,如果只处理的部分数据,则不修改弹出位置的,剩余的数据等下一次处理,如果全部处理了,则将弹出位置移动到下一个位置.SOCKET接收时,接收数据应该比处理接收数据优先.尽量不阻塞接收数据后向缓冲写数据的动作.
      

  9.   

    偷懒用的队列,完成端口.写入方Post*进去,读取方Get*,随便你如何读取。然后可以进行任意地分派。unit IOCPUnit;interface
    uses
      Windows;
    type
      TIOCP=class(TObject)
        strict private
          FHandle           : THandle;  //完成端口句柄
          FQueueThreadSize  : Integer;  //线程队列
          FManagementCount  : Integer;  //引用计数
          FIsShutingDown    : Boolean;    public
          property QueueThreads     : Integer read FQueueThreadSize; //线程队列
          property ManagementCount  : Integer read FManagementCount; //引用计数    public
          (*====================================================================*)
          (***                       IO句柄与完成端口关联                     ***)
          (*--------------------------------------------------------------------*)
          (*  传入参数:                                                         *)
          (*    IOHandle      : IO句柄 (文件句柄,管道句柄,Socket等句柄类IO资源) *)
          (*    IOContextKey  : IO上下文关键字 (描述 IOHandle 相关的上下文数据, *)
          (*                                   结构体或者类指针等标识性信息 )   *)
          (*    Concurrent    : 并发线程数,通常为0                              *)
          (*  输出参数: (无)                                                    *)
          (*  返 回 值:  完成端口句柄                                           *)
          (*  备    注:                                                         *)
          (*--------------------------------------------------------------------*)
          function CreateIoCompletionPort(    IOHandle      : THandle
                                            ; IOContextKey  : DWORD
                                            ; Concurrent    : DWORD = 0
                                            ): THandle;
          function AssociateWith (  IOHandle      : THandle
                                  ; IOContextKey  : DWORD
                                  ) : BOOL;      (*====================================================================*)
          (***                          完成队列检索                          ***)
          (*--------------------------------------------------------------------*)
          (*  传入参数:                                                         *)
          (*    dwMilliseconds    : 等待时间(单位:毫秒),同WaitForSingleObject   *)
          (*  输出参数:                                                         *)
          (*    BytesTransferred  : 传输完成字节数,0表示IO关闭,或者出现异常     *)
          (*    IOContextKey      : IO上下文关键字                              *)
          (*    Overlapped        : 重叠IO结构指针                              *)
          (*  返 回 值:                                                         *)
          (*    成功:True,失败/出错:False(请使用GetLastError取得具体错误)       *)
          (*  备    注:                                                         *)
          (*--------------------------------------------------------------------*)
          function GetQueuedCompletionStatus(   var BytesTransferred  : DWORD
                                              ; var IOContextKey      : DWORD
                                              ; var Overlapped        : POverlapped
                                              ;     dwMilliseconds    : DWORD
                                              ): BOOL;      (*====================================================================*)
          (***                         发送完成事件                           ***)
          (*--------------------------------------------------------------------*)
          (*  传入参数:                                                         *)
          (*    lpBytesTransferred  : 与 GetQueuedCompletionStatus 相对应       *)
          (*    dwCompletionKey     : 与 GetQueuedCompletionStatus 相对应       *)
          (*    lpOverlapped        : 与 GetQueuedCompletionStatus 相对应       *)
          (*  输出参数: (无)                                                    *)
          (*  返回值:  成功:True,失败/出错:False(使用GetLastError取得具体错误)  *)
          (*  备注:                                                             *)
          (*--------------------------------------------------------------------*)
          function PostQueuedCompletionStatus(    lpBytesTransferred  : DWORD
                                                ; dwCompletionKey     : DWORD
                                                ; lpOverlapped        : POverlapped
                                                ): BOOL;      (*====================================================================*)
          (***                    发送一个特定的完成事件                      ***)
          (*--------------------------------------------------------------------*)
          (*  传入参数: (无)                                                    *)
          (*  输出参数: (无)                                                    *)
          (*  返 回 值: 成功:True,失败/出错:False(使用GetLastError取得具体错误) *)
          (*  备    注: PostQueuedCompletionStatus(0,0,SHUTDOWN_FLAG)           *)
          (*--------------------------------------------------------------------*)
          function PostThreadQuitFlag : BOOL;      (*====================================================================*)
          (***                         增加引用计数                           ***)
          (*--------------------------------------------------------------------*)
          (*  传入参数: (无)                                                    *)
          (*  输出参数: (无)                                                    *)
          (*  返 回 值: 当前引用计数                                            *)
          (*  备    注:                                                         *)
          (*--------------------------------------------------------------------*)
          function Attach             : Integer;  overload;      (*====================================================================*)
          (***                         减少引用计数                           ***)
          (*--------------------------------------------------------------------*)
          (*  传入参数: (无)                                                    *)
          (*  输出参数: (无)                                                    *)
          (*  返 回 值: 剩余引用计数                                            *)
          (*  备    注: 减少引用计数,但不理会引用计数是否为0,请使用  Free 代替  *)
          (*--------------------------------------------------------------------*)
          function Detach             : Integer;      (*====================================================================*)
          (***         为每对列当中的每一个线程发送一个特定的完成事件         ***)
          (*--------------------------------------------------------------------*)
          (*  传入参数: (无)                                                    *)
          (*  输出参数: (无)                                                    *)
          (*  返 回 值: 成功:True,失败/出错:False(使用GetLastError取得具体错误) *)
          (*  备    注: PostQueuedCompletionStatus(0,0,SHUTDOWN_FLAG)           *)
          (*--------------------------------------------------------------------*)
          procedure ShutDownAll;
          procedure ShutDownAllEx;      (*====================================================================*)
          (***                          减少引用计数                          ***)
          (*--------------------------------------------------------------------*)
          (*  传入参数: (无)                                                    *)
          (*  输出参数: (无)                                                    *)
          (*  返 回 值: (无)                                                    *)
          (*  备    注: 当引用计数减少为0时,对象被自动释放                      *)
          (*--------------------------------------------------------------------*)
          procedure Free;
      

  10.   

    续上:    public
          constructor Create;
          destructor Destroy; override;
        public
          class function Attach(lpCompletion: TIOCP): TIOCP;  Overload;
      end;
    const
      SHUTDOWN_FLAG = $FFFFFFFF; // Posted to the completion port when shutting downimplementationclass function TIOCP.Attach(lpCompletion: TIOCP): TIOCP;
    var
      Management  : Integer;
    begin
      Result  :=  nil;
      if Not Assigned(lpCompletion) then
        Exit;
      Management  :=  lpCompletion.Attach;
      if Management = 1 then
        begin
          lpCompletion.Detach;
          Exit;
        end;
      Result  :=  lpCompletion;
    end;function TIOCP.CreateIoCompletionPort(    IOHandle      : Cardinal
                                            ; IOContextKey  : Cardinal
                                            ; Concurrent    : Cardinal
                                            ):  THandle;
    begin
      Result  :=  Windows.CreateIoCompletionPort(   IOHandle
                                                  , FHandle
                                                  , IOContextKey
                                                  , Concurrent
                                                  );
    end;function TIOCP.AssociateWith(IOHandle: Cardinal; IOContextKey: Cardinal):BOOL;
    begin
      result := (CreateIoCompletionPort(IOHandle,IOContextKey)=self.FHandle);
    end;function TIOCP.GetQueuedCompletionStatus(   var BytesTransferred  : Cardinal
                                              ; var IOContextKey      : Cardinal
                                              ; var Overlapped        : POverlapped
                                              ;     dwMilliseconds    : Cardinal
                                              ): BOOL;
    begin
      InterlockedIncrement(FQueueThreadSize);
      Result  :=  Windows.GetQueuedCompletionStatus(    FHandle
                                                      , BytesTransferred
                                                      , IOContextKey
                                                      , Overlapped
                                                      , dwMilliseconds
                                                      );
      InterlockedDecrement(FQueueThreadSize);
    end;function TIOCP.PostQueuedCompletionStatus(  lpBytesTransferred  : Cardinal
                                              ; dwCompletionKey     : Cardinal
                                              ; lpOverlapped        : POverlapped
                                              ): BOOL;
    begin
      if FIsShutingDown then
        begin
          Result := False;
          Exit;
        end;
      Result  :=  Windows.PostQueuedCompletionStatus(   FHandle
                                                      , lpBytesTransferred
                                                      , dwCompletionKey
                                                      , lpOverlapped
                                                      );
    end;function TIOCP.PostThreadQuitFlag:  BOOL;
    begin
      Result  :=  Windows.PostQueuedCompletionStatus(   FHandle,   0
                                              , 0
                                              , POverLapped(SHUTDOWN_FLAG)
                                              );
    end;function TIOCP.Attach : Integer;
    begin
      Result  :=  InterlockedIncrement(FManagementCount);
    end;function TIOCP.Detach;
    begin
      Result  :=  InterlockedDecrement(FManagementCount);
    end;procedure TIOCP.Free;
    begin
      if not (self  <>  nil) then
        Exit;
      if Detach > 0 then
        Exit;
      Inherited Free;
    end;constructor TIOCP.Create;
    begin
      Inherited;
      FIsShutingDown    :=  False;
      FHandle           :=  INVALID_HANDLE_VALUE;
      FQueueThreadSize  :=  0;
      FManagementCount  :=  0;
      FHandle           := Windows.CreateIoCompletionPort(  INVALID_HANDLE_VALUE
                                                          , 0
                                                          , 0
                                                          , 0
                                                          );
      InterlockedIncrement(FManagementCount);end;procedure TIOCP.ShutDownAll;
    var
      I   : Integer;
    begin  I   :=  FQueueThreadSize;
      while I > 0 do
        begin
          PostThreadQuitFlag;
          Dec(I);
        end;
      while FQueueThreadSize  > 0 do Sleep(1);
    end;procedure TIOCP.ShutDownAllEx;
    begin
      FIsShutingDown := True;
      ShutDownAll;
    end;Destructor TIOCP.Destroy;
    begin
      ShutDownAll;
      if FHandle  <>  INVALID_HANDLE_VALUE then
        CloseHandle(FHandle);
      FHandle :=  INVALID_HANDLE_VALUE;
    end;
    end.
      

  11.   

    调用方法很简单:
    写入:
    PostQueuedCompletionStatus(DWORD(Parameter1),DWORD(Parameter2),POverlapped(Parameter3))
    成功返回 Not (0),失败返回 0;
    读取:
    GetQueuedCompletionStatus(DWORD(Parameter1),DWORD(Parameter2),POverlapped(Parameter3),WaitForTime(*等待超时时间*))
    判断超时:
     GetLastError = WAIT_TIMEOUT 当WaitForTime参数设置为INFINITE表示一直等待,适合线程池调度.
    而GetQueuedCompletionStatus本身就是一个很好的线程池。只要线程使用GetQueuedCompletionStatus进行工作等待,任务的调配就可以通过PostQueuedCompletionStatus来分派给线程。只要三个参数对齐就可以。而且其中的POverlapped也同样可以当作是指针来使用。