昨天回去翻了一下书,找到一个解决的办法:在弹出的时候建立一个 PopIndex 的快照,就是用另一个变量保存 PopIndex 的值。
function Pop
var
  TempPopIndex:integer;
begin
  // 第一步
  EnterCriticalSection(FSection);
  TempPopIndex:=PopIndex;
  LeaveCriticalSection(FSection);
  // 第二步
  if Assigned(OnStrPopChange) then//如果有弹出处理方法,则执行
    OnStrPopChange(FBufArr[TempPopIndex],APopLen);
  // 第三步
  EnterCriticalSection(FSection);
  // 根据返回的长度确定PopIndex的下一个值
  LeaveCriticalSection(FSection);
end;
因为第一步和第三步(也就是持有临界区的两步)的操作时间很短,所以不会造成写入线程的等待。当然这只能适用于多个写入、一个读取的情况,如果是多个读取就不行。这主要是用在 SOCKET 接收时,收到数据后要立刻保存起来,而读取处理线程则允许慢一些。
上面的那个Singly Linked List,我有实现过,其他都不是问题,关键点就是那几个内存函数。
unit MemFunctionUnit;

{ Aligned heap allocation helpers ported from the MSVC CRT (_aligned_* family).
  Every block handed out is preceded by one pointer-sized header cell holding
  the address of the underlying heap block.
  NOTE(review): HeapAllocEx, HeapReAllocEx, HeapFreeEx and ProcessHeap are not
  declared in this unit; they must come from elsewhere. }

interface

uses
  windows;

const
  MAX_NATURAL_ALIGNMENT = sizeof(DWORD);
  MEMORY_ALLOCATION_ALIGNMENT = 8;
  PTR_SZ : UINT = sizeof(Pointer);

type
  // NOTE(review): UINT is 32 bits wide, so pointer casts assume a 32-bit target.
  SIZE_T = UINT;
  UINTPTR_T = UINT;
  PUINTPTR_T = ^UINTPTR_T;
  ERRNO_T = Integer;

function _aligned_offset_malloc_base(size, align, offset: size_t): Pointer;
function _aligned_malloc_base(size, alignment: size_t): Pointer;
function _aligned_realloc_base(memblock: Pointer; size, alignment: SIZE_T): Pointer;
function _aligned_recalloc_base(memblock: Pointer; count, size, alignment: SIZE_T): Pointer;
function _aligned_offset_realloc_base(memblock: Pointer; size, align, offset: SIZE_T): Pointer;
function _aligned_offset_recalloc_base(memblock: Pointer; count, size, align, offset: SIZE_T): Pointer;
procedure _aligned_free_base(memblock: Pointer);

implementation

{ True when X has at most one bit set. 0 is accepted too, exactly like the C
  macro this was ported from. }
function IS_2_POW_N(X: SIZE_T): Boolean; inline;
begin
  Result := (X and (X - 1)) = 0;
end;

{ Copies count bytes from src to dst; the two ranges may overlap. Returns dst. }
function memmove(dst: Pointer; const src: Pointer; count: SIZE_T): Pointer;
begin
  Result := dst;
  // System.Move chooses a safe direction for overlapping ranges and always
  // works in bytes. The former PChar loop moved SizeOf(Char) bytes per step,
  // which is 2 on Unicode compilers and would overrun both buffers.
  if count > 0 then
    Move(src^, dst^, count);
end;

{ Returns the size HeapSize reports for pblock, or $ffffffff when pblock is nil,
  the heap cannot be locked, or HeapSize itself fails. }
function _msize_base(pblock: Pointer): size_t;
begin
  Result := $ffffffff;
  (* validation section *)
  if pblock = nil then
    Exit;
  if not HeapLock(ProcessHeap) then
    Exit;
  try
    Result := size_t(HeapSize(ProcessHeap, 0, pblock));
  finally
    HeapUnlock(ProcessHeap);
  end;
end;

{ Allocates size bytes such that (Result + offset) is a multiple of align.
  align must be a power of two; offset must be 0 or smaller than size.
  Returns nil on invalid arguments, size overflow or allocation failure. }
function _aligned_offset_malloc_base(size, align, offset: size_t): Pointer;
var
  ptr, retptr, gap, reqsz: UINTPTR_T;
  tmpPtr: PUINTPTR_T;
begin
  Result := nil;
  if not IS_2_POW_N(align) then
    Exit;
  if not ((offset = 0) or (offset < size)) then
    Exit;
  // From here on align is used as a bit mask (alignment - 1).
  if align < PTR_SZ then
    align := PTR_SZ - 1
  else
    align := align - 1;
  (* gap = number of bytes needed to round up offset to align with PTR_SZ *)
  gap := (0 - offset) and (PTR_SZ - 1);
  reqsz := PTR_SZ + gap + align + size;
  // A wrapped sum would under-allocate and let the caller write past the block.
  if reqsz < size then
    Exit;
  ptr := uintptr_t(HeapAllocEx(reqsz));
  if ptr = uintptr_t(0) then
    Exit;
  retptr := ((ptr + PTR_SZ + gap + align + offset) and (not align)) - offset;
  // Store the raw heap address in the pointer-sized cell below (retptr - gap).
  tmpPtr := Puintptr_t(retptr - gap);
  Dec(tmpPtr);
  tmpPtr^ := ptr;
  Result := Pointer(retptr);
end;

{ Same as _aligned_offset_malloc_base with offset = 0. }
function _aligned_malloc_base(size, alignment: size_t): Pointer;
begin
  Result := _aligned_offset_malloc_base(size, alignment, 0);
end;

