小弟java才入门,一个项目需要监听自定义端口7001的socket传来的自定义协议的数据,一开始用servlet监听,发现servlet的通信中有大量的http信息,只好自己写了监听类,随着servlet的init启动监听部分一开始比较困惑DataInputStream 的堵塞方式,怎么搞那个读写的协调都不成,后来按高手的建议把读写分开双线程操作,然后又有线程间同步的问题,实在是想不清楚,就不管了什么同步了,反而调出来了。程序如下:package mf;import java.io.*;
import java.net.*;
import java.util.*;public class MutilThreadServer { private ServerSocket serverListenSocket = null; public MutilThreadServer() {
} public void start() {
// You can also init thread pool in this way.
try {
serverListenSocket = new ServerSocket(7001);
serverListenSocket.setReuseAddress(true);
System.out.println("I'm listening");
int count = 0;
while (true) {
Socket socket = serverListenSocket.accept();
new SocketProcess(socket);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
serverCleanup();
}
} public static void main(String args[]) {
MutilThreadServer server = new MutilThreadServer();
server.start();
} public void serverCleanup() {
if (null != serverListenSocket) {
try {
serverListenSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} class SocketProcess { private Socket socket = null;
private String writeBuff = null; public SocketProcess(Socket socket) {
this.socket = socket;
MyReadThread r = new MyReadThread();
MyWriteThread w = new MyWriteThread();
r.start();
w.start();
} class MyReadThread extends Thread { private DataInputStream in = null;
private byte readBuff[]; public MyReadThread() {
try {
in = new DataInputStream(socket.getInputStream());
readBuff = new byte[512];
} catch (IOException e) {
e.printStackTrace();
}
} private void myWait(int millisecond) {
try {
Thread.sleep(millisecond);
} catch (InterruptedException e) {
}
} private void cleanup() {
if (null != socket) {
try {
socket.close();
socket = null;
System.out.println("now clear up");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} @Override
public void run() {
int nRead;
try {
while (null != socket) { //没有用同步
Arrays.fill(readBuff, (byte) 0x00);
try {
socket.sendUrgentData(0xff); //用这个办法测试链接是否已经中断
nRead = in.read(readBuff, 0, readBuff.length);
if (-1 == nRead) {
System.err.println(" read waiting");
myWait(3000);
continue;
}
} catch (SocketException ex) {
break;
}
System.err.println("read in :" + nRead + " bytes") ;
String str = new String(readBuff);
str.trim();
writeBuff = str;
System.err.println(str);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
cleanup();
}
}
}
//------------------------------------------------------ class MyWriteThread extends Thread { private DataOutputStream out = null; public MyWriteThread() {
try {
out = new DataOutputStream(socket.getOutputStream());
} catch (IOException e) {
e.printStackTrace();
}
} private void myWait(int millisecond) {
try {
Thread.sleep(millisecond);
} catch (InterruptedException e) {
}
} private void cleanup() {
if (null != socket) {
try {
socket.close();
socket = null;
System.out.println("now clear up");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} @Override
public void run() {
try {
while (null != socket) { //没有用同步
try {
if (null != writeBuff) {
System.err.println("find data in buff");
socket.sendUrgentData(0x61);
out.write(writeBuff.getBytes(), 0, writeBuff.length());
out.flush();
writeBuff = null;
}
System.err.println(" write waiting");
myWait(3000);
} catch (SocketException ex) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
cleanup();
}
}
}
}
}
import java.net.*;
import java.util.*;public class MutilThreadServer { private ServerSocket serverListenSocket = null; public MutilThreadServer() {
} public void start() {
// You can also init thread pool in this way.
try {
serverListenSocket = new ServerSocket(7001);
serverListenSocket.setReuseAddress(true);
System.out.println("I'm listening");
int count = 0;
while (true) {
Socket socket = serverListenSocket.accept();
new SocketProcess(socket);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
serverCleanup();
}
} public static void main(String args[]) {
MutilThreadServer server = new MutilThreadServer();
server.start();
} public void serverCleanup() {
if (null != serverListenSocket) {
try {
serverListenSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} class SocketProcess { private Socket socket = null;
private String writeBuff = null; public SocketProcess(Socket socket) {
this.socket = socket;
MyReadThread r = new MyReadThread();
MyWriteThread w = new MyWriteThread();
r.start();
w.start();
} class MyReadThread extends Thread { private DataInputStream in = null;
private byte readBuff[]; public MyReadThread() {
try {
in = new DataInputStream(socket.getInputStream());
readBuff = new byte[512];
} catch (IOException e) {
e.printStackTrace();
}
} private void myWait(int millisecond) {
try {
Thread.sleep(millisecond);
} catch (InterruptedException e) {
}
} private void cleanup() {
if (null != socket) {
try {
socket.close();
socket = null;
System.out.println("now clear up");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} @Override
public void run() {
int nRead;
try {
while (null != socket) { //没有用同步
Arrays.fill(readBuff, (byte) 0x00);
try {
socket.sendUrgentData(0xff); //用这个办法测试链接是否已经中断
nRead = in.read(readBuff, 0, readBuff.length);
if (-1 == nRead) {
System.err.println(" read waiting");
myWait(3000);
continue;
}
} catch (SocketException ex) {
break;
}
System.err.println("read in :" + nRead + " bytes") ;
String str = new String(readBuff);
str.trim();
writeBuff = str;
System.err.println(str);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
cleanup();
}
}
}
//------------------------------------------------------ class MyWriteThread extends Thread { private DataOutputStream out = null; public MyWriteThread() {
try {
out = new DataOutputStream(socket.getOutputStream());
} catch (IOException e) {
e.printStackTrace();
}
} private void myWait(int millisecond) {
try {
Thread.sleep(millisecond);
} catch (InterruptedException e) {
}
} private void cleanup() {
if (null != socket) {
try {
socket.close();
socket = null;
System.out.println("now clear up");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} @Override
public void run() {
try {
while (null != socket) { //没有用同步
try {
if (null != writeBuff) {
System.err.println("find data in buff");
socket.sendUrgentData(0x61);
out.write(writeBuff.getBytes(), 0, writeBuff.length());
out.flush();
writeBuff = null;
}
System.err.println(" write waiting");
myWait(3000);
} catch (SocketException ex) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
cleanup();
}
}
}
}
}
import java.io.*;
import java.net.*;
import java.util.*;public class TestClient2 {
byte buff[] = new byte[1024];
Socket socket;
String strBuff;
boolean closeFlag;
public static void main(String[] args) {
// TODO code application logic here
new TestClient2(7001);
}
public TestClient2(int port) {
try {
socket = new Socket("127.0.0.1", port);
closeFlag = false;
MyReadThread r = new MyReadThread();
r.start();
MyWriteThread w = new MyWriteThread();
w.start();
} catch (IOException e) {
e.printStackTrace();
}
}
public void cleanup() {
if (null != socket) {
try {
socket.close();
socket = null;
System.out.println("now clear up");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
class MyReadThread extends Thread {
private DataInputStream in = null;
private byte readBuff[];
public MyReadThread() {
try {
in = new DataInputStream(socket.getInputStream());
readBuff = new byte[512];
} catch (IOException e) {
e.printStackTrace();
}
}
private void myWait(int millisecond) {
try {
Thread.sleep(millisecond);
} catch (InterruptedException e) {
}
}
@Override
public void run() {
try {
while (null != socket) {
int nRead;
Arrays.fill(readBuff, (byte) 0x00);
try {
socket.sendUrgentData(0x61);
System.out.println("try read from server");
nRead = in.read(readBuff, 0, readBuff.length);
System.out.println("see this not jam");
} catch (SocketException ex) {
break;
}
if (-1 == nRead) {
System.out.println(" read waiting");
myWait(1000);
continue;
}
String str = new String(readBuff);
str.trim();
System.out.println(str);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
cleanup();
}
}
}
//------------------------------------------------------
class MyWriteThread extends Thread {
private DataOutputStream out = null;
public MyWriteThread() {
try {
out = new DataOutputStream(socket.getOutputStream());
} catch (IOException e) {
e.printStackTrace();
}
}
private void myWait(int millisecond) {
try {
Thread.sleep(millisecond);
} catch (InterruptedException e) {
}
}
@Override
public void run() {
try {
BufferedReader sysIn = new BufferedReader(new InputStreamReader(System.in));
while (null != socket) {
try {
System.out.print("Input:");
String message = sysIn.readLine();
if (message.equals("bye")) {
break;
}
out.write(message.getBytes());
out.flush();
myWait(1000);
} catch (SocketException ex) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
cleanup();
}
}
}
}