本帖最后由 uyerp 于 2012-07-22 13:37:04 编辑

解决方案 »

  1.   

    Java对象的编码过程,应该不会有什么问题。
    解码过程本身,也应该不会有什么问题。
    问题的关键,应该是在接收缓冲区上面。服务端对数据的接收上面,没有明显的,对TCP通讯过程当中,半包,粘包,的处理过程。
    也就是说,客户端每次发送的数据之间,看不到是通过什么方式来区分的。
    通俗点说,就是,本次发送的数据和下一次发送的数据,两次发送的数据,接收端通过什么方式来区分这两次的数据?一般而言,我们会自己定义一个内部协议,来区分每次发送的数据。
    比如,我在发送数据的前边,加上整个数据的长度信息,这样,通过长度就可以区分本次发送的数据到什么位置结束。服务器每次读取数据,都会创建一个缓冲区来接收数据, 但是,如果缓冲区里面有一个以上的数据,或者,由于网络不稳定,本次只接收到了半个数据(当然,这里的一个、半个是指的每次发送的数据包),楼主的解码过程,就会出现明显的漏洞了。看样子,楼主在测试代码的时候,选用的都是在同一个主机上,或者在局域网的环境下测试的吧。
    这种情况下,网络通信的质量非常高,客户端发送的数据,基本上能够无阻碍的被服务端收到,所以,每次处理接收数据的过程中,其实,接收缓冲区里面可能存在一个以上的数据了。由于程序每次只处理一个数据包,就造成了丢包问题,那些没有被处理的数据包,被程序丢弃了。正常的通信测试应该测试三个方面,半包、粘包和压力。
      

  2.   

    为了避免接受时的数据分包处理,所以,每次发送都发送了固定长度256的byte[],因为消息是固定长度的。
    接受时,每次数据也是read(byte[256]),这样应该不会有半包的情况发生吧?
    这个例子就是在同一台机器上运行,不过在网络环境下,就应该会出现半包了。
    如果我一次接受,处理多个消息,该还会丢失数据不?
    我改成一次接受,循环处理看看。
    先谢过……
      

  3.   

    还有个问题“服务器每次读取数据,都会创建一个缓冲区来接收数据”。
    这个不是很理解,就我上面的这个例子中,如果客户端已经写了50000次,在服务器我还没开始读的情况下,这些数据装在什么地方?缓冲是服务器开始读取才创建的么?
    还有,就是,我多个客户端的情况下,是不是每个客户端会创建自己独立的缓冲?也就是每个key,是独立的缓冲?
      

  4.   

    服务端的接收代码中:
    ByteBuffer buffer = (ByteBuffer) key.attachment();
            int count = channel.read(buffer);
            if (count > 0) {
                buffer.flip();
                Message meg = (Message)NioUtil.byte2Obj(buffer.array());
    这几行代码,
    ByteBuffer,就是数据缓冲区。
    int count = channel.read(buffer);这行代码,你怎么保证每次被执行的时候,count都是256?
    if (count > 0) 只能保证缓冲区中有数据,对吧?
    非阻塞通信当中,代码的调用是采用事件处理模式来实现的,但是,每次事件的响应,和你计算机已经接收了多少数据没有多大关系。
    楼主把count的值打印出来看看,是不是每次都是256,
    NioUtil.byte2Obj(buffer.array());这个方法,估计每次只处理256个字节的数据,
    那么,如果count的值大于256,那么,大于256之余的数据,不是没有被处理吗 ?
      

  5.   

    我在服务端的接收代码中,并没有看到楼主所说的 read(byte[256]) 代码,哪怕相同功能的代码也没有看到。
    看到的,只是把当前缓冲区中所有的数据都转换成byte[]数组(buffer.array())。
    至于缓冲区里面有多少数据,神仙才会知道。
      

  6.   

    一个key循环read还是读取不到所有的数据。
    难道要一次read出所有数据,才不会丢失?
      

  7.   

    那我客户端put了50000次,是非阻塞的。
    那服务端,构建缓冲时,怎么知道需要多大的缓冲区,才能读取所有数据?
    “那么,如果count的值大于256,那么,大于256之余的数据,不是没有被处理吗”
    那一次处理中超出256的数据量是肯定的,因为客户端写了那么多。
    那接收时,怎么保证超过256的部分数据也能处理?
      

  8.   

    我没认真看你服务端的代码,那个缓冲区,是你写程序创建的,和操作系统的缓冲区还不是一回事。你客户端发送了5W次的数据,但是,服务端未必同样调用了5W次来接收客户端发送的数据。这个次数,不是一一对应的。你每次只发送256个字节的数据,到了操作系统那一层,它会分组打包的。
    比如,你连续发送了多次小数据包,到了操作系统,可能会将这些连续的小数据,放到同一个IP报文中发送到接收端;当然,如果你一次性发送了一个很大的数据,到了操作系很可能会将它拆成连续的小的IP报文发送到接收端。但是无论怎样分组打包,都是由操作系统决定的。
    TCP协议,只保证你的数据可靠的顺次的被接收端接收,但是,每次接收多少,无法保证。如果发送端进行了5W次的发送,接收端只进行了4W次的接收处理,并且,传输的数据顺次可靠。
    这种情况下,你怎么办?肯定是接收端,有的接收处理过程,存在一次处理了一个以上发送端发送的数据。
      

  9.   

     ByteBuffer buffer = (ByteBuffer) key.attachment();
            int count = channel.read(buffer);
            if (count > 0) {
                buffer.flip();
                Message meg = (Message)NioUtil.byte2Obj(buffer.array());
                // 客户端请求类型
                int type = meg.getType();这几行代码,把if改成while循环,进行接收处理。ByteBuffer buffer = (ByteBuffer) key.attachment();
            int count = 0;
            while((count = channel.read(buffer))==256) {//保证每次处理一个完整的包

                buffer.flip();
                Message meg = (Message)NioUtil.byte2Obj(buffer.array());
                buffer.clear();//清空数据,以备下一次的接收处理操作。
                // 客户端请求类型
                int type = meg.getType();之后,if语句的else if分支,可以删掉了。} else if(count < 0){
                System.out.println("错啦");
                channel.close();
            }
            key.interestOps(SelectionKey.OP_READ);吧“}”后面的else if 等等语句块删掉。
    最终变为:
             } 
            key.interestOps(SelectionKey.OP_READ);
      

  10.   

    我改了代码,比前面的简单,就服务器,客户端。
    统计显示,服务接收的数据还是不对,有劳看看……
    服务端:
    public class NIOServer { // 超时时间,单位毫秒
    private static final int TimeOut = 6000; // 本地监听端口
    private static final int ListenPort = 33445; protected Selector selector; public static void main(String[] args) throws IOException {
    // 创建选择器
    Selector selector = Selector.open(); // 打开监听信道
    ServerSocketChannel listenerChannel = ServerSocketChannel.open(); // 与本地端口绑定
    listenerChannel.socket().bind(new InetSocketAddress(ListenPort)); // 设置为非阻塞模式
    listenerChannel.configureBlocking(false); // 将选择器绑定到监听信道,只有非阻塞信道才可以注册选择器.并在注册过程中指出该信道可以进行Accept操作
    listenerChannel.register(selector, SelectionKey.OP_ACCEPT); // 反复循环,等待IO
    while (true) {
    // 等待某信道就绪(或超时)
    if (selector.select(TimeOut) == 0) {
    //System.out.println("服务器运行中……");
    System.out.println("服务器上次共接受"+count+"数据");
    continue;
    }
    // 取得迭代器.selectedKeys()中包含了每个准备好某一I/O操作的信道的SelectionKey
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    while (iter.hasNext()) {
    SelectionKey key = iter.next();
    iter.remove();
    try {
    if (key.isAcceptable()) {
    // 有客户端连接请求时
    handleKey(key);
    }
    if (key.isReadable()) {
    // 从客户端读取数据
    handleKey(key);
    }
    } catch (IOException e) {
    // 出现IO异常(如客户端断开连接)时移除处理过的键
    e.printStackTrace();
    key.channel().close();
    continue;
    }
    }
    }
    }
    private static int count;
    // 处理事件
    private static void handleKey(SelectionKey key) throws IOException {
    if (key.isAcceptable()) { // 接收请求
    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
    clientChannel.configureBlocking(false);
    clientChannel.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocate(256));
    } else if (key.isReadable()) { // 读信息
    SocketChannel channel = (SocketChannel) key.channel();
    // 拿到256长度的缓冲区,一个key一个
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    int read = 0;
    while ((read = channel.read(buffer)) == 256) {
    count++;
    buffer.flip();
    String meg = (String) byte2Obj(buffer.array());
    buffer.clear();
    // 向服务器发数据
    System.out.println("接收客户端消息," + meg);
    // 向客户端写成功消息
    channel.write(ByteBuffer.wrap("OK".getBytes()));
    }
    key.interestOps(SelectionKey.OP_READ);
    } else if (key.isWritable()) { // 写事件 }
    } /**
     * Byte数组换为对象转
     */
    public static Object byte2Obj(byte[] byteArry) throws IOException {
    DatagramPacket dp = new DatagramPacket(byteArry, byteArry.length);
    ByteArrayInputStream bais = new ByteArrayInputStream(dp.getData());
    BufferedInputStream zipIn = new BufferedInputStream(bais);
    ObjectInputStream ois = new ObjectInputStream(zipIn);
    // 获取消息对象
    Object obj = null;
    try {
    obj = ois.readObject();
    ois.close();
    zipIn.close();
    bais.close();
    } catch (ClassNotFoundException e) {
    e.printStackTrace();
    System.out.println("读取客户端消息 <类型> 异常," + e.getMessage());
    }
    return obj;
    }
    }
    客户端:
    public class NIOClient { // 信道选择器
    private Selector selector = null; // 与服务器通信的信道
    private SocketChannel socketChannel = null; // 要连接的服务器Ip地址
    private String hostIp = "localhost"; // 要连接的远程服务器在监听的端口
    private int hostListenningPort = 33445; /**
     * 构造函数
     */
    public NIOClient() {
    try {
    initialize();
    } catch (IOException e) {
    System.out.println("初始化服务器连接异常" + e.getMessage());
    e.printStackTrace();
    }
    } /**
     * 初始化函数
     * 
     * @throws IOException
     *             异常
     */
    private void initialize() throws IOException {
    // 打开监听信道并设置为非阻塞模式
    socketChannel = SocketChannel.open(new InetSocketAddress(hostIp, hostListenningPort));
    socketChannel.configureBlocking(false);
    // 打开并注册选择器到信道
    selector = Selector.open();
    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(256));
    // 启动读取线程
    new TCPClientReadThread(selector);
    } /**
     * 想服务器发送消息的方法
     * 
     * @param obj
     *            要写入服务器的消息
     * @throws IOException
     *             异常
     */
    public void put(Object obj) throws IOException {
    // 对象转数组
    byte[] objarr = obj2Byte(obj);
    // 将对象赋值给固定长度的数组
    byte[] buffer = new byte[256];
    System.arraycopy(objarr, 0, buffer, 0, objarr.length);
    socketChannel.write(ByteBuffer.wrap(buffer));
    }
    /**
     * 客户端监听
     */
    class TCPClientReadThread implements Runnable {
    private Selector selector;
    // 超时时间,单位毫秒
    private static final int TimeOut = 3000; public TCPClientReadThread(Selector selector) {
    this.selector = selector;
    new Thread(this).start();
    } public void run() {
    try {
    while (true) {
    // 等待某信道就绪(或超时)
    if (selector.select(TimeOut) == 0) {
    //System.out.println("客户端运行中……");
    continue;
    }
    // 处理服务器返回消息
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    } /**
     * 对象转换为Byte数组
     */
    public static byte[] obj2Byte(Object obj) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BufferedOutputStream zipOut = new BufferedOutputStream(baos);
    ObjectOutputStream ous = new ObjectOutputStream(zipOut);
    ous.writeObject(obj);
    ous.flush();
    ous.close();
    zipOut.close();
    baos.close();
    byte[] arr = baos.toByteArray();
    return arr;
    } private static int count;
    public static void main(String[] args) throws IOException {
    NIOClient client = new NIOClient();
    for (int i = 0; i < 50000; i++) {
    client.put("data" + i);
    count++;
    }
    System.out.println("客户端发送"+count+"次");
    }
    }
      

  11.   

    加分吧。累死我了。
    package net.csdn.bbs.uyerp;import java.io.BufferedOutputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectableChannel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.concurrent.ConcurrentLinkedQueue;public class TesetNioClient {

     /**
         * 对象转换为Byte数组
         */
        public static byte[] obj2Byte(Object obj) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BufferedOutputStream zipOut = new BufferedOutputStream(baos);
            ObjectOutputStream ous = new ObjectOutputStream(zipOut);
            ous.writeObject(obj);
            ous.flush();
            ous.close();
            zipOut.close();
            baos.close();
            byte[] arr = baos.toByteArray();
            return arr;
        } static class SelectorHolder extends Thread{
    private Selector selector;
    public SelectorHolder(Selector selector){
    this.selector = selector;
    }

    public void run(){
    try {
    while(true){
    if(selector.select()<=0){
    try{Thread.sleep(1);}catch(Exception e){break;}
    continue;
    }
    for(Iterator<SelectionKey> iter = selector.selectedKeys().iterator();iter.hasNext();){
    SelectionKey key = iter.next();
    SelectableChannel selectableChannel = key.channel();
    iter.remove();
    if(key.isWritable()){
    onWrite(key,(SocketChannel)selectableChannel);
    }
    if(key.isConnectable()){
    onConnect(key,(SocketChannel)selectableChannel);
    }
    }
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    private void onWrite(SelectionKey key, SocketChannel channel) throws IOException {
    if(!sendQueue.isEmpty()){
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    buffer.clear();
    Object obj = sendQueue.poll();
    buffer.put(obj2Byte(obj));
    buffer.limit(buffer.capacity());
    buffer.rewind();
    channel.write(buffer);
    }
    } private void onConnect(SelectionKey key, SocketChannel channel) throws IOException {
    if(channel.isConnectionPending()){
    channel.finishConnect();
    }
    channel.register(key.selector(), SelectionKey.OP_WRITE, ByteBuffer.allocate(256));
    }
    }

    static ConcurrentLinkedQueue<Object> sendQueue = new ConcurrentLinkedQueue<Object>();

    /**
     * @param args
     * @throws IOException 
     */
    public static void main(String[] args) throws IOException {
    SocketChannel channel  = SocketChannel.open();
    channel.configureBlocking(false);
    Selector selector = Selector.open();
    channel.register(selector, SelectionKey.OP_CONNECT);
    channel.connect(new InetSocketAddress("localhost", 33445));
    SelectorHolder holder = new SelectorHolder(selector);
    holder.start();

    int count = 0;
    for (int i = 0; i < 50000; i++) {
                sendQueue.add("data" + i);
                count++;
            }
            System.out.println("客户端发送"+count+"次");
    }}
      

  12.   

    服务端,
    不要调用 key.interestOps(SelectionKey.OP_READ); 这个方法。
    很费解为啥要写这一句,根本没啥必要嘛。
      

  13.   

    “阻塞和非阻塞的形式,写到一起了”,数据丢失的原因是,服务器还没接收到数据,发送端就已经关闭,是么?所以,要放到线程里跑。
    我网上找的例子,都这写的。
    key.interestOps这句,我看网上例子也是这么写的。
    不过,还得给我解释下“阻塞和非阻塞的形式,写到一起了”。
    有劳你半夜还给我写代码,非常感谢。必须加分,你给我解释下上面的那句话呢,我结贴,呵呵……
      

  14.   

    “阻塞和非阻塞的形式,写到一起了”,数据丢失的原因是,服务器还没接收到数据,发送端就已经关闭,是么?不是这样的,楼主你的代码改一个地方就可以了。
        public void put(Object obj) throws IOException {
            ...
            //关键是这一句
            //socketChannel.write(ByteBuffer.wrap(buffer));
            //改成如下
              while(buff.hasRemaining()){
                 socketChannel.write(buff);
            }
        }
      

  15.   

    to:qunhao
    是可以哦,不过速度好像比preferme的慢了很多。
    何故?
      

  16.   

    TCP协议的阻塞通信和非阻塞通信,代码风格是不一样的。非阻塞通信,代码编写,类似事件处理模型。楼主还是多写写IO方面的程序吧,IO和NIO编程的思路是不相同的。略微改进了一下代码:
    private void onWrite(SelectionKey key, SocketChannel channel) throws IOException {
                if(sendQueue.isEmpty()){
                 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                }else{
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    buffer.clear();
                    Object obj = sendQueue.poll();
                    buffer.put(obj2Byte(obj));
                    buffer.limit(buffer.capacity());
                    buffer.rewind();
                    channel.write(buffer);
                }
            }public static void main(String[] args) throws IOException {
            SocketChannel channel  = SocketChannel.open();
            channel.configureBlocking(false);
            Selector selector = Selector.open();
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
            channel.connect(new InetSocketAddress("localhost", 33445));
            SelectorHolder holder = new SelectorHolder(selector);
            holder.start();
            
            int count = 0;
            for (int i = 0; i < 50000; i++) {
                sendQueue.add("data" + i);
                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                selector.wakeup();
                count++;
            }
            System.out.println("客户端发送"+count+"次");
        }
      

  17.   

    数据丢失的原因是因为,
    发送速度(调用write方法发送数据),大于IO的写入(网络实际上已经发送的数据)速度。NIO的数据读写是异步操作,当操作系统的缓冲区满时,是无法成功写入的。楼主的代码:
    public void put(Object obj) throws IOException {
            // 对象转数组
            byte[] objarr = obj2Byte(obj);
            // 将对象赋值给固定长度的数组
            byte[] buffer = new byte[256];
            System.arraycopy(objarr, 0, buffer, 0, objarr.length);
            socketChannel.write(ByteBuffer.wrap(buffer));//这条语句要注意。
        }socketChannel.write(ByteBuffer.wrap(buffer));
    这个方法,是有返回值的,返回写入数据的长度,不是每次都是256的,当操作系统发送缓冲区填满时,
    该方法不会返回256,说明,没有将buffer对象中的数据全部进行写入。
      

  18.   

    多谢preferme解答
    你的解释也让我明白了qunhao说的那行代码的意思。
    IO的代码本身很少写,基本没写过用于生产的,这次也学到很多。非常谢谢~