class Task
{
private SocketAddress clientAddr;
    private String filename;
    private long lastActiveTime;
    private long createTime;
    private ByteBuffer inputData;
    private boolean needFlip;
public Task()
{
inputData = ByteBuffer.allocate(1024);
needFlip = true;
totalwrite=0;
}
public Task(SocketChannel sc)
{
inputData = ByteBuffer.allocate(1024);
needFlip = true;
if(sc!=null)
{
Socket s = sc.socket();
lastActiveTime = System.currentTimeMillis();
createTime = lastActiveTime;
clientAddr=s.getRemoteSocketAddress();
}
}
private SelectionKey key;
public void activate(SelectionKey sk)
{
key =sk;
}
public void deactivate()
{
key =null;
}

public boolean isMe(SocketChannel sc)
{
if(clientAddr==null)
return false;
Socket s = sc.socket();
return s.getRemoteSocketAddress().equals(clientAddr);
}
private boolean needRead()
{
if(filename!=null&&filename.length()>3)
return false;
int len=inputData.position();
if(len<=0)
return true;
inputData.flip();
String stmp="void";
try
{
Charset set = Charset.forName("us-ascii");
CharsetDecoder dec = set.newDecoder();
CharBuffer charBuf = dec.decode(inputData);
stmp=charBuf.toString();
}
catch(Exception e)
{
e.printStackTrace();
}
inputData.rewind();
inputData.position(len);
if(stmp.endsWith(".zip"))
{
filename = stmp.substring(stmp.length()-20);
debugMsg("needRead():"+filename+";inputData.position:"+len);
return false;
}
return true;
} public void work()
{
if(key==null)
return;
if(needRead())
readMessage(key);
if(!needRead())
writeMessage(key);
deactivate();
}    private void readMessage(SelectionKey key)
    {
     needFlip = true;
int nBytes = 0;
totalwrite =0;
SocketChannel socket = (SocketChannel)key.channel();
try
{
// do
// {
nBytes = socket.read(inputData);
debugMsg("readMessage() read :"+nBytes+" from "+getChannelAddress(socket));
// }
// while(nBytes>0);
        }
catch(IOException e)
{
e.printStackTrace();
}
    }
    public boolean isTimeOut()
    {
     if(System.currentTimeMillis()-lastActiveTime>1000*30)
     return true;
     return false;
    }
    private int writeTime =0;
    public boolean isFinished()
    {
     //call this at every writeMessage() 
     if(writeTime<=0)
     return false;
     if(filelength==totalwrite)
     {
         return true;
     }
     return false;
    }
    public void destroy() 
    {
     key=null;
     writeTime = 0;
     lastActiveTime = 0;
     createTime = 0;
     clientAddr = null;
     inputData.rewind();
     needFlip = true;
     totalwrite = 0;
     try
     {
     fc.close();
    }
    catch(Exception e)
    {
     e.printStackTrace();
    }
     fc = null;
    }
    public void init()
    {
     if(fc!=null)
     return;
     System.out.println("Task.init(), open the file for the first write action");
     needRead();
try
{
File file;
file = new File(filename);    
RandomAccessFile rdm = new RandomAccessFile(file,"r");
fc = rdm.getChannel();
// buffer = ByteBuffer.allocate(1024);
filelength = file.length();
}
catch(Exception e)
{
System.out.println("init() filename="+filename);
e.printStackTrace();
}
totalwrite = 0;
    }
//    private 
//    private RandomAccessFile rdm;
//    private ByteBuffer buffer;
    private FileChannel fc ;
    private long filelength;
    private int totalwrite;
    private void writeMessage(SelectionKey key)
    {
     init();
     if(fc==null)
     return;
     ByteBuffer buffer=ByteBuffer.allocate(1600);
     writeTime++;
     if(needFlip)
     {
     inputData.flip();
     needFlip=false;
     }
     lastActiveTime = System.currentTimeMillis();
     SocketChannel socket = (SocketChannel)key.channel();
try
{
int nBytes=0;
// do
// {
fc.position(totalwrite);
buffer.rewind();
fc.read(buffer);
buffer.flip();    
nBytes = socket.write(buffer);
buffer.rewind();
if(nBytes>0)
{
totalwrite+=nBytes;
}
debugMsg("Task.writeMessage() nBytes = "+nBytes+";address="+getChannelAddress(socket));
// else
// {
// System.out.println("error in Task.writeMessage,nBytes = "+nBytes);
// System.out.println("socket close here due to write error");
// destroy();
// socket.close();
// }
// }
// while(nBytes>0 &&isFinished()==false);
if(isFinished())
{
System.out.println("the while file has been sent by Task.writeMessage");
}

}
catch(Exception e)
{
e.printStackTrace();
}    }   
private static void debugMsg(String s)
{
SelectorTestServer.debugMsg(s);

public static String getChannelAddress(SocketChannel sc)
{
Socket s = sc.socket();
StringBuffer sb = new StringBuffer();
sb.append("peer address:");
sb.append(s.getRemoteSocketAddress().toString());
return sb.toString();
}    
}
//end of SelectorTestServer.java

解决方案 »

