我创建了一个queue,
在socket中听到数据就写到这个queue中,然后由另外一个线程读取出来加以分析
现在的问题是:
queue中写入时速度要求快,否则会导致下一个数据包复盖上一个数据包的内容。与就是来不及写。
可是读取的方面要不停的读取,加以分析。
速度比:我接收到数据包的每秒大概有200包,也就是一定要每个包都写入queue。读取时最多每秒只有读取100包加以分析。而且在读取的时候可能会造成无法写入,这样我就出现丢包的现象。
我用.net例程中说的lock  wait的方法试了,一样呀!
他是其中A线程忙的时候,B线程等待,B线程忙的时候,A线程等待。
而我要的是B线程慢、等待不要紧,但要保证A线程不能等待。
我要怎么写呢?

解决方案 »

  1.   

    用异步实现不了么?
    http://dev.csdn.net/article/65798.shtm
      

  2.   

    我的问题不是接不下来数据,而是能接收到我的数据,但是接收下来后,我要进行处理分析,所以我把接收下来的数据放到queue中,然后由另一个线程来读取分析。可是当另一个线程分析时可能会造成写入queue的线程要等待,这时下一个数据包来了,这会把当前的这个数据包给丢弃或者复盖了。我处理数据包时就会出现丢包的问题
      

  3.   

    所以我要的解决的一个问题是,两个线程操作queue,一个写,一个读,怎么解决,以写的绝对优先,也就是说,我要怎么解决,读可以慢慢来,但写一定要及时的问题。
      

  4.   

    既然2个进程读写同一个Queue,那么每一个进程操作时需要
    lock(queue)
    {
      //do something
    }
    这样可以保证每个进程工作时的queue的完整和独立性。
    关于写优先,可以把写操作的ThreadPriority提高,使它获得更多的cpu时间,关于读的问题,个人认为,读写的速度相差不大,只是解析的时间多一些,不妨在建立单独的解析进程,读进程只负责将数据从queue中取出送给解析进程,这样应该可以解决“可是当另一个线程分析时可能会造成写入queue的线程要等待,这时下一个数据包来了,这会把当前的这个数据包给丢弃或者复盖了”问题。没有测试过,只是想法而已,仅供参考。
      

  5.   

    a 去 lock b 去try enter,可以保障a 控制的机率另外,b在a空闲时忙queue相关事务,但a突然有任务要用queue了,如果此时b 处理的事务不是关于queue的或者是处理Queue.Enqueue 后的事,就可以让它先挂起腾给a用,如果b 正处理的事务是queue的就用一个变量通知该线程提前结束处理比如for(int i=0; i < 100; i++) {
     if (classA.needUseQueue) return;
     lock(queue) {
     //TODO: u code
     }}
      

  6.   

    出现丢包的情况可能还有一种情况:B的处理过程占用太多的CPU。这时丢包的机率就高了,因为没有CPU来处理。我把B的线程每从A中读取一个数据休眠20毫秒就没有问题了,休眠的时候越多,丢包的机率就越低。可是这样的话,我收到一个email如果有2M大的话,处理的话可能就要10分钟的(包完全收到后处理要这么长时间)。而且处理时CPU使用率高达75%。我要怎么办呢,才能降低CPU使用率,并且速度加快呢?
      

  7.   

    处理方法:
    public bool ReadMailData()
    {
    int ID; //邮件标识
    string OriginationAddress = ""; //发出地址
    string OriginationPort = ""; //发出端口
    string DestinationAddress = ""; //接收地址
    string DestinationPort = ""; //接收端口
    string MailData = ""; //接收到的内容
    uint TCPSeq = 0; //TCP序号
    uint TCPAck = 0; //TCP确认号
    bool bDirection = false; //数据方向
    string TempData; //取得的临时数据

    if (Common.DataCount > 0)
    {
    lock(Common.QueueEmailTempData)
    {
    TempData = Common.QueueEmailTempData.Dequeue().ToString();
    Common.DataCount = Common.QueueEmailTempData.Count;
    }
    //取出的数据为空
    if (TempData == string.Empty)
    {
    return false;
    }
    //取得各项数据
    if (TempData.IndexOf(" ") > 0)
    {
    OriginationAddress = TempData.Substring(0,TempData.IndexOf(" "));
    TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
    }
    if (TempData.IndexOf(" ") > 0)
    {
    OriginationPort = TempData.Substring(0,TempData.IndexOf(" "));
    TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
    }
    if (TempData.IndexOf(" ") > 0)
    {
    DestinationAddress = TempData.Substring(0,TempData.IndexOf(" "));
    TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
    }
    if (TempData.IndexOf(" ") > 0)
    {
    DestinationPort = TempData.Substring(0,TempData.IndexOf(" "));
    TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
    }
    if (TempData.IndexOf(" ") > 0)
    {
    TCPSeq = uint.Parse(TempData.Substring(0,TempData.IndexOf(" ")));
    TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
    }
    if (TempData.IndexOf(" ") > 0)
    {
    TCPAck = uint.Parse(TempData.Substring(0,TempData.IndexOf(" ")));
    TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
    }
    MailData = TempData;
    //取得各项数据完成
    //发送数据时是以Data开始以"\r\n.\r\n"为结束。Data不保留
    //接收时是以RETR 开始以"\r\n.\r\n"为结束。Retr不保留
    string strSend = "DATA"; //发送数据标记
    string strReceive = "RETR "; //接收数据标记
    string strDataOver = "\r\n.\r\n"; //数据发送完或接收完标记

    DataRow[] drEmailAddressPorts;
    DataRow drEmailAddressPort;
    //正向查找Email临时数据信息中是否有存在的端口
    drEmailAddressPorts = Common.DSEmailTempData.Tables["AddressPortInfo"].Select("OriginationAddress = '" + OriginationAddress + "' AND OriginationPort = '" + OriginationPort + "' AND DestinationAddress = '" + DestinationAddress + "' AND DestinationPort = '" + DestinationPort + "'");
    if (drEmailAddressPorts.Length > 0)
    {
    drEmailAddressPort = drEmailAddressPorts[0]; //设置得到的数据行
    //表示查找到有存在的端口记录
    bDirection = true;
    }
    else
    {
    //反向查找Email临时数据信息中是否有存在的端口
    drEmailAddressPorts = Common.DSEmailTempData.Tables["AddressPortInfo"].Select("OriginationAddress = '" + DestinationAddress + "' AND OriginationPort = '" + DestinationPort + "' AND OriginationAddress = '" + DestinationAddress + "' AND OriginationPort = '" + DestinationPort + "'");
    if (drEmailAddressPorts.Length > 0)
    {
    drEmailAddressPort = drEmailAddressPorts[0]; //设置得到的数据行
    //表示查找到有存在的端口记录
    bDirection = false;
    }
    else
    {
    //表示没有找到相应的数据,则新增行
    if (MailData.ToUpper().IndexOf(strSend) == 0 || MailData.ToUpper().IndexOf(strReceive) == 0)
    {
    drEmailAddressPort = Common.DSEmailTempData.Tables["AddressPortInfo"].NewRow();
    if (MailData.ToUpper().IndexOf(strSend) == 0)
    {
    //发送数据
    drEmailAddressPort["OriginationAddress"] = OriginationAddress;
    drEmailAddressPort["OriginationPort"] = OriginationPort;
    drEmailAddressPort["DestinationAddress"] = DestinationAddress;
    drEmailAddressPort["DestinationPort"] = DestinationPort;
    drEmailAddressPort["NextTCPSeq"] = TCPSeq + (uint)MailData.Length; //应该出现数据的是当前TCP序号 + 当前数据。也就是下一个TCP序号
    }
    else
    {
    //接收数据 如果接收数据则要把数据反过来
    drEmailAddressPort["OriginationAddress"] = DestinationAddress;
    drEmailAddressPort["OriginationPort"] = DestinationPort;
    drEmailAddressPort["DestinationAddress"] = OriginationAddress;
    drEmailAddressPort["DestinationPort"] = OriginationPort;
    drEmailAddressPort["NextTCPSeq"] = TCPAck; //数据是反的,所以应该出现的序号是当前数据的确认号
    }
    //增加数据行
    drEmailAddressPort["MailData"] = "";
    drEmailAddressPort["LastChangTime"] = DateTime.Now;
    Common.DSEmailTempData.Tables["AddressPortInfo"].Rows.Add(drEmailAddressPort);
    return true;
    }
    else
    {
    return false; //不可接收数据项,直接返回
    }
    }
    }
    //得到地址标识
    ID = int.Parse(drEmailAddressPort["ID"].ToString());
    //判断正向数据是否收到结束标识
    if (MailData.ToUpper().IndexOf(strDataOver) >= 0 && bDirection)
    {
    MailData = MailData.Substring(0,MailData.Length - 5); //除去结束符
    if (MailData.Length > 0)
    {
    //增加记录的数据
    ReadMailData(drEmailAddressPort,TCPSeq,MailData);
    drEmailAddressPort["LastChangTime"] = DateTime.Now;
    }
    //保存文件,并传入当前TCPSeq用于确定当前Email长度
    SaveMailData(drEmailAddressPort,TCPSeq + (uint)MailData.Length);
    return true;
    }
    if ((bDirection && MailData.ToUpper().IndexOf(strSend) == 0) || (!bDirection & MailData.ToUpper().IndexOf(strReceive) == 0))
    {
    if(MailData.ToUpper().IndexOf(strSend) == 0)
    {
    //应该出现数据的是当前TCP序号 + 当前数据。也就是下一个TCP序号
    drEmailAddressPort["NextTCPSeq"] = TCPSeq + (uint)MailData.Length;
    }
    else
    {
    //数据是反的,所以应该出现的序号是当前数据的确认号
    drEmailAddressPort["NextTCPSeq"] = TCPAck;
    }
    //先清空缓冲数据,并删除临时文件
    ClearEmailData(drEmailAddressPort);
    return true;
    }
    //增加记录的数据,当数据方向正确并数据长度大于0
    if (bDirection && MailData.Length > 0)
    {
    ReadMailData(drEmailAddressPort,TCPSeq,MailData);
    drEmailAddressPort["LastChangTime"] = DateTime.Now;
    }
    return true;
    }
    return false;
    }
      

  8.   

    既然2个进程读写同一个Queue,那么每一个进程操作时需要
    lock(queue)
    {
      //do something
    }
    这样可以保证每个进程工作时的queue的完整和独立性。
    关于写优先,可以把写操作的ThreadPriority提高,使它获得更多的cpu时间,关于读的问题,个人认为,读写的速度相差不大,只是解析的时间多一些,不妨在建立单独的解析进程,读进程只负责将数据从queue中取出送给解析进程,这样应该可以解决“可是当另一个线程分析时可能会造成写入queue的线程要等待,这时下一个数据包来了,这会把当前的这个数据包给丢弃或者复盖了”问题。没有测试过,只是想法而已,仅供参考。
      

  9.   

    线程的优先级我已经处理的了。我把写的线程的ThreadPriority设为最高,读取的设为最低的了。我是能读取的。但CPU在读取的时候占用的太高了。说处理过程问题吧!但我的处理过程真的要用这么多代码呀!我还没有找到别的方法来处理呢
      

  10.   

    我想知道为什么线程为什么都要sleep阿  不是已经enter 了吗
      

  11.   

    休眠一下,时当前线程就不会占用CPU呀!