使用传统的SOCKET编程相信很多人都写过,在处理数据时也是使用多线程来提高并发处理数。在NIO中,我们可以使用selector的多路复用技术来异步处理,不过这里有一个问题,就是说这相当于是一个监听者模式的处理实现。
当我们在selector.select()时,会把发生了相关事件的selectKey选择出来。例如 //使用Selector
Selector selector = Selector.open();
//建立Channel 并绑定到9000端口
ServerSocketChannel ssc = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),9000);
ssc.socket().bind(address);
//使设定non-blocking的方式。
ssc.configureBlocking(false);
//向Selector注册Channel及我们有兴趣的事件
SelectionKey s = ssc.register(selector, SelectionKey.OP_ACCEPT);
printKeyInfo(s);
while(true) //不断的轮询
{
debug("NBTest: Starting select");
//Selector通过select方法通知我们我们感兴趣的事件发生了。
nKeys = selector.select();
//如果有我们注册的事情发生了,它的传回值就会大于0
if(nKeys > 0)
{
debug("NBTest: Number of keys after select operation: " +nKeys);
//Selector传回一组SelectionKeys
//我们从这些key中的channel()方法中取得我们刚刚注册的channel。
Set selectedKeys = selector.selectedKeys();
Iterator i = selectedKeys.iterator();
while(i.hasNext())
{
s = (SelectionKey) i.next();
printKeyInfo(s);
debug("NBTest: Nr Keys in selector: " +selector.keys().size());
//一个key被处理完成后,就都被从就绪关键字(ready keys)列表中除去
i.remove();
if(s.isAcceptable())
{
// 从channel()中取得我们刚刚注册的channel。
Socket socket = ((ServerSocketChannel)s.channel()).accept().socket();
SocketChannel sc = socket.getChannel();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ |SelectionKey.OP_WRITE);
System.out.println(++channels);
}
else
{
debug("NBTest: Channel not acceptable");
}
}
}
else
{
debug("NBTest: Select finished without any keys.");
}
}
这里就有几个问题了:
1.这里可以看出,select之后的处理是一个线性过程,那么在这个过程中还可能会有不同的事件发生,此时,是否selector会开启一个专门收集这些事件的线程来收集这些事件,直到这里的迭代处理完之后再次循环调用select()方法时又继续迭代处理?
2. 如果是1的这种处理方法,意思就是,如果有个事件是接收数据的事件,是否会在这个时候selector中的那个线程会开启一个线程来接收数据到buffer中,直到,我们在select()之后从这个socket中取出这个buffer为止?
上面都是我的推测,由于没有还没有去看源代码,所以先问问大家的理解,稍后看了源代码之后再来证实一下。
当我们在selector.select()时,会把发生了相关事件的selectKey选择出来。例如 //使用Selector
Selector selector = Selector.open();
//建立Channel 并绑定到9000端口
ServerSocketChannel ssc = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),9000);
ssc.socket().bind(address);
//使设定non-blocking的方式。
ssc.configureBlocking(false);
//向Selector注册Channel及我们有兴趣的事件
SelectionKey s = ssc.register(selector, SelectionKey.OP_ACCEPT);
printKeyInfo(s);
while(true) //不断的轮询
{
debug("NBTest: Starting select");
//Selector通过select方法通知我们我们感兴趣的事件发生了。
nKeys = selector.select();
//如果有我们注册的事情发生了,它的传回值就会大于0
if(nKeys > 0)
{
debug("NBTest: Number of keys after select operation: " +nKeys);
//Selector传回一组SelectionKeys
//我们从这些key中的channel()方法中取得我们刚刚注册的channel。
Set selectedKeys = selector.selectedKeys();
Iterator i = selectedKeys.iterator();
while(i.hasNext())
{
s = (SelectionKey) i.next();
printKeyInfo(s);
debug("NBTest: Nr Keys in selector: " +selector.keys().size());
//一个key被处理完成后,就都被从就绪关键字(ready keys)列表中除去
i.remove();
if(s.isAcceptable())
{
// 从channel()中取得我们刚刚注册的channel。
Socket socket = ((ServerSocketChannel)s.channel()).accept().socket();
SocketChannel sc = socket.getChannel();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ |SelectionKey.OP_WRITE);
System.out.println(++channels);
}
else
{
debug("NBTest: Channel not acceptable");
}
}
}
else
{
debug("NBTest: Select finished without any keys.");
}
}
这里就有几个问题了:
1.这里可以看出,select之后的处理是一个线性过程,那么在这个过程中还可能会有不同的事件发生,此时,是否selector会开启一个专门收集这些事件的线程来收集这些事件,直到这里的迭代处理完之后再次循环调用select()方法时又继续迭代处理?
2. 如果是1的这种处理方法,意思就是,如果有个事件是接收数据的事件,是否会在这个时候selector中的那个线程会开启一个线程来接收数据到buffer中,直到,我们在select()之后从这个socket中取出这个buffer为止?
上面都是我的推测,由于没有还没有去看源代码,所以先问问大家的理解,稍后看了源代码之后再来证实一下。
解决方案 »
- 免费获取java教学光盘,您只需付邮费
- 问一个异常处理的问题
- 统计字数
- 50块能买到二手笔记本吗?
- 如何获取字符串的字符集??
- applet程序调用时如何显示进度
- 请问各位高手,怎么查看我的java虚拟机内存大小?
- !!!在jb7中开发cmp2.0,当在图形编辑tab页内为finder方法写query语句时,遇到问题。
- 准备学Java:菜鸟问题,http://java.sun.com/j2se/1.4.1/download.html页上所说Jre和JDK有什么区别到底要下载哪个?
- 请问在网页中两个Applet如何互相调用
- 关于getProperty("user.language")问题
- 请问jDialog如何使实现无法电击别的地方(我也描述不清,j2se高手近来看看吧)
int port = 8000;
int BUFFERSIZE = 1024;
Selector selector = null;
ServerSocketChannel serverChannel = null;
HashMap clientChannelMap = null;//用来存放每一个客户连接对应的套接字和通道 public NBlockingServer( int port ) {
this.clientChannelMap = new HashMap();
this.port = port;
} public void initialize() throws IOException {
//初始化,分别实例化一个选择器,一个服务器端可选择通道
this.selector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
this.serverChannel.configureBlocking(false);
InetAddress localhost = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(localhost, this.port );
this.serverChannel.socket().bind(isa);//将该套接字绑定到服务器某一可用端口
}
//结束时释放资源
public void finalize() throws IOException {
this.serverChannel.close();
this.selector.close();
}
//将读入字节缓冲的信息解码
public String decode( ByteBuffer byteBuffer ) throws
CharacterCodingException {
Charset charset = Charset.forName( "ISO-8859-1" );
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode( byteBuffer );
String result = charBuffer.toString();
return result;
}
//监听端口,当通道准备好时进行相应操作
public void portListening() throws IOException, InterruptedException {
//服务器端通道注册OP_ACCEPT事件
SelectionKey acceptKey =this.serverChannel.register( this.selector,
SelectionKey.OP_ACCEPT );
//当有已注册的事件发生时,select()返回值将大于0
while (acceptKey.selector().select() > 0 ) {
System.out.println("event happened");
//取得所有已经准备好的所有选择键
Set readyKeys = this.selector.selectedKeys();
//使用迭代器对选择键进行轮询
Iterator i = readyKeys.iterator();
while (i
else if ( key.isReadable() ) {//如果是通道读准备好事件
System.out.println("Readable");
//取得选择键对应的通道和套接字
SelectableChannel nextReady =
(SelectableChannel) key.channel();
Socket socket = (Socket) key.attachment();
//处理该事件,处理方法已封装在类ClientChInstance中
this.readFromChannel( socket.getChannel(),
(ClientChInstance)
this.clientChannelMap.get( socket ) );
}
else if ( key.isWritable() ) {//如果是通道写准备好事件
System.out.println("writeable");
//取得套接字后处理,方法同上
Socket socket = (Socket) key.attachment();
SocketChannel channel = (SocketChannel)
socket.getChannel();
this.writeToChannel( channel,"This is from server!");
}
}
}
}
//对通道的写操作
public void writeToChannel( SocketChannel channel, String message )
throws IOException {
ByteBuffer buf = ByteBuffer.wrap( message.getBytes() );
int nbytes = channel.write( buf );
}
//对通道的读操作
public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance )
throws IOException, InterruptedException {
ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE );
int nbytes = channel.read( byteBuffer );
byteBuffer.flip();
String result = this.decode( byteBuffer );
//当客户端发出”@exit”退出命令时,关闭其通道
if ( result.indexOf( "@exit" ) >= 0 ) {
channel.close();
}
else {
clientInstance.append( result.toString() );
//读入一行完毕,执行相应操作
if ( result.indexOf( "\n" ) >= 0 ){
System.out.println("client input"+result);
clientInstance.execute();
}
}
}
//该类封装了怎样对客户端的通道进行操作,具体实现可以通过重载execute()方法
public class ClientChInstance {
SocketChannel channel;
StringBuffer buffer=new StringBuffer();
public ClientChInstance( SocketChannel channel ) {
this.channel = channel;
}
public void execute() throws IOException {
String message = "This is response after reading from channel!";
writeToChannel( this.channel, message );
buffer = new StringBuffer();
}
//当一行没有结束时,将当前字窜置于缓冲尾
public void append( String values ) {
buffer.append( values );
}
}
//主程序
public static void main( String[] args ) {
NBlockingServer nbServer = new NBlockingServer(8000);
try {
nbServer.initialize();
} catch ( Exception e ) {
e.printStackTrace();
System.exit( -1 );
}
try {
nbServer.portListening();
}
catch ( Exception e ) {
e.printStackTrace();
}
}
}