为了把问题说明清楚,把我写的代码贴出来(共三部分),请大家为我解答:一个是Socket的服务端(用Java编写的),另一个是Socket的客户端(这个Socket客户端C++写的)。Socket的服务端可以向Socket的客户端发送信令,这个大家都知道,但现在Socket的服务端必须要抛出一个多线程便于一直接收Socket客户端发过来的信令,不然若客户端有什么请求,我就不知道了.
注:客户端必须先向服务端链接,这样服务端才得到一个链接(Socket).
下面详细说明一下:
服务器先起来,然后客户端向服务器端建立一个连接,服务器端收到连接后,用一个全局变量保存起来;
在这里保存起来有两个目的:1.让服务端保持这个链接,不需要以后重新再链接,2.可以在其它方法里(比如C部分)利用这个保存的Socket向客户端发信。
各个部分的含义:
A部分,服务器端收到客户端的连接,并将这个链接(socket)保存起来,注意,服务器端收到连接后并不是立即给客户端发送信令,具体向客户端发信的是通过C部分实现的;然后new SocketServerThread(socket).start(),便于可以一直(随时)接收客户端的信令。
B部分,接收客户端的信令。因为要可以随时接收不同客户端的信令(业务要求),所以要用到多线程。当通过C部分向客户端发送信令后(送信的内容是xml体,里面包含sessionid等其它数据),那么只能(因为要随时接收不同客户端的信令,所以用一个while循环)通过这个多线程接收了。先发送,再等待返回。那当多线程收到客户端信令的返回时,怎么通知C部分(已经收到返回了)呢?所以就在多线程里用一个全局Map把这个结果保存起来,便于C部分可以通过轮询这个变量,已得知客户端已返回。那怎么确定信息是C的哪次发信返回的结果呢?就是通过sessionid的值来确定的(下面有说明),即我每次在送信前记住这个sessionid,在收信时就找对应的结果(结果也是一个xml体)。每次送信的sessionid都不相同。这是一个瓶颈,尤其在多线程里,这正是我提问题的原因所在。
C部分,利用全局Map保存的socket向客户端发送信令;轮询这个变量,通过sessionid查找对应的结果,看多线程(B部分)是否收到客户端信令的返回。这是一个会议系统,只要创建会议(是通过用户在页面点击来调用C部分的方法来发起通信的),都会送信。
这是每次送信的内容,是一个xml体,即:
<iabi> <cmd id= "3000 " sessionid= "1412412341 "> <confid> 88888888 </confid> </cmd> </iabi>
A部分:public class SocketServer extends Thread {
private Logger log= Logger.getLogger(SocketServer.class.getName());
private ServerSocket serverSocket;
private int listenPort =8188;
public SocketServer(int listenPort) {
this.listenPort = listenPort;
}
public static Map socketMap=new HashMap();//保存socket的HashMap
public static Map resultMap=new HashMap();//保存发出通信的的返回结果Map
private Socket socket;
private boolean isLoop= false;
public void run(){
isLoop=true;
try {
serverSocket= new ServerSocket(listenPort);
log.info("Starting listen......");
String ccsIp="";
CcsInfo ccsInfo=null;
while(isLoop){
socket = serverSocket.accept();
log.info("Listening->RemoteSocketAddress:"+socket.getRemoteSocketAddress()+" InetAddress:"+socket.getInetAddress());
ccsIp=socket.getRemoteSocketAddress().toString().trim();
ccsIp=ccsIp.substring(1,ccsIp.indexOf(":"));
log.info("RemoteSocketIp:"+ccsIp);
socketMap.put(ccsIp,socket);
log.info("Having listen the IP:" +ccsIp+" The ccsID:"+ccsInfo.getCcsId());
new SocketServerThread(socket).start();
}
} catch (Exception e) {
isLoop = false;
log.error(" "+e);
}
}
}B部分:public class SocketServerThread extends Thread {
private Logger log= Logger.getLogger(SocketServerThread.class.getName());
private Socket socket;
private int result;
private boolean isLoop;
ByteArrayOutputStream buf = new ByteArrayOutputStream();
public SocketServerThread(Socket s){
this.socket = s;
}
public void run(){
isLoop=true;
result=0;
InputStream in=null;
try {
in =new BufferedInputStream(socket.getInputStream());
} catch (IOException e){
log.error("The CCS "+socket.getRemoteSocketAddress()+" is unusual:"+e);
}
int readLen=1024;
int count=0;
byte[] readBuf=new byte[readLen];
while(true&&isLoop){
count=0;
try{
do{
count=in.read(readBuf);
if(count==-1){
isLoop=false;
throw new IOException("The receive data length is not right");
}
buf.write(readBuf,0,count);
}while (count==readLen);
log.info("Receive:"+new String(buf.toByteArray()));
}catch(IOException ex){
log.error("The CCS "+socket.getRemoteSocketAddress()+" correspondence is unusual:"+ex);
}
//收到的结果是一个xml体,然后解析出sessionid和通信的结果result
SocketServer.resultMap.put(sessionid,result);//保存返回结果
}
}
}C部分: public class Communicate {
public synchronized String conferenceComm(String ip){ OutputStream out=null;
try {
out = socket.getOutputStream();
} catch (IOException e){
log.error("Connecting the CCS that ip is "+socket.getInetAddress()+" is exception:"+e);
return null;
}
String sessionid="1412412341";//这个sessionid是随机产生的,可以保证每次产生的不一样.
//先发送
String cmdXml="<iabi> <cmd id='3000' sessionid="+sessionid+"> <confid> 88888888 </confid> </cmd> </iabi>";
try {
byte[] response =cmdXml.getBytes();
out.write(response);
out.flush();
} catch (IOException e){
log.error("Sending command to CCS that ip is "+socket.getInetAddress()+" is exception:"+e);
return null;
}
log.info("Send:"+cmdXml);
String result=null;
int i=0;
while(i<5){//轮询
try {
Thread.sleep(100);
} catch (InterruptedException e){
log.error(e);
}
i++;
result=SocketServer.resultMap.get(sessionid);//根据sessionid查找对应的通信结果.
if(result!=null){
break;
}
}
return result;
}
}
若并发的创建会议,就有瓶颈了,因为多线程操作Map,性能方面很不好,还要在C部分轮循.本人想了很久,要满足这种业务,还要很好的性能,自己没有找到好办法.请各位高手帮忙,先谢谢了!
注:客户端必须先向服务端链接,这样服务端才得到一个链接(Socket).
下面详细说明一下:
服务器先起来,然后客户端向服务器端建立一个连接,服务器端收到连接后,用一个全局变量保存起来;
在这里保存起来有两个目的:1.让服务端保持这个链接,不需要以后重新再链接,2.可以在其它方法里(比如C部分)利用这个保存的Socket向客户端发信。
各个部分的含义:
A部分,服务器端收到客户端的连接,并将这个链接(socket)保存起来,注意,服务器端收到连接后并不是立即给客户端发送信令,具体向客户端发信的是通过C部分实现的;然后new SocketServerThread(socket).start(),便于可以一直(随时)接收客户端的信令。
B部分,接收客户端的信令。因为要可以随时接收不同客户端的信令(业务要求),所以要用到多线程。当通过C部分向客户端发送信令后(送信的内容是xml体,里面包含sessionid等其它数据),那么只能(因为要随时接收不同客户端的信令,所以用一个while循环)通过这个多线程接收了。先发送,再等待返回。那当多线程收到客户端信令的返回时,怎么通知C部分(已经收到返回了)呢?所以就在多线程里用一个全局Map把这个结果保存起来,便于C部分可以通过轮询这个变量,已得知客户端已返回。那怎么确定信息是C的哪次发信返回的结果呢?就是通过sessionid的值来确定的(下面有说明),即我每次在送信前记住这个sessionid,在收信时就找对应的结果(结果也是一个xml体)。每次送信的sessionid都不相同。这是一个瓶颈,尤其在多线程里,这正是我提问题的原因所在。
C部分,利用全局Map保存的socket向客户端发送信令;轮询这个变量,通过sessionid查找对应的结果,看多线程(B部分)是否收到客户端信令的返回。这是一个会议系统,只要创建会议(是通过用户在页面点击来调用C部分的方法来发起通信的),都会送信。
这是每次送信的内容,是一个xml体,即:
<iabi> <cmd id= "3000 " sessionid= "1412412341 "> <confid> 88888888 </confid> </cmd> </iabi>
A部分:public class SocketServer extends Thread {
private Logger log= Logger.getLogger(SocketServer.class.getName());
private ServerSocket serverSocket;
private int listenPort =8188;
public SocketServer(int listenPort) {
this.listenPort = listenPort;
}
public static Map socketMap=new HashMap();//保存socket的HashMap
public static Map resultMap=new HashMap();//保存发出通信的的返回结果Map
private Socket socket;
private boolean isLoop= false;
public void run(){
isLoop=true;
try {
serverSocket= new ServerSocket(listenPort);
log.info("Starting listen......");
String ccsIp="";
CcsInfo ccsInfo=null;
while(isLoop){
socket = serverSocket.accept();
log.info("Listening->RemoteSocketAddress:"+socket.getRemoteSocketAddress()+" InetAddress:"+socket.getInetAddress());
ccsIp=socket.getRemoteSocketAddress().toString().trim();
ccsIp=ccsIp.substring(1,ccsIp.indexOf(":"));
log.info("RemoteSocketIp:"+ccsIp);
socketMap.put(ccsIp,socket);
log.info("Having listen the IP:" +ccsIp+" The ccsID:"+ccsInfo.getCcsId());
new SocketServerThread(socket).start();
}
} catch (Exception e) {
isLoop = false;
log.error(" "+e);
}
}
}B部分:public class SocketServerThread extends Thread {
private Logger log= Logger.getLogger(SocketServerThread.class.getName());
private Socket socket;
private int result;
private boolean isLoop;
ByteArrayOutputStream buf = new ByteArrayOutputStream();
public SocketServerThread(Socket s){
this.socket = s;
}
public void run(){
isLoop=true;
result=0;
InputStream in=null;
try {
in =new BufferedInputStream(socket.getInputStream());
} catch (IOException e){
log.error("The CCS "+socket.getRemoteSocketAddress()+" is unusual:"+e);
}
int readLen=1024;
int count=0;
byte[] readBuf=new byte[readLen];
while(true&&isLoop){
count=0;
try{
do{
count=in.read(readBuf);
if(count==-1){
isLoop=false;
throw new IOException("The receive data length is not right");
}
buf.write(readBuf,0,count);
}while (count==readLen);
log.info("Receive:"+new String(buf.toByteArray()));
}catch(IOException ex){
log.error("The CCS "+socket.getRemoteSocketAddress()+" correspondence is unusual:"+ex);
}
//收到的结果是一个xml体,然后解析出sessionid和通信的结果result
SocketServer.resultMap.put(sessionid,result);//保存返回结果
}
}
}C部分: public class Communicate {
public synchronized String conferenceComm(String ip){ OutputStream out=null;
try {
out = socket.getOutputStream();
} catch (IOException e){
log.error("Connecting the CCS that ip is "+socket.getInetAddress()+" is exception:"+e);
return null;
}
String sessionid="1412412341";//这个sessionid是随机产生的,可以保证每次产生的不一样.
//先发送
String cmdXml="<iabi> <cmd id='3000' sessionid="+sessionid+"> <confid> 88888888 </confid> </cmd> </iabi>";
try {
byte[] response =cmdXml.getBytes();
out.write(response);
out.flush();
} catch (IOException e){
log.error("Sending command to CCS that ip is "+socket.getInetAddress()+" is exception:"+e);
return null;
}
log.info("Send:"+cmdXml);
String result=null;
int i=0;
while(i<5){//轮询
try {
Thread.sleep(100);
} catch (InterruptedException e){
log.error(e);
}
i++;
result=SocketServer.resultMap.get(sessionid);//根据sessionid查找对应的通信结果.
if(result!=null){
break;
}
}
return result;
}
}
若并发的创建会议,就有瓶颈了,因为多线程操作Map,性能方面很不好,还要在C部分轮循.本人想了很久,要满足这种业务,还要很好的性能,自己没有找到好办法.请各位高手帮忙,先谢谢了!
就说一下思路吧,也许对你有帮助。
首先我传的是对象,如果你觉得这样效率低,那就不用往下看了,只是为了扩展性才这样做的。
先定义了一个基类包,主要属性有ip,到达时间,是否关闭
然后是一个字符串包(专门传送字符类数据,目前这个就够了),采用的是类似http协议的方式,分为包头(http中ContentType:...)和包体(<html...),当然存放使用面向对象的方式,用一个map存包头(这里我们就可以自己定义了)信息,一个string存内容。
其次,服务器端用的是线程池,每接收一个客户端都放线程池里。
服务器端存放客户端处理的线程池(假设每个线程名叫C),另外再开2个线程,1个监听客户端线程A,一个向客户端发送线程B,创建1个同步列表,A线程每得到一个客户端就创建C并把同步列表传进去,再把C仍入线程池中运行,C循环接收对应客户端socket信息,每接收一个就仍入同步队列直到接收到结束标志,这样不必等待,能直接接收下一个,B线程循环读取同步队列,一旦有数据就处理,并拥有C线程的列表(b创建时用构造函数放进去的),所以可以根据接收到的信息,发送通知所有客户端。做的过程中碰到一些问题,现总结一下
ObjectInputStream的构造函数是阻塞的,后面代码不能运行,必须先创建ObjectOutputStream
ObjectOutputStream 和ObjectInputStream要一一对应,原因是ObjectInputStream的构造函数会先读取4个也不知道5个字节的信息,其实就是ObjectOutputStream 的写入的,这也是为什么会阻塞的原因。本来同步用Piped流做的但有问题所以改成同步列表了和另一个解决了但不知原因的问题,希望高手能帮我看看
见此贴http://topic.csdn.net/u/20071116/21/b4a89f71-ce40-41d2-8c1a-3877f1f678cc.html
你的想法跟我一开始看到这个问题的想法差不多,一个收信线程专门负责收信,然后把收到的信息put到一个queue(队列)里,然后另一个送信线程专门从queue里取信息,然后把取出的信息送信。但是似乎跟LZ的想法有出入。在LZ的另一贴里,我就一直提出一个问题,到底LZ是什么时候送信的?从LZ的解释以及LZ的代码上来看,LZ的送信和收信是同步的,即LZ本意想在送完信以后,就一直等待返信结果,也就是LZ的C部分代码的后面,循环5次去取返信结果,取不到就返回null,估计调用处理把null当作timeout处理的。to LZ
我还是一直有疑问,为什么送信以后非要等待收信处理完?从这样的处理试样上,我只能这么推测,客户端向服务器端发送一个创建会议请求,然后服务器端开始调用送信方法,并且等待送信的返信结果,收到返信结果后,服务器端再把这个结果返回给客户端的请求,这样,算一个客户端的创建会议请求结束。是这样吗?如果是这样的,那么你的瓶颈问题存在是没办法的,因为你这样本身就是要求送信和返信同步,也就是应答式的。也许我不理解你的试样吧,我觉得你这里没必要送信和返信同步,也就是送信结束后就立刻返回,即给客户端返回一个创建会议ok,然后采用LS的方法,收信线程(即你的B部分)把返信结果存到一个queue里,然后再创建一个送信线程,从queue中take取出返信结果,然后再通过socket给创建会议的客户端传输信息。这样一切的信息传送,都在socket的流中去完成。
"LZ本意想在送完信以后,就一直等待返信结果",这个我理解的意思是客户端连上服务器后,就一直连着了,当然要等待接收数据(开一线程等待),服务器端接收到数据后就把客户端socket保存起来,开一线程处理那个客户端socket直到客户端socket关闭为止,然后等待下一客户端连上来。处理客户端的线程获得数据就放到同步队列中(不处理所以不用等待,可直接就收下一数据,并且每个数据顺序和发送数据一致的),另一个线程读取同步队列,一旦有数据就处理请求并发相应信息给所有连上来的客户端。to LZ
有点不明白为什么要用信令,除非你用udp,tcp的话面向连接协议,
只要连上来的客户端不关掉下次传输数据就能标识那个客户端了。
线程层次结构图
服务器端
主线程
A监听线程(获得客户端socket并创建处理客户端线程类,并将此类放入一个处理队列中)
B处理客户端线程1(保存客户端socket,一旦接收到数据就放到同步对列中,然后接收下一数据,客户端关闭时要将自己从处理队列中移除)
B处理客户端线程2
B处理客户端线程...(此线程类采用线程池,避免线程过多)
C读取数据线程(从同步队列中读取数据,并调用处理客户端线程类的方法向处理队列中的客户端广播数据)客户端
主线程
接收数据线程(发送后就等待接收,发送如果是手动的话,比如我的聊天室就是,点击发送按钮才发送,所以此线程只负责接收,发直接通过主线程,当然你也可以在主线程里把发送数据扔进一个发送同步队列,再开一个线程专门读取队列并发送数据)发现csdn现在能上传文件,等会儿注释写好了,把程序传上来。不过不知道行不行。
你的意思我清楚,但我的意思你可能没完全明白。C部分的处理,你的想法是在服务器端由一个送信线程去处理。但是LZ的想法却是在客户端发送一个请求时调用C部分,然后一直等待接收数据结束,然后把接收的数据返回给客户端,这样算是客户端的一次请求结束,就像request和reply一样,是同步的,如果在规定的时间内reply没收到,就算是timeout。不知道是不是我没理解LZ的意思。从功能实现上来说,我的想法和你很接近,至少我做过的异步通信系统,跟你的很很类似。所以我才一直对LZ的做法持有疑惑。如果是同步通信,那么瓶颈是避免不了的。to LZ
是不是试样上有什么特殊的要求,所以才采用了这样的设计,这样的设计瓶颈是难免的。当然,也有可能我没理解的意思,但是我从你的代码和说明中,觉得你的做法是采用了同步通信。
下载地址:http://d.download.csdn.net/down/287824/hrtc
有错误或不好的地方请指正,谢谢还有这个帖的问题帮忙看看
本来同步用Piped流做的但有问题所以改成同步列表了和另一个解决了但不知原因的问题,希望高手能帮我看看
见此贴http://topic.csdn.net/u/20071116/21/b4a89f71-ce40-41d2-8c1a-3877f1f678cc.html
问题解决了,通过"同步队列"解决了这个问题.谢谢hrtc给出的例子。感谢qybao为耐心的解答问题!