/**
 * Per-connection state of a non-blocking file download.
 *
 * A task first collects the request sent by the client (a file path that ends
 * in ".zip") and then streams that file back over the same channel, writing at
 * most one buffer per {@link #work()} call.
 */
class Task
{
    /** Idle time in milliseconds after which {@link #isTimeOut()} reports true. */
    private static final long TIMEOUT_MILLIS = 1000*30;

    private SocketAddress clientAddr;
    private String filename;
    private long lastActiveTime;
    private long createTime;
    private ByteBuffer inputData;
    private boolean needFlip;
    private SelectionKey key;
    private int writeTime = 0;
    private FileChannel fc;
    private long filelength;
    // long (was int): it is compared with filelength and used as a file position,
    // so an int would overflow for files larger than 2 GB.
    private long totalwrite;

    /** Creates a task that is not yet bound to any client. */
    public Task()
    {
        inputData = ByteBuffer.allocate(1024);
        needFlip = true;
        totalwrite = 0;
    }

    /**
     * Creates a task for the given client channel and remembers the peer address.
     *
     * @param sc the accepted client channel; may be null, in which case no peer
     *           address or timestamps are recorded
     */
    public Task(SocketChannel sc)
    {
        inputData = ByteBuffer.allocate(1024);
        needFlip = true;
        if (sc != null)
        {
            Socket s = sc.socket();
            lastActiveTime = System.currentTimeMillis();
            createTime = lastActiveTime;
            clientAddr = s.getRemoteSocketAddress();
        }
    }

    /** Hands the task the selection key that the next {@link #work()} call serves. */
    public void activate(SelectionKey sk)
    {
        key = sk;
    }

    /** Drops the current selection key; {@link #work()} is a no-op until re-activated. */
    public void deactivate()
    {
        key = null;
    }

    /**
     * Tells whether the given channel belongs to the client this task was created for.
     *
     * @param sc channel to compare against the remembered peer address
     * @return true when the channel's remote address equals the remembered one
     */
    public boolean isMe(SocketChannel sc)
    {
        if (clientAddr == null)
            return false;
        // clientAddr is known to be non-null here; the channel's remote address is
        // null while unconnected, so it must be the argument, not the receiver.
        return clientAddr.equals(sc.socket().getRemoteSocketAddress());
    }

    /**
     * Decides whether more request bytes are needed.
     *
     * @return false once a file name has been extracted from the request,
     *         true while the buffered bytes do not yet end in ".zip"
     */
    private boolean needRead()
    {
        if (filename != null && filename.length() > 3)
            return false;
        int len = inputData.position();
        if (len <= 0)
            return true;
        // Decode through a duplicate: flipping inputData itself would leave its
        // limit at len, so the next socket.read() could never add more bytes.
        ByteBuffer received = inputData.duplicate();
        received.flip();
        String stmp = "void";
        try
        {
            CharsetDecoder dec = Charset.forName("us-ascii").newDecoder();
            stmp = dec.decode(received).toString();
        }
        catch (CharacterCodingException e)
        {
            e.printStackTrace();
        }
        if (stmp.endsWith(".zip"))
        {
            // Use the whole request as the path; the former "last 20 characters"
            // rule threw for shorter requests and truncated longer ones.
            filename = stmp;
            debugMsg("needRead():"+filename+";inputData.position:"+len);
            return false;
        }
        return true;
    }

    /**
     * Performs one step for the activated key: reads request bytes if they are
     * still needed, writes one chunk of the file once the request is complete,
     * then deactivates the task.
     */
    public void work()
    {
        if (key == null)
            return;
        if (needRead())
            readMessage(key);
        if (!needRead())
            writeMessage(key);
        deactivate();
    }