  1.   

    //client side test program
    import java.io.*;
    import java.net.*;public class NonBlockingTestClient
    {
    int MAXTHREAD = 2;
    public static void main(String args[])
    {
    NonBlockingTestClient ntc=new NonBlockingTestClient();
    for(int i=0;i<2;i++)
      new Thread(ntc.new Request()).start();
    }
    public static int threadcount=0;
    public synchronized int incCount()
    {
    threadcount++;
    return threadcount;
    }
    class Request implements Runnable
    {
    private int ID;
    public Request()
    {
    ID=incCount();
    }
    public void run()
    {
    try
    {
    InetAddress ia = InetAddress.getLocalHost();
    InetSocketAddress isa = new InetSocketAddress(ia,4900);
    Socket skt = new Socket();
    skt.connect(isa);
    System.out.println("connect to :"+isa);
    OutputStream os=skt.getOutputStream();
    os.write("C:\\tmp\\googleapi.zip".getBytes());
    os.flush();
    InputStream is=skt.getInputStream();
    System.out.println("request of thread "+ID+" send to server");
    byte b[]=new byte[1024];
    int read=0;
    FileOutputStream fos=new FileOutputStream(new File(""+ID+".zip"));
    do
    {
    read=is.read(b,0,1024);
    System.out.println("thread "+ID+" read "+read);
    if(read>0)
    fos.write(b,0,read);
    }
    while(read>0);
    fos.close();
    is.close();
    os.close();
    }
    catch(Exception e)
    {e.printStackTrace();}
    }
    }
    }
      

  2.   

    朋友们,我的服务器端程序11k,客户端程序2k,保存下来可以直接运行的。help help help
      

  3.   

    http://www-106.ibm.com/developerworks/java/library/j-javaio/?dwzone=java
    Merlin brings nonblocking I/O to the Java platform
    IBM的文章我看过了,那个家伙闭着眼睛瞎写,有下面这个大大的bug,他竟然能把那篇吹nonblocking I/O的文章写出来,我晕
    http://developer.java.sun.com/developer/bugParade/bugs/4755720.html
    Bug Id  4755720 
    Votes  4 
    Synopsis  registering OP_WRITE in nio 1.4.1 prevents selection of OP_READ events
      

  4.   

    anybody involves in network programming?
      

  5.   