{ Same as _aligned_offset_realloc_base with offset = 0. }
function _aligned_realloc_base(memblock: Pointer; size, alignment: SIZE_T): Pointer;
begin
  Result := _aligned_offset_realloc_base(memblock, size, alignment, 0);
end;

{ Same as _aligned_offset_recalloc_base with offset = 0. }
function _aligned_recalloc_base(memblock: Pointer; count, size, alignment: SIZE_T): Pointer;
begin
  Result := _aligned_offset_recalloc_base(memblock, count, size, alignment, 0);
end;

{ Resizes a block obtained from this unit, keeping (Result + offset) aligned.
  memblock = nil behaves like malloc; size = 0 frees the block and returns nil.
  Returns nil on invalid arguments or failure. }
function _aligned_offset_realloc_base(memblock: Pointer; size, align, offset: SIZE_T): Pointer;
var
  ptr, retptr, gap, stptr, diff: uintptr_t;
  movsz, reqsz, oldsz: uintptr_t;
  bFree: Boolean;
  save_errno: errno_t;
  tmpPtr: PUINTPTR_T;
begin
  bFree := False;
  Result := nil;
  (* special cases *)
  if memblock = nil then
  begin
    Result := _aligned_offset_malloc_base(size, align, offset);
    Exit;
  end;
  if size = 0 then
  begin
    _aligned_free_base(memblock);
    Exit;
  end;
  (* validation section *)
  if not IS_2_POW_N(align) then
    Exit;
  if not ((offset = 0) or (offset < size)) then
    Exit;

  (* Locate the header cell: memblock rounded DOWN to PTR_SZ, minus one cell.
     This must be "and" (as in _aligned_free_base); "or" yields a wild address. *)
  stptr := (uintptr_t(memblock) and not (PTR_SZ - 1)) - PTR_SZ;
  (* stptr now becomes the start of the underlying heap block *)
  stptr := puintptr_t(stptr)^;

  if align < PTR_SZ then
    align := PTR_SZ;
  align := align - 1;
  (* gap = number of bytes needed to round up offset to align with PTR_SZ *)
  gap := (0 - offset) and (PTR_SZ - 1);
  diff := uintptr_t(memblock) - stptr;

  (* Move size is the smaller of the data available and the size requested. *)
  oldsz := _msize_base(Pointer(stptr));
  if oldsz = $ffffffff then
    Exit; // size query failed; copying a guessed length could read past the block
  movsz := oldsz - diff;
  if movsz > size then
    movsz := size;
  reqsz := PTR_SZ + gap + align + size;
  if reqsz < size then
    Exit; // arithmetic wrapped

  (* If the new layout cannot hold the data at its current distance from the
     block start, allocate a fresh block; otherwise try to resize the old one. *)
  if (stptr + align + PTR_SZ + gap) < uintptr_t(memblock) then
  begin
    ptr := uintptr_t(HeapAllocEx(reqsz));
    if ptr = uintptr_t(0) then
      Exit;
    bFree := True;
  end
  else
  begin
    (* preserve the last-error value across a failed resize attempt *)
    save_errno := GetLastError;
    ptr := uintptr_t(HeapReAllocEx(Pointer(stptr), reqsz));
    if ptr = uintptr_t(0) then
    begin
      SetLastError(save_errno);
      ptr := uintptr_t(HeapAllocEx(reqsz));
      if ptr = uintptr_t(0) then
        Exit;
      bFree := True;
    end
    else
      stptr := ptr;
  end;

  (* Resized in place and memblock already satisfies the requested alignment:
     nothing to move. The earlier bitwise-Not test was true for virtually every
     address and skipped re-alignment even when it was required. *)
  if (ptr = (uintptr_t(memblock) - diff))
     and (((uintptr_t(memblock) + offset) and align) = 0) then
  begin
    Result := memblock;
    Exit;
  end;

  retptr := ((ptr + PTR_SZ + gap + align + offset) and not align) - offset;
  memmove(Pointer(retptr), Pointer(stptr + diff), movsz);
  if bFree then
    HeapFreeEx(Pointer(stptr));
  // C original: ((uintptr_t *)(retptr - gap))[-1] = ptr;
  tmpPtr := Puintptr_t(retptr - gap);
  Dec(tmpPtr);
  tmpPtr^ := ptr;
  Result := Pointer(retptr);
end;

{ Resizes memblock to count * size bytes via _aligned_offset_realloc_base.
  Returns nil when count * size does not fit in SIZE_T.
  NOTE(review): unlike the CRT recalloc, newly added bytes are not zeroed here. }
function _aligned_offset_recalloc_base(memblock: Pointer; count, size, align, offset: SIZE_T): Pointer;
var
  size_orig: SIZE_T;
begin
  Result := nil;
  (* ensure that (size * count) does not overflow *)
  if (count > 0) and (size > High(SIZE_T) div count) then
    Exit;
  size_orig := size * count;
  Result := _aligned_offset_realloc_base(memblock, size_orig, align, offset);
end;

{ Releases a block returned by the _aligned_* allocators; nil is ignored. }
procedure _aligned_free_base(memblock: Pointer);
var
  ptr: uintptr_t;
begin
  if memblock = nil then
    Exit;
  ptr := uintptr_t(memblock);
  (* ptr points to the cell holding the start of the heap block *)
  ptr := (ptr and not (PTR_SZ - 1)) - PTR_SZ;
  (* ptr is the start of the heap block *)
  ptr := (Puintptr_t(ptr))^;
  HeapFreeEx(Pointer(ptr));
end;

