今天看了一段开源代码,对它使用 Condition 的方式不太理解,不知道这么写有什么用意:
new GatewayThread 并 start() 之后,它紧接着就调用了 awaitUp() 方法。
问题来了:GatewayThread.start() 之后,awaitUp() 转瞬就失效了(不再等待),这个写法是为了什么呢?
_thread = new GatewayThread(_port, _destinationPort);
_thread.start();
_thread.awaitUp();

/**
 * Daemon thread that listens on a local port and forwards every accepted TCP
 * connection to another port on localhost.
 *
 * <p>For each accepted client two daemon "pump" threads are started, one per
 * direction, that copy bytes between the client socket and the destination
 * socket. Calling {@link #interrupt()} closes the listening socket, which makes
 * {@link #run()} leave its accept loop and stop all pump threads.
 *
 * <p>{@link #awaitUp()} lets the thread that called {@code start()} block until
 * the listening socket has actually been opened.
 */
public class GatewayThread extends Thread {

    protected final static Logger LOG = Logger.getLogger(GatewayThread.class);

    /** Size of the buffer used when copying bytes between the two sockets. */
    private static final int BUFFER_SIZE = 4096;

    private final int _port;
    private final int _destinationPort;
    // Assigned by the gateway thread in run() and read by whichever thread
    // calls interrupt(), hence volatile.
    private volatile ServerSocket _serverSocket;
    private final Lock _lock = new ReentrantLock();
    private final Condition _runningCondition = _lock.newCondition();
    // Set once the server socket is listening; changed only while holding _lock.
    private boolean _running = false;
    // Set when run() is about to return; changed only while holding _lock.
    private boolean _terminated = false;

    /**
     * @param port            local port the gateway listens on
     * @param destinationPort port on localhost that connections are forwarded to
     */
    public GatewayThread(int port, int destinationPort) {
        _port = port;
        _destinationPort = destinationPort;
        setDaemon(true);
    }

    /**
     * Opens the server socket, signals threads blocked in {@link #awaitUp()},
     * and then accepts and forwards connections until the server socket is
     * closed or an unexpected error occurs.
     */
    @Override
    public void run() {
        final List<Thread> runningThreads = new Vector<Thread>();
        try {
            try {
                LOG.info("Starting gateway on port " + _port + " pointing to port " + _destinationPort);
                _serverSocket = new ServerSocket(_port);
                _lock.lock();
                try {
                    _running = true;
                    _runningCondition.signalAll();
                } finally {
                    _lock.unlock();
                }
                while (true) {
                    final Socket socket = _serverSocket.accept();
                    LOG.info("new client is connected " + socket.getInetAddress());
                    final InputStream incomingInputStream = socket.getInputStream();
                    final OutputStream incomingOutputStream = socket.getOutputStream();

                    final Socket outgoingSocket;
                    try {
                        outgoingSocket = new Socket("localhost", _destinationPort);
                    } catch (Exception e) {
                        LOG.warn("could not connect to " + _destinationPort);
                        // Nobody will ever serve this client, so do not leak its socket.
                        closeSocket(socket);
                        continue;
                    }
                    final InputStream outgoingInputStream = outgoingSocket.getInputStream();
                    final OutputStream outgoingOutputStream = outgoingSocket.getOutputStream();

                    Thread writeThread = createPumpThread(incomingInputStream, outgoingOutputStream, socket,
                            outgoingSocket, runningThreads);
                    Thread readThread = createPumpThread(outgoingInputStream, incomingOutputStream, socket,
                            outgoingSocket, runningThreads);

                    // Register before starting so that a shutdown happening right
                    // now is guaranteed to see (and stop) both threads.
                    runningThreads.add(writeThread);
                    runningThreads.add(readThread);
                    writeThread.start();
                    readThread.start();
                }
            } catch (SocketException e) {
                if (!_running) {
                    // The server socket could not be opened at all.
                    throw ExceptionUtil.convertToRuntimeException(e);
                }
                LOG.info("Stopping gateway");
            } catch (Exception e) {
                LOG.error("error on gateway execution", e);
            }
            stopPumpThreads(runningThreads);
        } finally {
            closeServerSocketQuietly();
            // Wake up awaitUp() callers even if the gateway never came up;
            // otherwise they would block forever.
            _lock.lock();
            try {
                _terminated = true;
                _runningCondition.signalAll();
            } finally {
                _lock.unlock();
            }
        }
    }

