Spring提供了一个JMS远程调用方法的实现,但是访问量大了总会出现慢速消费者的错误。自己实现了一下,客户端只有一个消息者来接收消息。各位看看改进一下。这是服务端。
/**
* 将一个对象开放成可以接收指定消息并执行消息中指明的方法的工厂。
* 消息类型是TextMessage,其消息体是一个字符串表示需要调用的方法的名称。
* 接收时分别会查找附加属性JmsConfig.PARAMETER_LIST_VALUE和JmsConfig.PARAMETER_LIST_CLASS
* 分别代表方法参数和参数类型,其分别是Object[]和Class[]类型。
*
* 方法调用的结果,以一个ObjectMessage类型的消息。其包含一个Map实例,其中以
* JmsConfig.RETURN_VALUE为KEY的是方法的返回值,JmsConfig.RETURN_EXCEPTION为方法执行
* 时出现的异常。客户端应该优先检查JmsConfig.RETURN_EXCEPTION是否为null,如果为null再
* 检视JmsConfig.RETURN_VALUE是否有值。
*
* 而且为了分离其他的消息,默认使用了一个消息选择器。选择器的key为JmsConfig.MESSAGE_SELECT_KEY
* 值为JmsConfig.MESSAGE_SELECT_VALUE。
* 同时还有一个附加属性是必须配置时设定,就是作为服务开放的对象起一个服务名称。这是一个客户
* 端和服务端约定的名称,服务端接收消息时只会收取JmsConfig.SERVICE_NAME等于指定值的消息。
*
* 定义了两个队列,一个用于接收方法另一个用于返回方法的调用。
* 每次接收到方法调用的消息,都会构造一个新的线程来处理。此线程处理方法的调用,并往返回队列中
* 发送包装有返回值的消息。
*
* 例如:调用这样一个方法 List test(int arg0,Set arg1),那么接收到的消息应该是这样。
*
* Map callInfo = new HashMap();
* //方法参数信息
* callInfo.put(JmsConfig.PARAMETER_LIST_CLASS,new Class[]{Integer.Type,Set.class});
* callInfo.put(JmsConfig.PARAMETER_LIST_VALUE,new Object[]{new Integer(1),new HashSet()});
* //方法名称
* callInfo.put(JmsConfig.METHOD_NAME,"test");
* ObjectMessage message = session.createObjectMessage((Serializable) callInfo);
* 最终将message发送出去。
* JmsConfig.PARAMETER_LIST_CLASS和JmsConfig.PARAMETER_LIST_VALUE为两个数组,两个数组
* 长度必须一致。
*
* 那么如果方法有返回值的话,将会是如下的消息。
*
* ObjectMessage message = ...//这是接收到方法调用请求消息
* Map returnCollection = ...//调用结果的容器。
* ObjectMessage replyMessage = session.createObjectMessage((Serializable) returnCollection);
* replyMessage.setJMSCorrelationID(message.getJMSCorrelationID());//关联原消息
* 将此replyMessage消息发送到返回队列中。
*
* 调用结果的容器,其实是一个大小为2的HasMap实例。其中JmsConfig.RETURN_VALUE为KEY的值是
* 调用结果值,JmsConfig.RETURN_EXCEPTION为调用产生的异常。JMSException异常不会写入此容器。
*
* 发送消息的模式,1不持久,2进行持久。
*
* @version 1.00 2009/06/03
* @since 1.5
* @author Mike
* @see com.etnetchina.remoting.jms.JmsConfig
* @see com.etnetchina.remoting.jms.MethodCallListener
* @see com.etnetchina.id.IdGenerate
*/
public class JmsServiceExporter { private Object target;//目标对象。
private ConnectionFactory connectionFactory;
private Connection connection;
private Destination sendQueue;//接收消息的队列。
private Destination receiveQueue;//返回消息的队列。
private long timeLive = 0;
private Session receiveSession;//接收消息的Session
private String serviceName;//服务名称
//发送模式,默认不持久
private int deliveryMode = DeliveryMode.NON_PERSISTENT; //执行的线程池
private final ExecutorService threadPool =
Executors.newCachedThreadPool();
private static final Log log = LogFactory.getLog(JmsServiceExporter.class); /**
* 初始化方法,会检查必要的资源是否已经准备完备。
* @throws java.lang.Exception 初始化异常。
*/
public void init() throws Exception {
Exception ex;
if (this.getConnectionFactory() == null) {
ex = new JMSException("JMS ConnectionFactory object is empty.");
log.error(ex);
throw ex;
} if (this.getSendQueue() == null) {
ex = new JMSException(
"Address of the queue to receive messages is empty.");
log.error(ex);
throw ex;
} if (this.getReceiveQueue() == null) {
ex = new JMSException(
"Address of the queue to return messages is empty.");
log.error(ex);
throw ex;
} if (target == null) {
ex = new NullPointerException("Target object for the null.");
log.error(ex);
throw ex;
} if (threadPool == null || threadPool.isShutdown()) {
ex = new Exception("Thread pool is empty or has been closed.");
log.error(ex);
throw ex;
} if (this.getServiceName() == null || this.getServiceName().isEmpty()) {
ex = new Exception("Service Name is not legitimate. [" + this.
getServiceName() + "]");
log.error(ex);
throw ex;
} this.afterPropertiesSet();
} /**
* 销毁动作,结束所有服务的监听。
* @throws java.lang.Exception 结束服务时出现异常。
*/
public void destroy() throws Exception {
if (receiveSession != null) {
receiveSession.close();
} if (getConnection() != null) {
getConnection().close();
} if (getThreadPool() != null) {
getThreadPool().shutdown();
}
} /**
* 获取当前持有的目标对象。
* @return 目标对象。
*/
public Object getTarget() {
return target;
} /**
* 设置目标对象,最终使用反射在此对象上调用方法。
* @param target 源目标对象。
*/
public void setTarget(Object target) {
this.target = target;
} /**
* 返回JMS的连接对象。
* @return 连接对象。
*/
public Connection getConnection() {
return connection;
} /**
* 发送消息的队列。
* @return 发送消息队列。
*/
public Destination getSendQueue() {
return sendQueue;
} /**
* 设置发送消息的队列。
* @param sendQueue 发送消息的队列。
*/
public void setSendQueue(Destination sendQueue) {
this.sendQueue = sendQueue;
} /**
* 返回接收消息对列地址。
* @return 调用接收消息对列地址。
*/
public Destination getReceiveQueue() {
return receiveQueue;
} /**
* 设置接收消息对列地址。
* @param receiveQueue 调用接收消息对列地址。
*/
public void setReceiveQueue(Destination receiveQueue) {
this.receiveQueue = receiveQueue;
} /**
* 获取当前的回复消息生命周期,单位为毫秒。
* @return 回复消息的生命周期。
*/
public long getTimeLive() {
return timeLive;
} /**
* 设置回复消息生命周期,单位为毫秒。
* @param timeLive 回复消息生命周期,单位为毫秒。
*/
public void setTimeLive(long timeLive) {
this.timeLive = timeLive;
} /**
* 日志记录对象。
* @return 日志记录。
*/
public static Log getLog() {
return log;
} /**
* 获取当前使用的线程池。
* @return 线程池
*/
public ExecutorService getThreadPool() {
return threadPool;
} /**
* 获取开放的服务名称。
* @return 服务名称。
*/
public String getServiceName() {
return serviceName;
} /**
* 设置开放的服务名称。
* @param serviceName 服务名称。
*/
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
} /**
* 获取JMS连接工厂
* @return JMS连接工厂
*/
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
} /**
* 设置JMS连接工石
* @param connectionFactory JMS连接工厂
*/
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
} /**
* @return the deliveryMode
*/
public int getDeliveryMode() {
return deliveryMode;
} /**
* @param deliveryMode the deliveryMode to set
* @throws IllegalArgumentException 设定的传递模式错误。
*/
public void setDeliveryMode(int deliveryMode) throws
IllegalArgumentException {
//如果设置的是无效的消息持久特性
if (deliveryMode != DeliveryMode.NON_PERSISTENT ||
deliveryMode != DeliveryMode.PERSISTENT) {
StringBuilder buff = new StringBuilder();
buff.append("Transmission mode error, can only be between ");
buff.append(DeliveryMode.NON_PERSISTENT);
buff.append(" and ");
buff.append(DeliveryMode.PERSISTENT);
buff.append(" of the plastic figures.");
buff.append("[");
buff.append(deliveryMode);
buff.append("]");
throw new IllegalArgumentException(buff.toString());
}
this.deliveryMode = deliveryMode;
} /**
* 启动消息监听。
* @throws JMSException 启动失败。
*/
protected void afterPropertiesSet() throws JMSException {
connection = this.getConnectionFactory().createConnection();
receiveSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE); StringBuilder serviceSelect = new StringBuilder();
serviceSelect.append("(");
serviceSelect.append(JmsConfig.SERVICE_NAME);
serviceSelect.append("='");
serviceSelect.append(this.getServiceName());
serviceSelect.append("'");
serviceSelect.append(" and ");
serviceSelect.append(JmsConfig.MESSAGE_SELECT_KEY);
serviceSelect.append("='");
serviceSelect.append(JmsConfig.MESSAGE_SELECT_VALUE);
serviceSelect.append("')"); connection.start(); receiveSession.createConsumer(receiveQueue,
serviceSelect.toString()).setMessageListener(
new MethodCallListener(this)); log.info("Start the remote service: " + this.getServiceName());
}
}
/**
* 将一个对象开放成可以接收指定消息并执行消息中指明的方法的工厂。
* 消息类型是TextMessage,其消息体是一个字符串表示需要调用的方法的名称。
* 接收时分别会查找附加属性JmsConfig.PARAMETER_LIST_VALUE和JmsConfig.PARAMETER_LIST_CLASS
* 分别代表方法参数和参数类型,其分别是Object[]和Class[]类型。
*
* 方法调用的结果,以一个ObjectMessage类型的消息。其包含一个Map实例,其中以
* JmsConfig.RETURN_VALUE为KEY的是方法的返回值,JmsConfig.RETURN_EXCEPTION为方法执行
* 时出现的异常。客户端应该优先检查JmsConfig.RETURN_EXCEPTION是否为null,如果为null再
* 检视JmsConfig.RETURN_VALUE是否有值。
*
* 而且为了分离其他的消息,默认使用了一个消息选择器。选择器的key为JmsConfig.MESSAGE_SELECT_KEY
* 值为JmsConfig.MESSAGE_SELECT_VALUE。
* 同时还有一个附加属性是必须配置时设定,就是作为服务开放的对象起一个服务名称。这是一个客户
* 端和服务端约定的名称,服务端接收消息时只会收取JmsConfig.SERVICE_NAME等于指定值的消息。
*
* 定义了两个队列,一个用于接收方法另一个用于返回方法的调用。
* 每次接收到方法调用的消息,都会构造一个新的线程来处理。此线程处理方法的调用,并往返回队列中
* 发送包装有返回值的消息。
*
* 例如:调用这样一个方法 List test(int arg0,Set arg1),那么接收到的消息应该是这样。
*
* Map callInfo = new HashMap();
* //方法参数信息
* callInfo.put(JmsConfig.PARAMETER_LIST_CLASS,new Class[]{Integer.Type,Set.class});
* callInfo.put(JmsConfig.PARAMETER_LIST_VALUE,new Object[]{new Integer(1),new HashSet()});
* //方法名称
* callInfo.put(JmsConfig.METHOD_NAME,"test");
* ObjectMessage message = session.createObjectMessage((Serializable) callInfo);
* 最终将message发送出去。
* JmsConfig.PARAMETER_LIST_CLASS和JmsConfig.PARAMETER_LIST_VALUE为两个数组,两个数组
* 长度必须一致。
*
* 那么如果方法有返回值的话,将会是如下的消息。
*
* ObjectMessage message = ...//这是接收到方法调用请求消息
* Map returnCollection = ...//调用结果的容器。
* ObjectMessage replyMessage = session.createObjectMessage((Serializable) returnCollection);
* replyMessage.setJMSCorrelationID(message.getJMSCorrelationID());//关联原消息
* 将此replyMessage消息发送到返回队列中。
*
* 调用结果的容器,其实是一个大小为2的HasMap实例。其中JmsConfig.RETURN_VALUE为KEY的值是
* 调用结果值,JmsConfig.RETURN_EXCEPTION为调用产生的异常。JMSException异常不会写入此容器。
*
* 发送消息的模式,1不持久,2进行持久。
*
* @version 1.00 2009/06/03
* @since 1.5
* @author Mike
* @see com.etnetchina.remoting.jms.JmsConfig
* @see com.etnetchina.remoting.jms.MethodCallListener
* @see com.etnetchina.id.IdGenerate
*/
public class JmsServiceExporter { private Object target;//目标对象。
private ConnectionFactory connectionFactory;
private Connection connection;
private Destination sendQueue;//接收消息的队列。
private Destination receiveQueue;//返回消息的队列。
private long timeLive = 0;
private Session receiveSession;//接收消息的Session
private String serviceName;//服务名称
//发送模式,默认不持久
private int deliveryMode = DeliveryMode.NON_PERSISTENT; //执行的线程池
private final ExecutorService threadPool =
Executors.newCachedThreadPool();
private static final Log log = LogFactory.getLog(JmsServiceExporter.class); /**
* 初始化方法,会检查必要的资源是否已经准备完备。
* @throws java.lang.Exception 初始化异常。
*/
public void init() throws Exception {
Exception ex;
if (this.getConnectionFactory() == null) {
ex = new JMSException("JMS ConnectionFactory object is empty.");
log.error(ex);
throw ex;
} if (this.getSendQueue() == null) {
ex = new JMSException(
"Address of the queue to receive messages is empty.");
log.error(ex);
throw ex;
} if (this.getReceiveQueue() == null) {
ex = new JMSException(
"Address of the queue to return messages is empty.");
log.error(ex);
throw ex;
} if (target == null) {
ex = new NullPointerException("Target object for the null.");
log.error(ex);
throw ex;
} if (threadPool == null || threadPool.isShutdown()) {
ex = new Exception("Thread pool is empty or has been closed.");
log.error(ex);
throw ex;
} if (this.getServiceName() == null || this.getServiceName().isEmpty()) {
ex = new Exception("Service Name is not legitimate. [" + this.
getServiceName() + "]");
log.error(ex);
throw ex;
} this.afterPropertiesSet();
} /**
* 销毁动作,结束所有服务的监听。
* @throws java.lang.Exception 结束服务时出现异常。
*/
public void destroy() throws Exception {
if (receiveSession != null) {
receiveSession.close();
} if (getConnection() != null) {
getConnection().close();
} if (getThreadPool() != null) {
getThreadPool().shutdown();
}
} /**
* 获取当前持有的目标对象。
* @return 目标对象。
*/
public Object getTarget() {
return target;
} /**
* 设置目标对象,最终使用反射在此对象上调用方法。
* @param target 源目标对象。
*/
public void setTarget(Object target) {
this.target = target;
} /**
* 返回JMS的连接对象。
* @return 连接对象。
*/
public Connection getConnection() {
return connection;
} /**
* 发送消息的队列。
* @return 发送消息队列。
*/
public Destination getSendQueue() {
return sendQueue;
} /**
* 设置发送消息的队列。
* @param sendQueue 发送消息的队列。
*/
public void setSendQueue(Destination sendQueue) {
this.sendQueue = sendQueue;
} /**
* 返回接收消息对列地址。
* @return 调用接收消息对列地址。
*/
public Destination getReceiveQueue() {
return receiveQueue;
} /**
* 设置接收消息对列地址。
* @param receiveQueue 调用接收消息对列地址。
*/
public void setReceiveQueue(Destination receiveQueue) {
this.receiveQueue = receiveQueue;
} /**
* 获取当前的回复消息生命周期,单位为毫秒。
* @return 回复消息的生命周期。
*/
public long getTimeLive() {
return timeLive;
} /**
* 设置回复消息生命周期,单位为毫秒。
* @param timeLive 回复消息生命周期,单位为毫秒。
*/
public void setTimeLive(long timeLive) {
this.timeLive = timeLive;
} /**
* 日志记录对象。
* @return 日志记录。
*/
public static Log getLog() {
return log;
} /**
* 获取当前使用的线程池。
* @return 线程池
*/
public ExecutorService getThreadPool() {
return threadPool;
} /**
* 获取开放的服务名称。
* @return 服务名称。
*/
public String getServiceName() {
return serviceName;
} /**
* 设置开放的服务名称。
* @param serviceName 服务名称。
*/
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
} /**
* 获取JMS连接工厂
* @return JMS连接工厂
*/
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
} /**
* 设置JMS连接工石
* @param connectionFactory JMS连接工厂
*/
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
} /**
* @return the deliveryMode
*/
public int getDeliveryMode() {
return deliveryMode;
} /**
* @param deliveryMode the deliveryMode to set
* @throws IllegalArgumentException 设定的传递模式错误。
*/
public void setDeliveryMode(int deliveryMode) throws
IllegalArgumentException {
//如果设置的是无效的消息持久特性
if (deliveryMode != DeliveryMode.NON_PERSISTENT ||
deliveryMode != DeliveryMode.PERSISTENT) {
StringBuilder buff = new StringBuilder();
buff.append("Transmission mode error, can only be between ");
buff.append(DeliveryMode.NON_PERSISTENT);
buff.append(" and ");
buff.append(DeliveryMode.PERSISTENT);
buff.append(" of the plastic figures.");
buff.append("[");
buff.append(deliveryMode);
buff.append("]");
throw new IllegalArgumentException(buff.toString());
}
this.deliveryMode = deliveryMode;
} /**
* 启动消息监听。
* @throws JMSException 启动失败。
*/
protected void afterPropertiesSet() throws JMSException {
connection = this.getConnectionFactory().createConnection();
receiveSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE); StringBuilder serviceSelect = new StringBuilder();
serviceSelect.append("(");
serviceSelect.append(JmsConfig.SERVICE_NAME);
serviceSelect.append("='");
serviceSelect.append(this.getServiceName());
serviceSelect.append("'");
serviceSelect.append(" and ");
serviceSelect.append(JmsConfig.MESSAGE_SELECT_KEY);
serviceSelect.append("='");
serviceSelect.append(JmsConfig.MESSAGE_SELECT_VALUE);
serviceSelect.append("')"); connection.start(); receiveSession.createConsumer(receiveQueue,
serviceSelect.toString()).setMessageListener(
new MethodCallListener(this)); log.info("Start the remote service: " + this.getServiceName());
}
}
*
* 消息监听器。只会处理类型为TextMessage的消息,构造一个任务交给线程池来执行。
* @version 1.00
* @since 1.5
* @author Mike
*/
public class MethodCallListener implements MessageListener { private JmsServiceExporter serviceExporter = null; /**
* 构造方法。
* @param serviceExporter 服务发布工厂。
*/
public MethodCallListener(JmsServiceExporter serviceExporter) {
this.serviceExporter = serviceExporter;
} /**
* 收到消息时执行的方法,如果消息兼容TextMessage那么将构造一个线程去完成调用和回复的任务。
* @param message 调用请求消息。
*/
public void onMessage(Message message) {
if (JmsServiceExporter.getLog().isDebugEnabled()) {
JmsServiceExporter.getLog().debug(
CheckUtil.replaceArgs("Received messages {0}. ",
message.toString()));
}
try {
if (!message.getJMSRedelivered()) {
if (ObjectMessage.class.isInstance(message)) {
if (JmsServiceExporter.getLog().isDebugEnabled()) {
JmsServiceExporter.getLog().debug(
"Message types identified as ObjectMessage, for processing.");
}
String clientId = message.getStringProperty(
JmsConfig.SELF_IDENTITY);
//只有客户端标识属性非空才认为是有效属性。
if (clientId != null && !clientId.isEmpty()) {
MethodCallHandle task = new MethodCallHandle(
serviceExporter, clientId);
task.setMessage((ObjectMessage) message);
serviceExporter.getThreadPool().submit(task);
} else {
if (JmsServiceExporter.getLog().isWarnEnabled()) {
JmsServiceExporter.getLog().warn(
CheckUtil.replaceArgs(
"ID is (0) of the message, the client ID {1} is invalid.",
message.getJMSCorrelationID(), clientId));
}
}
}
}
} catch (JMSException ex) {
JmsServiceExporter.getLog().error(ex);
}
}
}
* 实际执行单元,根据指定的Message消息来执行目标上的合式方法,并把结果包装成
* ObjectMessage发往回复队列地址。
*
* @version 1.00
* @since 1.5
* @author Mike
* @see com.etnetchina.remoting.jms.JmsConfig
* @see com.etnetchina.remoting.jms.JmsServiceExporter
* @see com.etnetchina.remoting.jms.MethodCallListener
*/
public class MethodCallHandle implements Runnable { private JmsServiceExporter serviceExporter = null;
private Log log = null;
private ObjectMessage message;
private static Object[] emptyParam = new Object[0];
private String clientId = null; /**
* 构造方法,必须提供JMS连接对象,回复消息的队列地址和目标对象。
* @param serviceExporter
* @param clientId 客户端标识。
*/
public MethodCallHandle(JmsServiceExporter serviceExporter, String clientId) {
this.serviceExporter = serviceExporter;
log = JmsServiceExporter.getLog();
this.clientId = clientId;
} /**
* 获取当前方法调用请求的消息。
* @return 方法调用请求的消息。
*/
public ObjectMessage getMessage() {
return message;
} /**
* 设置方法调用请求的消息。
* @param message 方法调用请求的消息。
*/
public void setMessage(ObjectMessage message) {
this.message = message;
} /**
* 线程执行方法,会根据来源消息调用目标对象的方法并将调用结果包装成消息后发送到答复队列中。
*/
public void run() {
Map<String, Serializable> returnCollection = executeMethod();
try {
sendReplyMessage(returnCollection);
} catch (JMSException ex) {
handleException(returnCollection, ex);
}
} /**
* 执行方法调用,并填充结果容器。
* @return 结果容器,已经包含结果或者异常。
*/
private Map<String, Serializable> executeMethod() {
Map<String, Serializable> returnCollection = new HashMap<String, Serializable>(2);
try { Object value = invoke();
returnCollection.put(JmsConfig.RETURN_VALUE, (Serializable)value); } catch (Exception ex) {
handleException(returnCollection, ex);
} return returnCollection;
} /**
* 异常处理。
* @param returnCollection 结果容器。
* @param ex 异常。
*/
private void handleException(Map<String, Serializable> returnCollection,
Exception ex) {
if (!JMSException.class.isInstance(ex)) {
if (InvocationTargetException.class.isInstance(ex)) {
returnCollection.put(JmsConfig.RETURN_EXCEPTION,
new Exception(((InvocationTargetException) ex).
getTargetException().getMessage()));
} else {
returnCollection.put(JmsConfig.RETURN_EXCEPTION,
new Exception(ex.getMessage()));
}
} log.error(ex);
} /**
* 执行目标方法调用。
* @return 目标对象执行的返回对象。
* @throws javax.jms.JMSException
* @throws java.lang.NoSuchMethodException
* @throws java.lang.IllegalAccessException
* @throws java.lang.IllegalArgumentException
* @throws java.lang.reflect.InvocationTargetException
*/
@SuppressWarnings("unchecked")
private Object invoke()
throws JMSException,
NoSuchMethodException,
IllegalAccessException,
IllegalArgumentException,
InvocationTargetException { Map<String, Object> callInfo = (Map<String, Object>) message.getObject();
String methodName = (String) callInfo.get(JmsConfig.METHOD_NAME);
Object[] args = (Object[]) callInfo.get(JmsConfig.PARAMETER_LIST_VALUE);
Class[] paramsClass = (Class[]) callInfo.get(
JmsConfig.PARAMETER_LIST_CLASS);
Object result = null;
//如果参数类型和参数值都非空,那么必须两个数组的长度一致才可执行。
if (paramsClass != null && args != null) { if (args.length == paramsClass.length) {
result = BeanUtil.invokeMethod(serviceExporter.getTarget(),
methodName,
args,
paramsClass);
} else {
StringBuilder buff = new StringBuilder();
buff.append("Parameters and parameter values do not match. ");
buff.append("Parameters [");
buff.append(Arrays.toString(paramsClass));
buff.append("] , ");
buff.append("parameter values [");
buff.append(Arrays.toString(args));
buff.append("]");
throw new NoSuchMethodException(buff.toString());
} } else {
result = BeanUtil.invokeMethod(serviceExporter.getTarget(),
methodName, emptyParam);
} if (log.isDebugEnabled()) {
StringBuilder buff = new StringBuilder();
buff.append(methodName);
buff.append(" the results of method calls ");
buff.append(result);
buff.append(".");
log.debug(buff.toString());
buff = null;
} return result;
} /**
* 将对象包装成ObjectMessage进行发布。
* @param returnCollection 结果容器。
* @throws javax.jms.JMSException
*/
private void sendReplyMessage(Map<String, Serializable> returnCollection) throws
JMSException {
Session session = null;
MessageProducer producer = null;
try { session = serviceExporter.getConnection().createSession(false,
Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(serviceExporter.getSendQueue()); ObjectMessage replyMessage = session.createObjectMessage(
(Serializable) returnCollection);
replyMessage.setJMSCorrelationID(message.getJMSCorrelationID());
replyMessage.setStringProperty(JmsConfig.SELF_IDENTITY,
this.clientId);
replyMessage.setStringProperty(JmsConfig.SERVICE_NAME,
serviceExporter.getServiceName()); if (log.isDebugEnabled()) {
log.debug(
CheckUtil.replaceArgs(
"Construction JMS remote method invocation response messages. [{0}]",
replyMessage.toString()));
} producer.setDeliveryMode(serviceExporter.getDeliveryMode());
if (serviceExporter.getTimeLive() > 0) {
producer.setTimeToLive(serviceExporter.getTimeLive());
}
producer.send(replyMessage); } finally {
if (producer != null) {
producer.close();
} if (session != null) {
session.close();
}
}
}
}
private String serviceInterface;
private Connection connection;
private ConnectionFactory connectionFactory;
private Queue sendQueue;
private Queue receiveQueue;
private long sendTimeLive = 0;
private long receiveTimeOut = 60000L;
private String serviceName;
private Class interfaceClass;
private boolean ready = false;
private int clearMessageDelay = 1;
private final String selfIdentity = IdGenerate.getUUIDString();
private Session receiveSession = null;
private int deliveryMode = DeliveryMode.NON_PERSISTENT;
private static final Log log = LogFactory.getLog(JmsClientExporter.class);
private static final Map<String, Map<String, Serializable>> returnPool =
new ConcurrentHashMap<String, Map<String, Serializable>>();
private static final java.util.Queue<String> failureQueue =
new ConcurrentLinkedQueue<String>();
private final ScheduledExecutorService clearMessagePool =
Executors.newScheduledThreadPool(1); public void init() throws Exception {
//省略一些验证初始化参数工作 connection = getConnectionFactory().createConnection();
receiveSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE); StringBuilder buff = new StringBuilder();
buff.append("(");
buff.append(JmsConfig.SERVICE_NAME);
buff.append("='");
buff.append(this.getServiceName());
buff.append("' and ");
buff.append("");
buff.append(JmsConfig.SELF_IDENTITY);
buff.append("='");
buff.append(this.selfIdentity);
buff.append("')");
receiveSession.createConsumer(receiveQueue,
buff.toString()).setMessageListener(new MessageReceiveListener());
connection.start(); clearMessagePool.scheduleWithFixedDelay(
new ClearExpiredMessageTask(),
getClearMessageDelay(),
getClearMessageDelay(),
TimeUnit.SECONDS);
this.ready = true;
} /**
* 销毁方法。
* @throws javax.jms.JMSException
*/
public void destroy() throws JMSException {
if (receiveSession != null) {
receiveSession.close();
} if (connection != null) {
connection.close();
} clearMessagePool.shutdown();
} /**
* 放弃接收消息。
* @param JMSCorrelationID 消息的连接id.
*/
public void abandoned(String JMSCorrelationID) {
boolean action = true;
try {
action = failureQueue.offer(JMSCorrelationID);
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
} if (!action) {
log.warn(
CheckUtil.replaceArgs(
"Message to abandon failed, {0} Message Queuing does not allow failure to add a new failure message.",
JMSCorrelationID));
}
} /**
* 获取当前清理消息的执行间隔,单位秒。
* @return 执行间隔。
*/
public int getClearMessageDelay() {
return clearMessageDelay;
} /**
* 设置当前清理失效消息的执行间隔,单位秒。
* @param clearMessageDelay 执行间隔。
*/
public void setClearMessageDelay(int clearMessageDelay) {
this.clearMessageDelay = clearMessageDelay;
} /**
* 返回当前客户端标识符。
* @return 客户端标识。
*/
public String getSelfIdentity() {
return selfIdentity;
} /**
* 返回指定ID的消息,如果没有将返回null。
* @param JMSCorrelationID 消息连接ID.
* @return 消息,如果不存在将返回null。
*/
public Map<String, Serializable> findMessage(String JMSCorrelationID) {
if (returnPool.containsKey(JMSCorrelationID)) {
return returnPool.remove(JMSCorrelationID);
} else {
return null;
}
} /**
* @return the serviceInterface
*/
public String getServiceInterface() {
return serviceInterface;
} /**
* @param serviceInterface the serviceInterface to set
*/
public void setServiceInterface(String serviceInterface) {
this.serviceInterface = serviceInterface;
} /**
* @return the connection
*/
public Connection getConnection() {
return connection;
} /**
* @return the sendQueue
*/
public Queue getSendQueue() {
return sendQueue;
} /**
* @param sendQueue the sendQueue to set
*/
public void setSendQueue(Queue sendQueue) {
this.sendQueue = sendQueue;
} /**
* @return the receiveQueue
*/
public Queue getReceiveQueue() {
return receiveQueue;
} /**
* @param receiveQueue the receiveQueue to set
*/
public void setReceiveQueue(Queue receiveQueue) {
this.receiveQueue = receiveQueue;
} /**
* @return the sendTimeLive
*/
public long getSendTimeLive() {
return sendTimeLive;
} /**
* @param sendTimeLive the sendTimeLive to set
*/
public void setSendTimeLive(long sendTimeLive) {
this.sendTimeLive = Math.abs(sendTimeLive);
} /**
* @return the receiveTimeOut
*/
public long getReceiveTimeOut() {
return receiveTimeOut;
} /**
* 接收回复消息的超时时间,0为无限。不允许负数。
* @param receiveTimeOut the receiveTimeOut to set
*/
public void setReceiveTimeOut(long receiveTimeOut) {
this.receiveTimeOut = Math.abs(receiveTimeOut);
} /**
* 获取开放的服务名称。
* @return 服务名称。
*/
public String getServiceName() {
return serviceName;
} /**
* 设置开放的服务名称。
* @param serviceName 服务名称。
*/
public void setServiceName(String serviceName) {
this.serviceName = serviceName; } /**
* 获取连接工厂。
* @return 连接工厂。
*/
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
} /**
* 设定连接工厂
* @param connectionFactory 连接工厂。
*/
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
} /**
* 获取当前服务接口类型。
* @return 服务接口类型。
*/
public Class getInterfaceClass() {
return interfaceClass;
} /**
* @return the deliveryMode
*/
public int getDeliveryMode() {
return deliveryMode;
} /**
* @param deliveryMode the deliveryMode to set
* @throws IllegalArgumentException 设定的传递模式错误。
*/
public void setDeliveryMode(int deliveryMode) throws
IllegalArgumentException {
//如果设置的是无效的消息持久特性
if (deliveryMode != DeliveryMode.NON_PERSISTENT ||
deliveryMode != DeliveryMode.PERSISTENT) {
StringBuilder buff = new StringBuilder();
buff.append("Transmission mode error, can only be between ");
buff.append(DeliveryMode.NON_PERSISTENT);
buff.append(" and ");
buff.append(DeliveryMode.PERSISTENT);
buff.append(" of the plastic figures.");
buff.append("[");
buff.append(deliveryMode);
buff.append("]");
throw new IllegalArgumentException(buff.toString());
}
this.deliveryMode = deliveryMode;
} /**
* 获取当前日志记录器。
* @return 日志记录器。
*/
public static Log getLogger() {
return log;
} /**
* 返回对接口的代理实例。
* @return 代理实例。
* @throws Exception 没有正确初始化。
*/
public Object getObject() throws Exception {
if (this.ready) {
InvocationHandler handler = new MethodCallClinetInvocationHandler(
this);
return ProxyFactory.createInterfactProxy(this.getInterfaceClass(),
handler);
} else {
throw new Exception(
serviceName + " has not been properly initialized.");
}
} /**
* 接收服务端回复消息的监听器,只会将类型为ObjectMessage和有JMSCorrelationID属性
* 的消息放入消息池中。
*/
private class MessageReceiveListener implements MessageListener { public void onMessage(Message message) {
try {
String jmsId = message.getJMSCorrelationID(); if (log.isDebugEnabled()) {
log.debug(
CheckUtil.replaceArgs(
"Receive the remote method response message, the message id is (0).",
jmsId));
} if (jmsId != null && ObjectMessage.class.isInstance(message)) {
ObjectMessage objMessage = (ObjectMessage) message;
Map<String, Serializable> returnCollection =
(Map<String, Serializable>) objMessage.getObject();
returnPool.put(jmsId, returnCollection); }
} catch (JMSException ex) {
log.error(ex.getMessage(), ex);
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
}
} /**
* 清理过期失效的消息任务。
*/
private class ClearExpiredMessageTask implements Runnable { public void run() {
String jmsID = null;
try {
jmsID = failureQueue.poll();
if (jmsID != null) {
returnPool.remove(jmsID);
}
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
}
}
}
* 一个代理实例,用于拦截对于指定接口的方法调用。
* 将调用转换成JMS消息,并获取远程服务的返回值。
* @version 1.00 2009.06.08
* @since 1.5
* @author Mike
*/
public class MethodCallClinetInvocationHandler implements InvocationHandler { private JmsClientExporter clientExporter;
private Log log;
private static final long sleepTime = 30; /**
* 构造方法,需要客户端构造工厂。
* @param clientExporter 客户端构造工厂。
*/
public MethodCallClinetInvocationHandler(JmsClientExporter clientExporter) {
this.clientExporter = clientExporter;
log = JmsClientExporter.getLogger();
} /**
* 实际的拦截方法,每次调用目标对象方法都会被此方法拦截。转换成JMS通信,获取远程方法的
* 返回结果。
* @param proxy 代理类。
* @param method 调用的方法。
* @param args 调用方法时的参数。
* @return 调用结果。
* @throws java.lang.Throwable
*/
public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
if (log.isDebugEnabled()) {
log.debug(method.toString());
log.debug(Arrays.toString(args));
} Connection connection = clientExporter.getConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
try {
ObjectMessage requestMessage = buildMessage(session, method, args);
sendMessage(session, requestMessage); Map<String, Serializable> returnCollection = receiveMessage(
requestMessage.getJMSCorrelationID()); Exception returnException =
(Exception) returnCollection.get(JmsConfig.RETURN_EXCEPTION); Object returnValue = returnCollection.get(JmsConfig.RETURN_VALUE); if (returnException != null) {
throw returnException;
} else {
return returnValue;
} } catch (JMSException ex) {
log.error(ex);
throw ex;
} catch (Exception ex) {
log.error(ex);
throw ex;
} finally {
if (session != null) {
session.close();
}
}
} /**
* 根据指定的方法信息和参数信息,构造合式的Message。
* @param session 当前JMS的Session实例。
* @param method 方法信息。
* @param args 参数信息。
* @return 构造好的ObjectMessage.
* @throws javax.jms.JMSException
*/
private ObjectMessage buildMessage(Session session, Method method,
Object[] args)
throws JMSException, Exception {
String methodName = method.getName();
//调用参数 Map<String, Serializable> callInfo = new HashMap<String, Serializable>(3);
callInfo.put(JmsConfig.METHOD_NAME, methodName);
Class[] paramsClass = method.getParameterTypes();
if (paramsClass.length > 0) {
callInfo.put(JmsConfig.PARAMETER_LIST_VALUE, args);
callInfo.put(JmsConfig.PARAMETER_LIST_CLASS,
method.getParameterTypes());
} ObjectMessage message = session.createObjectMessage(
(Serializable) callInfo);
message.setStringProperty(JmsConfig.MESSAGE_SELECT_KEY,
JmsConfig.MESSAGE_SELECT_VALUE);
message.setJMSCorrelationID(IdGenerate.getUUIDString());
message.setStringProperty(JmsConfig.SERVICE_NAME,
clientExporter.getServiceName());
message.setStringProperty(JmsConfig.SELF_IDENTITY,
clientExporter.getSelfIdentity()); if (log.isDebugEnabled()) {
log.debug("Name of the method call " + methodName);
if (args != null) {
log.debug("Parameter type is " + Arrays.toString(args));
} else {
log.debug("Parameter is null.");
}
log.debug("Message is " + message);
} return message;
} /**
* 发送调用信息消息。
* @param session
* @param message
* @throws javax.jms.JMSException
*/
private void sendMessage(Session session, ObjectMessage message) throws
JMSException {
MessageProducer producer = null;
try {
producer = session.createProducer(clientExporter.getSendQueue());
producer.setDeliveryMode(clientExporter.getDeliveryMode()); if (clientExporter.getSendTimeLive() > 0) {
producer.setTimeToLive(clientExporter.getSendTimeLive());
} producer.send(message); if (log.isDebugEnabled()) {
log.debug("Send request message.");
}
} finally {
if (producer != null) {
producer.close();
}
}
} /**
* 阻塞的接收服务端的回复,至到接收到消息或者超时。
* @param messageID 消息的连接ID.
* @return 消息调用结果。
* @throws javax.jms.JMSException
* @throws java.lang.InterruptedException
*/
@SuppressWarnings("unchecked")
private Map<String, Serializable> receiveMessage(String messageID)
throws JMSException, InterruptedException {
Map<String, Serializable> returnCollection = null;
long count = 0;//已经过去毫秒数计时,此值大于timeOut即退出循环。
long timeOut = clientExporter.getReceiveTimeOut();
//开始阻塞方法,直到超时或者接收到消息。
RuntimeException timeOutException = null;
while (true) {
returnCollection = clientExporter.findMessage(messageID);
if (returnCollection == null) {
if (count > timeOut) {
timeOutException = new RuntimeException("Request timed out.");
break;
} else {
if (timeOut > 0) {
count += sleepTime;
}
synchronized (this) {
this.wait(sleepTime);
}
}
} else {
break;
}
}
if (timeOutException != null) {
clientExporter.abandoned(messageID);
throw timeOutException;
} return returnCollection;
}
}
log.debug(
CheckUtil.replaceArgs(
"Receive the remote method response message, the message id is (0).",
jmsId));
}建议使用 MessageFormat.format 方法,呵呵
CheckUtil是一些我自己用的工具类,呵方便自己。