使用传统的SOCKET编程相信很多人都写过,在处理数据时也是使用多线程来提高并发处理数。在NIO中,我们可以使用selector的多路复用技术来异步处理,不过这里有一个问题,就是说这相当于是一个监听者模式的处理实现。
   当我们在selector.select()时,会把发生了相关事件的selectKey选择出来。例如  //使用Selector   
    Selector   selector   =   Selector.open();   
    
    //建立Channel   并绑定到9000端口   
    ServerSocketChannel   ssc   =   ServerSocketChannel.open();   
    InetSocketAddress   address   =   new   InetSocketAddress(InetAddress.getLocalHost(),9000);     
    ssc.socket().bind(address);   
    
    //使设定non-blocking的方式。   
    ssc.configureBlocking(false);   
    //向Selector注册Channel及我们有兴趣的事件   
    SelectionKey   s   =   ssc.register(selector,   SelectionKey.OP_ACCEPT);   
    printKeyInfo(s);   
    while(true)   //不断的轮询   
    {   
      debug("NBTest:   Starting   select");   
    
      //Selector通过select方法通知我们我们感兴趣的事件发生了。   
      nKeys   =   selector.select();   
      //如果有我们注册的事情发生了,它的传回值就会大于0   
      if(nKeys   >   0)   
      {   
        debug("NBTest:   Number   of   keys   after   select   operation:   "   +nKeys);   
    
        //Selector传回一组SelectionKeys   
        //我们从这些key中的channel()方法中取得我们刚刚注册的channel。   
        Set   selectedKeys   =   selector.selectedKeys();   
        Iterator   i   =   selectedKeys.iterator();   
        while(i.hasNext())   
        {   
               s   =   (SelectionKey)   i.next();   
               printKeyInfo(s);   
               debug("NBTest:   Nr   Keys   in   selector:   "   +selector.keys().size());   
    
               //一个key被处理完成后,就都被从就绪关键字(ready   keys)列表中除去   
               i.remove();   
               if(s.isAcceptable())   
               {   
                   //   从channel()中取得我们刚刚注册的channel。   
                   Socket   socket   =   ((ServerSocketChannel)s.channel()).accept().socket();   
                   SocketChannel   sc   =   socket.getChannel();   
    
                   sc.configureBlocking(false);   
                   sc.register(selector,   SelectionKey.OP_READ   |SelectionKey.OP_WRITE);   
                                    System.out.println(++channels);   
               }   
               else   
               {   
                   debug("NBTest:   Channel   not   acceptable");   
               }   
          }   
     }   
     else   
     {   
        debug("NBTest:   Select   finished   without   any   keys.");   
     }   
      }   
    
这里就有几个问题了:
 1.这里可以看出,select之后的处理是一个线性过程,那么在这个过程中还可能会有不同的事件发生,此时,是否selector会开启一个专门收集这些事件的线程来收集这些事件,直到这里的迭代处理完之后再次循环调用select()方法时又继续迭代处理?
 2. 如果是1的这种处理方法,意思就是,如果有个事件是接收数据的事件,是否会在这个时候selector中的那个线程会开启一个线程来接收数据到buffer中,直到,我们在select()之后从这个socket中取出这个buffer为止?
上面都是我的推测,由于没有还没有去看源代码,所以先问问大家的理解,稍后看了源代码之后再来证实一下。