    /**
     * Creates (but does not start) a daemon thread that copies everything read
     * from {@code in} to {@code out}. Interrupting the returned thread closes
     * both sockets so that a blocked read or write is released.
     */
    private Thread createPumpThread(final InputStream in, final OutputStream out, final Socket socket,
            final Socket outgoingSocket, final List<Thread> runningThreads) {
        Thread pumpThread = new Thread() {
            @Override
            public void run() {
                try {
                    final byte[] buffer = new byte[BUFFER_SIZE];
                    int read;
                    while ((read = in.read(buffer)) != -1) {
                        out.write(buffer, 0, read);
                    }
                } catch (IOException e) {
                    // ignore: raised when a peer disconnects or the sockets are closed on shutdown
                } finally {
                    closeQuietly(out);
                    runningThreads.remove(this);
                }
            }

            @Override
            public void interrupt() {
                // Blocking socket I/O does not react to Thread.interrupt(), so
                // close the sockets to release the pump.
                closeSocket(socket);
                closeSocket(outgoingSocket);
                super.interrupt();
            }
        };
        pumpThread.setDaemon(true);
        return pumpThread;
    }

    /** Interrupts every still-registered pump thread and waits for it to finish. */
    private void stopPumpThreads(List<Thread> runningThreads) {
        // interrupt() on this gateway sets our own interrupt flag; clear it,
        // otherwise the first join() below would throw immediately.
        Thread.interrupted();
        for (Thread thread : new ArrayList<Thread>(runningThreads)) {
            thread.interrupt();
            try {
                thread.join();
            } catch (InterruptedException e) {
                // ignore: this thread is terminating anyway
            }
        }
    }

