求一个操作的相关线程池类的源码,希望大家给个示例,小弟我感谢你们!!!!!!!!!!!!!
解决方案 »
- webbrowser为什么刷新不了页面,内有代码
- Edit组件为空时运算出错如何解决?
- adoquery.delete的用法
- MSCOMM组件无法发送某些字符,急!!!
- 谁有 delphi 大整数运算库程序啊(用字符串实现很大的字符加减乘除),发个给我好吗?急死俺了!!!
- 如何用FastReport 打印多文档字段 (这个字段存的是备注内容)
- 如何查看一个进程的进程树以及一些信息和结束进程树
- 使用TADODataSet.Append插入5000笔资料,每一笔资料除了一个Field从1到5000
- DBGrid问题
- INI问题:如何删除里面的数据.高手问题,在线等待!。×××××
- 求cxgrid的详细使用方法和技巧集。
- delphi listview选中问题
所有工作线程调用 GetQueuedCompletionStatus 等待任务;有任务时使用 PostQueuedCompletionStatus 将任务投递到完成端口,其中一个线程便会取得该任务并加以处理。
unit HSSyncThread;
interface
uses
Windows,Classes,SysUtils,iocpunit,UnitSyncProc,ExtCtrls;
type
// Worker thread that takes queued TSyncProc tasks from an I/O completion port (FIOCP)
// and runs each one either directly or through Synchronize.
TSyncThread = class(TThread)
private
// Completion port wrapper used as the task queue.
FIOCP:TIOCP;
// Task handed to RunCurrentProc while a Synchronize call is in progress.
FCurrentProcObject:TSyncProc;
protected
// Thread body: waits on FIOCP and runs each task it receives.
procedure Execute;override;
// Runs FCurrentProcObject; passed to Synchronize by Execute.
procedure RunCurrentProc;
public
// The thread is always created suspended internally and resumed afterwards
// unless CreateSuspended is True.
constructor Create(CreateSuspended: Boolean);
destructor Destroy;override;
// Calls FIOCP.ShutDownAll and then Terminate.
procedure Shutdown;
// Calls FIOCP.ShutDownAllEx and then Terminate.
procedure ShutdownEx;
// Posts SyncProc to the completion port; returns False when SyncProc is nil
// or when posting fails.
function InQueue(SyncProc:TSyncProc):BOOL;
end;
function UserSyncProc( ObjectProc : TObjectProc
; IsSyncCall : Boolean
): BOOL; overload;
function UserSyncProc( ObjectProc : TObjectProc1
; IsSyncCall : Boolean
; Param0 : Pointer
): BOOL; overload;
function UserSyncProc( ObjectProc : TObjectProc2
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
): BOOL; overload;
function UserSyncProc( ObjectProc : TObjectProc3
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
; Param2 : Pointer
): BOOL; overload;
function UserSyncProc( ObjectProc : TObjectProc4
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
; Param2 : Pointer
; Param3 : Pointer
): BOOL; overload;
function UserSyncProc( ObjectProc : TObjectProc5
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
; Param2 : Pointer
; Param3 : Pointer
; Param4 : Pointer
): BOOL; overload;var
SyncThread:TSyncThread = Nil;

implementation

//uses
//  ActiveX;

// Unit-private copy of SyncThread taken during finalization, so the public
// variable can be cleared before the thread is asked to shut down.
var
  LSyncThread:TSyncThread = Nil;
// Shared worker for the UserSyncProc overloads below.
// Takes a TSyncProc from SyncProcPool, fills it in as a one-shot, untimed task
// and posts it to SyncThread.
//   ObjectProc : method to run, already cast to TObjectProc (Run casts it back
//                according to ArgCount)
//   IsSyncCall : True to run the method through Synchronize
//   ArgCount   : number of meaningful ParamN values (0..3 for the callers here)
// Returns True when the task was queued. On any failure the TSyncProc is handed
// back to its owning pool and False is returned.
function QueueSyncProc( ObjectProc : TObjectProc
                      ; IsSyncCall : Boolean
                      ; ArgCount   : Byte
                      ; Param0     : Pointer
                      ; Param1     : Pointer
                      ; Param2     : Pointer
                      ; Param3     : Pointer
                      ; Param4     : Pointer
                      ): BOOL;
var
  CurrentSyncProc: TSyncProc;
begin
  Result := False;
  if not Assigned(ObjectProc) then
    Exit;
  if not Assigned(SyncThread) then
    Exit;
  if not Assigned(SyncProcPool) then
    Exit;
  CurrentSyncProc := SyncProcPool.CreateSyncProc;
  if not Assigned(CurrentSyncProc) then
    Exit;

  // One-shot task with no timer settings.
  CurrentSyncProc^.TimeredType := tmNone;
  CurrentSyncProc^.IsSyncCall  := IsSyncCall;
  CurrentSyncProc^.InitDay     := 0;
  CurrentSyncProc^.InitTime    := 0;
  CurrentSyncProc^.ExecTimes   := 1;
  CurrentSyncProc^.ExecTime    := 0;
  CurrentSyncProc^.TimeOutTime := 0;

  CurrentSyncProc^.ParamCount := ArgCount;
  CurrentSyncProc^.Param0 := Param0;
  CurrentSyncProc^.Param1 := Param1;
  CurrentSyncProc^.Param2 := Param2;
  CurrentSyncProc^.Param3 := Param3;
  CurrentSyncProc^.Param4 := Param4;
  CurrentSyncProc^.ObjectedProc := ObjectProc;

  Result := SyncThread.InQueue(CurrentSyncProc);
  if not Result then
    CurrentSyncProc^.Owner.FreeSyncProc(CurrentSyncProc);
end;

// Queues a parameterless method. Returns True when the task was queued.
// (Previously the assignment of ObjectedProc and the InQueue call were inside a
// line comment, so this overload never queued anything and always returned False.)
function UserSyncProc( ObjectProc : TObjectProc
                     ; IsSyncCall : Boolean
                     ): BOOL;
begin
  Result := False;
  if not Assigned(ObjectProc) then
    Exit;
  Result := QueueSyncProc(ObjectProc, IsSyncCall, 0, nil, nil, nil, nil, nil);
end;

