/**
*
*/
package MinaServer.tool;import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;import common.dataList.DataType;/**
* @ClassName: MyProtocalDecoder.java
* @Description: 读取仪表消息帧的解析方法,解决粘包、断包问题
* @author wp
* @version V1.0
* @Date 2013-4-12 上午09:18:03
*/public class AnalysisProtocalDecoder implements ProtocolDecoder {
private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
private final Charset charset;
private int maxPackLength = 100;
public AnalysisProtocalDecoder() {
this(Charset.defaultCharset());
} public AnalysisProtocalDecoder(Charset charset) {
this.charset = charset;
} public int getMaxLineLength() {
return maxPackLength;
} public void setMaxLineLength(int maxLineLength) {
if (maxLineLength <= 0) {
throw new IllegalArgumentException("maxLineLength: "
+ maxLineLength);
}
this.maxPackLength = maxLineLength;
} private Context getContext(IoSession session) {
Context ctx;
ctx = (Context) session.getAttribute(CONTEXT);
if (ctx == null) {
ctx = new Context();
session.setAttribute(CONTEXT, ctx);
}
return ctx;
} public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
// synchronized (this) {
// 先获取上次的处理上下文,其中可能有未处理完的数据
Context ctx = getContext(session);
// 先把当前buffer中的数据追加到Context的buffer当中
ctx.append(in);
// 把position指向0位置,把limit指向原来的position位置
IoBuffer buf = ctx.getBuffer();
buf.flip();
// 然后按数据包的协议进行读取 buf.();
// 判断消息头是否合法
String headStr = buf.getHexDump().toString();
// 检查消息头是否合法,不合法清空buffer
if (headStr != null && headStr.length() > 8) {
if (headStr.trim().startsWith("68 03")
|| headStr.trim().startsWith("68 04")
|| headStr.trim().startsWith("68 05")
|| headStr.trim().startsWith("68 F1")
|| headStr.trim().startsWith("68 F2")
|| headStr.trim().startsWith("68 F3")
|| headStr.trim().startsWith("68 F4")
|| headStr.trim().startsWith("68 84")) {
// 得到当前消息帧长度
String len = buf.getHexDump().toString().substring(6, 8);
int length = Integer.valueOf(len, 16) + 4;
// 循环读取,直到当前帧的长度小于消息帧的固定长度
while (buf.remaining() >= length) {
int oldLimit2 = buf.limit();
int oldLength = buf.remaining();
// 若当前帧的长度不小于消息帧的固定长度,则读取长度为length的数据
buf.limit(buf.position() + length);
String content = buf.getHexDump().toString();
AnalysisProtocalPack pack = new AnalysisProtocalPack(
session, content);// 传递session信息到接收端,以获取门机信息
out.write(pack); if (oldLength - length != 0) {
// 如果当前帧的原始长度不等于消息帧的固定长度,表示有粘包数据,那么取出多余的数据和其帧的长度进行下一次循环
buf.limit(oldLimit2);
buf.position(buf.position() + length); // 得到下一帧的长度
len = buf.getHexDump().toString().substring(6, 8);
length = Integer.valueOf(len, 16) + 4;
} else {
// 否则清空
buf.limit(0);
buf.position(0);
break;
}
}
if (buf.hasRemaining()) {// 如果有剩余的数据,则放入session中
// 将数据移到buffer的最前面
IoBuffer temp = IoBuffer.allocate(maxPackLength)
.setAutoExpand(true);
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
buf.shrink();
} else {// 如果数据已经处理完毕,进行清空
buf.limit(0);
buf.position(0);
}
} else {// 数据格式不合法则清空buf
DataType.logger.error("数据格式错误:" + buf.getHexDump().toString());
buf.limit(0);
buf.position(0);
}
} else {// 数据长度不足8位的放入session中
// 将数据移到buffer的最前面
IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(
true).shrink();
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
buf.shrink();
}
// }
} public void finishDecode(IoSession session, ProtocolDecoderOutput out)
throws Exception {
} public void dispose(IoSession session) throws Exception {
Context ctx = (Context) session.getAttribute(CONTEXT);
if (ctx != null) {
session.removeAttribute(CONTEXT);
}
} // 记录上下文,因为数据触发没有规模,很可能只收到数据包的一半
// 所以,需要上下文拼起来才能完整的处理
private class Context {
private final CharsetDecoder decoder;
private IoBuffer buf;
private int matchCount = 0;
private int overflowPosition = 0; private Context() {
decoder = charset.newDecoder();
buf = IoBuffer.allocate(80).setAutoExpand(true);
} @SuppressWarnings("unused")
public CharsetDecoder getDecoder() {
return decoder;
} public IoBuffer getBuffer() {
return buf;
} @SuppressWarnings("unused")
public int getOverflowPosition() {
return overflowPosition;
} @SuppressWarnings("unused")
public int getMatchCount() {
return matchCount;
} @SuppressWarnings("unused")
public void setMatchCount(int matchCount) {
this.matchCount = matchCount;
} @SuppressWarnings("unused")
public void reset() {
overflowPosition = 0;
matchCount = 0;
decoder.reset();
} public void append(IoBuffer in) {
getBuffer().put(in);
} }
}高人帮忙看看,有什么可以优化的么?
*
*/
package MinaServer.tool;import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;import common.dataList.DataType;/**
* @ClassName: MyProtocalDecoder.java
* @Description: 读取仪表消息帧的解析方法,解决粘包、断包问题
* @author wp
* @version V1.0
* @Date 2013-4-12 上午09:18:03
*/public class AnalysisProtocalDecoder implements ProtocolDecoder {
private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
private final Charset charset;
private int maxPackLength = 100;
public AnalysisProtocalDecoder() {
this(Charset.defaultCharset());
} public AnalysisProtocalDecoder(Charset charset) {
this.charset = charset;
} public int getMaxLineLength() {
return maxPackLength;
} public void setMaxLineLength(int maxLineLength) {
if (maxLineLength <= 0) {
throw new IllegalArgumentException("maxLineLength: "
+ maxLineLength);
}
this.maxPackLength = maxLineLength;
} private Context getContext(IoSession session) {
Context ctx;
ctx = (Context) session.getAttribute(CONTEXT);
if (ctx == null) {
ctx = new Context();
session.setAttribute(CONTEXT, ctx);
}
return ctx;
} public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
// synchronized (this) {
// 先获取上次的处理上下文,其中可能有未处理完的数据
Context ctx = getContext(session);
// 先把当前buffer中的数据追加到Context的buffer当中
ctx.append(in);
// 把position指向0位置,把limit指向原来的position位置
IoBuffer buf = ctx.getBuffer();
buf.flip();
// 然后按数据包的协议进行读取 buf.();
// 判断消息头是否合法
String headStr = buf.getHexDump().toString();
// 检查消息头是否合法,不合法清空buffer
if (headStr != null && headStr.length() > 8) {
if (headStr.trim().startsWith("68 03")
|| headStr.trim().startsWith("68 04")
|| headStr.trim().startsWith("68 05")
|| headStr.trim().startsWith("68 F1")
|| headStr.trim().startsWith("68 F2")
|| headStr.trim().startsWith("68 F3")
|| headStr.trim().startsWith("68 F4")
|| headStr.trim().startsWith("68 84")) {
// 得到当前消息帧长度
String len = buf.getHexDump().toString().substring(6, 8);
int length = Integer.valueOf(len, 16) + 4;
// 循环读取,直到当前帧的长度小于消息帧的固定长度
while (buf.remaining() >= length) {
int oldLimit2 = buf.limit();
int oldLength = buf.remaining();
// 若当前帧的长度不小于消息帧的固定长度,则读取长度为length的数据
buf.limit(buf.position() + length);
String content = buf.getHexDump().toString();
AnalysisProtocalPack pack = new AnalysisProtocalPack(
session, content);// 传递session信息到接收端,以获取门机信息
out.write(pack); if (oldLength - length != 0) {
// 如果当前帧的原始长度不等于消息帧的固定长度,表示有粘包数据,那么取出多余的数据和其帧的长度进行下一次循环
buf.limit(oldLimit2);
buf.position(buf.position() + length); // 得到下一帧的长度
len = buf.getHexDump().toString().substring(6, 8);
length = Integer.valueOf(len, 16) + 4;
} else {
// 否则清空
buf.limit(0);
buf.position(0);
break;
}
}
if (buf.hasRemaining()) {// 如果有剩余的数据,则放入session中
// 将数据移到buffer的最前面
IoBuffer temp = IoBuffer.allocate(maxPackLength)
.setAutoExpand(true);
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
buf.shrink();
} else {// 如果数据已经处理完毕,进行清空
buf.limit(0);
buf.position(0);
}
} else {// 数据格式不合法则清空buf
DataType.logger.error("数据格式错误:" + buf.getHexDump().toString());
buf.limit(0);
buf.position(0);
}
} else {// 数据长度不足8位的放入session中
// 将数据移到buffer的最前面
IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(
true).shrink();
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
buf.shrink();
}
// }
} public void finishDecode(IoSession session, ProtocolDecoderOutput out)
throws Exception {
} public void dispose(IoSession session) throws Exception {
Context ctx = (Context) session.getAttribute(CONTEXT);
if (ctx != null) {
session.removeAttribute(CONTEXT);
}
} // 记录上下文,因为数据触发没有规模,很可能只收到数据包的一半
// 所以,需要上下文拼起来才能完整的处理
private class Context {
private final CharsetDecoder decoder;
private IoBuffer buf;
private int matchCount = 0;
private int overflowPosition = 0; private Context() {
decoder = charset.newDecoder();
buf = IoBuffer.allocate(80).setAutoExpand(true);
} @SuppressWarnings("unused")
public CharsetDecoder getDecoder() {
return decoder;
} public IoBuffer getBuffer() {
return buf;
} @SuppressWarnings("unused")
public int getOverflowPosition() {
return overflowPosition;
} @SuppressWarnings("unused")
public int getMatchCount() {
return matchCount;
} @SuppressWarnings("unused")
public void setMatchCount(int matchCount) {
this.matchCount = matchCount;
} @SuppressWarnings("unused")
public void reset() {
overflowPosition = 0;
matchCount = 0;
decoder.reset();
} public void append(IoBuffer in) {
getBuffer().put(in);
} }
}高人帮忙看看,有什么可以优化的么?
不过我记得编码解码器这里面本来就有的啊
@ygycomon ,jvm内存一共1G,一小时10M,跑不到10天它就挂了
MessageDecoderResult result; if (status.equals("ready") && in.get(0) != '{') {
result = NOT_OK;
} else {
if (in.get(last) == (byte) Constants.END_SIGN) {
status = "ready";
result = OK;
} else {
status = "pending";
result = NEED_DATA;
}
} return result;
} public MessageDecoderResult decode(IoSession ioSession,
IoBuffer in, ProtocolDecoderOutput out) throws Exception { String str = in.getString(decoder).trim();
String[] jsons = str.split("\n+"); for (String json : jsons) {
log.debug("Received original json: " + json); JsonRequestMessage request;
try {
request = JsonUtils.parseRequest(json);
} catch (Exception e) {
log.error("Error parsing request message", e); request = new JsonRequestMessage();
request.setValid(false);
} request.setOriginalString(json); ClientInfo info = request.getClientInfo();
if (info == null) {
info = new ClientInfo();
request.setClientInfo(info);
} info.setIpAddress(((InetSocketAddress) ioSession.getRemoteAddress()).getAddress().getHostAddress()); out.write(request);
} return OK;
} @Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
this.status = "ready";
}
}