using System;
using System.Xml;
using Finstone.Configuration;
using System.Configuration;
using MQAX200;
using System.Threading;

namespace Finstone.MessageQueue
{
    /// <summary>
    /// A sample implementation of IMessageQueue for MQSeries technology.
    /// It lets callers send messages to and retrieve messages from an MQSeries
    /// queue, and register handlers that are notified when new messages arrive.
    /// </summary>
    public class MQSeries : IMessageQueue
    {
        // HRESULT that, per the original author's note, Get raises when the queue
        // is empty; the listener ignores it and keeps polling.
        // NOTE(review): numerically this is 0x80004005 - confirm it is not also
        // raised for failures that should stop the listener.
        private const int EmptyQueueErrorCode = -2147467259;

        // guards subscription changes and the decision to start the listener thread
        private readonly object listenerLock = new object();

        private ConfigurationManager cm;
        private MessageArrivalHandler handler;
        private string queueManager;
        private string QueueName;
        private MQQueue queue;
        private MQSession queueSession;
        private MQQueueManager mqm;
        // written by subscribers and read by the listener thread, hence volatile
        private volatile bool queueListenerStarted;
        private int sleepTime;

        /// <summary>
        /// Reads the queue manager name, queue name and polling interval for the
        /// named queue from the "Framework" configuration section and creates
        /// the MQ session object.
        /// </summary>
        /// <param name="queueName">the name of the queue entry in the configuration</param>
        public MQSeries(string queueName)
        {
            cm = (ConfigurationManager)ConfigurationSettings.GetConfig("Framework");
            XmlNode queueInfo = cm.MessageQueueConfig.RetrieveQueueInformation("*[@name='" + queueName + "']");
            queueManager = queueInfo.SelectSingleNode("QueueManager").InnerText;
            QueueName = queueInfo.SelectSingleNode("QueueName").InnerText;
            sleepTime = int.Parse(queueInfo.SelectSingleNode("SleepTime").InnerText);
            queueSession = new MQSessionClass();
        }

        /// <summary>
        /// Sends the message to the MQSeries queue opened by <see cref="Open"/>.
        /// </summary>
        /// <param name="m">a message object to be sent; its Content is written as a string</param>
        public void Send(Message m)
        {
            //create a new MQSeries message
            MQMessage message = (MQMessage)queueSession.AccessMessage();
            message.WriteString(m.Content.ToString());
            MQPutMessageOptions messageOption = (MQPutMessageOptions)queueSession.AccessPutMessageOptions();
            //send the message to the MQSeries queue
            queue.Put(message, messageOption);
        }

        /// <summary>
        /// Retrieves one message from the MQSeries queue opened by <see cref="Open"/>.
        /// </summary>
        /// <returns>a Message whose Content is the message text and whose Label is the MQ message id</returns>
        public Message Retrieve()
        {
            //create a new message
            MQMessage message = (MQMessage)queueSession.AccessMessage();
            MQGetMessageOptions messageOption = (MQGetMessageOptions)queueSession.AccessGetMessageOptions();
            //fill the new message object with the message from the queue.
            //unlike MSMQ, GET is not a blocking call; instead, it raises
            //an exception when the queue is empty.
            queue.Get(message, messageOption, System.Reflection.Missing.Value);
            //copy the MQ message into the framework's message object
            Message m = new Message();
            m.Content = message.ReadString(message.MessageLength);
            m.Label = message.MessageId;
            return m;
        }

        /// <summary>
        /// Raised for each message the listener takes off the queue.
        /// Adding the first handler starts the listener thread; removing the
        /// last handler asks it to stop.
        /// </summary>
        public event MessageArrivalHandler MessageArrival
        {
            add
            {
                lock (listenerLock)
                {
                    handler += value;
                    if (!queueListenerStarted)
                    {
                        //mark the listener as started before the thread runs so a
                        //second subscription arriving right away does not start another one
                        queueListenerStarted = true;
                        Thread t = new Thread(new ThreadStart(StartQueueListener));
                        t.Start();
                    }
                }
            }
            remove
            {
                lock (listenerLock)
                {
                    handler -= value;
                    //stop the listener if no handler is left
                    if (handler == null)
                    {
                        StopQueueListener();
                    }
                }
            }
        }

