WebSphere MQ以下简称为WMQ.
在通讯项目中,有这样的一个应用场景,简单描述如下:
1. 程序A需定时从MQ中取出消息(XML)
2. 将XML还原为DataSet
3. 将DataSet持久化到数据库该场景在总线型的消息传输框架中较为常见, 在一切正常的情况下,程序工作正常,数据不会发生错误或丢失. 但程序A介于WMQ, 与数据库之间, 程序两端的网络因素, 或者任意一端服务停止,均有可能会导致消息丢失. 因此比较稳妥的做法是将以上步骤采用XA事务进行全局托管.实现一个MQGet类:using System;
using System.Collections.Generic;
using System.Text;using System.Collections;
using System.Transactions;
using IBM.WMQ;
namespace WMQClient_WithXA
{
    public class MqGet
    {
        private String _host = "127.0.0.1";
        private int _port;
        private String _channelName = "SYSTEM.DEF.SVRCONN";
        private String _queueManagerName = null;
        private String _queueName = null;
        private int _charSet;
        private WMQTransactionType _transactionType;
        private bool isTopic = false;
        private String _transportMode = "managed";
        private bool commit = true;        private MQQueueManager queueManager;
        private MQQueue queue;        private MyMqObject myMqObj;
        private Hashtable properties;
        private MQMessage message;
        private MQGetMessageOptions getMessageOptions;        public MqGet(string sMqQmgrName, string sQueueName, string sChannelName, string sHost, int iPort, int iCharacterSet, WMQTransactionType TransactionType, string sTransportMode = "managed")
        {
            getMessageOptions = new MQGetMessageOptions();
            _host = sHost;
            _port = iPort;
            _channelName = sChannelName;
            _queueManagerName = sMqQmgrName;
            _queueName = sQueueName;
            _transportMode = sTransportMode;
            _charSet = iCharacterSet;
            _transactionType = TransactionType;            getMessageOptions.Options += MQC.MQGMO_WAIT;
            getMessageOptions.WaitInterval = 20000;  // 20 seconds wait            properties = new Hashtable();
            properties.Add(MQC.HOST_NAME_PROPERTY, _host);
            properties.Add(MQC.PORT_PROPERTY, _port);
            properties.Add(MQC.CHANNEL_PROPERTY, _channelName);            switch (TransactionType)
            {
                case WMQTransactionType.NORMAL_TRANSACTION:
                    getMessageOptions.Options += MQC.MQGMO_SYNCPOINT;
                    break;
                case WMQTransactionType.XA_TRANSACTION:
                    if (_transportMode == "managed")
                    {
                        properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED);                                         // for managed mode                        
                    }
                    else
                    {
                        properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_XACLIENT);
                    }
                    getMessageOptions.Options += MQC.MQGMO_SYNCPOINT;
                    break;
            }
        }        private MQQueueManager getMqManager(MQQueueManager qmg)
        {
            if (qmg == null)
            {
                try
                {
                    qmg = new MQQueueManager(_queueManagerName, properties);
                    return qmg;
                }
                catch(Exception err)
                {
                    Console.WriteLine(err.Message);
                    return null;
                }
            }
            else
                return qmg;
        }
        private MQQueue getMqQ(MQQueue q)
        {
            if (q == null)
            {
                try
                {
                    q = myMqObj._qMg.AccessQueue(_queueName, MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);
                    return q;
                }
                catch (Exception err)
                {
                    Console.WriteLine(err.Message);
                    return null;
                }
            }
            else
                return q;
        }        private MyMqObject GetMqObj(ref MyMqObject mqObj)
        {
            if (mqObj._qMg == null)
            {
                mqObj._qMg = getMqManager(mqObj._qMg);
                mqObj._q = null;
            }
            if ( mqObj._qMg !=null )
                mqObj._q = getMqQ(mqObj._q);
            if ( mqObj._q == null )
            {
                CloseMQConn(ref mqObj);
            }
            return mqObj;
        }
        public void CloseMQConn(ref MyMqObject mqObj)
        {
            if (mqObj._q != null)
            {
                try
                {
                    mqObj._q.Close();
                }
                catch{}
                mqObj._q = null;
            }            if (mqObj._qMg != null)
            {
                try
                {
                    mqObj._qMg.Disconnect();
                }
                catch{}
                mqObj._qMg = null;
            }
        }        public MyMessage GetMessage()
        {
            myMqObj = GetMqObj(ref myMqObj);
            if (myMqObj._qMg != null)
            {
                message = new MQMessage();
                MyMessage msg = new MyMessage();
                try
                {
                    myMqObj._q.Get(message, getMessageOptions);                    byte[] buff = message.ReadBytes(message.MessageLength);
                    msg.MsgBody = System.Text.Encoding.GetEncoding(CodePageTrans.getMsCodePage(_charSet)).GetString(buff);
                    msg.PutTime = message.PutDateTime.AddHours(8);
                    message.ClearMessage();
                    return msg;
                }
                catch (MQException mqe)
                {
                    Console.WriteLine("获取消息失败. 原因: " + mqe.ToString());
                    CloseMQConn(ref myMqObj);
                }
            }
            return null;
        }        public bool Commit()
        {
            try
            {
                switch (_transactionType)
                {
                    case WMQTransactionType.NORMAL_TRANSACTION:
                        myMqObj._qMg.Commit();
                        break;
                }
            }
            catch { return false; }
            return true;
        }        public bool Rollback()
        {
            try
            {
                switch (_transactionType)
                {
                    case WMQTransactionType.NORMAL_TRANSACTION:
                        myMqObj._qMg.Backout();
                        break;
                }
            }
            catch { return false; }
            return true;
        }
    }
}
Oracle使用 ODP.NET 驱动, 测试代码使用显示事务方式, 代码如下:private void btnGetMessage_Click(object sender, EventArgs e)
{
    DbProviderFactory dbFactory = DbProviderFactories.GetFactory("Oracle.DataAccess.Client");
    DbConnection dbConn = dbFactory.CreateConnection();
    dbConn.ConnectionString = sConnStr;
    dbConn.Open();    using (CommittableTransaction transScope = new CommittableTransaction())
    {
        CommittableTransaction.Current = transScope;
        dbConn.EnlistTransaction(transScope);
        WMQClient_WithXA.MyMessage ss = mqGet.GetMessage();  // 从消息队列1中取出消息
        if (ss != null)
        {
            MessageBox.Show(ss.MsgBody);
            DbCommand ocmd = dbFactory.CreateCommand();
            ocmd.CommandText = string.Format("insert into HY (NAME) VALUES  ('{0}')", Guid.NewGuid().ToString());
            ocmd.Connection = dbConn;            ocmd.ExecuteNonQuery();            mqPut.PutMessage(ss.MsgBody);     // 将消息放入到队列2
            transScope.Commit();              // 或者回滚事务
        }
        CommittableTransaction.Current = null;
        dbConn.Close();
    }
}
完整DEMO, 免积分下载地址: http://download.csdn.net/detail/hyblusea/4529167

