WebSphere MQ以下简称为WMQ.
在通讯项目中,有这样的一个应用场景,简单描述如下:
1. 程序A需定时从MQ中取出消息(XML)
2. 将XML还原为DataSet
3. 将DataSet持久化到数据库该场景在总线型的消息传输框架中较为常见, 在一切正常的情况下,程序工作正常,数据不会发生错误或丢失. 但程序A介于WMQ, 与数据库之间, 程序两端的网络因素, 或者任意一端服务停止,均有可能会导致消息丢失. 因此比较稳妥的做法是将以上步骤采用XA事务进行全局托管.实现一个MQGet类:using System;
using System.Collections.Generic;
using System.Text;using System.Collections;
using System.Transactions;
using IBM.WMQ;
namespace WMQClient_WithXA
{
public class MqGet
{
private String _host = "127.0.0.1";
private int _port;
private String _channelName = "SYSTEM.DEF.SVRCONN";
private String _queueManagerName = null;
private String _queueName = null;
private int _charSet;
private WMQTransactionType _transactionType;
private bool isTopic = false;
private String _transportMode = "managed";
private bool commit = true; private MQQueueManager queueManager;
private MQQueue queue; private MyMqObject myMqObj;
private Hashtable properties;
private MQMessage message;
private MQGetMessageOptions getMessageOptions; public MqGet(string sMqQmgrName, string sQueueName, string sChannelName, string sHost, int iPort, int iCharacterSet, WMQTransactionType TransactionType, string sTransportMode = "managed")
{
getMessageOptions = new MQGetMessageOptions();
_host = sHost;
_port = iPort;
_channelName = sChannelName;
_queueManagerName = sMqQmgrName;
_queueName = sQueueName;
_transportMode = sTransportMode;
_charSet = iCharacterSet;
_transactionType = TransactionType; getMessageOptions.Options += MQC.MQGMO_WAIT;
getMessageOptions.WaitInterval = 20000; // 20 seconds wait properties = new Hashtable();
properties.Add(MQC.HOST_NAME_PROPERTY, _host);
properties.Add(MQC.PORT_PROPERTY, _port);
properties.Add(MQC.CHANNEL_PROPERTY, _channelName); switch (TransactionType)
{
case WMQTransactionType.NORMAL_TRANSACTION:
getMessageOptions.Options += MQC.MQGMO_SYNCPOINT;
break;
case WMQTransactionType.XA_TRANSACTION:
if (_transportMode == "managed")
{
properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED); // for managed mode
}
else
{
properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_XACLIENT);
}
getMessageOptions.Options += MQC.MQGMO_SYNCPOINT;
break;
}
} private MQQueueManager getMqManager(MQQueueManager qmg)
{
if (qmg == null)
{
try
{
qmg = new MQQueueManager(_queueManagerName, properties);
return qmg;
}
catch(Exception err)
{
Console.WriteLine(err.Message);
return null;
}
}
else
return qmg;
}
private MQQueue getMqQ(MQQueue q)
{
if (q == null)
{
try
{
q = myMqObj._qMg.AccessQueue(_queueName, MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);
return q;
}
catch (Exception err)
{
Console.WriteLine(err.Message);
return null;
}
}
else
return q;
} private MyMqObject GetMqObj(ref MyMqObject mqObj)
{
if (mqObj._qMg == null)
{
mqObj._qMg = getMqManager(mqObj._qMg);
mqObj._q = null;
}
if ( mqObj._qMg !=null )
mqObj._q = getMqQ(mqObj._q);
if ( mqObj._q == null )
{
CloseMQConn(ref mqObj);
}
return mqObj;
}
public void CloseMQConn(ref MyMqObject mqObj)
{
if (mqObj._q != null)
{
try
{
mqObj._q.Close();
}
catch{}
mqObj._q = null;
} if (mqObj._qMg != null)
{
try
{
mqObj._qMg.Disconnect();
}
catch{}
mqObj._qMg = null;
}
} public MyMessage GetMessage()
{
myMqObj = GetMqObj(ref myMqObj);
if (myMqObj._qMg != null)
{
message = new MQMessage();
MyMessage msg = new MyMessage();
try
{
myMqObj._q.Get(message, getMessageOptions); byte[] buff = message.ReadBytes(message.MessageLength);
msg.MsgBody = System.Text.Encoding.GetEncoding(CodePageTrans.getMsCodePage(_charSet)).GetString(buff);
msg.PutTime = message.PutDateTime.AddHours(8);
message.ClearMessage();
return msg;
}
catch (MQException mqe)
{
Console.WriteLine("获取消息失败. 原因: " + mqe.ToString());
CloseMQConn(ref myMqObj);
}
}
return null;
} public bool Commit()
{
try
{
switch (_transactionType)
{
case WMQTransactionType.NORMAL_TRANSACTION:
myMqObj._qMg.Commit();
break;
}
}
catch { return false; }
return true;
} public bool Rollback()
{
try
{
switch (_transactionType)
{
case WMQTransactionType.NORMAL_TRANSACTION:
myMqObj._qMg.Backout();
break;
}
}
catch { return false; }
return true;
}
}
}
Oracle使用 ODP.NET 驱动, 测试代码使用显示事务方式, 代码如下:private void btnGetMessage_Click(object sender, EventArgs e)
{
DbProviderFactory dbFactory = DbProviderFactories.GetFactory("Oracle.DataAccess.Client");
DbConnection dbConn = dbFactory.CreateConnection();
dbConn.ConnectionString = sConnStr;
dbConn.Open(); using (CommittableTransaction transScope = new CommittableTransaction())
{
CommittableTransaction.Current = transScope;
dbConn.EnlistTransaction(transScope);
WMQClient_WithXA.MyMessage ss = mqGet.GetMessage(); // 从消息队列1中取出消息
if (ss != null)
{
MessageBox.Show(ss.MsgBody);
DbCommand ocmd = dbFactory.CreateCommand();
ocmd.CommandText = string.Format("insert into HY (NAME) VALUES ('{0}')", Guid.NewGuid().ToString());
ocmd.Connection = dbConn; ocmd.ExecuteNonQuery(); mqPut.PutMessage(ss.MsgBody); // 将消息放入到队列2
transScope.Commit(); // 或者回滚事务
}
CommittableTransaction.Current = null;
dbConn.Close();
}
}
完整DEMO, 免积分下载地址: http://download.csdn.net/detail/hyblusea/4529167
在通讯项目中,有这样的一个应用场景,简单描述如下:
1. 程序A需定时从MQ中取出消息(XML)
2. 将XML还原为DataSet
3. 将DataSet持久化到数据库该场景在总线型的消息传输框架中较为常见, 在一切正常的情况下,程序工作正常,数据不会发生错误或丢失. 但程序A介于WMQ, 与数据库之间, 程序两端的网络因素, 或者任意一端服务停止,均有可能会导致消息丢失. 因此比较稳妥的做法是将以上步骤采用XA事务进行全局托管.实现一个MQGet类:using System;
using System.Collections.Generic;
using System.Text;using System.Collections;
using System.Transactions;
using IBM.WMQ;
namespace WMQClient_WithXA
{
public class MqGet
{
private String _host = "127.0.0.1";
private int _port;
private String _channelName = "SYSTEM.DEF.SVRCONN";
private String _queueManagerName = null;
private String _queueName = null;
private int _charSet;
private WMQTransactionType _transactionType;
private bool isTopic = false;
private String _transportMode = "managed";
private bool commit = true; private MQQueueManager queueManager;
private MQQueue queue; private MyMqObject myMqObj;
private Hashtable properties;
private MQMessage message;
private MQGetMessageOptions getMessageOptions; public MqGet(string sMqQmgrName, string sQueueName, string sChannelName, string sHost, int iPort, int iCharacterSet, WMQTransactionType TransactionType, string sTransportMode = "managed")
{
getMessageOptions = new MQGetMessageOptions();
_host = sHost;
_port = iPort;
_channelName = sChannelName;
_queueManagerName = sMqQmgrName;
_queueName = sQueueName;
_transportMode = sTransportMode;
_charSet = iCharacterSet;
_transactionType = TransactionType; getMessageOptions.Options += MQC.MQGMO_WAIT;
getMessageOptions.WaitInterval = 20000; // 20 seconds wait properties = new Hashtable();
properties.Add(MQC.HOST_NAME_PROPERTY, _host);
properties.Add(MQC.PORT_PROPERTY, _port);
properties.Add(MQC.CHANNEL_PROPERTY, _channelName); switch (TransactionType)
{
case WMQTransactionType.NORMAL_TRANSACTION:
getMessageOptions.Options += MQC.MQGMO_SYNCPOINT;
break;
case WMQTransactionType.XA_TRANSACTION:
if (_transportMode == "managed")
{
properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED); // for managed mode
}
else
{
properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_XACLIENT);
}
getMessageOptions.Options += MQC.MQGMO_SYNCPOINT;
break;
}
} private MQQueueManager getMqManager(MQQueueManager qmg)
{
if (qmg == null)
{
try
{
qmg = new MQQueueManager(_queueManagerName, properties);
return qmg;
}
catch(Exception err)
{
Console.WriteLine(err.Message);
return null;
}
}
else
return qmg;
}
private MQQueue getMqQ(MQQueue q)
{
if (q == null)
{
try
{
q = myMqObj._qMg.AccessQueue(_queueName, MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);
return q;
}
catch (Exception err)
{
Console.WriteLine(err.Message);
return null;
}
}
else
return q;
} private MyMqObject GetMqObj(ref MyMqObject mqObj)
{
if (mqObj._qMg == null)
{
mqObj._qMg = getMqManager(mqObj._qMg);
mqObj._q = null;
}
if ( mqObj._qMg !=null )
mqObj._q = getMqQ(mqObj._q);
if ( mqObj._q == null )
{
CloseMQConn(ref mqObj);
}
return mqObj;
}
public void CloseMQConn(ref MyMqObject mqObj)
{
if (mqObj._q != null)
{
try
{
mqObj._q.Close();
}
catch{}
mqObj._q = null;
} if (mqObj._qMg != null)
{
try
{
mqObj._qMg.Disconnect();
}
catch{}
mqObj._qMg = null;
}
} public MyMessage GetMessage()
{
myMqObj = GetMqObj(ref myMqObj);
if (myMqObj._qMg != null)
{
message = new MQMessage();
MyMessage msg = new MyMessage();
try
{
myMqObj._q.Get(message, getMessageOptions); byte[] buff = message.ReadBytes(message.MessageLength);
msg.MsgBody = System.Text.Encoding.GetEncoding(CodePageTrans.getMsCodePage(_charSet)).GetString(buff);
msg.PutTime = message.PutDateTime.AddHours(8);
message.ClearMessage();
return msg;
}
catch (MQException mqe)
{
Console.WriteLine("获取消息失败. 原因: " + mqe.ToString());
CloseMQConn(ref myMqObj);
}
}
return null;
} public bool Commit()
{
try
{
switch (_transactionType)
{
case WMQTransactionType.NORMAL_TRANSACTION:
myMqObj._qMg.Commit();
break;
}
}
catch { return false; }
return true;
} public bool Rollback()
{
try
{
switch (_transactionType)
{
case WMQTransactionType.NORMAL_TRANSACTION:
myMqObj._qMg.Backout();
break;
}
}
catch { return false; }
return true;
}
}
}
Oracle使用 ODP.NET 驱动, 测试代码使用显示事务方式, 代码如下:private void btnGetMessage_Click(object sender, EventArgs e)
{
DbProviderFactory dbFactory = DbProviderFactories.GetFactory("Oracle.DataAccess.Client");
DbConnection dbConn = dbFactory.CreateConnection();
dbConn.ConnectionString = sConnStr;
dbConn.Open(); using (CommittableTransaction transScope = new CommittableTransaction())
{
CommittableTransaction.Current = transScope;
dbConn.EnlistTransaction(transScope);
WMQClient_WithXA.MyMessage ss = mqGet.GetMessage(); // 从消息队列1中取出消息
if (ss != null)
{
MessageBox.Show(ss.MsgBody);
DbCommand ocmd = dbFactory.CreateCommand();
ocmd.CommandText = string.Format("insert into HY (NAME) VALUES ('{0}')", Guid.NewGuid().ToString());
ocmd.Connection = dbConn; ocmd.ExecuteNonQuery(); mqPut.PutMessage(ss.MsgBody); // 将消息放入到队列2
transScope.Commit(); // 或者回滚事务
}
CommittableTransaction.Current = null;
dbConn.Close();
}
}
完整DEMO, 免积分下载地址: http://download.csdn.net/detail/hyblusea/4529167
解决方案 »
- wpf 弹出登录消息框,该怎么做?
- 模拟form提交图片和视频有问题,帮忙看下
- 急:返回类的问题
- 文件操作问题:Normal file does not contain a definition of 'File Delete Operator
- C#来从从图像中找到一特定点
- 在线求问一个关闭窗口问题!!!
- 公司内部网络打开慢的原因!
- 双机通信规定前 10 个字节是命令 ,如果保证有10个固定的字节。
- Developer XtraTreeList第三方控件问题
- c# 将gridview 的数据下载到excel 2007
- 两个list,循环一个,另一个Remove报错,想不明白 求解
- JS如何清除浏览器当前页面的缓存
1. 使用隐式事务,代码如下: 发送1000条消息耗时大约30秒Stopwatch stw = new Stopwatch();
stw.Start();
for (int i = 0; i < 1000; i++)
{
using (TransactionScope transScope = new TransactionScope())
{
mqPut.PutMessage(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fffff") + ": AAAAAAABBBBBBBCCCCCCCCCDDDDDDDDDDDEEEEEEEFFFFFFFFFFGGGGGGGGGHHHHHHHHHHHHIIIIIIIIIIIIIIIIIIIIIIIJJJJJJJJJJJJJJJJJJJJJJKKKKKKKKKKKKKKKK,这是一个测试.");
transScope.Complete();
}}
MessageBox.Show("共耗时:" + stw.Elapsed.Seconds.ToString() + "秒");
2.采用显示事务耗时大概27秒
3.独立事务方式耗时1秒
4.不开启事务耗时<1秒.因此可以看出XA事务瓶颈比较大, 隐式的XA性能比显式略有提高,但ODP.NET似乎不支持隐式.
看来我OUT了, 弱弱的问下什么是查水表啊?
文件,走流傳輸?
在传输的过程中,DataSet之类的对象不能直接放进Mq, 必须序列化...所以我们把DataSet序列化为XML字符串