public static final Logger logger = LoggerFactory.getLogger(MessageManager.class); /** MessageManager的消息队列,下面的addSocketMessage方法就是向这个队列塞MessagePack的*/ private LinkedBlockingQueue<MessagePack> receivedQueen = new LinkedBlockingQueue<MessagePack>(); private ExecutorService pool;
/** 登记式单例类 */ private static AtomicInteger instanceIndex = new AtomicInteger(0); private static Map<String, MessageManager> instanceMap = new HashMap<String, MessageManager>(); private String instanceName = ""; public static final String INSTANCE_CLIENT_TO_GATE = "client_to_gate"; public static final String INSTANCE_GATE_TO_CLIENT = "gate_to_client"; private MessageManager(String name){ this.instanceName = name; } public String getInstanceName() { return instanceName; } public static MessageManager getInstance(String name){ if(StringUtils.isBlank(name)){ name = MessageManager.class.getName().concat(instanceIndex.get()+""); instanceIndex.getAndIncrement(); }
synchronized (instanceMap) { MessageManager instance = instanceMap.get(name); if(instance==null){ instance = new MessageManager(name); instanceMap.put(name, instance); } return instance; } } public MessagePack handleMessage(byte command, Channel sendChannel, XLRequest request) { MessagePack messagePack = null; switch (command) { case CommandType.GATE_TO_CLIENT_NORMAL: messagePack = new GateMessagePack(command, sendChannel, request); break; case CommandType.CLIENT_REQUEST: messagePack = new ClientMessagePack(command, sendChannel, request); } return messagePack; } public void start() { pool = Executors.newFixedThreadPool(SysInfo.getProcessors()*2); for(int i=0;i<SysInfo.getProcessors();i++){ pool.execute(new PushRecvThread()); } } private class PushRecvThread implements Runnable { public void run() { while (true) { // 从队列中取继承MessagePack的实例 MessagePack message; try { message = receivedQueen.poll(1, TimeUnit.MILLISECONDS); if (message != null) { message.onHandler(); } } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } } } public void addSocketMessage(MessagePack message) { if (message != null) { try { boolean success = receivedQueen.offer(message, 1,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } } }代码比较多,源代码我上传到百度网盘了,地址: http://pan.baidu.com/s/17Vbyg7k用户每隔1s发送200byte数据的时候,就会出现如下的错误,而且还有个很奇怪的问题,在这7k用户建立连接的过程中,不会出现解码问题,全部建立连接后不久,就会出现解码问题。不知道是不是我写的代码有问题,各位大牛帮忙看看吧。 java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 4, maximum is 3 at org.jboss.netty.buffer.AbstractChannelBuffer.checkReadableBytes(AbstractChannelBuffer.java:661) at org.jboss.netty.buffer.AbstractChannelBuffer.readInt(AbstractChannelBuffer.java:273) at com.zltx.game.sky.net.verify.MessageDecoder.decodeDataBuffer(MessageDecoder.java:51) at com.zltx.game.sky.net.verify.MessageDecoder.decode(MessageDecoder.java:37) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425) at 
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 825570103, maximum is 0 at org.jboss.netty.buffer.AbstractChannelBuffer.checkReadableBytes(AbstractChannelBuffer.java:661) at org.jboss.netty.buffer.AbstractChannelBuffer.readBytes(AbstractChannelBuffer.java:338) at org.jboss.netty.buffer.AbstractChannelBuffer.readBytes(AbstractChannelBuffer.java:344) at com.zltx.game.sky.net.verify.MessageDecoder.decodeDataBuffer(MessageDecoder.java:53) at com.zltx.game.sky.net.verify.MessageDecoder.decode(MessageDecoder.java:37) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)
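java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 4, maximum is 3 第一个错误看起来不像是什么严重的错误:你要求读取4个字节,但实际上可读取的只有3个字节,也许是剩下的数据还没从网络上发送过来。TCP 是字节流,一个包可能被拆成多次到达,所以解码器在 readInt() 之前应该先判断 readableBytes() >= 4,不够就返回 null,等后续数据到达后再解码。 java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 825570103, maximum is 0 第二个错误比较离谱:你要接收的数据居然有这么大?约825MB?!是不是前面的长度字段识别错了?(825570103 即 0x31353337,正好是 ASCII 字符 "1537" 的四个字节,说明解码器把消息正文当成了长度字段来读,也就是帧的边界已经错位了。) 建议用工具截取网络数据包,看看实际发送和接收到的数据是不是跟预期不一致。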
public class GatewayFactory implements ChannelPipelineFactory { private MessageDecoder messageDecoder = new MessageDecoder(); private MessageEncoder messageEncoder = new MessageEncoder(); private GatewayMessageHandler gatewayMessageHandler = new GatewayMessageHandler(); ExecutionHandler executionHandler = new ExecutionHandler( new OrderedMemoryAwareThreadPoolExecutor( SysInfo.getProcessors()*ConfigUtil.GATEWAY_PROCESS, ConfigUtil.MAX_CHANNEL_MEMORY_SIZE, 0) );
ServerHandler: @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent)
        throws Exception {
    // Inbound-message hook: hand the event, together with the channel taken
    // from the handler context, to the client-message processing step.
    doProcessFromClient(ctx, messageEvent, ctx.getChannel());
}
private void doProcessFromClient(ChannelHandlerContext ctx,
        MessageEvent messageEvent, Channel sendChannel) {
    // The event payload is cast to XLRequest; any other payload type fails here
    // with a ClassCastException.
    final XLRequest decoded = (XLRequest) messageEvent.getMessage();
    // Every message on this path is tagged with the same fixed command.
    final byte cmd = CommandType.GATE_TO_CLIENT_NORMAL;
    // Wrap the request in a message pack and enqueue it on the message manager.
    // Note: the channel is read from ctx; the sendChannel parameter is not used.
    messageManager.addSocketMessage(
            messageManager.handleMessage(cmd, ctx.getChannel(), decoded));
}GateWayFactory:
/**
 * Pipeline factory for gateway connections: {@link #getPipeline()} assembles the
 * decoder, encoder, execution handler and business handler for one channel.
 *
 * <p>The decoder is created per pipeline. It is a frame decoder that buffers the
 * not-yet-decodable bytes of its connection, so one instance must never serve
 * more than one channel; sharing it mixes bytes from different connections and
 * produces corrupted length fields and "Not enough readable bytes" failures.
 */
public class GatewayFactory implements ChannelPipelineFactory {
    // Created once and added to every pipeline.
    // NOTE(review): sharing is only safe if these two handlers keep no
    // per-connection state — confirm against their implementations.
    private MessageEncoder messageEncoder = new MessageEncoder();
    private GatewayMessageHandler gatewayMessageHandler = new GatewayMessageHandler();

    // One execution handler (and therefore one thread pool) for all pipelines.
    ExecutionHandler executionHandler = new ExecutionHandler(
            new OrderedMemoryAwareThreadPoolExecutor(
                    SysInfo.getProcessors() * ConfigUtil.GATEWAY_PROCESS,
                    ConfigUtil.MAX_CHANNEL_MEMORY_SIZE,
                    0)
    );

    /**
     * Builds the handler chain for a single channel.
     *
     * @return a new pipeline: decoder, encoder, execution handler, gateway handler
     * @throws Exception declared by the factory interface
     */
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline cp = Channels.pipeline();
        // Stateful handler: a fresh instance for every connection.
        cp.addLast("messageServerDecoder", new MessageDecoder());
        cp.addLast("messageServerEncoder", messageEncoder);
        cp.addLast("executionHandler", executionHandler);
        cp.addLast("gatewayMessageHandler", gatewayMessageHandler);
        return cp;
    }
}消息队列:
/**
 * Registry-style ("multiton") message manager: instances are looked up by name
 * through {@link #getInstance(String)}. Each instance owns a queue of
 * {@link MessagePack}s that {@link #addSocketMessage(MessagePack)} fills and that
 * the worker threads created by {@link #start()} drain.
 */
public class MessageManager {
    public static final Logger logger = LoggerFactory.getLogger(MessageManager.class);

    public static final String INSTANCE_CLIENT_TO_GATE = "client_to_gate";
    public static final String INSTANCE_GATE_TO_CLIENT = "gate_to_client";

    /** Source of the numeric suffix for names generated when the caller passes a blank name. */
    private static AtomicInteger instanceIndex = new AtomicInteger(0);
    /** Instances by name; every access is made while synchronized on the map itself. */
    private static Map<String, MessageManager> instanceMap = new HashMap<String, MessageManager>();

    /** The message queue; addSocketMessage puts MessagePacks into it. */
    private LinkedBlockingQueue<MessagePack> receivedQueen = new LinkedBlockingQueue<MessagePack>();
    private ExecutorService pool;
    private String instanceName = "";

    private MessageManager(String name) {
        this.instanceName = name;
    }

    public String getInstanceName() {
        return instanceName;
    }

    /**
     * Returns the manager registered under {@code name}, creating it on first use.
     *
     * @param name registry key; when blank, a unique name is generated from the
     *             class name plus a counter, so each such call yields a new instance
     * @return the existing or newly created manager, never {@code null}
     */
    public static MessageManager getInstance(String name) {
        if (StringUtils.isBlank(name)) {
            // Read and advance the counter in one atomic step; a separate get()
            // followed by an increment lets two threads generate the same name.
            name = MessageManager.class.getName()
                    .concat(String.valueOf(instanceIndex.getAndIncrement()));
        }
        synchronized (instanceMap) {
            MessageManager instance = instanceMap.get(name);
            if (instance == null) {
                instance = new MessageManager(name);
                instanceMap.put(name, instance);
            }
            return instance;
        }
    }

    /**
     * Wraps a request in the message pack type that matches {@code command}.
     *
     * @param command     one of the {@code CommandType} constants handled below
     * @param sendChannel channel passed through to the message pack
     * @param request     request passed through to the message pack
     * @return the new message pack, or {@code null} when the command is not recognised
     */
    public MessagePack handleMessage(byte command, Channel sendChannel, XLRequest request) {
        MessagePack messagePack = null;
        switch (command) {
            case CommandType.GATE_TO_CLIENT_NORMAL:
                messagePack = new GateMessagePack(command, sendChannel, request);
                break;
            case CommandType.CLIENT_REQUEST:
                messagePack = new ClientMessagePack(command, sendChannel, request);
                break;
            default:
                // Unknown command: the caller receives null.
                break;
        }
        return messagePack;
    }

    /**
     * Creates the worker pool and starts one queue consumer per reported processor.
     * The pool itself is sized at twice that number, as before.
     */
    public void start() {
        final int processors = SysInfo.getProcessors();
        pool = Executors.newFixedThreadPool(processors * 2);
        for (int i = 0; i < processors; i++) {
            pool.execute(new PushRecvThread());
        }
    }

    /** Queue consumer: takes message packs off the queue and runs their handler. */
    private class PushRecvThread implements Runnable {
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Block until a message is available instead of waking up
                    // every millisecond while the queue is empty.
                    MessagePack message = receivedQueen.take();
                    message.onHandler();
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal: restore the flag and
                    // leave the loop so the pool can actually terminate.
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // A failing handler must not take the consumer down with it.
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Enqueues a message for the worker threads; {@code null} is ignored.
     *
     * @param message message pack to enqueue, may be {@code null}
     */
    public void addSocketMessage(MessagePack message) {
        if (message == null) {
            return;
        }
        // Non-blocking offer: the queue is created without a capacity bound, and
        // this variant cannot be cut short by an interrupt and lose the message.
        if (!receivedQueen.offer(message)) {
            logger.error("Message queue rejected a message; it was dropped");
        }
    }
}代码比较多,源代码我上传到百度网盘了,地址: http://pan.baidu.com/s/17Vbyg7k用户每隔1s发送200byte数据的时候,就会出现如下的错误,而且还有个很奇怪的问题,在这7k用户建立连接的过程中,不会出现解码问题,全部建立连接后不久,就会出现解码问题。不知道是不是我写的代码有问题,各位大牛帮忙看看吧。 java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 4, maximum is 3
at org.jboss.netty.buffer.AbstractChannelBuffer.checkReadableBytes(AbstractChannelBuffer.java:661)
at org.jboss.netty.buffer.AbstractChannelBuffer.readInt(AbstractChannelBuffer.java:273)
at com.zltx.game.sky.net.verify.MessageDecoder.decodeDataBuffer(MessageDecoder.java:51)
at com.zltx.game.sky.net.verify.MessageDecoder.decode(MessageDecoder.java:37)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 825570103, maximum is 0
at org.jboss.netty.buffer.AbstractChannelBuffer.checkReadableBytes(AbstractChannelBuffer.java:661)
at org.jboss.netty.buffer.AbstractChannelBuffer.readBytes(AbstractChannelBuffer.java:338)
at org.jboss.netty.buffer.AbstractChannelBuffer.readBytes(AbstractChannelBuffer.java:344)
at com.zltx.game.sky.net.verify.MessageDecoder.decodeDataBuffer(MessageDecoder.java:53)
at com.zltx.game.sky.net.verify.MessageDecoder.decode(MessageDecoder.java:37)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
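第一个错误看起来不像是什么严重的错误:你要求读取4个字节,但实际上可读取的只有3个字节,也许是剩下的数据还没从网络上发送过来。TCP 是字节流,一个包可能被拆成多次到达,所以解码器在 readInt() 之前应该先判断 readableBytes() >= 4,不够就返回 null,等后续数据到达后再解码。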
java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 825570103, maximum is 0
第二个错误比较离谱:你要接收的数据居然有这么大?约825MB?!是不是前面的长度字段识别错了?(825570103 即 0x31353337,正好是 ASCII 字符 "1537" 的四个字节,说明解码器把消息正文当成了长度字段来读,也就是帧的边界已经错位了。)
建议用工具截取网络数据包,看看实际发送和接收到的数据是不是跟预期不一致。
LengthFieldBasedFrameDecoder
// Encoder instance created once with the factory; getPipeline() adds this same
// object to every pipeline it builds.
private MessageEncoder messageEncoder = new MessageEncoder();
// Business handler instance, likewise created once and added to every pipeline.
private GatewayMessageHandler gatewayMessageHandler = new GatewayMessageHandler();
// Execution handler backed by an ordered, memory-aware thread pool; one instance
// is shared by all pipelines.
// NOTE(review): the three arguments look like (corePoolSize, maxChannelMemorySize,
// maxTotalMemorySize), with 0 disabling the total-memory limit — confirm against
// the Netty 3 constructor documentation.
ExecutionHandler executionHandler = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(
SysInfo.getProcessors()*ConfigUtil.GATEWAY_PROCESS,
ConfigUtil.MAX_CHANNEL_MEMORY_SIZE,
0)
);
/**
 * Builds the handler chain for a single channel: decoder, encoder, execution
 * handler, gateway handler.
 *
 * <p>The decoder is instantiated here rather than reused. It is a frame decoder
 * that buffers the not-yet-decodable bytes of its connection, so one instance
 * shared between channels mixes their byte streams and yields corrupted length
 * fields and "Not enough readable bytes" failures.
 *
 * @return a new pipeline for one connection
 * @throws Exception declared by the factory interface
 */
public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline cp = Channels.pipeline();
    // Stateful handler: a fresh instance for every connection.
    cp.addLast("messageServerDecoder", new MessageDecoder());
    cp.addLast("messageServerEncoder", messageEncoder);
    cp.addLast("executionHandler", executionHandler);
    cp.addLast("gatewayMessageHandler", gatewayMessageHandler);
    return cp;
}
}错误在这里:MessageDecoder 继承自 FrameDecoder,内部保存着当前连接尚未解码完的字节,不能在多个 Channel 之间共用同一个实例,应该每次都 new 一个:cp.addLast("messageServerDecoder", new MessageDecoder());