// Queues a one-argument method. Returns True when the task was queued.
function UserSyncProc( ObjectProc : TObjectProc1
                     ; IsSyncCall : Boolean
                     ; Param0     : Pointer
                     ): BOOL;
begin
  Result := False;
  if not Assigned(ObjectProc) then
    Exit;
  Result := QueueSyncProc(TObjectProc(ObjectProc), IsSyncCall, 1,
                          Param0, nil, nil, nil, nil);
end;

// Queues a two-argument method. Returns True when the task was queued.
function UserSyncProc( ObjectProc : TObjectProc2
                     ; IsSyncCall : Boolean
                     ; Param0     : Pointer
                     ; Param1     : Pointer
                     ): BOOL;
begin
  Result := False;
  if not Assigned(ObjectProc) then
    Exit;
  Result := QueueSyncProc(TObjectProc(ObjectProc), IsSyncCall, 2,
                          Param0, Param1, nil, nil, nil);
end;

// Queues a three-argument method. Returns True when the task was queued.
function UserSyncProc( ObjectProc : TObjectProc3
                     ; IsSyncCall : Boolean
                     ; Param0     : Pointer
                     ; Param1     : Pointer
                     ; Param2     : Pointer
                     ): BOOL;
begin
  Result := False;
  if not Assigned(ObjectProc) then
    Exit;
  Result := QueueSyncProc(TObjectProc(ObjectProc), IsSyncCall, 3,
                          Param0, Param1, Param2, nil, nil);
end;
// Queues a four-argument method as a one-shot, untimed task.
// Returns True when the task was posted to SyncThread; otherwise the task record
// is returned to its pool and the result is False.
function UserSyncProc( ObjectProc : TObjectProc4
                     ; IsSyncCall : Boolean
                     ; Param0     : Pointer
                     ; Param1     : Pointer
                     ; Param2     : Pointer
                     ; Param3     : Pointer
                     ): BOOL;
var
  Task: TSyncProc;
begin
  Result := False;
  if not (Assigned(ObjectProc) and Assigned(SyncThread) and Assigned(SyncProcPool)) then
    Exit;
  Task := SyncProcPool.CreateSyncProc;
  if Task = nil then
    Exit;

  Task^.TimeredType := tmNone;
  Task^.IsSyncCall  := IsSyncCall;
  Task^.InitDay     := 0;
  Task^.InitTime    := 0;
  Task^.ExecTimes   := 1;
  Task^.ExecTime    := 0;
  Task^.TimeOutTime := 0;

  Task^.ParamCount := 4;
  Task^.Param0 := Param0;
  Task^.Param1 := Param1;
  Task^.Param2 := Param2;
  Task^.Param3 := Param3;
  Task^.Param4 := nil;
  Task^.ObjectedProc := TObjectProc(ObjectProc);

  Result := SyncThread.InQueue(Task);
  if not Result then
    Task^.Owner.FreeSyncProc(Task);
end;

// Queues a five-argument method as a one-shot, untimed task.
// Returns True when the task was posted to SyncThread; otherwise the task record
// is returned to its pool and the result is False.
function UserSyncProc( ObjectProc : TObjectProc5
                     ; IsSyncCall : Boolean
                     ; Param0     : Pointer
                     ; Param1     : Pointer
                     ; Param2     : Pointer
                     ; Param3     : Pointer
                     ; Param4     : Pointer
                     ): BOOL;
var
  Task: TSyncProc;
begin
  Result := False;
  if not (Assigned(ObjectProc) and Assigned(SyncThread) and Assigned(SyncProcPool)) then
    Exit;
  Task := SyncProcPool.CreateSyncProc;
  if Task = nil then
    Exit;

  Task^.TimeredType := tmNone;
  Task^.IsSyncCall  := IsSyncCall;
  Task^.InitDay     := 0;
  Task^.InitTime    := 0;
  Task^.ExecTimes   := 1;
  Task^.ExecTime    := 0;
  Task^.TimeOutTime := 0;

  Task^.ParamCount := 5;
  Task^.Param0 := Param0;
  Task^.Param1 := Param1;
  Task^.Param2 := Param2;
  Task^.Param3 := Param3;
  Task^.Param4 := Param4;
  Task^.ObjectedProc := TObjectProc(ObjectProc);

  Result := SyncThread.InQueue(Task);
  if not Result then
    Task^.Owner.FreeSyncProc(Task);
end;

// Posts a task to the completion port. The task pointer travels in the
// "bytes transferred" slot, with completion key 0 and a nil overlapped pointer.
// Returns False for a nil task or when the post fails.
function TSyncThread.InQueue(SyncProc:TSyncProc):BOOL;
begin
  if Assigned(SyncProc) then
    Result := FIOCP.PostQueuedCompletionStatus(DWORD(SyncProc), 0, nil)
  else
    Result := False;
end;

// Runs the task currently stored in FCurrentProcObject (called via Synchronize).
procedure TSyncThread.RunCurrentProc;
begin
  FCurrentProcObject^.Run;
end;

// Thread body: blocks on the completion port and runs every task it receives
// until the thread is terminated, the wait fails, or a shutdown marker arrives.
procedure TSyncThread.Execute;
var
  TaskBits      : DWORD;       // task pointer as posted by InQueue
  CompletionKey : DWORD;       // not used by this loop
  Ovl           : POVERLAPPED; // carries SHUTDOWN_FLAG on shutdown
  Task          : TSyncProc;
  WaitOk        : Boolean;
begin
  // If the tasks need COM, wrap this loop in
  // CoInitializeEx(nil, COINIT_MULTITHREADED) ... CoUninitialize (unit ActiveX).
  while not Terminated do
  begin
    WaitOk := FIOCP.GetQueuedCompletionStatus(TaskBits, CompletionKey, Ovl, INFINITE);
    if not WaitOk then
      Break;
    if DWORD(Ovl) = SHUTDOWN_FLAG then
      Break;
    if TaskBits = 0 then
      Continue;

    Task := TSyncProc(TaskBits);
    try
      if Task^.IsSyncCall then
      begin
        // Exceptions raised by the task are swallowed so the worker keeps running.
        try
          FCurrentProcObject := Task;
          Synchronize(RunCurrentProc);
          FCurrentProcObject := nil;
        except
        end;
      end
      else
      begin
        try
          Task^.Run;
        except
        end;
      end;
    finally
      // The task record always goes back to the pool it came from.
      Task^.Owner.FreeSyncProc(Task);
    end;
  end;
