第一次使用nio去做事,恳请大家帮忙看一下代码哪里出问题了,还有请大家帮看看这样的非阻塞和线程设计的是否合理,我主要是用在抓取海量网页上的爬虫。现在只是测试,所以用了固定的url
package wadihu.crawl;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;/** 爬行类,专门负责网页的下载, 以非阻塞方式连接 */
public class CrawlOrder
{
private boolean shutdown = false; // 用于控制Connector线程
private Selector selector; // 注册选择器
private Queue<Target> targetLists = new LinkedList<Target>(); // 任务队列
private Queue<Target> waitLists = new LinkedList<Target>(); // 等待抓取队列
// private Queue<Target> endLists = new LinkedList<Target>(); // 完成抓取队列 public CrawlOrder() throws IOException
{
selector = Selector.open(); // 打开选择器
Connector connector = new Connector();
connector.start();
System.out.println("正在启动连接线程...");
Reador reador = new Reador();
reador.start();
System.out.println("正在启动读写IO线程...");
receiveTarget();
} /**用户输入URL请求 */
public void receiveTarget() throws IOException {
BufferedReader buf = new BufferedReader(new InputStreamReader(System.in));
String msg = null;
while((msg = buf.readLine()) != null)
{
if(!msg.equals("bye")) {
Target target = new Target(msg);
addTarget(target);
}
else
{
shutdown = true;
selector.wakeup();
break;
}
}
} /** 向等待连接队列添加任务
* @throws IOException */
public void addTarget(Target target) throws IOException {
SocketChannel socketChannel = null;
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(target.address);
target.socketChannel = socketChannel;
synchronized (targetLists) {
targetLists.add(target);
}
selector.wakeup();
} /** 注册连接事件 */
public void registerTargets()
{
synchronized(targetLists)
{
while(targetLists.size() > 0)
{
Target target = targetLists.poll();
try {
target.socketChannel.register(selector, SelectionKey.OP_CONNECT, target);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
}
} /** 注册网络IO读写事件
* @throws InterruptedException
* @throws IOException */
public void registerWrites() throws InterruptedException, IOException
{
synchronized(waitLists)
{
while(waitLists.size() == 0)
{
waitLists.wait();
}
Target target = waitLists.poll();
target.socketChannel.register(selector, SelectionKey.OP_WRITE|SelectionKey.OP_READ, target);
}
} /** 连接就绪事件发生
* @throws IOException */
public void processSelectdKeys() throws IOException
{
for (Iterator it = selector.selectedKeys().iterator(); it.hasNext();)
{
SelectionKey selectionKey = (SelectionKey) it.next();
it.remove();
Target target = (Target) selectionKey.attachment();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
if(socketChannel.finishConnect())
{
selectionKey.cancel();
socketChannel.close();
addFinishedTarget(target);
}
}
} /** 读写就绪事件发生
* @throws IOException */
public void process() throws IOException
{
for (Iterator it = selector.selectedKeys().iterator(); it.hasNext();)
{
SelectionKey selectionKey = (SelectionKey) it.next();
it.remove();
if(selectionKey.isValid())
{
if(selectionKey.isWritable()) {
write(selectionKey);
}
if(selectionKey.isReadable()) {
read(selectionKey);
}
}
}
}
public void write(SelectionKey selectionKey) throws IOException {
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
String head = "GET / HTTP/1.1\r\nHost: " + "www.baidu.com" + "\r\n" + "User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;)\r\n\r\n";
buffer.put(head.getBytes());
buffer.flip();
int w = socketChannel.write(buffer);
if (w <= 0)
{
selectionKey.cancel();
socketChannel.close();
}
} public void read(SelectionKey selectionKey) throws IOException {
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int r = socketChannel.read(buffer);
buffer.flip();
byte[] b = new byte[buffer.limit()];
buffer.get(b);
System.out.println(new String(b));
// if (r == 0)
// {
// return;
// }
if (r == -1 || r == 0)
{
selectionKey.cancel();
socketChannel.close();
return;
}
} /** 向等待抓取队列加入一个连接就绪的任务,表示已经建立好连接,可进行读写操作 */
public void addFinishedTarget(Target target) {
synchronized (waitLists)
{
waitLists.notify();
waitLists.add(target);
}
} /** 建立连接内部类 */
private class Connector extends Thread
{
public void run()
{
while(!shutdown)
{
try {
registerTargets();
if(selector.select() > 0) {
processSelectdKeys();
}
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
} /** 读写线程内部类 */
private class Reador extends Thread
{
public void run()
{
while(!shutdown)
{
try
{
registerWrites();
if(selector.select() > 0)
{
process();
}
}
catch (ClosedChannelException e)
{
e.printStackTrace();
}
catch (IOException e) {
e.printStackTrace();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
try
{
selector.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
} public static void main(String[] args) throws IOException
{
new CrawlOrder();
}
} /** 一项抓取任务,外部类 */
class Target
{
InetSocketAddress address;
SocketChannel socketChannel;
public Target(String host) throws UnknownHostException
{
address = new InetSocketAddress(InetAddress.getByName(host), 80);
}
}
打印错误信息如下:
正在启动连接线程...
正在启动读写IO线程...
www.baidu.com
java.nio.channels.ClosedChannelException
at java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:167)
at wadihu.crawl.CrawlOrder.registerWrites(CrawlOrder.java:101)
at wadihu.crawl.CrawlOrder$Reador.run(CrawlOrder.java:222)
package wadihu.crawl;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;/** 爬行类,专门负责网页的下载, 以非阻塞方式连接 */
public class CrawlOrder
{
private boolean shutdown = false; // 用于控制Connector线程
private Selector selector; // 注册选择器
private Queue<Target> targetLists = new LinkedList<Target>(); // 任务队列
private Queue<Target> waitLists = new LinkedList<Target>(); // 等待抓取队列
// private Queue<Target> endLists = new LinkedList<Target>(); // 完成抓取队列 public CrawlOrder() throws IOException
{
selector = Selector.open(); // 打开选择器
Connector connector = new Connector();
connector.start();
System.out.println("正在启动连接线程...");
Reador reador = new Reador();
reador.start();
System.out.println("正在启动读写IO线程...");
receiveTarget();
} /**用户输入URL请求 */
public void receiveTarget() throws IOException {
BufferedReader buf = new BufferedReader(new InputStreamReader(System.in));
String msg = null;
while((msg = buf.readLine()) != null)
{
if(!msg.equals("bye")) {
Target target = new Target(msg);
addTarget(target);
}
else
{
shutdown = true;
selector.wakeup();
break;
}
}
} /** 向等待连接队列添加任务
* @throws IOException */
public void addTarget(Target target) throws IOException {
SocketChannel socketChannel = null;
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(target.address);
target.socketChannel = socketChannel;
synchronized (targetLists) {
targetLists.add(target);
}
selector.wakeup();
} /** 注册连接事件 */
public void registerTargets()
{
synchronized(targetLists)
{
while(targetLists.size() > 0)
{
Target target = targetLists.poll();
try {
target.socketChannel.register(selector, SelectionKey.OP_CONNECT, target);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
}
} /** 注册网络IO读写事件
* @throws InterruptedException
* @throws IOException */
public void registerWrites() throws InterruptedException, IOException
{
synchronized(waitLists)
{
while(waitLists.size() == 0)
{
waitLists.wait();
}
Target target = waitLists.poll();
target.socketChannel.register(selector, SelectionKey.OP_WRITE|SelectionKey.OP_READ, target);
}
} /** 连接就绪事件发生
* @throws IOException */
public void processSelectdKeys() throws IOException
{
for (Iterator it = selector.selectedKeys().iterator(); it.hasNext();)
{
SelectionKey selectionKey = (SelectionKey) it.next();
it.remove();
Target target = (Target) selectionKey.attachment();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
if(socketChannel.finishConnect())
{
selectionKey.cancel();
socketChannel.close();
addFinishedTarget(target);
}
}
} /** 读写就绪事件发生
* @throws IOException */
public void process() throws IOException
{
for (Iterator it = selector.selectedKeys().iterator(); it.hasNext();)
{
SelectionKey selectionKey = (SelectionKey) it.next();
it.remove();
if(selectionKey.isValid())
{
if(selectionKey.isWritable()) {
write(selectionKey);
}
if(selectionKey.isReadable()) {
read(selectionKey);
}
}
}
}
public void write(SelectionKey selectionKey) throws IOException {
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
String head = "GET / HTTP/1.1\r\nHost: " + "www.baidu.com" + "\r\n" + "User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;)\r\n\r\n";
buffer.put(head.getBytes());
buffer.flip();
int w = socketChannel.write(buffer);
if (w <= 0)
{
selectionKey.cancel();
socketChannel.close();
}
} public void read(SelectionKey selectionKey) throws IOException {
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int r = socketChannel.read(buffer);
buffer.flip();
byte[] b = new byte[buffer.limit()];
buffer.get(b);
System.out.println(new String(b));
// if (r == 0)
// {
// return;
// }
if (r == -1 || r == 0)
{
selectionKey.cancel();
socketChannel.close();
return;
}
} /** 向等待抓取队列加入一个连接就绪的任务,表示已经建立好连接,可进行读写操作 */
public void addFinishedTarget(Target target) {
synchronized (waitLists)
{
waitLists.notify();
waitLists.add(target);
}
} /** 建立连接内部类 */
private class Connector extends Thread
{
public void run()
{
while(!shutdown)
{
try {
registerTargets();
if(selector.select() > 0) {
processSelectdKeys();
}
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
} /** 读写线程内部类 */
private class Reador extends Thread
{
public void run()
{
while(!shutdown)
{
try
{
registerWrites();
if(selector.select() > 0)
{
process();
}
}
catch (ClosedChannelException e)
{
e.printStackTrace();
}
catch (IOException e) {
e.printStackTrace();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
try
{
selector.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
} public static void main(String[] args) throws IOException
{
new CrawlOrder();
}
} /** 一项抓取任务,外部类 */
class Target
{
InetSocketAddress address;
SocketChannel socketChannel;
public Target(String host) throws UnknownHostException
{
address = new InetSocketAddress(InetAddress.getByName(host), 80);
}
}
打印错误信息如下:
正在启动连接线程...
正在启动读写IO线程...
www.baidu.com
java.nio.channels.ClosedChannelException
at java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:167)
at wadihu.crawl.CrawlOrder.registerWrites(CrawlOrder.java:101)
at wadihu.crawl.CrawlOrder$Reador.run(CrawlOrder.java:222)
我建议你使用
public Target(String host) throws IOException {
address = new InetSocketAddress(InetAddress.getByName(host), 80);
Socket sock = new Socket();
sock.connect(address, 10000);
this.socketChannel = sock.getChannel();
}
2. 因为你是客户端, 没必要注册 connect的事件, 你生成socket之后只要注册read, 等待read.
3. 线程使用有点混乱简单点Reader进程负责获取地址, 跟selector绑定,
Resume进程负责即selector.select() ;当然对于每个read消息你可以再开进程分开处理。
4. 对于多线程采访同一变量, 必要的时候使用volatile避免采访寄存器
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;/** 爬行类,专门负责网页的下载, 以非阻塞方式连接 */
public class CrawlOrder {
private boolean shutdown = false; // 用于控制Connector线程
private Selector selector; // 注册选择器 public CrawlOrder() throws IOException {
selector = Selector.open(); // 打开选择器
System.out.println("正在启动连接线程...");
new Thread(new Crawl()).start();
System.out.println("正在启动读写IO线程...");
receiveTarget();
} /** 用户输入URL请求 */
public void receiveTarget() throws IOException {
BufferedReader buf = new BufferedReader(
new InputStreamReader(System.in));
String msg = null;
while ((msg = buf.readLine()) != null) {
try {
// msg = "www.baidu.com";
if (!msg.equals("bye")) {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel
.connect(new InetSocketAddress(msg.trim(), 80));
socketChannel.register(selector, SelectionKey.OP_CONNECT);
selector.wakeup();
} else {
shutdown = true;
selector.wakeup();
break;
}
} catch (Exception e) {
e.printStackTrace();
} }
} class Crawl implements Runnable { @Override
public void run() {
SelectionKey key = null;
while (!shutdown) {
try {
selector.select(500);
Iterator<SelectionKey> keyIterator = selector
.selectedKeys().iterator();
while (keyIterator.hasNext()) {
key = keyIterator.next();
keyIterator.remove();
if (key.isValid()) {
handler(key);
}
}
} catch (Exception e) {
if (key != null)
key.cancel();
e.printStackTrace(); }
} } private void handler(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel(); if (key.isReadable()) {
// System.out.println("read");
ByteBuffer buffer = ByteBuffer.allocate(100 * 1024);
int ret = channel.read(buffer);
if (ret < 0)
channel.close();
buffer.flip();
Charset ch = Charset.forName("gb2312");
System.out.println(ch.decode(buffer)); } else if (key.isWritable()) {
// System.out.println("write");
String head = "GET / HTTP/1.1\r\nHost: "
+ "www.baidu.com"
+ "\r\n"
+ "User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;)\r\n\r\n";
ByteBuffer buffer = ByteBuffer.wrap(head.getBytes());
channel.write(buffer);
channel.register(selector, SelectionKey.OP_READ);
} else if (key.isConnectable()) {
if (channel.isConnectionPending())
channel.finishConnect();
System.out.println("conn");
channel.register(selector, SelectionKey.OP_WRITE);
// key.cancel();
} } } public static void main(String[] args) throws IOException {
new CrawlOrder();
}
}
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;/** 爬行类,专门负责网页的下载, 以非阻塞方式连接 */
public class CrawlOrder1 {
private boolean shutdown = false; // 用于控制Connector线程
private Selector selector; // 注册选择器
private Queue<Target> targetLists = new LinkedList<Target>(); // 任务队列
private Queue<Target> waitLists = new LinkedList<Target>(); // 等待抓取队列,已经建立好连接的任务 public CrawlOrder1() throws IOException {
selector = Selector.open(); // 打开选择器
Connector connector = new Connector();
connector.start();
System.out.println("爬虫已启动...");
receiveTarget(); // 用户提交URL任务输入
} /**用户输入URL请求 */
public void receiveTarget() throws IOException {
BufferedReader buf = new BufferedReader(new InputStreamReader(System.in));
String msg = null;
while((msg = buf.readLine()) != null) {
if(!msg.equals("bye")) {
Target target = new Target(msg);
addTarget(target);
}
else {
shutdown = true;
selector.wakeup();
System.out.println("系统已经停止");
break;
}
}
} /** 向等待连接队列添加任务
* @throws IOException */
public void addTarget(Target target) throws IOException {
SocketChannel socketChannel = null;
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(target.address);
target.socketChannel = socketChannel;
synchronized (targetLists) {
targetLists.add(target);
}
selector.wakeup();
} /** 注册连接事件 */
public void registerTargets() {
synchronized(targetLists) {
while(targetLists.size() > 0) {
Target target = targetLists.poll();
try {
target.socketChannel.register(selector, SelectionKey.OP_CONNECT, target);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
}
} /** 连接就绪事件发生,处理就绪的事件
* @throws IOException */
public void processSelectdKeys() throws IOException {
for (Iterator<?> it = selector.selectedKeys().iterator(); it.hasNext();) {
SelectionKey selectionKey = (SelectionKey) it.next();
it.remove();
Target target = (Target) selectionKey.attachment();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
if(socketChannel.finishConnect()) {
System.out.println("连接成功");
selectionKey.cancel();
socketChannel.close();
addFinishedTarget(target);
}
}
} /** 向等待抓取队列加入一个连接就绪的任务,表示已经建立好连接,可进行读写操作 */
public void addFinishedTarget(Target target) {
synchronized (waitLists) {
waitLists.notify();
waitLists.add(target);
}
} /** 建立连接内部类 */
private class Connector extends Thread {
public void run() {
while(!shutdown) {
try {
registerTargets();
if(selector.select() > 0) {
processSelectdKeys();
}
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
} public static void main(String[] args) throws IOException {
new CrawlOrder1();
}
} /** 一项抓取任务,外部类 */
class Target
{
InetSocketAddress address;
SocketChannel socketChannel;
public Target(String host) throws UnknownHostException
{
address = new InetSocketAddress(InetAddress.getByName(host), 80);
}
}
我还是倾向这种做法,事件都存放队列中,然后线程去对列中取并执行,这样我想以后会更高效些吧,我现在只做好了连接的处理,读写一直没想到怎么加,惭愧