我现在在做一个接收和解析udp数据的项目,就是分两个线程 一个接收,一个解析;
接收的线程把接收的数据放在一个stringbuilder类型的对象中,用stringbuilder.add一包一包往后添加。解析从stringbuilder中按长度一个包一个包的取出数据解析并且写入txt文档中;我现在遇到一些问题,希望大神们给一点建议,最好给个伪码便于理解:
1、接收和解析不是同步的 ,接收会不时暂停或继续接收,也就是说stringbuilder中的数据在不停增加,如何控制两个线程,保证解析时stringbuilder中已有数据
2、如果在接收暂停后解析也完成已有stringbuilder的解析,在接收继续进行时,解析如何找断线。
3、按包解析中丢包怎么办
接收的线程把接收的数据放在一个stringbuilder类型的对象中,用stringbuilder.add一包一包往后添加。解析从stringbuilder中按长度一个包一个包的取出数据解析并且写入txt文档中;我现在遇到一些问题,希望大神们给一点建议,最好给个伪码便于理解:
1、接收和解析不是同步的 ,接收会不时暂停或继续接收,也就是说stringbuilder中的数据在不停增加,如何控制两个线程,保证解析时stringbuilder中已有数据
2、如果在接收暂停后解析也完成已有stringbuilder的解析,在接收继续进行时,解析如何找断线。
3、按包解析中丢包怎么办
private void receiveData()
{
byte[] bytesRecv = new byte[1024 * 1024];
while (bConnect && client.Connected)
{
byte[] data = null;
try
{
int nRecv = client.Client.Receive(bytesRecv); if (nRecv > 0)
{
if (packCount == 0) startTime = DateTime.Now; bytesCount += nRecv;
packCount++; data = new byte[nRecv];
Array.Copy(bytesRecv, data, nRecv); lock (queueSyncObj) // lock
{
recvDataQueue.Enqueue(data);
handleDataEvent.Set();
} }
}
catch (SocketException se)
{
LogClass.createLog(logPath, "MemoryTable", " 接收数据失败: " + se.Message);
break;
}
catch (ObjectDisposedException ode)
{
LogClass.createLog(logPath, "MemoryTable", " 接收数据失败: " + ode.Message);
break;
}
catch (Exception ex)
{
LogClass.createLog(logPath, "MemoryTable", " 接收数据异常: " + ex.Message);
} }
bConnect = false;
client.Close();
client = null; // added on 2012/10/13 增加网络异常重连机制
tryToConnectEvent.Set(); // 发出信号
} // debug: 记录异常
private static int recvTimes = 0;
private int errorDataLen = -1;
private byte[] errorData = new byte[0];
// private long handleBytes = 0, enqueueBytes =0; private Thread threadHandleRecvData = null;
private Queue<byte[]> recvDataQueue = new Queue<byte[]>();
private object queueSyncObj = new object();
private void handlerRecvData()
{
List<byte> bytesList = new List<byte>();
int curPackLength = 0;
bytesCount = 0;
recvTimes = 0; while (true)
{
if (recvDataQueue.Count == 0)
{
handleDataEvent.WaitOne();
}
else
{
lock (queueSyncObj) //必须加锁以防止出错
bytesList.AddRange(recvDataQueue.Dequeue());
} while (bytesList.Count > 20) // 循环处理封包 (minimum package length = 21)
{
if (!(bytesList[0] == 's' && bytesList[1] == 'm' && bytesList[2] == 't')) // 数据异常(不是以 smt 标志开头)
{
errorData = bytesList.ToArray();
bytesList.Clear();
break;
}
curPackLength = (int)bytesList[11] + ((int)bytesList[12] << 8) + ((int)bytesList[13] << 16) + ((int)bytesList[14] << 24) + 21;
//curPackLength = BitConverter.ToInt32(new byte[] { bytesList[11], bytesList[12], bytesList[13], bytesList[14] }, 0) + 21; if (curPackLength > bytesList.Count) //数据长度小于标记的长度时,则跳出
{
//errorDataLen = curPackLength;
break;
} ProtocolData protocolData = ProtocolData.FromData(bytesList.GetRange(0, curPackLength).ToArray());
bytesList.RemoveRange(0, curPackLength); // 移除已经取得的包 switch (protocolData.SecondaryCommand)
{
case 0x100: //报警字BJ
if (protocolData.PrimaryCommand == 0x30 && protocolData.Data.Length == 15 + 2) // sn + state(short)
{
string sn = Encoding.ASCII.GetString(protocolData.Data, 0, 15);
ushort status = BitConverter.ToUInt16(protocolData.Data, 15);
int userId = 0;
if (userIdSnDict.ContainsValue(sn)) // 通过sn反找userid
{
foreach (var item in userIdSnDict)
{
if (item.Value == sn)
{
userId = item.Key;
break;
}
}
}
if (userId > 0) // 添加到报警记录字典中
{
BJ bj = new BJ()
{
SN = sn,
Status = status,
Time = DateTime.Now,
Alarm_ID = protocolData.RecNo
}; lock (bjDict)
{
if (bjDict.ContainsKey(userId))
{
bjDict[userId].Add(bj);
}
else
{
List<BJ> bjlist = new List<BJ>();
bjlist.Add(bj);
bjDict.Add(userId, bjlist);
}
}
}
}
break; case 0x101: //added on 2012/11/07 Gps数据信息 1e
if (protocolData.Data.Length == 68) // data packege fixed length = 68
{
try
{
int userId = BitConverter.ToInt32(protocolData.Data, 0);
UserGpsData userGpsData = new UserGpsData(protocolData.Data);
lock (userGpsDataDict)
{
if (userGpsDataDict.ContainsKey(userId))
{
userGpsDataDict[userId] = userGpsData;
}
else
{
userGpsDataDict.Add(userId, userGpsData);
}
}
recvTimes++;
}
catch { }
}
break; default: // 默认处理, 服务端回复操作命令结果
if (protocolData.RecNo > 0 && operatorRecDict.ContainsKey(protocolData.RecNo))
{
OperatorRecord optRec = operatorRecDict[protocolData.RecNo];
optRec.State = (OperatorState)protocolData.Status;
optRec.Time = DateTime.Now; updateOperatorRec(protocolData.RecNo, optRec);
}
break;
} }// while end
}// while end LogClass.createLog(logPath, "receive", "退出处理接收数据线程.");
}