/**
 * 
 */
package MinaServer.tool;import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;import common.dataList.DataType;/**
 * @ClassName: MyProtocalDecoder.java
 * @Description: 读取仪表消息帧的解析方法,解决粘包、断包问题
 * @author wp
 * @version V1.0
 * @Date 2013-4-12 上午09:18:03
 */public class AnalysisProtocalDecoder implements ProtocolDecoder {
private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
private final Charset charset;
private int maxPackLength = 100;

public AnalysisProtocalDecoder() {
this(Charset.defaultCharset());
} public AnalysisProtocalDecoder(Charset charset) {
this.charset = charset;
} public int getMaxLineLength() {
return maxPackLength;
} public void setMaxLineLength(int maxLineLength) {
if (maxLineLength <= 0) {
throw new IllegalArgumentException("maxLineLength: "
+ maxLineLength);
}
this.maxPackLength = maxLineLength;
} private Context getContext(IoSession session) {
Context ctx;
ctx = (Context) session.getAttribute(CONTEXT);
if (ctx == null) {
ctx = new Context();
session.setAttribute(CONTEXT, ctx);
}
return ctx;
} public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
// synchronized (this) {
// 先获取上次的处理上下文,其中可能有未处理完的数据
Context ctx = getContext(session);
// 先把当前buffer中的数据追加到Context的buffer当中
ctx.append(in);
// 把position指向0位置,把limit指向原来的position位置
IoBuffer buf = ctx.getBuffer();
buf.flip();
// 然后按数据包的协议进行读取 buf.();
// 判断消息头是否合法
String headStr = buf.getHexDump().toString();
// 检查消息头是否合法,不合法清空buffer
if (headStr != null && headStr.length() > 8) {
if (headStr.trim().startsWith("68 03")
|| headStr.trim().startsWith("68 04")
|| headStr.trim().startsWith("68 05")
|| headStr.trim().startsWith("68 F1")
|| headStr.trim().startsWith("68 F2")
|| headStr.trim().startsWith("68 F3")
|| headStr.trim().startsWith("68 F4")
|| headStr.trim().startsWith("68 84")) {
// 得到当前消息帧长度
String len = buf.getHexDump().toString().substring(6, 8);
int length = Integer.valueOf(len, 16) + 4;
// 循环读取,直到当前帧的长度小于消息帧的固定长度
while (buf.remaining() >= length) {
int oldLimit2 = buf.limit();
int oldLength = buf.remaining();
// 若当前帧的长度不小于消息帧的固定长度,则读取长度为length的数据
buf.limit(buf.position() + length);
String content = buf.getHexDump().toString();
AnalysisProtocalPack pack = new AnalysisProtocalPack(
session, content);// 传递session信息到接收端,以获取门机信息
out.write(pack); if (oldLength - length != 0) {
// 如果当前帧的原始长度不等于消息帧的固定长度,表示有粘包数据,那么取出多余的数据和其帧的长度进行下一次循环
buf.limit(oldLimit2);
buf.position(buf.position() + length); // 得到下一帧的长度
len = buf.getHexDump().toString().substring(6, 8);
length = Integer.valueOf(len, 16) + 4;
} else {
// 否则清空
buf.limit(0);
buf.position(0);
break;
}
}
if (buf.hasRemaining()) {// 如果有剩余的数据,则放入session中
// 将数据移到buffer的最前面
IoBuffer temp = IoBuffer.allocate(maxPackLength)
.setAutoExpand(true);
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
buf.shrink();
} else {// 如果数据已经处理完毕,进行清空
buf.limit(0);
buf.position(0);
}
} else {// 数据格式不合法则清空buf
DataType.logger.error("数据格式错误:" + buf.getHexDump().toString());
buf.limit(0);
buf.position(0);
}
} else {// 数据长度不足8位的放入session中
// 将数据移到buffer的最前面
IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(
true).shrink();
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
buf.shrink();
}
// }
} public void finishDecode(IoSession session, ProtocolDecoderOutput out)
throws Exception {
} public void dispose(IoSession session) throws Exception {
Context ctx = (Context) session.getAttribute(CONTEXT);
if (ctx != null) {
session.removeAttribute(CONTEXT);
}
} // 记录上下文,因为数据触发没有规模,很可能只收到数据包的一半
// 所以,需要上下文拼起来才能完整的处理
private class Context {
private final CharsetDecoder decoder;
private IoBuffer buf;
private int matchCount = 0;
private int overflowPosition = 0; private Context() {
decoder = charset.newDecoder();
buf = IoBuffer.allocate(80).setAutoExpand(true);
} @SuppressWarnings("unused")
public CharsetDecoder getDecoder() {
return decoder;
} public IoBuffer getBuffer() {
return buf;
} @SuppressWarnings("unused")
public int getOverflowPosition() {
return overflowPosition;
} @SuppressWarnings("unused")
public int getMatchCount() {
return matchCount;
} @SuppressWarnings("unused")
public void setMatchCount(int matchCount) {
this.matchCount = matchCount;
} @SuppressWarnings("unused")
public void reset() {
overflowPosition = 0;
matchCount = 0;
decoder.reset();
} public void append(IoBuffer in) {
getBuffer().put(in);
} }
}高人帮忙看看,有什么可以优化的么?

