可以参考下列的例子 http://blog.csdn.net/hzhxxx/archive/2009/12/09/4970196.aspx下列描述了JNSP(java network service platform)服务的部署和功能,如有不明白之处,自己查看源代码,我无力和没有太多时间回答太多的问题. 源代码一起提供,可以随意修改,发布,并商业化,但请保留作者的信息.为了技术的进步,请注意共享成就.目前已经发布于:http://download.csdn.net/source/1885332;http://hzhxxx.download.csdn.net/ 如果查看后有技术问题探讨和建议的,欢迎致电致信联系.系统架构设计和实现有如下特点: A. 采用 Accept-Connect 通讯模型,能统一管理系统中使用的所有 Socket 资源; B. 系统支持通信和协议分离的实现原则,支持自定义协议解析模块;通信负责数据收发,协议负责数据鉴别,两者配合实现通信和协议的和谐工作; C. 灵活支持业务处理功能重定义,分级支持慢业务和快业务的不同业务处理线程; D. 丰富灵活的配置参数,能满足高扩展性; E. 配合业务处理和通信模型,能异步的管理所有的交互步骤; F. 分布式服务设计和部署,实现动态(热拔插)的增加和较少业务服务器,减少乃至拒绝单点服务;
/** * 接收内核 socket 缓冲区的数据到连接自建立的缓冲区(支持自动增长) * * @param in * 实际接收到的数据包队列 * @return PACKET_ERROR 表示对方关闭连接, PACKET_FULL 表示数据发送完毕, PACKET_LESS * 表示数据继续接收. * @see #parse(Vector) */ public Define.DataRecvStatus recv(Vector<byte[]> in) throws Exception { Define.DataRecvStatus result = null; result = Define.DataRecvStatus.PACKET_FULL; int received = 0; m_recvbuff.limit(this.m_recvcapacity); m_onerecvbuff.clear(); //尽量把内核接收缓冲区的数据全部读取出来 while (received < m_onerecvbuff.limit() && m_onerecvbuff.hasRemaining()) { try { int len = 0; len = m_socketchannel.read(m_onerecvbuff); // 通道 数据被读取完全,是否表示这个连接已经没有数据了 if (len < 0) { // 一般是-1,可以认为连接被对方关闭 result = Define.DataRecvStatus.PACKET_ERROR; break; } else if (len == 0) { // 如果每次都是 0,会是什么情况 result = Define.DataRecvStatus.PACKET_LESS; break; } else { received += len; result = Define.DataRecvStatus.PACKET_LESS; } } catch (NotYetConnectedException e) { result = Define.DataRecvStatus.PACKET_ERROR; logger.error("oops, got an exception: ", e); break; } catch (ClosedByInterruptException e) { result = Define.DataRecvStatus.PACKET_ERROR; logger.error("oops, got an exception: ", e); break; } catch (AsynchronousCloseException e) { result = Define.DataRecvStatus.PACKET_ERROR; logger.error("oops, got an exception: ", e); break; } catch (ClosedChannelException e) { result = Define.DataRecvStatus.PACKET_ERROR; logger.error("oops, got an exception: ", e); break; } catch (IOException e) { result = Define.DataRecvStatus.PACKET_ERROR; logger.error("oops, got an exception: ", e); break; } } m_onerecvbuff.flip(); if (m_onerecvbuff.limit() > m_recvbuff.remaining()) { // 继续扩大接收缓冲区 m_recvbuff.flip(); byte[] bytes = new byte[m_recvbuff.limit()]; m_recvbuff.get(bytes, 0, m_recvbuff.limit()); m_recvcapacity *= 2; m_recvbuff = ByteBuffer.allocate(m_recvcapacity); m_recvbuff.put(bytes); } // 合并本次实际接收到的数据 m_recvbuff.put(m_onerecvbuff); // 如果数据没有接收完整,对方关闭连接,最后一段不完整数据可能被丢弃 // 如果解析协议发生错误,等同于连接被关闭,不过是服务器主动关闭 if (result == Define.DataRecvStatus.PACKET_ERROR) { if (received > 0) { parse(in); } } else { if (received > 0) { result = parse(in); } } return result; }
每个超长的分段发送,在拆分的时候分配一个唯一ID和一个seq序号以及被拆分成几条的totalNum,数据包头加上这3个值。
接收队列是一个HashMap,key为ID,value为一个类,元素:totalNum/待接收num(第一次接收时初始化为totalNum-1)/一个HashMap,类中HashMap的key为seq,value为那段文本。接收处理方法里面,每接收完一段,找对应的队列里面的ID,找到了,则检测对应的ID下的待接收num是否为1(因为当前接收的这段是最后一段,就是说待接收的刚好接收到了),如果是,表示该ID对应的都已经传完,开始按seq组装Hashmap的value,组装完后remove掉,如果不是,则待接收Num+1,对应文本put到类里的HashMap。如果找不到,则接收队列put一个新的ID及类。不知道有帮助没?
http://blog.csdn.net/hzhxxx/archive/2009/12/09/4970196.aspx下列描述了JNSP(java network service platform)服务的部署和功能,如有不明白之处,自己查看源代码,我无力和没有太多时间回答太多的问题.
源代码一起提供,可以随意修改,发布,并商业化,但请保留作者的信息.为了技术的进步,请注意共享成就.目前已经发布于:http://download.csdn.net/source/1885332;http://hzhxxx.download.csdn.net/
如果查看后有技术问题探讨和建议的,欢迎致电致信联系.系统架构设计和实现有如下特点:
A. 采用 Accept-Connect 通讯模型,能统一管理系统中使用的所有 Socket 资源;
B. 系统支持通信和协议分离的实现原则,支持自定义协议解析模块;通信负责数据收发,协议负责数据鉴别,两者配合实现通信和协议的和谐工作;
C. 灵活支持业务处理功能重定义,分级支持慢业务和快业务的不同业务处理线程;
D. 丰富灵活的配置参数,能满足高扩展性;
E. 配合业务处理和通信模型,能异步的管理所有的交互步骤;
F. 分布式服务设计和部署,实现动态(热拔插)的增加和较少业务服务器,减少乃至拒绝单点服务;
* 接收内核 socket 缓冲区的数据到连接自建立的缓冲区(支持自动增长)
*
* @param in
* 实际接收到的数据包队列
* @return PACKET_ERROR 表示对方关闭连接, PACKET_FULL 表示数据发送完毕, PACKET_LESS
* 表示数据继续接收.
* @see #parse(Vector)
*/
public Define.DataRecvStatus recv(Vector<byte[]> in) throws Exception {
Define.DataRecvStatus result = null;
result = Define.DataRecvStatus.PACKET_FULL;
int received = 0;
m_recvbuff.limit(this.m_recvcapacity);
m_onerecvbuff.clear();
//尽量把内核接收缓冲区的数据全部读取出来
while (received < m_onerecvbuff.limit() && m_onerecvbuff.hasRemaining()) {
try {
int len = 0;
len = m_socketchannel.read(m_onerecvbuff);
// 通道 数据被读取完全,是否表示这个连接已经没有数据了
if (len < 0) {
// 一般是-1,可以认为连接被对方关闭
result = Define.DataRecvStatus.PACKET_ERROR;
break;
} else if (len == 0) {
// 如果每次都是 0,会是什么情况
result = Define.DataRecvStatus.PACKET_LESS;
break;
} else {
received += len;
result = Define.DataRecvStatus.PACKET_LESS;
}
} catch (NotYetConnectedException e) {
result = Define.DataRecvStatus.PACKET_ERROR;
logger.error("oops, got an exception: ", e);
break;
} catch (ClosedByInterruptException e) {
result = Define.DataRecvStatus.PACKET_ERROR;
logger.error("oops, got an exception: ", e);
break;
} catch (AsynchronousCloseException e) {
result = Define.DataRecvStatus.PACKET_ERROR;
logger.error("oops, got an exception: ", e);
break;
} catch (ClosedChannelException e) {
result = Define.DataRecvStatus.PACKET_ERROR;
logger.error("oops, got an exception: ", e);
break;
} catch (IOException e) {
result = Define.DataRecvStatus.PACKET_ERROR;
logger.error("oops, got an exception: ", e);
break;
}
} m_onerecvbuff.flip();
if (m_onerecvbuff.limit() > m_recvbuff.remaining()) {
// 继续扩大接收缓冲区
m_recvbuff.flip();
byte[] bytes = new byte[m_recvbuff.limit()];
m_recvbuff.get(bytes, 0, m_recvbuff.limit());
m_recvcapacity *= 2;
m_recvbuff = ByteBuffer.allocate(m_recvcapacity);
m_recvbuff.put(bytes);
} // 合并本次实际接收到的数据
m_recvbuff.put(m_onerecvbuff); // 如果数据没有接收完整,对方关闭连接,最后一段不完整数据可能被丢弃
// 如果解析协议发生错误,等同于连接被关闭,不过是服务器主动关闭
if (result == Define.DataRecvStatus.PACKET_ERROR) {
if (received > 0) {
parse(in);
}
} else {
if (received > 0) {
result = parse(in);
}
} return result;
}
先介绍一下背景:
我的系统是一个时间证券系统,处理的数据量相当大每秒七八百M, 系统监听一个端口,另外一个系统以客户端的身份把发数据发送过来,不超过二十个连接,我的程序对数据做一些处理,然后再发到其它的几个服务器去。
系统在开始运行的时候很正常,但过上十几分钟以后,其中的某些连接就接收不到数据了,通过tcpdump查看发现ack报文中的win值相当小,小于256字节,而且相当的慢,不知道是什么原因造成这种情况。
服务器是8核linux虚拟机。
mina 1.x
mina 2.x
永远记住,每个逻辑包(区别于传输时的物理包)的最初几个字节(或者固定位置的几个字节)代表的是包的长度以及包的类型。