我在项目中使用多线程处理数据遇到一些问题,请大家帮忙看看
具体实现如下:
一个主控制函数Main(),它主要用来分配线程参数和控制线程的启动
Main()示例代码如下:
for (int i = 0; i < threadNum; i++)
{
    ......根据线程数分配参数:beginTime,endTime(代码省略)    string[] objGpsTime= {beginGPSTime.ToString("HHmmss"), endGPSTime.ToString("HHmmss")};
   //启动线程
   ThreadPool.QueueUserWorkItem(new WaitCallback(AnalyseData), objGpsTime);
}
一个数据处理函数AnalyseData(beginTime,endTime),它主要用来处理不同时间段的数据
AnalyseData主要做以下动作:
1、根据传入的时间参数从数据库中取相关数据
2、遍历取出的数据进行处理(主要是做一些空间分析)我发现不管用多少个线程最终数据处理完成的时间都是一样的
请大家帮忙分析一下什么原因

解决方案 »

  1.   

    “AnalyseData主要做以下动作: 
    1、根据传入的时间参数从数据库中取相关数据 ”数据库是否加了锁,什么样的锁,是否会导致多个线程实际上的线性执行?
    PS:最好是将数据一次头读出来,然后多线程来处理这些数据。
      

  2.   


    for (int i = 0; i < threadNum; i++) 

        ......根据线程数分配参数:beginTime,endTime(代码省略)     string[] objGpsTime= {beginGPSTime.ToString("HHmmss"), endGPSTime.ToString("HHmmss")}; 
      //启动线程 
      ThreadPool.QueueUserWorkItem(new WaitCallback(AnalyseData), objGpsTime); 
      //时间片要注意释放
      Thread.Sleep(0);
      

  3.   

    锁在AnalyseData方法中互斥,对一些共享的数据进行保护就可以了。保护的代码越少效率越高。
      

  4.   

    我把整个代码贴出来大家帮忙看看~
    public void Main(int threadNum)
            {
                Thread[] threads = new Thread[threadNum];            OnlineFcdDs fcdDs = new OnlineFcdDs();
                FCDCarStatusDs carStatusDs = new FCDCarStatusDs();
                PubRoadConfigDs roadCfgDs = new PubRoadConfigDs();
                //取当前数据库服务器时间
                //DateTime currentDbTime = DateTime.Now;
                DateTime currentDbTime = new DateTime(2009, 10, 8, 00, 08, 00);
                //数据过滤
                //fcdBll.OnlineFilter(beginGPSTime, endGPSTime);            //如果线程数大于当前时间间隔则将线程数置为当前时间间隔的1/4
                if (threadNum >= AnalyseEdgeStatus_Interval)
                {
                    threadNum = (int)(AnalyseEdgeStatus_Interval / 4);
                }            // 获取线程池的最大线程数和维护的最小空闲线程数
                int maxThreadNum, portThreadNum;
                int minThreadNum;
                ThreadPool.GetMaxThreads(out maxThreadNum, out portThreadNum);
                ThreadPool.GetMinThreads(out minThreadNum, out portThreadNum);
                WriteLog(string.Format("最大线程数:", maxThreadNum));
                WriteLog(string.Format("最小空闲线程数:", minThreadNum));            //根据线程数分割时间节点
                int perGPSTime = AnalyseEdgeStatus_Interval / threadNum;
                int remNum = AnalyseEdgeStatus_Interval % threadNum;
                DateTime beginGPSTime, endGPSTime;
                for (int i = 0; i < threadNum; i++)
                {
                    if (i == 0)
                    {
                        beginGPSTime = currentDbTime.AddSeconds(i * perGPSTime);
                    }
                    else
                    {
                        beginGPSTime = currentDbTime.AddSeconds(i * perGPSTime + i);
                    }
                    //if (i==threadNum-1 && remNum > 0)
                    //{
                    //    endGPSTime = beginGPSTime.AddSeconds(perGPSTime + remNum);
                    //}
                    if (i == threadNum - 1)
                    {
                        endGPSTime = currentDbTime.AddSeconds(AnalyseEdgeStatus_Interval);
                    }
                    else
                    {
                        endGPSTime = beginGPSTime.AddSeconds(perGPSTime);
                    }
                    //调用地址匹配线程
                    //Thread fcdThread = new Thread(delegate() { AnalyseFCDThread(beginGPSTime.ToString("HHmmss"), endGPSTime.ToString("HHmmss")); });
                    string[] objGpsTime= {beginGPSTime.ToString("HHmmss"), endGPSTime.ToString("HHmmss")};
                    //Thread fcdThread = new Thread(new ParameterizedThreadStart(AnalyseFCDThread));
                    //fcdThread.Name = string.Format("Thread_{0}", i.ToString());
                    //fcdThread.Start(objGpsTime);                //启动线程
                    ThreadPool.QueueUserWorkItem(new WaitCallback(AnalyseFCDThread), objGpsTime);
                }
                Console.ReadLine();
            }
      

  5.   

    private void AnalyseFCDThread(object objGpsTime)
            {
                WriteLog(string.Format("线程:{0} 开始启动......", Thread.CurrentThread.GetHashCode()));
                string[] gpsTime = objGpsTime as string[];
                string beginGPSTime = gpsTime[0];
                string endGPSTime = gpsTime[1];
                OnlineFcdDs fcdDs = new OnlineFcdDs();
                FCDCarStatusDs carStatusDs = new FCDCarStatusDs();
                perTimer.Start();
                //取过滤完成的FCD数据
                fcdBll.GetOnlineFcdData(fcdDs, beginGPSTime, endGPSTime);            //进行地址匹配(经纬度匹配)。
                //1、遍历FCD点进行最短距离计算
                //2、进行方位角判断
                double MinDistance = 0;
                double DistanceAlongCurve = 0;
                IPoint fromPoint = new PointClass();
                Boolean bAsRatio = false;
                Boolean bRight = false;
                //与缓冲区有交集的并且方位角夹角小于特定值的线集合
                Dictionary<IFeature, double> roadLine = new Dictionary<IFeature, double>();            foreach (DataRow dr in fcdDs.T_ONLINEFCD.Rows)
                {
                    double tx, ty;
                    if (double.TryParse(dr["LONGITUDE"].ToString(), out tx) && double.TryParse(dr["LATITUDE"].ToString(), out ty))
                    {
                        IPoint fcdPoint = new PointClass();
                        fcdPoint.SpatialReference = ispRef;
                        fcdPoint.PutCoords(tx, ty);
                        //构造缓冲区
                        IGeometry bufferGeo = gisHp.GetBufferGeometry(fcdPoint as IGeometry, bufferDistance);
                        //求出与该缓冲区有交集的所有路段
                        List<IFeature> listLines = gisHp.GetIntersectGeometry(bufferGeo, pLines);
                        //判断方位角
                        double fcdAngle;
                        double.TryParse(dr["Rate"].ToString(), out fcdAngle);
                        if (listLines != null)
                        {
                            roadLine.Clear();
                            foreach (IFeature pFeature in listLines)
                            {
                                double lineAngle;
                                double.TryParse(pFeature.get_Value(pFeature.Fields.FindField("ANGLE")).ToString(), out lineAngle);
                                double difAngle = Math.Abs(lineAngle - fcdAngle);
                                //double difAngle = Math.Abs(tangLineAngle - fcdAngle);
                                if (difAngle >= 0) // <= 65)
                                {
                                    roadLine.Add(pFeature, MinDistance);
                                }
                            }
                            if (roadLine.Count > 0)
                            {
                                //根据最短距离对线集合进行排序(升序)
                                var sortLines = from disValue in roadLine orderby disValue.Value select disValue;
                                IFeature edgeLine = null;
                                foreach (KeyValuePair<IFeature, double> pair in sortLines)
                                {
                                    int edgeID;
                                    double edgeLength;
                                    decimal speed;
                                    edgeLine = pair.Key as IFeature;
                                    Int32.TryParse(edgeLine.get_Value(edgeLine.Fields.FindField("EDGEID")).ToString(), out edgeID);
                                    double.TryParse(edgeLine.get_Value(edgeLine.Fields.FindField("LENGTH")).ToString(), out edgeLength);
                                    if (edgeID > 0 && edgeLength > 0)
                                    {
    T_OnLineFCD_CarStatus
                                        DataRow carStatusDr = carStatusDs.T_ONLINEFCD_CARSTATUS.NewRow();
                                        carStatusDr["TERMINAL"] = dr["TERMINAL"];
                                        decimal.TryParse(dr["SPEED"].ToString(), out speed);
                                        carStatusDr["SPEED"] = decimal.Round(speed, 3);
                                        carStatusDr["EDGE_ID"] = edgeID;
                                        carStatusDr["EDGE_LENGTH"] = edgeLength;
                                        carStatusDr["GPSTIME"] = dr["GPSTIME"];
                                        carStatusDr["UPDATE_DATE"] = DateTime.Now.ToString("yyyyMMdd");
                                        carStatusDr["UPDATE_TIME"] = DateTime.Now.ToString("HHmmss");
                                        carStatusDr["FLAG"] = dr["FLAG"];
                                        //carStatusDr["SATELLITER_COUNT"] = dr[""];
                                        //其他列赋值
                                        carStatusDs.T_ONLINEFCD_CARSTATUS.Rows.Add(carStatusDr);
                                    }
                                    break;
                                }
                            }
                        }
                    }
                    dr.Delete();
                }
                perTimer.Stop();
                WriteLog(string.Format("{5}线程结束,时间间隔:{0}-{1},FCD总数:{2},匹配成功:{3}条,耗时:{4}",
                    beginGPSTime, endGPSTime,
                    fcdDs.T_ONLINEFCD.Rows.Count.ToString(), carStatusDs.T_ONLINEFCD_CARSTATUS.Rows.Count.ToString(),
                    perTimer.Duration, Thread.CurrentThread.GetHashCode()));
                //停止当前线程
                Thread.CurrentThread.Abort();
                Console.ReadLine();
            }
      

  6.   

    perTimer 没在各线程里定义,在哪定义的?