unit uThreadPool; { aPool.AddRequest(TMyRequest.Create(RequestParam1, RequestParam2, ...)); } interface uses Windows, Classes; // 是否记录日志 // {$DEFINE NOLOGS} type TCriticalSection = class(TObject) protected FSection: TRTLCriticalSection; public constructor Create; destructor Destroy; override; // 进入临界区 procedure Enter; // 离开临界区 procedure Leave; // 尝试进入 function TryEnter: Boolean; end; type // 储存请求数据的基本类 TWorkItem = class(TObject) protected public { 添加关注传递的信息 } SinaUser:string; SinaPwd:string; Cookie:string; SinaUserUid:string; FI:integer; // 是否有重复任务 function IsTheSame(DataObj: TWorkItem): Boolean; virtual; // 如果 NOLOGS 被定义,则禁用。 function TextForLog: string; virtual; end; type TThreadsPool = class; //线程状态 TThreadState = (tcsInitializing, tcsWaiting, tcsGetting, tcsProcessing, tcsProcessed, tcsTerminating, tcsCheckingDown); // 工作线程仅用于线程池内, 不要直接创建并调用它。 TProcessorThread = class(TThread) private // 创建线程时临时的Event对象, 阻塞线程直到初始化完成 hInitFinished: THandle; // 初始化出错信息 sInitError: string; // 记录日志 procedure WriteLog(const Str: string; Level: Integer = 0); protected // 线程临界区同步对像 csProcessingDataObject: TCriticalSection; // 平均处理时间 FAverageProcessing: Integer; // 等待请求的平均时间 FAverageWaitingTime: Integer; // 本线程实例的运行状态 FCurState: TThreadState; // 本线程实例所附属的线程池 FPool: TThreadsPool; // 当前处理的数据对像。 FProcessingDataObject: TWorkItem; // 线程停止 Event, TProcessorThread.Terminate 中开绿灯 hThreadTerminated: THandle; uProcessingStart: DWORD; // 开始等待的时间, 通过 GetTickCount 取得。 uWaitingStart: DWORD; // 计算平均工作时间 function AverageProcessingTime: DWORD; // 计算平均等待时间 function AverageWaitingTime: DWORD; procedure Execute; override; function IamCurrentlyProcess(DataObj: TWorkItem): Boolean; // 转换枚举类型的线程状态为字串类型 function InfoText: string; // 线程是否长时间处理同一个请求?(已死掉?) function IsDead: Boolean; // 线程是否已完成当成任务 function isFinished: Boolean; // 线程是否处于空闲状态 function isIdle: Boolean; // 平均值校正计算。 function NewAverage(OldAvg, NewVal: Integer): Integer; public Tag: Integer; constructor Create(APool: TThreadsPool); destructor Destroy; override; procedure Terminate; end; // 线程初始化时触发的事件 TProcessorThreadInitializing = procedure(Sender: TThreadsPool; aThread: TProcessorThread) of object; // 线程结束时触发的事件 TProcessorThreadFinalizing = procedure(Sender: TThreadsPool; aThread: TProcessorThread) of object; // 线程处理请求时触发的事件 TProcessRequest = procedure(Sender: TThreadsPool; WorkItem: TWorkItem; aThread: TProcessorThread) of object; TEmptyKind = ( ekQueueEmpty, //任务被取空后 ekProcessingFinished // 最后一个任务处理完毕后 ); // 任务队列空时触发的事件 TQueueEmpty = procedure(Sender: TThreadsPool; EmptyKind: TEmptyKind) of object; TThreadsPool = class(TComponent) private csQueueManagment: TCriticalSection; csThreadManagment: TCriticalSection; FProcessRequest: TProcessRequest; FQueue: TList; FQueueEmpty: TQueueEmpty; // 线程超时阀值 FThreadDeadTimeout: DWORD; FThreadFinalizing: TProcessorThreadFinalizing; FThreadInitializing: TProcessorThreadInitializing; // 工作中的线程 FThreads: TList; // 执行了 terminat 发送退出指令, 正在结束的线程. FThreadsKilling: TList; // 最少, 最大线程数 FThreadsMax: Integer; // 最少, 最大线程数 FThreadsMin: Integer; // 线程池等待事件 FWaitEvent: THandle; // 线程请求数量 RequestCount:Integer; // 池平均等待时间 function PoolAverageWaitingTime: Integer; procedure WriteLog(const Str: string; Level: Integer = 0); protected FLastGetPoint: Integer; // Semaphore, 统计任务队列 hSemRequestCount: THandle; // Waitable timer. 每30触发一次的时间量同步 hTimCheckPoolDown: THandle; // 线程池停机(检查并清除空闲线程和死线程) procedure CheckPoolDown; // 清除死线程,并补充不足的工作线程 procedure CheckThreadsForGrow; procedure DoProcessed; procedure DoProcessRequest(aDataObj: TWorkItem; aThread: TProcessorThread); virtual; procedure DoQueueEmpty(EmptyKind: TEmptyKind); virtual; procedure DoThreadFinalizing(aThread: TProcessorThread); virtual; // 执行事件 procedure DoThreadInitializing(aThread: TProcessorThread); virtual; // 释放 FThreadsKilling 列表中的线程 procedure FreeFinishedThreads; // 申请任务 procedure GetRequest(out Request: TWorkItem); // 清除死线程 procedure KillDeadThreads; //检测线程状态 //返回值:0-已释放;1-正在运行;2-已终止但未释放 function CheckThreadFreed(aThread: TThread): Byte; public constructor Create(AOwner: TComponent); override; destructor Destroy; override; // 就进行任务是否重复的检查, 检查发现重复就返回 False function AddRequest(aDataObject: TWorkItem; CheckForDoubles: Boolean = False): Boolean; overload; // 转换枚举类型的线程状态为字串类型 function InfoText: string; procedure WaitFor; published // 线程处理任务时触发的事件 property OnProcessRequest: TProcessRequest read FProcessRequest write FProcessRequest; // 任务列表为空时解发的事件 property OnQueueEmpty: TQueueEmpty read FQueueEmpty write FQueueEmpty; // 线程结束时触发的事件 property OnThreadFinalizing: TProcessorThreadFinalizing read FThreadFinalizing write FThreadFinalizing; // 线程初始化时触发的事件 property OnThreadInitializing: TProcessorThreadInitializing read FThreadInitializing write FThreadInitializing; // 线程超时值(毫秒), 如果处理超时,将视为死线程 property ThreadDeadTimeout: DWORD read FThreadDeadTimeout write FThreadDeadTimeout default 0; // 最大线程数 property ThreadsMax: Integer read FThreadsMax write FThreadsMax default 1; // 最小线程数 property ThreadsMin: Integer read FThreadsMin write FThreadsMin default 0; end; type //日志记志函数 TLogWriteProc = procedure( const Str: string; //日志 LogID: Integer = 0; Level: Integer = 0 //Level = 0 - 跟踪信息, 10 - 致命错误 ); var WriteLog: TLogWriteProc; // 如果存在实例就写日志 implementation uses SysUtils; // 储存请求数据的基本类 { ********************************** TWorkItem *********************************** } function TWorkItem.IsTheSame(DataObj: TWorkItem): Boolean; begin Result := False; end; { TWorkItem.IsTheSame } function TWorkItem.TextForLog: string; begin Result := 'Request'; end; { TWorkItem.TextForLog } { ********************************* TThreadsPool ********************************* } constructor TThreadsPool.Create(AOwner: TComponent); var DueTo: Int64; begin {$IFNDEF NOLOGS} WriteLog('创建线程池', 5); {$ENDIF} inherited; csQueueManagment := TCriticalSection.Create; FQueue := TList.Create; csThreadManagment := TCriticalSection.Create; FThreads := TList.Create; FThreadsKilling := TList.Create; FThreadsMin := 0; FThreadsMax := 1; FThreadDeadTimeout := 0; FLastGetPoint := 0; // hSemRequestCount := CreateSemaphore(nil, 0, $7FFFFFFF, nil); DueTo := -1; //可等待的定时器(只用于Window NT4或更高) hTimCheckPoolDown := CreateWaitableTimer(nil, False, nil); if hTimCheckPoolDown = 0 then // Win9x不支持 // In Win9x number of thread will be never decrised hTimCheckPoolDown := CreateEvent(nil, False, False, nil) else SetWaitableTimer(hTimCheckPoolDown, DueTo, 30000, nil, nil, False); end; { TThreadsPool.Create } destructor TThreadsPool.Destroy; var n, i: Integer; Handles: array of THandle; begin {$IFNDEF NOLOGS} WriteLog('线程池销毁', 5); {$ENDIF} csThreadManagment.Enter; SetLength(Handles, FThreads.Count); n := 0; for i := 0 to FThreads.Count - 1 do if FThreads[i] <> nil then begin Handles[n] := TProcessorThread(FThreads[i]).Handle; TProcessorThread(FThreads[i]).Terminate; Inc(n); end; WaitForMultipleObjects(n, @Handles[0], True, 30000); for i := 0 to FThreads.Count - 1 do TProcessorThread(FThreads[i]).Free; FThreads.Free; FThreadsKilling.Free; csThreadManagment.Free; csQueueManagment.Enter; for i := FQueue.Count - 1 downto 0 do TObject(FQueue[i]).Free; FQueue.Free; csQueueManagment.Free; CloseHandle(hSemRequestCount); CloseHandle(hTimCheckPoolDown); inherited; end; { TThreadsPool.Destroy } procedure TThreadsPool.WaitFor; begin FWaitEvent := CreateEvent(nil, True, False, nil); WaitForSingleObject( FWaitEvent, INFINITE ); CloseHandle( FWaitEvent ); //csThreadManagment.Leave; // SetLength(Handles, FThreads.Count); // n := 0; // csThreadManagment.Enter; // for i := 0 to FThreads.Count - 1 do // begin // if FThreads[i] <> nil then // begin // //ThreadState := CheckThreadFreed( TProcessorThread( FThreads[i]) ); // // if not TProcessorThread(FThreads[i]).Terminated then // begin // Handles[n] := TProcessorThread( FThreads[i]).Handle; // Inc( n ); // end; // end; // end; //WaitForMultipleObjects( n, @Handles[0], True, INFINITE ); end; function TThreadsPool.AddRequest(aDataObject: TWorkItem; CheckForDoubles: Boolean = False): Boolean; var i: Integer; begin {$IFNDEF NOLOGS} WriteLog('AddRequest(' + aDataObject.TextForLog + ')', 2); {$ENDIF} Result := False; csQueueManagment.Enter; try // 如果 CheckForDoubles = TRUE // 则进行任务是否重复的检查 if CheckForDoubles then for i := 0 to FQueue.Count - 1 do if (FQueue[i] <> nil) and aDataObject.IsTheSame(TWorkItem(FQueue[i])) then Exit; // 发现有相同的任务 csThreadManagment.Enter; try // 清除死线程,并补充不足的工作线程 CheckThreadsForGrow; // 如果 CheckForDoubles = TRUE // 则检查是否有相同的任务正在处理中 if CheckForDoubles then for i := 0 to FThreads.Count - 1 do if TProcessorThread(FThreads[i]).IamCurrentlyProcess(aDataObject) then Exit; // 发现有相同的任务 finally csThreadManagment.Leave; end; //将任务加入队列 FQueue.Add(aDataObject); //释放一个同步信号量 ReleaseSemaphore(hSemRequestCount, 1, nil); {$IFNDEF NOLOGS} WriteLog('释放一个同步信号量)', 1); {$ENDIF} Result := True; finally csQueueManagment.Leave; end; {$IFNDEF NOLOGS} //调试信息 WriteLog('增加一个任务(' + aDataObject.TextForLog + ')', 1); {$ENDIF} //线程请求数加1 Inc( RequestCount ); end; { TThreadsPool.AddRequest } { 函 数 名:TThreadsPool.CheckPoolDown 功能描述:线程池停机(检查并清除空闲线程和死线程) 输入参数:无 返 回 值: 无 创建日期:2006.10.22 11:31 修改日期:2006. 作 者:Kook 附加说明: } procedure TThreadsPool.CheckPoolDown; var i: Integer; begin {$IFNDEF NOLOGS} WriteLog('TThreadsPool.CheckPoolDown', 1); {$ENDIF} csThreadManagment.Enter; try {$IFNDEF NOLOGS} WriteLog(InfoText, 2); {$ENDIF} // 清除死线程 KillDeadThreads; // 释放 FThreadsKilling 列表中的线程 FreeFinishedThreads; // 如果线程空闲,就终止它 for i := FThreads.Count - 1 downto FThreadsMin do if TProcessorThread(FThreads[i]).isIdle then begin //发出终止命令 TProcessorThread(FThreads[i]).Terminate; //加入待清除队列 FThreadsKilling.Add(FThreads[i]); //从工作队列中除名 FThreads.Delete(i); //todo: ?? Break; end; finally csThreadManagment.Leave; end; end; { TThreadsPool.CheckPoolDown } { 函 数 名:TThreadsPool.CheckThreadsForGrow 功能描述:清除死线程,并补充不足的工作线程 输入参数:无 返 回 值: 无 创建日期:2006.10.22 11:31 修改日期:2006. 作 者:Kook 附加说明: } procedure TThreadsPool.CheckThreadsForGrow; var AvgWait: Integer; i: Integer; begin { New thread created if: 新建线程的条件: 1. 工作线程数小于最小线程数 2. 工作线程数小于最大线程数 and 线程池平均等待时间 < 100ms(系统忙) 3. 任务大于工作线程数的4倍 } csThreadManagment.Enter; try KillDeadThreads; if FThreads.Count < FThreadsMin then begin {$IFNDEF NOLOGS} WriteLog('工作线程数小于最小线程数', 4); {$ENDIF} for i := FThreads.Count to FThreadsMin - 1 do try FThreads.Add(TProcessorThread.Create(Self)); except on e: Exception do WriteLog( 'TProcessorThread.Create raise: ' + e.ClassName + #13#10#9'Message: ' + e.Message, 9 ); end end else if FThreads.Count < FThreadsMax then begin {$IFNDEF NOLOGS} WriteLog('工作线程数小于最大线程数 and 线程池平均等待时间 < 100ms', 3); {$ENDIF} AvgWait := PoolAverageWaitingTime; {$IFNDEF NOLOGS} WriteLog(Format( 'FThreads.Count (%d)<FThreadsMax(%d), AvgWait=%d', [FThreads.Count, FThreadsMax, AvgWait]), 4 ); {$ENDIF} if AvgWait < 100 then try FThreads.Add(TProcessorThread.Create(Self)); except on e: Exception do WriteLog( 'TProcessorThread.Create raise: ' + e.ClassName + #13#10#9'Message: ' + e.Message, 9 ); end; end; finally csThreadManagment.Leave; end; end; { TThreadsPool.CheckThreadsForGrow } procedure TThreadsPool.DoProcessed; var i: Integer; begin if (FLastGetPoint < FQueue.Count) then Exit; csThreadManagment.Enter; try for i := 0 to FThreads.Count - 1 do if TProcessorThread(FThreads[i]).FCurState in [tcsProcessing] then Exit; finally csThreadManagment.Leave; end; DoQueueEmpty(ekProcessingFinished); end; { TThreadsPool.DoProcessed } procedure TThreadsPool.DoProcessRequest(aDataObj: TWorkItem; aThread: TProcessorThread); begin if Assigned(FProcessRequest) then FProcessRequest(Self, aDataObj, aThread); end; { TThreadsPool.DoProcessRequest } procedure TThreadsPool.DoQueueEmpty(EmptyKind: TEmptyKind); begin if Assigned(FQueueEmpty) then FQueueEmpty(Self, EmptyKind); end; { TThreadsPool.DoQueueEmpty } procedure TThreadsPool.DoThreadFinalizing(aThread: TProcessorThread); begin if Assigned(FThreadFinalizing) then FThreadFinalizing(Self, aThread); end; { TThreadsPool.DoThreadFinalizing } procedure TThreadsPool.DoThreadInitializing(aThread: TProcessorThread); begin if Assigned(FThreadInitializing) then FThreadInitializing(Self, aThread); end; { TThreadsPool.DoThreadInitializing } { 函 数 名:TThreadsPool.FreeFinishedThreads 功能描述:释放 FThreadsKilling 列表中的线程 输入参数:无 返 回 值: 无 创建日期:2006.10.22 11:34 修改日期:2006. 作 者:Kook 附加说明: } procedure TThreadsPool.FreeFinishedThreads; var i: Integer; begin if csThreadManagment.TryEnter then try for i := FThreadsKilling.Count - 1 downto 0 do if TProcessorThread(FThreadsKilling[i]).isFinished then begin TProcessorThread(FThreadsKilling[i]).Free; FThreadsKilling.Delete(i); end; finally csThreadManagment.Leave end; end; { TThreadsPool.FreeFinishedThreads } { 函 数 名:TThreadsPool.GetRequest 功能描述:申请任务 输入参数:out Request: TRequestDataObject 返 回 值: 无 创建日期:2006.10.22 11:34 修改日期:2006. 作 者:Kook 附加说明: } procedure TThreadsPool.GetRequest(out Request: TWorkItem); begin {$IFNDEF NOLOGS} WriteLog('申请任务', 2); {$ENDIF} csQueueManagment.Enter; try //跳过空的队列元素 while (FLastGetPoint < FQueue.Count) and (FQueue[FLastGetPoint] = nil) do Inc(FLastGetPoint); Assert(FLastGetPoint < FQueue.Count); //压缩队列,清除空元素 if (FQueue.Count > 127) and (FLastGetPoint >= (3 * FQueue.Count) div 4) then begin {$IFNDEF NOLOGS} WriteLog('FQueue.Pack', 1); {$ENDIF} FQueue.Pack; FLastGetPoint := 0; end; Request := TWorkItem(FQueue[FLastGetPoint]); FQueue[FLastGetPoint] := nil; inc(FLastGetPoint); if (FLastGetPoint = FQueue.Count) then //如果队列中无任务 begin DoQueueEmpty(ekQueueEmpty); FQueue.Clear; FLastGetPoint := 0; end; finally csQueueManagment.Leave; end; end; { TThreadsPool.GetRequest } function TThreadsPool.InfoText: string; begin Result := ''; //end; //{$ELSE} //var // i: Integer; //begin // csQueueManagment.Enter; // csThreadManagment.Enter; // try // if (FThreads.Count = 0) and (FThreadsKilling.Count = 1) and // TProcessorThread(FThreadsKilling[0]).isFinished then // FreeFinishedThreads; // // Result := Format( // 'Pool thread: Min=%d, Max=%d, WorkingThreadsCount=%d, TerminatedThreadCount=%d, QueueLength=%d'#13#10, // [ThreadsMin, ThreadsMax, FThreads.Count, FThreadsKilling.Count, // FQueue.Count] // ); // if FThreads.Count > 0 then // Result := Result + 'Working threads:'#13#10; // for i := 0 to FThreads.Count - 1 do // Result := Result + TProcessorThread(FThreads[i]).InfoText + #13#10; // if FThreadsKilling.Count > 0 then // Result := Result + 'Terminated threads:'#13#10; // for i := 0 to FThreadsKilling.Count - 1 do // Result := Result + TProcessorThread(FThreadsKilling[i]).InfoText + #13#10; // finally // csThreadManagment.Leave; // csQueueManagment.Leave; // end; //end; //{$ENDIF} end; { TThreadsPool.InfoText } { 函 数 名:TThreadsPool.KillDeadThreads 功能描述:清除死线程 输入参数:无 返 回 值: 无 创建日期:2006.10.22 11:32 修改日期:2006. 作 者:Kook 附加说明: } procedure TThreadsPool.KillDeadThreads; var i: Integer; begin // Check for dead threads if csThreadManagment.TryEnter then try for i := 0 to FThreads.Count - 1 do if TProcessorThread(FThreads[i]).IsDead then begin // Dead thread moverd to other list. // New thread created to replace dead one TProcessorThread(FThreads[i]).Terminate; FThreadsKilling.Add(FThreads[i]); try FThreads[i] := TProcessorThread.Create(Self); except on e: Exception do begin FThreads[i] := nil; {$IFNDEF NOLOGS} WriteLog( 'TProcessorThread.Create raise: ' + e.ClassName + #13#10#9'Message: ' + e.Message, 9 ); {$ENDIF} end; end; end; finally csThreadManagment.Leave end; end; { TThreadsPool.KillDeadThreads } function TThreadsPool.PoolAverageWaitingTime: Integer; var i: Integer; begin Result := 0; if FThreads.Count > 0 then begin for i := 0 to FThreads.Count - 1 do Inc(result, TProcessorThread(FThreads[i]).AverageWaitingTime); Result := Result div FThreads.Count end else Result := 1; end; { TThreadsPool.PoolAverageWaitingTime } procedure TThreadsPool.WriteLog(const Str: string; Level: Integer = 0); begin {$IFNDEF NOLOGS} uThreadPool.WriteLog(Str, 0, Level); {$ENDIF} end; { TThreadsPool.WriteLog } // 工作线程仅用于线程池内, 不要直接创建并调用它。 { ******************************* TProcessorThread ******************************* } constructor TProcessorThread.Create(APool: TThreadsPool); begin WriteLog('创建工作线程', 5); inherited Create(True); FPool := aPool; FAverageWaitingTime := 1000; FAverageProcessing := 3000; sInitError := ''; { 各参数的意义如下:   参数一:填上 nil 即可。 参数二:是否采用手动调整灯号。 参数三:灯号的起始状态,False 表示红灯。 参数四:Event 名称, 对象名称相同的话,会指向同一个对象,所以想要有两个Event对象,便要有两个不同的名称(这名称以字符串来存.为NIL的话系统每次会自己创建一个不同的名字,就是被次创建的都是新的EVENT)。 传回值:Event handle。 } hInitFinished := CreateEvent(nil, True, False, nil); hThreadTerminated := CreateEvent(nil, True, False, nil); csProcessingDataObject := TCriticalSection.Create; try WriteLog('TProcessorThread.Create::Resume', 3); Resume; //阻塞, 等待初始化完成 WaitForSingleObject(hInitFinished, INFINITE); if sInitError <> '' then raise Exception.Create(sInitError); finally CloseHandle(hInitFinished); end; WriteLog('TProcessorThread.Create::Finished', 3); end; { TProcessorThread.Create } destructor TProcessorThread.Destroy; begin WriteLog('工作线程销毁', 5); CloseHandle(hThreadTerminated); csProcessingDataObject.Free; inherited; end; { TProcessorThread.Destroy } function TProcessorThread.AverageProcessingTime: DWORD; begin if (FCurState in [tcsProcessing]) then Result := NewAverage(FAverageProcessing, GetTickCount - uProcessingStart) else Result := FAverageProcessing end; { TProcessorThread.AverageProcessingTime } function TProcessorThread.AverageWaitingTime: DWORD; begin if (FCurState in [tcsWaiting, tcsCheckingDown]) then Result := NewAverage(FAverageWaitingTime, GetTickCount - uWaitingStart) else Result := FAverageWaitingTime end; { TProcessorThread.AverageWaitingTime } procedure TProcessorThread.Execute; type THandleID = (hidTerminateThread, hidRequest, hidCheckPoolDown); var WaitedTime: Integer; Handles: array[THandleID] of THandle; begin WriteLog('工作线程进常运行', 3); //当前状态:初始化 FCurState := tcsInitializing; try //执行外部事件 FPool.DoThreadInitializing(Self); except on e: Exception do sInitError := e.Message; end; //初始化完成,初始化Event绿灯 SetEvent(hInitFinished); WriteLog('TProcessorThread.Execute::Initialized', 3); //引用线程池的同步 Event Handles[hidTerminateThread] := hThreadTerminated; Handles[hidRequest] := FPool.hSemRequestCount; Handles[hidCheckPoolDown] := FPool.hTimCheckPoolDown; //时间戳, //todo: 好像在线程中用 GetTickCount; 会不正常 uWaitingStart := GetTickCount; //任务置空 FProcessingDataObject := nil; //大巡环 while not terminated do begin //当前状态:等待 FCurState := tcsWaiting; //阻塞线程,使线程休眠 case WaitForMultipleObjects(Length(Handles), @Handles, False, INFINITE) - WAIT_OBJECT_0 of WAIT_OBJECT_0 + ord(hidTerminateThread): begin WriteLog('TProcessorThread.Execute:: Terminate event signaled ', 5); //当前状态:正在终止线程 FCurState := tcsTerminating; //退出大巡环(结束线程) Break; end; WAIT_OBJECT_0 + ord(hidRequest): begin WriteLog('TProcessorThread.Execute:: Request semaphore signaled ', 3); //等待的时间 WaitedTime := GetTickCount - uWaitingStart; //重新计算平均等待时间 FAverageWaitingTime := NewAverage(FAverageWaitingTime, WaitedTime); //当前状态:申请任务 FCurState := tcsGetting; //如果等待时间过短,则检查工作线程是否足够 if WaitedTime < 5 then FPool.CheckThreadsForGrow; //从线程池的任务队列中得到任务 FPool.GetRequest(FProcessingDataObject); //开始处理的时间戳 uProcessingStart := GetTickCount; //当前状态:执行任务 FCurState := tcsProcessing; try {$IFNDEF NOLOGS} WriteLog('Processing: ' + FProcessingDataObject.TextForLog, 2); {$ENDIF} //执行任务 FPool.DoProcessRequest(FProcessingDataObject, Self); except on e: Exception do WriteLog( 'OnProcessRequest for ' + FProcessingDataObject.TextForLog + #13#10'raise Exception: ' + e.Message, 8 ); end; //释放任务对象 csProcessingDataObject.Enter; try FProcessingDataObject.Free; FProcessingDataObject := nil; finally csProcessingDataObject.Leave; end; //重新计算 FAverageProcessing := NewAverage(FAverageProcessing, GetTickCount - uProcessingStart); //当前状态:执行任务完毕 FCurState := tcsProcessed; //执行线程外事件 FPool.DoProcessed; uWaitingStart := GetTickCount; //线程请求数减1 Dec( FPool.RequestCount ); //如果所有线程请求都执行完毕 if FPool.RequestCount = 0 then begin SetEvent( FPool.FWaitEvent ); end; end; WAIT_OBJECT_0 + ord(hidCheckPoolDown): begin // !!! Never called under Win9x WriteLog('TProcessorThread.Execute:: CheckPoolDown timer signaled ', 4); //当前状态:线程池停机(检查并清除空闲线程和死线程) FCurState := tcsCheckingDown; FPool.CheckPoolDown; end; end; end; FCurState := tcsTerminating; FPool.DoThreadFinalizing(Self); end; { TProcessorThread.Execute } function TProcessorThread.IamCurrentlyProcess(DataObj: TWorkItem): Boolean; begin csProcessingDataObject.Enter; try Result := (FProcessingDataObject <> nil) and DataObj.IsTheSame(FProcessingDataObject); finally csProcessingDataObject.Leave; end; end; { TProcessorThread.IamCurrentlyProcess } function TProcessorThread.InfoText: string; const ThreadStateNames: array[TThreadState] of string = ( 'tcsInitializing', 'tcsWaiting', 'tcsGetting', 'tcsProcessing', 'tcsProcessed', 'tcsTerminating', 'tcsCheckingDown' ); begin {$IFNDEF NOLOGS} Result := Format( '%5d: %15s, AverageWaitingTime=%6d, AverageProcessingTime=%6d', [ThreadID, ThreadStateNames[FCurState], AverageWaitingTime, AverageProcessingTime] ); case FCurState of tcsWaiting: Result := Result + ', WaitingTime=' + IntToStr(GetTickCount - uWaitingStart); tcsProcessing: Result := Result + ', ProcessingTime=' + IntToStr(GetTickCount - uProcessingStart); end; csProcessingDataObject.Enter; try if FProcessingDataObject <> nil then Result := Result + ' ' + FProcessingDataObject.TextForLog; finally csProcessingDataObject.Leave; end; {$ENDIF} end; { TProcessorThread.InfoText } function TProcessorThread.IsDead: Boolean; begin Result := Terminated or (FPool.ThreadDeadTimeout > 0) and (FCurState = tcsProcessing) and (GetTickCount - uProcessingStart > FPool.ThreadDeadTimeout); if Result then WriteLog('Thread dead', 5); end; { TProcessorThread.IsDead } function TProcessorThread.isFinished: Boolean; begin Result := WaitForSingleObject(Handle, 0) = WAIT_OBJECT_0; end; { TProcessorThread.isFinished } function TProcessorThread.isIdle: Boolean; begin // 如果线程状态是 tcsWaiting, tcsCheckingDown // 并且 空间时间 > 100ms, // 并且 平均等候任务时间大于平均工作时间的 50% // 则视为空闲。 Result := (FCurState in [tcsWaiting, tcsCheckingDown]) and (AverageWaitingTime > 100) and (AverageWaitingTime * 2 > AverageProcessingTime); end; { TProcessorThread.isIdle } function TProcessorThread.NewAverage(OldAvg, NewVal: Integer): Integer; begin Result := (OldAvg * 2 + NewVal) div 3; end; { TProcessorThread.NewAverage } procedure TProcessorThread.Terminate; begin WriteLog('TProcessorThread.Terminate', 5); inherited Terminate; SetEvent(hThreadTerminated); end; { TProcessorThread.Terminate } procedure TProcessorThread.WriteLog(const Str: string; Level: Integer = 0); begin {$IFNDEF NOLOGS} uThreadPool.WriteLog(Str, ThreadID, Level); {$ENDIF} end; { TProcessorThread.WriteLog } { ******************************* TCriticalSection ******************************* } constructor TCriticalSection.Create; begin InitializeCriticalSection(FSection); end; { TCriticalSection.Create } destructor TCriticalSection.Destroy; begin DeleteCriticalSection(FSection); end; { TCriticalSection.Destroy } procedure TCriticalSection.Enter; begin EnterCriticalSection(FSection); end; { TCriticalSection.Enter } procedure TCriticalSection.Leave; begin LeaveCriticalSection(FSection); end; { TCriticalSection.Leave } function TCriticalSection.TryEnter: Boolean; begin Result := TryEnterCriticalSection(FSection); end; { TCriticalSection.TryEnter } procedure NoLogs(const Str: string; LogID: Integer = 0; Level: Integer = 0); begin end; function TThreadsPool.CheckThreadFreed(aThread: TThread): Byte; var i: DWord; IsQuit:Boolean; begin if Assigned(aThread) then begin IsQuit := GetExitCodeThread(aThread.Handle, i); if IsQuit then //If the function succeeds, the return value is nonzero. begin if i = STILL_ACTIVE then //If the specified thread has not terminated, Result := 1 else Result := 2; //aThread未Free,因为Tthread.Destroy中有执行语句 end else Result := 0; //可以用GetLastError取得错误代码 end else Result := 3; end; initialization WriteLog := NoLogs; end.

