最近学习非阻塞通信的知识,有点一知半解,下面的例子是参考 孙卫琴 <<java网络编程精讲>>的
高手给看看客户端代码:
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.nio.charset.*;public class NonBlockingClient {   private SocketChannel socketChannel=null;
   
   private ByteBuffer sendBuffer=ByteBuffer.allocate(1024);      // 发送缓冲区用于向服务器发送数据
   private ByteBuffer receiveBuffer=ByteBuffer.allocate(1024); //接收缓冲区用于从服务器端接收数据   private Charset charset=Charset.forName("GBK");
   private Selector selector;
   
   public NonBlockingClient() throws IOException
   {
     socketChannel=SocketChannel.open();  //创建SocketChannel对象
     InetAddress ia=InetAddress.getLocalHost();
     InetSocketAddress isa=new InetSocketAddress(ia,10000);
     socketChannel.connect(isa);   //采用阻塞模式连接服务器
     socketChannel.configureBlocking(false); //采用非阻塞模式接收和发送数据
     System.out.println("与服务器连接成功");
     selector=Selector.open(); //创建监听器
   }
   
   public void receiveData() //从控制台接收数据到缓冲区
   {
       try{
       
      BufferedReader br=new BufferedReader(new InputStreamReader(System.in));
      String msg=null;
      while((msg=br.readLine())!=null)
      {
  if(msg.equals("exit"))
             break;
        synchronized(sendBuffer) //同步代码块,sendBuffer为共享资源
        {
         sendBuffer.put(encode(msg+"\r\n"));
        }
        
      }
       }
       catch(IOException e)
       {
         e.printStackTrace();
       }
   }
    
   public void communicate() throws IOException
   {
     socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
     while(selector.select()>0) //采用阻塞方式,返回相关事件已经发生的SelectionKey对象的数目
     {
      Set readyKeys=selector.selectedKeys(); //返回相关事件已经发生的SelectionKey对象的集合
      Iterator it=readyKeys.iterator();
      while(it.hasNext())
      {
         SelectionKey key=null;
         try
         {
           key=(SelectionKey)it.next();
           it.remove();
           
           if(key.isReadable())
           {
            receive(key);
           }
           if(key.isWritable())
           {
            send(key);
           }
         }
         catch(IOException e)
         {
           e.printStackTrace();
           try
           {
            if(key!=null)
            {
               key.cancel();
               key.channel().close();
            }
           }
           catch(Exception e1)
           {
            e1.printStackTrace();
           }
         }
      }
     }
   }
   
   public void send(SelectionKey key) throws IOException //将缓冲区中的数据通过channel发送出去
   {
      SocketChannel socketChannel=(SocketChannel)key.channel();//得到相关的SocketChannel
      synchronized(sendBuffer) //同步代码段
      {
        sendBuffer.flip();                  //把极限设为位置,把位置设为0
        socketChannel.write(sendBuffer);    //发送数据
        sendBuffer.compact();               //删除已经发送的数据
      }
   }
   
   public void receive(SelectionKey key) throws IOException
   {
      SocketChannel socketChannel=(SocketChannel)key.channel();
      socketChannel.read(receiveBuffer);
      receiveBuffer.flip();
      String receiveData=decode(receiveBuffer);   //从接收缓冲区中解码字节序列,转换为字符串
      
      if(receiveData.indexOf("\n")==-1) //不能凑成一行数据
       return;
      
      String outputData=receiveData.substring(0,receiveData.indexOf("\n")+1);//一行数据
      System.out.println(outputData);
      if(outputData.equals("echo:bye\r\n"));
      {
        key.cancel();
        socketChannel.close();
        System.out.println("关闭与服务器的连接");
        selector.close();
        System.exit(0); //结束程序
      }
      
      ByteBuffer temp=encode(outputData);
      receiveBuffer.position(temp.limit());
      receiveBuffer.compact();        //删除已经打印的数据
   }
   public String decode(ByteBuffer buffer) //解码
   {
      CharBuffer charBuffer=charset.decode(buffer);
      return charBuffer.toString();
   }
   
   public ByteBuffer encode(String str)   //编码
   {
      return charset.encode(str);
   }
   
   public static void main(String args[]) throws IOException
   {
       final NonBlockingClient nbc=new NonBlockingClient();
       
       Thread t=new Thread(new Runnable()
        {
         public void run()   //匿名内部类
         {
          nbc.receiveData();
         }
        });
       t.start();
       nbc.communicate();
   }  
}
服务器端代码:
import java.util.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.io.*;public class NonBlockingServer 
{
    private Selector selector=null;
    private ServerSocketChannel serverSocketChannel=null;
    private int port=10000;
    private Charset charset=Charset.forName("GBK");
    
    public NonBlockingServer() throws IOException
    {
       selector=Selector.open();   //创建一个selector对象
       serverSocketChannel=ServerSocketChannel.open();   //创建一个ServerSocketChannel对象
       serverSocketChannel.socket().setReuseAddress(true);   //设置服务器端口可以重用
       serverSocketChannel.configureBlocking(false); //设置为非阻塞模式
       serverSocketChannel.socket().bind(new InetSocketAddress(port));
       
       System.out.println("服务器启动");
    
    }
    
    public void service() throws IOException
    {
       //向selector注册监听客户连接请求就绪事件
       serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
       while(selector.select()>0)
       {
         Set readyKeys=selector.selectedKeys(); //得到selector中的事件集合
         Iterator it=readyKeys.iterator();   //得到事件集合的迭代器
           while(it.hasNext())
           {
             SelectionKey key=null;
             try
             {
                key=(SelectionKey)it.next();
              it.remove();      //取出一个事件就从集合中将其删除
             
              if(key.isAcceptable()) //接收客户端的连接请求
              {
                //得到与SelectionKey关联的ServerSocketChannel
                ServerSocketChannel ssc=(ServerSocketChannel)key.channel();
                 //得到客户端SocketChannel
                 SocketChannel socketChannel=(SocketChannel)ssc.accept();
                  System.out.println("接收到客户端连接,来自:"+socketChannel.socket().getInetAddress()
                                  +":"+socketChannel.socket().getPort());
                  socketChannel.configureBlocking(false);
                  ByteBuffer buffer=ByteBuffer.allocate(1024);                  //向selector注册与客户端socketChannel的读、写就绪事件,buffer为关联的对象
                  socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer);
                  //socketChannel.register(selector,SelectionKey.OP_WRITE,buffer);
                                 
              }
              //读就绪事件发生
              if(key.isReadable())
              {
              receive(key);
              }
              //写就绪事件发生
              if(key.isWritable())
              {
                 send(key);
              }
             }
             catch(Exception e)
             {
              e.printStackTrace();
              try
              {
              if(key!=null)
              {
              key.cancel();
              key.channel().close();
              }
              }
              catch(Exception e1)
              {
              e1.printStackTrace();
              }
             }
           } //end while
       } // end while 
       
    }
    public void send(SelectionKey key) throws IOException 
    {
      
 ByteBuffer buffer=(ByteBuffer)key.attachment();   //得到与该SelectionKey关联的ByteBuffer
      SocketChannel socketChannel=(SocketChannel)key.channel();    //得到与该SelectionKey关联的SocketChannel
         
 buffer.flip();    //把极限设为位置,把位置设为0;
         
         String data=decode(buffer);  //按照GBK编码进行解码,将字节缓冲区中的数据转换为字符串
         
 if(data.indexOf("\r\n")==-1)  //如果还没有读到一行数据,就返回
          return;
         
         String outputData=data.substring(0,data.indexOf("\n")+1);   //截取一行数据,把\n包含进去
        
 System.out.print(outputData);   //在服务器端的控制台将读到的数据输出
         
 //String strToClient=new StringBuffer(outputData).reverse().toString();
         //System.out.println(strToClient);
 ByteBuffer outputBuffer=encode("服务器端来的数据: "+outputData);
         
 while(outputBuffer.hasRemaining())   //当前位置与极限之间是否还有数据
              socketChannel.write(outputBuffer);
         
         ByteBuffer temp=encode(outputData);  //把outputData转换为字节,放入ByteBuffer中
          buffer.position(temp.limit());               //把buffer的位置设置为temp的极限
          buffer.compact();                                   //删除buffer中已经处理过的数据
          
          if(outputData.equals("byte\r\n"))
          {
           key.cancel();
           socketChannel.close();
           System.out.println("关闭与客户的连接");
          }
          
    }
    public void receive(SelectionKey key) throws IOException
    {
     ByteBuffer buffer=(ByteBuffer)key.attachment();     //获得与SelectionKey关联的缓冲区
    
     SocketChannel socketChannel=(SocketChannel)key.channel();   //获得与SelectionKey关联的SocketChannel
     //创建一个ByteBuffer,用于存放读到的数据
     ByteBuffer readBuffer=ByteBuffer.allocate(32);
    
     socketChannel.read(readBuffer);  //读入数据放入到readBuffer中
    
     readBuffer.flip();     //把极限设为当前位置,再把当前位置设为0
    
     buffer.limit(buffer.capacity());    //把buffer的极限设为为容量     buffer.put(readBuffer);      //将readBuffer中的数据复制到buffer中,假定buffer的容量足够大,不会出现溢出的情况
    }
    
    private String decode(ByteBuffer buffer)    //解码
    {
     CharBuffer charBuffer=charset.decode(buffer);   //将字节转换为字符
     return charBuffer.toString();               //将字符缓冲区转换为字符串
    }
    private ByteBuffer encode(String str)        //编码
    { 
     return charset.encode(str);                   //将字符串转换为字节序列
    }
    
    public static void main(String args[]) throws Exception
    {
     NonBlockingServer server=new NonBlockingServer();
     server.service();
    }
}
问题1:
 先运行服务端 再运行客户端 客户端从键盘读一些数据 按回车 此时数据传递给服务器端 客户端打印出数据 就结束了 为什么只发送一次 客户端就结束了呢??