        /// <summary>
        /// Polls the queue on its own connection and notifies the registered
        /// handlers for each message that arrives. Runs until
        /// <see cref="StopQueueListener"/> is called or an unexpected error occurs.
        /// </summary>
        public void StartQueueListener()
        {
            queueListenerStarted = true;
            MQQueueManager listenermqm = null;
            MQQueue listenerqueue = null;
            try
            {
                //create a separate connection to the message queue; the queue must be
                //accessed through the listener's own manager, not the one created by Open()
                listenermqm = (MQQueueManager)queueSession.AccessQueueManager(queueManager);
                listenerqueue = (MQQueue)listenermqm.AccessQueue(QueueName, (int)MQ.MQOO_INPUT_AS_Q_DEF + (int)MQ.MQOO_OUTPUT, "", "", "");
                listenerqueue.Open();
                MQGetMessageOptions messageOption = (MQGetMessageOptions)queueSession.AccessGetMessageOptions();
                while (queueListenerStarted)
                {
                    Thread.Sleep(sleepTime);
                    //snapshot the delegate: a handler may be removed while we poll
                    MessageArrivalHandler subscribers = handler;
                    if (subscribers == null)
                    {
                        continue;
                    }
                    try
                    {
                        //use a fresh message for every poll, as Retrieve() does, so that
                        //nothing left over from a previous Get influences the next one
                        MQMessage message = (MQMessage)queueSession.AccessMessage();
                        //GET raises an exception if no message is in the queue;
                        //we keep listening in that case, see the catch block
                        listenerqueue.Get(message, messageOption, System.Reflection.Missing.Value);
                        Finstone.MessageQueue.Message arrived = new Finstone.MessageQueue.Message();
                        arrived.Label = message.MessageId;
                        arrived.Content = message.ReadString(message.MessageLength);
                        //fire the event
                        subscribers(arrived, QueueName);
                    }
                    catch (System.Runtime.InteropServices.COMException ex)
                    {
                        //an empty queue is expected; anything else is rethrown
                        //with its original stack trace
                        if (ex.ErrorCode != EmptyQueueErrorCode)
                        {
                            throw;
                        }
                    }
                }
            }
            finally
            {
                //allow a later subscription to start a new listener
                queueListenerStarted = false;
                //close the connection
                if (listenerqueue != null)
                {
                    listenerqueue.Close();
                }
                if (listenermqm != null)
                {
                    listenermqm.Disconnect();
                }
            }
        }

        /// <summary>
        /// Asks the listener to stop; it exits after its current polling cycle.
        /// </summary>
        public void StopQueueListener()
        {
            queueListenerStarted = false;
        }

        /// <summary>
        /// Opens the connection to the message queue used by Send and Retrieve.
        /// </summary>
        public void Open()
        {
            mqm = (MQQueueManager)queueSession.AccessQueueManager(queueManager);
            queue = (MQQueue)mqm.AccessQueue(QueueName, (int)MQ.MQOO_INPUT_AS_Q_DEF + (int)MQ.MQOO_OUTPUT, "", "", "");
            queue.Open();
        }

        /// <summary>
        /// Closes the queue and disconnects from the queue manager, if they were opened.
        /// </summary>
        public void Close()
        {
            if (queue != null)
            {
                queue.Close();
            }
            if (mqm != null)
            {
                mqm.Disconnect();
            }
        }
    }
}
using System;
using System.Configuration;
using Finstone.Configuration;

namespace Finstone.MessageQueue
{
    /// <summary>
    /// MessageQueueManager contains the API that developers interact with
    /// the most. It is the bridge to the underlying message queue
    /// implementation: every call is forwarded to the IMessageQueue that was
    /// passed to the constructor.
    /// </summary>
    public class MessageQueueManager : IMessageQueueManager
    {
        private readonly IMessageQueue mq;