end;

// Posts shutdown markers to the waiting threads, then flags the thread as terminated.
procedure TSyncThread.Shutdown;
begin
  FIOCP.ShutDownAll;
  Terminate;
end;
// Like Shutdown, but first makes the completion port refuse further posts
// (TIOCP.ShutDownAllEx) before flagging the thread as terminated.
procedure TSyncThread.ShutdownEx;
begin
  FIOCP.ShutDownAllEx;
  Terminate;
end;

// Always creates the thread suspended so FIOCP exists before Execute can run,
// then resumes it unless the caller asked for a suspended thread.
constructor TSyncThread.Create(CreateSuspended: Boolean);
begin
  inherited Create(True);
  FIOCP := TIOCP.Create;
  if not CreateSuspended then
    Resume;
end;

// Wakes the worker, lets the inherited destructor wait for Execute to finish,
// and only then releases the completion port. Releasing the port first would
// let the still-running thread touch a freed TIOCP.
destructor TSyncThread.Destroy;
begin
  // FIOCP is nil if the constructor failed before creating it.
  if FIOCP <> nil then
    FIOCP.ShutDownAll;
  inherited;
  // Instance fields stay valid until the outermost destructor returns.
  FIOCP.Free; // TIOCP.Free tolerates nil
  FIOCP := nil;
end;

initialization
  // Set FreeOnTerminate before the thread starts so it can never finish
  // without the flag in place.
  SyncThread := TSyncThread.Create(True);
  SyncThread.FreeOnTerminate := True;
  SyncThread.Resume;
finalization
  // Clear the public variable first so UserSyncProc stops accepting work.
  LSyncThread := SyncThread;
  SyncThread := nil;
  if LSyncThread <> nil then
    LSyncThread.ShutdownEx;
end.
// NOTE(review): the unit header was missing here. The name is taken from the
// uses clause of HSSyncThread, which imports UnitSyncProc and uses the
// TSyncProc / SyncProcPool declarations below. Confirm against the original file.
unit UnitSyncProc;

interface

uses
    Windows
  , Classes
  , DateUtils
  , SysUtils;
type
// Method-pointer signatures accepted by the task queue (0 to 5 untyped pointer arguments).
TObjectProc  = procedure of Object;
TObjectProc1 = procedure (Param0:Pointer) of Object;
TObjectProc2 = procedure (Param0:Pointer;Param1:Pointer) of Object;
TObjectProc3 = procedure (Param0:Pointer;Param1:Pointer;Param2:Pointer) of Object;
TObjectProc4 = procedure (Param0:Pointer;Param1:Pointer;Param2:Pointer;Param3:Pointer) of Object;
TObjectProc5 = procedure (Param0:Pointer;Param1:Pointer;Param2:Pointer;Param3:Pointer;Param4:Pointer) of Object;

// Scheduling mode of a task; see _SyncProc.CheckRun for how each value is evaluated.
TTimeredType = ( tmNone
               , tmOnce
               , tmPerSecond
               , tmPerMinute
               , tmPerHour
               , tmPerDay
               , tmPerWeek
               , tmPerMonth
               , tmOften
               );

TSyncProcPool = class;
TSyncProc = ^_SyncProc;

// One queued call: the method to invoke, its arguments and its scheduling data.
// Instances are allocated and recycled by TSyncProcPool.
_SyncProc = packed Record
  //--Used by Pool Manager
  Owner   : TSyncProcPool; // pool that allocated this record
  WhereXY : Integer;       // index of this record in the pool's list
  Using   : Boolean;       // True while handed out by CreateSyncProc
  //----------------------
  TimeredType : TTimeredType;
  ParamCount  : Byte;      // selects which TObjectProcN signature Run uses
  IsSyncCall  : Boolean;   // True: the worker thread runs the call via Synchronize
  // Timer-task settings.
  // (InitDay and ExecTimes had been swallowed by line comments, although the
  // queueing code assigns both.)
  InitDay     : Integer;   // yyyymmdd
  InitTime    : Integer;   // hhnnss
  ExecTimes   : Integer;
  ExecTime    : TDateTime;
  TimeOutTime : TDateTime;
  Param0 : Pointer;
  Param1 : Pointer;
  Param2 : Pointer;
  Param3 : Pointer;
  Param4 : Pointer;
  ObjectedProc : TObjectProc;
  function CheckRun:Boolean;
  procedure Run;
end;

// Reference-counted pool of _SyncProc records.
// In-use records occupy list indices 0..FLastUsing; free ones follow, up to FLastMember.
TSyncProcPool = class(TObject)
const FFirstMember : Integer = -1;
strict private
  FMemoryList      : TThreadList; // thread-safe list of pool nodes
  FLastUsing       : Integer;     // index of the last in-use node (had been lost inside a comment)
  FLastMember      : Integer;     // index of the last node
  FManagementCount : Integer;     // reference count of the pool object
strict private
  // Allocates one new zero-filled node owned by this pool.
  function CreateNewSyncProc : TSyncProc;
  // Releases the memory of a node.
  procedure InternalFreeSyncProc( lpSyncProc : TSyncProc );
protected
  procedure RemoveSyncProcNode ( lpSyncProc : TSyncProc );
public
  function CreateSyncProc : TSyncProc; overload;
  procedure FreeSyncProc ( lpSyncProc : TSyncProc );
  procedure Reduce;
  procedure FreeAllSyncProcs;
public
  Property ManagementCount : Integer read FManagementCount;
public
  procedure Free;
  function Attach : Integer; overload;
  function Detach : Integer;
  procedure Init( const newPoolSize : Integer = 0);
public
  constructor Create;
  destructor Destroy;override;
public
  class function Attach(lpSyncProcPool:TSyncProcPool): TSyncProcPool; overload;
end;
var
SyncProcPool : TSyncProcPool=nil;
implementation
//日志
// Logging hook; intentionally does nothing.
procedure WriteLog( const s: string);
begin
end;

