我创建了一个queue,
在socket中听到数据就写到这个queue中,然后由另外一个线程读取出来加以分析
现在的问题是:
queue中写入时速度要求快,否则会导致下一个数据包复盖上一个数据包的内容。与就是来不及写。
可是读取的方面要不停的读取,加以分析。
速度比:我接收到数据包的每秒大概有200包,也就是一定要每个包都写入queue。读取时最多每秒只有读取100包加以分析。而且在读取的时候可能会造成无法写入,这样我就出现丢包的现象。
我用.net例程中说的lock wait的方法试了,一样呀!
他是其中A线程忙的时候,B线程等待,B线程忙的时候,A线程等待。
而我要的是B线程慢、等待不要紧,但要保证A线程不能等待。
我要怎么写呢?
在socket中听到数据就写到这个queue中,然后由另外一个线程读取出来加以分析
现在的问题是:
queue中写入时速度要求快,否则会导致下一个数据包复盖上一个数据包的内容。与就是来不及写。
可是读取的方面要不停的读取,加以分析。
速度比:我接收到数据包的每秒大概有200包,也就是一定要每个包都写入queue。读取时最多每秒只有读取100包加以分析。而且在读取的时候可能会造成无法写入,这样我就出现丢包的现象。
我用.net例程中说的lock wait的方法试了,一样呀!
他是其中A线程忙的时候,B线程等待,B线程忙的时候,A线程等待。
而我要的是B线程慢、等待不要紧,但要保证A线程不能等待。
我要怎么写呢?
http://dev.csdn.net/article/65798.shtm
lock(queue)
{
//do something
}
这样可以保证每个进程工作时的queue的完整和独立性。
关于写优先,可以把写操作的ThreadPriority提高,使它获得更多的cpu时间,关于读的问题,个人认为,读写的速度相差不大,只是解析的时间多一些,不妨在建立单独的解析进程,读进程只负责将数据从queue中取出送给解析进程,这样应该可以解决“可是当另一个线程分析时可能会造成写入queue的线程要等待,这时下一个数据包来了,这会把当前的这个数据包给丢弃或者复盖了”问题。没有测试过,只是想法而已,仅供参考。
if (classA.needUseQueue) return;
lock(queue) {
//TODO: u code
}}
public bool ReadMailData()
{
int ID; //邮件标识
string OriginationAddress = ""; //发出地址
string OriginationPort = ""; //发出端口
string DestinationAddress = ""; //接收地址
string DestinationPort = ""; //接收端口
string MailData = ""; //接收到的内容
uint TCPSeq = 0; //TCP序号
uint TCPAck = 0; //TCP确认号
bool bDirection = false; //数据方向
string TempData; //取得的临时数据
if (Common.DataCount > 0)
{
lock(Common.QueueEmailTempData)
{
TempData = Common.QueueEmailTempData.Dequeue().ToString();
Common.DataCount = Common.QueueEmailTempData.Count;
}
//取出的数据为空
if (TempData == string.Empty)
{
return false;
}
//取得各项数据
if (TempData.IndexOf(" ") > 0)
{
OriginationAddress = TempData.Substring(0,TempData.IndexOf(" "));
TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
}
if (TempData.IndexOf(" ") > 0)
{
OriginationPort = TempData.Substring(0,TempData.IndexOf(" "));
TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
}
if (TempData.IndexOf(" ") > 0)
{
DestinationAddress = TempData.Substring(0,TempData.IndexOf(" "));
TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
}
if (TempData.IndexOf(" ") > 0)
{
DestinationPort = TempData.Substring(0,TempData.IndexOf(" "));
TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
}
if (TempData.IndexOf(" ") > 0)
{
TCPSeq = uint.Parse(TempData.Substring(0,TempData.IndexOf(" ")));
TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
}
if (TempData.IndexOf(" ") > 0)
{
TCPAck = uint.Parse(TempData.Substring(0,TempData.IndexOf(" ")));
TempData = TempData.Substring(TempData.IndexOf(" ") + 1);
}
MailData = TempData;
//取得各项数据完成
//发送数据时是以Data开始以"\r\n.\r\n"为结束。Data不保留
//接收时是以RETR 开始以"\r\n.\r\n"为结束。Retr不保留
string strSend = "DATA"; //发送数据标记
string strReceive = "RETR "; //接收数据标记
string strDataOver = "\r\n.\r\n"; //数据发送完或接收完标记
DataRow[] drEmailAddressPorts;
DataRow drEmailAddressPort;
//正向查找Email临时数据信息中是否有存在的端口
drEmailAddressPorts = Common.DSEmailTempData.Tables["AddressPortInfo"].Select("OriginationAddress = '" + OriginationAddress + "' AND OriginationPort = '" + OriginationPort + "' AND DestinationAddress = '" + DestinationAddress + "' AND DestinationPort = '" + DestinationPort + "'");
if (drEmailAddressPorts.Length > 0)
{
drEmailAddressPort = drEmailAddressPorts[0]; //设置得到的数据行
//表示查找到有存在的端口记录
bDirection = true;
}
else
{
//反向查找Email临时数据信息中是否有存在的端口
drEmailAddressPorts = Common.DSEmailTempData.Tables["AddressPortInfo"].Select("OriginationAddress = '" + DestinationAddress + "' AND OriginationPort = '" + DestinationPort + "' AND OriginationAddress = '" + DestinationAddress + "' AND OriginationPort = '" + DestinationPort + "'");
if (drEmailAddressPorts.Length > 0)
{
drEmailAddressPort = drEmailAddressPorts[0]; //设置得到的数据行
//表示查找到有存在的端口记录
bDirection = false;
}
else
{
//表示没有找到相应的数据,则新增行
if (MailData.ToUpper().IndexOf(strSend) == 0 || MailData.ToUpper().IndexOf(strReceive) == 0)
{
drEmailAddressPort = Common.DSEmailTempData.Tables["AddressPortInfo"].NewRow();
if (MailData.ToUpper().IndexOf(strSend) == 0)
{
//发送数据
drEmailAddressPort["OriginationAddress"] = OriginationAddress;
drEmailAddressPort["OriginationPort"] = OriginationPort;
drEmailAddressPort["DestinationAddress"] = DestinationAddress;
drEmailAddressPort["DestinationPort"] = DestinationPort;
drEmailAddressPort["NextTCPSeq"] = TCPSeq + (uint)MailData.Length; //应该出现数据的是当前TCP序号 + 当前数据。也就是下一个TCP序号
}
else
{
//接收数据 如果接收数据则要把数据反过来
drEmailAddressPort["OriginationAddress"] = DestinationAddress;
drEmailAddressPort["OriginationPort"] = DestinationPort;
drEmailAddressPort["DestinationAddress"] = OriginationAddress;
drEmailAddressPort["DestinationPort"] = OriginationPort;
drEmailAddressPort["NextTCPSeq"] = TCPAck; //数据是反的,所以应该出现的序号是当前数据的确认号
}
//增加数据行
drEmailAddressPort["MailData"] = "";
drEmailAddressPort["LastChangTime"] = DateTime.Now;
Common.DSEmailTempData.Tables["AddressPortInfo"].Rows.Add(drEmailAddressPort);
return true;
}
else
{
return false; //不可接收数据项,直接返回
}
}
}
//得到地址标识
ID = int.Parse(drEmailAddressPort["ID"].ToString());
//判断正向数据是否收到结束标识
if (MailData.ToUpper().IndexOf(strDataOver) >= 0 && bDirection)
{
MailData = MailData.Substring(0,MailData.Length - 5); //除去结束符
if (MailData.Length > 0)
{
//增加记录的数据
ReadMailData(drEmailAddressPort,TCPSeq,MailData);
drEmailAddressPort["LastChangTime"] = DateTime.Now;
}
//保存文件,并传入当前TCPSeq用于确定当前Email长度
SaveMailData(drEmailAddressPort,TCPSeq + (uint)MailData.Length);
return true;
}
if ((bDirection && MailData.ToUpper().IndexOf(strSend) == 0) || (!bDirection & MailData.ToUpper().IndexOf(strReceive) == 0))
{
if(MailData.ToUpper().IndexOf(strSend) == 0)
{
//应该出现数据的是当前TCP序号 + 当前数据。也就是下一个TCP序号
drEmailAddressPort["NextTCPSeq"] = TCPSeq + (uint)MailData.Length;
}
else
{
//数据是反的,所以应该出现的序号是当前数据的确认号
drEmailAddressPort["NextTCPSeq"] = TCPAck;
}
//先清空缓冲数据,并删除临时文件
ClearEmailData(drEmailAddressPort);
return true;
}
//增加记录的数据,当数据方向正确并数据长度大于0
if (bDirection && MailData.Length > 0)
{
ReadMailData(drEmailAddressPort,TCPSeq,MailData);
drEmailAddressPort["LastChangTime"] = DateTime.Now;
}
return true;
}
return false;
}
lock(queue)
{
//do something
}
这样可以保证每个进程工作时的queue的完整和独立性。
关于写优先,可以把写操作的ThreadPriority提高,使它获得更多的cpu时间,关于读的问题,个人认为,读写的速度相差不大,只是解析的时间多一些,不妨在建立单独的解析进程,读进程只负责将数据从queue中取出送给解析进程,这样应该可以解决“可是当另一个线程分析时可能会造成写入queue的线程要等待,这时下一个数据包来了,这会把当前的这个数据包给丢弃或者复盖了”问题。没有测试过,只是想法而已,仅供参考。