转载于:https://www.cnblogs.com/pcdelphi/archive/2011/04/15/2017939.html

为uThreadPool增加线程池等待功能相关推荐

  1. Hippo4J v1.3.1 发布,增加 Netty 监控上报、SpringCloud Hystrix 线程池监控等特性

    文章首发在公众号(龙台的技术笔记),之后同步到 CSDN 和个人网站:xiaomage.info Hippo4J v1.3.1 正式发布,本次发布增加了 Netty 上传动态线程池监控数据.适配 Hy ...

  2. python 进程池 等待数量_【2020Python修炼记】python并发编程(六)补充—进程池和线程池...

    1. 2. 为啥要有 进程池和线程池 进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数 3.创建进程池的类pool 如果指定numprocess为3,则进程池会从无到有创建三 ...

  3. mongodb线程池_常用高并发网络线程模型设计及MongoDB线程模型优化实践

    服务端通常需要支持高并发业务访问,如何设计优秀的服务端网络IO工作线程/进程模型对业务的高并发访问需求起着至关重要的核心作用. 本文总结了了不同场景下的多种网络IO线程/进程模型,并给出了各种模型的优 ...

  4. Rocksdb 的优秀代码(三)-- 工业级 线程池实现分享

    文章目录 前言 1. Rocksdb线程池概览 2. Rocksdb 线程池实现 2.1 基本数据结构 2.2 线程池创建 2.3 线程池 调度线程执行 2.4 线程池销毁线程 2.5 线程池优先级调 ...

  5. Java线程池实现原理及其在美团业务中的实践

    来自:美团技术团队 随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流.使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器.J.U.C提供的线程池ThreadPoolExecuto ...

  6. 浅谈线程池(中):独立线程池的作用及IO线程池

    在上一篇文章中,我们简单讨论了线程池的作用,以及CLR线程池的一些特性.不过关于线程池的基本概念还没有结束,这次我们再来补充一些必要的信息,有助于我们在程序中选择合适的使用方式. 独立线程池 上次我们 ...

  7. java——自己实现基础的线程池及带有任务数过多拒绝策略、线程池销毁、自动扩充线程数量及闲时自动回收线程等操作的改进版线程池

    1. 实现第一版基础的线程池 1.1 首先我们定义一个线程池类ThreadPool,然后线程池有一个容器存放我们创建的线程,另一个容器则是存放当前线程池需要处理的任务队列,线程容器用ArrayList ...

  8. 非常精简的Linux线程池实现(一)——使用互斥锁和条件变量

    https://blog.csdn.net/kxcfzyk/article/details/31719687 线程池的含义跟它的名字一样,就是一个由许多线程组成的池子. 有了线程池,在程序中使用多线程 ...

  9. 算法高级(9)-线程池的实现方式

    电脑的CPU资源是有限的,任务的处理速度与线程数量之间并不是正相关.当线程数量过多,CPU要频繁的在不同线程切换,反而会引起处理性能的下降.线程池中最大的线程数,是考虑多种因素来事先设定的,比如硬件的 ...

