我现在在做一个接收和解析udp数据的项目,就是分两个线程 一个接收,一个解析;
接收的线程把接收的数据放在一个stringbuilder类型的对象中,用stringbuilder.add一包一包往后添加。解析从stringbuilder中按长度一个包一个包的取出数据解析并且写入txt文档中;我现在遇到一些问题,希望大神们给一点建议,最好给个伪码便于理解:
1、接收和解析不是同步的 ,接收会不时暂停或继续接收,也就是说stringbuilder中的数据在不停增加,如何控制两个线程,保证解析时stringbuilder中已有数据
2、如果在接收暂停后解析也完成已有stringbuilder的解析,在接收继续进行时,解析如何找断线。
3、按包解析中丢包怎么办

解决方案 »

  1.   

    我也要做个UDP接收数据的,看看楼下的大神有没有好的办法啊
      

  2.   

    分2个线程: 一个处理接收, 一个处理接收到数据. 之间通过一个队列存放待处理的数据.        private AutoResetEvent handleDataEvent = new AutoResetEvent(false);        // 接收线程
            private void receiveData()
            {
                byte[] bytesRecv = new byte[1024 * 1024];
                while (bConnect && client.Connected)
                {
                    byte[] data = null;
                    try
                    {
                        int nRecv = client.Client.Receive(bytesRecv);                    if (nRecv > 0)
                        {
                            if (packCount == 0) startTime = DateTime.Now;                        bytesCount += nRecv;
                            packCount++;                        data = new byte[nRecv];
                            Array.Copy(bytesRecv, data, nRecv);                        lock (queueSyncObj) // lock
                            {
                                recvDataQueue.Enqueue(data);
                                handleDataEvent.Set();
                            }                    }
                    }
                    catch (SocketException se)
                    {
                        LogClass.createLog(logPath, "MemoryTable", " 接收数据失败: " + se.Message);
                        break;
                    }
                    catch (ObjectDisposedException ode)
                    {
                        LogClass.createLog(logPath, "MemoryTable", " 接收数据失败: " + ode.Message);
                        break;
                    }
                    catch (Exception ex)
                    {
                        LogClass.createLog(logPath, "MemoryTable", " 接收数据异常: " + ex.Message);
                    }            }
                bConnect = false;
                client.Close();
                client = null;            // added on 2012/10/13 增加网络异常重连机制
                tryToConnectEvent.Set(); // 发出信号
            }        // debug: 记录异常
            private static int recvTimes = 0;
            private int errorDataLen = -1;
            private byte[] errorData = new byte[0];
            // private long handleBytes = 0, enqueueBytes =0;        private Thread threadHandleRecvData = null;
            private Queue<byte[]> recvDataQueue = new Queue<byte[]>();
            private object queueSyncObj = new object();
            private void handlerRecvData()
            {
                List<byte> bytesList = new List<byte>();
                int curPackLength = 0;
                bytesCount = 0;
                recvTimes = 0;            while (true)
                {
                    if (recvDataQueue.Count == 0)
                    {
                        handleDataEvent.WaitOne();
                    }
                    else
                    {
                        lock (queueSyncObj) //必须加锁以防止出错
                            bytesList.AddRange(recvDataQueue.Dequeue());
                    }                while (bytesList.Count > 20) // 循环处理封包 (minimum package length = 21)
                    {
                        if (!(bytesList[0] == 's' && bytesList[1] == 'm' && bytesList[2] == 't'))  // 数据异常(不是以 smt 标志开头)
                        {
                            errorData = bytesList.ToArray();
                            bytesList.Clear();
                            break;
                        }
                        curPackLength = (int)bytesList[11] + ((int)bytesList[12] << 8) + ((int)bytesList[13] << 16) + ((int)bytesList[14] << 24) + 21;
                        //curPackLength = BitConverter.ToInt32(new byte[] { bytesList[11], bytesList[12], bytesList[13], bytesList[14] }, 0) + 21;                    if (curPackLength > bytesList.Count) //数据长度小于标记的长度时,则跳出
                        {
                            //errorDataLen = curPackLength;
                            break;
                        }                    ProtocolData protocolData = ProtocolData.FromData(bytesList.GetRange(0, curPackLength).ToArray());
                        bytesList.RemoveRange(0, curPackLength); // 移除已经取得的包                    switch (protocolData.SecondaryCommand)
                        {
                            case 0x100: //报警字BJ
                                if (protocolData.PrimaryCommand == 0x30 && protocolData.Data.Length == 15 + 2) // sn + state(short)
                                {
                                    string sn = Encoding.ASCII.GetString(protocolData.Data, 0, 15);
                                    ushort status = BitConverter.ToUInt16(protocolData.Data, 15);
                                    int userId = 0;
                                    if (userIdSnDict.ContainsValue(sn)) // 通过sn反找userid
                                    {
                                        foreach (var item in userIdSnDict)
                                        {
                                            if (item.Value == sn)
                                            {
                                                userId = item.Key;
                                                break;
                                            }
                                        }
                                    }
                                    if (userId > 0)  // 添加到报警记录字典中
                                    {
                                        BJ bj = new BJ()
                                        {
                                            SN = sn,
                                            Status = status,
                                            Time = DateTime.Now,
                                            Alarm_ID = protocolData.RecNo
                                        };                                    lock (bjDict)
                                        {
                                            if (bjDict.ContainsKey(userId))
                                            {
                                                bjDict[userId].Add(bj);
                                            }
                                            else
                                            {
                                                List<BJ> bjlist = new List<BJ>();
                                                bjlist.Add(bj);
                                                bjDict.Add(userId, bjlist);
                                            }
                                        }
                                    }
                                }
                                break;                        case 0x101:  //added on 2012/11/07 Gps数据信息 1e
                                if (protocolData.Data.Length == 68) // data packege fixed length = 68
                                {
                                    try
                                    {
                                        int userId = BitConverter.ToInt32(protocolData.Data, 0);
                                        UserGpsData userGpsData = new UserGpsData(protocolData.Data);
                                        lock (userGpsDataDict)
                                        {
                                            if (userGpsDataDict.ContainsKey(userId))
                                            {
                                                userGpsDataDict[userId] = userGpsData;
                                            }
                                            else
                                            {
                                                userGpsDataDict.Add(userId, userGpsData);
                                            }
                                        }
                                        recvTimes++;
                                    }
                                    catch { }
                                }
                                break;                        default: // 默认处理, 服务端回复操作命令结果
                                if (protocolData.RecNo > 0 && operatorRecDict.ContainsKey(protocolData.RecNo))
                                {
                                    OperatorRecord optRec = operatorRecDict[protocolData.RecNo];
                                    optRec.State = (OperatorState)protocolData.Status;
                                    optRec.Time = DateTime.Now;                                updateOperatorRec(protocolData.RecNo, optRec);
                                }
                                break;
                        }                }// while end              
                }// while end            LogClass.createLog(logPath, "receive", "退出处理接收数据线程.");
            }