使用消息队列异步插入数据,能发送消息,但是无法读取消息。配置文件中的消息队列的path设置如下:<add key="StudentQueuePath" value="FormatName:DIRECT=OS:MachineName\Private$\RSStudents"/>代码如下:
// Queue path read from the "StudentQueuePath" appSettings entry.
private static readonly string queuePath = ConfigurationManager.AppSettings["StudentQueuePath"];
// Queue handle opened on the configured path.
protected MessageQueue queue = new MessageQueue(queuePath);
// Maximum time Receive() waits for a message; timeoutSeconds is declared elsewhere.
protected TimeSpan timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeoutSeconds));

/// <summary>
/// Receives the next message from <c>queue</c>, waiting at most <c>timeout</c>.
/// </summary>
/// <returns>
/// The received <c>Message</c>, typed as <c>object</c>. The message is returned undisposed;
/// the caller owns it.
/// </returns>
/// <exception cref="TimeoutException">
/// The queue reported <c>MessageQueueErrorCode.IOTimeout</c>. The original
/// <c>MessageQueueException</c> is attached as the inner exception.
/// </exception>
public virtual object Receive()
{
    try
    {
        // Deliberately not wrapped in a using block: that would dispose the
        // message before the caller ever gets to read it.
        return queue.Receive(timeout, transactionType);
    }
    catch (MessageQueueException mqex)
    {
        if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
        {
            throw new TimeoutException(mqex.Message, mqex);
        }

        // Any other queue error is passed on unchanged, stack trace intact.
        throw;
    }
}
提示错误“远程计算机不可用。”
为什么我能将消息发送,可是读取的时候却无法读取呢?是不是StudentQueuePath的设置有错误?
// Queue path read from the "StudentQueuePath" appSettings entry.
private static readonly string queuePath = ConfigurationManager.AppSettings["StudentQueuePath"];
// Queue handle opened on the configured path.
protected MessageQueue queue = new MessageQueue(queuePath);
// Maximum time Receive() waits for a message; timeoutSeconds is declared elsewhere.
protected TimeSpan timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeoutSeconds));

/// <summary>
/// Receives the next message from <c>queue</c>, waiting at most <c>timeout</c>.
/// </summary>
/// <returns>
/// The received <c>Message</c>, typed as <c>object</c>. The message is returned undisposed;
/// the caller owns it.
/// </returns>
/// <exception cref="TimeoutException">
/// The queue reported <c>MessageQueueErrorCode.IOTimeout</c>. The original
/// <c>MessageQueueException</c> is attached as the inner exception.
/// </exception>
public virtual object Receive()
{
    try
    {
        // Deliberately not wrapped in a using block: that would dispose the
        // message before the caller ever gets to read it.
        return queue.Receive(timeout, transactionType);
    }
    catch (MessageQueueException mqex)
    {
        if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
        {
            throw new TimeoutException(mqex.Message, mqex);
        }

        // Any other queue error is passed on unchanged, stack trace intact.
        throw;
    }
}
提示错误“远程计算机不可用。”
为什么我能将消息发送,可是读取的时候却无法读取呢?是不是StudentQueuePath的设置有错误?
// Open the local private queue "msmqdemo".
System.Messaging.MessageQueue queue = new System.Messaging.MessageQueue(".\\private$\\msmqdemo");
// Receive a message: the synchronous Receive method blocks the current thread until a message is available.
System.Messaging.Message message = queue.Receive();
// The body is read as a string through the XML formatter.
message.Formatter = new System.Messaging.XmlMessageFormatter(new Type[] { typeof(string) });
// NOTE(review): the control name is kept exactly as posted; its original casing is unknown.
txtreceivemessage.Text = message.Body.ToString();
<add key="StudentQueuePath" value="FormatName:DIRECT=OS:MachineName\Private$\RSStudents"/>
更改为
<add key="StudentQueuePath" value="FormatName:DIRECT=.\Private$\RSStudents"/>
发送的时候就会出错,提示“格式名无效”。发送代码如下:
/// <summary>
/// Sends <paramref name="msg"/> to the queue using the configured transaction type.
/// </summary>
/// <param name="msg">The object to place on the queue.</param>
public virtual void Send(object msg)
{
    this.queue.Send(msg, transactionType);
}
这样就行了,然后你create这个queue了没有?
这里面的queuePath = ConfigurationManager.AppSettings["StudentQueuePath"];
这样为什么不行呢?非要用create吗?
给个示例代码给你,我以前写的。 /// <summary>
/// A queue built on a system MSMQ message queue, so that the queue can be shared
/// between different processes.
/// </summary>
/// <remarks>
/// The number of real messages is tracked in an in-memory counter (<see cref="Count"/>).
/// In non-blocking mode the queue additionally holds empty placeholder messages
/// (lowest priority), one per dequeue thread, which are never handed to callers.
/// </remarks>
/// <typeparam name="T">Type of the objects stored as message bodies.</typeparam>
public class MSMQueue<T> : IDisposable
{
    // Body of the placeholder messages used in non-blocking mode.
    private const string EMTPYMESSAGE = "";
    private const string _queueNamePrefix = @"private$\";