解决方案 »

  1.   

    关于使用XA事务的性能测试
    1. 使用隐式事务,代码如下: 发送1000条消息耗时大约30秒Stopwatch stw = new Stopwatch();
    stw.Start(); 
    for (int i = 0; i < 1000; i++)
    {
        using (TransactionScope transScope = new TransactionScope())
        {
            mqPut.PutMessage(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fffff") + ": AAAAAAABBBBBBBCCCCCCCCCDDDDDDDDDDDEEEEEEEFFFFFFFFFFGGGGGGGGGHHHHHHHHHHHHIIIIIIIIIIIIIIIIIIIIIIIJJJJJJJJJJJJJJJJJJJJJJKKKKKKKKKKKKKKKK,这是一个测试.");
            transScope.Complete();
        }}
    MessageBox.Show("共耗时:" + stw.Elapsed.Seconds.ToString() + "秒"); 
    2.采用显示事务耗时大概27秒
    3.独立事务方式耗时1秒
    4.不开启事务耗时<1秒.因此可以看出XA事务瓶颈比较大, 隐式的XA性能比显式略有提高,但ODP.NET似乎不支持隐式.
      

  2.   


    看来我OUT了, 弱弱的问下什么是查水表啊? 
      

  3.   

    结贴前来顶一下, 看来使用WMQ的童鞋不多...
      

  4.   

    弱弱的问一句 如何得到微软的mvp的徽章啊
      

  5.   

    LZ直接是把XML当成文件读取进来,再存入数据库,为什么不一开始直接传入集合之类的对象呢。干嘛要用xml
    文件,走流傳輸?
      

  6.   


    在传输的过程中,DataSet之类的对象不能直接放进Mq, 必须序列化...所以我们把DataSet序列化为XML字符串