解决方案 »

  1.   

    我来猜一下吧 既然之后的处理是线性的 可能就是等再次执行到select时才接收或处理事件
      

  2.   

    看了源代码,这个select是调用了一个JVMSelector.select()的native方法,完全郁闷,看不出来到底是数据和新连接到来时是如何处理的。难道这些新来的链接请求会一直等待到select()来调用?或者是虚拟机先接受这些连接,然后等select()调用的时候再给select()返回?
      

  3.   

    public class NBlockingServer { 
    int port = 8000; 
    int BUFFERSIZE = 1024; 
    Selector selector = null; 
    ServerSocketChannel serverChannel = null; 
    HashMap clientChannelMap = null;//用来存放每一个客户连接对应的套接字和通道 public NBlockingServer( int port ) { 
    this.clientChannelMap = new HashMap(); 
    this.port = port; 
    } public void initialize() throws IOException { 
    //初始化,分别实例化一个选择器,一个服务器端可选择通道 
    this.selector = Selector.open(); 
    this.serverChannel = ServerSocketChannel.open(); 
    this.serverChannel.configureBlocking(false); 
    InetAddress localhost = InetAddress.getLocalHost(); 
    InetSocketAddress isa = new InetSocketAddress(localhost, this.port ); 
    this.serverChannel.socket().bind(isa);//将该套接字绑定到服务器某一可用端口 

    //结束时释放资源 
    public void finalize() throws IOException { 
    this.serverChannel.close(); 
    this.selector.close(); 

    //将读入字节缓冲的信息解码 
    public String decode( ByteBuffer byteBuffer ) throws 
    CharacterCodingException { 
    Charset charset = Charset.forName( "ISO-8859-1" ); 
    CharsetDecoder decoder = charset.newDecoder(); 
    CharBuffer charBuffer = decoder.decode( byteBuffer ); 
    String result = charBuffer.toString(); 
    return result; 

    //监听端口,当通道准备好时进行相应操作 
    public void portListening() throws IOException, InterruptedException { 
    //服务器端通道注册OP_ACCEPT事件 
    SelectionKey acceptKey =this.serverChannel.register( this.selector, 
    SelectionKey.OP_ACCEPT ); 
    //当有已注册的事件发生时,select()返回值将大于0 
    while (acceptKey.selector().select() > 0 ) { 
    System.out.println("event happened"); 
    //取得所有已经准备好的所有选择键 
    Set readyKeys = this.selector.selectedKeys(); 
    //使用迭代器对选择键进行轮询 
    Iterator i = readyKeys.iterator(); 
    while (i 
    else if ( key.isReadable() ) {//如果是通道读准备好事件 
    System.out.println("Readable"); 
    //取得选择键对应的通道和套接字 
    SelectableChannel nextReady = 
    (SelectableChannel) key.channel(); 
    Socket socket = (Socket) key.attachment(); 
    //处理该事件,处理方法已封装在类ClientChInstance中 
    this.readFromChannel( socket.getChannel(), 
    (ClientChInstance) 
    this.clientChannelMap.get( socket ) ); 

    else if ( key.isWritable() ) {//如果是通道写准备好事件 
    System.out.println("writeable"); 
    //取得套接字后处理,方法同上 
    Socket socket = (Socket) key.attachment(); 
    SocketChannel channel = (SocketChannel) 
    socket.getChannel(); 
    this.writeToChannel( channel,"This is from server!"); 




    //对通道的写操作 
    public void writeToChannel( SocketChannel channel, String message ) 
    throws IOException { 
    ByteBuffer buf = ByteBuffer.wrap( message.getBytes() ); 
    int nbytes = channel.write( buf ); 

    //对通道的读操作 
    public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance ) 
    throws IOException, InterruptedException { 
    ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE ); 
    int nbytes = channel.read( byteBuffer ); 
    byteBuffer.flip(); 
    String result = this.decode( byteBuffer ); 
    //当客户端发出”@exit”退出命令时,关闭其通道 
    if ( result.indexOf( "@exit" ) >= 0 ) { 
    channel.close(); 

    else { 
    clientInstance.append( result.toString() ); 
    //读入一行完毕,执行相应操作 
    if ( result.indexOf( "\n" ) >= 0 ){ 
    System.out.println("client input"+result); 
    clientInstance.execute(); 



    //该类封装了怎样对客户端的通道进行操作,具体实现可以通过重载execute()方法 
    public class ClientChInstance { 
    SocketChannel channel; 
    StringBuffer buffer=new StringBuffer(); 
    public ClientChInstance( SocketChannel channel ) { 
    this.channel = channel; 

    public void execute() throws IOException { 
    String message = "This is response after reading from channel!"; 
    writeToChannel( this.channel, message ); 
    buffer = new StringBuffer(); 

    //当一行没有结束时,将当前字窜置于缓冲尾 
    public void append( String values ) { 
    buffer.append( values ); 


    //主程序 
    public static void main( String[] args ) { 
    NBlockingServer nbServer = new NBlockingServer(8000); 
    try { 
    nbServer.initialize(); 
    } catch ( Exception e ) { 
    e.printStackTrace(); 
    System.exit( -1 ); 

    try { 
    nbServer.portListening(); 

    catch ( Exception e ) { 
    e.printStackTrace(); 



      

  4.   

    上面都没有满意的答案啊,呵呵。经过几天仔细看了一下能看到的文章和源码,基本上感觉应该是这样的。  selector.select()方法调用时会在收集到事件时返回,然后就是我们一般的迭代处理事件的过程。在这个过程中,肯定可能会有新的事件到来,比如数据来了,新链接来了等等,但是这时我们仍然在处理数据,这些来了的事件会等待下一次select()的收集,当然,会体现为建立新连接时,客户端会等待服务端accept(),客户端发送数据时当然不会阻塞(这个看操作系统的缓冲有多大,还有你发送数据的速度有多快)。等到服务器select()时,会把这些事件收集起来,继续然后又让我们来进行accept()和read等。由于这个时候我们已经知道这个端口上有新链接或者数据了,所以accept()或者read的时候,就不会出现阻塞的情况。