    /** Reads whatever request bytes are currently available into inputData. */
    private void readMessage(SelectionKey key)
    {
        needFlip = true;
        int nBytes = 0;
        totalwrite = 0;
        SocketChannel socket = (SocketChannel)key.channel();
        try
        {
            nBytes = socket.read(inputData);
            debugMsg("readMessage() read :"+nBytes+" from "+getChannelAddress(socket));
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }

    /** @return true when the last write happened more than 30 seconds ago */
    public boolean isTimeOut()
    {
        return System.currentTimeMillis()-lastActiveTime > TIMEOUT_MILLIS;
    }

    /**
     * @return true when at least one write was attempted and the number of
     *         bytes written equals the file length
     */
    public boolean isFinished()
    {
        //call this at every writeMessage()
        if (writeTime <= 0)
            return false;
        return filelength == totalwrite;
    }

    /** Resets the per-connection state and closes the file channel if one is open. */
    public void destroy()
    {
        key = null;
        writeTime = 0;
        lastActiveTime = 0;
        createTime = 0;
        clientAddr = null;
        inputData.rewind();
        needFlip = true;
        totalwrite = 0;
        // fc stays null when no file was ever opened; closing it blindly raised an
        // NPE that was only hidden by a catch-all.
        if (fc != null)
        {
            try
            {
                fc.close();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            fc = null;
        }
    }

    /**
     * Opens the requested file for reading on the first write; does nothing when
     * the file is already open. On failure fc stays null and the error is printed.
     */
    public void init()
    {
        if (fc != null)
            return;
        System.out.println("Task.init(), open the file for the first write action");
        needRead();
        try
        {
            File file = new File(filename);
            RandomAccessFile rdm = new RandomAccessFile(file,"r");
            fc = rdm.getChannel();
            filelength = file.length();
        }
        catch (Exception e)
        {
            // best effort: also covers a null filename when no request was parsed
            System.out.println("init() filename="+filename);
            e.printStackTrace();
        }
        totalwrite = 0;
    }

    /**
     * Sends the next chunk of the file, starting at the number of bytes already
     * written, and records how many bytes the socket accepted.
     */
    private void writeMessage(SelectionKey key)
    {
        init();
        if (fc == null)
            return;
        ByteBuffer buffer = ByteBuffer.allocate(1600);
        writeTime++;
        if (needFlip)
        {
            inputData.flip();
            needFlip = false;
        }
        lastActiveTime = System.currentTimeMillis();
        SocketChannel socket = (SocketChannel)key.channel();
        try
        {
            int nBytes = 0;
            // Re-position on every call: a non-blocking write may accept only part
            // of the chunk, so the next chunk starts at the bytes actually sent.
            fc.position(totalwrite);
            buffer.rewind();
            fc.read(buffer);
            buffer.flip();
            nBytes = socket.write(buffer);
            buffer.rewind();
            if (nBytes > 0)
            {
                totalwrite += nBytes;
            }
            debugMsg("Task.writeMessage() nBytes = "+nBytes+";address="+getChannelAddress(socket));
            if (isFinished())
            {
                System.out.println("the while file has been sent by Task.writeMessage");
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    /** Forwards a debug message to SelectorTestServer.debugMsg. */
    private static void debugMsg(String s)
    {
        SelectorTestServer.debugMsg(s);
    }

    /**
     * Formats the remote address of a channel for log output.
     *
     * @param sc channel whose peer address is printed
     * @return "peer address:" followed by the remote address, or by "null" when
     *         the channel is not connected
     */
    public static String getChannelAddress(SocketChannel sc)
    {
        // String concatenation tolerates the null address of an unconnected socket.
        return "peer address:" + sc.socket().getRemoteSocketAddress();
    }
}
//end of SelectorTestServer.java
// NOTE(review): this brace opens a class body without a visible class header (the
// preceding line is a comment); the members below repeat the Task class above
// line for line, so this looks like an accidental second paste — confirm.
{
// Remote address recorded by the Task(SocketChannel) constructor; compared in isMe().
private SocketAddress clientAddr;
// Request text accepted by needRead(); later opened as a file in init().
private String filename;
// Timestamp of construction / last writeMessage(); read by isTimeOut().
private long lastActiveTime;
private long createTime;
// Buffer that readMessage() fills with the bytes of the client request.
private ByteBuffer inputData;
// True until writeMessage() has flipped inputData once.
private boolean needFlip;
// Creates a task with an empty 1024-byte request buffer and no peer address.
public Task()
{
inputData = ByteBuffer.allocate(1024);
needFlip = true;
totalwrite=0;
}
// Creates a task for the given channel; when sc is non-null the current time and
// the socket's remote address are recorded.
public Task(SocketChannel sc)
{
inputData = ByteBuffer.allocate(1024);
needFlip = true;
if(sc!=null)
{
Socket s = sc.socket();
lastActiveTime = System.currentTimeMillis();
createTime = lastActiveTime;
clientAddr=s.getRemoteSocketAddress();
}
}
// Key served by the next work() call; null while the task is inactive.
private SelectionKey key;
// Stores the selection key that work() will use.
public void activate(SelectionKey sk)
{
key =sk;
}
// Clears the selection key so that work() returns immediately.
public void deactivate()
{
key =null;
}
// Returns true when the channel's remote address equals the recorded clientAddr;
// returns false when no address was recorded.
public boolean isMe(SocketChannel sc)
{
if(clientAddr==null)
return false;
Socket s = sc.socket();
return s.getRemoteSocketAddress().equals(clientAddr);
}
// Returns false once a filename longer than 3 characters is known. Otherwise the
// buffered bytes are decoded as US-ASCII; if the text ends with ".zip" its last 20
// characters become the filename and false is returned, else true (keep reading).
private boolean needRead()
{
if(filename!=null&&filename.length()>3)
return false;
int len=inputData.position();
if(len<=0)
return true;
// flip() sets the limit to len and the position to 0 for decoding
inputData.flip();
String stmp="void";
try
{
Charset set = Charset.forName("us-ascii");
CharsetDecoder dec = set.newDecoder();
CharBuffer charBuf = dec.decode(inputData);
stmp=charBuf.toString();
}
catch(Exception e)
{
e.printStackTrace();
}
// NOTE(review): only the position is restored here; the limit stays at len, so
// the buffer has no remaining space for a later read — confirm this is intended.
inputData.rewind();
inputData.position(len);
if(stmp.endsWith(".zip"))
{
// NOTE(review): substring() throws when stmp is shorter than 20 characters, and
// this statement is outside the try block above.
filename = stmp.substring(stmp.length()-20);
debugMsg("needRead():"+filename+";inputData.position:"+len);
return false;
}
return true;
// work(): no-op without a key; otherwise reads when needRead() is true, writes one
// chunk when needRead() is (now) false, and finally deactivates the task.
} public void work()
{
if(key==null)
return;
if(needRead())
readMessage(key);
if(!needRead())
writeMessage(key);
deactivate();
// readMessage(): performs a single read from the key's channel into inputData and
// resets needFlip and totalwrite; an IOException is only printed.
} private void readMessage(SelectionKey key)
{
needFlip = true;
int nBytes = 0;
totalwrite =0;
SocketChannel socket = (SocketChannel)key.channel();
try
{
// do
// {
nBytes = socket.read(inputData);
debugMsg("readMessage() read :"+nBytes+" from "+getChannelAddress(socket));
// }
// while(nBytes>0);
}
catch(IOException e)
{
e.printStackTrace();
}
}
// Returns true when more than 30 seconds have passed since lastActiveTime.
public boolean isTimeOut()
{
if(System.currentTimeMillis()-lastActiveTime>1000*30)
return true;
return false;
}
// Number of writeMessage() calls that got past init().
private int writeTime =0;
// Returns true when at least one write was attempted and totalwrite equals filelength.
public boolean isFinished()
{
//call this at every writeMessage()
if(writeTime<=0)
return false;
if(filelength==totalwrite)
{
return true;
}
return false;
}
// Resets the counters, timestamps and peer address, rewinds inputData and closes fc.
public void destroy()
{
key=null;
writeTime = 0;
lastActiveTime = 0;
createTime = 0;
clientAddr = null;
inputData.rewind();
needFlip = true;
totalwrite = 0;
try
{
// NOTE(review): fc is null when init() never opened a file; the resulting
// NullPointerException is caught and printed by the handler below.
fc.close();
}
catch(Exception e)
{
e.printStackTrace();
}
fc = null;
}
// Opens the file named by filename read-only and records its length; returns at
// once when fc is already set. Failures are printed and leave fc null.
public void init()
{
if(fc!=null)
return;
System.out.println("Task.init(), open the file for the first write action");
needRead();
try
{
File file;
file = new File(filename);
RandomAccessFile rdm = new RandomAccessFile(file,"r");
fc = rdm.getChannel();
// buffer = ByteBuffer.allocate(1024);
filelength = file.length();
}
catch(Exception e)
{
System.out.println("init() filename="+filename);
e.printStackTrace();
}
totalwrite = 0;
}
// private
// private RandomAccessFile rdm;
// private ByteBuffer buffer;
// Channel of the file being sent; null until init() succeeds.
private FileChannel fc ;
// Length of the file as reported by File.length() in init().
private long filelength;
// Bytes accepted by the socket so far; also the file position of the next chunk.
private int totalwrite;
// Sends one chunk: positions fc at totalwrite, reads up to 1600 bytes, writes them
// to the key's channel and adds the number of bytes actually written to totalwrite.
private void writeMessage(SelectionKey key)
{
init();
if(fc==null)
return;
ByteBuffer buffer=ByteBuffer.allocate(1600);
writeTime++;
if(needFlip)
{
inputData.flip();
needFlip=false;
}
lastActiveTime = System.currentTimeMillis();
SocketChannel socket = (SocketChannel)key.channel();
try
{
int nBytes=0;
// do
// {
fc.position(totalwrite);
buffer.rewind();
fc.read(buffer);
buffer.flip();
nBytes = socket.write(buffer);
buffer.rewind();
if(nBytes>0)
{
totalwrite+=nBytes;
}
debugMsg("Task.writeMessage() nBytes = "+nBytes+";address="+getChannelAddress(socket));
// else
// {
// System.out.println("error in Task.writeMessage,nBytes = "+nBytes);
// System.out.println("socket close here due to write error");
// destroy();
// socket.close();
// }
// }
// while(nBytes>0 &&isFinished()==false);
if(isFinished())
{
System.out.println("the while file has been sent by Task.writeMessage");
}
}
catch(Exception e)
{
e.printStackTrace();
} }
// Forwards the message to SelectorTestServer.debugMsg.
private static void debugMsg(String s)
{
SelectorTestServer.debugMsg(s);
}
// Returns "peer address:" followed by the remote socket address of the channel.
public static String getChannelAddress(SocketChannel sc)
{
Socket s = sc.socket();
StringBuffer sb = new StringBuffer();
sb.append("peer address:");
sb.append(s.getRemoteSocketAddress().toString());
return sb.toString();
}
}
//end of SelectorTestServer.java
import java.io.*;
import java.net.*;

/**
 * Console test client: starts MAXTHREAD request threads, each of which connects
 * to port 4900 on the local host, sends a fixed file path and stores the reply
 * in "&lt;ID&gt;.zip" in the working directory.
 */
public class NonBlockingTestClient
{
    /** Number of concurrent request threads started by main. */
    int MAXTHREAD = 2;

    public static void main(String args[])
    {
        NonBlockingTestClient ntc = new NonBlockingTestClient();
        // use the configured thread count instead of a second hard-coded 2
        for (int i = 0; i < ntc.MAXTHREAD; i++)
            new Thread(ntc.new Request()).start();
    }

    /** Number of Request objects created so far; source of the request IDs. */
    public static int threadcount = 0;

    /**
     * Increments the request counter.
     *
     * @return the counter value after the increment
     */
    public synchronized int incCount()
    {
        threadcount++;
        return threadcount;
    }

    /** One download: sends the request and copies the reply to a file. */
    class Request implements Runnable
    {
        private int ID;

        public Request()
        {
            ID = incCount();
        }

        /**
         * Connects, sends the path and reads until the server closes the stream.
         * Errors are printed; the socket and the output file are always closed.
         */
        public void run()
        {
            Socket skt = new Socket();
            FileOutputStream fos = null;
            try
            {
                InetAddress ia = InetAddress.getLocalHost();
                InetSocketAddress isa = new InetSocketAddress(ia,4900);
                skt.connect(isa);
                System.out.println("connect to :"+isa);
                OutputStream os = skt.getOutputStream();
                // explicit charset instead of the platform default for the request bytes
                os.write("C:\\tmp\\googleapi.zip".getBytes("US-ASCII"));
                os.flush();
                InputStream is = skt.getInputStream();
                System.out.println("request of thread "+ID+" send to server");
                byte b[] = new byte[1024];
                int read = 0;
                fos = new FileOutputStream(new File(""+ID+".zip"));
                do
                {
                    read = is.read(b,0,1024);
                    System.out.println("thread "+ID+" read "+read);
                    if (read > 0)
                        fos.write(b,0,read);
                }
                while (read > 0);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                // Close on every path; previously a failed connect or read leaked
                // both the socket and the half-written file.
                if (fos != null)
                {
                    try
                    {
                        fos.close();
                    }
                    catch (IOException e)
                    {
                        e.printStackTrace();
                    }
                }
                try
                {
                    // closing the socket also closes its input and output streams
                    skt.close();
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }
}
Merlin brings nonblocking I/O to the Java platform
IBM的文章我看过了,那个家伙闭着眼睛瞎写,有下面这个大大的bug,他竟然能把那篇吹nonblocking I/O的文章写出来,我晕
http://developer.java.sun.com/developer/bugParade/bugs/4755720.html
Bug Id: 4755720
Votes: 4
Synopsis: registering OP_WRITE in nio 1.4.1 prevents selection of OP_READ events
import java.nio.*;
import java.net.*;
import java.nio.channels.*;
import java.io.*;
import java.util.*;
import java.nio.charset.*;
import java.awt.*;
import javax.swing.*;
import java.awt.event.*;
/**
 * Single-threaded reactor: owns the selector and the listening channel and
 * runs the Runnable attached to every key that becomes ready.
 */
class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    /**
     * Binds a non-blocking server channel to the port and registers it for
     * OP_ACCEPT with an Acceptor as attachment.
     *
     * @param port TCP port to listen on
     * @throws IOException if the selector or the channel cannot be set up
     */
    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        InetSocketAddress listenAddress = new InetSocketAddress(port);
        serverSocket.socket().bind(listenAddress);
        System.out.println(this.getClass().getName()+" listen on port "+port);
        serverSocket.configureBlocking(false);
        // register and attach in one call
        serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor());
    }

    /** Shows the test client window, then runs a reactor on port 4900 in this thread. */
    public static void main(String args[])
    {
        new NonBlockingTestClient().show();
        try
        {
            Reactor reactor = new Reactor(4900);
            reactor.run();
        }
        catch (IOException ex)
        {
            ex.printStackTrace();
        }
    }

    /*
    Alternatively, use explicit SPI provider:
    SelectorProvider p = SelectorProvider.provider();
    selector = p.openSelector();
    serverSocket = p.openServerSocketChannel();
    http://gee.cs.oswego.edu
    */

    /**
     * Event loop (normally run in a new Thread): blocks in select(), dispatches
     * every selected key, clears the selected set, and repeats until the thread
     * is interrupted or select() fails.
     */
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set readyKeys = selector.selectedKeys();
                for (Iterator keys = readyKeys.iterator(); keys.hasNext(); ) {
                    dispatch((SelectionKey) keys.next());
                }
                readyKeys.clear();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /** Runs the Runnable attached to the key, if there is one. */
    void dispatch(SelectionKey k) {
        Object attachment = k.attachment();
        if (attachment == null) {
            return;
        }
        ((Runnable) attachment).run();
    }

    // class Reactor continued
    /** Accepts a pending connection and hands it to a new Handler. */
    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel accepted = serverSocket.accept();
                if (accepted == null) {
                    return;
                }
                new Handler(selector, accepted);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}
/**
 * Serves one client connection of the Reactor: reads a file path ending in
 * ".zip" from the socket, then streams that file back one buffer at a time and
 * closes the connection when the transfer ends.
 */
final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    static final int MAXIN=1024;
    static final int MAXOUT=1024;
    ByteBuffer inputData = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    private String fileName;
    private long fileLength;
    private long totalSend;
    private FileChannel fc;

    /**
     * Switches the channel to non-blocking mode and registers it for OP_READ
     * with this handler as attachment.
     *
     * @param sel selector driving the reactor loop
     * @param c   accepted client channel
     * @throws IOException if the channel cannot be configured or registered
     */
    Handler(Selector sel, SocketChannel c)
    throws IOException {
        socket = c;
        // Optionally try first read now
        socket.configureBlocking(false);
        sk = socket.register(sel, SelectionKey.OP_READ);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    /**
     * Decides whether more request bytes are needed.
     *
     * @return false once the buffered request, decoded as US-ASCII, ends with
     *         ".zip" (it is then stored as the file name); true otherwise
     */
    boolean isNeedRead()
    {
        if (fileName != null && fileName.endsWith(".zip"))
            return false;
        int len = inputData.position();
        if (len <= 0)
            return true;
        // Decode through a duplicate: flipping inputData itself left its limit at
        // len, so a request arriving in several pieces could never be completed.
        ByteBuffer received = inputData.duplicate();
        received.flip();
        String stmp = "void";
        try
        {
            CharsetDecoder dec = Charset.forName("us-ascii").newDecoder();
            stmp = dec.decode(received).toString();
        }
        catch (CharacterCodingException e)
        {
            e.printStackTrace();
        }
        if (stmp.endsWith(".zip"))
        {
            fileName = stmp;
            debugMsg("needRead():"+fileName+";inputData.position:"+len);
            return false;
        }
        return true;
    }

    void debugMsg(String s)
    {
        System.out.println(s);
    }

    /** @return true when the complete request has been received */
    boolean inputIsComplete()
    {
        return !isNeedRead();
    }

    /** @return true when as many bytes were sent as the file is long */
    boolean outputIsComplete()
    {
        return fileLength == totalSend;
    }

    /**
     * Opens the requested file read-only if it is not open yet. On failure the
     * error is printed and fc stays null, which send() treats as "cancel".
     */
    void process()
    {
        if (fc == null)
        {//open the file
            try
            {
                fileLength = 0;
                totalSend = 0;
                // SECURITY NOTE(review): fileName comes straight from the client, so
                // any readable *.zip path on the server can be requested. Restrict
                // it to a base directory before exposing this beyond a test setup.
                File file = new File(fileName);
                RandomAccessFile rdm = new RandomAccessFile(file,"r");
                fc = rdm.getChannel();
                fileLength = file.length();
            }
            catch (Exception e)
            {
                debugMsg("init() filename="+fileName);
                e.printStackTrace();
            }
        }
    }

    // class Handler continued
    /** Runs the read or send step for the current state; I/O errors end the connection. */
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) {
            // Was silently swallowed, leaving a broken channel registered forever.
            ex.printStackTrace();
            close();
        }
    }

    /**
     * Reads available request bytes; once the request is complete opens the file
     * and switches the key to OP_WRITE.
     */
    void read() throws IOException {
        int count = socket.read(inputData);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        } else if (count < 0 || !inputData.hasRemaining()) {
            // Peer closed, or the buffer is full without a valid request: the key
            // would otherwise stay readable and spin the selector loop.
            close();
        }
    }

    /**
     * Sends the next chunk of the file starting at the number of bytes already
     * sent; closes the connection when the file is done or was never opened.
     */
    void send() throws IOException {
        if (fc == null)
        {
            debugMsg("unable to open file, cancel this connection");
            close();
            return;
        }
        output.clear();
        // Re-position every time: a non-blocking write may take only part of the
        // buffer, so the next chunk starts at the bytes actually sent.
        fc.position(totalSend);
        int fromFile = fc.read(output);
        output.flip();
        int sendcount = socket.write(output);
        if (sendcount > 0)
        {
            totalSend += sendcount;
        }
        debugMsg("send "+sendcount+" bytes");
        // fromFile < 0 means the file ended before fileLength bytes were sent
        if (outputIsComplete() || fromFile < 0) close();
    }

    /**
     * Cancels the key and closes the file and the socket. Cancelling the key
     * alone leaves the socket open, so the client never sees end-of-stream.
     */
    private void close()
    {
        sk.cancel();
        if (fc != null)
        {
            try
            {
                fc.close();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            fc = null;
        }
        try
        {
            socket.close();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}

/**
 * Swing test client: a window with a file-name field and a button; each click
 * starts MAXTHREAD download threads against port 4900 on the local host.
 */
class NonBlockingTestClient extends javax.swing.JFrame
{
    /** Number of request threads started per button click. */
    int MAXTHREAD = 2;
    JTextField jtfFileName=new JTextField("C:\\tmp\\googleapi.zip");

    public NonBlockingTestClient(){
        JButton jb=new JButton("send 2 request");
        Container c=getContentPane();
        c.setLayout(new GridLayout(0,1));
        c.add(jtfFileName);
        c.add(jb);
        jb.addActionListener(new java.awt.event.ActionListener()
        {
            public void actionPerformed(ActionEvent e)
            {
                work();
            }
        });
        setSize(new Dimension(400,400));
        // lay out before showing; pack() still determines the final size
        pack();
        setVisible(true);
    }

    public static void main(String args[])
    {
        NonBlockingTestClient ntc=new NonBlockingTestClient();
    }

    /** Starts MAXTHREAD request threads. */
    private void work()
    {
        for (int i = 0; i < MAXTHREAD; i++)
            new Thread(new Request()).start();
    }

    /** Number of Request objects created so far; source of the request IDs. */
    public static int threadcount=0;

    /** @return the request counter after incrementing it */
    public synchronized int incCount()
    {
        threadcount++;
        return threadcount;
    }

    /** One download: sends the path from the text field and stores the reply in "&lt;ID&gt;.zip". */
    class Request implements Runnable
    {
        private int ID;
        // Captured at construction time instead of reading the Swing text field
        // from the worker thread in run().
        private String requestedFile;

        public Request()
        {
            ID=incCount();
            requestedFile=jtfFileName.getText();
        }

        /**
         * Connects, sends the path and reads until the server closes the stream.
         * Errors are printed; the socket and the output file are always closed.
         */
        public void run()
        {
            Socket skt = new Socket();
            FileOutputStream fos = null;
            try
            {
                InetAddress ia = InetAddress.getLocalHost();
                InetSocketAddress isa = new InetSocketAddress(ia,4900);
                skt.connect(isa);
                System.out.println("connect to :"+isa);
                OutputStream os=skt.getOutputStream();
                os.write(requestedFile.getBytes());
                os.flush();
                InputStream is=skt.getInputStream();
                System.out.println("request of thread "+ID+",port="+skt.getLocalPort()+" send to server");
                byte b[]=new byte[1024];
                int read=0;
                fos=new FileOutputStream(new File(""+ID+".zip"));
                do
                {
                    read=is.read(b,0,1024);
                    System.out.println("thread "+ID+" read "+read);
                    if(read>0)
                        fos.write(b,0,read);
                }
                while(read>0);
            }
            catch(Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                // Close on every path; a failed connect or read used to leak both.
                if (fos != null)
                {
                    try
                    {
                        fos.close();
                    }
                    catch (IOException e)
                    {
                        e.printStackTrace();
                    }
                }
                try
                {
                    // closing the socket also closes its input and output streams
                    skt.close();
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }
}
IBM的那篇文章好像是有点问题……
最近在看 JDK 1.4 Tutorial,
里面有这方面的例子,不过当然比这个简单了
只有大约4K,或许你可以去参考参考?
// Excerpt of a selector-based accept/read loop quoted in the question below.
// NOTE(review): the snippet is incomplete as shown — `channel` and `selector` are
// never declared here and part of the read branch is elided with dots.
ServerSocketChannel.open();
channel.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(port);
channel.socket().bind(isa);// register the ServerSocketChannel (next line) so that OP_ACCEPT events are produced
// The loop below ends as soon as select() returns 0.
channel.register(selector, SelectionKey.OP_ACCEPT);while (selector.select() > 0) {
Set readyKeys = selector.selectedKeys();
Iterator readyItor = readyKeys.iterator();
while (readyItor.hasNext()) {
SelectionKey key = (SelectionKey)readyItor.next();
// remove the key from the selected set before handling it
readyItor.remove();
if (key.isAcceptable()) {
ServerSocketChannel keyChannel = (ServerSocketChannel)key.channel();
SocketChannel channel = keyChannel.accept();
channel.configureBlocking(false);
// register the accepted channel for OP_READ events
channel.register(selector, SelectionKey.OP_READ);
}else if (key.isReadable()) {
SocketChannel keyChannel = (SocketChannel)key.channel();
..........................
}
}
客户端Socket连接服务器端时,就会产生OP_ACCEPT事件,这时会执行这句channel.register(selector, SelectionKey.OP_READ);那么while (selector.select() > 0) 为true,然后又会执行if (key.isReadable())中的代码,可这时客户端Socket并没有write操作,这样就会发生错误。那该如何解决呢?
你真的很厉害啊,太佩服了