// Allocates dwBytes of zero-initialised memory from the process heap.
function HeapAllocEx(dwBytes: DWORD): Pointer;
var
  ProcessHeap: THandle;
begin
  ProcessHeap := GetProcessHeap;
  Result := HeapAlloc(ProcessHeap, HEAP_ZERO_MEMORY, dwBytes);
end;

// Resizes a process-heap block to dwBytes, passing HEAP_ZERO_MEMORY.
function HeapReallocEx(lpMem:Pointer;dwBytes: DWORD): Pointer;
var
  ProcessHeap: THandle;
begin
  ProcessHeap := GetProcessHeap;
  Result := HeapRealloc(ProcessHeap, HEAP_ZERO_MEMORY, lpMem, dwBytes);
end;

// Returns a block to the process heap.
function HeapFreeEx(lpMem:Pointer):BOOL;
var
  ProcessHeap: THandle;
begin
  ProcessHeap := GetProcessHeap;
  Result := HeapFree(ProcessHeap, 0, lpMem);
end;

// Reports whether the task passes its scheduling check right now.
//   tmNone, tmOften : always True
//   tmOnce          : True once the current time has reached ExecTime, unless
//                     TimeOutTime is set and has already passed
//   tmPerSecond / tmPerMinute / tmPerHour :
//                     True when the current second / minute / hour differs from
//                     the matching component of InitTime (hhnnss)
//   tmPerDay, tmPerWeek, tmPerMonth : not implemented, always False
function _SyncProc.CheckRun:Boolean;
const
  StampFormat = 'yyyymmddhhnnss';
var
  NowStamp: String;
begin
  case TimeredType of
    tmNone, tmOften:
      Result := True;
    tmOnce:
      begin
        // Timestamps are compared as fixed-width digit strings.
        NowStamp := FormatDatetime(StampFormat, Now);
        Result := (NowStamp >= FormatDatetime(StampFormat, ExecTime))
              and not ( (TimeOutTime > 0.000000001)
                        and (NowStamp > FormatDatetime(StampFormat, TimeOutTime)) );
      end;
    tmPerSecond:
      Result := SecondOf(Now) <> (InitTime mod 100);
    tmPerMinute:
      Result := MinuteOf(Now) <> ((InitTime div 100) mod 100);
    tmPerHour:
      Result := HourOf(Now) <> ((InitTime div 10000) mod 100);
  else
    Result := False;
  end;
end;
// Invokes ObjectedProc with the number of arguments given by ParamCount.
// Any ParamCount outside 1..5 calls it without arguments. Does nothing when no
// method is assigned.
procedure _SyncProc.Run;
begin
  if Assigned(ObjectedProc) then
  begin
    case ParamCount of
      1: TObjectProc1(ObjectedProc)(Param0);
      2: TObjectProc2(ObjectedProc)(Param0, Param1);
      3: TObjectProc3(ObjectedProc)(Param0, Param1, Param2);
      4: TObjectProc4(ObjectedProc)(Param0, Param1, Param2, Param3);
      5: TObjectProc5(ObjectedProc)(Param0, Param1, Param2, Param3, Param4);
    else
      ObjectedProc;
    end;
  end;
end;

// Takes an extra reference on lpSyncProcPool and returns it.
// Returns nil for a nil pool. Also returns nil, after undoing the increment,
// when the new count is 1, which means the count had already dropped to 0.
class function TSyncProcPool.Attach(lpSyncProcPool: TSyncProcPool): TSyncProcPool;
var
  NewCount: Integer;
begin
  Result := nil;
  if Assigned(lpSyncProcPool) then
  begin
    NewCount := lpSyncProcPool.Attach;
    if NewCount = 1 then
      lpSyncProcPool.Detach
    else
      Result := lpSyncProcPool;
  end;
end;
// Releases the heap memory of one node; nil is ignored.
procedure TSyncProcPool.InternalFreeSyncProc( lpSyncProc : TSyncProc);
begin
  if lpSyncProc = nil then
    Exit;
  HeapFreeEx(lpSyncProc);
end;

// Allocates one zero-filled node and marks this pool as its owner.
// Returns nil when the allocation fails.
function TSyncProcPool.CreateNewSyncProc : TSyncProc;
begin
  Result := TSyncProc(HeapAllocEx(sizeof(_SyncProc)));
  if Result <> nil then
    Result^.Owner := Self;
end;

// Pre-allocates free nodes until the pool holds newPoolSize nodes in total.
// Does nothing when the pool is already that large. Stops early if an
// allocation fails.
// (The previous loop ran from Count to newPoolSize inclusive and therefore
// always created one node more than requested.)
procedure TSyncProcPool.Init(const newPoolSize: Integer );
var
  Nodes      : TList;
  lpSyncProc : TSyncProc;
begin
  Nodes := FMemoryList.LockList;
  try
    while Nodes.Count < newPoolSize do
    begin
      lpSyncProc := CreateNewSyncProc;
      if lpSyncProc = nil then
        Break;
      lpSyncProc^.Using := False;
      // New nodes go to the tail, behind the in-use range.
      lpSyncProc^.WhereXY := Nodes.Add(lpSyncProc);
      Inc(FLastMember);
    end;
  finally
    FMemoryList.UnlockList;
  end;
end;

// Returns every in-use node to the pool. Exceptions are logged, not raised.
procedure TSyncProcPool.FreeAllSyncProcs;
var
  I     : Integer;
  Nodes : TList;
begin
  // Hold a reference for the duration: each FreeSyncProc drops one, and the
  // pool must not be destroyed while its list is still locked here.
  Attach;
  try
    try
      Nodes := FMemoryList.LockList;
      try
        // Walk downwards. FreeSyncProc swaps the released node with the node
        // at FLastUsing, so an upward walk would step over the node that was
        // swapped into the current slot and leave it marked as in use.
        for I := Nodes.Count - 1 downto 0 do
        begin
          if TSyncProc(Nodes[I])^.Using then
            FreeSyncProc(Nodes[I]);
        end;
      finally
        FMemoryList.UnlockList;
      end;
    except
      on E: Exception do
      begin
        WriteLog('Exception: TSyncProcPool.FreeAllSyncProcs; '+ E.Message);
      end;
    end;
  finally
    Free;
  end;