问题2:
 客户端输入”exit“,为什么客户端进程没有结束,而是阻塞了呢?
问题3:
 服务器端在什么情况下 执行此局代码: System.out.println("关闭与客户的连接");代码有点长,谢谢大家了

解决方案 »

  1.   

    你的第二个问题,你客户端main里面有两个线程,
    你输入exit之后,只是把main线程结束了,
    nbc.receiveData();
    这个线程没结束吧?从你的代码上下文来看,
    应该是你输入exit
    客户端会发送一个"exit"这样的字符串到服务器,
    然后break,退出掉监听控制台输入的线程
    然后服务器回发一个"echo:bye\r\n"
    这样客户端收到了会channel.close()
    退出监听网络连接的线程因为我看到你客户端代码有"echo:bye\r\n"这个处理
    服务器并没有地方发送
      

  2.   


    public void receiveData() // 从控制台接收数据到缓冲区
    {
    try { BufferedReader br = new BufferedReader(new InputStreamReader(
    System.in));
    String msg = null;
    boolean flag = false; //修改点 while ((msg = br.readLine()) != null) {
    if (msg.equals("exit")){
    msg = "byte"; //修改点 flag = true; //修改点 }
    synchronized (sendBuffer) // 同步代码块,sendBuffer为共享资源
    {
    sendBuffer.put(encode(msg + "\r\n"));
    }
    if(flag) break; //修改点
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    }public void receive(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    socketChannel.read(receiveBuffer);
    receiveBuffer.flip();
    String receiveData = decode(receiveBuffer); // 从接收缓冲区中解码字节序列,转换为字符串 if (receiveData.indexOf("\n") == -1) // 不能凑成一行数据
    return; String outputData = receiveData.substring(0,
    receiveData.indexOf("\n") + 1);// 一行数据
    System.out.println(outputData);
    if (outputData.equals("服务器端来的数据: byte\r\n")) //修改点 {
    key.cancel();
    socketChannel.close();
    System.out.println("关闭与服务器的连接");
    selector.close();
    System.exit(0); // 结束程序
    } ByteBuffer temp = encode(outputData);
    receiveBuffer.position(temp.limit());
    receiveBuffer.compact(); // 删除已经打印的数据
    }
    简单修改了下 NonBlockingClient