end.
因为SOCKET在接收的时候每次RECEIVE都是将SOCKET缓冲区中的数据全部接收了,而这些数据肯定会包括自己定义的多个包。读取的时候就是要分隔这些包,还有就是可能会有将包截断的情况,这时POP就不能全部弹出。
TZStrPopChangeEvent=procedure (const S:String;var APopLen: Integer) of object;
if Assigned(OnStrPopChange) then OnStrPopChange(FBufArr[TempPopIndex],APopLen);
线程执行POP,POP里调用OnStrPopChange来处理内容。OnStrPopChange是业务程序提供的,如文件处理,将收到的包合并成文件;表处理,将收到的数据组合成一个表格的形式。当然,每个业务也有自己的缓冲区。OnStrPopChange主要进行包业务归属判断和包分隔。但相比来说,SOCKET的RECEIVE事件收到后,总是能立刻放入缓冲区的话,对性能来说会更好一些。如果SOCKET的RECEIVE的速度太快,会有造成堆积的可能性,但只要分配较大的缓冲空间,或者将客户端的发送间隔调整一下,或者让客户端有个策略,如在连续发送超过10M的情况下暂停几秒,就可以缓解。但是作为服务器来说,只要有数据就应该尽量快地接收下来。现在的问题是,如何才能测试比较出各种实现方式的性能差异和BUG,因为很少能达到极点,BUG会很隐晦。
续上:
  public
    constructor Create;
    destructor Destroy; override;
  public
    class function Attach(lpCompletion: TIOCP): TIOCP; Overload;
  end;

const
  SHUTDOWN_FLAG = $FFFFFFFF; // Posted to the completion port when shutting down

implementation

{ Takes one more reference on lpCompletion and returns it. Returns nil when
  lpCompletion is nil or when the count was 0 before the call (the increment
  is undone in that case). }
class function TIOCP.Attach(lpCompletion: TIOCP): TIOCP;
var
  Management: Integer;
begin
  Result := nil;
  if not Assigned(lpCompletion) then
    Exit;
  Management := lpCompletion.Attach;
  if Management = 1 then
  begin
    // Create leaves the count at 1, so reaching 1 here means it had dropped to 0.
    lpCompletion.Detach;
    Exit;
  end;
  Result := lpCompletion;
end;

