NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。Selector内部原理实际是在做一个对所注册的channel的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,他就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容。了解了这个基本原理,我们结合代码看看使用,在使用上,也在分两个方向,一个是线程处理,一个是用非线程,后者比较简单,看下面代码:
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.net.*;
import java.util.*; 
/**
*
* @author Administrator
* @version
*/
public class NBTest {
  /** Creates new NBTest */
  public NBTest()
  {
  }  public void startServer() throws Exception
  {
  int channels = 0;
  int nKeys = 0;
  int currentSelector = 0;  //使用Selector
  Selector selector = Selector.open();  //建立Channel 并绑定到9000端口
  ServerSocketChannel ssc = ServerSocketChannel.open();
  InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost         (),9000); 
  ssc.socket().bind(address);  //使设定non-blocking的方式。
  ssc.configureBlocking(false);  //向Selector注册Channel及我们有兴趣的事件
  SelectionKey s = ssc.register(selector, SelectionKey.OP_ACCEPT);
  printKeyInfo(s);  while(true) //不断的轮询
  {
    debug("NBTest: Starting select");    //Selector通过select方法通知我们我们感兴趣的事件发生了。
    nKeys = selector.select();
    //如果有我们注册的事情发生了,它的传回值就会大于0
    if(nKeys > 0)
    {
      debug("NBTest: Number of keys after select operation: " +nKeys);      //Selector传回一组SelectionKeys
      //我们从这些key中的channel()方法中取得我们刚刚注册的channel。
      Set selectedKeys = selector.selectedKeys();
      Iterator i = selectedKeys.iterator();
      while(i.hasNext())
      {
         s = (SelectionKey) i.next();
         printKeyInfo(s);
         debug("NBTest: Nr Keys in selector: " +selector.keys().size());         //一个key被处理完成后,就都被从就绪关键字(ready keys)列表中除去
         i.remove();
         if(s.isAcceptable())
         {
           // 从channel()中取得我们刚刚注册的channel。
           Socket socket = ((ServerSocketChannel)s.channel()).accept ().socket();
           SocketChannel sc = socket.getChannel();           sc.configureBlocking(false);
           sc.register(selector, SelectionKey.OP_READ    |SelectionKey.OP_WRITE);
                      System.out.println(++channels);
         }
         else
         {
           debug("NBTest: Channel not acceptable");
         }
      }
   }
   else
   {
      debug("NBTest: Select finished without any keys.");
   }  }}
private static void debug(String s)
{
  System.out.println(s);
}
private static void printKeyInfo(SelectionKey sk)
{
  String s = new String();  s = "Att: " + (sk.attachment() == null ? "no" : "yes");
  s += ", Read: " + sk.isReadable();
  s += ", Acpt: " + sk.isAcceptable();
  s += ", Cnct: " + sk.isConnectable();
  s += ", Wrt: " + sk.isWritable();
  s += ", Valid: " + sk.isValid();
  s += ", Ops: " + sk.interestOps();
  debug(s);
}
/**
* @param args the command line arguments
*/
public static void main (String args[])
{
  NBTest nbTest = new NBTest();
  try
  {
    nbTest.startServer();
  }
    catch(Exception e)
  {
    e.printStackTrace();
  }
}} 这是一个守候在端口9000的noblock server例子,如果我们编制一个客户端程序,就可以对它进行互动操作,或者使用telnet 主机名 90000 可以链接上。

