private byte[] dataInput;
/**
 * Returns the {@code dataInput} field.
 * <p>
 * NOTE: {@code temp} is only a second reference to the same array; no bytes are copied,
 * so the caller still receives the internal array itself and can modify it.
 *
 * @return the array held in {@code dataInput} (possibly {@code null})
 */
public byte[] getDataInput() {
byte[] temp = dataInput; // alias of the field, not a copy
return temp;
}
/**
 * Stores the given array reference in the {@code dataInput} field.
 * <p>
 * NOTE: the reference is stored as-is (no copy), so this object and the caller share
 * one array; later changes the caller makes to it are visible through this object.
 *
 * @param dataInput the array to store; may be {@code null}
 */
public void setDataInput(byte[] dataInput) {
this.dataInput = dataInput; // shares the caller's array
}开始get方法也有bug,后来定义了一个临时变量,bug就不再了,但是set方法定义临时变量却不行,我该怎么改写set方法才会没有bug了呢?
/**
 * Returns the {@code dataInput} field (quoted copy of the getter under discussion).
 * <p>
 * NOTE: {@code temp} refers to the same array as the field; nothing is copied.
 *
 * @return the array held in {@code dataInput} (possibly {@code null})
 */
public byte[] getDataInput() {
byte[] temp = dataInput; // alias of the field, not a copy
return temp;
}
/**
 * Stores the given array reference in the {@code dataInput} field (quoted copy of the
 * setter under discussion).
 * <p>
 * NOTE: no copy is made; the field and the caller's variable refer to the same array.
 *
 * @param dataInput the array to store; may be {@code null}
 */
public void setDataInput(byte[] dataInput) {
this.dataInput = dataInput; // shares the caller's array
}开始get方法也有bug,后来定义了一个临时变量,bug就不再了,但是set方法定义临时变量却不行,我该怎么改写set方法才会没有bug了呢?
/**
 * Stores a private copy of the given bytes in the {@code dataInput} field.
 * <p>
 * Copying (instead of keeping the caller's reference) means later changes the caller
 * makes to its own array do not affect this object.
 *
 * @param dataInput the bytes to copy; {@code null} clears the field
 */
public void setDataInput(byte[] dataInput) {
    if (dataInput == null) {
        this.dataInput = null;
        return;
    }
    byte[] copy = new byte[dataInput.length];
    System.arraycopy(dataInput, 0, copy, 0, dataInput.length);
    this.dataInput = copy;
}
当一个对象或引用类型变量被当作参数传递时,也是值传递,只不过传递的这个值是对象的引用。因此Java中只有值传递,没有引用传递;setter直接保存参数时,保存的就是调用者手里的同一个数组,而不是它的副本。
// Creates and starts one Reader thread and one Writer thread per iteration,
// MAX_THREADS pairs in total.
for (int i = 0; i < MAX_THREADS; i++) {
Thread r = new Reader();
Thread w = new Writer();
r.start();
w.start();
}Reader这个类是继承Thread类的,为什么r.start()和w.start()都有bug?
/**
 * Returns a copy of the {@code dataInput} field, so callers cannot modify the
 * internal array through the returned reference.
 *
 * @return a clone of {@code dataInput}, or {@code null} when the field is {@code null}
 */
public byte[] getDataInput() {
    return (dataInput != null) ? (byte[]) dataInput.clone() : null;
}
线程,我基本不怎么用。所以请先给BUG描述。然后根据现象结合API等分析
* 多线程读数据
*/package com.hr.qutserver.pretreatserver;
import java.util.List;
import java.util.LinkedList;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.nio.ByteBuffer;
import java.io.IOException;import com.letpower.framework.log.SysLogger;
/**
 * Worker thread that reads client data from channels handed over by the selector loop.
 * <p>
 * All instances share one static queue: {@link #processRequest(SelectionKey)} appends a
 * key and wakes the workers; each worker removes one key at a time and reads it.
 */
public class Reader extends Thread {
    static SysLogger log = SysLogger.getInstance(Reader.class.getName());

    /** Size in bytes of the scratch buffer used for each channel read. */
    private static final int BUFFER_SIZE = 1024;

    /** Keys waiting to be read; also the monitor the workers wait on. */
    private static final List<SelectionKey> pool = new LinkedList<SelectionKey>();

    private static final Notifier notifier = Notifier.getNotifier();

    public Reader() {
    }

    /**
     * Takes keys from the shared queue, blocking while it is empty, and reads each one.
     * <p>
     * An interrupt ends the worker: the interrupt flag is restored and the method
     * returns, instead of being logged as an ordinary error and ignored. Any other
     * exception is logged and the loop carries on with the next key.
     */
    @Override
    public void run() {
        while (true) {
            try {
                SelectionKey key;
                synchronized (pool) {
                    // Loop around wait() to cope with spurious wake-ups and with
                    // another worker having taken the key first.
                    while (pool.isEmpty()) {
                        pool.wait();
                    }
                    key = pool.remove(0);
                }
                // Read outside the lock so other workers are not blocked meanwhile.
                read(key);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Reads the request data a client has sent.
     * <p>
     * Reads repeatedly until {@code read} returns -1 (end of stream) or 0 (no bytes
     * read), accumulating everything received so far.
     *
     * @param sc the socket channel to read from
     * @return the bytes read, exactly sized; empty if nothing was read
     * @throws IOException if reading from the channel fails
     */
    public static byte[] readRequest(SocketChannel sc) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        byte[] data = new byte[BUFFER_SIZE * 10];
        int off = 0;
        while (true) {
            buffer.clear();
            int r = sc.read(buffer);
            if (r <= 0) {
                break;
            }
            // One read yields at most BUFFER_SIZE bytes, so a single grow step is enough.
            if (off + r > data.length) {
                data = grow(data, BUFFER_SIZE * 10);
            }
            System.arraycopy(buffer.array(), 0, data, off, r);
            off += r;
        }
        byte[] req = new byte[off];
        System.arraycopy(data, 0, req, 0, off);
        return req;
    }

    /**
     * Handles reading for one connection: reads the client data, stores it on the
     * {@code Request} attached to the key, fires {@code onRead}, and hands the key over
     * for write processing. Failures are logged and reported via {@code fireOnError}.
     *
     * @param key the selection key whose channel is read and whose attachment is the
     *            {@code Request}
     */
    public void read(SelectionKey key) {
        try {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] clientData = readRequest(sc);

            Request request = (Request) key.attachment();
            request.setDataInput(clientData);

            notifier.fireOnRead(request);

            // NOTE(review): a new PreQutServer is created for every request; confirm that
            // its write queue is the one the running server actually drains.
            new PreQutServer().processWriteRequest(key);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            notifier.fireOnError("Error occured in Reader: " + e.getMessage());
        }
    }

    /**
     * Queues a client request for reading and wakes the waiting worker threads.
     *
     * @param key the selection key to queue
     */
    public static void processRequest(SelectionKey key) {
        synchronized (pool) {
            pool.add(key);
            pool.notifyAll();
        }
    }

    /**
     * Grows a byte array.
     *
     * @param src  the source array
     * @param size the number of bytes to add to the length
     * @return a new array of length {@code src.length + size} starting with the
     *         contents of {@code src}
     */
    public static byte[] grow(byte[] src, int size) {
        byte[] tmp = new byte[src.length + size];
        System.arraycopy(src, 0, tmp, 0, src.length);
        return tmp;
    }
}
这是Reader的代码
// Response pool: keys added by processWriteRequest(...) and drained by addRegister().
@SuppressWarnings("unchecked")
private List wpool = new LinkedList(); // response pool
private Selector selector; // opened in the TestServer(int) constructor; stays null with the no-arg constructor
private ServerSocketChannel sschannel; // listening channel, registered for OP_ACCEPT in the constructor
private InetSocketAddress address; // built from port in the constructor
protected Notifier notifier; // obtained from Notifier.getNotifier(); target of the fireOn... calls
private int port; // port passed to the constructor
static SysLogger mLog = SysLogger.getInstance(TestServer.class.getName());
/**
 * Number of Reader/Writer thread pairs started by the {@code TestServer(int)} constructor.
 * <p>
 * NOTE(review): the original (mis-encoded) comment at this position documented that
 * constructor rather than this field: "creates the main control server thread;
 * {@code port} is the service port; throws {@code java.lang.Exception}".
 */
private static int MAX_THREADS = 4;
/**
 * Creates the main control server: registers a {@code ServerHandler} with the notifier,
 * opens a non-blocking server channel bound to {@code port}, registers it for
 * {@code OP_ACCEPT}, and starts {@code MAX_THREADS} Reader/Writer worker pairs.
 * <p>
 * The socket is set up before any worker is started. If opening or binding fails
 * (for example because the port is in use), whatever was opened is closed again and no
 * worker thread is left running.
 *
 * @param port the service port to listen on
 * @throws java.lang.Exception if the selector or server channel cannot be opened,
 *         bound, or registered
 */
public TestServer(int port) throws Exception {
    this.port = port;
    notifier = Notifier.getNotifier();
    ServerHandler socketserver = new ServerHandler();
    Notifier.getNotifier().addListener(socketserver);

    Selector openedSelector = null;
    ServerSocketChannel openedChannel = null;
    try {
        openedSelector = Selector.open();
        openedChannel = ServerSocketChannel.open();
        openedChannel.configureBlocking(false);
        InetSocketAddress bindAddress = new InetSocketAddress(port);
        openedChannel.socket().bind(bindAddress);
        openedChannel.register(openedSelector, SelectionKey.OP_ACCEPT);

        selector = openedSelector;
        sschannel = openedChannel;
        address = bindAddress;
    } catch (Exception e) {
        // Best-effort cleanup; the original failure is the one worth reporting.
        if (openedChannel != null) {
            try {
                openedChannel.close();
            } catch (Exception ignored) {
                // nothing more can be done here
            }
        }
        if (openedSelector != null) {
            try {
                openedSelector.close();
            } catch (Exception ignored) {
                // nothing more can be done here
            }
        }
        throw e;
    }

    // NOTE(review): static analysers (FindBugs SC_START_IN_CTOR) flag Thread.start()
    // inside a constructor when the class can be subclassed. The workers are started
    // last, after every field is assigned; moving this into run() or making the class
    // final would remove the warning but changes when the workers start.
    for (int i = 0; i < MAX_THREADS; i++) {
        Thread r = new Reader();
        Thread w = new Writer();
        r.start();
        w.start();
    }
}

/**
 * No-argument constructor (added 2009-06-25 by Zhao Han). It initialises nothing:
 * the selector, channel and notifier fields keep their default values.
 */
public TestServer() {
}

/**
 * Selector loop: accepts new connections and hands readable/writable keys to the
 * Reader/Writer workers. Runs until the thread is terminated.
 * <p>
 * Pending write registrations are processed right after every {@code select()},
 * not only when it reports zero ready keys; otherwise a wake-up that coincides with
 * ready keys would leave them waiting until some unrelated event arrives. Doing it
 * directly after {@code select()} also means keys cancelled earlier have already been
 * deregistered, so the channel can be registered again.
 */
public void run() {
    System.out.println("Server started ...");
    System.out.println("Server listening on port: " + port);
    while (true) {
        try {
            int num = selector.select();
            addRegister();
            if (num <= 0) {
                continue;
            }
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    // Accept the new connection
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    notifier.fireOnAccept();

                    SocketChannel sc = ssc.accept();
                    if (sc == null) {
                        // Non-blocking accept with no connection actually pending.
                        continue;
                    }
                    sc.configureBlocking(false);

                    // Console message (Chinese): "Accepted client connection from:".
                    // Restored from a mis-decoded literal.
                    System.out.println("接收到客户连接，来自:" +
                            sc.socket().getInetAddress() +
                            ":" + sc.socket().getPort());
                    Request request = new Request(sc);
                    notifier.fireOnAccepted(request);
                    sc.register(selector, SelectionKey.OP_READ, request);
                } else if (key.isReadable()) {
                    // Hand over to a Reader worker; the key is cancelled so the
                    // selector stops reporting this channel meanwhile.
                    Reader.processRequest(key);
                    key.cancel();
                } else if (key.isWritable()) {
                    Writer.processRequest(key);
                    key.cancel();
                }
            }
        } catch (Exception e) {
            notifier.fireOnError("Error occured in Server: " + e.getMessage());
        }
    }
}
/**
 * Drains the response pool: every queued key's channel is registered with the selector
 * for {@code OP_WRITE}, carrying over the key's attachment. If registration fails, the
 * failure is logged, the channel is closed, and an error event is fired.
 */
private void addRegister() {
    synchronized (wpool) {
        while (true) {
            if (wpool.isEmpty()) {
                return;
            }
            final SelectionKey pending = (SelectionKey) wpool.remove(0);
            final SocketChannel channel = (SocketChannel) pending.channel();
            try {
                channel.register(selector, SelectionKey.OP_WRITE, pending.attachment());
            } catch (Exception registerFailure) {
                mLog.error(registerFailure.getMessage(), registerFailure);
                closeAfterRegisterFailure(channel, pending);
                notifier.fireOnError("Error occured in addRegister: " + registerFailure.getMessage());
            }
        }
    }
}

/**
 * Closes a channel that could not be registered and fires the closed event for the
 * request attached to its key; a failure while closing is only logged.
 */
private void closeAfterRegisterFailure(SocketChannel channel, SelectionKey pending) {
    try {
        channel.finishConnect();
        channel.close();
        channel.socket().close();
        notifier.fireOnClosed((Request) pending.attachment());
    } catch (Exception closeFailure) {
        mLog.error(closeFailure.getMessage(), closeFailure); // changed 2009-06-24 by Zhao Han
    }
}
/**
 * Queues a key for write registration and wakes the selector so that the selector
 * loop can pick it up.
 *
 * @param key the selection key to queue
 */
@SuppressWarnings("unchecked")
public void processWriteRequest(SelectionKey key) {
    synchronized (wpool) {
        wpool.add(key);
        wpool.notifyAll();
    }
    selector.wakeup();
}

/**
 * Starts a server on port 1111 in a new thread; a failure during start-up is printed
 * to standard error.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) throws Exception {
    try {
        TestServer server = new TestServer(1111);
        Thread serverThread = new Thread(server);
        serverThread.start();
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
}
Reader没有子类,上面这段程序是有bug的那段
w.start();
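在构造方法中调用线程的start方法是会出现问题的:此时对象还没有构造完成,如果这个类以后被继承,线程会在子类的构造方法执行之前就开始运行。FindBugs因此对非final类的构造方法里的start()调用给出警告(SC_START_IN_CTOR),可以把start()移到构造完成之后再调用,或者把类声明为final。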
[email protected]