{ Associates IOHandle with this object's port; returns what the API returns. }
function TIOCP.CreateIoCompletionPort(IOHandle: THandle; IOContextKey: DWORD;
  Concurrent: DWORD): THandle;
begin
  Result := Windows.CreateIoCompletionPort(IOHandle, FHandle, IOContextKey, Concurrent);
end;

{ True when IOHandle was associated with this object's port. }
function TIOCP.AssociateWith(IOHandle: THandle; IOContextKey: DWORD): BOOL;
begin
  Result := (CreateIoCompletionPort(IOHandle, IOContextKey) = Self.FHandle);
end;

{ Dequeues one completion packet. FQueueThreadSize counts the threads that are
  currently inside this call. }
function TIOCP.GetQueuedCompletionStatus(var BytesTransferred: DWORD;
  var IOContextKey: DWORD; var Overlapped: POverlapped;
  dwMilliseconds: DWORD): BOOL;
begin
  InterlockedIncrement(FQueueThreadSize);
  Result := Windows.GetQueuedCompletionStatus(FHandle, BytesTransferred,
    IOContextKey, Overlapped, dwMilliseconds);
  InterlockedDecrement(FQueueThreadSize);
end;

{ Posts a packet to the port; refused (False) once ShutDownAllEx was called. }
function TIOCP.PostQueuedCompletionStatus(lpBytesTransferred: DWORD;
  dwCompletionKey: DWORD; lpOverlapped: POverlapped): BOOL;
begin
  if FIsShutingDown then
  begin
    Result := False;
    Exit;
  end;
  Result := Windows.PostQueuedCompletionStatus(FHandle, lpBytesTransferred,
    dwCompletionKey, lpOverlapped);
end;

{ Posts the quit packet: 0 bytes, key 0, overlapped = SHUTDOWN_FLAG. }
function TIOCP.PostThreadQuitFlag: BOOL;
begin
  Result := Windows.PostQueuedCompletionStatus(FHandle, 0, 0, POverlapped(SHUTDOWN_FLAG));
end;

{ Increments the reference count and returns the new value. }
function TIOCP.Attach: Integer;
begin
  Result := InterlockedIncrement(FManagementCount);
end;

{ Decrements the reference count and returns the new value; never frees. }
function TIOCP.Detach: Integer;
begin
  Result := InterlockedDecrement(FManagementCount);
end;

{ Drops one reference and destroys the object once the count is no longer
  positive. Safe to call on a nil reference. }
procedure TIOCP.Free;
begin
  if Self = nil then
    Exit;
  if Detach > 0 then
    Exit;
  inherited Free;
end;

{ Creates the completion port and starts with a reference count of 1. }
constructor TIOCP.Create;
begin
  inherited;
  FIsShutingDown := False;
  FQueueThreadSize := 0;
  FManagementCount := 0;
  FHandle := Windows.CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  // The API reports failure as 0, not INVALID_HANDLE_VALUE. Normalise it so the
  // check in Destroy works and AssociateWith cannot silently create a new port.
  if FHandle = 0 then
    FHandle := INVALID_HANDLE_VALUE;
  InterlockedIncrement(FManagementCount);
end;

{ Posts one quit packet per thread currently waiting in
  GetQueuedCompletionStatus, then polls until none is waiting any more. }
procedure TIOCP.ShutDownAll;
var
  I: Integer;
begin
  I := FQueueThreadSize;
  while I > 0 do
  begin
    PostThreadQuitFlag;
    Dec(I);
  end;
  while FQueueThreadSize > 0 do
    Sleep(1);
end;

{ Like ShutDownAll, but first blocks further PostQueuedCompletionStatus calls. }
procedure TIOCP.ShutDownAllEx;
begin
  FIsShutingDown := True;
  ShutDownAll;
end;

{ Wakes the waiting threads and closes the port handle. }
destructor TIOCP.Destroy;
begin
  ShutDownAll;
  if FHandle <> INVALID_HANDLE_VALUE then
    CloseHandle(FHandle);
  FHandle := INVALID_HANDLE_VALUE;
  inherited;
end;

end.
//---------------------------------------------------------------------------
// Que.h
//---------------------------------------------------------------------------
#ifndef QueH
#define QueH#include <vcl.h>
#pragma hdrstop//---------------------------------------------------------------------------
class TWaitableQueue
{
private:
TList *_List;
DWORD _MaxSize;
HANDLE _MutSem[2];
public:
// 队列的最大项个数为 65535 项
__fastcall TWaitableQueue(const DWORD MaxSize = 0x0000FFFF);
__fastcall ~TWaitableQueue(void); // 返回 Push 后,队列项的个数
// -1 : 参数错误, aItem == NULL
// -2 : 超时
// -3 : 队列满,无法 Push
int __fastcall PushItem(void* aItem, const DWORD aTimeout = 3000);
// 返回 Take 后,队列项的个数,如果队列为空,则一直等待
// -1 : 参数错误, aItem == NULL
// -2 : 超时
// -3 : 失败
int __fastcall TakeItem(void** aItem, const DWORD aTimeout = 3000);};
//---------------------------------------------------------------------------
#endif
//---------------------------------------------------------------------------
// Que.cpp
//---------------------------------------------------------------------------
#pragma hdrstop
#include "Que.h"
//---------------------------------------------------------------------------
#pragma package(smart_init)
//---------------------------------------------------------------------------
// Builds an empty queue: allocates the backing list, then creates an unowned
// mutex and a semaphore that starts at 0 with MaxSize as its upper bound.
__fastcall TWaitableQueue::TWaitableQueue(const DWORD MaxSize)
    : _List(new TList()), _MaxSize(MaxSize)
{
    HANDLE listGuard = ::CreateMutex(NULL, FALSE, NULL);
    _MutSem[0] = listGuard;

    HANDLE itemCounter = ::CreateSemaphore(NULL, 0, _MaxSize, NULL);
    _MutSem[1] = itemCounter;
}
//---------------------------------------------------------------------------
// Closes the mutex and the semaphore, then destroys the list object.
// The pointers stored in the list are not touched.
__fastcall TWaitableQueue::~TWaitableQueue(void)
{
    for (int slot = 0; slot < 2; ++slot)
        ::CloseHandle(_MutSem[slot]);

    delete _List;
}
//---------------------------------------------------------------------------
// Appends aItem to the queue.
//   aItem    : pointer to store; must not be NULL
//   aTimeout : how long to wait for the list mutex, in milliseconds
// Returns the number of queued items after the push, or
//   -1 : aItem == NULL
//   -2 : timed out waiting for the mutex
//   -3 : item not pushed (semaphore already at its maximum, or the wait failed)
int __fastcall TWaitableQueue::PushItem(void* aItem, const DWORD aTimeout)
{
    if (aItem == NULL)
        return -1;

    const DWORD dwRet = ::WaitForSingleObject(_MutSem[0], aTimeout);
    if (dwRet == WAIT_TIMEOUT)
        return -2;
    // WAIT_ABANDONED still grants ownership of the mutex. Any other result
    // (WAIT_FAILED) means we do NOT own it, so the list must not be touched
    // and the mutex must not be released.
    if (dwRet != WAIT_OBJECT_0 && dwRet != WAIT_ABANDONED)
        return -3;

    long prev = -3;
    // ReleaseSemaphore fails when the count is already at its maximum,
    // which is how "queue full" is detected.
    if (::ReleaseSemaphore(_MutSem[1], 1, &prev)) {
        _List->Add(aItem);
        prev = prev + 1;
    }
    ::ReleaseMutex(_MutSem[0]);
    return prev;
}
//---------------------------------------------------------------------------
// Removes the oldest item from the queue and stores it in *aItem.
//   aItem    : receives the item; must not be NULL (set to NULL on entry)
//   aTimeout : how long to wait for both the mutex and an item, in milliseconds
// Returns the number of queued items left after the take, or
//   -1 : aItem == NULL
//   -2 : timed out
//   -3 : failure (the wait failed, or the list was unexpectedly empty)
int __fastcall TWaitableQueue::TakeItem(void** aItem, const DWORD aTimeout)
{
    if (aItem == NULL)
        return -1;
    *aItem = NULL;

    // Wait for BOTH objects: the list mutex and one unit of the item semaphore.
    const DWORD dwRet = ::WaitForMultipleObjects(2, _MutSem, TRUE, aTimeout);
    if (dwRet == WAIT_TIMEOUT)
        return -2;
    // With bWaitAll the objects are acquired only for results in the
    // WAIT_OBJECT_0.. or WAIT_ABANDONED_0.. ranges. On WAIT_FAILED nothing is
    // owned, so neither the list nor the mutex may be touched.
    const bool acquired =
        (dwRet - WAIT_OBJECT_0 < 2) ||
        (dwRet >= WAIT_ABANDONED_0 && dwRet - WAIT_ABANDONED_0 < 2);
    if (!acquired)
        return -3;

    long prev = -3;
    const int count = _List->Count;
    if (count > 0) {
        *aItem = _List->Items[0];
        _List->Delete(0);
        prev = count - 1;
    }
    ::ReleaseMutex(_MutSem[0]);
    return prev;
}
//---------------------------------------------------------------------------
var
  TempPopIndex:integer;
begin
  // 第一步
  EnterCriticalSection(FSection);
  TempPopIndex:=PopIndex;
  LeaveCriticalSection(FSection);
  // 第二步
  if Assigned(OnStrPopChange) then//如果有弹出处理方法,则执行
    OnStrPopChange(FBufArr[TempPopIndex],APopLen);
  // 第三步
  EnterCriticalSection(FSection);
  // 根据返回的长度确定PopIndex的下一个值
  LeaveCriticalSection(FSection);
end;
因为第一步和第三步(也就是持有临界区的两步)的操作时间很短,所以不会造成写入线程的等待。当然这只能适用于多个写入、一个读取的情况,如果是多个读取就不行。这主要是用在 SOCKET 接收时,收到数据后要立刻保存起来,而读取处理线程则允许慢一些。
uses
windows;
const
// Size of a DWORD; not referenced by the routines visible in this unit.
MAX_NATURAL_ALIGNMENT = sizeof(DWORD);
// Not referenced by the routines visible in this unit.
MEMORY_ALLOCATION_ALIGNMENT = 8;
// Size of one pointer; also the size of the header cell that the allocators
// place in front of every aligned block.
PTR_SZ : UINT = sizeof(Pointer);
type
// C-style aliases used by the ported allocation routines.
// NOTE(review): both map to UINT, so casting pointers to UINTPTR_T assumes
// pointers fit in a UINT - confirm before building for another target.
SIZE_T = UINT;
UINTPTR_T = UINT;
PUINTPTR_T = ^UINTPTR_T;
// Holds the value read from GetLastError in _aligned_offset_realloc_base.
ERRNO_T = Integer;
function _aligned_offset_malloc_base(
size
, align
, offset : size_t
): Pointer;
function _aligned_malloc_base(
size
, alignment : size_t
): Pointer; function _aligned_realloc_base(
memblock : Pointer
; size
, alignment : SIZE_T
): Pointer; function _aligned_recalloc_base(
memblock : Pointer
; count
, size
, alignment : SIZE_T
): Pointer;
function _aligned_offset_realloc_base(
memblock : Pointer
; size
, align
, offset : SIZE_T
): Pointer;
function _aligned_offset_recalloc_base(
memblock : Pointer
; count
, size
, align
, offset : SIZE_T
): Pointer;
procedure _aligned_free_base(
memblock : Pointer
);implementation
{ True when X has at most one bit set. 0 is accepted too, exactly like the C
  macro this was ported from. }
function IS_2_POW_N(X: SIZE_T): Boolean; inline;
begin
  Result := (X and (X - 1)) = 0;
end;

{ Copies count bytes from src to dst; the two ranges may overlap. Returns dst. }
function memmove(dst: Pointer; const src: Pointer; count: SIZE_T): Pointer;
begin
  Result := dst;
  // System.Move chooses a safe direction for overlapping ranges and always
  // works in bytes. The former PChar loop moved SizeOf(Char) bytes per step,
  // which is 2 on Unicode compilers and would overrun both buffers.
  if count > 0 then
    Move(src^, dst^, count);
end;

{ Returns the size HeapSize reports for pblock, or $ffffffff when pblock is nil,
  the heap cannot be locked, or HeapSize itself fails.
  NOTE(review): ProcessHeap is not declared in this unit. }
function _msize_base(pblock: Pointer): size_t;
begin
  Result := $ffffffff;
  (* validation section *)
  if pblock = nil then
    Exit;
  if not HeapLock(ProcessHeap) then
    Exit;
  try
    Result := size_t(HeapSize(ProcessHeap, 0, pblock));
  finally
    HeapUnlock(ProcessHeap);
  end;
end;

{ Allocates size bytes such that (Result + offset) is a multiple of align.
  align must be a power of two; offset must be 0 or smaller than size.
  Returns nil on invalid arguments, size overflow or allocation failure.
  NOTE(review): HeapAllocEx is not declared in this unit. }
function _aligned_offset_malloc_base(size, align, offset: size_t): Pointer;
var
  ptr, retptr, gap, reqsz: UINTPTR_T;
  tmpPtr: PUINTPTR_T;
begin
  Result := nil;
  if not IS_2_POW_N(align) then
    Exit;
  if not ((offset = 0) or (offset < size)) then
    Exit;
  // From here on align is used as a bit mask (alignment - 1).
  if align < PTR_SZ then
    align := PTR_SZ - 1
  else
    align := align - 1;
  (* gap = number of bytes needed to round up offset to align with PTR_SZ *)
  gap := (0 - offset) and (PTR_SZ - 1);
  reqsz := PTR_SZ + gap + align + size;
  // A wrapped sum would under-allocate and let the caller write past the block.
  if reqsz < size then
    Exit;
  ptr := uintptr_t(HeapAllocEx(reqsz));
  if ptr = uintptr_t(0) then
    Exit;
  retptr := ((ptr + PTR_SZ + gap + align + offset) and (not align)) - offset;
  // Store the raw heap address in the pointer-sized cell below (retptr - gap).
  tmpPtr := Puintptr_t(retptr - gap);
  Dec(tmpPtr);
  tmpPtr^ := ptr;
  Result := Pointer(retptr);
end;

{ Same as _aligned_offset_malloc_base with offset = 0. }
function _aligned_malloc_base(size, alignment: size_t): Pointer;
begin
  Result := _aligned_offset_malloc_base(size, alignment, 0);
end;

{ Same as _aligned_offset_realloc_base with offset = 0. }
function _aligned_realloc_base(memblock: Pointer; size, alignment: SIZE_T): Pointer;
begin
  Result := _aligned_offset_realloc_base(memblock, size, alignment, 0);
end;

{ Same as _aligned_offset_recalloc_base with offset = 0. }
function _aligned_recalloc_base(memblock: Pointer; count, size, alignment: SIZE_T): Pointer;
begin
  Result := _aligned_offset_recalloc_base(memblock, count, size, alignment, 0);
end;

{ Resizes a block obtained from this unit, keeping (Result + offset) aligned.
  memblock = nil behaves like malloc; size = 0 frees the block and returns nil.
  Returns nil on invalid arguments or failure. }
function _aligned_offset_realloc_base(memblock: Pointer; size, align, offset: SIZE_T): Pointer;
var
  ptr, retptr, gap, stptr, diff: uintptr_t;
  movsz, reqsz, oldsz: uintptr_t;
  bFree: Boolean;
  save_errno: errno_t;
  tmpPtr: PUINTPTR_T;
begin
  bFree := False;
  Result := nil;
  (* special cases *)
  if memblock = nil then
  begin
    Result := _aligned_offset_malloc_base(size, align, offset);
    Exit;
  end;
  if size = 0 then
  begin
    _aligned_free_base(memblock);
    Exit;
  end;
  (* validation section *)
  if not IS_2_POW_N(align) then
    Exit;
  if not ((offset = 0) or (offset < size)) then
    Exit;

  (* Locate the header cell: memblock rounded DOWN to PTR_SZ, minus one cell.
     This must be "and" (as in _aligned_free_base); "or" yields a wild address. *)
  stptr := (uintptr_t(memblock) and not (PTR_SZ - 1)) - PTR_SZ;
  (* stptr now becomes the start of the underlying heap block *)
  stptr := puintptr_t(stptr)^;

  if align < PTR_SZ then
    align := PTR_SZ;
  align := align - 1;
  (* gap = number of bytes needed to round up offset to align with PTR_SZ *)
  gap := (0 - offset) and (PTR_SZ - 1);
  diff := uintptr_t(memblock) - stptr;

  (* Move size is the smaller of the data available and the size requested. *)
  oldsz := _msize_base(Pointer(stptr));
  if oldsz = $ffffffff then
    Exit; // size query failed; copying a guessed length could read past the block
  movsz := oldsz - diff;
  if movsz > size then
    movsz := size;
  reqsz := PTR_SZ + gap + align + size;
  if reqsz < size then
    Exit; // arithmetic wrapped

  (* If the new layout cannot hold the data at its current distance from the
     block start, allocate a fresh block; otherwise try to resize the old one. *)
  if (stptr + align + PTR_SZ + gap) < uintptr_t(memblock) then
  begin
    ptr := uintptr_t(HeapAllocEx(reqsz));
    if ptr = uintptr_t(0) then
      Exit;
    bFree := True;
  end
  else
  begin
    (* preserve the last-error value across a failed resize attempt *)
    save_errno := GetLastError;
    ptr := uintptr_t(HeapReAllocEx(Pointer(stptr), reqsz));
    if ptr = uintptr_t(0) then
    begin
      SetLastError(save_errno);
      ptr := uintptr_t(HeapAllocEx(reqsz));
      if ptr = uintptr_t(0) then
        Exit;
      bFree := True;
    end
    else
      stptr := ptr;
  end;

  (* Resized in place and memblock already satisfies the requested alignment:
     nothing to move. The earlier bitwise-Not test was true for virtually every
     address and skipped re-alignment even when it was required. *)
  if (ptr = (uintptr_t(memblock) - diff))
     and (((uintptr_t(memblock) + offset) and align) = 0) then
  begin
    Result := memblock;
    Exit;
  end;

  retptr := ((ptr + PTR_SZ + gap + align + offset) and not align) - offset;
  memmove(Pointer(retptr), Pointer(stptr + diff), movsz);
  if bFree then
    HeapFreeEx(Pointer(stptr));
  // C original: ((uintptr_t *)(retptr - gap))[-1] = ptr;
  tmpPtr := Puintptr_t(retptr - gap);
  Dec(tmpPtr);
  tmpPtr^ := ptr;
  Result := Pointer(retptr);
end;

{ Resizes memblock to count * size bytes via _aligned_offset_realloc_base.
  Returns nil when count * size does not fit in SIZE_T.
  NOTE(review): unlike the CRT recalloc, newly added bytes are not zeroed here. }
function _aligned_offset_recalloc_base(memblock: Pointer; count, size, align, offset: SIZE_T): Pointer;
var
  size_orig: SIZE_T;
begin
  Result := nil;
  (* ensure that (size * count) does not overflow *)
  if (count > 0) and (size > High(SIZE_T) div count) then
    Exit;
  size_orig := size * count;
  Result := _aligned_offset_realloc_base(memblock, size_orig, align, offset);
end;

{ Releases a block returned by the _aligned_* allocators; nil is ignored. }
procedure _aligned_free_base(memblock: Pointer);
var
  ptr: uintptr_t;
begin
  if memblock = nil then
    Exit;
  ptr := uintptr_t(memblock);
  (* ptr points to the cell holding the start of the heap block *)
  ptr := (ptr and not (PTR_SZ - 1)) - PTR_SZ;
  (* ptr is the start of the heap block *)
  ptr := (Puintptr_t(ptr))^;
  HeapFreeEx(Pointer(ptr));
end;

end.
OnStrPopChange(FBufArr[TempPopIndex],APopLen);线程执行POP ,POP里调用OnStrPopChange来处理内容,OnStrPopChange是业务程序提供的,如文件处理,将收到的包合并成文件;表处理,将收到的数据组合成一个表格的形式.当然,每个业务也有自己的缓冲区.
OnStrPopChange主要进行包业务归属判断和包分隔.但相比来说,SOCKET的RECEIVE事件收到后,总是能立刻放入缓冲区的话,对性能来说会更好一些.
如果SOCKET的RECEIVE的速度太快,会有造成堆积的可能性,但只要分配较大的缓冲空间,或者将客户端的发送间隔调整一下,或者让客户端有个策略,如在连续发送超过10M的情况下暂停几秒.
但是作为服务器来说,只要有数据就应该尽量快地接收下来。现在的问题是,如何才能测试比较出各种实现方式的性能差异和BUG,因为很少能达到极点,BUG会很隐晦。
uses
Windows;
type
// Reference-counted wrapper around a Windows I/O completion port.
TIOCP=class(TObject)
strict private
FHandle : THandle; // completion port handle
FQueueThreadSize : Integer; // threads currently inside GetQueuedCompletionStatus
FManagementCount : Integer; // reference count
FIsShutingDown : Boolean; public
property QueueThreads : Integer read FQueueThreadSize; // threads currently waiting on the port
property ManagementCount : Integer read FManagementCount; // reference count
(*====================================================================*)
(*** Associate an I/O handle with the completion port               ***)
(*--------------------------------------------------------------------*)
(* In : IOHandle     - I/O handle (file, pipe, socket, ...)           *)
(*      IOContextKey - context key describing IOHandle (a record or   *)
(*                     object pointer, or another identifying value)  *)
(*      Concurrent   - number of concurrent threads, normally 0       *)
(* Out: (none)                                                        *)
(* Returns: the completion port handle                                *)
(*--------------------------------------------------------------------*)
function CreateIoCompletionPort( IOHandle : THandle
; IOContextKey : DWORD
; Concurrent : DWORD = 0
): THandle;
// Returns True when the handle returned by CreateIoCompletionPort equals
// this object's own port handle.
function AssociateWith ( IOHandle : THandle
; IOContextKey : DWORD
) : BOOL;
(*====================================================================*)
(*** Retrieve a packet from the completion queue                    ***)
(*--------------------------------------------------------------------*)
(* In : dwMilliseconds   - wait time in milliseconds, as for          *)
(*                         WaitForSingleObject                        *)
(* Out: BytesTransferred - bytes transferred; 0 means the I/O was     *)
(*                         closed or an error occurred                *)
(*      IOContextKey     - I/O context key                            *)
(*      Overlapped       - pointer to the overlapped I/O structure    *)
(* Returns: True on success, False on failure (see GetLastError)      *)
(*--------------------------------------------------------------------*)
function GetQueuedCompletionStatus( var BytesTransferred : DWORD
; var IOContextKey : DWORD
; var Overlapped : POverlapped
; dwMilliseconds : DWORD
): BOOL;
(*====================================================================*)
(*** Post a completion packet                                       ***)
(*--------------------------------------------------------------------*)
(* In : lpBytesTransferred, dwCompletionKey, lpOverlapped - delivered *)
(*      to the matching parameters of GetQueuedCompletionStatus       *)
(* Out: (none)                                                        *)
(* Returns: True on success, False on failure (see GetLastError);     *)
(*          always False once ShutDownAllEx has been called           *)
(*--------------------------------------------------------------------*)
function PostQueuedCompletionStatus( lpBytesTransferred : DWORD
; dwCompletionKey : DWORD
; lpOverlapped : POverlapped
): BOOL;
(*====================================================================*)
(*** Post the special "quit" completion packet                      ***)
(*--------------------------------------------------------------------*)
(* In : (none)   Out: (none)                                          *)
(* Returns: True on success, False on failure (see GetLastError)      *)
(* Note   : posts 0 bytes, key 0, overlapped = SHUTDOWN_FLAG          *)
(*--------------------------------------------------------------------*)
function PostThreadQuitFlag : BOOL;
(*====================================================================*)
(*** Increment the reference count                                  ***)
(*--------------------------------------------------------------------*)
(* Returns: the reference count after the increment                   *)
(*--------------------------------------------------------------------*)
function Attach : Integer; overload;
(*====================================================================*)
(*** Decrement the reference count                                  ***)
(*--------------------------------------------------------------------*)
(* Returns: the remaining reference count                             *)
(* Note   : never frees the object, even at 0; use Free instead       *)
(*--------------------------------------------------------------------*)
function Detach : Integer;
(*====================================================================*)
(*** Post one quit packet per thread waiting on the port            ***)
(*--------------------------------------------------------------------*)
(* In : (none)   Out: (none)   Returns: nothing (procedure)           *)
(* Note: ShutDownAll posts the quit packet once for every thread      *)
(*       counted in QueueThreads and then polls until that count is   *)
(*       0. ShutDownAllEx first makes PostQueuedCompletionStatus      *)
(*       refuse new packets, then does the same.                      *)
(*--------------------------------------------------------------------*)
procedure ShutDownAll;
procedure ShutDownAllEx;
(*====================================================================*)
(*** Release one reference                                          ***)
(*--------------------------------------------------------------------*)
(* In : (none)   Out: (none)   Returns: nothing                       *)
(* Note: the object is destroyed when the count is no longer positive *)
(*--------------------------------------------------------------------*)
procedure Free;
// Creates the completion port; the reference count starts at 1.
constructor Create;
// Calls ShutDownAll and closes the port handle.
destructor Destroy; override;
public
// Takes a reference on lpCompletion and returns it; returns nil when
// lpCompletion is nil or its count was 0 before the call.
class function Attach(lpCompletion: TIOCP): TIOCP; Overload;
end;
const
  SHUTDOWN_FLAG = $FFFFFFFF; // Posted to the completion port when shutting down

implementation

{ Takes one more reference on lpCompletion and returns it. Returns nil when
  lpCompletion is nil or when the count was 0 before the call (the increment
  is undone in that case). }
class function TIOCP.Attach(lpCompletion: TIOCP): TIOCP;
var
  Management: Integer;
begin
  Result := nil;
  if not Assigned(lpCompletion) then
    Exit;
  Management := lpCompletion.Attach;
  if Management = 1 then
  begin
    // Create leaves the count at 1, so reaching 1 here means it had dropped to 0.
    lpCompletion.Detach;
    Exit;
  end;
  Result := lpCompletion;
end;

{ Associates IOHandle with this object's port; returns what the API returns. }
function TIOCP.CreateIoCompletionPort(IOHandle: THandle; IOContextKey: DWORD;
  Concurrent: DWORD): THandle;
begin
  Result := Windows.CreateIoCompletionPort(IOHandle, FHandle, IOContextKey, Concurrent);
end;

{ True when IOHandle was associated with this object's port. }
function TIOCP.AssociateWith(IOHandle: THandle; IOContextKey: DWORD): BOOL;
begin
  Result := (CreateIoCompletionPort(IOHandle, IOContextKey) = Self.FHandle);
end;

{ Dequeues one completion packet. FQueueThreadSize counts the threads that are
  currently inside this call. }
function TIOCP.GetQueuedCompletionStatus(var BytesTransferred: DWORD;
  var IOContextKey: DWORD; var Overlapped: POverlapped;
  dwMilliseconds: DWORD): BOOL;
begin
  InterlockedIncrement(FQueueThreadSize);
  Result := Windows.GetQueuedCompletionStatus(FHandle, BytesTransferred,
    IOContextKey, Overlapped, dwMilliseconds);
  InterlockedDecrement(FQueueThreadSize);
end;

{ Posts a packet to the port; refused (False) once ShutDownAllEx was called. }
function TIOCP.PostQueuedCompletionStatus(lpBytesTransferred: DWORD;
  dwCompletionKey: DWORD; lpOverlapped: POverlapped): BOOL;
begin
  if FIsShutingDown then
  begin
    Result := False;
    Exit;
  end;
  Result := Windows.PostQueuedCompletionStatus(FHandle, lpBytesTransferred,
    dwCompletionKey, lpOverlapped);
end;

{ Posts the quit packet: 0 bytes, key 0, overlapped = SHUTDOWN_FLAG. }
function TIOCP.PostThreadQuitFlag: BOOL;
begin
  Result := Windows.PostQueuedCompletionStatus(FHandle, 0, 0, POverlapped(SHUTDOWN_FLAG));
end;

{ Increments the reference count and returns the new value. }
function TIOCP.Attach: Integer;
begin
  Result := InterlockedIncrement(FManagementCount);
end;

{ Decrements the reference count and returns the new value; never frees. }
function TIOCP.Detach: Integer;
begin
  Result := InterlockedDecrement(FManagementCount);
end;

{ Drops one reference and destroys the object once the count is no longer
  positive. Safe to call on a nil reference. }
procedure TIOCP.Free;
begin
  if Self = nil then
    Exit;
  if Detach > 0 then
    Exit;
  inherited Free;
end;

{ Creates the completion port and starts with a reference count of 1. }
constructor TIOCP.Create;
begin
  inherited;
  FIsShutingDown := False;
  FQueueThreadSize := 0;
  FManagementCount := 0;
  FHandle := Windows.CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  // The API reports failure as 0, not INVALID_HANDLE_VALUE. Normalise it so the
  // check in Destroy works and AssociateWith cannot silently create a new port.
  if FHandle = 0 then
    FHandle := INVALID_HANDLE_VALUE;
  InterlockedIncrement(FManagementCount);
end;

{ Posts one quit packet per thread currently waiting in
  GetQueuedCompletionStatus, then polls until none is waiting any more. }
procedure TIOCP.ShutDownAll;
var
  I: Integer;
begin
  I := FQueueThreadSize;
  while I > 0 do
  begin
    PostThreadQuitFlag;
    Dec(I);
  end;
  while FQueueThreadSize > 0 do
    Sleep(1);
end;

{ Like ShutDownAll, but first blocks further PostQueuedCompletionStatus calls. }
procedure TIOCP.ShutDownAllEx;
begin
  FIsShutingDown := True;
  ShutDownAll;
end;

{ Wakes the waiting threads and closes the port handle. }
destructor TIOCP.Destroy;
begin
  ShutDownAll;
  if FHandle <> INVALID_HANDLE_VALUE then
    CloseHandle(FHandle);
  FHandle := INVALID_HANDLE_VALUE;
  inherited;
end;

end.
写入:
PostQueuedCompletionStatus(DWORD(Parameter1),DWORD(Parameter2),POverlapped(Parameter3))
成功返回 Not (0),失败返回 0;
读取:
GetQueuedCompletionStatus(DWORD(Parameter1),DWORD(Parameter2),POverlapped(Parameter3),WaitForTime(*等待超时时间*))
判断超时:
GetLastError = WAIT_TIMEOUT
当WaitForTime参数设置为INFINITE时表示一直等待,适合线程池调度。
而GetQueuedCompletionStatus本身就是一个很好的线程池。只要线程使用GetQueuedCompletionStatus进行工作等待,任务的调配就可以通过PostQueuedCompletionStatus来分派给线程。只要三个参数对齐就可以。而且其中的POverlapped也同样可以当作是指针来使用。