end;
// Releases the memory of every node that is not currently in use.
procedure TSyncProcPool.Reduce;
var
  I          : Integer;
  Nodes      : TList;
  lpSyncProc : TSyncProc;
begin
  Nodes := FMemoryList.LockList;
  try
    for I := Nodes.Count - 1 downto 0 do
    begin
      lpSyncProc := Nodes[I];
      if not lpSyncProc^.Using then
      begin
        Dec(FLastMember);
        Nodes.Delete(I);
        InternalFreeSyncProc(lpSyncProc);
      end;
    end;
  finally
    FMemoryList.UnlockList;
  end;
end;

// Hands out a node: reuses the first free node if there is one, otherwise
// allocates a new one. The node is marked as in use and the pool takes one
// extra reference, which FreeSyncProc releases. Returns nil on allocation failure.
// Fields of a reused node keep their previous values; callers overwrite them.
function TSyncProcPool.CreateSyncProc:TSyncProc;
var
  Nodes : TList;
begin
  Result := nil;
  if not Assigned(FMemoryList) then
    Exit;
  Nodes := FMemoryList.LockList;
  try
    // Free nodes, if any, sit directly behind the in-use range.
    if (FLastMember <> FFirstMember) and (FLastUsing <> FLastMember) then
    begin
      Inc(FLastUsing);
      Result := Nodes[FLastUsing];
    end;
    if Result = nil then
    begin
      Result := CreateNewSyncProc;
      if Result = nil then
        Exit;
      Result^.WhereXY := Nodes.Add(Result);
      Inc(FLastMember);
      FLastUsing := FLastMember;
    end;
    Result^.Using := True;
    Self.Attach;
  finally
    FMemoryList.UnlockList;
  end;
end;

// Returns a node to the pool and drops the reference taken by CreateSyncProc.
// Nil nodes and nodes without an owner are ignored. A node owned by another
// pool is forwarded to that pool. Releasing a node that is not in use is
// ignored, because it would otherwise corrupt FLastUsing and drop one
// reference too many.
procedure TSyncProcPool.FreeSyncProc(lpSyncProc: TSyncProc);
var
  Nodes          : TList;
  lpTempSyncProc : TSyncProc;
  Released       : Boolean;
begin
  if lpSyncProc = nil then
    Exit;
  if lpSyncProc^.Owner = nil then
    Exit;
  if lpSyncProc^.Owner <> Self then
  begin
    lpSyncProc^.Owner.FreeSyncProc(lpSyncProc);
    Exit;
  end;

  Released := False;
  Nodes := FMemoryList.LockList;
  try
    if lpSyncProc^.Using then
    begin
      lpSyncProc^.Using := False;
      Released := True;
      if FLastUsing >= 0 then
      begin
        // Swap the released node with the last in-use node, then shrink the
        // in-use range by one so it stays contiguous.
        lpTempSyncProc := Nodes[FLastUsing];
        if lpTempSyncProc <> lpSyncProc then
        begin
          lpTempSyncProc^.WhereXY := lpSyncProc^.WhereXY;
          lpSyncProc^.WhereXY := FLastUsing;
          Nodes[FLastUsing] := lpSyncProc;
          Nodes[lpTempSyncProc^.WhereXY] := lpTempSyncProc;
        end;
        Dec(FLastUsing);
      end;
    end;
  finally
    FMemoryList.UnlockList;
    if Released then
      Self.Free; // may destroy the pool once the last reference is gone
  end;
end;

// Removes one node from the pool and releases its memory.
// Nil nodes and nodes without an owner are ignored. A node owned by another
// pool is forwarded to that pool.
// NOTE(review): when the node is still in use, the reference taken by
// CreateSyncProc is not released here. Confirm whether callers are expected
// to call Free/Detach themselves.
procedure TSyncProcPool.RemoveSyncProcNode(lpSyncProc: TSyncProc);
var
  I, J  : Integer;
  Nodes : TList;
begin
  if lpSyncProc = nil then
    Exit;
  if lpSyncProc^.Owner = nil then
    Exit;
  if lpSyncProc^.Owner <> Self then
  begin
    lpSyncProc^.Owner.RemoveSyncProcNode(lpSyncProc);
    Exit;
  end;
  Nodes := FMemoryList.LockList;
  try
    for I := Nodes.Count - 1 downto 0 do
    begin
      if lpSyncProc = Nodes[I] then
      begin
        Nodes.Delete(I);
        // Nodes behind the removed one moved down by one slot; refresh the
        // index that FreeSyncProc relies on.
        for J := I to Nodes.Count - 1 do
          TSyncProc(Nodes[J])^.WhereXY := J;
        // Removing a node from the in-use range shrinks that range.
        if I <= FLastUsing then
          Dec(FLastUsing);
        Dec(FLastMember);
        InternalFreeSyncProc(lpSyncProc);
        Break;
      end;
    end;
  finally
    FMemoryList.UnlockList;
  end;
end;

// Adds one reference; returns the new count.
function TSyncProcPool.Attach:Integer;
begin
  Result := InterlockedIncrement(FManagementCount);
end;

// Removes one reference without destroying the pool; returns the new count.
function TSyncProcPool.Detach:Integer;
begin
  Result := InterlockedDecrement(FManagementCount);
end;

// Creates an empty pool holding one reference (the creator's).
constructor TSyncProcPool.Create;
begin
  inherited;
  FManagementCount := 1;
  FLastMember := FFirstMember;
  FLastUsing  := FFirstMember;
  FMemoryList := TThreadList.Create;
end;

// Releases every node and the list itself.
destructor TSyncProcPool.Destroy;
var
  I     : Integer;
  Nodes : TList;
begin
  // FMemoryList is nil if the constructor failed before creating it.
  if FMemoryList <> nil then
  begin
    Nodes := FMemoryList.LockList;
    try
      for I := 0 to Nodes.Count - 1 do
        InternalFreeSyncProc(Nodes[I]);
    finally
      FMemoryList.UnlockList;
      FMemoryList.Free;
      FMemoryList := nil;
    end;
  end;
  inherited Destroy;
end;

