自定义的TCP/IP协议,在传输大消息时容易有粘包现象发生,请高手赐教如何进行拆包解包,谢谢!!
协议格式类似:0x7e 0x00 0x19 0x85 0x83 0x74 0x7e,其中0x7e为消息头、尾标记。

解决方案 »

  1.   

    每个逻辑包,开头的固定位置,表明这个包的类型和长度。下面的帖子,我举了个初步的例子:
    http://topic.csdn.net/u/20111030/22/a5d2c17e-ebad-4cc6-9f62-9b0b6e3f39fc.html
    你光靠0x7e,是不行的,除非你的数据中根本没有0x7e
      

  2.   


    正解,当传输的数据中包含0x7e字符时就悲剧了..
    最好就是以特殊字符做标识,类似TCP/IP的包头包尾一样..
      

  3.   

    对,我会保证传输数据中没有0x7e的,除包头包尾外,0x7e都会被转义,因为转义的原因,所以消息为变长,服务器端解析效率会比较低。另外,如果同时有上万条消息时,MINA的效率会比较差。
      

  4.   

    保证没有0x7e的话,那直接参考org.apache.mina.filter.codec.textline.TextLineEncoder/TextLineDecoder“同时有上万条消息”,别说mina,什么框架、语言都有压力。但是,你真有这么大的访问量吗?
      

  5.   

    openfire是基于mina的框架的一个实现,lz可是看看去
      

  6.   

    你的包不是定义在tcp/ip层之上的嘛
      

  7.   


    是在TCP/IP之上,但包大小超过1024字节时,就很容易出现粘包、断包现象的
      

  8.   

    你确保你的代码正确?
    话说,一个逻辑包,可能分散在几个Buffer里面过来,这个你能正常处理??
      

  9.   

    另外,你用了那些Filter/Handler?
      

  10.   


    目前看是正确的,目前没有用Filter,Handler是我自己写的继承了IoHandlerAdapter的实现类。
      

  11.   

    另外,一个逻辑包,会分散在几个Buffer里面?那样的话岂不是拆包组包就没法做了吗?
      

  12.   

    http://freemart.iteye.com/blog/836654这里资料具体解决方案和shine的方式差不多
      

  13.   

    有没有朋友试过MINA处理大量消息时的效率如何?
      

  14.   

    你的协议定义的有问题,一般我们都是做的TLV的格式
    例如 前4个byte,是code,类似你刚才的0x7e,然后紧接的4个byte是你对应的code的内容的长度,然后后面是内容,这样你只需要每次按长度截图后面的对应的内容就可以了,这样也可以嵌套实现,很容易。你的那种包,是不对的,因为你都不知道包的长度,那样如果数据流很大,你岂不每个byte都要遍历,而且不能保证包的内容没有你定义的0x7e,太耗cpu了。
      

  15.   


    我们的服务器就是MINA,聊天服务的,还有文件传输等。 并发每台机器每秒消息解包打包20万次。很稳定。
      

  16.   

    当然能做,而且必须能做。参考我最早的回复。我怀疑其实不是粘包,而是你codec中对Buffer没处理正确
      

  17.   


    我之前没有做DECODE的,只是从buffer中取出直接用的,发现在buffer中会有断包,粘包,后来在handler中对断包、粘包做了处理,现在想使用Decoder来做这个工作,MINA新手,请不吝赐教,谢谢!!
      

  18.   

    那你那个就不叫长度,顶多算个备注或者摘要。长度就是这个包的长度
    tlv是可以嵌套的,包大小,可以通过定义包头的TLV来解决。我们的跨平台程序都是这么定义的。
      

  19.   

    我没用过 Mina,但是我可以推荐你参考一下其他 Socket 框架的做法:Grizzly:
    http://grizzly.java.net/nonav/docs/docbkx2.0/html/coreframework-samples.html
    User Guide 第二章的样例:解析收到的消息。Grizzly 采用了消息过滤器的机器,在消息过滤器的 handleRead 中处理收到的消息,完整的代码详见:
    http://java.net/projects/grizzly/sources/git/content/samples/framework-samples/src/main/java/org/glassfish/grizzly/samples/filterchain/GIOPFilter.javaxSocket:
    http://xsocket.sourceforge.net/core/tutorial/V2/TutorialCore.htm
    第 18 节的代码以及下面的图示。xSocket 使用监听方法 onData 处理收到的消息桢。Netty:
    http://netty.io/docs/3.2.6.Final/api/org/jboss/netty/handler/codec/frame/FrameDecoder.html
    FrameDecoder 的 API 文档。Netty 抽象了一个“消息桢解码器”的类来处理这些。我找了一下 Mina 2 的文档,也有类似 Netty FrameDecoder 相关的类:
    http://mina.apache.org/chapter-11-codec-filter.html
      

  20.   

    if(msgHead==SystemConstants.MsgHead){
       //解析正确,略去
    }else {
    int currentPosition = in.position();
                    int length = in.limit();
    if (currentPosition == 1) {//如果第一个字节不是消息标记
    for (int i = currentPosition; i < length; i++) {
    byte temp = in.get();
    if (temp == SystemConstants.DEVICE_MSG_TAIL) {
         int reaminLength = in.remaining();
    if (reaminLength > 0) {
    byte[] remainBytes = new byte[reaminLength];
    in.get(remainBytes, 0, reaminLength);
    in.clear();
    in.put(remainBytes);
    in.position(0);
    return false;
    }
    }
    }
    }
    break;
    }
    }
    in.position(startPos);
    return false;
    }现在这段解析有问题,我想把buffer中不合法的消息丢弃掉,但这样处理出错。请高手指教哈
      

  21.   

    不能丢掉,因为,所有的数据,都是你(或者其他人按照你协议)发来的,不能丢啊。// $Id: AbstractPacketDecoder.java,v 1.5 2008/06/15 19:07:27 michael Exp $
    package cn.oceanmedia.rdc.common.codec.decoder;import static cn.oceanmedia.rdc.common.DataPacketConstants.DEFAULT_HEADER_LENGTH;import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.mina.common.ByteBuffer;
    import org.apache.mina.common.IoSession;
    import org.apache.mina.filter.codec.ProtocolDecoderOutput;
    import org.apache.mina.filter.codec.demux.MessageDecoderAdapter;
    import org.apache.mina.filter.codec.demux.MessageDecoderResult;/**
     * 数据包解码对象解码器抽象超类。
     * 
     * @author Michael J Chane
     * @version $Revision: 1.5 $ $Date: 2008/06/15 19:07:27 $
     * @param
     *          <P>
     *          解码器返回的数据包内容
     */
    public abstract class AbstractPacketDecoder<P> extends MessageDecoderAdapter {  /**
       * 日志对象
       */
      protected final Log log = LogFactory.getLog(getClass());  /**
       * 数据包类型
       */
      protected byte type;  /**
       * 创建一个新的<code>AbstractPacketDecoder</code>对象。
       * 
       * @param type
       *          可以解码的数据类型
       */
      protected AbstractPacketDecoder(byte type) {
        this.type = type;
      }  /**
       * 判断当前解码器能否解码最新读入的数据。
       * 
       * @param session
       *          连接会话
       * @param in
       *          读入的字节缓冲
       * @return 是否可解码: {@code MessageDecoder.OK} 可以解码;
       *         {@code MessageDecoder.NOT_OK} 不能解码;
       *         {@code MessageDecoder.NEED_DATA} 需要更多数据才能判断是否能够解码
       * @see org.apache.mina.filter.codec.demux.MessageDecoder#decodable(org.apache.mina.common.IoSession,
       *      org.apache.mina.common.ByteBuffer)
       */
      public MessageDecoderResult decodable(IoSession session, ByteBuffer in) {
        // 数据包头长度
        int headerLength = retrieveHeaderLength();
        if (in.remaining() < headerLength) {
          // 读入的字节缓冲中的剩余数据长度小于数据包头长度,需要更多数据
          // 注意:即使有标志位便可以确认当前类能否解码,仍然需要确保第一次解码时能够正确读入长度数据
          return NEED_DATA;
        }    // 数据包类型
        byte contentType = in.get();
        if (contentType == type) {
          // 数据包类型与当前解码器可以解码的数据类型匹配,可以解码
          return OK;
        } else {
          // 数据包类型与当前解码器可以解码的数据类型匹配,不能解码
          return NOT_OK;
        }
      }  /**
       * 解码最新读入的数据。
       * 
       * @param session
       *          会话连接
       * @param in
       *          读入的字节缓冲
       * @param out
       *          存放解码结果的输出
       * @return 处理结果: {@code MessageDecoder.OK} 可以解码; {@code MessageDecoder.NOT_OK}
       *         不能解码; {@code MessageDecoder.NEED_DATA} 需要更多数据才能判断是否能够解码
       * @throws Exception
       *           如果在处理中发生异常的话
       * @see org.apache.mina.filter.codec.demux.MessageDecoder#decode(org.apache.mina.common.IoSession,
       *      org.apache.mina.common.ByteBuffer,
       *      org.apache.mina.filter.codec.ProtocolDecoderOutput)
       */
      public MessageDecoderResult decode(IoSession session,
                                         ByteBuffer in,
                                         ProtocolDecoderOutput out)
          throws Exception {
        // 记录原有位置
        in.();
        // 跳过数据包标志位
        in.get();
        // 数据包长度
        int contentLength = getContentLength(session, in);    if (in.remaining() < contentLength) {
          // 当前物理数据包长度小于逻辑数据包长度,重置开始位置
          in.position(in.Value());
          // 需要更多数据
          return NEED_DATA;
        }    // 解码数据体
        P packet = decodeBody(session, in, contentLength);
        if (packet == null) {
          // 错误
          return NOT_OK;
        }    // 保存数据包对象
        out.write(packet);
        // 解码成功
        return OK;
      }  /**
       * 返回数据包头的长度。
       * <p>
       * 针对个别包4位长度位或者没有长度位的情况,子类应该覆盖该方法,返回对应的数据包头长度。
       * </p>
       * 
       * @return 数据包头的长度
       */
      protected int retrieveHeaderLength() {
        return DEFAULT_HEADER_LENGTH;
      }  /**
       * 获取数据体(不含标志位及长度位本身)的字节长度。
       * <p>
       * 本方法会在获取数据包类型后被调用,默认从当前字节缓冲中读取2字节长度信息。
       * </p>
       * <p>
       * 子类可以覆盖该方法,直接返回长度信息而不从字节缓冲中读取。
       * </p>
       * 
       * @param session
       *          会话连接
       * @param in
       *          字节缓冲
       * @return 数据体长度,它可以是:
       *         <ul>
       *         <li>从数据包中特定位置读取的长度信息</li>
       *         <li><code>NEED_MORE_LENGTH_INFO</code>代表需要更多字节才能判断长度</li>
       *         </ul>
       */
      protected int getContentLength(IoSession session, ByteBuffer in) {
        return in.getShort();
      }  /**
       * 解码数据体内容,并返回对应的数据对象。
       * 
       * @param session
       *          会话连接
       * @param in
       *          读入的字节缓冲
       * @param contentLength
       *          数据体长度
       * @return 数据体内容解码后得到的对应的数据对象,或者 <code>null</code> 代表解码时发生错误。
       * @throws Exception
       *           如果在处理中发生异常的话
       */
      protected abstract P decodeBody(IoSession session,
                                      ByteBuffer in,
                                      int contentLength) throws Exception;
    }
      

  22.   

    ClientVersion是客户端版本,就不贴了。import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;import org.apache.mina.common.ByteBuffer;
    import org.apache.mina.common.IoSession;/**
     * 客户端本信息解码器。
     * 
     * @author Michael J Chane
     * @version $Revision: 1.4 $ $Date: 2008/06/12 04:18:06 $
     */
    public class ClientVersionDecoder extends AbstractPacketDecoder<ClientVersion> {  /**
       * 客户端版本信息数据包标志位
       */
      public static final byte HEADER_VER = 0x00;  /**
       * 字符集:ASCII
       */
      protected static final CharsetDecoder CHARSET_DECODER = Charset
          .forName("ASCII").newDecoder();  /**
       * 创建一个新的<code>ClientVersionDecoder</code>对象,数据包头{@link #HEADER_VER HEADER_VER}
       */
      public ClientVersionDecoder() {
        super(HEADER_VER);
      }  /**
       * 解码并返回读入的客户端版本信息。
       * 
       * @param session
       *          会话连接
       * @param in
       *          读入的字节缓冲
       * @param contentLength
       *          数据包内容长度。
       * @return 客户端版本信息
       * @throws Exception
       *           如果处理中发生异常的话
       */
      @Override
      protected ClientVersion decodeBody(IoSession session,
                                         ByteBuffer in,
                                         int contentLength) throws Exception {
        // 读入字节
        String versionInfo = in.getString(contentLength, CHARSET_DECODER).trim();
        // 返回客户端版本信息
        return new ClientVersion(versionInfo);
      }  /**
       * 从读入的字节缓冲中读取一个字节的版本信息长度。
       * 
       * @param session
       *          会话连接
       * @param in
       *          字节缓冲
       * @return 版本信息长度
       * @throws Exception
       *           如果处理中发生异常的话
       */
      @Override
      protected int getContentLength(IoSession session, ByteBuffer in) {
        return in.get();
      }}
      

  23.   


    import java.awt.image.BufferedImage;import javax.imageio.ImageIO;import org.apache.mina.common.ByteBuffer;
    import org.apache.mina.common.IoSession;
    /**
     * 分区数据解码器。
     * 
     * @author Michael J Chane
     * @version $Revision: 1.3 $ $Date: 2008/06/15 19:07:27 $
     */
    public class ScreenPartitionDecoder extends
        AbstractPacketDecoder<ScreenPartition> {  /**
       * 是否是服务器侧
       */
      private boolean serverSide;  /**
       * 创建一个新的<code>ScreenPartitionDecoder</code>对象。
       */
      public ScreenPartitionDecoder() {
        super(HEADER_SCRN);
      }  /**
       * 返回 <code>serverSide</code> 的当前值。
       * 
       * @return <code>serverSide</code> 的当前值
       */
      public boolean isServerSide() {
        return serverSide;
      }  /**
       * 设置 <code>serverSide</code> 的当前值。
       * 
       * @param serverSide
       *          <code>serverSide</code> 的当前值
       */
      public void setServerSide(boolean serverSide) {
        this.serverSide = serverSide;
      }  /**
       * 分区数据解码方法。
       * 
       * @param session
       *          会话连接
       * @param in
       *          读入的字节缓冲
       * @param contentLength
       *          内容长度
       * @return 对应的已经装载了新数据的分区对象
       * @throws Exception
       *           如果处理中发生异常的话
       */
      @Override
      protected ScreenPartition decodeBody(IoSession session,
                                           ByteBuffer in,
                                           int contentLength) throws Exception {
        if (serverSide) {
          return serverDecode(session, in, contentLength);
        } else {
          return viewerDecode(session, in, contentLength);
        }
      }  /**
       * 服务端解码方法。
       * 
       * @param session
       *          绘画连接
       * @param in
       *          读入的字节
       * @param contentLength
       *          内容长度
       * @return 读入的分区
       * @throws Exception
       *           如果处理中发生异常的话
       */
      protected ScreenPartition serverDecode(IoSession session,
                                             ByteBuffer in,
                                             int contentLength) throws Exception {
        // 远程桌面对象
        RemoteDesktop desktop = (RemoteDesktop) session.getAttachment();
        // 编码后的字节
        ByteBuffer encodedBytes = ByteBuffer.allocate(contentLength
            + DEFAULT_HEADER_LENGTH);
        // 设置标志位
        encodedBytes.put(DataPacketConstants.HEADER_SCRN);
        // 长度
        encodedBytes.putShort((short) contentLength);
        // 读取分区索引
        int partitionIndex = in.getShort();
        encodedBytes.putShort((short) partitionIndex);
        // 数据内容
        for (int i = 0; i < contentLength - 2; i++) {
          encodedBytes.put(in.get());
        }
        // 编码完成
        encodedBytes.flip();
        // 对应的分区
        ScreenPartition screenPartition = desktop.findPartition(partitionIndex);
        // 设置编码后的字节
        screenPartition.saveEncodedBytes(encodedBytes);
        // 返回对应的分区对象
        return screenPartition;
      }  /**
       * 客户端解码方法。
       * 
       * @param session
       *          会话连接
       * @param in
       *          读入的字节
       * @param contentLength
       *          内容长度
       * @return 读入的分区
       * @throws Exception
       *           如果处理中发生异常的话
       */
      protected ScreenPartition viewerDecode(IoSession session,
                                             ByteBuffer in,
                                             int contentLength) throws Exception {
        // 远程桌面对象
        RemoteDesktop desktop = (RemoteDesktop) session.getAttachment();
        // 当前数据包之后新的数据包(如果存在的话)的位置
        int newPos = in.position() + contentLength;
        // 读取分区索引
        int partitionIndex = in.getShort();
        // 读取分区图像
        BufferedImage image = ImageIO.read(in.asInputStream());
        // 对应的分区
        ScreenPartition screenPartition = desktop.findPartition(partitionIndex);
        // 设置新图像
        screenPartition.setImage(image);
        // 设置字节缓冲新位置,这个必须设置一下。因为上面的ImageIO.read可能读取的字节数与发送的字节数不匹配,必须设置,以免多读或者少读,导致后面的Decoder出错
        in.position(newPos);
        // 返回对应的分区对象
        return screenPartition;
      }}我这些代码是mina 1的,大体上mina 2原理一致。我的功能是类似现在TeamViewer的功能,远程桌面,被控端——我称之为Puppet(傀儡,提线木偶)发送数据到服务器(Server,称之为舞台),Server根据各个观众(Viewer)的数据请求情况,发送Puppet的桌面信息。反过来,将Viewer的控制信息,通过Server传给Puppet。其中,你刚才提到的“无用数据”,其实并非无用,参考最后viewerDecode方法中,最后的in.position(newPos);
      

  24.   

    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
    int startPos = 0;
    in.position(startPos);
    while (in.hasRemaining()) {
    byte b = in.get();// 取得第一个字节
    int currentlen=in.limit();
    if (b == SystemConstants.DEVICE_MSG_HEAD) {// 如果为消息头
    in.position(startPos + 3);
    byte thirdByte = in.get();// 读取第三个字节标记
    int tmpDivided = (thirdByte >>> 5) & 0x1;
    in.position(startPos + 4);
    int msgContentLength = (thirdByte & 0x3) * 256 + (in.get() & 0xff);
    int totalMsgLength = msgContentLength + 15;
    if (tmpDivided == SystemConstants.IS_DIVIDE_MSG) {
    totalMsgLength += 4;
    }
    if (in.limit() >= totalMsgLength) {
    in.position(totalMsgLength - 1);// 指向消息最大长度位置
    byte tail = in.get();
    if (tail == SystemConstants.DEVICE_MSG_TAIL) {
    byte[] aMsgBytes = new byte[totalMsgLength];
    in.position(startPos);
    in.get(aMsgBytes, 0, totalMsgLength);
    out.write(aMsgBytes);
    return true;
    } else {
    int currentPosition = in.position();
    in.position(currentPosition + 1);
    for (int i = currentPosition + 1; i < in.limit(); i++) {
    byte temp = in.get();
    if (temp == SystemConstants.DEVICE_MSG_TAIL) {
    byte[] aMsgBytes = new byte[i + 1];
    in.position(startPos);
    in.get(aMsgBytes, 0, i + 1);
    out.write(aMsgBytes);
    return true;
    }
    }
    }
    } else {
    in.position(startPos);
    return false;
    }
    } else {
    int currentPosition = in.position();
    // debug
    System.out.println("Read first byte from buffer is:******************************" + b);
    System.out.println("*********start pos is:" + startPos + ",*************************current position is:" + currentPosition + ",Remaining is:"
    + in.remaining());
    int length = in.limit();
    if (currentPosition == 1) {// 如果第一个字节不是0x7e
    for (int i = currentPosition; i < length; i++) {
    byte temp = in.get();
    if (temp == SystemConstants.DEVICE_MSG_TAIL) {
    // int reaminLength = in.remaining();
    // if (reaminLength > 0) {
    // byte[] remainBytes = new byte[reaminLength];
    // in.get(remainBytes, 0, reaminLength);
    // in.clear();
    // in.put(remainBytes);
    // in.position(0);
    return false;
    // }
    }
    }
    }
    break;
    }
    }
    // in.position(startPos);
    return false;
    }
    我继承了CumulativeProtocolDecoder实现。现在发现第一包数据会莫名奇妙的被丢掉
      

  25.   

    OK了,修订后代码如下:
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
    int startPos = 0;
    in.position(startPos);
    int currentlen=in.limit();
    System.out.println("Current length is*******************:"+currentlen+",current position is:"+in.position());
    while (in.hasRemaining()) {
    byte b = in.get();// 取得第一个字节
    if (b == SystemConstants.DEVICE_MSG_HEAD) {// 如果为消息头
    in.position(startPos + 3);
    byte thirdByte = in.get();// 读取第三个字节标记
    int tmpDivided = (thirdByte >>> 5) & 0x1;
    in.position(startPos + 4);
    int msgContentLength = (thirdByte & 0x3) * 256 + (in.get() & 0xff);
    int totalMsgLength = msgContentLength + 15;
    if (tmpDivided == SystemConstants.IS_DIVIDE_MSG) {
    totalMsgLength += 4;
    }
    if (in.limit() >= totalMsgLength) {
    in.position(totalMsgLength - 1);// 指向消息最大长度位置
    byte tail = in.get();
    if (tail == SystemConstants.DEVICE_MSG_TAIL) {
    byte[] aMsgBytes = new byte[totalMsgLength];
    in.position(startPos);
    in.get(aMsgBytes, 0, totalMsgLength);
    out.write(aMsgBytes);
    return true;
    } else {
    int currentPosition = in.position();
    // in.position(currentPosition + 1);
    for (int i = currentPosition; i < in.limit(); i++) {
    byte temp = in.get();
    if (temp == SystemConstants.DEVICE_MSG_TAIL) {
    byte[] aMsgBytes = new byte[i + 1];
    in.position(startPos);
    in.get(aMsgBytes, 0, i + 1);
    out.write(aMsgBytes);
    return true;
    }
    }
    }
    } else {
    in.position(startPos);
    return false;
    }
    } else {
    int currentPosition = in.position();
    // debug
    System.out.println("Read first byte from buffer is:******************************" + b);
    System.out.println("*********start pos is:" + startPos + ",*************************current position is:" + currentPosition + ",Remaining is:"
    + in.remaining());
    int length = in.limit();
    if (currentPosition == 1) {// 如果第一个字节不是0x7e
    for (int i = currentPosition; i < length; i++) {
    byte temp = in.get();
    if (temp == SystemConstants.DEVICE_MSG_TAIL) {
    // int reaminLength = in.remaining();
    // if (reaminLength > 0) {
    // byte[] remainBytes = new byte[reaminLength];
    // in.get(remainBytes, 0, reaminLength);
    // in.clear();
    // in.put(remainBytes);
    // in.position(0);
    return false;
    // }
    }
    }
    }
    break;
    }
    }
    // in.position(startPos);
    return false;
    }