    private static readonly object locker = new object();
    private static BinaryMessageFormatter formater = new BinaryMessageFormatter();
    // Timeout applied to Peek/Receive calls in non-blocking mode.
    private static TimeSpan timeouttime = new TimeSpan(0, 0, 3);

    private MessageQueue _queue;
    private bool _noBlockThread = false;
    private string _queuePath = "";
    private int _dequeueThreadCount = 0;
    private string _queueName = "";
    // Whether MSMQ should persist messages to the local disk.
    private bool _recoverable = false;
    // Current number of real (non-placeholder) messages, as tracked in memory.
    private long _count;

    /// <summary>
    /// Opens the private queue at <paramref name="queuePath"/>, creating it if necessary,
    /// in blocking mode.
    /// </summary>
    /// <param name="queuePath">Queue name, or a full <c>.\private$\name</c> path.</param>
    public MSMQueue(string queuePath)
    {
        bool isExists = CreateMessageQueue(queuePath);
        if (isExists)
        {
            RepairCount();
        }
    }

    /// <summary>
    /// Opens the private queue at <paramref name="queuePath"/>, creating it if necessary,
    /// in non-blocking mode.
    /// </summary>
    /// <param name="queuePath">Queue name, or a full <c>.\private$\name</c> path.</param>
    /// <param name="dequeueThreadCount">Number of threads that will dequeue; must be greater than 0.</param>
    /// <exception cref="ArgumentException"><paramref name="dequeueThreadCount"/> is not positive.</exception>
    public MSMQueue(string queuePath, int dequeueThreadCount)
    {
        if (dequeueThreadCount <= 0)
        {
            throw new ArgumentException("获取消息的线程数必须大于0!");
        }

        // Do not block the thread when no message is available.
        _noBlockThread = true;
        _dequeueThreadCount = dequeueThreadCount;
        bool isExists = CreateMessageQueue(queuePath);
        if (!isExists)
        {
            // One placeholder message per reading thread.
            for (int i = 0; i < _dequeueThreadCount; i++)
            {
                EnqueueEmptyMessage();
            }
        }
        else
        {
            // Same recovery the internal (queue, dequeueThreadCount) constructor performs;
            // without it the messages already in an existing queue are never counted.
            RestoreCount();
            RepairCount();
        }
    }

