最近学习非阻塞通信的知识,有点一知半解,下面的例子是参考 孙卫琴 <<java网络编程精讲>>的
高手给看看客户端代码:
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.nio.charset.*;public class NonBlockingClient { private SocketChannel socketChannel=null;
private ByteBuffer sendBuffer=ByteBuffer.allocate(1024); // 发送缓冲区用于向服务器发送数据
private ByteBuffer receiveBuffer=ByteBuffer.allocate(1024); //接收缓冲区用于从服务器端接收数据 private Charset charset=Charset.forName("GBK");
private Selector selector;
public NonBlockingClient() throws IOException
{
socketChannel=SocketChannel.open(); //创建SocketChannel对象
InetAddress ia=InetAddress.getLocalHost();
InetSocketAddress isa=new InetSocketAddress(ia,10000);
socketChannel.connect(isa); //采用阻塞模式连接服务器
socketChannel.configureBlocking(false); //采用非阻塞模式接收和发送数据
System.out.println("与服务器连接成功");
selector=Selector.open(); //创建监听器
}
public void receiveData() //从控制台接收数据到缓冲区
{
try{
BufferedReader br=new BufferedReader(new InputStreamReader(System.in));
String msg=null;
while((msg=br.readLine())!=null)
{
if(msg.equals("exit"))
break;
synchronized(sendBuffer) //同步代码块,sendBuffer为共享资源
{
sendBuffer.put(encode(msg+"\r\n"));
}
}
}
catch(IOException e)
{
e.printStackTrace();
}
}
public void communicate() throws IOException
{
socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
while(selector.select()>0) //采用阻塞方式,返回相关事件已经发生的SelectionKey对象的数目
{
Set readyKeys=selector.selectedKeys(); //返回相关事件已经发生的SelectionKey对象的集合
Iterator it=readyKeys.iterator();
while(it.hasNext())
{
SelectionKey key=null;
try
{
key=(SelectionKey)it.next();
it.remove();
if(key.isReadable())
{
receive(key);
}
if(key.isWritable())
{
send(key);
}
}
catch(IOException e)
{
e.printStackTrace();
try
{
if(key!=null)
{
key.cancel();
key.channel().close();
}
}
catch(Exception e1)
{
e1.printStackTrace();
}
}
}
}
}
public void send(SelectionKey key) throws IOException //将缓冲区中的数据通过channel发送出去
{
SocketChannel socketChannel=(SocketChannel)key.channel();//得到相关的SocketChannel
synchronized(sendBuffer) //同步代码段
{
sendBuffer.flip(); //把极限设为位置,把位置设为0
socketChannel.write(sendBuffer); //发送数据
sendBuffer.compact(); //删除已经发送的数据
}
}
public void receive(SelectionKey key) throws IOException
{
SocketChannel socketChannel=(SocketChannel)key.channel();
socketChannel.read(receiveBuffer);
receiveBuffer.flip();
String receiveData=decode(receiveBuffer); //从接收缓冲区中解码字节序列,转换为字符串
if(receiveData.indexOf("\n")==-1) //不能凑成一行数据
return;
String outputData=receiveData.substring(0,receiveData.indexOf("\n")+1);//一行数据
System.out.println(outputData);
if(outputData.equals("echo:bye\r\n"));
{
key.cancel();
socketChannel.close();
System.out.println("关闭与服务器的连接");
selector.close();
System.exit(0); //结束程序
}
ByteBuffer temp=encode(outputData);
receiveBuffer.position(temp.limit());
receiveBuffer.compact(); //删除已经打印的数据
}
public String decode(ByteBuffer buffer) //解码
{
CharBuffer charBuffer=charset.decode(buffer);
return charBuffer.toString();
}
public ByteBuffer encode(String str) //编码
{
return charset.encode(str);
}
public static void main(String args[]) throws IOException
{
final NonBlockingClient nbc=new NonBlockingClient();
Thread t=new Thread(new Runnable()
{
public void run() //匿名内部类
{
nbc.receiveData();
}
});
t.start();
nbc.communicate();
}
}
服务器端代码:
import java.util.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.io.*;public class NonBlockingServer
{
private Selector selector=null;
private ServerSocketChannel serverSocketChannel=null;
private int port=10000;
private Charset charset=Charset.forName("GBK");
public NonBlockingServer() throws IOException
{
selector=Selector.open(); //创建一个selector对象
serverSocketChannel=ServerSocketChannel.open(); //创建一个ServerSocketChannel对象
serverSocketChannel.socket().setReuseAddress(true); //设置服务器端口可以重用
serverSocketChannel.configureBlocking(false); //设置为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(port));
System.out.println("服务器启动");
}
public void service() throws IOException
{
//向selector注册监听客户连接请求就绪事件
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
while(selector.select()>0)
{
Set readyKeys=selector.selectedKeys(); //得到selector中的事件集合
Iterator it=readyKeys.iterator(); //得到事件集合的迭代器
while(it.hasNext())
{
SelectionKey key=null;
try
{
key=(SelectionKey)it.next();
it.remove(); //取出一个事件就从集合中将其删除
if(key.isAcceptable()) //接收客户端的连接请求
{
//得到与SelectionKey关联的ServerSocketChannel
ServerSocketChannel ssc=(ServerSocketChannel)key.channel();
//得到客户端SocketChannel
SocketChannel socketChannel=(SocketChannel)ssc.accept();
System.out.println("接收到客户端连接,来自:"+socketChannel.socket().getInetAddress()
+":"+socketChannel.socket().getPort());
socketChannel.configureBlocking(false);
ByteBuffer buffer=ByteBuffer.allocate(1024); //向selector注册与客户端socketChannel的读、写就绪事件,buffer为关联的对象
socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer);
//socketChannel.register(selector,SelectionKey.OP_WRITE,buffer);
}
//读就绪事件发生
if(key.isReadable())
{
receive(key);
}
//写就绪事件发生
if(key.isWritable())
{
send(key);
}
}
catch(Exception e)
{
e.printStackTrace();
try
{
if(key!=null)
{
key.cancel();
key.channel().close();
}
}
catch(Exception e1)
{
e1.printStackTrace();
}
}
} //end while
} // end while
}
public void send(SelectionKey key) throws IOException
{
ByteBuffer buffer=(ByteBuffer)key.attachment(); //得到与该SelectionKey关联的ByteBuffer
SocketChannel socketChannel=(SocketChannel)key.channel(); //得到与该SelectionKey关联的SocketChannel
buffer.flip(); //把极限设为位置,把位置设为0;
String data=decode(buffer); //按照GBK编码进行解码,将字节缓冲区中的数据转换为字符串
if(data.indexOf("\r\n")==-1) //如果还没有读到一行数据,就返回
return;
String outputData=data.substring(0,data.indexOf("\n")+1); //截取一行数据,把\n包含进去
System.out.print(outputData); //在服务器端的控制台将读到的数据输出
//String strToClient=new StringBuffer(outputData).reverse().toString();
//System.out.println(strToClient);
ByteBuffer outputBuffer=encode("服务器端来的数据: "+outputData);
while(outputBuffer.hasRemaining()) //当前位置与极限之间是否还有数据
socketChannel.write(outputBuffer);
ByteBuffer temp=encode(outputData); //把outputData转换为字节,放入ByteBuffer中
buffer.position(temp.limit()); //把buffer的位置设置为temp的极限
buffer.compact(); //删除buffer中已经处理过的数据
if(outputData.equals("byte\r\n"))
{
key.cancel();
socketChannel.close();
System.out.println("关闭与客户的连接");
}
}
public void receive(SelectionKey key) throws IOException
{
ByteBuffer buffer=(ByteBuffer)key.attachment(); //获得与SelectionKey关联的缓冲区
SocketChannel socketChannel=(SocketChannel)key.channel(); //获得与SelectionKey关联的SocketChannel
//创建一个ByteBuffer,用于存放读到的数据
ByteBuffer readBuffer=ByteBuffer.allocate(32);
socketChannel.read(readBuffer); //读入数据放入到readBuffer中
readBuffer.flip(); //把极限设为当前位置,再把当前位置设为0
buffer.limit(buffer.capacity()); //把buffer的极限设为为容量 buffer.put(readBuffer); //将readBuffer中的数据复制到buffer中,假定buffer的容量足够大,不会出现溢出的情况
}
private String decode(ByteBuffer buffer) //解码
{
CharBuffer charBuffer=charset.decode(buffer); //将字节转换为字符
return charBuffer.toString(); //将字符缓冲区转换为字符串
}
private ByteBuffer encode(String str) //编码
{
return charset.encode(str); //将字符串转换为字节序列
}
public static void main(String args[]) throws Exception
{
NonBlockingServer server=new NonBlockingServer();
server.service();
}
}
问题1:
先运行服务端 再运行客户端 客户端从键盘读一些数据 按回车 此时数据传递给服务器端 客户端打印出数据 就结束了 为什么只发送一次 客户端就结束了呢??
问题2:
客户端输入”exit“,为什么客户端进程没有结束,而是阻塞了呢?
问题3:
服务器端在什么情况下 执行此局代码: System.out.println("关闭与客户的连接");代码有点长,谢谢大家了
高手给看看客户端代码:
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.nio.charset.*;public class NonBlockingClient { private SocketChannel socketChannel=null;
private ByteBuffer sendBuffer=ByteBuffer.allocate(1024); // 发送缓冲区用于向服务器发送数据
private ByteBuffer receiveBuffer=ByteBuffer.allocate(1024); //接收缓冲区用于从服务器端接收数据 private Charset charset=Charset.forName("GBK");
private Selector selector;
public NonBlockingClient() throws IOException
{
socketChannel=SocketChannel.open(); //创建SocketChannel对象
InetAddress ia=InetAddress.getLocalHost();
InetSocketAddress isa=new InetSocketAddress(ia,10000);
socketChannel.connect(isa); //采用阻塞模式连接服务器
socketChannel.configureBlocking(false); //采用非阻塞模式接收和发送数据
System.out.println("与服务器连接成功");
selector=Selector.open(); //创建监听器
}
public void receiveData() //从控制台接收数据到缓冲区
{
try{
BufferedReader br=new BufferedReader(new InputStreamReader(System.in));
String msg=null;
while((msg=br.readLine())!=null)
{
if(msg.equals("exit"))
break;
synchronized(sendBuffer) //同步代码块,sendBuffer为共享资源
{
sendBuffer.put(encode(msg+"\r\n"));
}
}
}
catch(IOException e)
{
e.printStackTrace();
}
}
public void communicate() throws IOException
{
socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
while(selector.select()>0) //采用阻塞方式,返回相关事件已经发生的SelectionKey对象的数目
{
Set readyKeys=selector.selectedKeys(); //返回相关事件已经发生的SelectionKey对象的集合
Iterator it=readyKeys.iterator();
while(it.hasNext())
{
SelectionKey key=null;
try
{
key=(SelectionKey)it.next();
it.remove();
if(key.isReadable())
{
receive(key);
}
if(key.isWritable())
{
send(key);
}
}
catch(IOException e)
{
e.printStackTrace();
try
{
if(key!=null)
{
key.cancel();
key.channel().close();
}
}
catch(Exception e1)
{
e1.printStackTrace();
}
}
}
}
}
public void send(SelectionKey key) throws IOException //将缓冲区中的数据通过channel发送出去
{
SocketChannel socketChannel=(SocketChannel)key.channel();//得到相关的SocketChannel
synchronized(sendBuffer) //同步代码段
{
sendBuffer.flip(); //把极限设为位置,把位置设为0
socketChannel.write(sendBuffer); //发送数据
sendBuffer.compact(); //删除已经发送的数据
}
}
public void receive(SelectionKey key) throws IOException
{
SocketChannel socketChannel=(SocketChannel)key.channel();
socketChannel.read(receiveBuffer);
receiveBuffer.flip();
String receiveData=decode(receiveBuffer); //从接收缓冲区中解码字节序列,转换为字符串
if(receiveData.indexOf("\n")==-1) //不能凑成一行数据
return;
String outputData=receiveData.substring(0,receiveData.indexOf("\n")+1);//一行数据
System.out.println(outputData);
if(outputData.equals("echo:bye\r\n"));
{
key.cancel();
socketChannel.close();
System.out.println("关闭与服务器的连接");
selector.close();
System.exit(0); //结束程序
}
ByteBuffer temp=encode(outputData);
receiveBuffer.position(temp.limit());
receiveBuffer.compact(); //删除已经打印的数据
}
public String decode(ByteBuffer buffer) //解码
{
CharBuffer charBuffer=charset.decode(buffer);
return charBuffer.toString();
}
public ByteBuffer encode(String str) //编码
{
return charset.encode(str);
}
public static void main(String args[]) throws IOException
{
final NonBlockingClient nbc=new NonBlockingClient();
Thread t=new Thread(new Runnable()
{
public void run() //匿名内部类
{
nbc.receiveData();
}
});
t.start();
nbc.communicate();
}
}
服务器端代码:
import java.util.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.io.*;public class NonBlockingServer
{
private Selector selector=null;
private ServerSocketChannel serverSocketChannel=null;
private int port=10000;
private Charset charset=Charset.forName("GBK");
public NonBlockingServer() throws IOException
{
selector=Selector.open(); //创建一个selector对象
serverSocketChannel=ServerSocketChannel.open(); //创建一个ServerSocketChannel对象
serverSocketChannel.socket().setReuseAddress(true); //设置服务器端口可以重用
serverSocketChannel.configureBlocking(false); //设置为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(port));
System.out.println("服务器启动");
}
public void service() throws IOException
{
//向selector注册监听客户连接请求就绪事件
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
while(selector.select()>0)
{
Set readyKeys=selector.selectedKeys(); //得到selector中的事件集合
Iterator it=readyKeys.iterator(); //得到事件集合的迭代器
while(it.hasNext())
{
SelectionKey key=null;
try
{
key=(SelectionKey)it.next();
it.remove(); //取出一个事件就从集合中将其删除
if(key.isAcceptable()) //接收客户端的连接请求
{
//得到与SelectionKey关联的ServerSocketChannel
ServerSocketChannel ssc=(ServerSocketChannel)key.channel();
//得到客户端SocketChannel
SocketChannel socketChannel=(SocketChannel)ssc.accept();
System.out.println("接收到客户端连接,来自:"+socketChannel.socket().getInetAddress()
+":"+socketChannel.socket().getPort());
socketChannel.configureBlocking(false);
ByteBuffer buffer=ByteBuffer.allocate(1024); //向selector注册与客户端socketChannel的读、写就绪事件,buffer为关联的对象
socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer);
//socketChannel.register(selector,SelectionKey.OP_WRITE,buffer);
}
//读就绪事件发生
if(key.isReadable())
{
receive(key);
}
//写就绪事件发生
if(key.isWritable())
{
send(key);
}
}
catch(Exception e)
{
e.printStackTrace();
try
{
if(key!=null)
{
key.cancel();
key.channel().close();
}
}
catch(Exception e1)
{
e1.printStackTrace();
}
}
} //end while
} // end while
}
public void send(SelectionKey key) throws IOException
{
ByteBuffer buffer=(ByteBuffer)key.attachment(); //得到与该SelectionKey关联的ByteBuffer
SocketChannel socketChannel=(SocketChannel)key.channel(); //得到与该SelectionKey关联的SocketChannel
buffer.flip(); //把极限设为位置,把位置设为0;
String data=decode(buffer); //按照GBK编码进行解码,将字节缓冲区中的数据转换为字符串
if(data.indexOf("\r\n")==-1) //如果还没有读到一行数据,就返回
return;
String outputData=data.substring(0,data.indexOf("\n")+1); //截取一行数据,把\n包含进去
System.out.print(outputData); //在服务器端的控制台将读到的数据输出
//String strToClient=new StringBuffer(outputData).reverse().toString();
//System.out.println(strToClient);
ByteBuffer outputBuffer=encode("服务器端来的数据: "+outputData);
while(outputBuffer.hasRemaining()) //当前位置与极限之间是否还有数据
socketChannel.write(outputBuffer);
ByteBuffer temp=encode(outputData); //把outputData转换为字节,放入ByteBuffer中
buffer.position(temp.limit()); //把buffer的位置设置为temp的极限
buffer.compact(); //删除buffer中已经处理过的数据
if(outputData.equals("byte\r\n"))
{
key.cancel();
socketChannel.close();
System.out.println("关闭与客户的连接");
}
}
public void receive(SelectionKey key) throws IOException
{
ByteBuffer buffer=(ByteBuffer)key.attachment(); //获得与SelectionKey关联的缓冲区
SocketChannel socketChannel=(SocketChannel)key.channel(); //获得与SelectionKey关联的SocketChannel
//创建一个ByteBuffer,用于存放读到的数据
ByteBuffer readBuffer=ByteBuffer.allocate(32);
socketChannel.read(readBuffer); //读入数据放入到readBuffer中
readBuffer.flip(); //把极限设为当前位置,再把当前位置设为0
buffer.limit(buffer.capacity()); //把buffer的极限设为为容量 buffer.put(readBuffer); //将readBuffer中的数据复制到buffer中,假定buffer的容量足够大,不会出现溢出的情况
}
private String decode(ByteBuffer buffer) //解码
{
CharBuffer charBuffer=charset.decode(buffer); //将字节转换为字符
return charBuffer.toString(); //将字符缓冲区转换为字符串
}
private ByteBuffer encode(String str) //编码
{
return charset.encode(str); //将字符串转换为字节序列
}
public static void main(String args[]) throws Exception
{
NonBlockingServer server=new NonBlockingServer();
server.service();
}
}
问题1:
先运行服务端 再运行客户端 客户端从键盘读一些数据 按回车 此时数据传递给服务器端 客户端打印出数据 就结束了 为什么只发送一次 客户端就结束了呢??
问题2:
客户端输入”exit“,为什么客户端进程没有结束,而是阻塞了呢?
问题3:
服务器端在什么情况下 执行此局代码: System.out.println("关闭与客户的连接");代码有点长,谢谢大家了
你输入exit之后,只是把main线程结束了,
nbc.receiveData();
这个线程没结束吧?从你的代码上下文来看,
应该是你输入exit
客户端会发送一个"exit"这样的字符串到服务器,
然后break,退出掉监听控制台输入的线程
然后服务器回发一个"echo:bye\r\n"
这样客户端收到了会channel.close()
退出监听网络连接的线程因为我看到你客户端代码有"echo:bye\r\n"这个处理
服务器并没有地方发送
public void receiveData() // 从控制台接收数据到缓冲区
{
try { BufferedReader br = new BufferedReader(new InputStreamReader(
System.in));
String msg = null;
boolean flag = false; //修改点 while ((msg = br.readLine()) != null) {
if (msg.equals("exit")){
msg = "byte"; //修改点 flag = true; //修改点 }
synchronized (sendBuffer) // 同步代码块,sendBuffer为共享资源
{
sendBuffer.put(encode(msg + "\r\n"));
}
if(flag) break; //修改点
}
} catch (IOException e) {
e.printStackTrace();
}
}public void receive(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.read(receiveBuffer);
receiveBuffer.flip();
String receiveData = decode(receiveBuffer); // 从接收缓冲区中解码字节序列,转换为字符串 if (receiveData.indexOf("\n") == -1) // 不能凑成一行数据
return; String outputData = receiveData.substring(0,
receiveData.indexOf("\n") + 1);// 一行数据
System.out.println(outputData);
if (outputData.equals("服务器端来的数据: byte\r\n")) //修改点 {
key.cancel();
socketChannel.close();
System.out.println("关闭与服务器的连接");
selector.close();
System.exit(0); // 结束程序
} ByteBuffer temp = encode(outputData);
receiveBuffer.position(temp.limit());
receiveBuffer.compact(); // 删除已经打印的数据
}
简单修改了下 NonBlockingClient