// Drops one reference and destroys the pool when none remain.
// Safe to call on a nil reference, like TObject.Free and TIOCP.Free.
procedure TSyncProcPool.Free;
begin
  if Self = nil then
    Exit;
  if Detach > 0 then
    Exit;
  inherited Free;
end;

initialization
  SyncProcPool := TSyncProcPool.Create;
finalization
  SyncProcPool.Free;
  // Clear the global so later Assigned(SyncProcPool) checks fail. Outstanding
  // nodes still reach the pool through their Owner field.
  SyncProcPool := nil;
end.
(* 模 块 名: IOCPUnit.Pas *)
(* 别 名: 完成端口封装 *)
(* 作 者: Unsigned(僵哥) *)
(* 说 明: 提供线程队列计数,以方便性能检验机制的执行,当线程队列计数较小时, *)
(* 通常有两种可能性,各线程的作业处理时间过长,或者中间产生死锁情况, *)
(* 由此可以再根据线程处理过程的其它计数,判断是否线程数量饱和或不足, *)
(* 线程数量饱和通常的表现为资源占用较高,否则可以适当增加处理线程 *)
(* *)
(* 采用引用计数机制以保证队列的正常释放并防止内存访问违例事件的发生 *)
(* *)
(******************************************************************************)
unit IOCPUnit;interface
uses
Windows;
type
// Reference-counted wrapper around a Windows I/O completion port.
// Also counts the threads currently blocked in GetQueuedCompletionStatus so
// ShutDownAll can post one quit marker per waiting thread.
TIOCP=class(TObject)
strict private
  FHandle          : THandle; // completion port handle
  FQueueThreadSize : Integer; // threads currently inside GetQueuedCompletionStatus
  FManagementCount : Integer; // reference count
  FIsShutingDown   : Boolean; // set by ShutDownAllEx; PostQueuedCompletionStatus then refuses posts
public
  property QueueThreads    : Integer read FQueueThreadSize; // waiting-thread count
  property ManagementCount : Integer read FManagementCount; // reference count
public
  // Associates an I/O handle with this completion port.
  //   IOHandle     : file, pipe, socket or similar I/O handle
  //   IOContextKey : caller-defined key identifying the handle's context
  //                  (for example a record or object pointer)
  //   Concurrent   : number of concurrent threads, normally 0
  // Returns the value returned by the Windows API call.
  function CreateIoCompletionPort( IOHandle     : THandle
                                 ; IOContextKey : DWORD
                                 ; Concurrent   : DWORD = 0
                                 ): THandle;
  // Same association; returns True when the API hands back this port's handle.
  function AssociateWith ( IOHandle     : THandle
                         ; IOContextKey : DWORD
                         ) : BOOL;

  // Retrieves one entry from the completion queue.
  //   dwMilliseconds   : wait time in milliseconds, as for WaitForSingleObject
  //   BytesTransferred : (out) bytes transferred
  //   IOContextKey     : (out) context key
  //   Overlapped       : (out) overlapped-structure pointer
  // Returns True on success, False on failure (see GetLastError).
  function GetQueuedCompletionStatus( var BytesTransferred : DWORD
                                    ; var IOContextKey     : DWORD
                                    ; var Overlapped       : POverlapped
                                    ; dwMilliseconds       : DWORD
                                    ): BOOL;

  // Posts a completion entry; the three values arrive unchanged at
  // GetQueuedCompletionStatus. Returns False without posting once
  // ShutDownAllEx has been called, and False on API failure (see GetLastError).
  function PostQueuedCompletionStatus( lpBytesTransferred : DWORD
                                     ; dwCompletionKey    : DWORD
                                     ; lpOverlapped       : POverlapped
                                     ): BOOL;

  // Posts one quit marker: bytes 0, key 0, overlapped = SHUTDOWN_FLAG.
  // Returns True on success, False on failure (see GetLastError).
  function PostThreadQuitFlag : BOOL;

  // Adds one reference; returns the new count.
  function Attach : Integer; overload;

  // Removes one reference and returns the remaining count. Never destroys the
  // object, even at 0; use Free for that.
  function Detach : Integer;

  // Posts one quit marker for every thread currently waiting on the port.
  procedure ShutDownAll;
  // Like ShutDownAll, but first makes PostQueuedCompletionStatus refuse posts.
  procedure ShutDownAllEx;

  // Removes one reference; the object is destroyed when the count reaches 0.
  procedure Free;
public
  constructor Create;
  destructor Destroy; override;
public
  // Takes an extra reference on lpCompletion and returns it. Returns nil when
  // lpCompletion is nil or its count had already dropped to 0.
  class function Attach(lpCompletion: TIOCP): TIOCP; Overload;
end;

// The value below is a constant, not a type, so it needs its own const section.
const
  SHUTDOWN_FLAG = $FFFFFFFF; // Posted to the completion port when shutting down
(******************************************************************************)
(* *)
(* <<以下为实现部分>> *)
(* *)
(******************************************************************************)
(* 模 块 名: 完成端口封装 *)
(* 作 者: Unsigned(僵哥) *)
(* 说 明: 提供线程队列计数,以方便性能检验机制的执行,当线程队列计数较小时, *)
(* 通常有两种可能性,各线程的作业处理时间过长,或者中间产生死锁情况, *)
(* 由此可以再根据线程处理过程的其它计数,判断是否线程数量饱和或不足, *)
(* 线程数量饱和通常的表现为资源占用较高,否则可以适当增加处理线程 *)
(* *)
(* 采用引用计数机制以保证队列的正常释放并防止内存访问违例事件的发生 *)
(* 备 注: *)
(******************************************************************************)
implementation

// Takes an extra reference on lpCompletion and returns it.
// Returns nil for a nil argument. Also returns nil, after undoing the
// increment, when the new count is 1, which means the count had already
// dropped to 0.
class function TIOCP.Attach(lpCompletion: TIOCP): TIOCP;
var
  Management : Integer;
begin
  Result := nil;
  if not Assigned(lpCompletion) then
    Exit;
  Management := lpCompletion.Attach;
  if Management = 1 then
  begin
    lpCompletion.Detach;
    Exit;
  end;
  Result := lpCompletion;
end;

// Associates IOHandle with this port; returns whatever the Windows API returns.
// Parameter types repeat the interface declaration exactly.
function TIOCP.CreateIoCompletionPort( IOHandle     : THandle
                                     ; IOContextKey : DWORD
                                     ; Concurrent   : DWORD
                                     ): THandle;
