自己编写了一个简单proxy程序,用于telnet协议的.头一次编写多线程程序,
结果出现很多无法处理的问题.
现在程序遇到这样的问题:连接进来几个客户端后,运行一段时间就会没有响应,CPU占用率100%
也不报错,哪怕抛出异常也行,搞得我没法调试.
程序大体结构是这样的:
每个客户端连接进来程序用一个会话处理客户端的请求,会话使用四个线程:
一个接收客户端的数据放到缓冲区
一个从缓冲区取数据,进行处理,然后发往服务器端
另外2个处理来自服务器端的数据,类似于处理客户端的线程下面是从缓冲区发往服务器的函数
private void BufferToServer()
{
while (true)
{
try
{
lock (lockerForServer)
{
//等待缓冲区有数据
while (BufferForCommand.Peek().Count == 0)
Monitor.Wait(lockerForServer);
//取数据ce
//.......
}
//进行处理
if (ce.Command == "#break" && ce.Owner == this)
{ break;
}
try
{
RaiseEvent(
"从客户端接受到一个命令",
ce,
(addin) =>
{
addin.OnReceiveCommand(ce);
}
);
}
catch (Exception e)
{
SendLine(e.Message);
} if (ce.Blocked)
continue; if (!ce.Blocked && ServerConnected)
{
lock (SendedCommand)
SendedCommand.Enqueue(ce);
RaiseEvent(
"向服务器发送了命令",
ce,
(addin) =>
{
//内部可能会调用SendDataToServer
addin.OnSendedCommand(ce);
}
);
LastTimeSendedDataToServer = DateTime.Now;
ce.SendedTime = DateTime.Now;
//发往服务器端
Server.SendData(ce.Command + "\r\n");
}
} catch (SocketException)
{
break;
}
catch (ObjectDisposedException)
{
break;
}
}
} //向缓冲区放数据
private void SendDataToServer(List<CommandEvent> cmds)
{
if (cmds == null || cmds.Count == 0)
return;
lock (lockerForServer)
{
if (cmds[0].Emergent || BufferForCommand.Count == 1)
{
foreach (var e in cmds)
BufferForCommand.Peek().Enqueue(e);
}
else
{
CommandQueue top = BufferForCommand.Pop();
foreach (var e in cmds)
{ BufferForCommand.Peek().Enqueue(e);
}
BufferForCommand.Push(top);
}
Monitor.Pulse(lockerForServer);
} }
下面是底层socket处理接收和发送的函数 //从socket接收数据放到缓冲区
private void EndReceive(IAsyncResult ar)
{
try
{
if (!Connected)
throw new SocketException(0);
Socket s = (Socket)ar.AsyncState;
int byteRead = Sck.EndReceive(ar);
char[] charbuffer = new char[decoder.GetCharCount(buffer, 0, byteRead)];
int charlen = decoder.GetChars(buffer, 0, byteRead, charbuffer, 0);
StringBuilder sb = null;
StringBuilder last = null;
lock (locker)
{
if (qBuffer.Count > 0)
{
last = qBuffer.Last(); if (last.Length == 0 || ControlChars.IndexOf(last[last.Length - 1]) < 0)
sb = last;
}
if (sb == null)
{
sb = new StringBuilder();
qBuffer.Enqueue(sb);
}
for (int i = 0; i < charlen; i++)
{
sb.Append(charbuffer[i]);
if (ControlChars.IndexOf(charbuffer[i]) >= 0)
{
sb = new StringBuilder();
if (i < charlen - 1)
qBuffer.Enqueue(sb);
}
}
Monitor.Pulse(locker);
}
Sck.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, EndReceive, Sck);
}
catch (Exception e)
{
Type ExceptionType=e.GetType();
if (ExceptionType == typeof(SocketException) || ExceptionType == typeof(ObjectDisposedException))
{
Close();
if (OnClosed != null)
OnClosed();
}
else
throw e;
} }//从缓冲区提取数据然后通知上层会话对象进行处理
ThreadPool.QueueUserWorkItem(
(state) =>
{
try
{
StringBuilder sb;
while (true)
{
lock (locker)
{
if (receiving == false)
break; while (qBuffer.Count == 0)
Monitor.Wait(locker); sb = qBuffer.Peek(); if (sb.Length > 0 && ControlChars.IndexOf(sb[sb.Length - 1]) >= 0)
{
qBuffer.Dequeue(); }
else
{
Monitor.Wait(locker, 1000);
qBuffer.Dequeue();
}
}
if (OnReceiveLine != null)
{//通知会话对象
OnReceiveLine(sb.ToString());
continue;
} }
}
finally
{ }
}
);
向高手请教下,多线程出现这样的问题有可能是哪里犯错了:死锁?对象不是线程安全?
结果出现很多无法处理的问题.
现在程序遇到这样的问题:连接进来几个客户端后,运行一段时间就会没有响应,CPU占用率100%
也不报错,哪怕抛出异常也行,搞得我没法调试.
程序大体结构是这样的:
每个客户端连接进来程序用一个会话处理客户端的请求,会话使用四个线程:
一个接收客户端的数据放到缓冲区
一个从缓冲区取数据,进行处理,然后发往服务器端
另外2个处理来自服务器端的数据,类似于处理客户端的线程下面是从缓冲区发往服务器的函数
private void BufferToServer()
{
while (true)
{
try
{
lock (lockerForServer)
{
//等待缓冲区有数据
while (BufferForCommand.Peek().Count == 0)
Monitor.Wait(lockerForServer);
//取数据ce
//.......
}
//进行处理
if (ce.Command == "#break" && ce.Owner == this)
{ break;
}
try
{
RaiseEvent(
"从客户端接受到一个命令",
ce,
(addin) =>
{
addin.OnReceiveCommand(ce);
}
);
}
catch (Exception e)
{
SendLine(e.Message);
} if (ce.Blocked)
continue; if (!ce.Blocked && ServerConnected)
{
lock (SendedCommand)
SendedCommand.Enqueue(ce);
RaiseEvent(
"向服务器发送了命令",
ce,
(addin) =>
{
//内部可能会调用SendDataToServer
addin.OnSendedCommand(ce);
}
);
LastTimeSendedDataToServer = DateTime.Now;
ce.SendedTime = DateTime.Now;
//发往服务器端
Server.SendData(ce.Command + "\r\n");
}
} catch (SocketException)
{
break;
}
catch (ObjectDisposedException)
{
break;
}
}
} //向缓冲区放数据
private void SendDataToServer(List<CommandEvent> cmds)
{
if (cmds == null || cmds.Count == 0)
return;
lock (lockerForServer)
{
if (cmds[0].Emergent || BufferForCommand.Count == 1)
{
foreach (var e in cmds)
BufferForCommand.Peek().Enqueue(e);
}
else
{
CommandQueue top = BufferForCommand.Pop();
foreach (var e in cmds)
{ BufferForCommand.Peek().Enqueue(e);
}
BufferForCommand.Push(top);
}
Monitor.Pulse(lockerForServer);
} }
下面是底层socket处理接收和发送的函数 //从socket接收数据放到缓冲区
private void EndReceive(IAsyncResult ar)
{
try
{
if (!Connected)
throw new SocketException(0);
Socket s = (Socket)ar.AsyncState;
int byteRead = Sck.EndReceive(ar);
char[] charbuffer = new char[decoder.GetCharCount(buffer, 0, byteRead)];
int charlen = decoder.GetChars(buffer, 0, byteRead, charbuffer, 0);
StringBuilder sb = null;
StringBuilder last = null;
lock (locker)
{
if (qBuffer.Count > 0)
{
last = qBuffer.Last(); if (last.Length == 0 || ControlChars.IndexOf(last[last.Length - 1]) < 0)
sb = last;
}
if (sb == null)
{
sb = new StringBuilder();
qBuffer.Enqueue(sb);
}
for (int i = 0; i < charlen; i++)
{
sb.Append(charbuffer[i]);
if (ControlChars.IndexOf(charbuffer[i]) >= 0)
{
sb = new StringBuilder();
if (i < charlen - 1)
qBuffer.Enqueue(sb);
}
}
Monitor.Pulse(locker);
}
Sck.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, EndReceive, Sck);
}
catch (Exception e)
{
Type ExceptionType=e.GetType();
if (ExceptionType == typeof(SocketException) || ExceptionType == typeof(ObjectDisposedException))
{
Close();
if (OnClosed != null)
OnClosed();
}
else
throw e;
} }//从缓冲区提取数据然后通知上层会话对象进行处理
ThreadPool.QueueUserWorkItem(
(state) =>
{
try
{
StringBuilder sb;
while (true)
{
lock (locker)
{
if (receiving == false)
break; while (qBuffer.Count == 0)
Monitor.Wait(locker); sb = qBuffer.Peek(); if (sb.Length > 0 && ControlChars.IndexOf(sb[sb.Length - 1]) >= 0)
{
qBuffer.Dequeue(); }
else
{
Monitor.Wait(locker, 1000);
qBuffer.Dequeue();
}
}
if (OnReceiveLine != null)
{//通知会话对象
OnReceiveLine(sb.ToString());
continue;
} }
}
finally
{ }
}
);
向高手请教下,多线程出现这样的问题有可能是哪里犯错了:死锁?对象不是线程安全?
感觉怎么用了这么多的lock和Monitor呢