解决方案 »

  1.   

    mima的东西好复杂    我当时就没搞来 
    不过我记得编码解码器这里面本来就有的啊  
      

  2.   

    我不是这么写的,我是实现 org.apache.mina.filter.codec.demux.MessageDecoderAdapter,不需要这么复杂
      

  3.   

    每小时10M我觉得很小啊,你JVM内存多大?还有String len = buf.getHexDump().toString().substring(6, 8);这种东西要做长度保护,不然别人恶意给你发len值很大的报文,直接就把你内存撑爆了
      

  4.   

    @YidingHe ,能把你代码发一份给我吗,我邮箱[email protected]
    @ygycomon ,jvm内存一共1G,一小时10M,跑不到10天它就挂了
      

  5.   

        ,  解码器你继承mina的就好了。做好每个session通道进来的iobuffer的数据处理。我做的项目接几百个设备都没出现你这个问题iobuffer这个东西做好指针的位置,粘包断包的操作。不大可能会出现内存堆积的。不行你就上个jprofile工具来监控一下内存的使用情况。多开点客户端发数据,立马解决问题。
      

  6.   

    jvm不会回收内存?挂掉的信息你贴出来看看嘛
      

  7.   

    jvisalvm 连上去看看内存到底为什么没有释放。是不是你别的地方有代码存下了什么引用 ?
      

  8.   

    public class JsonRequestDecoder extends MessageDecoderAdapter {    private static final Logger log = LoggerFactory.getLogger(JsonRequestDecoder.class);    private String status = "ready";    private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();    public MessageDecoderResult decodable(IoSession ioSession, IoBuffer in) {        int last = in.remaining() - 1;
            MessageDecoderResult result;        if (status.equals("ready") && in.get(0) != '{') {
                result = NOT_OK;
            } else {
                if (in.get(last) == (byte) Constants.END_SIGN) {
                    status = "ready";
                    result = OK;
                } else {
                    status = "pending";
                    result = NEED_DATA;
                }
            }        return result;
        }    public MessageDecoderResult decode(IoSession ioSession,
                                           IoBuffer in, ProtocolDecoderOutput out) throws Exception {        String str = in.getString(decoder).trim();
            String[] jsons = str.split("\n+");        for (String json : jsons) {
                log.debug("Received original json: " + json);            JsonRequestMessage request;
                try {
                    request = JsonUtils.parseRequest(json);
                } catch (Exception e) {
                    log.error("Error parsing request message", e);                request = new JsonRequestMessage();
                    request.setValid(false);
                }            request.setOriginalString(json);            ClientInfo info = request.getClientInfo();
                if (info == null) {
                    info = new ClientInfo();
                    request.setClientInfo(info);
                }            info.setIpAddress(((InetSocketAddress) ioSession.getRemoteAddress()).getAddress().getHostAddress());            out.write(request);
            }        return OK;
        }    @Override
        public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
            this.status = "ready";
        }
    }
      

  9.   

    总之就是实现 decodable(), decode() 和 finishDecode() 三个方法即可。如果我上面的东东看不懂,官方文档也有例子可以参考。