解决方案 »

  1.   


    ServerHandler: @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent)
    throws Exception {
    Channel sendChannel = ctx.getChannel();
    doProcessFromClient(ctx, messageEvent, sendChannel);
    }
    private void doProcessFromClient(ChannelHandlerContext ctx,
    MessageEvent messageEvent, Channel sendChannel) {
    XLRequest request = (XLRequest)messageEvent.getMessage();
    byte command = CommandType.GATE_TO_CLIENT_NORMAL;
    messageManager.addSocketMessage(messageManager.handleMessage(command, ctx.getChannel(), request));
    }GateWayFactory:
    public class GatewayFactory implements ChannelPipelineFactory { private MessageDecoder messageDecoder = new MessageDecoder();
    private MessageEncoder messageEncoder = new MessageEncoder();
    private GatewayMessageHandler gatewayMessageHandler = new GatewayMessageHandler();
    ExecutionHandler executionHandler = new ExecutionHandler( 
    new OrderedMemoryAwareThreadPoolExecutor(
    SysInfo.getProcessors()*ConfigUtil.GATEWAY_PROCESS, 
    ConfigUtil.MAX_CHANNEL_MEMORY_SIZE, 
    0)
    );

    public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline cp = Channels.pipeline();
    cp.addLast("messageServerDecoder", messageDecoder);
    cp.addLast("messageServerEncoder", messageEncoder);
    cp.addLast("executionHandler", executionHandler);
    cp.addLast("gatewayMessageHandler", gatewayMessageHandler);
    return cp;
    }

    }消息队列:
    public class MessageManager {

    public static final Logger logger = LoggerFactory.getLogger(MessageManager.class); /** MessageManager的消息队列,下面的addSocketMessage方法就是向这个队列塞MessagePack的*/
        private LinkedBlockingQueue<MessagePack> receivedQueen = new LinkedBlockingQueue<MessagePack>();
        private ExecutorService pool;
        
        
        /** 登记式单例类 */
        private static AtomicInteger instanceIndex = new AtomicInteger(0);
        private static Map<String, MessageManager> instanceMap = new HashMap<String, MessageManager>();
        private String instanceName = "";
        public static final String INSTANCE_CLIENT_TO_GATE = "client_to_gate";
        public static final String INSTANCE_GATE_TO_CLIENT = "gate_to_client";
        private MessageManager(String name){
         this.instanceName = name;
        }
        public String getInstanceName() {
    return instanceName;
    }
    public static MessageManager getInstance(String name){
         if(StringUtils.isBlank(name)){
         name = MessageManager.class.getName().concat(instanceIndex.get()+"");
         instanceIndex.getAndIncrement();
         }
        
         synchronized (instanceMap) {
         MessageManager instance = instanceMap.get(name);
             if(instance==null){
             instance = new MessageManager(name);
             instanceMap.put(name, instance);
             }
             return instance;
    }
        }
        public MessagePack handleMessage(byte command, Channel sendChannel, XLRequest request) {
            MessagePack messagePack = null;        switch (command) {
            case CommandType.GATE_TO_CLIENT_NORMAL: 
             messagePack = new GateMessagePack(command, sendChannel, request);
             break;
            case CommandType.CLIENT_REQUEST:
             messagePack = new ClientMessagePack(command, sendChannel, request);
            }
            return messagePack;
        }    public void start() {
         pool = Executors.newFixedThreadPool(SysInfo.getProcessors()*2);
            for(int i=0;i<SysInfo.getProcessors();i++){
              pool.execute(new PushRecvThread());
            }
        } private class PushRecvThread implements Runnable { public void run() {
    while (true) {
    // 从队列中取继承MessagePack的实例
    MessagePack message;
    try {
    message = receivedQueen.poll(1, TimeUnit.MILLISECONDS);
    if (message != null) {
    message.onHandler();
    }
    } catch (InterruptedException e) {
    logger.error(e.getMessage(), e);
    }
    }
    }
    }
    public void addSocketMessage(MessagePack message) {
    if (message != null) {
    try {
    boolean success = receivedQueen.offer(message, 1,TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    logger.error(e.getMessage(), e);
    }
    }
    }
    }代码比较多,源代码我上传到百度网盘了,地址: http://pan.baidu.com/s/17Vbyg7k用户每隔1s发送200byte数据的时候,就会出现如下的错误,而且还有个很奇怪的问题,在这7k用户建立连接的过程中,不会出现解码问题,全部建立连接后不久,就会出现解码问题。不知道是不是我写的代码有问题,各位大牛帮忙看看吧。 java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 4, maximum is 3
    at org.jboss.netty.buffer.AbstractChannelBuffer.checkReadableBytes(AbstractChannelBuffer.java:661)
    at org.jboss.netty.buffer.AbstractChannelBuffer.readInt(AbstractChannelBuffer.java:273)
    at com.zltx.game.sky.net.verify.MessageDecoder.decodeDataBuffer(MessageDecoder.java:51)
    at com.zltx.game.sky.net.verify.MessageDecoder.decode(MessageDecoder.java:37)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
    java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 825570103, maximum is 0
    at org.jboss.netty.buffer.AbstractChannelBuffer.checkReadableBytes(AbstractChannelBuffer.java:661)
    at org.jboss.netty.buffer.AbstractChannelBuffer.readBytes(AbstractChannelBuffer.java:338)
    at org.jboss.netty.buffer.AbstractChannelBuffer.readBytes(AbstractChannelBuffer.java:344)
    at com.zltx.game.sky.net.verify.MessageDecoder.decodeDataBuffer(MessageDecoder.java:53)
    at com.zltx.game.sky.net.verify.MessageDecoder.decode(MessageDecoder.java:37)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
      

  2.   

    java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 4, maximum is 3
    第一个错误看起来不像是什么错误,你要求读取4个字节,但实际上可读取的只有3个字节,也许是网络还没发送过来。
    java.lang.IndexOutOfBoundsException: Not enough readable bytes - Need 825570103, maximum is 0
    第二个错误比较离谱点,你要接受的数据居然有这么大?825MB?!是不是前面识别错误了?
    建议用工具截取网络数据包,看看是不是发送跟接收处理跟预期的不一致。
      

  3.   

    MessageDecoder 的decodeDataBuffer里readInt前没对readableBytes检查。考虑到Tcp分段问题,可能等整个消息到了之后再解码更好另外 netty本身即有基于长度字段的decoder
    LengthFieldBasedFrameDecoder
      

  4.   

    public class GatewayFactory implements ChannelPipelineFactory { private MessageDecoder messageDecoder = new MessageDecoder();
    private MessageEncoder messageEncoder = new MessageEncoder();
    private GatewayMessageHandler gatewayMessageHandler = new GatewayMessageHandler();
    ExecutionHandler executionHandler = new ExecutionHandler( 
    new OrderedMemoryAwareThreadPoolExecutor(
    SysInfo.getProcessors()*ConfigUtil.GATEWAY_PROCESS, 
    ConfigUtil.MAX_CHANNEL_MEMORY_SIZE, 
    0)
    );

    public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline cp = Channels.pipeline();
    cp.addLast("messageServerDecoder", messageDecoder);
    cp.addLast("messageServerEncoder", messageEncoder);

    cp.addLast("executionHandler", executionHandler);
    cp.addLast("gatewayMessageHandler", gatewayMessageHandler);
    return cp;
    }

    }错误在这里,应该是每次都要NEW一个的,cp.addLast("messageServerDecoder", new messageDecoder())