解决方案 »

  1.   

    给你一个完整的例子吧
    package org.javapitfalls.item2;
    import java.nio.*;
    import java.nio.channels.*;
    import java.net.*;
    import java.io.*;
    import java.nio.channels.spi.*;
    import java.nio.charset.*;
    import java.lang.*;
    public class Client
    {
        public SocketChannel client = null;
        public InetSocketAddress isa = null;
        public RecvThread rt = null;    public Client()
        {
        }
        
    public void makeConnection()
        {
    int result = 0;
    try
    {

    client = SocketChannel.open();
    isa = new InetSocketAddress("JUJUMAO",4900);
    client.connect(isa);
    client.configureBlocking(false);
    receiveMessage();    
    }
    catch(UnknownHostException e)
    {
    e.printStackTrace();
    }
    catch(IOException e)
    {
    e.printStackTrace();
    }
    while ((result = sendMessage()) != -1)
    {
    } try
    {
    client.close();
    System.exit(0);
    }
    catch(IOException e)
    {
    e.printStackTrace();
    }
        }
        
    public int sendMessage()
        {
    System.out.println("Inside SendMessage");
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String msg = null;
    ByteBuffer bytebuf = ByteBuffer.allocate(1024);
    int nBytes = 0;
    try
    {
    msg = in.readLine();
    System.out.println("msg is "+msg);
    bytebuf = ByteBuffer.wrap(msg.getBytes());
    nBytes = client.write(bytebuf);
    System.out.println("nBytes is "+nBytes);
    if (msg.equals("quit") || msg.equals("shutdown")) {
    System.out.println("time to stop the client");
    interruptThread();
    try
    {
    Thread.sleep(5000);
    }
    catch(Exception e)
    {
    e.printStackTrace();
    }
    client.close();
    return -1;
    }
        
    }
            catch(IOException e)
    {
    e.printStackTrace();
    }
    System.out.println("Wrote "+nBytes +" bytes to the server");
    return nBytes;
        }    public void receiveMessage()
        {
    rt = new RecvThread("Receive THread",client);
    rt.start();    }    public void interruptThread()
        {
    rt.val = false;
        }    public static void main(String args[])
        {
    Client cl = new Client();
    cl.makeConnection();
        }    public class RecvThread extends Thread
        {
    public SocketChannel sc = null;
    public boolean val = true;

    public RecvThread(String str,SocketChannel client)
    {
    super(str);
    sc = client;
    }

    public void run() { System.out.println("Inside receivemsg");
    int nBytes = 0;
    ByteBuffer buf = ByteBuffer.allocate(2048);
    try
    {
    while (val)
    {
    while ( (nBytes = nBytes = client.read(buf)) > 0){
    buf.flip();
    Charset charset = Charset.forName("us-ascii");
    CharsetDecoder decoder = charset.newDecoder();
    CharBuffer charBuffer = decoder.decode(buf);
    String result = charBuffer.toString();
    System.out.println(result);
    buf.flip();

    }
    }

    }
    catch(IOException e)
    {
    e.printStackTrace();

    }
                 }
        }
    }import java.io.*;
    import java.nio.*;
    import java.nio.channels.*;
    import java.net.*;
    import java.util.*;
    import java.nio.charset.*;
    import java.lang.*;
    public class NonBlockingServer
    {
        public Selector sel = null;
        public ServerSocketChannel server = null;
        public SocketChannel socket = null;
        public int port = 4900;
        String result = null;
        public NonBlockingServer()
        {
    System.out.println("Inside default ctor");
        }
        
    public NonBlockingServer(int port)
        {
    System.out.println("Inside the other ctor");
    port = port;
        }    public void initializeOperations() throws IOException,UnknownHostException
        {
    System.out.println("Inside initialization");
    sel = Selector.open();
    server = ServerSocketChannel.open();
    server.configureBlocking(false);
    InetAddress ia = InetAddress.getLocalHost();
    InetSocketAddress isa = new InetSocketAddress(ia,port);
    server.socket().bind(isa);
        }
        
    public void startServer() throws IOException
        {
    System.out.println("Inside startserver");
            initializeOperations();
    System.out.println("Abt to block on select()");
    SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT );

    while (acceptKey.selector().select() > 0 )
    {
        
    Set readyKeys = sel.selectedKeys();
    Iterator it = readyKeys.iterator(); while (it.hasNext()) {
    SelectionKey key = (SelectionKey)it.next();
    it.remove();
                    
    if (key.isAcceptable()) {
    System.out.println("Key is Acceptable");
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    socket = (SocketChannel) ssc.accept();
    socket.configureBlocking(false);
    SelectionKey another = socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
    }
    if (key.isReadable()) {
    System.out.println("Key is readable");
    String ret = readMessage(key);
    if (ret.length() > 0) {
    writeMessage(socket,ret);
    }
    }
    if (key.isWritable()) {
    System.out.println("THe key is writable");
    String ret = readMessage(key);
    socket = (SocketChannel)key.channel();
    if (result.length() > 0 ) {
    writeMessage(socket,ret);
    }
    }
    }
    }
        }    public void writeMessage(SocketChannel socket,String ret)
        {
    System.out.println("Inside the loop"); if (ret.equals("quit") || ret.equals("shutdown")) {
    return;
    }
    File file = new File(ret);
    try
    {

    RandomAccessFile rdm = new RandomAccessFile(file,"r");
    FileChannel fc = rdm.getChannel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    fc.read(buffer);
    buffer.flip();
        
    Charset set = Charset.forName("us-ascii");
    CharsetDecoder dec = set.newDecoder();
    CharBuffer charBuf = dec.decode(buffer);
    System.out.println(charBuf.toString());
    buffer = ByteBuffer.wrap((charBuf.toString()).getBytes());
    int nBytes = socket.write(buffer);
    System.out.println("nBytes = "+nBytes);
    result = null;
    }
    catch(Exception e)
    {
    e.printStackTrace();
    }    }
      
        public String readMessage(SelectionKey key)
        {
    int nBytes = 0;
    socket = (SocketChannel)key.channel();
            ByteBuffer buf = ByteBuffer.allocate(1024);
    try
    {
                nBytes = socket.read(buf);
    buf.flip();
    Charset charset = Charset.forName("us-ascii");
    CharsetDecoder decoder = charset.newDecoder();
    CharBuffer charBuffer = decoder.decode(buf);
    result = charBuffer.toString();
        
            }
    catch(IOException e)
    {
    e.printStackTrace();
    }
    return result;
        }    public static void main(String args[])
        {
    NonBlockingServer nb = new NonBlockingServer();
    try
    {
    nb.startServer();
    }
    catch (IOException e)
    {
    e.printStackTrace();
    System.exit(-1);
    }

    }
    }