先说明一下:在这个问题里,大家可以看成是简单的Server/Client模型,一个服务端ServerSocket和多个客户端Socket的连接。Client端是Delphi做的,程序上线之后可能会有几千个Client端,呵呵,如果可能的话,会上到几万个Client端。这个是固定的:Client端每秒3次到Server端去取数据,每个Client端都是每秒3次取数。只有Client端程序退出之后Server才会断开这个Client端。Server端的实现:/**
*
*接收Client数据的服务线程
*
*/
class ClientServer extends Thread{
private ServerSocket ss;
private int PORT;
private DataSCalcServer dscs;
public ClientServer(int port,DataSCalcServer dscs){
this.PORT = port;
this.dscs = dscs;
}
public void run(){
ExecutorService execPool = null;
try{
ss = new ServerSocket(PORT);
// execPool = Executors.newFixedThreadPool(25); //创建固定大小的线程池* A处
while(true){
Socket socket = ss.accept();
// execPool.execute(new clientThread(socket));
new clientThread(socket).start();
}
}catch(IOException ioe){
execPool.shutdown();
System.out.println("Error ss.accept():"+ioe.getMessage());
}
}
class clientThread extends Thread{
private PrintWriter pw ;
private BufferedReader br ;
private Socket socket;
public clientThread(Socket socket){
this.socket = socket;
try{
br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
pw = new PrintWriter(socket.getOutputStream());
}catch(Exception e){
e.printStackTrace();
}
}
public void run(){
boolean status = true;
try{
while(status){
try{
if(br != null){
String str = br.readLine().trim();
String packet = dscs.setToClientPacket(str); //打包发送给Client
if(packet != null){
pw.println(packet);
pw.flush();
}
}
sleep(1); //B处
}catch(Exception e){
status = false;
System.out.println("Warning delphiClient interrupted ....");
e.printStackTrace();
}
}
}catch(Exception e){
System.out.println("Error run():"+e.getMessage());
}finally{
try{
if(br != null)
br.close();
if(pw != null)
pw.close();
if(socket != null)
socket.close();
}catch(Exception e){
System.out.println("Error close failed.");
}
}
}
}
}
现在担心的问题就是:
1、如果几千个Client端,那Server端就会产生几千个线程,这样程序会不会死掉?! 以前测试过线程,就是当线程达到2000多个的时候,程序就会变慢了!
我想这个Server有几千个Client端时,也应该会死掉! 那我应该怎么改啊?!
2、如果上面A处使用线程池的话, Executors.newFixedThreadPool(25);这里线程池的大小要多少才好?! 呵呵!加了线程池的话,程序应该死不了了吧!死不了,但会有什么影响呢?!!
3、在B处加了睡眠之后,性能是不是应该有很大提升吧?!
*
*接收Client数据的服务线程
*
*/
class ClientServer extends Thread{
private ServerSocket ss;
private int PORT;
private DataSCalcServer dscs;
public ClientServer(int port,DataSCalcServer dscs){
this.PORT = port;
this.dscs = dscs;
}
public void run(){
ExecutorService execPool = null;
try{
ss = new ServerSocket(PORT);
// execPool = Executors.newFixedThreadPool(25); //创建固定大小的线程池* A处
while(true){
Socket socket = ss.accept();
// execPool.execute(new clientThread(socket));
new clientThread(socket).start();
}
}catch(IOException ioe){
execPool.shutdown();
System.out.println("Error ss.accept():"+ioe.getMessage());
}
}
class clientThread extends Thread{
private PrintWriter pw ;
private BufferedReader br ;
private Socket socket;
public clientThread(Socket socket){
this.socket = socket;
try{
br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
pw = new PrintWriter(socket.getOutputStream());
}catch(Exception e){
e.printStackTrace();
}
}
public void run(){
boolean status = true;
try{
while(status){
try{
if(br != null){
String str = br.readLine().trim();
String packet = dscs.setToClientPacket(str); //打包发送给Client
if(packet != null){
pw.println(packet);
pw.flush();
}
}
sleep(1); //B处
}catch(Exception e){
status = false;
System.out.println("Warning delphiClient interrupted ....");
e.printStackTrace();
}
}
}catch(Exception e){
System.out.println("Error run():"+e.getMessage());
}finally{
try{
if(br != null)
br.close();
if(pw != null)
pw.close();
if(socket != null)
socket.close();
}catch(Exception e){
System.out.println("Error close failed.");
}
}
}
}
}
现在担心的问题就是:
1、如果几千个Client端,那Server端就会产生几千个线程,这样程序会不会死掉?! 以前测试过线程,就是当线程达到2000多个的时候,程序就会变慢了!
我想这个Server有几千个Client端时,也应该会死掉! 那我应该怎么改啊?!
2、如果上面A处使用线程池的话, Executors.newFixedThreadPool(25);这里线程池的大小要多少才好?! 呵呵!加了线程池的话,程序应该死不了了吧!死不了,但会有什么影响呢?!!
3、在B处加了睡眠之后,性能是不是应该有很大提升吧?!
2.这个不敢下结论,要多测试才能找到一个优化值;
3.确实sleep可以避免clientThread独占cpu资源,但是建议时间稍微长一点,毕竟你是3s才一次响应(楼主的代码我看不出来是L的小写还是数字1...)
忘了说了,这个Server端是可以多个的,楼上的说的是不是相当于多个Server端啊?
那同一台电脑中启动多个不同监听端口的Server端行不行?
还是必须要N台电脑,每台电脑只一个Server端?
1.对于可长连接的Client端的数量,与你服务端的系统及硬件相关。另需要进行服务端的优化。我这里有一个数据可以与你分享下:从理论上来讲,一个运行专属服务的Win32系统,应该有240MB左右的核心内存(理论上限是
256MB)可以供其使用,假设每一个连接占用4k(具体占用多少已经忘了,印象中应该低于这个值),那仍然能够
有可以建立超过6万个连接资源。如果是64位平台,就没有了这个核心内存的限制,也就意味着只要有足够的
内存资源,连接数可以远远地超过6万。从实际测试上来看,在Win32平台下可以同时接受了8万多个连接,但是几乎没有通讯能力,在6万多的时候,通迅可以保持正常。2.对于大数据量并发的处理是必须要使用线程池的,以减少实例化一个线程所耗费的时间。对于池中应该开多大的一个常驻线程数量,这个应该是根据实际的通讯量来考虑。
但是如果连接完全是长连接,就失去使用线程池的意义了。由于长连接的特性,线程实例化并非是整个系统运行的消耗,所以对于这个系统的设计,可以考虑不适用池来进行处理。3.Sleep(1)感觉是没有什么影响的,Sleep的时间应该是能够使得服务端系统在这段时间内执行完成其他线程所需执行的指令。不然就会CPU占用过高,对这里,需要用优化原则参考确定。另:对于长连接建议分布服务端(并非集群),用例如通讯服务器进行链路及数据传输的处理。我们测试过程中,使用专属服务器在20000左右套接的情况下还是比较轻松的能够处理。
谢谢,楼上的兄弟!!现在可以确定的是,每个客户端都是长连接的,就跟QQ一样,什么时候下线这不知道。那这样看来是不用线程池比较好了!。还有你所说的“可以建立超过6万个连接资源”这个,我不很明白啊!要是有6万个连接,那程序不就有6万个线程挂着吗??以前我测试的启了5千多个线程,而且每个线程还加了sleep,但还不行了啊,CPU很卡!!这Server端程序以后的环境是:CPU:四核,内存:4G ,系统:windows server 2003, 这环境一般能顶住多少个连接啊?
不过具体的东东不知道pizzame能不能大概说一哈?
而且服务器能否撑住要看服务的峰值并发,以及这个数据包处理的速度。
socket的已经很优化了,因为没有拆包和封包的过程。最好能做压力测试,评估服务器能够支持的客户端数,当用户发展到一定程度,再加上负载均横的策略。
1.线程的操作贴近系统级
一个额定系统中刨去操作系统的的开耗可供使用的资源是固定的,如果能够将对线程的操作建立在操作系统级别显然是更加明智的。因此,使用操作系统的API进行线程的操作应该是我们这里专属服务器设计的基础。在上一贴中我共享的数据是早些时候在微软官方上Windows操作系统上的一个实际数据。由于没有给出实际的测试数据及测试环境,所以建议在这里进行一个参考,是否能够到达6w这样一个量级。我个人感觉还是应该可信的。理由是这样的:我们自己的通讯服务器是使用C++来实现的,主要是实现对连接线程的调度,通讯协议处理,与应用服务器交互。由于增加了调度、通讯及交互处理带来了一定的消耗,在接近4w测试是一个极限值。反推这部分的消耗感觉6w还是没有撒谎的。2.业务处理的优化
这是一个重点,我们从操作系统中申请一个线程的目的是并行处理业务。而如何将业务处理划分是相当重要的,这里说一下我之前为什么建议使用专属服务器而不建议负载均衡的原因。负载均衡是在负载不额定的情况下通过调度减缓压力的一种方法,当负载额度固定而且压力很大的时候,调度就变得毫无意义。
这里举个例子:一套工序上需要2个步骤的工作:拆包、搬运。现在有2个工作队。当在这个工序工作压力不大的时候,我们可以轮流的分配这2个工作队来完成这一个工序的工作。但是如果压力过大,当2个工作队都满负荷运作都不能完成的时候,我们倒不如去细分这个工序,让第1队去做拆包,第2队去做搬运,已达到流水作业。
优化的重点在于分配的合理。由于对于实际的设计中是需要基于对业务的测试来进行的,所以我这里仅仅能够抛出2个粗粒度的方法论。希望能对liuzhengkang的设计起到帮助。
package net;import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;public class UDPSend { /**
* @param args
*/
public static void main(String[] args) {
try {
FileInputStream fis = new FileInputStream("pop.jpg");
BufferedInputStream bis = new BufferedInputStream(fis);
InetAddress address = InetAddress.getByName("192.168.0.11");
DatagramSocket dsocket = new DatagramSocket();
DatagramPacket packet = null;
byte[] msg = new byte[1024];
int len = 0;
while ((len = bis.read(msg)) != -1) {
packet = new DatagramPacket(msg, len, address, 8888);
dsocket.send(packet);
}
msg = "quit".getBytes();
packet = new DatagramPacket(msg, msg.length, address, 8888);
dsocket.send(packet);
System.out.println("发送完毕");
bis.close();
dsocket.close();
} catch (FileNotFoundException e) {
// TODO 自动生成 catch 块
e.printStackTrace();
} catch (UnknownHostException e) {
// TODO 自动生成 catch 块
e.printStackTrace();
} catch (SocketException e) {
// TODO 自动生成 catch 块
e.printStackTrace();
} catch (IOException e) {
// TODO 自动生成 catch 块
e.printStackTrace();
}
}}
package net;import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;public class UDPReceive { /**
* @param args
*/
public static void main(String[] args) {
try {
FileOutputStream fos = new FileOutputStream("popNew.jpg");
BufferedOutputStream bos = new BufferedOutputStream(fos);
DatagramSocket dsocket = new DatagramSocket(8888);
byte[] buffer = new byte[1024];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
System.out.println("等待接收数据……");
for (; ;) {
dsocket.receive(packet);
if (new String(buffer, 0, packet.getLength()).equalsIgnoreCase("quit")) {
break;
}
bos.write(buffer, 0, packet.getLength());
bos.flush();
}
System.out.println("完毕!!!");
bos.close();
dsocket.close();
} catch (SocketException e) {
// TODO 自动生成 catch 块
e.printStackTrace();
} catch (IOException e) {
// TODO 自动生成 catch 块
e.printStackTrace();
}
}}
2:如果做通信类的,那么考虑用nio。
股票服务器?是不是运行topview的替代品的服务器啊?
再次谢谢大家的回复!!呵呵!再请问一下,非阻塞流可以保证实时性吗?刚找了这方面的资料,还是不太理解,但应该不行吧。这个程序有关系到:即时信息通信的和交易方面的,就是对实时性要求比较高吧,客户端发来请求的时候就要马上响应,就像我们网上购物下单什么的,实时响应速度一定要有保证啊。这样的话NIO就不行了!
还是NIO吧
谢谢大家的意见,本贴再加分!大家好像都看成是“几千个并发”了,我都搞晕了,“大量连接”和“大量并发”!!?几千个Client端连接,不一定就是几千个并发吧!!!以上大家的观点都是NIO,我不知道那个好,因为我没有用过NIO,希望有用过的朋友给点意见!
如果1秒钟内启动了1000个线程,并发可能是1000,也可能是1。如果每个线程要1秒才能完成,那么就是1000个并发,如果每个线程只要<1/1000秒,那么就有可能顺序完成,就等于没有并发的线程。
用JAVA做的程序,超过1000个并发线程,机器的性能就会大打折扣,这是因为把大量的CPU时间耗费到线程的调度上了。
你提到的需求:“Client端每秒3次到Server端去取数据,每个Client端都是每秒3次取数。只有Client端程序退出之后Server才会断开这个Client端。 ”“如果可能的话,会上到几万个Client端。”
用你写的程序来处理这个需求,极可能达到1000以上的并发线程,因此可以说你的程序不是这种需求好的解决方案。
针对你的需求,采用异步非堵塞式IO是更好的方案。
同步IO采用的是轮询方式,直到读出或者写入数据,因此你要处理多个通道的数据,就需要多个线程。
而异步IO采用的是观察者模式,只有事件发生的时候,才运行程序。因此不需要多余的线程。
服务器代码:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;/**
* 非堵塞式 Server Socket 示例
* @author hbwhwang
*
*/
public class NonBlockServer extends Thread {
static final int PORT = 19001;
private ByteBuffer dataBuffer = ByteBuffer.allocate(1024);
private Selector selector = null;
private Charset charset = Charset.forName("GB2312"); public NonBlockServer() {
try {
startup();
} catch (IOException e) {
e.printStackTrace();
}
} public void startup() throws IOException {
selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ServerSocket ss = ssc.socket();
ss.bind(new InetSocketAddress(PORT)); ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server startup...");
} /**
* 业务处理方法,这里只简单地返回系统时间
* @return
*/
public String bizMethod(){
return String.valueOf(System.nanoTime());
}
public void run() {
if (selector==null)
return;
while (true) {
try {
int num = selector.select();
if (num==0){
Thread.yield();
continue;
}
} catch (IOException e) {
e.printStackTrace();
} Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove(); if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc=null;
try {
sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
} System.out.println("Got connection from "
+ sc.socket().getInetAddress() + ":"
+ sc.socket().getPort());
} else if (key.isReadable()) {
// Read the data
SocketChannel sc = (SocketChannel) key.channel(); dataBuffer.clear();
try {
//这里未对读入的信息处理。在实际环境中,可能读到的是指令。另外也可能一次读不完,那么需要考虑多次读
int r = sc.read(dataBuffer);
sc.register(selector, SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
dataBuffer.flip();
CharBuffer cb=charset.decode(dataBuffer);
String temp=cb.toString();
System.out.println("Read data from "
+ sc.socket().getInetAddress() + ":"
+ sc.socket().getPort() + " - "
+ temp+",length:"+temp.length());
} else if (key.isWritable()) {
dataBuffer.clear();
//这里没有考虑一次写不完的情况
dataBuffer.put(bizMethod().getBytes());
dataBuffer.put("\n".getBytes());
dataBuffer.flip();
SocketChannel sc = (SocketChannel) key.channel();
try {
sc.write(dataBuffer);
sc.register(selector, 0);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("Write to ..."
+ sc.socket().getInetAddress() + ":"
+ sc.socket().getPort());
}
}
}
} public static void main(String[] args) throws IOException {
new NonBlockServer().start();
}
}
测试客户端代码:import java.net.*;
import java.io.*;public class JabberClient extends Thread {
public void run(){
try {
InetAddress addr;
addr = InetAddress.getByName("localhost");
Socket socket = new Socket(addr, NonBlockServer.PORT);
BufferedReader in = new BufferedReader(new InputStreamReader(socket
.getInputStream()));
// Output is automatically flushed
// by PrintWriter:
PrintWriter out = new PrintWriter(new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())), true);
out.print("howdy");
out.flush();
String str = in.readLine();
socket.close();
System.out.println(str);
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
for (int i=0;i<10;i++){
new JabberClient().start();
}
}
} // /:~
http://mina.apache.org/index.html
非常感谢hbwhwang!!您说的很对!现在这种情况看起来还是用NIO好。我现在也很矛盾啊!这已经开发了几个星期了,要是全改为NIO,就算是时间来的急,也担心后面的性能调试不知会花多少时间。我想再问一下,如果我把这程序连接的Client限制到1000个左右,以后启动 N台服务器,这样应该可以吧?
轻松达到上K个客户端没有任何问题.
轻松达到上K个客户端没有任何问题.