操作说明:
我用多线程同时连接不同机器的数据库(Sql Server),将数据库中的数据导入到本地机器的数据库中,我启用的是并发线程,并不是队列等待形式的。
在这个过程中,如果连接机器在3台以上(数据量大约在每台机器60万条),就会发生数据库错误,我同时在memo控件中显示连接信息,但memo控件却没有像paint控件的lock那样的属性,所以在线程同时运行情况下,都会对memo进行写入,显示的文字也非常的乱。
各位兄弟能不能帮小弟一把,费费心,解决一下。必重谢!!
我用多线程同时连接不同机器的数据库(Sql Server),将数据库中的数据导入到本地机器的数据库中,我启用的是并发线程,并不是队列等待形式的。
在这个过程中,如果连接机器在3台以上(数据量大约在每台机器60万条),就会发生数据库错误,我同时在memo控件中显示连接信息,但memo控件却没有像paint控件的lock那样的属性,所以在线程同时运行情况下,都会对memo进行写入,显示的文字也非常的乱。
各位兄弟能不能帮小弟一把,费费心,解决一下。必重谢!!
Memo.Lines.EndUpdate应该是你要的
源码:
unit uServiceDBPooler;interfaceuses
DB, Provider, Classes, SqlExpr, IniFiles, variants, SysUtils, Forms, DBCommon, DSIntf, DBClient,Dialogs;type
//TDBExecuteFlag = (exeGetData, exeSetData, exeOpenSQL, exeExecuteSQL); EDatabasePoolMax = class(EDatabaseError);
TDBExpressConnPool = class(TComponent)
private
FConnectionName: String;
FDriverName: string;
FGetDriverFunc: string;
FLibraryName: string;
FVendorLib: string;
ConList : TThreadList;
InUseList : TBits;
FMaxConnections: integer;
FParams: TStrings;
FAutoOpen: boolean;
procedure SetMaxConnections(const Value: integer);
protected
public
property ConnectionName :string read FConnectionName write FConnectionName;
property DriverName :string read FDriverName write FDriverName;
property GetDriverFunc :string read FGetDriverFunc write FGetDriverFunc;
property LibraryName :string read FLibraryName write FLibraryName;
property VendorLib :string read FVendorLib write FVendorLib;
property Params :TStrings read FParams write FParams;
constructor Create(AOwner : TComponent); override;
destructor Destroy; override;
procedure OpenAll; virtual;
procedure CloseAll; virtual;
function AcquireDB : TSQLConnection; virtual;
procedure ReleaseDB(DBExpressConn : TSQLConnection); virtual;
published
property MaxConnections : integer read FMaxConnections write SetMaxConnections;
Property AutoOpen : boolean read FAutoOpen write FAutoOpen default true;
end; TDBExpressQueryQueue = class;
EQueryQueueMax = class(EDatabaseError); TDBExpressQueueItem = class(TObject)
private
FExecuteQuery: boolean;
FReadOnly: boolean;
FText: string;
FQueryObject: TSQLQuery;
FDataSetProvider: TDataSetProvider;
FIsReady: boolean;
FData: OleVariant;
FNeedFree: boolean;
FNErr: integer;
public
constructor Create; virtual;
destructor Destroy;override;
property Text : string read FText write FText;
property ReadOnly : boolean read FReadOnly write FReadOnly;
property ExecuteQuery : boolean read FExecuteQuery write FExecuteQuery;
property QueryObject : TSQLQuery read FQueryObject write FQueryObject;
property DataSetProvider: TDataSetProvider read FDataSetProvider write FDataSetProvider;
property IsReady : boolean read FIsReady write FIsReady;
property NeedFree : boolean read FNeedFree write FNeedFree;
end; TDBExpressQueueManager = class(TThread)
private
FIsOver: Boolean;
FDBExpressQueryQueue : TDBExpressQueryQueue;
protected
procedure Execute; override;
public
constructor Create(QQ : TDBExpressQueryQueue);
end; TDBExpressQueryQueue = class(TComponent)
private
FDBExpressConnPool: TDBExpressConnPool;
Queue : TThreadList;
QueueManagerList : TTHreadList;
FMaxQueueFactor: integer;
FQueueManagers: integer;
procedure SetDBExpressDatabasePool(const Value: TDBExpressConnPool);
procedure SetQueueManagers(const Value: integer);
protected
procedure Notification(AComponent: TComponent; Operation: TOperation); override;
public
constructor Create(AOwner : TComponent); override;
destructor Destroy; override;
function ExecuteSQL(Delta,ProviderFlagsParams: OleVariant): integer;
function OpenSQL(Text: string): OleVariant; virtual;
published
property DBExpressDatabasePool : TDBExpressConnPool read FDBExpressConnPool write SetDBExpressDatabasePool;
property MaxQueueFactor : integer read FMaxQueueFactor write FMaxQueueFactor;
property QueueManagers : integer read FQueueManagers write SetQueueManagers;
end; TAdvDataSetProvider = class(TDataSetProvider)
private
FProviderFlagsParams: OleVariant;
procedure GetDataSetProperties(Sender: TObject;
DataSet: TDataSet; out Properties: OleVariant);
procedure OnBeforeUpdateRecord(Sender: TObject; SourceDS: TDataSet; DeltaDS:
TCustomClientDataSet; UpdateKind: TUpdateKind;var Applied: Boolean);
public
property ProviderFlagsParams: OleVariant read FProviderFlagsParams write FProviderFlagsParams;
constructor Create(AOwner: TComponent);override;
end;
begin
inherited;
Queue := TThreadList.Create;
QueueManagerList := TThreadList.Create;
FMaxQueueFactor := 3;
FQueueManagers := 1;
end;destructor TDBExpressQueryQueue.Destroy;
var
i : integer;
begin
with QueueManagerList.LockList do
try
for i := 0 to count-1 do
begin
TDBExpressQueueManager(Items[i]).Terminate;
TDBExpressQueueManager(Items[i]).Free;
end;
finally
QueueManagerList.UnLockList;
end;
QueueManagerList.Free;
with Queue.LockList do
try
for i := 0 to count-1 do
TDBExpressQueueItem(Items[i]).Free;
finally
Queue.UnlockList;
end;
Queue.Free;
inherited;
end;function TDBExpressQueryQueue.ExecuteSQL(Delta,ProviderFlagsParams: OleVariant): integer;
var
SQLConnection: TSQLConnection;
o : TDBExpressQueueItem;
nErr: integer;
Query: TSQLQuery;
DSP: TAdvDataSetProvider;
iCount: Integer;
begin
Query := TSQLQuery.Create(nil);
DSP := TAdvDataSetProvider.Create(nil);
DSP.DataSet := Query;
DSP.ProviderFlagsParams := ProviderFlagsParams;
try
SQLConnection := FDBExpressConnPool.AcquireDB;
except
with Queue.LockList do
try
if Count >= FDBExpressConnPool.MaxConnections*FMaxQueueFactor then
raise Exception.Create('The database queue is full. Please try again in a few seconds.');
finally
Queue.UnlockList;
end;
with QueueManagerList.LockList do
begin
try
if Count = 0 then
QueueManagerList.Add(TDBExpressQueueManager.Create(Self));
finally
QueueManagerList.UnlockList;
end;
end; o := TDBExpressQueueItem.Create; //SQL敷值
o.ExecuteQuery := true;
o.QueryObject := Query;
o.FDataSetProvider := DSP;
o.FData := Delta;
Queue.Add(o);
try
while not o.IsReady do
Application.ProcessMessages;
Result := o.FNErr;
finally
o.Free;
end;
exit;
end;
try
Query.SQLConnection := SQLConnection;
DSP.UpdateMode := upWhereKeyOnly;
DSP.DataSet := Query;
DSP.ApplyUpdates(Delta, 0, nErr);
result := nErr;
finally
FDBExpressConnPool.ReleaseDB(SQLConnection);
end;
DSP.Free;
Query.Free;
end;procedure TDBExpressQueryQueue.Notification(AComponent: TComponent;
Operation: TOperation);
begin
inherited;
if (Operation = opRemove) then
begin
if (AComponent = FDBExpressConnPool) then
FDBExpressConnPool := nil;
end;
end;function TDBExpressQueryQueue.OpenSQL(Text: string): OleVariant;
var
SQLConnection : TSQLConnection;
o : TDBExpressQueueItem;
Query: TSQLQuery;
DSP: TAdvDataSetProvider;
begin
Query := TSQLQuery.Create(nil);
DSP := TAdvDataSetProvider.Create(nil);
DSP.DataSet := Query;
try
SQLConnection := FDBExpressConnPool.AcquireDB;
except
with Queue.LockList do
try
if Count >= FDBExpressConnPool.MaxConnections*FMaxQueueFactor then
raise Exception.Create('The database queue is full. Please try again in a few seconds.');
finally
Queue.UnlockList;
end; with QueueManagerList.LockList do
begin
try
if Count = 0 then
QueueManagerList.Add(TDBExpressQueueManager.Create(Self));
finally
QueueManagerList.UnlockList;
end;
end;
o := TDBExpressQueueItem.Create;
o.Text := Text;
o.ExecuteQuery := False;
o.QueryObject := Query;
o.FDataSetProvider := DSP;
Queue.Add(o);
try
while not o.IsReady do
Application.ProcessMessages;
Result := o.FData;
finally
o.Free;
end;
exit;
end;
try
Query.SQLConnection := SQLConnection;
Query.SQL.Text := Text;
DSP.DataSet := Query ;
Query.Open;
Query.First;
result := DSP.Data;
finally
FDBExpressConnPool.ReleaseDB(SQLConnection);
end;
DSP.Free;
Query.Free;
end;procedure TDBExpressQueryQueue.SetDBExpressDatabasePool(
const Value: TDBExpressConnPool);
begin
FDBExpressConnPool := Value;
if assigned(FDBExpressConnPool) then
begin
FDBExpressConnPool.FreeNotification(self);
end;
end;procedure TDBExpressQueryQueue.SetQueueManagers(const Value: integer);
var
i : integer;
begin
if Value <=0 then raise Exception.Create('There must be a positive number of Queue Managers');
if csDesigning in ComponentState then
begin
FQueueManagers := Value;
exit;
end;
if (FQueueManagers < Value) or (csLoading in ComponentState) then
begin
for i := FQueueManagers to Value do
begin
QueueManagerList.Add(TDBExpressQueueManager.Create(Self));
end;
end else
begin
if FQueueManagers > Value then
begin
with QueueManagerList.LockList do
try
while Count < Value do
begin
TDBExpressQueueManager(Items[Count-1]).Terminate;
Delete(Count-1);
end;
finally
QueueManagerList.UnlockList;
end;
end;
end;
end;{ TAdvDataSetProvider }procedure TAdvDataSetProvider.OnBeforeUpdateRecord(Sender: TObject;
SourceDS: TDataSet; DeltaDS: TCustomClientDataSet;
UpdateKind: TUpdateKind; var Applied: Boolean);
var
iCount, j: integer;
flags: TProviderFlags;
v: OleVariant;
begin
if (VarIsArray(FProviderFlagsParams)) and (not VarIsNull(FProviderFlagsParams)) then
for iCount := VarArrayLowBound(FProviderFlagsParams, 1) to VarArrayHighBound(FProviderFlagsParams, 1) do
begin
flags := [];
//v := VarArrayCreate([0,3],varVariant);
v := FProviderFlagsParams[iCount][1];
if (VarIsArray(v)) and (not VarIsNull(v)) then
begin
for j := VarArrayLowBound(v, 1) to VarArrayHighBound(v, 1) do
begin
if v[j] = 'pfInUpdate' then
flags := flags + [pfInUpdate];
if v[j] = 'pfInWhere' then
flags := flags + [pfInWhere];
if v[j] = 'pfInKey' then
flags := flags + [pfInKey];
if v[j] = 'pfHidden' then
flags := flags + [pfHidden];
end;
end;
DeltaDS.FieldByName(FProviderFlagsParams[iCount][0]).ProviderFlags := flags;
end;
end;constructor TAdvDataSetProvider.Create(AOwner: TComponent);
begin
inherited;
OnGetDataSetProperties := GetDataSetProperties;
BeforeUpdateRecord := OnBeforeUpdateRecord;
UpdateMode := upWhereKeyOnly;
end;procedure TAdvDataSetProvider.GetDataSetProperties(Sender: TObject;
DataSet: TDataSet; out Properties: OleVariant);
var
v: OLEVariant;
begin
Properties := VarArrayCreate([0,0],varVariant);
v := VarArrayCreate([0,2],varVariant);
v[0] := szTABLE_NAME;
v[1] := GetTableNameFromQuery(TSQLQuery(DataSet).SQL.Text);
v[2] := true;
Properties[0] := v;
end;end.
调用
initialization
TAutoObjectFactory.Create(ComServer, TServerCPMISDB, Class_ServerCPMISDB,
ciMultiInstance, tmBoth);
FDBExpressConnPool := TDBExpressConnPool.Create(nil);
FDBExpressQueryQueue := TDBExpressQueryQueue.Create(FDBExpressConnPool);
FDBExpressQueryQueue.DBExpressDatabasePool := FDBExpressConnPool;
FDBExpressQueryQueue.MaxQueueFactor := 5;
//FDBExpressQueryQueue.QueueManagers := 1;
finalization
FDBExpressConnPool.CloseAll;
FDBExpressConnPool.Free;
end.
newc_k(帕拉丁):还能给点注释吗?
再过两天结帖,一定重谢各位!!