        /// <summary>
        /// Creates a manager that forwards all calls to the given queue.
        /// </summary>
        /// <param name="messageQueue">the message queue implementation to delegate to</param>
        /// <exception cref="ArgumentNullException">messageQueue is null</exception>
        public MessageQueueManager(IMessageQueue messageQueue)
        {
            //fail here rather than with a NullReferenceException on first use
            if (messageQueue == null)
            {
                throw new ArgumentNullException("messageQueue");
            }
            mq = messageQueue;
        }

        /// <summary>
        /// Passes the call to the actual message queue implementation.
        /// </summary>
        /// <param name="m">message object</param>
        public void SendMessage(Message m)
        {
            mq.Send(m);
        }

        /// <summary>
        /// Passes the call to the actual message queue implementation.
        /// </summary>
        /// <returns>the retrieved message from the queue</returns>
        public Message RetrieveMessage()
        {
            return mq.Retrieve();
        }

        /// <summary>
        /// Registers an event handler that is triggered when a new message arrives at the queue.
        /// </summary>
        /// <param name="mah">MessageArrivalHandler delegate</param>
        public void RegisterMessageArrivalHanlder(MessageArrivalHandler mah)
        {
            mq.MessageArrival += mah;
        }

        /// <summary>
        /// Unregisters an event handler previously added with RegisterMessageArrivalHanlder.
        /// </summary>
        /// <param name="mah">MessageArrivalHandler delegate</param>
        public void UnRegisterMessageArrivalHanlder(MessageArrivalHandler mah)
        {
            mq.MessageArrival -= mah;
        }

        /// <summary>
        /// Opens the connection of the underlying message queue.
        /// </summary>
        public void OpenConnection()
        {
            mq.Open();
        }

        /// <summary>
        /// Closes the connection of the underlying message queue.
        /// </summary>
        public void CloseConnection()
        {
            mq.Close();
        }
    }
}
using System;

namespace Finstone.MessageQueue
{
    /// <summary>
    /// Callback signature for message-arrival notifications.
    /// </summary>
    /// <param name="m">the message that arrived</param>
    /// <param name="queueName">the name of the queue the message was taken from</param>
    public delegate void MessageArrivalHandler(Message m, string queueName);

    /// <summary>
    /// Interface that every message queue implementation must implement.
    /// </summary>
    public interface IMessageQueue
    {
        /// <summary>Sends a message to the queue.</summary>
        void Send(Message message);

        /// <summary>Retrieves a message from the queue.</summary>
        Message Retrieve();

        /// <summary>Raised when a message arrives at the queue.</summary>
        event MessageArrivalHandler MessageArrival;

        /// <summary>Opens the connection to the queue.</summary>
        void Open();

        /// <summary>Closes the connection to the queue.</summary>
        void Close();
    }

    /// <summary>
    /// Interface for MessageQueueManager. It is used by client
    /// applications to interact with MessageQueueManager.
    /// </summary>
    public interface IMessageQueueManager
    {
        /// <summary>Sends a message through the underlying queue.</summary>
        void SendMessage(Message message);

        /// <summary>Retrieves a message from the underlying queue.</summary>
        Message RetrieveMessage();

        /// <summary>Registers a handler for message arrivals.</summary>
        void RegisterMessageArrivalHanlder(MessageArrivalHandler mah);

        /// <summary>Unregisters a handler for message arrivals.</summary>
        void UnRegisterMessageArrivalHanlder(MessageArrivalHandler mah);

        /// <summary>Opens the connection of the underlying queue.</summary>
        void OpenConnection();

        /// <summary>Closes the connection of the underlying queue.</summary>
        void CloseConnection();
    }

    /// <summary>
    /// A basic Message class; it can be extended to create more
    /// implementation-specific message classes.
    /// </summary>
    public class Message
    {
        /// <summary>Label identifying the message.</summary>
        public string Label;