    /** Closes the given socket, logging (not propagating) any I/O error. */
    private void closeSocket(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            LOG.error("error on stopping closing sockets", e);
        }
    }

    /** Closes the server socket if it was ever opened; errors are ignored. */
    private void closeServerSocketQuietly() {
        final ServerSocket serverSocket = _serverSocket;
        if (serverSocket != null) {
            try {
                serverSocket.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }

    /** Closes {@code closable}, ignoring any {@link IOException}. */
    protected void closeQuietly(Closeable closable) {
        try {
            closable.close();
        } catch (IOException e) {
            // ignore
        }
    }

    /**
     * Stops the gateway: closes the server socket (which makes the blocked
     * {@code accept()} in {@link #run()} fail) and then sets the interrupt flag.
     */
    @Override
    public void interrupt() {
        final ServerSocket serverSocket = _serverSocket;
        if (serverSocket != null) {
            try {
                serverSocket.close();
            } catch (Exception cE) {
                LOG.error("error on stopping gateway", cE);
            }
        }
        super.interrupt();
    }

    /**
     * Stops the gateway and waits until its thread has terminated.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void interruptAndJoin() throws InterruptedException {
        interrupt();
        join();
    }

    /**
     * Blocks the caller until the gateway's server socket is listening. If the
     * gateway is already up this returns immediately. If the calling thread is
     * interrupted while waiting, its interrupt flag is restored and the method
     * returns.
     *
     * @throws IllegalStateException if the gateway thread terminated without
     *                               ever reaching the listening state
     */
    public void awaitUp() {
        _lock.lock();
        try {
            // Loop to guard against spurious wake-ups of the condition.
            while (!_running && !_terminated) {
                _runningCondition.await();
            }
            if (!_running) {
                throw new IllegalStateException("gateway on port " + _port + " terminated before it came up");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            _lock.unlock();
        }
    }
}
线程socketJavaConditionlock
new GatewayThread 并 start() 之后,它紧接着就调用了 awaitUp() 方法。
问题来了:GatewayThread.start() 之后,awaitUp() 转瞬就失效了(不再等待),这个写法是为了什么呢?
_thread = new GatewayThread(_port, _destinationPort);
_thread.start();
_thread.awaitUp();

/**
 * Daemon thread that listens on a local port and forwards every accepted TCP
 * connection to another port on localhost.
 *
 * <p>For each accepted client two daemon "pump" threads are started, one per
 * direction, that copy bytes between the client socket and the destination
 * socket. Calling {@link #interrupt()} closes the listening socket, which makes
 * {@link #run()} leave its accept loop and stop all pump threads.
 *
 * <p>{@link #awaitUp()} lets the thread that called {@code start()} block until
 * the listening socket has actually been opened.
 */
public class GatewayThread extends Thread {

    protected final static Logger LOG = Logger.getLogger(GatewayThread.class);

    /** Size of the buffer used when copying bytes between the two sockets. */
    private static final int BUFFER_SIZE = 4096;

    private final int _port;
    private final int _destinationPort;
    // Assigned by the gateway thread in run() and read by whichever thread
    // calls interrupt(), hence volatile.
    private volatile ServerSocket _serverSocket;
    private final Lock _lock = new ReentrantLock();
    private final Condition _runningCondition = _lock.newCondition();
    // Set once the server socket is listening; changed only while holding _lock.
    private boolean _running = false;
    // Set when run() is about to return; changed only while holding _lock.
    private boolean _terminated = false;

    /**
     * @param port            local port the gateway listens on
     * @param destinationPort port on localhost that connections are forwarded to
     */
    public GatewayThread(int port, int destinationPort) {
        _port = port;
        _destinationPort = destinationPort;
        setDaemon(true);
    }

    /**
     * Opens the server socket, signals threads blocked in {@link #awaitUp()},
     * and then accepts and forwards connections until the server socket is
     * closed or an unexpected error occurs.
     */
    @Override
    public void run() {
        final List<Thread> runningThreads = new Vector<Thread>();
        try {
            try {
                LOG.info("Starting gateway on port " + _port + " pointing to port " + _destinationPort);
                _serverSocket = new ServerSocket(_port);
                _lock.lock();
                try {
                    _running = true;
                    _runningCondition.signalAll();
                } finally {
                    _lock.unlock();
                }
                while (true) {
                    final Socket socket = _serverSocket.accept();
                    LOG.info("new client is connected " + socket.getInetAddress());
                    final InputStream incomingInputStream = socket.getInputStream();
                    final OutputStream incomingOutputStream = socket.getOutputStream();

                    final Socket outgoingSocket;
                    try {
                        outgoingSocket = new Socket("localhost", _destinationPort);
                    } catch (Exception e) {
                        LOG.warn("could not connect to " + _destinationPort);
                        // Nobody will ever serve this client, so do not leak its socket.
                        closeSocket(socket);
                        continue;
                    }
                    final InputStream outgoingInputStream = outgoingSocket.getInputStream();
                    final OutputStream outgoingOutputStream = outgoingSocket.getOutputStream();

                    Thread writeThread = createPumpThread(incomingInputStream, outgoingOutputStream, socket,
                            outgoingSocket, runningThreads);
                    Thread readThread = createPumpThread(outgoingInputStream, incomingOutputStream, socket,
                            outgoingSocket, runningThreads);

                    // Register before starting so that a shutdown happening right
                    // now is guaranteed to see (and stop) both threads.
                    runningThreads.add(writeThread);
                    runningThreads.add(readThread);
                    writeThread.start();
                    readThread.start();
                }
            } catch (SocketException e) {
                if (!_running) {
                    // The server socket could not be opened at all.
                    throw ExceptionUtil.convertToRuntimeException(e);
                }
                LOG.info("Stopping gateway");
            } catch (Exception e) {
                LOG.error("error on gateway execution", e);
            }
            stopPumpThreads(runningThreads);
        } finally {
            closeServerSocketQuietly();
            // Wake up awaitUp() callers even if the gateway never came up;
            // otherwise they would block forever.
            _lock.lock();
            try {
                _terminated = true;
                _runningCondition.signalAll();
            } finally {
                _lock.unlock();
            }
        }
    }

    /**
     * Creates (but does not start) a daemon thread that copies everything read
     * from {@code in} to {@code out}. Interrupting the returned thread closes
     * both sockets so that a blocked read or write is released.
     */
    private Thread createPumpThread(final InputStream in, final OutputStream out, final Socket socket,
            final Socket outgoingSocket, final List<Thread> runningThreads) {
        Thread pumpThread = new Thread() {
            @Override
            public void run() {
                try {
                    final byte[] buffer = new byte[BUFFER_SIZE];
                    int read;
                    while ((read = in.read(buffer)) != -1) {
                        out.write(buffer, 0, read);
                    }
                } catch (IOException e) {
                    // ignore: raised when a peer disconnects or the sockets are closed on shutdown
                } finally {
                    closeQuietly(out);
                    runningThreads.remove(this);
                }
            }

            @Override
            public void interrupt() {
                // Blocking socket I/O does not react to Thread.interrupt(), so
                // close the sockets to release the pump.
                closeSocket(socket);
                closeSocket(outgoingSocket);
                super.interrupt();
            }
        };
        pumpThread.setDaemon(true);
        return pumpThread;
    }

    /** Interrupts every still-registered pump thread and waits for it to finish. */
    private void stopPumpThreads(List<Thread> runningThreads) {
        // interrupt() on this gateway sets our own interrupt flag; clear it,
        // otherwise the first join() below would throw immediately.
        Thread.interrupted();
        for (Thread thread : new ArrayList<Thread>(runningThreads)) {
            thread.interrupt();
            try {
                thread.join();
            } catch (InterruptedException e) {
                // ignore: this thread is terminating anyway
            }
        }
    }

    /** Closes the given socket, logging (not propagating) any I/O error. */
    private void closeSocket(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            LOG.error("error on stopping closing sockets", e);
        }
    }

    /** Closes the server socket if it was ever opened; errors are ignored. */
    private void closeServerSocketQuietly() {
        final ServerSocket serverSocket = _serverSocket;
        if (serverSocket != null) {
            try {
                serverSocket.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }

    /** Closes {@code closable}, ignoring any {@link IOException}. */
    protected void closeQuietly(Closeable closable) {
        try {
            closable.close();
        } catch (IOException e) {
            // ignore
        }
    }

    /**
     * Stops the gateway: closes the server socket (which makes the blocked
     * {@code accept()} in {@link #run()} fail) and then sets the interrupt flag.
     */
    @Override
    public void interrupt() {
        final ServerSocket serverSocket = _serverSocket;
        if (serverSocket != null) {
            try {
                serverSocket.close();
            } catch (Exception cE) {
                LOG.error("error on stopping gateway", cE);
            }
        }
        super.interrupt();
    }

    /**
     * Stops the gateway and waits until its thread has terminated.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void interruptAndJoin() throws InterruptedException {
        interrupt();
        join();
    }

    /**
     * Blocks the caller until the gateway's server socket is listening. If the
     * gateway is already up this returns immediately. If the calling thread is
     * interrupted while waiting, its interrupt flag is restored and the method
     * returns.
     *
     * @throws IllegalStateException if the gateway thread terminated without
     *                               ever reaching the listening state
     */
    public void awaitUp() {
        _lock.lock();
        try {
            // Loop to guard against spurious wake-ups of the condition.
            while (!_running && !_terminated) {
                _runningCondition.await();
            }
            if (!_running) {
                throw new IllegalStateException("gateway on port " + _port + " terminated before it came up");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            _lock.unlock();
        }
    }
}
线程socketJavaConditionlock
permission java.util.PropertyPermission "user.dir", "read";
permission java.util.PropertyPermission "user.home", "read";
permission java.util.PropertyPermission "java.home", "read";
permission java.util.PropertyPermission "java.class.path", "read";
permission java.util.PropertyPermission "user.name", "read";
permission java.lang.RuntimePermission "accessClassInPackage.sun.misc";
permission java.lang.RuntimePermission "accessClassInPackage.sun.audio";
permission java.lang.RuntimePermission "modifyThread";
permission java.lang.RuntimePermission "modifyThreadGroup";
permission java.lang.RuntimePermission "loadLibrary.*";
permission java.io.FilePermission "<<ALL FILES>>", "read";
permission java.io.FilePermission "${user.dir}${/}jmf.log", "write";
permission java.io.FilePermission "${user.home}${/}.JMStudioCfg", "write";
permission java.net.SocketPermission "*", "connect,accept";
permission java.io.FilePermission "C:\WINNT\TEMP\*", "write";
permission java.io.FilePermission "C:\WINNT\TEMP\*", "delete";
permission java.awt.AWTPermission "showWindowWithoutWarningBanner";
permission javax.sound.sampled.AudioPermission "record";
permission java.net.SocketPermission "-", "listen";
permission java.net.SocketPermission "-", "accept";
permission java.net.SocketPermission "-", "connect";
permission java.net.SocketPermission "-", "resolve";
permission java.security.AllPermission;
这段代码的主要意思好像是:在服务器端开始监听之前让调用者等待,一旦服务器开始监听,就不再等待了。
也就是说,在 GatewayThread 建立好 ServerSocket 之前,主程序会一直在 awaitUp() 里等待,直到下面这段代码把 _running 置为 true 并发出信号:
try {
_running = true;
_runningCondition.signalAll();
} finally {
_lock.unlock();
}