using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;namespace HanaPac.Jobs
{
    public class JobPool : HanaPac.Log.IStaticLog
    {
        // create some number of thread for jobs
        public const int MaxThreads = 20;
        public const int WorkingTimeout = 5 * 1000;
        
        // FIXME : Are we using thread array? or threadpool?
        //         If we use threadpool, we need to set the maximum number of threads somewhere
        private static System.Threading.Thread[] _WorkThreadPool = new System.Threading.Thread[MaxThreads];
        private static Job[] _WorkThreadJob = new Job[MaxThreads];        private static System.Collections.ArrayList _JobPool = new System.Collections.ArrayList();
        private static Thread _MainThread;
       _        public delegate void JobHandler(Job job);
        public static JobHandler DoJob = null;        public static void Start()
        {
            if (_MainThread != null &&
                _MainThread.ThreadState != ThreadState.Unstarted &&
                _MainThread.ThreadState == ThreadState.Stopped)
                throw new Exception("Cannot start thread");            _MainThread = new Thread(new ThreadStart(DispatchThread));
            _MainThread.Start();
        }
       
        public static void Stop()
        {
            for (Int32 i = 0; i < _WorkThreadPool.Length; i++)
            {
                try
                {
                    _WorkThreadPool[i].Abort();
                }
                catch { }
            }
            _MainThread.Abort();
        }        private static void DispatchThread()
        {
            while (true)
            {
                Job job = null;
                Int32 threadToRun = -1;
                lock (_JobPool.SyncRoot)
                {
                    while (true)
                    {
                        threadToRun = -1;
                        if (_JobPool.Count > 0)
                        {
                            for (Int32 i = 0; i < _WorkThreadPool.Length; i++)
                            {
                                try
                                {
                                    if (_WorkThreadPool[i] == null ||
                                        _WorkThreadPool[i].ThreadState == ThreadState.Unstarted ||
                                        _WorkThreadPool[i].ThreadState == ThreadState.Stopped)
                                    {
                                        job = (Job)_JobPool[0];
                                        _JobPool.RemoveAt(0);
                                        threadToRun = i;
                                        break;
                                    }
                                }
                                catch { }
                            }
                        }
                        else
                            break;                        if (job != null && threadToRun >= 0)
                        {
                            job._ThreadID = threadToRun;
                            _WorkThreadPool[threadToRun] = new Thread(new ParameterizedThreadStart(DoJobThread));
                            _WorkThreadPool[threadToRun].Start(job);
                            _WorkThreadJob[threadToRun] = job;
                        }
                        else
                            break;
                    }
                }
                Thread.Sleep(5);
            }
        }        private static void DoJobThread(Object obj)
        {
            if (DoJob == null)
                throw new Exception("HanaPac.Jobs.JobPool.DoJob is not implemented");            Job job = obj as Job;            TimeSpan timeWaited = DateTime.Now - job.TimeAdded;
            job._TimeDispatched = DateTime.Now;
            Logger.WriteLine("JobPool[" + job._ThreadID.ToString() + "]>> Job(" + job.ID + ") dispatched " +
                    "(waiting time : " + timeWaited.TotalSeconds.ToString() + " sec)");
            
            DoJob(job.Clone());            TimeSpan timeProcess = DateTime.Now - job.TimeDispatched;
            Logger.WriteLine("JobPool[" + job._ThreadID.ToString() + "]>> Job(" + job.ID + ") done " +
                    "(processing time : " + timeProcess.TotalSeconds.ToString() + " sec)");            _WorkThreadPool[job._ThreadID] = null;
        }        public static void AddJob(Job job)
        {
            if (IsSameJobHere(job))
                return;
            lock (_JobPool.SyncRoot)
            {
                job._TimeAdded = DateTime.Now;
                _JobPool.Add(job);
            }
            Logger.Write(String.Format("JobPool[{0} Jobs] - Added -- \r\n", _JobPool.Count) + job + "--\r\n");
        }        // check if there is same job here.
        // for "SystemDataReceived", before taking care of job, new data might arrive.
        private static bool IsSameJobHere(Job job)
        {
            lock (_JobPool.SyncRoot)
            {
                foreach (Job curJob in _JobPool)
                {
                    if (curJob.ID == job.ID && curJob.Stream == job.Stream)
                        return true;
                }
            }
            return false;
        }        public static Job GetJobById(String id)
        {
            lock (_JobPool.SyncRoot)
            {
                foreach (Job curJob in _JobPool)
                {
                    if (curJob.ID == id)
                        return curJob;
                }
            }
            return null;
        }        public static void RemoveJob(Job job)
        {
            lock (_JobPool.SyncRoot)
            {
                for (Int32 i = 0; i < _JobPool.Count; )
                {
                    if ((Job)_JobPool[i] == job)
                        _JobPool.Remove(_JobPool[i]);
                    else
                        i++;
                }
            }
        }        public static void RemoveJobById(String id)
        {
            lock (_JobPool.SyncRoot)
            {
                System.Collections.ArrayList willRemoveList = new System.Collections.ArrayList();
                for (Int32 i = 0; i < _JobPool.Count; i++)
                {
                    if (((Job)_JobPool[i]).ID == id)
                        willRemoveList.Add(_JobPool[i]);
                }                foreach (object obj in willRemoveList)
                {
                    _JobPool.Remove(obj);
                }
            }
        }        public static void RemoveJobsByStream(Communication.Stream.IStream stream)
        {
            lock (_JobPool.SyncRoot)
            {
                System.Collections.ArrayList willRemoveList = new System.Collections.ArrayList();
                for (Int32 i = 0; i < _JobPool.Count; i++)
                {
                    if (((Job)_JobPool[i]).Stream == stream)
                        willRemoveList.Add(_JobPool[i]);
                }                foreach (object obj in willRemoveList)
                {
                    _JobPool.Remove(obj);
                }
            }
            // need to kill the job is now running
            lock (_JobPool.SyncRoot)
            {
                System.Collections.ArrayList willRemoveList = new System.Collections.ArrayList();
                for (Int32 i = 0; i < _WorkThreadJob.Length; i++)
                {
                    if (_WorkThreadJob[i] == null)
                        continue;
                    if (_WorkThreadJob[i].Stream == stream)
                        willRemoveList.Add(i);
                }                foreach (Int32 i in willRemoveList)
                {
                    try
                    {
                        _WorkThreadPool[i].Abort();
                        Logger.Write(String.Format("JobPool[{0} Jobs] - Aborted -- \r\n", _JobPool.Count) + _WorkThreadJob[i] + "--\r\n");
                    }
                    catch { }
                }
            }
        }        public static void RemoveJobsByStreamId(String streamId)
        {
            lock (_JobPool.SyncRoot)
            {
                System.Collections.ArrayList willRemoveList = new System.Collections.ArrayList();
                for (Int32 i = 0; i < _JobPool.Count; i++)
                {
                    if (((Job)_JobPool[i]).Stream.ID == streamId)
                        willRemoveList.Add(_JobPool[i]);
                }                foreach (object obj in willRemoveList)
                {
                    _JobPool.Remove(obj);
                }
            }
            // need to kill the job is now running
            lock (_WorkThreadJob.SyncRoot)
            {
                System.Collections.ArrayList willRemoveList = new System.Collections.ArrayList();
                for (Int32 i = 0; i < _WorkThreadJob.Length; i++)
                {
                    if (_WorkThreadJob[i].Stream.ID == streamId)
                        willRemoveList.Add(i);
                }                foreach (Int32 i in willRemoveList)
                {
                    _WorkThreadPool[i].Abort();
                    Logger.Write(String.Format("JobPool[{0} Jobs] - #{1} Job Aborted -- \r\n", _JobPool.Count, i) + _WorkThreadJob[i] + "--\r\n");
                }
            }
        }
    }
}
// Request: reimplement the Thread[] worker array above using a thread pool. I don't know how to do it - help, please. Thanks.