第一次使用 NIO 写程序,恳请大家帮忙看一下代码哪里出了问题,也请大家看看这样的非阻塞和线程设计是否合理。我主要是用它来做抓取海量网页的爬虫。现在只是测试,所以用了固定的 URL。
package wadihu.crawl;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;/** 爬行类,专门负责网页的下载, 以非阻塞方式连接 */
public class CrawlOrder
{
private boolean shutdown = false; // 用于控制Connector线程
private Selector selector; // 注册选择器
private Queue<Target> targetLists = new LinkedList<Target>(); // 任务队列
private Queue<Target> waitLists = new LinkedList<Target>(); // 等待抓取队列
// private Queue<Target> endLists = new LinkedList<Target>(); // 完成抓取队列 public CrawlOrder() throws IOException
{
selector = Selector.open(); // 打开选择器
Connector connector = new Connector();
connector.start();
System.out.println("正在启动连接线程...");
Reador reador  = new Reador();
reador.start();
System.out.println("正在启动读写IO线程...");
receiveTarget();
} /**用户输入URL请求 */
public void receiveTarget() throws IOException {
BufferedReader buf = new BufferedReader(new InputStreamReader(System.in));
String msg = null;
while((msg = buf.readLine()) != null)
{
if(!msg.equals("bye")) {
Target target = new Target(msg);
addTarget(target);
}
else
{
shutdown = true;
selector.wakeup();
break;
}
}
} /** 向等待连接队列添加任务 
 * @throws IOException */
public void addTarget(Target target) throws IOException {
SocketChannel socketChannel = null;
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(target.address);
target.socketChannel = socketChannel;
synchronized (targetLists) {
targetLists.add(target);
}
selector.wakeup();
}  /** 注册连接事件 */
public void registerTargets()
{
synchronized(targetLists)
{
while(targetLists.size() > 0)
{
Target target = targetLists.poll();
try {
target.socketChannel.register(selector, SelectionKey.OP_CONNECT, target);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
}
}  /** 注册网络IO读写事件 
 * @throws InterruptedException 
 * @throws IOException */
public void registerWrites() throws InterruptedException, IOException
{
synchronized(waitLists)
{
while(waitLists.size() == 0)
{
waitLists.wait();
}
Target target = waitLists.poll();
target.socketChannel.register(selector, SelectionKey.OP_WRITE|SelectionKey.OP_READ, target);
}
} /** 连接就绪事件发生
 * @throws IOException */
public void processSelectdKeys() throws IOException
{
for (Iterator it =  selector.selectedKeys().iterator(); it.hasNext();)
{
SelectionKey selectionKey = (SelectionKey) it.next();
it.remove();
Target target = (Target) selectionKey.attachment();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
if(socketChannel.finishConnect())
{
selectionKey.cancel();
socketChannel.close();
addFinishedTarget(target);
}
}
} /** 读写就绪事件发生
 * @throws IOException */
public void process() throws IOException
{
for (Iterator it =  selector.selectedKeys().iterator(); it.hasNext();)
{
SelectionKey selectionKey = (SelectionKey) it.next();
it.remove();
if(selectionKey.isValid())
{
if(selectionKey.isWritable()) {
write(selectionKey);
}
if(selectionKey.isReadable()) {
read(selectionKey);
}
}
}
}
public void write(SelectionKey selectionKey) throws IOException {
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
String head = "GET / HTTP/1.1\r\nHost: " + "www.baidu.com" + "\r\n" + "User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;)\r\n\r\n";
buffer.put(head.getBytes());
buffer.flip();
int w = socketChannel.write(buffer);
if (w <= 0)
{
selectionKey.cancel();
socketChannel.close();
}
} public void read(SelectionKey selectionKey) throws IOException {
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int r = socketChannel.read(buffer);
buffer.flip();
byte[] b = new byte[buffer.limit()];
buffer.get(b);
System.out.println(new String(b));
//            if (r == 0)
//            {
//             return;
//            }
            if (r == -1 || r == 0)
            {
             selectionKey.cancel();
             socketChannel.close();
                return;
            }
} /** 向等待抓取队列加入一个连接就绪的任务,表示已经建立好连接,可进行读写操作 */
public void addFinishedTarget(Target target) {
synchronized (waitLists)
{
waitLists.notify();
waitLists.add(target);
}
} /** 建立连接内部类 */
private class Connector extends Thread
{
public void run()
{
while(!shutdown)
{
try {
registerTargets();
if(selector.select() > 0) {
processSelectdKeys();
}
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
} /** 读写线程内部类 */
private class Reador extends Thread
{
public void run()
{
while(!shutdown)
{
try
                                              {
registerWrites();
if(selector.select() > 0)
                                                       {
process();
}

                                              catch (ClosedChannelException e)
                                              {
e.printStackTrace();
}
                                              catch (IOException e) {
e.printStackTrace();
}
                                              catch (InterruptedException e)
                                              {
e.printStackTrace();
}
}
try
                                     {
selector.close();
}
                                     catch (IOException e)
                                     {
e.printStackTrace();
}
}
} public static void main(String[] args) throws IOException
{
new CrawlOrder();
}
} /** 一项抓取任务,外部类  */
/** One fetch task: the remote endpoint and the channel used to reach it. */
class Target
{
    /** Port every target is fetched from. */
    private static final int HTTP_PORT = 80;

    /** Resolved address of the host on port 80. */
    InetSocketAddress address;
    /** Channel for this task; left unset by the constructor and assigned by the crawler. */
    SocketChannel socketChannel;

    /**
     * Resolves the host name; this is a blocking lookup.
     *
     * @param host host name or literal IP address
     * @throws UnknownHostException if the host is null or blank, or cannot be resolved
     */
    public Target(String host) throws UnknownHostException
    {
        // InetAddress.getByName(null) and getByName("") return the loopback address,
        // which would silently point the task at the local machine.
        if (host == null || host.trim().length() == 0) {
            throw new UnknownHostException("host must not be blank");
        }
        address = new InetSocketAddress(InetAddress.getByName(host.trim()), HTTP_PORT);
    }
}
打印错误信息如下:
正在启动连接线程...
正在启动读写IO线程...
www.baidu.com
java.nio.channels.ClosedChannelException
at java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:167)
at wadihu.crawl.CrawlOrder.registerWrites(CrawlOrder.java:101)
at wadihu.crawl.CrawlOrder$Reador.run(CrawlOrder.java:222)

解决方案 »

  1.   

    BufferedReader???请仔细说下吧,这里错了吗
      

  2.   

    1. 你获取socketChannel的方式我测试了一下 connect是false
    我建议你使用
    public Target(String host) throws IOException {
    address = new InetSocketAddress(InetAddress.getByName(host), 80);
            Socket sock = new Socket();
    sock.connect(address, 10000);
    this.socketChannel = sock.getChannel();
    }
    2. 因为你是客户端, 没必要注册 connect的事件, 你生成socket之后只要注册read, 等待read. 
    3. 线程的使用有点混乱,可以简单一些:Reader 线程负责获取地址,并把通道注册到 selector;
    Resume 线程负责调用 selector.select()。当然,对于每个 read 事件,你可以再开线程分开处理。
    4. 对于多个线程访问的同一个变量,必要的时候使用 volatile,保证一个线程的修改对其他线程可见(避免读到寄存器或缓存中的旧值)。
      

  3.   

    lz那写的太复杂了,我竟然没看懂。下面简单的作了个相同功能的
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;/** 爬行类,专门负责网页的下载, 以非阻塞方式连接 */
    /**
     * Single-selector fetcher: the constructing thread reads host names from standard
     * input and registers a connecting channel for each; one worker thread
     * ({@code Crawl}) completes the connect, sends the request and prints the response.
     */
    public class CrawlOrder {
        /** Capacity in bytes of the worker's receive buffer. */
        private static final int READ_BUFFER_SIZE = 100 * 1024;

        // volatile: written by the input thread, polled by the worker thread
        private volatile boolean shutdown = false;
        private Selector selector;

        /**
         * Opens the selector, starts the worker and then blocks reading targets from
         * standard input until "bye" or end of input.
         *
         * @throws IOException if the selector cannot be opened or standard input fails
         */
        public CrawlOrder() throws IOException {
            selector = Selector.open();
            System.out.println("正在启动连接线程...");
            new Thread(new Crawl()).start();
            System.out.println("正在启动读写IO线程...");
            receiveTarget();
        }

        /**
         * Builds the HTTP request for the root page of the given host.
         * "Connection: close" makes the server end the stream after the response.
         *
         * @param host value of the Host header
         * @return the complete request text, terminated by an empty line
         */
        static String buildRequest(String host) {
            return "GET / HTTP/1.1\r\nHost: "
                    + host
                    + "\r\n"
                    + "User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;)\r\n"
                    + "Connection: close\r\n\r\n";
        }

        /**
         * Reads one host name per line and starts a non-blocking connect to port 80 of
         * each. The line "bye" or end of input stops the worker. A host that fails is
         * reported and skipped, and its channel is closed.
         *
         * @throws IOException if reading standard input fails
         */
        public void receiveTarget() throws IOException {
            BufferedReader buf = new BufferedReader(
                    new InputStreamReader(System.in));
            String msg;
            while ((msg = buf.readLine()) != null) {
                String host = msg.trim();
                if (host.length() == 0) {
                    continue;
                }
                if (host.equals("bye")) {
                    break;
                }
                SocketChannel socketChannel = null;
                try {
                    socketChannel = SocketChannel.open();
                    socketChannel.configureBlocking(false);
                    boolean connected = socketChannel
                            .connect(new InetSocketAddress(host, 80));
                    // the request travels with the key so each host gets its own Host header
                    ByteBuffer request = ByteBuffer.wrap(
                            buildRequest(host).getBytes("US-ASCII"));
                    // an immediately completed connect never reports OP_CONNECT
                    int ops = connected ? SelectionKey.OP_WRITE
                            : SelectionKey.OP_CONNECT;
                    socketChannel.register(selector, ops, request);
                    selector.wakeup();
                } catch (Exception e) {
                    e.printStackTrace();
                    if (socketChannel != null) {
                        try {
                            socketChannel.close(); // do not leak the channel of a failed target
                        } catch (IOException closeFailure) {
                            closeFailure.printStackTrace();
                        }
                    }
                }
            }
            // reached on "bye" and on end of input, so the worker always terminates
            shutdown = true;
            selector.wakeup();
        }

        /** Worker that drives every connection through connect, write and read. */
        class Crawl implements Runnable {
            // reused for every read; only the worker thread touches it
            private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            private final Charset charset = Charset.forName("gb2312");

            @Override
            public void run() {
                while (!shutdown) {
                    try {
                        selector.select(500);
                    } catch (IOException e) {
                        e.printStackTrace();
                        continue;
                    }
                    Iterator<SelectionKey> keyIterator = selector
                            .selectedKeys().iterator();
                    while (keyIterator.hasNext()) {
                        SelectionKey key = keyIterator.next();
                        keyIterator.remove();
                        try {
                            if (key.isValid()) {
                                handler(key);
                            }
                        } catch (Exception e) {
                            // drop only the connection that failed, and release its socket
                            key.cancel();
                            try {
                                key.channel().close();
                            } catch (IOException closeFailure) {
                                closeFailure.printStackTrace();
                            }
                            e.printStackTrace();
                        }
                    }
                }
                closeEverything();
            }

            /** Closes all registered channels and the selector at shutdown. */
            private void closeEverything() {
                for (SelectionKey key : selector.keys()) {
                    try {
                        key.channel().close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            /**
             * Advances one connection: finishes the connect, sends the request, or
             * prints received data decoded as gb2312.
             *
             * @param key a valid selected key of {@code selector}
             * @throws Exception if the connect, write or read fails
             */
            private void handler(SelectionKey key) throws Exception {
                SocketChannel channel = (SocketChannel) key.channel();

                if (key.isReadable()) {
                    // System.out.println("read");
                    readBuffer.clear();
                    int ret = channel.read(readBuffer);
                    if (ret < 0) {
                        channel.close(); // end of stream
                        return;
                    }
                    readBuffer.flip();
                    // NOTE(review): a character split across two reads is decoded wrongly
                    System.out.println(charset.decode(readBuffer));
                } else if (key.isWritable()) {
                    // System.out.println("write");
                    ByteBuffer request = (ByteBuffer) key.attachment();
                    channel.write(request);
                    // wait for the response only after the whole request has been sent
                    if (!request.hasRemaining()) {
                        key.interestOps(SelectionKey.OP_READ);
                    }
                } else if (key.isConnectable()) {
                    if (channel.finishConnect()) {
                        System.out.println("conn");
                        // interestOps keeps the attachment; register(selector, ops) would reset it
                        key.interestOps(SelectionKey.OP_WRITE);
                    }
                }
            }
        }

        public static void main(String[] args) throws IOException {
            new CrawlOrder();
        }
    }
      

  4.   

    楼上写得不错。我的代码其实很混乱,这是我第一次用 NIO。我的想法是把各种触发的事件都放到队列里,把事件触发和事件处理分离,并做成线程化、池化,但是没实现出来。我会参考你的代码继续修改,有好的建议希望多多指导。
      

  5.   

    等我晚上再改改,结合cangyingzhijia的代码,等会放上来看看
      

  6.   

    package wadihu.crawl;import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.UnknownHostException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ClosedChannelException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Queue;/** 爬行类,专门负责网页的下载, 以非阻塞方式连接 */
    public class CrawlOrder1 {
    private boolean shutdown = false; // 用于控制Connector线程
    private Selector selector; // 注册选择器
    private Queue<Target> targetLists = new LinkedList<Target>(); // 任务队列
    private Queue<Target> waitLists = new LinkedList<Target>(); // 等待抓取队列,已经建立好连接的任务 public CrawlOrder1() throws IOException {
    selector = Selector.open(); // 打开选择器
    Connector connector = new Connector();
    connector.start();
    System.out.println("爬虫已启动...");
    receiveTarget();  // 用户提交URL任务输入
    } /**用户输入URL请求 */
    public void receiveTarget() throws IOException {
    BufferedReader buf = new BufferedReader(new InputStreamReader(System.in));
    String msg = null;
    while((msg = buf.readLine()) != null) {
    if(!msg.equals("bye")) {
    Target target = new Target(msg);
    addTarget(target);
    }
    else {
    shutdown = true;
    selector.wakeup();
    System.out.println("系统已经停止");
    break;
    }
    }
    } /** 向等待连接队列添加任务 
     * @throws IOException */
    public void addTarget(Target target) throws IOException {
    SocketChannel socketChannel = null;
    socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    socketChannel.connect(target.address);
    target.socketChannel = socketChannel;
    synchronized (targetLists) {
    targetLists.add(target);
    }
    selector.wakeup();
    }  /** 注册连接事件 */
    public void registerTargets() {
    synchronized(targetLists) {
    while(targetLists.size() > 0) {
    Target target = targetLists.poll();
    try {
    target.socketChannel.register(selector, SelectionKey.OP_CONNECT, target);
    } catch (ClosedChannelException e) {
    e.printStackTrace();
    }
    }
    }
    } /** 连接就绪事件发生,处理就绪的事件
     * @throws IOException */
    public void processSelectdKeys() throws IOException {
    for (Iterator<?> it =  selector.selectedKeys().iterator(); it.hasNext();) {
    SelectionKey selectionKey = (SelectionKey) it.next();
    it.remove();
    Target target = (Target) selectionKey.attachment();
    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    if(socketChannel.finishConnect()) {
    System.out.println("连接成功");
    selectionKey.cancel();
    socketChannel.close();
    addFinishedTarget(target);
    }
    }
    } /** 向等待抓取队列加入一个连接就绪的任务,表示已经建立好连接,可进行读写操作 */
    public void addFinishedTarget(Target target) {
    synchronized (waitLists) {
    waitLists.notify();
    waitLists.add(target);
    }
    } /** 建立连接内部类 */
    private class Connector extends Thread {
    public void run() {
    while(!shutdown) {
    try {
    registerTargets();
    if(selector.select() > 0) {
    processSelectdKeys();
    }
    } catch (ClosedChannelException e) {
    e.printStackTrace();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    try {
    selector.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    } public static void main(String[] args) throws IOException {
    new CrawlOrder1();
    }
    } /** 一项抓取任务,外部类  */
    /** One fetch task: the remote endpoint and the channel used to reach it. */
    class Target
    {
        /** Port every target is fetched from. */
        private static final int HTTP_PORT = 80;

        /** Resolved address of the host on port 80. */
        InetSocketAddress address;
        /** Channel for this task; left unset by the constructor and assigned by the crawler. */
        SocketChannel socketChannel;

        /**
         * Resolves the host name; this is a blocking lookup.
         *
         * @param host host name or literal IP address
         * @throws UnknownHostException if the host is null or blank, or cannot be resolved
         */
        public Target(String host) throws UnknownHostException
        {
            // InetAddress.getByName(null) and getByName("") return the loopback address,
            // which would silently point the task at the local machine.
            if (host == null || host.trim().length() == 0) {
                throw new UnknownHostException("host must not be blank");
            }
            address = new InetSocketAddress(InetAddress.getByName(host.trim()), HTTP_PORT);
        }
    }
    我还是倾向这种做法:事件都存放在队列中,然后由线程去队列中取出并执行,我想这样以后会更高效些。我现在只做好了连接的处理,读写一直没想到怎么加,惭愧。
      

  7.   

    我觉得这样事件划分得更细,再加上线程去处理,应该会更高效。具体是分成多个队列:首先是原始任务队列(还没有注册事件的 socket),线程发现该队列有数据就取出执行(建立连接),并把建立好连接的任务放到待抓取队列(这里都是已经建立好连接的任务);然后另外的线程发现待抓取队列有数据就取出执行(读写操作),把数据读写完毕后放到已完成队列;最后再由另外的线程发现已完成队列有数据就取出,做本地存储操作。大家谈谈想法吧,最好给出点代码,嘿嘿。