    /// <summary>
    /// Wraps an already opened MSMQ queue in blocking mode.
    /// </summary>
    /// <param name="queue">The existing queue.</param>
    internal MSMQueue(MessageQueue queue)
    {
        if (queue == null)
        {
            throw new ArgumentNullException("queue");
        }

        _queue = queue;
        _queuePath = string.Concat(@".\", _queueNamePrefix, QueueName);
        RepairCount();
    }

    /// <summary>
    /// Wraps an already opened MSMQ queue in non-blocking mode.
    /// </summary>
    /// <param name="queue">The existing queue.</param>
    /// <param name="dequeueThreadCount">Number of threads that will dequeue.</param>
    internal MSMQueue(MessageQueue queue, int dequeueThreadCount)
    {
        if (queue == null)
        {
            throw new ArgumentNullException("queue");
        }

        _queue = queue;
        _queuePath = string.Concat(@".\", _queueNamePrefix, QueueName);
        _dequeueThreadCount = dequeueThreadCount;
        _noBlockThread = true;
        RestoreCount();
        RepairCount();
    }

    /// <summary>
    /// Finalizer. Managed members are not touched here; see <see cref="Dispose(bool)"/>.
    /// </summary>
    ~MSMQueue()
    {
        Dispose(false);
    }

    /// <summary>
    /// Whether the queue runs in non-blocking mode.
    /// </summary>
    public bool NoBlockThread
    {
        get { return _noBlockThread; }
    }

    /// <summary>
    /// The underlying MSMQ queue.
    /// </summary>
    internal MessageQueue BaseQueue
    {
        get { return _queue; }
    }

    /// <summary>
    /// Queue name: the second backslash-separated segment of the underlying queue's
    /// <c>QueueName</c>, cached after first use.
    /// </summary>
    public string QueueName
    {
        get
        {
            if (string.IsNullOrEmpty(_queueName))
            {
                _queueName = _queue.QueueName.Split('\\')[1];
            }
            return _queueName;
        }
    }

    /// <summary>
    /// Number of messages according to the in-memory counter.
    /// </summary>
    public int Count
    {
        get { return (int)Interlocked.Read(ref _count); }
    }

    /// <summary>
    /// Whether messages sent through <c>Enqueue</c> are marked recoverable (saved to disk).
    /// </summary>
    public bool Recoverable
    {
        get { return _recoverable; }
        set { _recoverable = value; }
    }

    /// <summary>
    /// Number of messages actually present in the MSMQ queue minus the dequeue thread count
    /// (i.e. excluding the expected placeholders). Returns 0 when the queue cannot be read.
    /// </summary>
    protected int InnerCount
    {
        get
        {
            try
            {
                return _queue.GetAllMessages().Length - _dequeueThreadCount;
            }
            catch
            {
                // Best effort: an unreadable queue is reported as holding nothing.
                return 0;
            }
        }
    }

    private static BinaryMessageFormatter Formater
    {
        get { return formater; }
    }

    /// <summary>
    /// Whether the queue is empty: the in-memory count is not positive, or the
    /// underlying queue holds nothing but a placeholder at its head.
    /// </summary>
    /// <returns><c>true</c> when the queue is considered empty.</returns>
    public bool IsEmpty()
    {
        return this.Count <= 0 || BaseQueueIsEmpty();
    }

    /// <summary>
    /// Corrects the in-memory count: when it is 0 but the underlying queue holds
    /// messages, the in-memory count is raised to the queue's count.
    /// </summary>
    public void RepairCount()
    {
        if (Count != 0)
        {
            return;
        }

        // InnerCount reads every message in the queue, so evaluate it only once.
        int actual = InnerCount;
        if (actual > 0)
        {
            Interlocked.Add(ref _count, actual);
        }
    }

    /// <summary>
    /// Enqueues a message.
    /// </summary>
    /// <param name="entity">The message body.</param>
    public void Enqueue(T entity)
    {
        SendAndCount(CreateMessage(entity));
    }

    /// <summary>
    /// Enqueues a message tagged with a key (stored as the correlation id).
    /// </summary>
    /// <param name="key">Key of at most 32 characters.</param>
    /// <param name="entity">The message body.</param>
    public void Enqueue(string key, T entity)
    {
        Message message = CreateMessage(entity);
        message.CorrelationId = GetCoorelationId(key);
        SendAndCount(message);
    }

    /// <summary>
    /// Enqueues a message with a priority.
    /// </summary>
    /// <param name="entity">The message body.</param>
    /// <param name="priority">The priority, cast to <c>MessagePriority</c> by numeric value.</param>
    public void Enqueue(T entity, QueuePriority priority)
    {
        Message message = CreateMessage(entity);
        message.Priority = (MessagePriority)Convert.ToInt32(priority);
        SendAndCount(message);
    }

    /// <summary>
    /// Enqueues a message tagged with a key and a priority.
    /// </summary>
    /// <param name="key">Key of at most 32 characters.</param>
    /// <param name="entity">The message body.</param>
    /// <param name="priority">The priority, cast to <c>MessagePriority</c> by numeric value.</param>
    public void Enqueue(string key, T entity, QueuePriority priority)
    {
        Message message = CreateMessage(entity);
        message.CorrelationId = GetCoorelationId(key);
        message.Priority = (MessagePriority)Convert.ToInt32(priority);
        SendAndCount(message);
    }

    /// <summary>
    /// Checks whether this queue's MSMQ queue exists and creates it when it does not.
    /// </summary>
    /// <returns><c>true</c> when the queue already existed before the call.</returns>
    public bool CheckMessageQueueExists()
    {
        bool ret = MessageQueue.Exists(this._queuePath);
        if (!ret)
        {
            CreateMessageQueue(this._queuePath);
        }
        return ret;
    }

    /// <summary>
    /// Dequeues a message.
    /// </summary>
    /// <returns>The message body, or <c>default(T)</c> when nothing could be received.</returns>
    public T Dequeue()
    {
        try
        {
            if (NoBlockThread)
            {
                return ReceiveFromQueueNoBlock();
            }

            Message message = _queue.Receive();
            message.Formatter = Formater;
            Interlocked.Decrement(ref _count);
            return (T)message.Body;
        }
        catch
        {
            // Best effort by design: any failure is reported as "no message".
            return default(T);
        }
    }

    /// <summary>
    /// Dequeues the message that was enqueued with <paramref name="key"/>.
    /// </summary>
    /// <param name="key">The key used when the message was enqueued.</param>
    /// <returns>The message body, or <c>default(T)</c> when nothing could be received.</returns>
    public T Dequeue(string key)
    {
        T msgRequest = default(T);
        try
        {
            if (NoBlockThread)
            {
                msgRequest = ReceiveFromQueueNoBlockByCorrelationId(key);
            }
            else
            {
                Message msgSrcMessage = _queue.ReceiveByCorrelationId(GetCoorelationId(key));
                msgSrcMessage.Formatter = Formater;
                Interlocked.Decrement(ref _count);
                msgRequest = (T)msgSrcMessage.Body;
            }

            // This return had been swallowed by a trailing comment, leaving a path
            // that returned nothing.
            return msgRequest;
        }
        catch
        {
            // Best effort by design: any failure is reported as "no message".
            return msgRequest;
        }
    }

    /// <summary>
    /// Removes all messages from the queue and resets the in-memory count.
    /// </summary>
    public void Clear()
    {
        _queue.Purge();

        // Purging also removes the placeholders and invalidates the counter,
        // so bring both back in line with the now empty queue.
        Interlocked.Exchange(ref _count, 0);
        if (_noBlockThread)
        {
            for (int i = 0; i < _dequeueThreadCount; i++)
            {
                EnqueueEmptyMessage();
            }
        }
    }

    /// <summary>
    /// Closes and releases the underlying MSMQ queue.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Deletes an MSMQ message queue.
    /// </summary>
    /// <param name="queue">The queue to delete.</param>
    /// <param name="allowExistMessage">When <c>false</c>, the queue is only deleted if it holds no messages.</param>
    /// <returns><c>true</c> when the queue was deleted; <c>false</c> otherwise, including on any error.</returns>
    public static bool DeleteQueue(MSMQueue<T> queue, bool allowExistMessage)
    {
        try
        {
            lock (locker)
            {
                if (MessageQueue.Exists(queue._queuePath)
                    && (allowExistMessage || queue.InnerCount <= 0))
                {
                    queue.BaseQueue.Close();
                    System.Messaging.MessageQueue.Delete(queue._queuePath);
                    return true;
                }
            }
        }
        catch
        {
            // Best effort by design: a failed delete is reported through the return value.
        }
        return false;
    }

    /// <summary>
    /// Reloads, in non-blocking mode, every local private queue whose name starts with the prefix.
    /// </summary>
    /// <param name="prefix">Queue name prefix (without <c>private$\</c>).</param>
    /// <param name="dequeueThreadCount">Maximum number of threads taking from each queue.</param>
    /// <returns>The matching queues; empty when there are none.</returns>
    public static MSMQueue<T>[] Reload(string prefix, int dequeueThreadCount)
    {
        prefix = string.Concat(_queueNamePrefix, prefix);
        MessageQueue[] messagequeues = System.Messaging.MessageQueue.GetPrivateQueuesByMachine(System.Environment.MachineName);
        List<MSMQueue<T>> msmQueues = new List<MSMQueue<T>>();
        foreach (MessageQueue candidate in messagequeues)
        {
            if (candidate.QueueName.StartsWith(prefix, StringComparison.OrdinalIgnoreCase))
            {
                msmQueues.Add(new MSMQueue<T>(candidate, dequeueThreadCount));
            }
        }
        return msmQueues.ToArray();
    }

    /// <summary>
    /// Reloads a single existing queue in non-blocking mode.
    /// </summary>
    /// <param name="queuePath">Queue name, or a full <c>.\private$\name</c> path.</param>
    /// <param name="dequeueThreadCount">Maximum number of threads taking from the queue.</param>
    /// <returns>The queue, or <c>null</c> when it does not exist.</returns>
    public static MSMQueue<T> ReloadMessageQueue(string queuePath, int dequeueThreadCount)
    {
        if (!queuePath.StartsWith(string.Concat(@".\", _queueNamePrefix), StringComparison.OrdinalIgnoreCase))
        {
            queuePath = string.Concat(@".\", _queueNamePrefix, queuePath);
        }

        if (!MessageQueue.Exists(queuePath))
        {
            return null;
        }

        return new MSMQueue<T>(new MessageQueue(queuePath), dequeueThreadCount);
    }

    /// <summary>
    /// Reloads, in blocking mode, every local private queue whose name starts with the prefix.
    /// </summary>
    /// <param name="prefix">Queue name prefix (without <c>private$\</c>).</param>
    /// <returns>The matching queues; empty when there are none.</returns>
    public static MSMQueue<T>[] Reload(string prefix)
    {
        prefix = string.Concat(_queueNamePrefix, prefix);
        MessageQueue[] messagequeues = System.Messaging.MessageQueue.GetPrivateQueuesByMachine(System.Environment.MachineName);
        List<MSMQueue<T>> msmQueues = new List<MSMQueue<T>>();
        foreach (MessageQueue candidate in messagequeues)
        {
            if (candidate.QueueName.StartsWith(prefix, StringComparison.OrdinalIgnoreCase))
            {
                msmQueues.Add(new MSMQueue<T>(candidate));
            }
        }
        return msmQueues.ToArray();
    }

    /// <summary>
    /// Moves messages from the machine's dead-letter queue back to the queue named in
    /// each message's label.
    /// </summary>
    /// <remarks>
    /// Only messages whose label starts with <c>private$\prefix</c> or <c>.\private$\prefix</c>
    /// are removed from the dead-letter queue; all other messages are left in place.
    /// </remarks>
    /// <param name="prefix">Queue name prefix (without <c>private$\</c>).</param>
    public static void ReloadFromDeadLetterQueue(string prefix)
    {
        string barePrefix = string.Concat(_queueNamePrefix, prefix);
        // Messages sent by this class carry the full ".\private$\name" path as their label.
        string localPrefix = string.Concat(@".\", barePrefix);

        Dictionary<string, MessageQueue> messagequeues = new Dictionary<string, MessageQueue>();
        MessageQueue deadLetterQueue = new MessageQueue(string.Concat(System.Environment.MachineName, @"\Deadletter$"));
        MessageEnumerator e = deadLetterQueue.GetMessageEnumerator2();
        try
        {
            while (e.MoveNext())
            {
                // Peek at the label first so that unrelated messages are not consumed.
                string label = e.Current.Label;
                bool matches = label != null
                    && (label.StartsWith(barePrefix, StringComparison.OrdinalIgnoreCase)
                        || label.StartsWith(localPrefix, StringComparison.OrdinalIgnoreCase));
                if (!matches)
                {
                    continue;
                }

                MessageQueue queue;
                if (!messagequeues.TryGetValue(label, out queue))
                {
                    queue = new MessageQueue(label);
                    messagequeues.Add(label, queue);
                }

                Message msg = e.RemoveCurrent();
                queue.Send(msg);
            }
        }
        finally
        {
            e.Close();
            foreach (MessageQueue opened in messagequeues.Values)
            {
                opened.Dispose();
            }
            deadLetterQueue.Dispose();
        }
    }

    /// <summary>
    /// Releases the underlying queue.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the
    /// finalizer, in which case other managed objects must not be touched.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing && _queue != null)
        {
            _queue.Close();
            _queue.Dispose();
            _queue = null;
        }
    }

    /// <summary>
    /// Creates the MSMQ queue; an already existing queue is simply opened.
    /// </summary>
    /// <param name="queuePath">Queue name, or a full <c>.\private$\name</c> path.</param>
    /// <returns><c>true</c> when the queue already existed; <c>false</c> when it was created.</returns>
    protected bool CreateMessageQueue(string queuePath)
    {
        bool created = false;
        try
        {
            string localPrefix = string.Concat(@".\", _queueNamePrefix);
            _queuePath = queuePath.StartsWith(localPrefix, StringComparison.OrdinalIgnoreCase)
                ? queuePath
                : string.Concat(localPrefix, queuePath);

            if (!MessageQueue.Exists(_queuePath))
            {
                lock (locker)
                {
                    // Checked again under the lock: another thread may have created it meanwhile.
                    if (!MessageQueue.Exists(_queuePath))
                    {
                        _queue = MessageQueue.Create(_queuePath, false);
                        // Grant Everyone full control.
                        MessageQueueAccessControlEntry accessControl = new MessageQueueAccessControlEntry(new Trustee("Everyone"), MessageQueueAccessRights.FullControl);
                        _queue.SetPermissions(accessControl);
                        created = true;
                    }
                }
            }

            if (!created)
            {
                _queue = new MessageQueue(_queuePath);
            }

            _queue.Formatter = Formater;
            MessageQueue.EnableConnectionCache = true;
        }
        catch (Exception ex)
        {
            throw new Exception("创建消息队列出错。", ex);
        }
        return !created;
    }

    /// <summary>
    /// For an existing queue: counts the placeholder messages and adds any that are missing
    /// for the configured dequeue thread count. When the queue holds more placeholders than
    /// threads, the thread count is raised to match.
    /// </summary>
    private void RestoreCount()
    {
        int placeholders = 0;
        foreach (Message msg in _queue.GetAllMessages())
        {
            if (IsPlaceholder(msg))
            {
                placeholders++;
            }
        }

        if (_dequeueThreadCount > placeholders)
        {
            int missing = _dequeueThreadCount - placeholders;
            for (int i = 0; i < missing; i++)
            {
                EnqueueEmptyMessage();
            }
        }
        else if (_dequeueThreadCount < placeholders)
        {
            _dequeueThreadCount = placeholders;
        }
    }

    /// <summary>
    /// Sends one placeholder message (empty body, lowest priority). Failures are ignored.
    /// </summary>
    private void EnqueueEmptyMessage()
    {
        try
        {
            Message message = new Message();
            message.Priority = MessagePriority.Lowest;
            message.Body = EMTPYMESSAGE;
            message.Formatter = Formater;
            message.Label = _queuePath;
            _queue.Send(message);
        }
        catch
        {
            // Best effort by design: a missing placeholder is topped up by RestoreCount.
        }
    }

    /// <summary>
    /// Whether the underlying MSMQ queue is empty, i.e. nothing can be peeked within the
    /// timeout or the message at the head is a placeholder.
    /// </summary>
    private bool BaseQueueIsEmpty()
    {
        Message message = null;
        try
        {
            message = _queue.Peek(timeouttime);
        }
        catch
        {
            // A timeout (or any other peek failure) means nothing is available.
        }
        return message == null || IsPlaceholder(message);
    }

    /// <summary>
    /// Receives a message in non-blocking mode.
    /// </summary>
    /// <returns>The message body, or <c>default(T)</c> when only a placeholder or nothing was received.</returns>
    private T ReceiveFromQueueNoBlock()
    {
        if (Count <= 0)
        {
            return default(T);
        }

        Message msgSrcMessage;
        try
        {
            msgSrcMessage = _queue.Receive(timeouttime);
        }
        catch
        {
            // Nothing arrived within the timeout.
            return default(T);
        }

        if (IsPlaceholder(msgSrcMessage))
        {
            // Put the placeholder back so the number of placeholders stays constant.
            EnqueueEmptyMessage();
            return default(T);
        }

        Interlocked.Decrement(ref _count);
        return (T)msgSrcMessage.Body;
    }

    /// <summary>
    /// Receives, in non-blocking mode, the message that was enqueued with <paramref name="key"/>.
    /// </summary>
    /// <param name="key">The key used when the message was enqueued.</param>
    /// <returns>The message body, or <c>default(T)</c> when nothing could be received.</returns>
    private T ReceiveFromQueueNoBlockByCorrelationId(string key)
    {
        T messageBody = default(T);
        if (Count > 0)
        {
            try
            {
                Message msgSrcMessage = _queue.ReceiveByCorrelationId(GetCoorelationId(key), timeouttime);
                msgSrcMessage.Formatter = Formater;
                Interlocked.Decrement(ref _count);
                messageBody = (T)msgSrcMessage.Body;
            }
            catch
            {
                // Best effort by design: any failure is reported as "no message".
            }
        }
        return messageBody;
    }

    /// <summary>
    /// Converts an id into the GUID-style correlation id format: the id is left-padded with
    /// '0' to 32 characters, hyphenated 8-4-4-4-12, and suffixed with a backslash and its length.
    /// </summary>
    /// <param name="id">Id of 1 to 32 characters.</param>
    /// <returns>The formatted correlation id.</returns>
    /// <exception cref="Exception"><paramref name="id"/> is null, empty or longer than 32 characters.</exception>
    private static string GetCoorelationId(string id)
    {
        if (string.IsNullOrEmpty(id) || id.Length > 32)
        {
            throw new Exception("CoorelationId太长!");
        }

        if (id.Length < 32)
        {
            id = id.PadLeft(32, '0');
        }
        id = id.Insert(8, "-").Insert(13, "-").Insert(18, "-").Insert(23, "-");
        return string.Concat(id, @"\", id.Length);
    }

    // Attaches the formatter and reports whether the message is a placeholder.
    private static bool IsPlaceholder(Message message)
    {
        message.Formatter = Formater;
        return message.Body.Equals(EMTPYMESSAGE);
    }

    // Builds a message with the settings shared by all Enqueue overloads.
    private Message CreateMessage(T entity)
    {
        Message message = new Message();
        message.UseDeadLetterQueue = true;
        message.Recoverable = Recoverable;
        message.Formatter = Formater;
        message.Label = _queuePath;
        message.Body = entity;
        return message;
    }

    // Sends the message and bumps the in-memory counter.
    private void SendAndCount(Message message)
    {
        _queue.Send(message);
        Interlocked.Increment(ref _count);
    }
}
private static readonly object locker = new object();

/// <summary>
/// Creates the MSMQ queue; an already existing queue is simply opened.
/// </summary>
/// <param name="queuePath">Queue name, or a full <c>.\private$\name</c> path.</param>
/// <returns><c>true</c> when the queue already existed; <c>false</c> when it was created.</returns>
protected bool CreateMessageQueue(string queuePath)
{
    bool created = false;
    try
    {
        string localPrefix = string.Concat(@".\", _queueNamePrefix);
        _queuePath = queuePath.StartsWith(localPrefix, StringComparison.OrdinalIgnoreCase)
            ? queuePath
            : string.Concat(localPrefix, queuePath);

        if (!MessageQueue.Exists(_queuePath))
        {
            lock (locker)
            {
                // Checked again under the lock: another thread may have created it meanwhile.
                if (!MessageQueue.Exists(_queuePath))
                {
                    _queue = MessageQueue.Create(_queuePath, false);
                    // Grant Everyone full control.
                    MessageQueueAccessControlEntry accessControl = new MessageQueueAccessControlEntry(new Trustee("Everyone"), MessageQueueAccessRights.FullControl);
                    _queue.SetPermissions(accessControl);
                    created = true;
                }
            }
        }

        if (!created)
        {
            _queue = new MessageQueue(_queuePath);
        }

        _queue.Formatter = Formater;
        MessageQueue.EnableConnectionCache = true;
    }
    catch (Exception ex)
    {
        throw new Exception("创建消息队列出错。", ex);
    }
    return !created;
}

/// <summary>
/// Removes all messages from the underlying queue.
/// </summary>
public void Clear()
{
    _queue.Purge();
}

/// <summary>
/// Deletes an MSMQ message queue.
/// </summary>
/// <param name="queue">The queue to delete.</param>
/// <param name="allowExistMessage">When <c>false</c>, the queue is only deleted if it holds no messages.</param>
/// <returns><c>true</c> when the queue was deleted; <c>false</c> otherwise, including on any error.</returns>
public static bool DeleteQueue(MSMQueue<T> queue, bool allowExistMessage)
{
    try
    {
        lock (locker)
        {
            if (MessageQueue.Exists(queue._queuePath)
                && (allowExistMessage || queue.InnerCount <= 0))
            {
                queue.BaseQueue.Close();
                System.Messaging.MessageQueue.Delete(queue._queuePath);
                return true;
            }
        }
    }
    catch
    {
        // Best effort by design: a failed delete is reported through the return value.
    }
    return false;
}

/// <summary>
/// Reloads, in non-blocking mode, every local private queue whose name starts with the prefix.
/// </summary>
/// <param name="prefix">Queue name prefix (without <c>private$\</c>).</param>
/// <param name="dequeueThreadCount">Maximum number of threads taking from each queue.</param>
/// <returns>The matching queues; empty when there are none.</returns>
public static MSMQueue<T>[] Reload(string prefix, int dequeueThreadCount)
{
    prefix = string.Concat(_queueNamePrefix, prefix);
    MessageQueue[] messagequeues = System.Messaging.MessageQueue.GetPrivateQueuesByMachine(System.Environment.MachineName);
    List<MSMQueue<T>> msmQueues = new List<MSMQueue<T>>();
    foreach (MessageQueue candidate in messagequeues)
    {
        if (candidate.QueueName.StartsWith(prefix, StringComparison.OrdinalIgnoreCase))
        {
            msmQueues.Add(new MSMQueue<T>(candidate, dequeueThreadCount));
        }
    }
    return msmQueues.ToArray();
}

/// <summary>
/// Reloads a single existing queue in non-blocking mode.
/// </summary>
/// <param name="queuePath">Queue name, or a full <c>.\private$\name</c> path.</param>
/// <param name="dequeueThreadCount">Maximum number of threads taking from the queue.</param>
/// <returns>The queue, or <c>null</c> when it does not exist.</returns>
public static MSMQueue<T> ReloadMessageQueue(string queuePath, int dequeueThreadCount)
{
    if (!queuePath.StartsWith(string.Concat(@".\", _queueNamePrefix), StringComparison.OrdinalIgnoreCase))
    {
        queuePath = string.Concat(@".\", _queueNamePrefix, queuePath);
    }

    if (!MessageQueue.Exists(queuePath))
    {
        return null;
    }

    return new MSMQueue<T>(new MessageQueue(queuePath), dequeueThreadCount);
}

/// <summary>
/// Reloads, in blocking mode, every local private queue whose name starts with the prefix.
/// </summary>
/// <param name="prefix">Queue name prefix (without <c>private$\</c>).</param>
/// <returns>The matching queues; empty when there are none.</returns>
public static MSMQueue<T>[] Reload(string prefix)
{
    prefix = string.Concat(_queueNamePrefix, prefix);
    MessageQueue[] messagequeues = System.Messaging.MessageQueue.GetPrivateQueuesByMachine(System.Environment.MachineName);
    List<MSMQueue<T>> msmQueues = new List<MSMQueue<T>>();
    foreach (MessageQueue candidate in messagequeues)
    {
        if (candidate.QueueName.StartsWith(prefix, StringComparison.OrdinalIgnoreCase))
        {
            msmQueues.Add(new MSMQueue<T>(candidate));
        }
    }
    return msmQueues.ToArray();
}

/// <summary>
/// Moves messages from the machine's dead-letter queue back to the queue named in
/// each message's label.
/// </summary>
/// <remarks>
/// Only messages whose label starts with <c>private$\prefix</c> or <c>.\private$\prefix</c>
/// are removed from the dead-letter queue; all other messages are left in place.
/// </remarks>
/// <param name="prefix">Queue name prefix (without <c>private$\</c>).</param>
public static void ReloadFromDeadLetterQueue(string prefix)
{
    string barePrefix = string.Concat("private$\\", prefix);
    // Also accept labels written as a full local path.
    string localPrefix = string.Concat(@".\", barePrefix);

    Dictionary<string, MessageQueue> messagequeues = new Dictionary<string, MessageQueue>();
    MessageQueue deadLetterQueue = new MessageQueue(string.Concat(System.Environment.MachineName, @"\Deadletter$"));
    MessageEnumerator e = deadLetterQueue.GetMessageEnumerator2();
    try
    {
        while (e.MoveNext())
        {
            // Peek at the label first so that unrelated messages are not consumed.
            string label = e.Current.Label;
            bool matches = label != null
                && (label.StartsWith(barePrefix, StringComparison.OrdinalIgnoreCase)
                    || label.StartsWith(localPrefix, StringComparison.OrdinalIgnoreCase));
            if (!matches)
            {
                continue;
            }

            MessageQueue queue;
            if (!messagequeues.TryGetValue(label, out queue))
            {
                queue = new MessageQueue(label);
                messagequeues.Add(label, queue);
            }

            Message msg = e.RemoveCurrent();
            queue.Send(msg);
        }
    }
    finally
    {
        e.Close();
        foreach (MessageQueue opened in messagequeues.Values)
        {
            opened.Dispose();
        }
        deadLetterQueue.Dispose();
    }
}
/// <summary>
/// Finalizer: best-effort close and release of the underlying queue.
/// </summary>
~MSMQueue()
{
    if (_queue == null)
    {
        return;
    }

    try
    {
        _queue.Close();
        _queue.Dispose();
    }
    catch
    {
        // An exception escaping a finalizer terminates the process,
        // so cleanup failures are deliberately ignored here.
    }
    finally
    {
        _queue = null;
    }
}
}
/// <summary>
/// Specifies the priority Message Queuing applies to a message while it is en route
/// to a queue, and when the message is inserted into the destination queue.
/// </summary>
public enum QueuePriority
{
    /// <summary>Lowest message priority.</summary>
    Lowest = 0,

    /// <summary>Between the Low and Lowest message priorities.</summary>
    VeryLow = 1,

    /// <summary>Low message priority.</summary>
    Low = 2,

    /// <summary>Normal message priority.</summary>
    Normal = 3,

    /// <summary>
    /// Between the System.Messaging.MessagePriority.High and
    /// System.Messaging.MessagePriority.Normal message priorities.
    /// </summary>
    AboveNormal = 4,

    /// <summary>High message priority.</summary>
    High = 5,

    /// <summary>Between the Highest and High message priorities.</summary>
    VeryHigh = 6,

    /// <summary>Highest message priority.</summary>
    Highest = 7,
}