最新文章

  1. 【迁移学习(Transfer L)全面指南】2021年迁移学习发展现状及案例探究
  2. QTP自动化测试视频系列
  3. python水平_python水平
  4. python flask 分页前后端分离_flask展示pyecharts图表前后端分离的问题
  5. C++ 求一元二次方程的根
  6. highgui java opencv_OpenCV在C Qt应用程序中的highgui
  7. SCRUM Beta Day 3
  8. framework —— auth认证
  9. IOS开发中的几种设计模式介绍
  10. java 图片居中裁剪_Java图片居中裁剪代码详解
  11. 移动wap浏览器网页调试工具vconsole和eruda
  12. 苹果手机屏幕镜像搜索不到电视_康佳电视投屏不了,3个办法解决!
  13. 【强化学习】策略梯度(Policy Gradient)
  14. 【百战GAN】SRGAN人脸低分辨率老照片修复代码实战
  15. Ubuntu系统管理 —— 磁盘的初始化及自动挂载
  16. 计算机在职博士要考吗,在职博士容易考吗?
  17. 道氏理论:如何买入开仓和止损点设置?
  18. php字符串过滤qq号,qq互联获取到的昵称怎么过滤图案等字符
  19. 华兴资本CEO包凡:今年将更积极参与保荐人业务
  20. 5.类似mouse-click方法的替代方案(netlogo)

热门文章

  1. mingw控制台中文乱码
  2. maven添加oracle jdbc依赖
  3. SpringBoot 之 MVC
  4. Cs Round#56 D Find Path Union
  5. 【AHOI2005】病毒检测
  6. MAC配置DNS服务器
  7. pywin32的安装
  8. FastReport 使用入门
  9. linux查找技巧: find grep xargs amp;amp; linux系统信息查看大全
  10. [java语言]——InetAddress类的getByName()方法