    post a working one, but not multithreaded.
    import java.nio.*;
    import java.net.*;
    import java.nio.channels.*;
    import java.io.*;
    import java.util.*;
    import java.nio.charset.*;
    import java.awt.*;
    import javax.swing.*;
    import java.awt.event.*;
    class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;
    Reactor(int port) throws IOException {
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    serverSocket.socket().bind(new InetSocketAddress(port));
    System.out.println(this.getClass().getName()+" listen on port "+port);
    serverSocket.configureBlocking(false);
    SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);
    sk.attach(new Acceptor());
    }public static void main(String args[])
    {
    new NonBlockingTestClient().show();
    try
    { new Reactor(4900).run();}
    catch (IOException ex) { ex.printStackTrace(); }
    }
    /*
    Alternatively, use explicit SPI provider:
    SelectorProvider p = SelectorProvider.provider();
    selector = p.openSelector();
    serverSocket = p.openServerSocketChannel();
    http://gee.cs.oswego.edu
    */public void run() { // normally in a new Thread
    try {
    while (!Thread.interrupted()) {
    selector.select();
    Set selected = selector.selectedKeys();
    Iterator it = selected.iterator();
    while (it.hasNext())
    dispatch((SelectionKey)(it.next()));
    selected.clear();
    }
    } catch (IOException ex) { ex.printStackTrace(); }
    }
    void dispatch(SelectionKey k) {
    Runnable r = (Runnable)(k.attachment());
    if (r != null)
    r.run();
    }
    // class Reactor continued
    class Acceptor implements Runnable { // inner
    public void run() {
    try {
    SocketChannel c = serverSocket.accept();
    if (c != null)
    new Handler(selector, c);
    }
    catch(IOException ex) { ex.printStackTrace();}
    }
    }
    }
    final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    static final int MAXIN=1024;
    static final int MAXOUT=1024;
    ByteBuffer inputData = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;
    Handler(Selector sel, SocketChannel c)
    throws IOException {
    socket = c;
    // Optionally try first read now
    socket.configureBlocking(false);
    sk = socket.register(sel, SelectionKey.OP_READ);
    sk.attach(this);
    sk.interestOps(SelectionKey.OP_READ);
    sel.wakeup();
    }
    private String fileName;
    boolean isNeedRead()
    {
    if(fileName!=null&&fileName.endsWith(".zip"))
    return false;
    int len=inputData.position();
    if(len<=0)
    return true;
    inputData.flip();
    String stmp="void";
    try
    {
    Charset set = Charset.forName("us-ascii");
    CharsetDecoder dec = set.newDecoder();
    CharBuffer charBuf = dec.decode(inputData);
    stmp=charBuf.toString();
    }
    catch(Exception e)
    {
    e.printStackTrace();
    }
    if(stmp.endsWith(".zip"))
    {
    fileName = stmp;
    debugMsg("needRead():"+fileName+";inputData.position:"+len);
    return false;
    }
    inputData.rewind();
    inputData.position(len);
    return true;
    }
    void debugMsg(String s)
    {
    System.out.println(s);
    }
    boolean inputIsComplete() 

    return !isNeedRead();
    }
    boolean outputIsComplete() { if(fileLength==totalSend) return true; return false;}
    void process() 
    {  
    if(fc==null)
    {//open the file
    try
    {
    fileLength  =0;
    totalSend=0;
    File file = new File(fileName);    
    RandomAccessFile rdm = new RandomAccessFile(file,"r");
    fc = rdm.getChannel();
    fileLength = file.length();
    }
    catch(Exception e)
    {
    debugMsg("init() filename="+fileName);
    e.printStackTrace();
    }
    }
    }
    // class Handler continued
    public void run() {
    try {
    if (state == READING) read();
    else if (state == SENDING) send();
    } catch (IOException ex) { /* ... */ }
    }
    void read() throws IOException {
    socket.read(inputData);
    if (inputIsComplete()) {
    process();
    state = SENDING;
    // Normally also do first write now
    sk.interestOps(SelectionKey.OP_WRITE);
    }
    }
    private long fileLength;
    private long totalSend;
    private FileChannel fc ;
    void send() throws IOException { if(fc==null)
    {
    sk.cancel();
    debugMsg("unable to open file, cancel this connection");
    return;
    }
    output.clear();
    fc.position(totalSend);
    fc.read(output);
    output.flip();
    int sendcount=socket.write(output);
    if(sendcount>0)
    {
    totalSend+=sendcount;
    }
    debugMsg("send "+sendcount+" bytes");
    if (outputIsComplete()) sk.cancel();
    }
    }class NonBlockingTestClient extends javax.swing.JFrame
    {
    int MAXTHREAD = 2;
    JTextField jtfFileName=new JTextField("C:\\tmp\\googleapi.zip");
    public NonBlockingTestClient(){
    JButton jb=new JButton("send 2 request");
    Container c=getContentPane();
    c.setLayout(new GridLayout(0,1));
    c.add(jtfFileName);c.add(jb);
    jb.addActionListener(new java.awt.event.ActionListener()
    {public void actionPerformed(ActionEvent e)
    { work(); }});
    setSize(new Dimension(400,400));
    setVisible(true);
    pack();
    }
    public static void main(String args[])
    {
    NonBlockingTestClient ntc=new NonBlockingTestClient();
    }
    private void work()
    {
    for(int i=0;i<2;i++)
      new Thread(new Request()).start();
    }
    public static int threadcount=0;
    public synchronized int incCount()
    {
    threadcount++;
    return threadcount;
    }
    class Request implements Runnable
    {
    private int ID;
    public Request()
    {
    ID=incCount();
    }
    public void run()
    {
    try
    {
    InetAddress ia = InetAddress.getLocalHost();
    InetSocketAddress isa = new InetSocketAddress(ia,4900);
    Socket skt = new Socket();
    skt.connect(isa);
    ;
    System.out.println("connect to :"+isa);
    OutputStream os=skt.getOutputStream();
    //os.write("C:\\tmp\\googleapi.zip".getBytes());
    os.write(jtfFileName.getText().getBytes());
    os.flush();
    InputStream is=skt.getInputStream();
    System.out.println("request of thread "+ID+",port="+skt.getLocalPort()+" send to server");
    byte b[]=new byte[1024];
    int read=0;
    FileOutputStream fos=new FileOutputStream(new File(""+ID+".zip"));
    do
    {
    read=is.read(b,0,1024);
    System.out.println("thread "+ID+" read "+read);
    if(read>0)
    fos.write(b,0,read);
    }
    while(read>0);
    fos.close();
    is.close();
    os.close();
    }
    catch(Exception e)
    {e.printStackTrace();}
    }
    }
    }
      

  6.   

    这个例子实在是好长·········
    IBM的那个好像是有点问题·······
    最近在看JDK14 Tutorial
    里面有这方面的例子,不过当然比这个简单了
    只有大约4K,或许你可以去参考参考?
      

  7.   

    http://www.javaworld.com/javaworld/jw-09-2001/jw-0907-merlin_p.html
      

  8.   

    你的服务器端的selector用法好像有点错, 为什么把serverSocketChannel注册到两个selector中阿?
      

  9.   

    ServerSocketChannel channel = 
    ServerSocketChannel.open();
    channel.configureBlocking(false);
    InetSocketAddress isa = new InetSocketAddress(port);
    channel.socket().bind(isa);//把ServerSocketChannel注册到通道,产生OP_ACCEPT事件
    channel.register(selector, SelectionKey.OP_ACCEPT);while (selector.select() > 0) {
       Set readyKeys = selector.selectedKeys();
       Iterator readyItor = readyKeys.iterator();
       while (readyItor.hasNext()) {
          SelectionKey key = (SelectionKey)readyItor.next();
          readyItor.remove();
          if (key.isAcceptable()) {
             ServerSocketChannel keyChannel = (ServerSocketChannel)key.channel();
             SocketChannel channel = keyChannel.accept();
             channel.configureBlocking(false);
             //在通道注册OP_READ事件
             channel.register(selector, SelectionKey.OP_READ);
          }else if (key.isReadable()) {
           SocketChannel keyChannel = (SocketChannel)key.channel(); 
           ..........................     
          }
    }客户端Socket连接服务器端,就会产生OP_ACCEPT事件的发生,这时会执行这句channel.register(selector, SelectionKey.OP_READ);那么while (selector.select() > 0) 为true,然后又会执行if (key.isReadable())中的代码,可这时客户端Socket并没有write操作,这样就会发生错误。那该如何解决呢?
      

  10.   

    masterz:
       你真的很厉害啊,太佩服了