/// <summary>
/// Connects to the local private queue <c>.\private$\queuenew</c>, receives a run of
/// chained messages inside one MSMQ transaction and writes their bodies, in arrival
/// order, to <paramref name="fileName"/>.
/// </summary>
/// <remarks>
/// The first message in the queue is always consumed. After that, the next message is
/// consumed only while its <c>CorrelationId</c> equals the <c>Id</c> of the message
/// received immediately before it. The run ends when the next message does not match
/// or when no further message arrives within one second.
/// </remarks>
/// <param name="fileName">Path of the file the message bodies are written to.</param>
/// <returns>
/// 1 when at least one message was written and the transaction was committed;
/// 0 when anything failed (the error text is shown in a message box and the
/// transaction is rolled back).
/// </returns>
public static int ReceiveMessage(string fileName)
{
    TimeSpan timeout = new TimeSpan(0, 0, 1);
    int result = 0;

    // Open the already existing queue and make sure the properties used below are retrieved.
    MessageQueue queue = new MessageQueue(".\\private$\\queuenew");
    queue.MessageReadPropertyFilter.CorrelationId = true;
    queue.MessageReadPropertyFilter.AppSpecific = true;
    try
    {
        // NOTE(review): File.OpenWrite does not truncate an existing file, so a longer
        // pre-existing file keeps its old tail. Kept as in the original - confirm intent.
        using (FileStream fs = File.OpenWrite(fileName))
        using (MessageQueueTransaction transaction = new MessageQueueTransaction())
        {
            transaction.Begin();
            try
            {
                // The first message must exist; a timeout here is a genuine failure.
                System.Messaging.Message current = queue.Receive(timeout, transaction);
                current.BodyStream.CopyTo(fs);
                string id = current.Id;

                while (true)
                {
                    System.Messaging.Message next;
                    try
                    {
                        next = queue.Peek(timeout);
                    }
                    catch (MessageQueueException ex)
                    {
                        // An empty queue is the normal end of the run, not an error.
                        if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                        {
                            throw;
                        }
                        break;
                    }

                    if (next.CorrelationId != id)
                    {
                        // The next message belongs to something else; leave it queued.
                        break;
                    }

                    // Write the body of the message that is actually removed from the
                    // queue, so the file content and the consumed messages always agree.
                    current = queue.Receive(timeout, transaction);
                    current.BodyStream.CopyTo(fs);
                    id = current.Id;
                }

                // Push the data to the file before the messages are permanently removed.
                fs.Flush();
                transaction.Commit();
                result = 1;
            }
            catch
            {
                // Return the received messages to the queue before reporting the failure.
                if (transaction.Status == MessageQueueTransactionStatus.Pending)
                {
                    transaction.Abort();
                }
                throw;
            }
        }
    }
    catch (Exception ex)
    {
        result = 0;
        MessageBox.Show(ex.Message);
    }
    finally
    {
        // Release the queue resources.
        queue.Close();
    }
    return result;
}