MINA粘包消息 自定义的TCP/IP协议,在传输大消息时容易有粘包现象发生,请高手赐教如何进行拆包解包,谢谢!!协议格式类似:0x7e 0x00 0x19 0x85 0x83 0x74 0x7e,其中0x7e为消息头、尾标记。 解决方案 » 免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货 每个逻辑包,开头的固定位置,表明这个包的类型和长度。下面的帖子,我举了个初步的例子:http://topic.csdn.net/u/20111030/22/a5d2c17e-ebad-4cc6-9f62-9b0b6e3f39fc.html你光靠0x7e,是不行的,除非你的数据中根本没有0x7e 正解,当传输的数据中包含0x7e字符时就悲剧了..最好就是以特殊字符做标识,类似TCP/IP的包头包尾一样.. 对,我会保证传输数据中没有0x7e的,除包头包尾外,0x7e都会被转义,因为转义的原因,所以消息为变长,服务器端解析效率会比较低。另外,如果同时有上万条消息时,MINA的效率会比较差。 保证没有0x7e的话,那直接参考org.apache.mina.filter.codec.textline.TextLineEncoder/TextLineDecoder“同时有上万条消息”,别说mina,什么框架、语言都有压力。但是,你真有这么大的访问量吗? openfire是基于mina的框架的一个实现,lz可是看看去 你的包不是定义在tcp/ip层之上的嘛 是在TCP/IP之上,但包大小超过1024字节时,就很容易出现粘包、断包现象的 你确保你的代码正确?话说,一个逻辑包,可能分散在几个Buffer里面过来,这个你能正常处理?? 另外,你用了那些Filter/Handler? 目前看是正确的,目前没有用Filter,Handler是我自己写的继承了IoHandlerAdapter的实现类。 另外,一个逻辑包,会分散在几个Buffer里面?那样的话岂不是拆包组包就没法做了吗? http://freemart.iteye.com/blog/836654这里资料具体解决方案和shine的方式差不多 有没有朋友试过MINA处理大量消息时的效率如何? 你的协议定义的有问题,一般我们都是做的TLV的格式例如 前4个byte,是code,类似你刚才的0x7e,然后紧接的4个byte是你对应的code的内容的长度,然后后面是内容,这样你只需要每次按长度截图后面的对应的内容就可以了,这样也可以嵌套实现,很容易。你的那种包,是不对的,因为你都不知道包的长度,那样如果数据流很大,你岂不每个byte都要遍历,而且不能保证包的内容没有你定义的0x7e,太耗cpu了。 我们的服务器就是MINA,聊天服务的,还有文件传输等。 并发每台机器每秒消息解包打包20万次。很稳定。 当然能做,而且必须能做。参考我最早的回复。我怀疑其实不是粘包,而是你codec中对Buffer没处理正确 我之前没有做DECODE的,只是从buffer中取出直接用的,发现在buffer中会有断包,粘包,后来在handler中对断包、粘包做了处理,现在想使用Decoder来做这个工作,MINA新手,请不吝赐教,谢谢!! 那你那个就不叫长度,顶多算个备注或者摘要。长度就是这个包的长度tlv是可以嵌套的,包大小,可以通过定义包头的TLV来解决。我们的跨平台程序都是这么定义的。 我没用过 Mina,但是我可以推荐你参考一下其他 Socket 框架的做法:Grizzly:http://grizzly.java.net/nonav/docs/docbkx2.0/html/coreframework-samples.htmlUser Guide 第二章的样例:解析收到的消息。Grizzly 采用了消息过滤器的机器,在消息过滤器的 handleRead 中处理收到的消息,完整的代码详见:http://java.net/projects/grizzly/sources/git/content/samples/framework-samples/src/main/java/org/glassfish/grizzly/samples/filterchain/GIOPFilter.javaxSocket:http://xsocket.sourceforge.net/core/tutorial/V2/TutorialCore.htm第 18 节的代码以及下面的图示。xSocket 使用监听方法 onData 处理收到的消息桢。Netty:http://netty.io/docs/3.2.6.Final/api/org/jboss/netty/handler/codec/frame/FrameDecoder.htmlFrameDecoder 的 API 文档。Netty 抽象了一个“消息桢解码器”的类来处理这些。我找了一下 Mina 2 的文档,也有类似 Netty FrameDecoder 相关的类:http://mina.apache.org/chapter-11-codec-filter.html if(msgHead==SystemConstants.MsgHead){ //解析正确,略去}else { int currentPosition = in.position(); int length = in.limit(); if (currentPosition == 1) {//如果第一个字节不是消息标记 for (int i = currentPosition; i < length; i++) { byte temp = in.get(); if (temp == SystemConstants.DEVICE_MSG_TAIL) { int reaminLength = in.remaining(); if (reaminLength > 0) { byte[] remainBytes = new byte[reaminLength]; in.get(remainBytes, 0, reaminLength); in.clear(); in.put(remainBytes); in.position(0); return false; } } } } break; } }in.position(startPos);return false;}现在这段解析有问题,我想把buffer中不合法的消息丢弃掉,但这样处理出错。请高手指教哈 不能丢掉,因为,所有的数据,都是你(或者其他人按照你协议)发来的,不能丢啊。// $Id: AbstractPacketDecoder.java,v 1.5 2008/06/15 19:07:27 michael Exp $package cn.oceanmedia.rdc.common.codec.decoder;import static cn.oceanmedia.rdc.common.DataPacketConstants.DEFAULT_HEADER_LENGTH;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.common.ByteBuffer;import org.apache.mina.common.IoSession;import org.apache.mina.filter.codec.ProtocolDecoderOutput;import org.apache.mina.filter.codec.demux.MessageDecoderAdapter;import org.apache.mina.filter.codec.demux.MessageDecoderResult;/** * 数据包解码对象解码器抽象超类。 * * @author Michael J Chane * @version $Revision: 1.5 $ $Date: 2008/06/15 19:07:27 $ * @param * <P> * 解码器返回的数据包内容 */public abstract class AbstractPacketDecoder<P> extends MessageDecoderAdapter { /** * 日志对象 */ protected final Log log = LogFactory.getLog(getClass()); /** * 数据包类型 */ protected byte type; /** * 创建一个新的<code>AbstractPacketDecoder</code>对象。 * * @param type * 可以解码的数据类型 */ protected AbstractPacketDecoder(byte type) { this.type = type; } /** * 判断当前解码器能否解码最新读入的数据。 * * @param session * 连接会话 * @param in * 读入的字节缓冲 * @return 是否可解码: {@code MessageDecoder.OK} 可以解码; * {@code MessageDecoder.NOT_OK} 不能解码; * {@code MessageDecoder.NEED_DATA} 需要更多数据才能判断是否能够解码 * @see org.apache.mina.filter.codec.demux.MessageDecoder#decodable(org.apache.mina.common.IoSession, * org.apache.mina.common.ByteBuffer) */ public MessageDecoderResult decodable(IoSession session, ByteBuffer in) { // 数据包头长度 int headerLength = retrieveHeaderLength(); if (in.remaining() < headerLength) { // 读入的字节缓冲中的剩余数据长度小于数据包头长度,需要更多数据 // 注意:即使有标志位便可以确认当前类能否解码,仍然需要确保第一次解码时能够正确读入长度数据 return NEED_DATA; } // 数据包类型 byte contentType = in.get(); if (contentType == type) { // 数据包类型与当前解码器可以解码的数据类型匹配,可以解码 return OK; } else { // 数据包类型与当前解码器可以解码的数据类型匹配,不能解码 return NOT_OK; } } /** * 解码最新读入的数据。 * * @param session * 会话连接 * @param in * 读入的字节缓冲 * @param out * 存放解码结果的输出 * @return 处理结果: {@code MessageDecoder.OK} 可以解码; {@code MessageDecoder.NOT_OK} * 不能解码; {@code MessageDecoder.NEED_DATA} 需要更多数据才能判断是否能够解码 * @throws Exception * 如果在处理中发生异常的话 * @see org.apache.mina.filter.codec.demux.MessageDecoder#decode(org.apache.mina.common.IoSession, * org.apache.mina.common.ByteBuffer, * org.apache.mina.filter.codec.ProtocolDecoderOutput) */ public MessageDecoderResult decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { // 记录原有位置 in.(); // 跳过数据包标志位 in.get(); // 数据包长度 int contentLength = getContentLength(session, in); if (in.remaining() < contentLength) { // 当前物理数据包长度小于逻辑数据包长度,重置开始位置 in.position(in.Value()); // 需要更多数据 return NEED_DATA; } // 解码数据体 P packet = decodeBody(session, in, contentLength); if (packet == null) { // 错误 return NOT_OK; } // 保存数据包对象 out.write(packet); // 解码成功 return OK; } /** * 返回数据包头的长度。 * <p> * 针对个别包4位长度位或者没有长度位的情况,子类应该覆盖该方法,返回对应的数据包头长度。 * </p> * * @return 数据包头的长度 */ protected int retrieveHeaderLength() { return DEFAULT_HEADER_LENGTH; } /** * 获取数据体(不含标志位及长度位本身)的字节长度。 * <p> * 本方法会在获取数据包类型后被调用,默认从当前字节缓冲中读取2字节长度信息。 * </p> * <p> * 子类可以覆盖该方法,直接返回长度信息而不从字节缓冲中读取。 * </p> * * @param session * 会话连接 * @param in * 字节缓冲 * @return 数据体长度,它可以是: * <ul> * <li>从数据包中特定位置读取的长度信息</li> * <li><code>NEED_MORE_LENGTH_INFO</code>代表需要更多字节才能判断长度</li> * </ul> */ protected int getContentLength(IoSession session, ByteBuffer in) { return in.getShort(); } /** * 解码数据体内容,并返回对应的数据对象。 * * @param session * 会话连接 * @param in * 读入的字节缓冲 * @param contentLength * 数据体长度 * @return 数据体内容解码后得到的对应的数据对象,或者 <code>null</code> 代表解码时发生错误。 * @throws Exception * 如果在处理中发生异常的话 */ protected abstract P decodeBody(IoSession session, ByteBuffer in, int contentLength) throws Exception;} ClientVersion是客户端版本,就不贴了。import java.nio.charset.Charset;import java.nio.charset.CharsetDecoder;import org.apache.mina.common.ByteBuffer;import org.apache.mina.common.IoSession;/** * 客户端本信息解码器。 * * @author Michael J Chane * @version $Revision: 1.4 $ $Date: 2008/06/12 04:18:06 $ */public class ClientVersionDecoder extends AbstractPacketDecoder<ClientVersion> { /** * 客户端版本信息数据包标志位 */ public static final byte HEADER_VER = 0x00; /** * 字符集:ASCII */ protected static final CharsetDecoder CHARSET_DECODER = Charset .forName("ASCII").newDecoder(); /** * 创建一个新的<code>ClientVersionDecoder</code>对象,数据包头{@link #HEADER_VER HEADER_VER} */ public ClientVersionDecoder() { super(HEADER_VER); } /** * 解码并返回读入的客户端版本信息。 * * @param session * 会话连接 * @param in * 读入的字节缓冲 * @param contentLength * 数据包内容长度。 * @return 客户端版本信息 * @throws Exception * 如果处理中发生异常的话 */ @Override protected ClientVersion decodeBody(IoSession session, ByteBuffer in, int contentLength) throws Exception { // 读入字节 String versionInfo = in.getString(contentLength, CHARSET_DECODER).trim(); // 返回客户端版本信息 return new ClientVersion(versionInfo); } /** * 从读入的字节缓冲中读取一个字节的版本信息长度。 * * @param session * 会话连接 * @param in * 字节缓冲 * @return 版本信息长度 * @throws Exception * 如果处理中发生异常的话 */ @Override protected int getContentLength(IoSession session, ByteBuffer in) { return in.get(); }} import java.awt.image.BufferedImage;import javax.imageio.ImageIO;import org.apache.mina.common.ByteBuffer;import org.apache.mina.common.IoSession;/** * 分区数据解码器。 * * @author Michael J Chane * @version $Revision: 1.3 $ $Date: 2008/06/15 19:07:27 $ */public class ScreenPartitionDecoder extends AbstractPacketDecoder<ScreenPartition> { /** * 是否是服务器侧 */ private boolean serverSide; /** * 创建一个新的<code>ScreenPartitionDecoder</code>对象。 */ public ScreenPartitionDecoder() { super(HEADER_SCRN); } /** * 返回 <code>serverSide</code> 的当前值。 * * @return <code>serverSide</code> 的当前值 */ public boolean isServerSide() { return serverSide; } /** * 设置 <code>serverSide</code> 的当前值。 * * @param serverSide * <code>serverSide</code> 的当前值 */ public void setServerSide(boolean serverSide) { this.serverSide = serverSide; } /** * 分区数据解码方法。 * * @param session * 会话连接 * @param in * 读入的字节缓冲 * @param contentLength * 内容长度 * @return 对应的已经装载了新数据的分区对象 * @throws Exception * 如果处理中发生异常的话 */ @Override protected ScreenPartition decodeBody(IoSession session, ByteBuffer in, int contentLength) throws Exception { if (serverSide) { return serverDecode(session, in, contentLength); } else { return viewerDecode(session, in, contentLength); } } /** * 服务端解码方法。 * * @param session * 绘画连接 * @param in * 读入的字节 * @param contentLength * 内容长度 * @return 读入的分区 * @throws Exception * 如果处理中发生异常的话 */ protected ScreenPartition serverDecode(IoSession session, ByteBuffer in, int contentLength) throws Exception { // 远程桌面对象 RemoteDesktop desktop = (RemoteDesktop) session.getAttachment(); // 编码后的字节 ByteBuffer encodedBytes = ByteBuffer.allocate(contentLength + DEFAULT_HEADER_LENGTH); // 设置标志位 encodedBytes.put(DataPacketConstants.HEADER_SCRN); // 长度 encodedBytes.putShort((short) contentLength); // 读取分区索引 int partitionIndex = in.getShort(); encodedBytes.putShort((short) partitionIndex); // 数据内容 for (int i = 0; i < contentLength - 2; i++) { encodedBytes.put(in.get()); } // 编码完成 encodedBytes.flip(); // 对应的分区 ScreenPartition screenPartition = desktop.findPartition(partitionIndex); // 设置编码后的字节 screenPartition.saveEncodedBytes(encodedBytes); // 返回对应的分区对象 return screenPartition; } /** * 客户端解码方法。 * * @param session * 会话连接 * @param in * 读入的字节 * @param contentLength * 内容长度 * @return 读入的分区 * @throws Exception * 如果处理中发生异常的话 */ protected ScreenPartition viewerDecode(IoSession session, ByteBuffer in, int contentLength) throws Exception { // 远程桌面对象 RemoteDesktop desktop = (RemoteDesktop) session.getAttachment(); // 当前数据包之后新的数据包(如果存在的话)的位置 int newPos = in.position() + contentLength; // 读取分区索引 int partitionIndex = in.getShort(); // 读取分区图像 BufferedImage image = ImageIO.read(in.asInputStream()); // 对应的分区 ScreenPartition screenPartition = desktop.findPartition(partitionIndex); // 设置新图像 screenPartition.setImage(image); // 设置字节缓冲新位置,这个必须设置一下。因为上面的ImageIO.read可能读取的字节数与发送的字节数不匹配,必须设置,以免多读或者少读,导致后面的Decoder出错 in.position(newPos); // 返回对应的分区对象 return screenPartition; }}我这些代码是mina 1的,大体上mina 2原理一致。我的功能是类似现在TeamViewer的功能,远程桌面,被控端——我称之为Puppet(傀儡,提线木偶)发送数据到服务器(Server,称之为舞台),Server根据各个观众(Viewer)的数据请求情况,发送Puppet的桌面信息。反过来,将Viewer的控制信息,通过Server传给Puppet。其中,你刚才提到的“无用数据”,其实并非无用,参考最后viewerDecode方法中,最后的in.position(newPos); protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { int startPos = 0; in.position(startPos); while (in.hasRemaining()) { byte b = in.get();// 取得第一个字节 int currentlen=in.limit(); if (b == SystemConstants.DEVICE_MSG_HEAD) {// 如果为消息头 in.position(startPos + 3); byte thirdByte = in.get();// 读取第三个字节标记 int tmpDivided = (thirdByte >>> 5) & 0x1; in.position(startPos + 4); int msgContentLength = (thirdByte & 0x3) * 256 + (in.get() & 0xff); int totalMsgLength = msgContentLength + 15; if (tmpDivided == SystemConstants.IS_DIVIDE_MSG) { totalMsgLength += 4; } if (in.limit() >= totalMsgLength) { in.position(totalMsgLength - 1);// 指向消息最大长度位置 byte tail = in.get(); if (tail == SystemConstants.DEVICE_MSG_TAIL) { byte[] aMsgBytes = new byte[totalMsgLength]; in.position(startPos); in.get(aMsgBytes, 0, totalMsgLength); out.write(aMsgBytes); return true; } else { int currentPosition = in.position(); in.position(currentPosition + 1); for (int i = currentPosition + 1; i < in.limit(); i++) { byte temp = in.get(); if (temp == SystemConstants.DEVICE_MSG_TAIL) { byte[] aMsgBytes = new byte[i + 1]; in.position(startPos); in.get(aMsgBytes, 0, i + 1); out.write(aMsgBytes); return true; } } } } else { in.position(startPos); return false; } } else { int currentPosition = in.position(); // debug System.out.println("Read first byte from buffer is:******************************" + b); System.out.println("*********start pos is:" + startPos + ",*************************current position is:" + currentPosition + ",Remaining is:" + in.remaining()); int length = in.limit(); if (currentPosition == 1) {// 如果第一个字节不是0x7e for (int i = currentPosition; i < length; i++) { byte temp = in.get(); if (temp == SystemConstants.DEVICE_MSG_TAIL) { // int reaminLength = in.remaining(); // if (reaminLength > 0) { // byte[] remainBytes = new byte[reaminLength]; // in.get(remainBytes, 0, reaminLength); // in.clear(); // in.put(remainBytes); // in.position(0); return false; // } } } } break; } } // in.position(startPos); return false; }我继承了CumulativeProtocolDecoder实现。现在发现第一包数据会莫名奇妙的被丢掉 OK了,修订后代码如下:@Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { int startPos = 0; in.position(startPos); int currentlen=in.limit(); System.out.println("Current length is*******************:"+currentlen+",current position is:"+in.position()); while (in.hasRemaining()) { byte b = in.get();// 取得第一个字节 if (b == SystemConstants.DEVICE_MSG_HEAD) {// 如果为消息头 in.position(startPos + 3); byte thirdByte = in.get();// 读取第三个字节标记 int tmpDivided = (thirdByte >>> 5) & 0x1; in.position(startPos + 4); int msgContentLength = (thirdByte & 0x3) * 256 + (in.get() & 0xff); int totalMsgLength = msgContentLength + 15; if (tmpDivided == SystemConstants.IS_DIVIDE_MSG) { totalMsgLength += 4; } if (in.limit() >= totalMsgLength) { in.position(totalMsgLength - 1);// 指向消息最大长度位置 byte tail = in.get(); if (tail == SystemConstants.DEVICE_MSG_TAIL) { byte[] aMsgBytes = new byte[totalMsgLength]; in.position(startPos); in.get(aMsgBytes, 0, totalMsgLength); out.write(aMsgBytes); return true; } else { int currentPosition = in.position();// in.position(currentPosition + 1); for (int i = currentPosition; i < in.limit(); i++) { byte temp = in.get(); if (temp == SystemConstants.DEVICE_MSG_TAIL) { byte[] aMsgBytes = new byte[i + 1]; in.position(startPos); in.get(aMsgBytes, 0, i + 1); out.write(aMsgBytes); return true; } } } } else { in.position(startPos); return false; } } else { int currentPosition = in.position(); // debug System.out.println("Read first byte from buffer is:******************************" + b); System.out.println("*********start pos is:" + startPos + ",*************************current position is:" + currentPosition + ",Remaining is:" + in.remaining()); int length = in.limit(); if (currentPosition == 1) {// 如果第一个字节不是0x7e for (int i = currentPosition; i < length; i++) { byte temp = in.get(); if (temp == SystemConstants.DEVICE_MSG_TAIL) { // int reaminLength = in.remaining(); // if (reaminLength > 0) { // byte[] remainBytes = new byte[reaminLength]; // in.get(remainBytes, 0, reaminLength); // in.clear(); // in.put(remainBytes); // in.position(0); return false; // } } } } break; } } // in.position(startPos); return false; } java 集合 hashcode Jcreator中 如何结束一个正在调试运行的程序 如死循环 碰到的头疼问题 第一次写GUI但没看到相应结果,请高手帮忙 通过java本地方法调用c++(见内),大侠帮忙看看啊,搞了一天都没成功! 在看Thinking in Java中遇到关于return的一个问题 超简单的问题,如何对一维数组初始化 JDK安装错误,请指教,谢谢! 门外汉:java的和平台无关性? 怪!!!我在用vj得applet中获取sqlserver数据时成功,但catch出错 新手求教 java 高手进....关于音乐频率
http://topic.csdn.net/u/20111030/22/a5d2c17e-ebad-4cc6-9f62-9b0b6e3f39fc.html
你光靠0x7e,是不行的,除非你的数据中根本没有0x7e
正解,当传输的数据中包含0x7e字符时就悲剧了..
最好就是以特殊字符做标识,类似TCP/IP的包头包尾一样..
是在TCP/IP之上,但包大小超过1024字节时,就很容易出现粘包、断包现象的
话说,一个逻辑包,可能分散在几个Buffer里面过来,这个你能正常处理??
目前看是正确的,目前没有用Filter,Handler是我自己写的继承了IoHandlerAdapter的实现类。
例如 前4个byte,是code,类似你刚才的0x7e,然后紧接的4个byte是你对应的code的内容的长度,然后后面是内容,这样你只需要每次按长度截图后面的对应的内容就可以了,这样也可以嵌套实现,很容易。你的那种包,是不对的,因为你都不知道包的长度,那样如果数据流很大,你岂不每个byte都要遍历,而且不能保证包的内容没有你定义的0x7e,太耗cpu了。
我们的服务器就是MINA,聊天服务的,还有文件传输等。 并发每台机器每秒消息解包打包20万次。很稳定。
我之前没有做DECODE的,只是从buffer中取出直接用的,发现在buffer中会有断包,粘包,后来在handler中对断包、粘包做了处理,现在想使用Decoder来做这个工作,MINA新手,请不吝赐教,谢谢!!
tlv是可以嵌套的,包大小,可以通过定义包头的TLV来解决。我们的跨平台程序都是这么定义的。
http://grizzly.java.net/nonav/docs/docbkx2.0/html/coreframework-samples.html
User Guide 第二章的样例:解析收到的消息。Grizzly 采用了消息过滤器的机器,在消息过滤器的 handleRead 中处理收到的消息,完整的代码详见:
http://java.net/projects/grizzly/sources/git/content/samples/framework-samples/src/main/java/org/glassfish/grizzly/samples/filterchain/GIOPFilter.javaxSocket:
http://xsocket.sourceforge.net/core/tutorial/V2/TutorialCore.htm
第 18 节的代码以及下面的图示。xSocket 使用监听方法 onData 处理收到的消息桢。Netty:
http://netty.io/docs/3.2.6.Final/api/org/jboss/netty/handler/codec/frame/FrameDecoder.html
FrameDecoder 的 API 文档。Netty 抽象了一个“消息桢解码器”的类来处理这些。我找了一下 Mina 2 的文档,也有类似 Netty FrameDecoder 相关的类:
http://mina.apache.org/chapter-11-codec-filter.html
//解析正确,略去
}else {
int currentPosition = in.position();
int length = in.limit();
if (currentPosition == 1) {//如果第一个字节不是消息标记
for (int i = currentPosition; i < length; i++) {
byte temp = in.get();
if (temp == SystemConstants.DEVICE_MSG_TAIL) {
int reaminLength = in.remaining();
if (reaminLength > 0) {
byte[] remainBytes = new byte[reaminLength];
in.get(remainBytes, 0, reaminLength);
in.clear();
in.put(remainBytes);
in.position(0);
return false;
}
}
}
}
break;
}
}
in.position(startPos);
return false;
}现在这段解析有问题,我想把buffer中不合法的消息丢弃掉,但这样处理出错。请高手指教哈
package cn.oceanmedia.rdc.common.codec.decoder;import static cn.oceanmedia.rdc.common.DataPacketConstants.DEFAULT_HEADER_LENGTH;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.demux.MessageDecoderAdapter;
import org.apache.mina.filter.codec.demux.MessageDecoderResult;/**
* 数据包解码对象解码器抽象超类。
*
* @author Michael J Chane
* @version $Revision: 1.5 $ $Date: 2008/06/15 19:07:27 $
* @param
* <P>
* 解码器返回的数据包内容
*/
public abstract class AbstractPacketDecoder<P> extends MessageDecoderAdapter { /**
* 日志对象
*/
protected final Log log = LogFactory.getLog(getClass()); /**
* 数据包类型
*/
protected byte type; /**
* 创建一个新的<code>AbstractPacketDecoder</code>对象。
*
* @param type
* 可以解码的数据类型
*/
protected AbstractPacketDecoder(byte type) {
this.type = type;
} /**
* 判断当前解码器能否解码最新读入的数据。
*
* @param session
* 连接会话
* @param in
* 读入的字节缓冲
* @return 是否可解码: {@code MessageDecoder.OK} 可以解码;
* {@code MessageDecoder.NOT_OK} 不能解码;
* {@code MessageDecoder.NEED_DATA} 需要更多数据才能判断是否能够解码
* @see org.apache.mina.filter.codec.demux.MessageDecoder#decodable(org.apache.mina.common.IoSession,
* org.apache.mina.common.ByteBuffer)
*/
public MessageDecoderResult decodable(IoSession session, ByteBuffer in) {
// 数据包头长度
int headerLength = retrieveHeaderLength();
if (in.remaining() < headerLength) {
// 读入的字节缓冲中的剩余数据长度小于数据包头长度,需要更多数据
// 注意:即使有标志位便可以确认当前类能否解码,仍然需要确保第一次解码时能够正确读入长度数据
return NEED_DATA;
} // 数据包类型
byte contentType = in.get();
if (contentType == type) {
// 数据包类型与当前解码器可以解码的数据类型匹配,可以解码
return OK;
} else {
// 数据包类型与当前解码器可以解码的数据类型匹配,不能解码
return NOT_OK;
}
} /**
* 解码最新读入的数据。
*
* @param session
* 会话连接
* @param in
* 读入的字节缓冲
* @param out
* 存放解码结果的输出
* @return 处理结果: {@code MessageDecoder.OK} 可以解码; {@code MessageDecoder.NOT_OK}
* 不能解码; {@code MessageDecoder.NEED_DATA} 需要更多数据才能判断是否能够解码
* @throws Exception
* 如果在处理中发生异常的话
* @see org.apache.mina.filter.codec.demux.MessageDecoder#decode(org.apache.mina.common.IoSession,
* org.apache.mina.common.ByteBuffer,
* org.apache.mina.filter.codec.ProtocolDecoderOutput)
*/
public MessageDecoderResult decode(IoSession session,
ByteBuffer in,
ProtocolDecoderOutput out)
throws Exception {
// 记录原有位置
in.();
// 跳过数据包标志位
in.get();
// 数据包长度
int contentLength = getContentLength(session, in); if (in.remaining() < contentLength) {
// 当前物理数据包长度小于逻辑数据包长度,重置开始位置
in.position(in.Value());
// 需要更多数据
return NEED_DATA;
} // 解码数据体
P packet = decodeBody(session, in, contentLength);
if (packet == null) {
// 错误
return NOT_OK;
} // 保存数据包对象
out.write(packet);
// 解码成功
return OK;
} /**
* 返回数据包头的长度。
* <p>
* 针对个别包4位长度位或者没有长度位的情况,子类应该覆盖该方法,返回对应的数据包头长度。
* </p>
*
* @return 数据包头的长度
*/
protected int retrieveHeaderLength() {
return DEFAULT_HEADER_LENGTH;
} /**
* 获取数据体(不含标志位及长度位本身)的字节长度。
* <p>
* 本方法会在获取数据包类型后被调用,默认从当前字节缓冲中读取2字节长度信息。
* </p>
* <p>
* 子类可以覆盖该方法,直接返回长度信息而不从字节缓冲中读取。
* </p>
*
* @param session
* 会话连接
* @param in
* 字节缓冲
* @return 数据体长度,它可以是:
* <ul>
* <li>从数据包中特定位置读取的长度信息</li>
* <li><code>NEED_MORE_LENGTH_INFO</code>代表需要更多字节才能判断长度</li>
* </ul>
*/
protected int getContentLength(IoSession session, ByteBuffer in) {
return in.getShort();
} /**
* 解码数据体内容,并返回对应的数据对象。
*
* @param session
* 会话连接
* @param in
* 读入的字节缓冲
* @param contentLength
* 数据体长度
* @return 数据体内容解码后得到的对应的数据对象,或者 <code>null</code> 代表解码时发生错误。
* @throws Exception
* 如果在处理中发生异常的话
*/
protected abstract P decodeBody(IoSession session,
ByteBuffer in,
int contentLength) throws Exception;
}
import java.nio.charset.CharsetDecoder;import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;/**
* 客户端本信息解码器。
*
* @author Michael J Chane
* @version $Revision: 1.4 $ $Date: 2008/06/12 04:18:06 $
*/
public class ClientVersionDecoder extends AbstractPacketDecoder<ClientVersion> { /**
* 客户端版本信息数据包标志位
*/
public static final byte HEADER_VER = 0x00; /**
* 字符集:ASCII
*/
protected static final CharsetDecoder CHARSET_DECODER = Charset
.forName("ASCII").newDecoder(); /**
* 创建一个新的<code>ClientVersionDecoder</code>对象,数据包头{@link #HEADER_VER HEADER_VER}
*/
public ClientVersionDecoder() {
super(HEADER_VER);
} /**
* 解码并返回读入的客户端版本信息。
*
* @param session
* 会话连接
* @param in
* 读入的字节缓冲
* @param contentLength
* 数据包内容长度。
* @return 客户端版本信息
* @throws Exception
* 如果处理中发生异常的话
*/
@Override
protected ClientVersion decodeBody(IoSession session,
ByteBuffer in,
int contentLength) throws Exception {
// 读入字节
String versionInfo = in.getString(contentLength, CHARSET_DECODER).trim();
// 返回客户端版本信息
return new ClientVersion(versionInfo);
} /**
* 从读入的字节缓冲中读取一个字节的版本信息长度。
*
* @param session
* 会话连接
* @param in
* 字节缓冲
* @return 版本信息长度
* @throws Exception
* 如果处理中发生异常的话
*/
@Override
protected int getContentLength(IoSession session, ByteBuffer in) {
return in.get();
}}
import java.awt.image.BufferedImage;import javax.imageio.ImageIO;import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
/**
* 分区数据解码器。
*
* @author Michael J Chane
* @version $Revision: 1.3 $ $Date: 2008/06/15 19:07:27 $
*/
public class ScreenPartitionDecoder extends
AbstractPacketDecoder<ScreenPartition> { /**
* 是否是服务器侧
*/
private boolean serverSide; /**
* 创建一个新的<code>ScreenPartitionDecoder</code>对象。
*/
public ScreenPartitionDecoder() {
super(HEADER_SCRN);
} /**
* 返回 <code>serverSide</code> 的当前值。
*
* @return <code>serverSide</code> 的当前值
*/
public boolean isServerSide() {
return serverSide;
} /**
* 设置 <code>serverSide</code> 的当前值。
*
* @param serverSide
* <code>serverSide</code> 的当前值
*/
public void setServerSide(boolean serverSide) {
this.serverSide = serverSide;
} /**
* 分区数据解码方法。
*
* @param session
* 会话连接
* @param in
* 读入的字节缓冲
* @param contentLength
* 内容长度
* @return 对应的已经装载了新数据的分区对象
* @throws Exception
* 如果处理中发生异常的话
*/
@Override
protected ScreenPartition decodeBody(IoSession session,
ByteBuffer in,
int contentLength) throws Exception {
if (serverSide) {
return serverDecode(session, in, contentLength);
} else {
return viewerDecode(session, in, contentLength);
}
} /**
* 服务端解码方法。
*
* @param session
* 绘画连接
* @param in
* 读入的字节
* @param contentLength
* 内容长度
* @return 读入的分区
* @throws Exception
* 如果处理中发生异常的话
*/
protected ScreenPartition serverDecode(IoSession session,
ByteBuffer in,
int contentLength) throws Exception {
// 远程桌面对象
RemoteDesktop desktop = (RemoteDesktop) session.getAttachment();
// 编码后的字节
ByteBuffer encodedBytes = ByteBuffer.allocate(contentLength
+ DEFAULT_HEADER_LENGTH);
// 设置标志位
encodedBytes.put(DataPacketConstants.HEADER_SCRN);
// 长度
encodedBytes.putShort((short) contentLength);
// 读取分区索引
int partitionIndex = in.getShort();
encodedBytes.putShort((short) partitionIndex);
// 数据内容
for (int i = 0; i < contentLength - 2; i++) {
encodedBytes.put(in.get());
}
// 编码完成
encodedBytes.flip();
// 对应的分区
ScreenPartition screenPartition = desktop.findPartition(partitionIndex);
// 设置编码后的字节
screenPartition.saveEncodedBytes(encodedBytes);
// 返回对应的分区对象
return screenPartition;
} /**
* 客户端解码方法。
*
* @param session
* 会话连接
* @param in
* 读入的字节
* @param contentLength
* 内容长度
* @return 读入的分区
* @throws Exception
* 如果处理中发生异常的话
*/
protected ScreenPartition viewerDecode(IoSession session,
ByteBuffer in,
int contentLength) throws Exception {
// 远程桌面对象
RemoteDesktop desktop = (RemoteDesktop) session.getAttachment();
// 当前数据包之后新的数据包(如果存在的话)的位置
int newPos = in.position() + contentLength;
// 读取分区索引
int partitionIndex = in.getShort();
// 读取分区图像
BufferedImage image = ImageIO.read(in.asInputStream());
// 对应的分区
ScreenPartition screenPartition = desktop.findPartition(partitionIndex);
// 设置新图像
screenPartition.setImage(image);
// 设置字节缓冲新位置,这个必须设置一下。因为上面的ImageIO.read可能读取的字节数与发送的字节数不匹配,必须设置,以免多读或者少读,导致后面的Decoder出错
in.position(newPos);
// 返回对应的分区对象
return screenPartition;
}}我这些代码是mina 1的,大体上mina 2原理一致。我的功能是类似现在TeamViewer的功能,远程桌面,被控端——我称之为Puppet(傀儡,提线木偶)发送数据到服务器(Server,称之为舞台),Server根据各个观众(Viewer)的数据请求情况,发送Puppet的桌面信息。反过来,将Viewer的控制信息,通过Server传给Puppet。其中,你刚才提到的“无用数据”,其实并非无用,参考最后viewerDecode方法中,最后的in.position(newPos);
int startPos = 0;
in.position(startPos);
while (in.hasRemaining()) {
byte b = in.get();// 取得第一个字节
int currentlen=in.limit();
if (b == SystemConstants.DEVICE_MSG_HEAD) {// 如果为消息头
in.position(startPos + 3);
byte thirdByte = in.get();// 读取第三个字节标记
int tmpDivided = (thirdByte >>> 5) & 0x1;
in.position(startPos + 4);
int msgContentLength = (thirdByte & 0x3) * 256 + (in.get() & 0xff);
int totalMsgLength = msgContentLength + 15;
if (tmpDivided == SystemConstants.IS_DIVIDE_MSG) {
totalMsgLength += 4;
}
if (in.limit() >= totalMsgLength) {
in.position(totalMsgLength - 1);// 指向消息最大长度位置
byte tail = in.get();
if (tail == SystemConstants.DEVICE_MSG_TAIL) {
byte[] aMsgBytes = new byte[totalMsgLength];
in.position(startPos);
in.get(aMsgBytes, 0, totalMsgLength);
out.write(aMsgBytes);
return true;
} else {
int currentPosition = in.position();
in.position(currentPosition + 1);
for (int i = currentPosition + 1; i < in.limit(); i++) {
byte temp = in.get();
if (temp == SystemConstants.DEVICE_MSG_TAIL) {
byte[] aMsgBytes = new byte[i + 1];
in.position(startPos);
in.get(aMsgBytes, 0, i + 1);
out.write(aMsgBytes);
return true;
}
}
}
} else {
in.position(startPos);
return false;
}
} else {
int currentPosition = in.position();
// debug
System.out.println("Read first byte from buffer is:******************************" + b);
System.out.println("*********start pos is:" + startPos + ",*************************current position is:" + currentPosition + ",Remaining is:"
+ in.remaining());
int length = in.limit();
if (currentPosition == 1) {// 如果第一个字节不是0x7e
for (int i = currentPosition; i < length; i++) {
byte temp = in.get();
if (temp == SystemConstants.DEVICE_MSG_TAIL) {
// int reaminLength = in.remaining();
// if (reaminLength > 0) {
// byte[] remainBytes = new byte[reaminLength];
// in.get(remainBytes, 0, reaminLength);
// in.clear();
// in.put(remainBytes);
// in.position(0);
return false;
// }
}
}
}
break;
}
}
// in.position(startPos);
return false;
}
我继承了CumulativeProtocolDecoder实现。现在发现第一包数据会莫名奇妙的被丢掉
@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
int startPos = 0;
in.position(startPos);
int currentlen=in.limit();
System.out.println("Current length is*******************:"+currentlen+",current position is:"+in.position());
while (in.hasRemaining()) {
byte b = in.get();// 取得第一个字节
if (b == SystemConstants.DEVICE_MSG_HEAD) {// 如果为消息头
in.position(startPos + 3);
byte thirdByte = in.get();// 读取第三个字节标记
int tmpDivided = (thirdByte >>> 5) & 0x1;
in.position(startPos + 4);
int msgContentLength = (thirdByte & 0x3) * 256 + (in.get() & 0xff);
int totalMsgLength = msgContentLength + 15;
if (tmpDivided == SystemConstants.IS_DIVIDE_MSG) {
totalMsgLength += 4;
}
if (in.limit() >= totalMsgLength) {
in.position(totalMsgLength - 1);// 指向消息最大长度位置
byte tail = in.get();
if (tail == SystemConstants.DEVICE_MSG_TAIL) {
byte[] aMsgBytes = new byte[totalMsgLength];
in.position(startPos);
in.get(aMsgBytes, 0, totalMsgLength);
out.write(aMsgBytes);
return true;
} else {
int currentPosition = in.position();
// in.position(currentPosition + 1);
for (int i = currentPosition; i < in.limit(); i++) {
byte temp = in.get();
if (temp == SystemConstants.DEVICE_MSG_TAIL) {
byte[] aMsgBytes = new byte[i + 1];
in.position(startPos);
in.get(aMsgBytes, 0, i + 1);
out.write(aMsgBytes);
return true;
}
}
}
} else {
in.position(startPos);
return false;
}
} else {
int currentPosition = in.position();
// debug
System.out.println("Read first byte from buffer is:******************************" + b);
System.out.println("*********start pos is:" + startPos + ",*************************current position is:" + currentPosition + ",Remaining is:"
+ in.remaining());
int length = in.limit();
if (currentPosition == 1) {// 如果第一个字节不是0x7e
for (int i = currentPosition; i < length; i++) {
byte temp = in.get();
if (temp == SystemConstants.DEVICE_MSG_TAIL) {
// int reaminLength = in.remaining();
// if (reaminLength > 0) {
// byte[] remainBytes = new byte[reaminLength];
// in.get(remainBytes, 0, reaminLength);
// in.clear();
// in.put(remainBytes);
// in.position(0);
return false;
// }
}
}
}
break;
}
}
// in.position(startPos);
return false;
}