begin
  Result := Windows.CreateIoCompletionPort( IOHandle
                                          , FHandle
                                          , IOContextKey
                                          , Concurrent
                                          );
end;

// Associates IOHandle with this port; True when the API returns this port's handle.
function TIOCP.AssociateWith( IOHandle     : THandle
                            ; IOContextKey : DWORD
                            ): BOOL;
begin
  Result := (CreateIoCompletionPort(IOHandle, IOContextKey) = Self.FHandle);
end;

// Retrieves one completion entry. The calling thread is counted in
// FQueueThreadSize for as long as it is inside the API call.
function TIOCP.GetQueuedCompletionStatus( var BytesTransferred : DWORD
                                        ; var IOContextKey     : DWORD
                                        ; var Overlapped       : POverlapped
                                        ; dwMilliseconds       : DWORD
                                        ): BOOL;
begin
  InterlockedIncrement(FQueueThreadSize);
  Result := Windows.GetQueuedCompletionStatus( FHandle
                                             , BytesTransferred
                                             , IOContextKey
                                             , Overlapped
                                             , dwMilliseconds
                                             );
  InterlockedDecrement(FQueueThreadSize);
end;

// Posts a completion entry. Returns False without posting after ShutDownAllEx.
function TIOCP.PostQueuedCompletionStatus( lpBytesTransferred : DWORD
                                         ; dwCompletionKey    : DWORD
                                         ; lpOverlapped       : POverlapped
                                         ): BOOL;
begin
  if FIsShutingDown then
  begin
    Result := False;
    Exit;
  end;
  Result := Windows.PostQueuedCompletionStatus( FHandle
                                              , lpBytesTransferred
                                              , dwCompletionKey
                                              , lpOverlapped
                                              );
end;

// Posts one quit marker (overlapped = SHUTDOWN_FLAG). This calls the API
// directly, so it still works after ShutDownAllEx.
function TIOCP.PostThreadQuitFlag: BOOL;
begin
  Result := Windows.PostQueuedCompletionStatus( FHandle
                                              , 0
                                              , 0
                                              , POverLapped(SHUTDOWN_FLAG)
                                              );
end;

// Adds one reference; returns the new count.
function TIOCP.Attach : Integer;
begin
  Result := InterlockedIncrement(FManagementCount);
end;

// Removes one reference without destroying the object; returns the new count.
function TIOCP.Detach : Integer;
begin
  Result := InterlockedDecrement(FManagementCount);
end;

// Drops one reference and destroys the object when none remain. Safe on nil.
procedure TIOCP.Free;
begin
  if Self = nil then
    Exit;
  if Detach > 0 then
    Exit;
  inherited Free;
end;

// Creates the completion port and takes the creator's reference.
constructor TIOCP.Create;
begin
  inherited;
  FIsShutingDown   := False;
  FQueueThreadSize := 0;
  FManagementCount := 0;
  FHandle := Windows.CreateIoCompletionPort( INVALID_HANDLE_VALUE
                                           , 0
                                           , 0
                                           , 0
                                           );
  // The API reports failure with a NULL (0) handle. Normalise it to
  // INVALID_HANDLE_VALUE so Destroy never calls CloseHandle(0).
  if FHandle = 0 then
    FHandle := INVALID_HANDLE_VALUE;
  InterlockedIncrement(FManagementCount);
end;

// Posts one quit marker per thread that is waiting on the port right now.
// The count is read once; threads that start waiting later get no marker.
procedure TIOCP.ShutDownAll;
var
  I : Integer;
begin
  for I := 1 to FQueueThreadSize do
    PostThreadQuitFlag;
  //while FQueueThreadSize > 0 do Sleep(1);
end;

// Refuses further posts, then wakes the waiting threads.
procedure TIOCP.ShutDownAllEx;
begin
  FIsShutingDown := True;
  ShutDownAll;
end;

// Wakes the waiting threads and closes the port handle.
destructor TIOCP.Destroy;
begin
  ShutDownAll;
  if (FHandle <> 0) and (FHandle <> INVALID_HANDLE_VALUE) then
    CloseHandle(FHandle);
  FHandle := INVALID_HANDLE_VALUE;
  inherited Destroy;
end;
end.
unit ThreadPoolUnit;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls, ExtCtrls;

type
// Demo form: Button1Click starts six TWorkerColor jobs, which draw coloured lines
// on PaintBox1 and list the drawing thread's ID in ListBox1.
TForm3 = class(TForm)
PaintBox1: TPaintBox;
Button1: TButton;
ListBox1: TListBox;
procedure Button1Click(Sender: TObject);
private
type
// One drawing job. Its constructor queues PaintLines on the thread pool, and
// PaintLines destroys the object when it has finished.
TWorkerColor = class
// ID of the thread that ran PaintLines (set from GetCurrentThreadID).
FThreadID: Integer;
FColor: TColor;
FForm: TForm3;
procedure PaintLines(Sender: TObject);
procedure PaintLine;
constructor Create(AForm: TForm3; AColor: TColor);
end;
var
// Y coordinate of the next line; PaintLine decrements it and clamps it at 0.
FIndex: Integer;
public
{ Public declarations }
// TThreadPool (declared on the next line) wraps the Win32 QueueUserWorkItem call
// behind static class methods.
end; TObjectHelper = class helper for TObject end; TThreadPool = class
private
type
// Carries a TNotifyEvent and its Sender to the pool thread.
TUserWorkItem = class
FSender: TObject;
FWorkerEvent: TNotifyEvent;
end;
// Queues WorkerEvent(Sender) with the given QueueUserWorkItem flags.
class procedure QueueWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent; Flags: ULONG); overload; static;
public
// Queues with WT_EXECUTEDEFAULT.
class procedure QueueWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent); overload; static;
// Queues with WT_EXECUTEINIOTHREAD.
class procedure QueueIOWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent); static;
// Queues with WT_EXECUTEINUITHREAD.
class procedure QueueUIWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent); static;
end;var
Form3: TForm3;
ThreadPool: TThreadPool;implementation{$R *.dfm}const
// Flag values passed as the Flags argument of QueueUserWorkItem.
  WT_EXECUTEDEFAULT              = ULONG($00000000);
  WT_EXECUTEINIOTHREAD           = ULONG($00000001);
  WT_EXECUTEINUITHREAD           = ULONG($00000002);
  WT_EXECUTEINWAITTHREAD         = ULONG($00000004);
  WT_EXECUTEONLYONCE             = ULONG($00000008);
  WT_EXECUTEINTIMERTHREAD        = ULONG($00000020);
  WT_EXECUTELONGFUNCTION         = ULONG($00000010);
  WT_EXECUTEINPERSISTENTIOTHREAD = ULONG($00000040);
  WT_EXECUTEINPERSISTENTTHREAD   = ULONG($00000080);
  WT_TRANSFER_IMPERSONATION      = ULONG($00000100);