        /// <summary>Payload of the message.</summary>
        public object Content;
    }
}
using System.Xml;
using Finstone.Configuration;
using System.Configuration;
using MQAX200;
using System.Threading;namespace Finstone.MessageQueue
{
/// <summary>
/// A sample implementation of IMessageQueue for MQSeries technology.
/// It lets callers send messages to and retrieve messages from an MQSeries
/// queue, and register handlers that are notified when new messages arrive.
/// </summary>
public class MQSeries : IMessageQueue
{
    // HRESULT that, per the original author's note, Get raises when the queue
    // is empty; the listener ignores it and keeps polling.
    // NOTE(review): numerically this is 0x80004005 - confirm it is not also
    // raised for failures that should stop the listener.
    private const int EmptyQueueErrorCode = -2147467259;

    // guards subscription changes and the decision to start the listener thread
    private readonly object listenerLock = new object();

    private ConfigurationManager cm;
    private MessageArrivalHandler handler;
    private string queueManager;
    private string QueueName;
    private MQQueue queue;
    private MQSession queueSession;
    private MQQueueManager mqm;
    // written by subscribers and read by the listener thread, hence volatile
    private volatile bool queueListenerStarted;
    private int sleepTime;

    /// <summary>
    /// Reads the queue manager name, queue name and polling interval for the
    /// named queue from the "Framework" configuration section and creates
    /// the MQ session object.
    /// </summary>
    /// <param name="queueName">the name of the queue entry in the configuration</param>
    public MQSeries(string queueName)
    {
        cm = (ConfigurationManager)ConfigurationSettings.GetConfig("Framework");
        XmlNode queueInfo = cm.MessageQueueConfig.RetrieveQueueInformation("*[@name='" + queueName + "']");
        queueManager = queueInfo.SelectSingleNode("QueueManager").InnerText;
        QueueName = queueInfo.SelectSingleNode("QueueName").InnerText;
        sleepTime = int.Parse(queueInfo.SelectSingleNode("SleepTime").InnerText);
        queueSession = new MQSessionClass();
    }

    /// <summary>
    /// Sends the message to the MQSeries queue opened by <see cref="Open"/>.
    /// </summary>
    /// <param name="m">a message object to be sent; its Content is written as a string</param>
    public void Send(Message m)
    {
        //create a new MQSeries message
        MQMessage message = (MQMessage)queueSession.AccessMessage();
        message.WriteString(m.Content.ToString());
        MQPutMessageOptions messageOption = (MQPutMessageOptions)queueSession.AccessPutMessageOptions();
        //send the message to the MQSeries queue
        queue.Put(message, messageOption);
    }

    /// <summary>
    /// Retrieves one message from the MQSeries queue opened by <see cref="Open"/>.
    /// </summary>
    /// <returns>a Message whose Content is the message text and whose Label is the MQ message id</returns>
    public Message Retrieve()
    {
        //create a new message
        MQMessage message = (MQMessage)queueSession.AccessMessage();
        MQGetMessageOptions messageOption = (MQGetMessageOptions)queueSession.AccessGetMessageOptions();
        //fill the new message object with the message from the queue.
        //unlike MSMQ, GET is not a blocking call; instead, it raises
        //an exception when the queue is empty.
        queue.Get(message, messageOption, System.Reflection.Missing.Value);
        //copy the MQ message into the framework's message object
        Message m = new Message();
        m.Content = message.ReadString(message.MessageLength);
        m.Label = message.MessageId;
        return m;
    }

    /// <summary>
    /// Raised for each message the listener takes off the queue.
    /// Adding the first handler starts the listener thread; removing the
    /// last handler asks it to stop.
    /// </summary>
    public event MessageArrivalHandler MessageArrival
    {
        add
        {
            lock (listenerLock)
            {
                handler += value;
                if (!queueListenerStarted)
                {
                    //mark the listener as started before the thread runs so a
                    //second subscription arriving right away does not start another one
                    queueListenerStarted = true;
                    Thread t = new Thread(new ThreadStart(StartQueueListener));
                    t.Start();
                }
            }
        }
        remove
        {
            lock (listenerLock)
            {
                handler -= value;
                //stop the listener if no handler is left
                if (handler == null)
                {
                    StopQueueListener();
                }
            }
        }
    }

    /// <summary>
    /// Polls the queue on its own connection and notifies the registered
    /// handlers for each message that arrives. Runs until
    /// <see cref="StopQueueListener"/> is called or an unexpected error occurs.
    /// </summary>
    public void StartQueueListener()
    {
        queueListenerStarted = true;
        MQQueueManager listenermqm = null;
        MQQueue listenerqueue = null;
        try
        {
            //create a separate connection to the message queue; the queue must be
            //accessed through the listener's own manager, not the one created by Open()
            listenermqm = (MQQueueManager)queueSession.AccessQueueManager(queueManager);
            listenerqueue = (MQQueue)listenermqm.AccessQueue(QueueName, (int)MQ.MQOO_INPUT_AS_Q_DEF + (int)MQ.MQOO_OUTPUT, "", "", "");
            listenerqueue.Open();
            MQGetMessageOptions messageOption = (MQGetMessageOptions)queueSession.AccessGetMessageOptions();
            while (queueListenerStarted)
            {
                Thread.Sleep(sleepTime);
                //snapshot the delegate: a handler may be removed while we poll
                MessageArrivalHandler subscribers = handler;
                if (subscribers == null)
                {
                    continue;
                }
                try
                {
                    //use a fresh message for every poll, as Retrieve() does, so that
                    //nothing left over from a previous Get influences the next one
                    MQMessage message = (MQMessage)queueSession.AccessMessage();
                    //GET raises an exception if no message is in the queue;
                    //we keep listening in that case, see the catch block
                    listenerqueue.Get(message, messageOption, System.Reflection.Missing.Value);
                    Finstone.MessageQueue.Message arrived = new Finstone.MessageQueue.Message();
                    arrived.Label = message.MessageId;
                    arrived.Content = message.ReadString(message.MessageLength);
                    //fire the event
                    subscribers(arrived, QueueName);
                }
                catch (System.Runtime.InteropServices.COMException ex)
                {
                    //an empty queue is expected; anything else is rethrown
                    //with its original stack trace
                    if (ex.ErrorCode != EmptyQueueErrorCode)
                    {
                        throw;
                    }
                }
            }
        }
        finally
        {
            //allow a later subscription to start a new listener
            queueListenerStarted = false;
            //close the connection
            if (listenerqueue != null)
            {
                listenerqueue.Close();
            }
            if (listenermqm != null)
            {
                listenermqm.Disconnect();
            }
        }
    }

    /// <summary>
    /// Asks the listener to stop; it exits after its current polling cycle.
    /// </summary>
    public void StopQueueListener()
    {
        queueListenerStarted = false;
    }

    /// <summary>
    /// Opens the connection to the message queue used by Send and Retrieve.
    /// </summary>
    public void Open()
    {
        mqm = (MQQueueManager)queueSession.AccessQueueManager(queueManager);
        queue = (MQQueue)mqm.AccessQueue(QueueName, (int)MQ.MQOO_INPUT_AS_Q_DEF + (int)MQ.MQOO_OUTPUT, "", "", "");
        queue.Open();
    }

    /// <summary>
    /// Closes the queue and disconnects from the queue manager, if they were opened.
    /// </summary>
    public void Close()
    {
        if (queue != null)
        {
            queue.Close();
        }
        if (mqm != null)
        {
            mqm.Disconnect();
        }
    }
}
}
using System.Configuration;
using Finstone.Configuration;namespace Finstone.MessageQueue
{
/// <summary>
/// MessageQueueManager contains the API that developers interact with
/// the most. It is the bridge to the underlying message queue
/// implementation: every call is forwarded to the IMessageQueue that was
/// passed to the constructor.
/// </summary>
public class MessageQueueManager : IMessageQueueManager
{
    private readonly IMessageQueue mq;

    /// <summary>
    /// Creates a manager that forwards all calls to the given queue.
    /// </summary>
    /// <param name="messageQueue">the message queue implementation to delegate to</param>
    /// <exception cref="System.ArgumentNullException">messageQueue is null</exception>
    public MessageQueueManager(IMessageQueue messageQueue)
    {
        //fail here rather than with a NullReferenceException on first use
        if (messageQueue == null)
        {
            throw new System.ArgumentNullException("messageQueue");
        }
        mq = messageQueue;
    }

    /// <summary>
    /// Passes the call to the actual message queue implementation.
    /// </summary>
    /// <param name="m">message object</param>
    public void SendMessage(Message m)
    {
        mq.Send(m);
    }

    /// <summary>
    /// Passes the call to the actual message queue implementation.
    /// </summary>
    /// <returns>the retrieved message from the queue</returns>
    public Message RetrieveMessage()
    {
        return mq.Retrieve();
    }

    /// <summary>
    /// Registers an event handler that is triggered when a new message arrives at the queue.
    /// </summary>
    /// <param name="mah">MessageArrivalHandler delegate</param>
    public void RegisterMessageArrivalHanlder(MessageArrivalHandler mah)
    {
        mq.MessageArrival += mah;
    }

    /// <summary>
    /// Unregisters an event handler previously added with RegisterMessageArrivalHanlder.
    /// </summary>
    /// <param name="mah">MessageArrivalHandler delegate</param>
    public void UnRegisterMessageArrivalHanlder(MessageArrivalHandler mah)
    {
        mq.MessageArrival -= mah;
    }

    /// <summary>
    /// Opens the connection of the underlying message queue.
    /// </summary>
    public void OpenConnection()
    {
        mq.Open();
    }

    /// <summary>
    /// Closes the connection of the underlying message queue.
    /// </summary>
    public void CloseConnection()
    {
        mq.Close();
    }
}
}
{
/// <summary>
/// Callback signature for message-arrival notifications: receives the
/// message that arrived and the name of the queue it came from.
/// </summary>
/// <param name="m">the message that arrived</param>
/// <param name="queueName">the name of the queue the message was taken from</param>
public delegate void MessageArrivalHandler (Message m, string queueName);
/// <summary>
/// Interface that every message queue implementation must implement.
/// </summary>
public interface IMessageQueue
{
/// <summary>Sends a message to the queue.</summary>
void Send(Message message);
/// <summary>Retrieves a message from the queue.</summary>
Message Retrieve();
/// <summary>Raised when a message arrives at the queue.</summary>
event MessageArrivalHandler MessageArrival;
/// <summary>Opens the connection to the queue.</summary>
void Open();
/// <summary>Closes the connection to the queue.</summary>
void Close();
}
/// <summary>
/// Interface for MessageQueueManager. It is used by client
/// applications to interact with MessageQueueManager.
/// </summary>
public interface IMessageQueueManager
{
/// <summary>Sends a message through the underlying queue.</summary>
void SendMessage(Message message);
/// <summary>Retrieves a message from the underlying queue.</summary>
Message RetrieveMessage();
/// <summary>Registers a handler for message arrivals.</summary>
void RegisterMessageArrivalHanlder(MessageArrivalHandler mah);
/// <summary>Unregisters a handler for message arrivals.</summary>
void UnRegisterMessageArrivalHanlder(MessageArrivalHandler mah);
/// <summary>Opens the connection of the underlying queue.</summary>
void OpenConnection();
/// <summary>Closes the connection of the underlying queue.</summary>
void CloseConnection();
}
/// <summary>
/// A basic Message class; it can be extended to create more
/// implementation-specific message classes.
/// </summary>
public class Message
{
// label identifying the message (the MQSeries implementation stores the MQ message id here)
public string Label;
// payload of the message (the MQSeries implementation sends Content.ToString())
public object Content;
}
}
请问MQQueueManager对象是如何创建的呢?
您提供的代码是在安装了服务器端的情况下建立连接并创建MQ管理器对象的方法,请问是否有只安装客户端时创建MQQueueManager的方法?