NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。Selector内部原理实际是在做一个对所注册的channel的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,他就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容。了解了这个基本原理,我们结合代码看看使用,在使用上,也在分两个方向,一个是线程处理,一个是用非线程,后者比较简单,看下面代码:
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.net.*;
import java.util.*;
/**
*
* @author Administrator
* @version
*/
public class NBTest {
/** Creates new NBTest */
public NBTest()
{
} public void startServer() throws Exception
{
int channels = 0;
int nKeys = 0;
int currentSelector = 0; //使用Selector
Selector selector = Selector.open(); //建立Channel 并绑定到9000端口
ServerSocketChannel ssc = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost (),9000);
ssc.socket().bind(address); //使设定non-blocking的方式。
ssc.configureBlocking(false); //向Selector注册Channel及我们有兴趣的事件
SelectionKey s = ssc.register(selector, SelectionKey.OP_ACCEPT);
printKeyInfo(s); while(true) //不断的轮询
{
debug("NBTest: Starting select"); //Selector通过select方法通知我们我们感兴趣的事件发生了。
nKeys = selector.select();
//如果有我们注册的事情发生了,它的传回值就会大于0
if(nKeys > 0)
{
debug("NBTest: Number of keys after select operation: " +nKeys); //Selector传回一组SelectionKeys
//我们从这些key中的channel()方法中取得我们刚刚注册的channel。
Set selectedKeys = selector.selectedKeys();
Iterator i = selectedKeys.iterator();
while(i.hasNext())
{
s = (SelectionKey) i.next();
printKeyInfo(s);
debug("NBTest: Nr Keys in selector: " +selector.keys().size()); //一个key被处理完成后,就都被从就绪关键字(ready keys)列表中除去
i.remove();
if(s.isAcceptable())
{
// 从channel()中取得我们刚刚注册的channel。
Socket socket = ((ServerSocketChannel)s.channel()).accept ().socket();
SocketChannel sc = socket.getChannel(); sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ |SelectionKey.OP_WRITE);
System.out.println(++channels);
}
else
{
debug("NBTest: Channel not acceptable");
}
}
}
else
{
debug("NBTest: Select finished without any keys.");
} }}
private static void debug(String s)
{
System.out.println(s);
}
private static void printKeyInfo(SelectionKey sk)
{
String s = new String(); s = "Att: " + (sk.attachment() == null ? "no" : "yes");
s += ", Read: " + sk.isReadable();
s += ", Acpt: " + sk.isAcceptable();
s += ", Cnct: " + sk.isConnectable();
s += ", Wrt: " + sk.isWritable();
s += ", Valid: " + sk.isValid();
s += ", Ops: " + sk.interestOps();
debug(s);
}
/**
* @param args the command line arguments
*/
public static void main (String args[])
{
NBTest nbTest = new NBTest();
try
{
nbTest.startServer();
}
catch(Exception e)
{
e.printStackTrace();
}
}} 这是一个守候在端口9000的noblock server例子,如果我们编制一个客户端程序,就可以对它进行互动操作,或者使用telnet 主机名 90000 可以链接上。
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.net.*;
import java.util.*;
/**
*
* @author Administrator
* @version
*/
public class NBTest {
/** Creates new NBTest */
public NBTest()
{
} public void startServer() throws Exception
{
int channels = 0;
int nKeys = 0;
int currentSelector = 0; //使用Selector
Selector selector = Selector.open(); //建立Channel 并绑定到9000端口
ServerSocketChannel ssc = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost (),9000);
ssc.socket().bind(address); //使设定non-blocking的方式。
ssc.configureBlocking(false); //向Selector注册Channel及我们有兴趣的事件
SelectionKey s = ssc.register(selector, SelectionKey.OP_ACCEPT);
printKeyInfo(s); while(true) //不断的轮询
{
debug("NBTest: Starting select"); //Selector通过select方法通知我们我们感兴趣的事件发生了。
nKeys = selector.select();
//如果有我们注册的事情发生了,它的传回值就会大于0
if(nKeys > 0)
{
debug("NBTest: Number of keys after select operation: " +nKeys); //Selector传回一组SelectionKeys
//我们从这些key中的channel()方法中取得我们刚刚注册的channel。
Set selectedKeys = selector.selectedKeys();
Iterator i = selectedKeys.iterator();
while(i.hasNext())
{
s = (SelectionKey) i.next();
printKeyInfo(s);
debug("NBTest: Nr Keys in selector: " +selector.keys().size()); //一个key被处理完成后,就都被从就绪关键字(ready keys)列表中除去
i.remove();
if(s.isAcceptable())
{
// 从channel()中取得我们刚刚注册的channel。
Socket socket = ((ServerSocketChannel)s.channel()).accept ().socket();
SocketChannel sc = socket.getChannel(); sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ |SelectionKey.OP_WRITE);
System.out.println(++channels);
}
else
{
debug("NBTest: Channel not acceptable");
}
}
}
else
{
debug("NBTest: Select finished without any keys.");
} }}
private static void debug(String s)
{
System.out.println(s);
}
private static void printKeyInfo(SelectionKey sk)
{
String s = new String(); s = "Att: " + (sk.attachment() == null ? "no" : "yes");
s += ", Read: " + sk.isReadable();
s += ", Acpt: " + sk.isAcceptable();
s += ", Cnct: " + sk.isConnectable();
s += ", Wrt: " + sk.isWritable();
s += ", Valid: " + sk.isValid();
s += ", Ops: " + sk.interestOps();
debug(s);
}
/**
* @param args the command line arguments
*/
public static void main (String args[])
{
NBTest nbTest = new NBTest();
try
{
nbTest.startServer();
}
catch(Exception e)
{
e.printStackTrace();
}
}} 这是一个守候在端口9000的noblock server例子,如果我们编制一个客户端程序,就可以对它进行互动操作,或者使用telnet 主机名 90000 可以链接上。
package org.javapitfalls.item2;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.io.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.lang.*;
public class Client
{
public SocketChannel client = null;
public InetSocketAddress isa = null;
public RecvThread rt = null; public Client()
{
}
public void makeConnection()
{
int result = 0;
try
{
client = SocketChannel.open();
isa = new InetSocketAddress("JUJUMAO",4900);
client.connect(isa);
client.configureBlocking(false);
receiveMessage();
}
catch(UnknownHostException e)
{
e.printStackTrace();
}
catch(IOException e)
{
e.printStackTrace();
}
while ((result = sendMessage()) != -1)
{
} try
{
client.close();
System.exit(0);
}
catch(IOException e)
{
e.printStackTrace();
}
}
public int sendMessage()
{
System.out.println("Inside SendMessage");
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
String msg = null;
ByteBuffer bytebuf = ByteBuffer.allocate(1024);
int nBytes = 0;
try
{
msg = in.readLine();
System.out.println("msg is "+msg);
bytebuf = ByteBuffer.wrap(msg.getBytes());
nBytes = client.write(bytebuf);
System.out.println("nBytes is "+nBytes);
if (msg.equals("quit") || msg.equals("shutdown")) {
System.out.println("time to stop the client");
interruptThread();
try
{
Thread.sleep(5000);
}
catch(Exception e)
{
e.printStackTrace();
}
client.close();
return -1;
}
}
catch(IOException e)
{
e.printStackTrace();
}
System.out.println("Wrote "+nBytes +" bytes to the server");
return nBytes;
} public void receiveMessage()
{
rt = new RecvThread("Receive THread",client);
rt.start(); } public void interruptThread()
{
rt.val = false;
} public static void main(String args[])
{
Client cl = new Client();
cl.makeConnection();
} public class RecvThread extends Thread
{
public SocketChannel sc = null;
public boolean val = true;
public RecvThread(String str,SocketChannel client)
{
super(str);
sc = client;
}
public void run() { System.out.println("Inside receivemsg");
int nBytes = 0;
ByteBuffer buf = ByteBuffer.allocate(2048);
try
{
while (val)
{
while ( (nBytes = nBytes = client.read(buf)) > 0){
buf.flip();
Charset charset = Charset.forName("us-ascii");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode(buf);
String result = charBuffer.toString();
System.out.println(result);
buf.flip();
}
}
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
}import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.nio.charset.*;
import java.lang.*;
public class NonBlockingServer
{
public Selector sel = null;
public ServerSocketChannel server = null;
public SocketChannel socket = null;
public int port = 4900;
String result = null;
public NonBlockingServer()
{
System.out.println("Inside default ctor");
}
public NonBlockingServer(int port)
{
System.out.println("Inside the other ctor");
port = port;
} public void initializeOperations() throws IOException,UnknownHostException
{
System.out.println("Inside initialization");
sel = Selector.open();
server = ServerSocketChannel.open();
server.configureBlocking(false);
InetAddress ia = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(ia,port);
server.socket().bind(isa);
}
public void startServer() throws IOException
{
System.out.println("Inside startserver");
initializeOperations();
System.out.println("Abt to block on select()");
SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT );
while (acceptKey.selector().select() > 0 )
{
Set readyKeys = sel.selectedKeys();
Iterator it = readyKeys.iterator(); while (it.hasNext()) {
SelectionKey key = (SelectionKey)it.next();
it.remove();
if (key.isAcceptable()) {
System.out.println("Key is Acceptable");
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
socket = (SocketChannel) ssc.accept();
socket.configureBlocking(false);
SelectionKey another = socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
if (key.isReadable()) {
System.out.println("Key is readable");
String ret = readMessage(key);
if (ret.length() > 0) {
writeMessage(socket,ret);
}
}
if (key.isWritable()) {
System.out.println("THe key is writable");
String ret = readMessage(key);
socket = (SocketChannel)key.channel();
if (result.length() > 0 ) {
writeMessage(socket,ret);
}
}
}
}
} public void writeMessage(SocketChannel socket,String ret)
{
System.out.println("Inside the loop"); if (ret.equals("quit") || ret.equals("shutdown")) {
return;
}
File file = new File(ret);
try
{
RandomAccessFile rdm = new RandomAccessFile(file,"r");
FileChannel fc = rdm.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
fc.read(buffer);
buffer.flip();
Charset set = Charset.forName("us-ascii");
CharsetDecoder dec = set.newDecoder();
CharBuffer charBuf = dec.decode(buffer);
System.out.println(charBuf.toString());
buffer = ByteBuffer.wrap((charBuf.toString()).getBytes());
int nBytes = socket.write(buffer);
System.out.println("nBytes = "+nBytes);
result = null;
}
catch(Exception e)
{
e.printStackTrace();
} }
public String readMessage(SelectionKey key)
{
int nBytes = 0;
socket = (SocketChannel)key.channel();
ByteBuffer buf = ByteBuffer.allocate(1024);
try
{
nBytes = socket.read(buf);
buf.flip();
Charset charset = Charset.forName("us-ascii");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode(buf);
result = charBuffer.toString();
}
catch(IOException e)
{
e.printStackTrace();
}
return result;
} public static void main(String args[])
{
NonBlockingServer nb = new NonBlockingServer();
try
{
nb.startServer();
}
catch (IOException e)
{
e.printStackTrace();
System.exit(-1);
}
}
}