function QueueUserWorkItem(func: TThreadStartRoutine; Context: Pointer; Flags: ULONG): BOOL; stdcall; external kernel32 name 'QueueUserWorkItem';

// Callback run on a pool thread: fires the work item's event, then frees the
// work item. Returns 0.
function InternalThreadFunction(lpThreadParameter: Pointer): Integer; stdcall;
begin
  Result := 0;
  try
    try
      with TThreadPool.TUserWorkItem(lpThreadParameter) do
        if Assigned(FWorkerEvent) then
          FWorkerEvent(FSender);
    finally
      TThreadPool.TUserWorkItem(lpThreadParameter).Free;
    end;
  except
    // Swallowed on purpose: no exception may leave the pool callback.
  end;
end;

{ TThreadPool }

// Queues WorkerEvent(Sender) with default flags.
class procedure TThreadPool.QueueWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent);
begin
  QueueWorkItem(Sender, WorkerEvent, WT_EXECUTEDEFAULT);
end;

// Queues WorkerEvent(Sender) with WT_EXECUTEINIOTHREAD.
class procedure TThreadPool.QueueIOWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent);
begin
  QueueWorkItem(Sender, WorkerEvent, WT_EXECUTEINIOTHREAD);
end;

// Queues WorkerEvent(Sender) with WT_EXECUTEINUITHREAD.
class procedure TThreadPool.QueueUIWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent);
begin
  QueueWorkItem(Sender, WorkerEvent, WT_EXECUTEINUITHREAD);
end;

// Wraps Sender and WorkerEvent in a work item and hands it to QueueUserWorkItem.
// Does nothing when WorkerEvent is unassigned. Raises the last OS error when
// queueing fails; the work item is freed in that case.
class procedure TThreadPool.QueueWorkItem(Sender: TObject; WorkerEvent: TNotifyEvent; Flags: ULONG);
var
  WorkItem: TUserWorkItem;
begin
  if Assigned(WorkerEvent) then
  begin
    // Tell the RTL that more than one thread is running.
    IsMultiThread := True;
    WorkItem := TUserWorkItem.Create;
    try
      WorkItem.FWorkerEvent := WorkerEvent;
      WorkItem.FSender := Sender;
      if not QueueUserWorkItem(InternalThreadFunction, WorkItem, Flags) then
        RaiseLastOSError;
    except
      WorkItem.Free;
      raise;
    end;
  end;
end;

// Resets the drawing position and the list, then starts six coloured jobs.
procedure TForm3.Button1Click(Sender: TObject);
begin
  FIndex := PaintBox1.Height;
  PaintBox1.Repaint;
  ListBox1.Items.Clear;
  // Each job frees itself at the end of PaintLines.
  TWorkerColor.Create(Self, clBlue);
  TWorkerColor.Create(Self, clRed);
  TWorkerColor.Create(Self, clYellow);
  TWorkerColor.Create(Self, clLime);
  TWorkerColor.Create(Self, clFuchsia);
  TWorkerColor.Create(Self, clTeal);
end;

{ TForm3.TWorkerColor }

// Stores the target form and colour and queues PaintLines on the thread pool.
constructor TForm3.TWorkerColor.Create(AForm: TForm3; AColor: TColor);
begin
  FForm := AForm;
  FColor := AColor;
  TThreadPool.QueueWorkItem(Self, PaintLines);
end;

// Runs on a pool thread: draws ten lines 100 ms apart, then frees the job.
procedure TForm3.TWorkerColor.PaintLines(Sender: TObject);
var
  I: Integer;
begin
  FThreadID := GetCurrentThreadID;
  for I := 0 to 9 do
  begin
    // PaintLine touches ListBox1 and PaintBox1. VCL controls may only be used
    // from the main thread, so the call is marshalled there rather than made
    // directly from the pool thread.
    TThread.Synchronize(nil, PaintLine);
    Sleep(100);
  end;
  Free;
end;

// Logs the worker's thread ID and draws one horizontal line at FForm.FIndex,
// then moves the position up by one pixel, clamped at 0.
procedure TForm3.TWorkerColor.PaintLine;
begin
  FForm.PaintBox1.Canvas.Lock;
  try
    FForm.ListBox1.Items.Add(IntToStr(FThreadID));
    with FForm.PaintBox1 do
    begin
      Canvas.Pen.Color := FColor;
      Canvas.Polyline([Point(0, FForm.FIndex), Point(Width, FForm.FIndex)]);
      Dec(FForm.FIndex);
      if FForm.FIndex <= 0 then
        FForm.FIndex := 0;
    end;
  finally
    FForm.PaintBox1.Canvas.Unlock;
  end;
end;

end.
Adapter:主要是将后面复杂多样的接口采用统一的形式进行传递,相当于接口翻译,不影响任何业务。
Agent:代理,可提供特定的接口协议。通常在一些Client无法直接到达或者无法直接访问或者不允许直接访问Server的一些服务时,通过一个Agent转发甚至是过滤,而进行访问,作为Agent,可以对相应的请求及服务进行数据上的篡改,以适应业务需求。对服务提供者通常是特定的或者由调用者指定。
Broker:一个全权的代理,可以针对服务提供者进行决策性选择,并且可以不告知调用者(与调用者之间的可信度相当高)。比如均衡网关。单纯的Broker不做非必要性的数据篡改(比如UDP转发之后,源地址等不得不篡改)。