求一个操作的相关线程池类的源码,希望大家给个示例,小弟我感谢你们!!!!!!!!!!!!!

解决方案 »

  1.   

    最简单的得用GetQueuedCompletionStatus等待来做任务线程池。
    所有线程元素调用GetQueuedCompletionStatus等待任务,有任务时使用PostQueuedCompletionStatus加入任务,其中一个线程便会得到任务,然后加以处理。unit HSSyncThread;interface
    uses
      Windows,Classes,SysUtils,iocpunit,UnitSyncProc,ExtCtrls;
    type
      TSyncThread = class(TThread)
        private
          FIOCP:TIOCP;
          FCurrentProcObject:TSyncProc;
        protected
          procedure Execute;override;
          procedure RunCurrentProc;
        public
          constructor Create(CreateSuspended: Boolean);
          destructor Destroy;override;
          procedure Shutdown;
          procedure ShutdownEx;
          function InQueue(SyncProc:TSyncProc):BOOL;
      end;
      function UserSyncProc(    ObjectProc  : TObjectProc
                              ; IsSyncCall  : Boolean
                              ):  BOOL; overload;
      function UserSyncProc(    ObjectProc  : TObjectProc1
                              ; IsSyncCall  : Boolean
                              ; Param0      : Pointer
                              ):  BOOL; overload;
      function UserSyncProc(    ObjectProc  : TObjectProc2
                              ; IsSyncCall  : Boolean
                              ; Param0      : Pointer
                              ; Param1      : Pointer
                              ): BOOL; overload;
      function UserSyncProc(    ObjectProc  : TObjectProc3
                              ; IsSyncCall  : Boolean
                              ; Param0      : Pointer
                              ; Param1      : Pointer
                              ; Param2      : Pointer
                              ):  BOOL; overload;
      function UserSyncProc(    ObjectProc  : TObjectProc4
                              ; IsSyncCall  : Boolean
                              ; Param0      : Pointer
                              ; Param1      : Pointer
                              ; Param2      : Pointer
                              ; Param3      : Pointer
                              ):  BOOL; overload;
      function UserSyncProc(    ObjectProc  : TObjectProc5
                              ; IsSyncCall  : Boolean
                              ; Param0      : Pointer
                              ; Param1      : Pointer
                              ; Param2      : Pointer
                              ; Param3      : Pointer
                              ; Param4      : Pointer
                              ):  BOOL; overload;var
      SyncThread:TSyncThread = Nil;implementation//uses
    //  ActiveX;var
      LSyncThread:TSyncThread = Nil;
      function UserSyncProc(    ObjectProc  : TObjectProc
                              ; IsSyncCall  : Boolean
                              ):  BOOL;
      var
        CurrentSyncProc:TSyncProc;
      begin
        Result := False;
        if Not Assigned(ObjectProc) then
          Exit;
        if Not Assigned(SyncThread) then
          Exit;
        if Not Assigned(SyncProcPool) then
          Exit;
        CurrentSyncProc := SyncProcPool.CreateSyncProc;
        if Not Assigned(CurrentSyncProc) then
          Exit;    CurrentSyncProc^.TimeredType  := tmNone;    CurrentSyncProc^.IsSyncCall   := IsSyncCall;
        CurrentSyncProc^.InitDay      := 0;
        CurrentSyncProc^.InitTime     := 0;
        CurrentSyncProc^.ExecTimes    := 1;
        CurrentSyncProc^.ExecTime     := 0;
        CurrentSyncProc^.TimeOutTime  := 0;    //Modify It!
        CurrentSyncProc^.ParamCount   := 0;
        CurrentSyncProc^.Param0       := Nil;
        CurrentSyncProc^.Param1       := Nil;
        CurrentSyncProc^.Param2       := Nil;
        CurrentSyncProc^.Param3       := Nil;
        CurrentSyncProc^.Param4       := Nil;
        //Modify It!    CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);    result := SyncThread.InQueue(CurrentSyncProc);
        if Not Result then
          CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
      end;  function UserSyncProc(    ObjectProc  : TObjectProc1
                              ; IsSyncCall  : Boolean
                              ; Param0      : Pointer
                              ):  BOOL;
      var
        CurrentSyncProc:TSyncProc;
      begin
        Result := False;
        if Not Assigned(ObjectProc) then
          Exit;
        if Not Assigned(SyncThread) then
          Exit;
        if Not Assigned(SyncProcPool) then
          Exit;
        CurrentSyncProc := SyncProcPool.CreateSyncProc;
        if Not Assigned(CurrentSyncProc) then
          Exit;    CurrentSyncProc^.TimeredType  := tmNone;    CurrentSyncProc^.IsSyncCall   := IsSyncCall;
        CurrentSyncProc^.InitDay      := 0;
        CurrentSyncProc^.InitTime     := 0;
        CurrentSyncProc^.ExecTimes    := 1;
        CurrentSyncProc^.ExecTime     := 0;
        CurrentSyncProc^.TimeOutTime  := 0;    //Modify It!
        CurrentSyncProc^.ParamCount   := 1;
        CurrentSyncProc^.Param0       := Param0;
        CurrentSyncProc^.Param1       := Nil;
        CurrentSyncProc^.Param2       := Nil;
        CurrentSyncProc^.Param3       := Nil;
        CurrentSyncProc^.Param4       := Nil;    //Modify It!
        
        CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);    result := SyncThread.InQueue(CurrentSyncProc);
        if Not Result then
          CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
      end;  function UserSyncProc(    ObjectProc  : TObjectProc2
                              ; IsSyncCall  : Boolean
                              ; Param0      : Pointer
                              ; Param1      : Pointer
                              ):  BOOL;
      var
        CurrentSyncProc:TSyncProc;
      begin
        Result := False;
        if Not Assigned(ObjectProc) then
          Exit;
        if Not Assigned(SyncThread) then
          Exit;
        if Not Assigned(SyncProcPool) then
          Exit;
        CurrentSyncProc := SyncProcPool.CreateSyncProc;
        if Not Assigned(CurrentSyncProc) then
          Exit;    CurrentSyncProc^.TimeredType  := tmNone;    CurrentSyncProc^.IsSyncCall   := IsSyncCall;
        CurrentSyncProc^.InitDay      := 0;
        CurrentSyncProc^.InitTime     := 0;
        CurrentSyncProc^.ExecTimes    := 1;
        CurrentSyncProc^.ExecTime     := 0;
        CurrentSyncProc^.TimeOutTime  := 0;    //Modify It!
        CurrentSyncProc^.ParamCount   := 2;
        CurrentSyncProc^.Param0       := Param0;
        CurrentSyncProc^.Param1       := Param1;
        CurrentSyncProc^.Param2       := Nil;
        CurrentSyncProc^.Param3       := Nil;
        CurrentSyncProc^.Param4       := Nil;
        //Modify It!
        
        CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);    result := SyncThread.InQueue(CurrentSyncProc);
        if Not Result then
          CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
      end;  function UserSyncProc(    ObjectProc  : TObjectProc3
                              ; IsSyncCall  : Boolean
                              ; Param0      : Pointer
                              ; Param1      : Pointer
                              ; Param2      : Pointer
                              ):  BOOL;
      var
        CurrentSyncProc:TSyncProc;
      begin
        Result := False;
        if Not Assigned(ObjectProc) then
          Exit;
        if Not Assigned(SyncThread) then
          Exit;
        if Not Assigned(SyncProcPool) then
          Exit;
        CurrentSyncProc := SyncProcPool.CreateSyncProc;
        if Not Assigned(CurrentSyncProc) then
          Exit;    CurrentSyncProc^.TimeredType  := tmNone;    CurrentSyncProc^.IsSyncCall   := IsSyncCall;
        CurrentSyncProc^.InitDay      := 0;
        CurrentSyncProc^.InitTime     := 0;
        CurrentSyncProc^.ExecTimes    := 1;
        CurrentSyncProc^.ExecTime     := 0;
        CurrentSyncProc^.TimeOutTime  := 0;    //Modify It!
        CurrentSyncProc^.ParamCount   := 3;
        CurrentSyncProc^.Param0       := Param0;
        CurrentSyncProc^.Param1       := Param1;
        CurrentSyncProc^.Param2       := Param2;
        CurrentSyncProc^.Param3       := Nil;
        CurrentSyncProc^.Param4       := Nil;
        //Modify It!
        
        CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);    result := SyncThread.InQueue(CurrentSyncProc);
        if Not Result then
          CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
      end;
      

  2.   

    //续上 unit HSSyncThread;
      function UserSyncProc(    ObjectProc  : TObjectProc4
                              ; IsSyncCall  : Boolean
                              ; Param0      : Pointer
                              ; Param1      : Pointer
                              ; Param2      : Pointer
                              ; Param3      : Pointer
                              ):  BOOL;
      var
        CurrentSyncProc:TSyncProc;
      begin
        Result := False;
        if Not Assigned(ObjectProc) then
          Exit;
        if Not Assigned(SyncThread) then
          Exit;
        if Not Assigned(SyncProcPool) then
          Exit;
        CurrentSyncProc := SyncProcPool.CreateSyncProc;
        if Not Assigned(CurrentSyncProc) then
          Exit;    CurrentSyncProc^.TimeredType  := tmNone;    CurrentSyncProc^.IsSyncCall   := IsSyncCall;
        CurrentSyncProc^.InitDay      := 0;
        CurrentSyncProc^.InitTime     := 0;
        CurrentSyncProc^.ExecTimes    := 1;
        CurrentSyncProc^.ExecTime     := 0;
        CurrentSyncProc^.TimeOutTime  := 0;    //Modify It!
        CurrentSyncProc^.ParamCount   := 4;
        CurrentSyncProc^.Param0       := Param0;
        CurrentSyncProc^.Param1       := Param1;
        CurrentSyncProc^.Param2       := Param2;
        CurrentSyncProc^.Param3       := Param3;
        CurrentSyncProc^.Param4       := Nil;
        //Modify It!
        
        CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);    result := SyncThread.InQueue(CurrentSyncProc);
        if Not Result then
          CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
      end;  function UserSyncProc(    ObjectProc  : TObjectProc5
                              ; IsSyncCall  : Boolean
                              ; Param0      : Pointer
                              ; Param1      : Pointer
                              ; Param2      : Pointer
                              ; Param3      : Pointer
                              ; Param4      : Pointer
                              ):  BOOL;
      var
        CurrentSyncProc:TSyncProc;
      begin
        Result := False;
        if Not Assigned(ObjectProc) then
          Exit;
        if Not Assigned(SyncThread) then
          Exit;
        if Not Assigned(SyncProcPool) then
          Exit;
        CurrentSyncProc := SyncProcPool.CreateSyncProc;
        if Not Assigned(CurrentSyncProc) then
          Exit;    CurrentSyncProc^.TimeredType  := tmNone;    CurrentSyncProc^.IsSyncCall   := IsSyncCall;
        CurrentSyncProc^.InitDay      := 0;
        CurrentSyncProc^.InitTime     := 0;
        CurrentSyncProc^.ExecTimes    := 1;
        CurrentSyncProc^.ExecTime     := 0;
        CurrentSyncProc^.TimeOutTime  := 0;    //Modify It!
        CurrentSyncProc^.ParamCount   := 5;
        CurrentSyncProc^.Param0       := Param0;
        CurrentSyncProc^.Param1       := Param1;
        CurrentSyncProc^.Param2       := Param2;
        CurrentSyncProc^.Param3       := Param3;
        CurrentSyncProc^.Param4       := Param4;
        //Modify It!
        
        CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);    result := SyncThread.InQueue(CurrentSyncProc);
        if Not Result then
          CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
      end;function TSyncThread.InQueue(SyncProc:TSyncProc):BOOL;
    begin
      Result := False;
      if Not Assigned(SyncProc) then
        Exit;
      Result := FIOCP.PostQueuedCompletionStatus(DWORD(SyncProc),0,Nil);
    end;procedure TSyncThread.RunCurrentProc;
    begin
      FCurrentProcObject.Run;
    end;procedure TSyncThread.Execute;
    var
      dwResult:Boolean;
      CurrentProc:TSyncProc;
      Param0,Param1:DWORD;
      Overlapped:POVERLAPPED;
    begin
      //Toggle Comment if Needed
      //CoInitializeEx(nil, COINIT_MULTITHREADED);
      //try
        while Not Terminated do
          begin
            dwResult := FIOCP.GetQueuedCompletionStatus(Param0,Param1,Overlapped,INFINITE );
            if Not dwResult then
              break;
            if DWORD(Overlapped) = SHUTDOWN_FLAG then
              break;
            if Param0 = 0 then
              continue;
            CurrentProc := TSyncProc(Param0);
            try
              if CurrentProc.IsSyncCall then
                begin
                  try
                    FCurrentProcObject := CurrentProc;
                    Synchronize(RunCurrentProc);
                    FCurrentProcObject := Nil;
                  except              end;
                end
              else
                begin
                  try
                    CurrentProc.Run;
                  except              end;
                end;
            finally
              CurrentProc.Owner.FreeSyncProc(CurrentProc);
            end;
          end;
      //Toggle Comment if Needed
      //finally
      //  CoUninitialize;
      //end;
    end;procedure TSyncThread.Shutdown;
    begin  FIOCP.ShutDownAll;
      Terminate;  
    end;
    procedure TSyncThread.ShutdownEx;
    begin  FIOCP.ShutDownAllEx;
      Terminate;
    end;constructor TSyncThread.Create(CreateSuspended: Boolean);
    begin
      Inherited Create(true);
      FIOCP := TIOCP.Create;
      if Not CreateSuspended then
        begin
          Resume;
        end;
    end;destructor TSyncThread.Destroy;
    begin
      FIOCP.ShutDownAll;
      FIOCP.Free;
      Inherited;
    end;Initialization
      SyncThread := TSyncThread.Create(false);
      SyncThread.FreeOnTerminate := True;
    finalization
      LSyncThread := SyncThread;
      SyncThread := Nil;
      if LSyncThread<>Nil then  LSyncThread.ShutdownEx;end.
      

  3.   

    unit UnitSyncProc;interfaceuses
        Windows
      , Classes
      , DateUtils
      , SysUtils;
    type
      TObjectProc   = procedure of Object;
      TObjectProc1  = procedure (Param0:Pointer) of Object;
      TObjectProc2  = procedure (Param0:Pointer;Param1:Pointer) of Object;
      TObjectProc3  = procedure (Param0:Pointer;Param1:Pointer;Param2:Pointer) of Object;
      TObjectProc4  = procedure (Param0:Pointer;Param1:Pointer;Param2:Pointer;Param3:Pointer) of Object;
      TObjectProc5  = procedure (Param0:Pointer;Param1:Pointer;Param2:Pointer;Param3:Pointer;Param4:Pointer) of Object;  TTimeredType = (    tmNone
                        , tmOnce
                        , tmPerSecond
                        , tmPerMinute
                        , tmPerHour
                        , tmPerDay
                        , tmPerWeek
                        , tmPerMonth
                        , tmOften
                      );  TSyncProcPool = class;
      TSyncProc  = ^_SyncProc;
      _SyncProc  = packed Record
          //--Used by Pool Manager
          Owner         : TSyncProcPool;
          WhereXY       : Integer;
          Using         : Boolean;
          //----------------------
          TimeredType   : TTimeredType;
          ParamCount    : Byte;
          IsSyncCall    : Boolean;
          //定时器任务      InitDay       : Integer;//yyyymmdd
          InitTime      : Integer;//hhnnss      ExecTimes     : Integer;
          ExecTime      : TDateTime;
          TimeOutTime   : TDateTime;      Param0        : Pointer;
          Param1        : Pointer;
          Param2        : Pointer;
          Param3        : Pointer;
          Param4        : Pointer;
          ObjectedProc  : TObjectProc;
          function CheckRun:Boolean;
          procedure Run;
      end;  TSyncProcPool = class(TObject)
        const FFirstMember  : Integer = -1;
        strict private
          FMemoryList       : TThreadList;  //线程安全的内存切点列表      FLastUsing        : Integer;      //最后使用指针
          FLastMember       : Integer;      //最后指针
          FManagementCount  : Integer;      //内存池对象引用计数
        strict private
          //分配一个dwBytes大小的内存节点
          function  CreateNewSyncProc   : TSyncProc;
          //
          procedure InternalFreeSyncProc(     lpSyncProc  : TSyncProc );
        protected
          procedure RemoveSyncProcNode  (     lpSyncProc  : TSyncProc );    public
          function  CreateSyncProc       : TSyncProc;  overload;
          procedure FreeSyncProc        (     lpSyncProc  : TSyncProc );
          procedure Reduce;
          procedure FreeAllSyncProcs;
        public
          Property  ManagementCount   : Integer read FManagementCount;
        public
          procedure Free;
          function  Attach            : Integer; overload;
          function  Detach            : Integer;
          procedure Init( const newPoolSize : Integer = 0);
        public
          constructor Create;
          destructor Destroy;override;
        public
          class function Attach(lpSyncProcPool:TSyncProcPool): TSyncProcPool; overload;
      end;
    var
      SyncProcPool  : TSyncProcPool=nil;
    implementation
    //日志
    procedure WriteLog( const   s: string);
    begin
    end;function HeapAllocEx(dwBytes: DWORD): Pointer;
    begin
      Result  :=  HeapAlloc(GetProcessHeap, HEAP_ZERO_MEMORY, dwBytes);
    end;function HeapReallocEx(lpMem:Pointer;dwBytes: DWORD): Pointer;
    begin
      Result  :=  HeapRealloc(GetProcessHeap, HEAP_ZERO_MEMORY, lpMem, dwBytes);
    end;function HeapFreeEx(lpMem:Pointer):BOOL;
    begin
      Result  :=  HeapFree( GetProcessHeap, 0,  lpMem);
    end;function _SyncProc.CheckRun:Boolean;
    var
      TimeStr:String;
    begin
      Result := false;
      case self.TimeredType of
        tmNone: Result := True;
        tmOnce:
          begin
            TimeStr := FormatDatetime('yyyymmddhhnnss',Now);
            if (TimeStr >= FormatDatetime('yyyymmddhhnnss',ExecTime)) then
              begin
                if (TimeOutTime > 0.000000001) and (TimeStr > FormatDatetime('yyyymmddhhnnss',TimeOutTime)) then
                  Exit;
                Result := True;
              end;
            
          end;
        tmPerSecond:
          begin
            if SecondOf(Now)<> (InitTime mod 100) then
              Result := True;
          end;
        tmPerMinute:
          begin
            if MinuteOf(Now)<> ((InitTime div 100) mod 100) then
              Result := True;
          end;
        tmPerHour:
          begin
            if HourOf(Now)<> ((InitTime div 10000) mod 100) then
              Result := True;
          end;
        tmPerDay:
          begin
            //if SecondOf(Now)<> (InitTime mod 100) then
            //  Result := True;
          end;
        tmPerWeek:
          begin
            //if SecondOf(Now)<> (InitTime mod 100) then
            //  Result := True;
          end;
        tmPerMonth:
          begin
            //if SecondOf(Now)<> (InitTime mod 100) then
            //  Result := True;
          end;
        tmOften: Result := True;
      end;
    end;
    procedure _SyncProc.Run;
    begin
      if Not Assigned(ObjectedProc) then
        Exit;
      case ParamCount of
        1:TObjectProc1(ObjectedProc)(Param0);
        2:TObjectProc2(ObjectedProc)(Param0,Param1);
        3:TObjectProc3(ObjectedProc)(Param0,Param1,Param2);
        4:TObjectProc4(ObjectedProc)(Param0,Param1,Param2,Param3);
        5:TObjectProc5(ObjectedProc)(Param0,Param1,Param2,Param3,Param4);
        else
          ObjectedProc;
      end;
    end;class function TSyncProcPool.Attach(lpSyncProcPool: TSyncProcPool): TSyncProcPool;
    var
      ManagementCount : Integer;
    begin
      Result  :=  nil;  if Not Assigned(lpSyncProcPool) then
        Exit;  ManagementCount :=  lpSyncProcPool.Attach ;
      if ManagementCount  = 1 then
        begin
          lpSyncProcPool.Detach;
          Exit;
        end;  Result  :=  lpSyncProcPool;
    end;
    procedure TSyncProcPool.InternalFreeSyncProc( lpSyncProc  : TSyncProc);
    begin
      if Not (lpSyncProc <> nil) then
        Exit;
      HeapFreeEx(lpSyncProc);
    end;function TSyncProcPool.CreateNewSyncProc  : TSyncProc;
    begin
      Result  :=  TSyncProc(HeapAllocEx(sizeof(_SyncProc)));
      if Result<>nil then
        Result^.Owner:=self;
    end;procedure TSyncProcPool.Init(const newPoolSize: Integer );
    var
      I         : Integer;
      lpSyncProc  : TSyncProc;
    begin
      with FMemoryList.LockList do
        try
          I :=  Count;
          for I := I to newPoolSize  do
            begin
              lpSyncProc  :=  CreateNewSyncProc;
              if lpSyncProc = nil then
                break;
              lpSyncProc^.Using   :=  false;
              lpSyncProc^.WhereXY := Add(lpSyncProc);
              Inc(FLastMember);
            end;
        finally
          FMemoryList.UnlockList;
        end;
    end;procedure TSyncProcPool.FreeAllSyncProcs;
    var
      I:Integer;
    begin
      try
        with FMemoryList.LockList do
          try
            for I := 0 to Count - 1 do
              begin
                if TSyncProc(Items[I])^.Using then
                  begin
                    FreeSyncProc(Items[I]);
                  end;
              end;
          finally
            FMemoryList.UnlockList;
          end;
      Except
        on E: Exception do
        begin
          WriteLog('Exception: TActionPool.FreeAllActions; '+ E.Message);
        end;
      end;
    end;
      

  4.   

    //续上 unit UnitSyncProc;
    procedure TSyncProcPool.Reduce;
    var
      I         : Integer;
      lpSyncProc  : TSyncProc;
    begin
      with FMemoryList.LockList do
        try
          for I := Count downto 1 do
            begin
              lpSyncProc  :=  Items[I-1];
              if Not lpSyncProc^.Using then
                begin
                  Dec(FLastMember);
                  delete(I-1);
                  InternalFreeSyncProc(lpSyncProc);            end;
            end;
        finally
          FMemoryList.UnlockList;
        end;
    end;function TSyncProcPool.CreateSyncProc:TSyncProc;
    var
      lpSyncProc      : TSyncProc;
    begin
      Result      :=  nil;
      if Not Assigned(FMemoryList) then
        Exit;  with FMemoryList.LockList do
        try
          if (FLastMember<>FFirstMember) and (FLastUsing<>FLastMember) then
            begin
              Inc(FLastUsing);
              lpSyncProc  :=  Items[FLastUsing];
              Result    :=  lpSyncProc;
            end;
          if Result = nil then
            begin
              lpSyncProc  :=  CreateNewSyncProc;
              if lpSyncProc = nil then
                Exit;          lpSyncProc^.WhereXY := Add(lpSyncProc);
              Result            :=  lpSyncProc;
              Inc(FLastMember);
              FLastUsing        :=  FLastMember;
            end;      Result^.Using       :=  true;
          self.Attach;
        finally
          FMemoryList.UnlockList;
        end;end;procedure TSyncProcPool.FreeSyncProc(lpSyncProc: TSyncProc);
    var
      lpTempSyncProc  : TSyncProc;
    begin
      if Not (lpSyncProc <> nil) then
        Exit;
      if lpSyncProc^.Owner=nil then
        Exit;
      if lpSyncProc^.Owner<>self then
        begin
          lpSyncProc^.Owner.FreeSyncProc(lpSyncProc);
          Exit;
        end;
      with FMemoryList.LockList do
        try      lpSyncProc^.Using :=  false;
          if FLastUsing >= 0 then
          begin
            lpTempSyncProc  :=  Items[FLastUsing];
            if lpTempSyncProc <> lpSyncProc then
              begin
                lpTempSyncProc^.WhereXY         :=  lpSyncProc^.WhereXY;
                lpSyncProc^.WhereXY             :=  FLastUsing;
                Items[FLastUsing]             :=  lpSyncProc;
                Items[lpTempSyncProc^.WhereXY]  :=  lpTempSyncProc;
              end;
            Dec(FLastUsing);      end;
        finally
          FMemoryList.UnlockList;
          self.Free;      
        end;
    end;procedure TSyncProcPool.RemoveSyncProcNode(lpSyncProc: TSyncProc);
    var
      I   : Integer;
    begin
      if Not (lpSyncProc <> nil) then
        Exit;
      if lpSyncProc^.Owner=nil then
        Exit;
      if lpSyncProc^.Owner<>self then
        begin
          lpSyncProc^.Owner.RemoveSyncProcNode(lpSyncProc);
          Exit;
        end;
      with FMemoryList.LockList do
        try
          for I := Count - 1 downto 0 do
            begin
              if lpSyncProc = Items[I] then
                begin
                  Delete(I);
                  InternalFreeSyncProc(lpSyncProc);
                  Dec(FLastMember);
                  break;
                end;
            end;
        finally
          FMemoryList.UnlockList;
        end;
    end;function TSyncProcPool.Attach:Integer;
    begin
      Result  :=  InterlockedIncrement(FManagementCount);
    end;function TSyncProcPool.Detach:Integer;
    begin
      Result  :=  InterlockedDecrement(FManagementCount);
    end;constructor TSyncProcPool.Create;
    begin
      Inherited;
      FManagementCount  :=  1;
      FLastMember       :=  FFirstMember;
      FLastUsing        :=  FFirstMember;
      FMemoryList       :=  TThreadList.Create;
    end;destructor TSyncProcPool.Destroy;
    var
      I     : Integer;
    begin
      with FMemoryList.LockList do
        try
          for I := 0 to Count - 1 do
            begin
              InternalFreeSyncProc(Items[I]);
            end;
        finally
          FMemoryList.UnlockList;
          FMemoryList.Free;
        end;
    end;procedure TSyncProcPool.Free;
    begin
      if Detach>0 then
        Exit;
      Inherited Free;
    end;initialization
      SyncProcPool  :=  TSyncProcPool.Create;
    finalization
      SyncProcPool.Free;
    end.
      

  5.   

    (******************************************************************************)
    (*  模 块 名: IOCPUnit.Pas                                                    *)
    (*  别    名: 完成端口封装                                                             *)
    (*  作    者: Unsigned(僵哥)                                                          *)
    (*  说    明: 提供线程队列计数,以方便性能检验机制的执行,当线程队列计数较小时,         *)
    (*            通常有两种可能性,各线程的作业处理时间过长,或者中间产生死锁情况,        *)
    (*            由此可以再根据线程处理过程的其它计数,判断是否线程数量饱和或不足,       *)
    (*            线程数量饱和通常的表现为资源占用较高,否则可以适当增加处理线程            *)
    (*                                                                            *)
    (*            采用引用计数机制以保证队列的正常释放并防止内存访问违例事件的发生         *)
    (*                                                                            *)
    (******************************************************************************)
    unit IOCPUnit;interface
    uses
      Windows;
    type
      TIOCP=class(TObject)
        strict private
          FHandle           : THandle;  //完成端口句柄
          FQueueThreadSize  : Integer;  //线程队列
          FManagementCount  : Integer;  //引用计数
          FIsShutingDown    : Boolean;    public
          property QueueThreads     : Integer read FQueueThreadSize; //线程队列
          property ManagementCount  : Integer read FManagementCount; //引用计数    public
          (*====================================================================*)
          (***                       IO句柄与完成端口关联                           ***)
          (*--------------------------------------------------------------------*)
          (*  传入参数:                                                          *)
          (*    IOHandle      : IO句柄 (文件句柄,管道句柄,Socket等句柄类IO资源)     *)
          (*    IOContextKey  : IO上下文关键字 (描述 IOHandle 相关的上下文数据,     *)
          (*                                   结构体或者类指针等标识性信息 )      *)
          (*    Concurrent    : 并发线程数,通常为0                                *)
          (*  返 回 值:  完成端口句柄                                                    *)
          (*--------------------------------------------------------------------*)
          function CreateIoCompletionPort(    IOHandle      : THandle
                                            ; IOContextKey  : DWORD
                                            ; Concurrent    : DWORD = 0
                                            ): THandle;
          function AssociateWith (  IOHandle      : THandle
                                  ; IOContextKey  : DWORD
                                  ) : BOOL;      (*====================================================================*)
          (***                          完成队列检索                               ***)
          (*--------------------------------------------------------------------*)
          (*  传入参数:                                                          *)
          (*    dwMilliseconds    : 等待时间(单位:毫秒),同WaitForSingleObject     *)
          (*  输出参数:                                                          *)
          (*    BytesTransferred  : 传输完成字节数,0表示IO关闭,或者出现异常          *)
          (*    IOContextKey      : IO上下文关键字                                   *)
          (*    Overlapped        : 重叠IO结构指针                                   *)
          (*  返 回 值:                                                         *)
          (*    成功:True,失败/出错:False(请使用GetLastError取得具体错误)          *)
          (*--------------------------------------------------------------------*)
          function GetQueuedCompletionStatus(   var BytesTransferred  : DWORD
                                              ; var IOContextKey      : DWORD
                                              ; var Overlapped        : POverlapped
                                              ;     dwMilliseconds    : DWORD
                                              ): BOOL;      (*====================================================================*)
          (***                         发送完成事件                                 ***)
          (*--------------------------------------------------------------------*)
          (*  传入参数:                                                          *)
          (*    lpBytesTransferred  : 与 GetQueuedCompletionStatus 相对应         *)
          (*    dwCompletionKey     : 与 GetQueuedCompletionStatus 相对应         *)
          (*    lpOverlapped        : 与 GetQueuedCompletionStatus 相对应         *)
          (*                                                                    *)
          (*  返回值:  成功:True,失败/出错:False(使用GetLastError取得具体错误)      *)
          (*--------------------------------------------------------------------*)
          function PostQueuedCompletionStatus(    lpBytesTransferred  : DWORD
                                                ; dwCompletionKey     : DWORD
                                                ; lpOverlapped        : POverlapped
                                                ): BOOL;      (*====================================================================*)
          (***                    发送一个特定的完成事件                            ***)
          (*--------------------------------------------------------------------*)
          (*  返 回 值: 成功:True,失败/出错:False(使用GetLastError取得具体错误)     *)
          (*  备    注: PostQueuedCompletionStatus(0,0,SHUTDOWN_FLAG)            *)
          (*--------------------------------------------------------------------*)
          function PostThreadQuitFlag : BOOL;      (*====================================================================*)
          (***                         增加引用计数                                ***)
          (*--------------------------------------------------------------------*)
          (*  返 回 值: 当前引用计数                                                     *)
          (*--------------------------------------------------------------------*)
          function Attach             : Integer;  overload;      (*====================================================================*)
          (***                         减少引用计数                                 ***)
          (*--------------------------------------------------------------------*)
          (*  返 回 值: 剩余引用计数                                                     *)
          (*  备    注: 减少引用计数,但不理会引用计数是否为0,请使用  Free 代替          *)
          (*--------------------------------------------------------------------*)
          function Detach             : Integer;      (*====================================================================*)
          (***         为每对列当中的每一个线程发送一个特定的完成事件                 ***)
          (*--------------------------------------------------------------------*)
          (*  返 回 值: 成功:True,失败/出错:False(使用GetLastError取得具体错误)     *)
          (*  备    注: PostQueuedCompletionStatus(0,0,SHUTDOWN_FLAG)            *)
          (*--------------------------------------------------------------------*)
          procedure ShutDownAll;
          procedure ShutDownAllEx;      (*====================================================================*)
          (***                          减少引用计数                               ***)
          (*--------------------------------------------------------------------*)
          (*  备    注: 当引用计数减少为0时,对象被自动释放                               *)
          (*--------------------------------------------------------------------*)
          procedure Free;    public
          constructor Create;
          destructor Destroy; override;
        public
          class function Attach(lpCompletion: TIOCP): TIOCP;  Overload;
      end;
      

  6.   

    //续上 unit IOCPUnit;const
      SHUTDOWN_FLAG = $FFFFFFFF; // Posted to the completion port when shutting down
    (******************************************************************************)
    (*                                                                            *)
    (*                             <<以下为实现部分>>                               *)
    (*                                                                            *)
    (******************************************************************************)
    (*  模 块 名: 完成端口封装                                                              *)
    (*  作    者: Unsigned(僵哥)                                                           *)
    (*  说    明: 提供线程队列计数,以方便性能检验机制的执行,当线程队列计数较小时,          *)
    (*            通常有两种可能性,各线程的作业处理时间过长,或者中间产生死锁情况,         *)
    (*            由此可以再根据线程处理过程的其它计数,判断是否线程数量饱和或不足,        *)
    (*            线程数量饱和通常的表现为资源占用较高,否则可以适当增加处理线程            *)
    (*                                                                            *)
    (*            采用引用计数机制以保证队列的正常释放并防止内存访问违例事件的发生         *)
    (*  备    注:                                                                  *)
    (******************************************************************************)
    implementationclass function TIOCP.Attach(lpCompletion: TIOCP): TIOCP;
    var
      Management  : Integer;
    begin
      Result  :=  nil;
      if Not Assigned(lpCompletion) then
        Exit;
      Management  :=  lpCompletion.Attach;
      if Management = 1 then
        begin
          lpCompletion.Detach;
          Exit;
        end;
      Result  :=  lpCompletion;
    end;function TIOCP.CreateIoCompletionPort(    IOHandle      : Cardinal
                                            ; IOContextKey  : Cardinal
                                            ; Concurrent    : Cardinal
                                            ):  THandle;
    begin
      Result  :=  Windows.CreateIoCompletionPort(   IOHandle
                                                  , FHandle
                                                  , IOContextKey
                                                  , Concurrent
                                                  );
    end;function TIOCP.AssociateWith(IOHandle: Cardinal; IOContextKey: Cardinal):BOOL;
    begin
      result := (CreateIoCompletionPort(IOHandle,IOContextKey)=self.FHandle);
    end;function TIOCP.GetQueuedCompletionStatus(   var BytesTransferred  : Cardinal
                                              ; var IOContextKey      : Cardinal
                                              ; var Overlapped        : POverlapped
                                              ;     dwMilliseconds    : Cardinal
                                              ): BOOL;
    begin
      InterlockedIncrement(FQueueThreadSize);
      Result  :=  Windows.GetQueuedCompletionStatus(    FHandle
                                                      , BytesTransferred
                                                      , IOContextKey
                                                      , Overlapped
                                                      , dwMilliseconds
                                                      );
      InterlockedDecrement(FQueueThreadSize);
    end;function TIOCP.PostQueuedCompletionStatus(  lpBytesTransferred  : Cardinal
                                              ; dwCompletionKey     : Cardinal
                                              ; lpOverlapped        : POverlapped
                                              ): BOOL;
    begin
      if FIsShutingDown then
        begin
          Result := False;
          Exit;
        end;
      Result  :=  Windows.PostQueuedCompletionStatus(   FHandle
                                                      , lpBytesTransferred
                                                      , dwCompletionKey
                                                      , lpOverlapped
                                                      );
    end;function TIOCP.PostThreadQuitFlag:  BOOL;
    begin
      Result  :=  Windows.PostQueuedCompletionStatus(   FHandle,   0
                                              , 0
                                              , POverLapped(SHUTDOWN_FLAG)
                                              );
    end;function TIOCP.Attach : Integer;
    begin
      Result  :=  InterlockedIncrement(FManagementCount);
    end;function TIOCP.Detach;
    begin
      Result  :=  InterlockedDecrement(FManagementCount);
    end;procedure TIOCP.Free;
    begin
      if not (self  <>  nil) then
        Exit;
      if Detach > 0 then
        Exit;
      Inherited Free;
    end;constructor TIOCP.Create;
    begin
      Inherited;
      FIsShutingDown    :=  False;
      FHandle           :=  INVALID_HANDLE_VALUE;
      FQueueThreadSize  :=  0;
      FManagementCount  :=  0;
      FHandle           := Windows.CreateIoCompletionPort(  INVALID_HANDLE_VALUE
                                                          , 0
                                                          , 0
                                                          , 0
                                                          );
      InterlockedIncrement(FManagementCount);end;procedure TIOCP.ShutDownAll;
    var
      I   : Integer;
    begin  I   :=  FQueueThreadSize;
      while I > 0 do
        begin
          PostThreadQuitFlag;
          Dec(I);
        end;
      //while FQueueThreadSize  > 0 do Sleep(1);
    end;procedure TIOCP.ShutDownAllEx;
    begin
      FIsShutingDown := True;
      ShutDownAll;
    end;Destructor TIOCP.Destroy;
    begin
      ShutDownAll;
      if FHandle  <>  INVALID_HANDLE_VALUE then
        CloseHandle(FHandle);
      FHandle :=  INVALID_HANDLE_VALUE;
    end;
    end.
      

  7.   

    TSyncThread 默认会启来一个,线程池,自然需要多个,可以另外再开,记得杀死...UserSyncProc可以用来增加任务,第一个参数为被调用的,即实际的处理函数,第二个参数是告诉线程是否需要切换到主线程去执行(是否需要同步),后面参数依据实际处理业务的函数有多少个参数就增加多少个(当前设计最多只支持5个参数),所有参数需要使用Pointer类型(void *),对于一些32位的类型比如Integer可以强制转换,在处理的时候再转换回来即可。但是对于结构体之类的,则需要使用指针类型。比如String,最好new一个PString,处理完之后释放掉即可。
      

  8.   

    上面是自己定义的,另外也可以参考一个系统线程使用的示例:
    unit ThreadPoolUnit;interfaceuses
      Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
      Dialogs, StdCtrls, ExtCtrls;type
      TForm3 = class(TForm)
        PaintBox1: TPaintBox;
        Button1: TButton;
        ListBox1: TListBox;
        procedure Button1Click(Sender: TObject);
      private
        type
          TWorkerColor = class
            FThreadID: Integer;
            FColor: TColor;
            FForm: TForm3;
            procedure PaintLines(Sender: TObject);
            procedure PaintLine;
            constructor Create(AForm: TForm3; AColor: TColor);
          end;
        var
          FIndex: Integer;
      public
        { Public declarations }
      end;  TObjectHelper = class helper for TObject  end;  TThreadPool = class
      private
        type
          TUserWorkItem = class
            FSender: TObject;
            FWorkerEvent: TNotifyEvent;
          end;
        class procedure QueueWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent; Flags: ULONG); overload; static;
      public
        class procedure QueueWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent); overload; static;
        class procedure QueueIOWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent); static;
        class procedure QueueUIWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent); static;
      end;var
      Form3: TForm3;
      ThreadPool: TThreadPool;implementation{$R *.dfm}const
      WT_EXECUTEDEFAULT       = ULONG($00000000);
      WT_EXECUTEINIOTHREAD    = ULONG($00000001);
      WT_EXECUTEINUITHREAD    = ULONG($00000002);
      WT_EXECUTEINWAITTHREAD  = ULONG($00000004);
      WT_EXECUTEONLYONCE      = ULONG($00000008);
      WT_EXECUTEINTIMERTHREAD = ULONG($00000020);
      WT_EXECUTELONGFUNCTION  = ULONG($00000010);
      WT_EXECUTEINPERSISTENTIOTHREAD  = ULONG($00000040);
      WT_EXECUTEINPERSISTENTTHREAD = ULONG($00000080);
      WT_TRANSFER_IMPERSONATION = ULONG($00000100);function QueueUserWorkItem (func: TThreadStartRoutine; Context: Pointer; Flags: ULONG): BOOL; stdcall; external kernel32 name 'QueueUserWorkItem';function InternalThreadFunction(lpThreadParameter: Pointer): Integer; stdcall;
    begin
      Result := 0;
      try
        try
          with TThreadPool.TUserWorkItem(lpThreadParameter) do
            if Assigned(FWorkerEvent) then
              FWorkerEvent(FSender);
        finally
          TThreadPool.TUserWorkItem(lpThreadParameter).Free;
        end;
      except  end;
    end;{ TThreadPool }class procedure TThreadPool.QueueWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent);
    begin
      QueueWorkItem(Sender, WorkerEvent, WT_EXECUTEDEFAULT);
    end;class procedure TThreadPool.QueueIOWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent);
    begin
      QueueWorkItem(Sender, WorkerEvent, WT_EXECUTEINIOTHREAD);
    end;class procedure TThreadPool.QueueUIWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent);
    begin
      QueueWorkItem(Sender, WorkerEvent, WT_EXECUTEINUITHREAD);
    end;class procedure TThreadPool.QueueWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent; Flags: ULONG);
    var
      WorkItem: TUserWorkItem;
    begin
      if Assigned(WorkerEvent) then
      begin
        IsMultiThread := True;
        WorkItem := TUserWorkItem.Create;
        try
          WorkItem.FWorkerEvent := WorkerEvent;
          WorkItem.FSender := Sender;
          if not QueueUserWorkItem(InternalThreadFunction, WorkItem, Flags) then
            RaiseLastOSError;
        except
          WorkItem.Free;
          raise;
        end;
     end;
    end;procedure TForm3.Button1Click(Sender: TObject);
    begin
      FIndex := PaintBox1.Height;
      PaintBox1.Repaint;
      ListBox1.Items.Clear;
      TWorkerColor.Create(Self, clBlue);
      TWorkerColor.Create(Self, clRed);
      TWorkerColor.Create(Self, clYellow);
      TWorkerColor.Create(Self, clLime);
      TWorkerColor.Create(Self, clFuchsia);
      TWorkerColor.Create(Self, clTeal);
    end;{ TForm3.TWorkerColor }constructor TForm3.TWorkerColor.Create(AForm: TForm3; AColor: TColor);
    begin
      FForm := AForm;
      FColor := AColor;
      TThreadPool.QueueWorkItem(Self, PaintLines);
    end;procedure TForm3.TWorkerColor.PaintLines(Sender: TObject);
    var
      I: Integer;
    begin
      FThreadID := GetCurrentThreadID;
      for I := 0 to 9 do
      begin
        PaintLine;
        //TThread.Synchronize(nil, PaintLine);
        Sleep(100);
      end;
      Destroy;
    end;procedure TForm3.TWorkerColor.PaintLine;
    begin
      FForm.PaintBox1.Canvas.Lock;
      try
        FForm.ListBox1.Items.Add(IntToStr(FThreadID));
        with FForm.PaintBox1 do
        begin
          Canvas.Pen.Color := FColor;
          Canvas.Polyline([Point(0, FForm.FIndex), Point(Width, FForm.FIndex)]);
          Dec(FForm.FIndex);
          if FForm.FIndex <= 0 then
            FForm.FIndex := 0;
        end;
      finally
        FForm.PaintBox1.Canvas.Unlock;
      end;
    end;end.
      

  9.   

    厉害可惜没注释   我想了解原理   如果换了 linux 系统我就不 知道怎么用了!
      

  10.   

    另外想请教僵哥对Service Broker的理解
      

  11.   

    对象池跟线程池是两个概念。关键看如何理解,其实都是用完以后收回到池当中进行闲置处理,需用时再次使用。\关于Broker,Agent,Adapter的比较,下面是个人的理解:
    Adapter:主要是将后面复杂多样的接口采用统一的形式进行传递,相当于接口翻译,不影响任何业务。
    Agent:代理,可提供特定的接口协议。通常在一些Client无法直接到达或者无法直接访问或者不允许直接访问Server的一些服务时,通过一个Agent转发甚至是过滤,而进行访问,作为Agent,可以对相应的请求及服务进行数据上的篡改,以适应业务需求。对服务提供者通常是特定的或者由调用者指定。
    Broker:一个全权的代理,可以针对服务提供者进行决策性选择,并且可以不告知调用者(与调用者之间的可信度相当高)。比如均衡网关。单纯的Broker不做非必要性的数据篡改(比如UDP转发之后,源地址等不得不篡改)。