if(args.Length != 0) app.CreateWait(Convert.ToInt16(args[0])); else app.CreateWait(); // // TODO: 在此处添加代码以启动应用程序 // } /// <summary> /// 构造 /// </summary> public TUploadServer() { //Socket Packet Array aSock = new TSocketPacket[MAX_CONNECT]; for(int i = 0; i< MAX_CONNECT; i++) aSock[i] = new TSocketPacket(); // byte process m_byte = new EIIByte(); nSocketNum = 0; bSend = new byte[ONCE_COUNT]; m_ascn = new ASCIIEncoding(); pfnWorkerCallBack = new AsyncCallback(OnDataReceived); proOnAccept = new AsyncCallback(OnAccept); bCheck = new Byte[2]; int pos = 0; m_byte.SetWord(bCheck,CMD_S_CHECK,ref pos); // pfnWorkerCallBack = new AsyncCallback(OnReceive); // proOnSend = new AsyncCallback(OnSend);
} /// <summary> /// 创建等待连接 /// </summary> public void CreateWait(int port) { int nCount = 0;
IPAddress ip = Dns.GetHostByName(Dns.GetHostName()).AddressList[0]; // IPAddress ipAddress = IPAddress.Parse(IP); IPEndPoint ipe = new IPEndPoint(ip,port); sWait = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
sWait.Bind(ipe); sWait.Listen(MAX_CONNECT); sWait.BeginAccept(proOnAccept,sWait); Console.WriteLine("The server is running at port {0}...",port); Console.WriteLine("The local End point is :" + ip.ToString() ); Console.WriteLine("Waiting for a connection.....");
完成端口一个完成端口其实就是一个通知队列,由操作系统把已经完成的重叠I/O请求的通知放入其中。当某项I/O操作一旦完成,某个可以对该操作结果进行处理的工作者线程就会收到一则通知。而套接字在被创建后,可以在任何时候与某个完成端口进行关联。通常情况下,我们会在应用程序中创建一定数量的工作者线程来处理这些通知。线程数量取决于应用程序的特定需要。理想的情况是,线程数量等于处理器的数量,不过这也要求任何线程都不应该执行诸如同步读写、等待事件通知等阻塞型的操作,以免线程阻塞。每个线程都将分到一定的CPU时间,在此期间该线程可以运行,然后另一个线程将分到一个时间片并开始执行。如果某个线程执行了阻塞型的操作,操作系统将剥夺其未使用的剩余时间片并让其它线程开始执行。也就是说,前一个线程没有充分使用其时间片,当发生这样的情况时,应用程序应该准备其它线程来充分利用这些时间片。
///////////////////////////////////////////////////
//#define LAN
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.IO;using EII.Base;namespace EII.Upload
{ /// <summary>
/// TUploadServer 的摘要说明。
/// </summary>
class TUploadServer
{
private Socket sWait = null; //wait Socket
private EIIByte m_byte = null;
private ASCIIEncoding m_ascn ;
/// <summary>
/// Const define
/// </summary>
private const int PORT = 5656; private const int MAX_CONNECT = 50;
#if LAN
private const int ONCE_COUNT = 1024 * 1024; //once count byte
#else
private const int ONCE_COUNT = 1024;
#endif const int OPEN_FILE_FAILED = 2,
FILE_ALWAYS_OPENED = 3,
ALL_RIGHT = 1;
const short CMD_C_REQUERY = 0x0301, //客户端发送要求上传命令
CMD_S_REQUERY = 0x0302, //服务器响应
CMD_C_DETAIL = 0x0303, //客户端发送上传文件内容
CMD_S_DETAIL = 0x0304, //响应
CMD_S_CHECK = 0x0306; //服务器心跳检测 private int nSocketNum =0;
private TSocketPacket[] aSock; private byte[] bSend;
private byte[] bCheck; protected AsyncCallback pfnWorkerCallBack ;
protected AsyncCallback proOnAccept ; /// <summary>
/// 上传文件包
/// </summary>
public class TSocketPacket
{
public System.Net.Sockets.Socket thisSocket;
public byte[] dataBuffer = new byte[ONCE_COUNT]; public FileStream m_file ;
public int nTimes ;
public int nCurTime ;
public TSocketPacket()
{
Reset();
} /// <summary>
/// 清空
/// </summary>
public void Reset()
{ nTimes = 0;
nCurTime = 0;
if(m_file != null)
{
try
{
m_file.Flush();
m_file.Close();
}
catch
{
} finally
{
m_file = null;
}
} if( thisSocket != null)
{
try
{
thisSocket.Close();
}
catch
{
}
finally
{
thisSocket = null;
}
}
} // public bool bCmd = true;
}
/// <summary>
/// 应用程序的主入口点。
/// </summary>
[STAThread]
static void Main(string[] args)
{
// TcpListener myList=new TcpListener(ipAd,PORT);
//
// myList.Start(); TUploadServer app = new TUploadServer();
if(args.Length != 0)
app.CreateWait(Convert.ToInt16(args[0]));
else
app.CreateWait(); //
// TODO: 在此处添加代码以启动应用程序
//
}
/// <summary>
/// 构造
/// </summary>
public TUploadServer()
{
//Socket Packet Array
aSock = new TSocketPacket[MAX_CONNECT];
for(int i = 0; i< MAX_CONNECT; i++)
aSock[i] = new TSocketPacket(); // byte process
m_byte = new EIIByte(); nSocketNum = 0; bSend = new byte[ONCE_COUNT]; m_ascn = new ASCIIEncoding(); pfnWorkerCallBack = new AsyncCallback(OnDataReceived); proOnAccept = new AsyncCallback(OnAccept); bCheck = new Byte[2];
int pos = 0; m_byte.SetWord(bCheck,CMD_S_CHECK,ref pos);
// pfnWorkerCallBack = new AsyncCallback(OnReceive); // proOnSend = new AsyncCallback(OnSend);
}
/// <summary>
/// 创建等待连接
/// </summary>
public void CreateWait(int port)
{
int nCount = 0;
IPAddress ip = Dns.GetHostByName(Dns.GetHostName()).AddressList[0]; // IPAddress ipAddress = IPAddress.Parse(IP);
IPEndPoint ipe = new IPEndPoint(ip,port); sWait = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
sWait.Bind(ipe);
sWait.Listen(MAX_CONNECT);
sWait.BeginAccept(proOnAccept,sWait); Console.WriteLine("The server is running at port {0}...",port);
Console.WriteLine("The local End point is :" + ip.ToString() );
Console.WriteLine("Waiting for a connection.....");
while(true)
{
System.Threading.Thread.Sleep(50);
nCount ++;
//每60秒检查一次
if(nCount >= 1200)
{
ActiveCheck();
nCount = 0;
}
}
} /// <summary>
/// 创建等待连接
/// </summary>
public void CreateWait()
{
CreateWait(PORT);
}
/// <summary>
/// 等待接收数据
/// </summary>
/// <param name="nIndex"></param>
private void WaitForData(int nIndex)
{
// Console.WriteLine("WaitForData {0}",soc.RemoteEndPoint.ToString());
try
{
TSocketPacket theSocPkt = aSock[nIndex]; if( theSocPkt == null)
return ;
// now start to listen for any data...
theSocPkt.thisSocket .BeginReceive (theSocPkt.dataBuffer ,0,theSocPkt.dataBuffer.Length ,SocketFlags.None,pfnWorkerCallBack,nIndex);
} catch(SocketException se)
{
ProcessSocketException(se,nIndex);
} }
////刚用C#时写的,莫笑 /// <summary>
/// 接收数据,并处理
/// </summary>
/// <param name="asyn"></param>
public void OnDataReceived(IAsyncResult asyn)
{
//send : CMD(WORD = 0x0301)+filename's length(WORD)+filename(target)+size(DWORD)+times(WORD),
//Receive: CMD(WORD = 0x0302)+reday status(BYTE 1= OK,2 = FAIL) //check
if( !((asyn.AsyncState) is int))
return; string sFileName = null;
short nNameLength = 0;
int nSize = 0;
byte byOper = ALL_RIGHT; //打开文件操作结果代码 int nIndex = (int)asyn.AsyncState; if(nIndex >= MAX_CONNECT || nIndex <0 )
return ;
TSocketPacket theSockId = aSock[nIndex] ; if(theSockId == null)
return ; try
{
//end receive...
int iRx = theSockId.thisSocket.EndReceive (asyn);
// Console.WriteLine("Received Data From {0},length = {1}",theSockId.thisSocket.RemoteEndPoint.ToString(),iRx);
int pos = 0; int nCmd = m_byte.GetWord(theSockId.dataBuffer,ref pos); if(nCmd == CMD_C_REQUERY) //requery
{ nNameLength = m_byte.GetWord(theSockId.dataBuffer,ref pos);
sFileName = m_ascn.GetString(theSockId.dataBuffer,pos,nNameLength); pos += nNameLength; nSize = m_byte.GetDword(theSockId.dataBuffer,ref pos); theSockId.nTimes= m_byte.GetDword(theSockId.dataBuffer,ref pos); Console.WriteLine("Get Requery file = '{0}',size = {1} ,times= {2}",sFileName,nSize,theSockId.nTimes); // assert m_file not opened
if (theSockId.m_file != null)
//return ;
byOper = FILE_ALWAYS_OPENED; try //open the file
{ theSockId.m_file = File.Open(sFileName,FileMode.OpenOrCreate,FileAccess.Write,FileShare.None ); Console.WriteLine("Open or Create file '{0}'",sFileName); byOper = ALL_RIGHT; theSockId.nCurTime = 0; } catch(Exception e)
{
Console.WriteLine(e.Message+","+sFileName);
byOper = OPEN_FILE_FAILED;
} //组织回应命令
pos = 0; m_byte.SetWord(bSend,CMD_S_REQUERY,ref pos); m_byte.SetByte(bSend,new byte[]{byOper},ref pos);
theSockId.thisSocket.Send(bSend,0,pos,SocketFlags.None );
}
else if(nCmd == CMD_C_DETAIL) //detail
{
// Console.WriteLine("Recevied file detail info {0} times",nCurTime);
// assert m_file is opened
if(theSockId.m_file == null)
return ;
//close
theSockId.nCurTime = m_byte.GetDword(theSockId.dataBuffer,ref pos); theSockId.m_file.Write(theSockId.dataBuffer, pos,iRx - pos); //每十个包保存一次
if((theSockId.nCurTime /10 ) == 0)
//save
theSockId.m_file.Flush(); //组织回应命令
pos = 0; m_byte.SetWord(bSend,CMD_S_DETAIL,ref pos);
m_byte.SetDword(bSend,theSockId.nCurTime,ref pos);
theSockId.thisSocket.Send(bSend,0,pos,SocketFlags.None); if( theSockId.nCurTime == theSockId.nTimes )
{
//save
theSockId.m_file.Flush();
theSockId.m_file.Close();
theSockId.m_file = null;
theSockId.nTimes = 0;
// sFileName= null;
} }
WaitForData(nIndex ); }// catch (ObjectDisposedException )
// {
// System.Diagnostics.Debugger.Log(0,"1","\nOnDataReceived: Socket has been closed\n");
// } catch(SocketException se)
{
ProcessSocketException(se,nIndex);
}
}
/// <summary>
/// 接受新连接,并保存到连接数组
/// </summary>
/// <param name="ar"></param>
private void OnAccept(IAsyncResult ar)
{
try
{
Socket sWait = (Socket)ar.AsyncState;
//new connect socket
Socket sNew = sWait.EndAccept(ar);
sWait.BeginAccept(proOnAccept,sWait);
Console.Write(sNew.RemoteEndPoint.ToString());
// 超过最大连接数
if(nSocketNum >= MAX_CONNECT)
{
sNew.Close();
return ;
}
for(int i =0; i< MAX_CONNECT ; i++)
{
if(aSock[i].thisSocket == null)
{
aSock[i].thisSocket = sNew; //insert
WaitForData(i);
nSocketNum ++;
Console.WriteLine(" insert a Socket ,position = {0},count = {1}",i,nSocketNum);
break;
} }
}
catch(ObjectDisposedException)
{
System.Diagnostics.Debugger.Log(0,"1","\n OnClientConnection: Socket has been closed\n");
}
catch(SocketException se)
{
Console.WriteLine ( se.Message );
} //
}
/// <summary>
/// 处理连接错误,断开连接,释放资源
/// </summary>
/// <param name="se"></param>
/// <param name="nIndex"></param>
private void ProcessSocketException(SocketException se,int nIndex)
{
if(se != null)
Console.WriteLine ("Error code = {0},message = '{1}'",se.ErrorCode,se.Message ); //断开连接
if(aSock[nIndex].thisSocket != null)
{
aSock[nIndex].Reset();
if( nSocketNum > 0)
nSocketNum --;
Console.WriteLine("remove a Client Socket ,position= {0},count = {1}",nIndex,nSocketNum);
}
}
//好多垃圾
/// <summary>
/// 活动检测
/// </summary>
private void ActiveCheck()
{
for(int i =0, j= 0; (i< MAX_CONNECT) &&( j < nSocketNum); i++)
{
if(aSock[i].thisSocket == null)
continue;
try
{
if(aSock[i].thisSocket.Connected )
aSock[i].thisSocket.Send(bCheck);
else
ProcessSocketException(null,i);
}
catch(Exception )
{
ProcessSocketException(null,i);
} j ++;
}
} }
}
服务开始时启动一个主线程,如果有客户端连接时就产生一个线程处理与客户端的连接。我把线程和客户端封装在类ClientThread中,另处还有一个类Protocol处理数据传输,我是根据java中的思路来做的,不过效率好像很低,有兴趣交流一下
http://expert.csdn.net/Expert/TopicView1.asp?id=1567017
服务程序应该尽量少的,开辟释放内存,事先开辟好足够的内存自己做内存管理(可以参看unix的内存管理)
服务程序尽量少的减少io操作,例如文件的读取,可以采用内存映像文件。服务程序与客户端通讯的协议要定义清楚规范,尽量少的减少数据的传递
还有一些常用的方法,例如:解析xml的时候不要用XmlDocument,数据量大的时候非常慢,改用XmlReader,自己控制。
还有一些,像数据缓冲,连接管理,等有功夫自己好好研究。
而客户端主要是因为超时或网络问题出现无法连接的错误。
目前解决方案如下:
在服务端采用分批传送的方法,能够使服务器内存有所降低,也使客户端响应速度大大加快。
并且使用传输完成标示,如果客户端没有收到完成标示,就一直尝试读取,直到超时。
为了能应付大规模访问,服务端采用Http的请求响应方式,一旦请求完成就自动断开。
但无法解决的问题还有,服务断内存一直上涨,无法释放,我用一线程定时GC.Collect(),但
不是很管用。
====
服务程序应该尽量少的开辟线程,通讯用异步来完成。
服务程序应该尽量少的,开辟释放内存,事先开辟好足够的内存自己做内存管理(可以参看unix的内存管理)
====
请问为什么要少的开辟线程,我的服务端需要大量的访问(实时),不用线程能行吗?
c#如何控制内存,unix不太懂。请dx指教指教。另:多谢大家的热情帮助!!!!!
//turui真是个热心的人!3k him!C#我用的时间也不长,以前写的服务程序是用纯c++写的
多线程非常消耗系统资源,系统光调度这些线程就已经很辛苦了
用线程来避免阻塞是一种好的方式,简单的方式,但是效率最差的方式
win封装好的异步socket可以很好的避免阻塞,效率也相对更好
(我们以前写的服务程序,一台机器8000-12000用户在线,算上主程序一共也只用了6个线程,主线程一个,时间扫描一个,其他的4个,通讯上几乎没用线程)
另外关于内存管理,经过实践发现,系统的开辟释放内存也非常消耗系统资源,我们预先开辟好足够的内存,他们之间用套机制管理起来,以后当我们要用到内存的时候,我们不再 new,而是我们自己内存管理的new方法,当然也会有相应的 释放方法,只是我们的释放是把不用的内存再从新返回到以开辟好的内存链表中。
另一些常用的细节我在回复中也说明白了。你自己多想想把,写一个好的服务程序不是一下能写完的。
最主要的是:一、稳定性,需要内存控制要好,但及其内存较大(2-4G)。二、效率,除了小部分查询外,大部分是适时的操作,某个大的查询运行时,同时进行的事务操作必须不能等待。其实这个问题到这应该结束了,但我觉得这是网络编程的一个难点,应该有一个更深入透彻的
方案。希望对这个问题感兴趣的朋友再进一步研究,最终给大家公布一个满意的结果。有兴趣的请msn联系:[email protected],共同关注。