服务端

public class JmsServiceExporter {
// Not referenced in the visible code; presumably the service object that
// MethodCallListener dispatches calls to (TODO confirm).
private Object target;
// Factory used by afterPropertiesSet() to create the JMS connection.
private ConnectionFactory connectionFactory;
// Connection created and started in afterPropertiesSet().
private Connection connection;
// Original note: "queue that receives messages". The visible code never reads
// this field; presumably replies are sent to it (TODO confirm against
// MethodCallListener).
private Destination sendQueue;
// Original note: "queue for returned messages". In the visible code this is the
// queue the request consumer is created on in afterPropertiesSet().
private Destination receiveQueue;
// Not used in the visible code; presumably a message time-to-live (TODO confirm).
private long timeLive = 0;
// Session on which the request consumer is created.
private Session receiveSession;
// Service name; becomes part of the message selector.
private String serviceName;
// Delivery mode, non-persistent by default.
private int deliveryMode = DeliveryMode.NON_PERSISTENT;
// Thread pool for executing work; the visible code only shuts it down in destroy().
private final ExecutorService threadPool =
Executors.newCachedThreadPool();
private static final Log log = LogFactory.getLog(JmsServiceExporter.class); /**
* 初始化方法,会检查必要的资源是否已经准备完备。
* @throws java.lang.Exception 初始化异常。
*/
public void init() throws Exception {
    // Bootstrap entry point: the resource checks were omitted from this
    // listing by the original author; what remains simply starts listening.
    // (The unused local "Exception ex" was removed.)
    this.afterPropertiesSet();
}

/**
* 销毁动作,结束所有服务的监听。
* @throws java.lang.Exception 结束服务时出现异常。
*/
public void destroy() throws Exception {
if (receiveSession != null) {
receiveSession.close();
}
if (getConnection() != null) {
getConnection().close();
}
if (getThreadPool() != null) {
getThreadPool().shutdown();
}
} //省略了属性的get/set方法 /**
* 启动消息监听。
* @throws JMSException 启动失败。
*/
protected void afterPropertiesSet() throws JMSException {
    // Open the connection and the session used for receiving requests.
    connection = this.getConnectionFactory().createConnection();
    receiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Selector: only messages whose service-name property equals this service
    // and whose MESSAGE_SELECT_KEY property equals MESSAGE_SELECT_VALUE.
    final String selector = "(" + JmsConfig.SERVICE_NAME + "='" + this.getServiceName() + "'"
            + " and " + JmsConfig.MESSAGE_SELECT_KEY + "='" + JmsConfig.MESSAGE_SELECT_VALUE + "')";

    connection.start();
    receiveSession.createConsumer(receiveQueue, selector)
            .setMessageListener(new MethodCallListener(this));

    log.info("Start the remote service: " + this.getServiceName());
}
}

客户端

public class JmsClientExporter {
// Not used in the visible code; presumably the service interface name (TODO confirm).
private String serviceInterface;
// Connection created and started in init(), closed in destroy().
private Connection connection;
// Factory used by init() to create the connection.
private ConnectionFactory connectionFactory;
// Queue that requests are published to.
private Queue sendQueue;
// Queue that replies are received from.
private Queue receiveQueue;
// Time-to-live of sent messages, in milliseconds.
private long sendTimeLive = 0;
// Timeout for waiting on a reply message, in milliseconds.
private long receiveTimeOut = 60000L;
// Service name; becomes part of the reply selector built in init().
private String serviceName;
// Interface read through getInterfaceClass() when getObject() builds the proxy
// (the getter itself is omitted from this listing).
private Class interfaceClass;
// Set to true at the end of init(); getObject() refuses to work before that.
private boolean ready = false;
// Presumably what getClearMessageDelay() returns; init() uses that value, in
// seconds, as both the initial delay and the period of the cleanup task.
private int clearMessageDelay = 1;
// Identity generated per exporter instance; used in the reply selector.
private final String selfIdentity = IdGenerate.getUUIDString();
// Session on which the reply consumer is created.
private Session receiveSession = null;
// Delivery mode, non-persistent by default.
private int deliveryMode = DeliveryMode.NON_PERSISTENT;
private static final Log log = LogFactory.getLog(JmsClientExporter.class);
// Received replies keyed by JMSCorrelationID. Static, so shared by all exporters.
private static final Map<String, Map<String, Serializable>> returnPool =
new ConcurrentHashMap<String, Map<String, Serializable>>();
// Correlation ids whose callers gave up waiting; consumed by ClearExpiredMessageTask.
private static final java.util.Queue<String> failureQueue =
new ConcurrentLinkedQueue<String>();
// Scheduler (one thread) that runs ClearExpiredMessageTask.
private final ScheduledExecutorService clearMessagePool =
Executors.newScheduledThreadPool(1);
// Property getters/setters omitted.

/**
* 初始化方法。
* @throws Exception
*/
public void init() throws Exception {
    // (Property validation was omitted from the original listing.)
    connection = getConnectionFactory().createConnection();
    receiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Selector: replies for this service that carry this exporter's identity.
    final String selector = "(" + JmsConfig.SERVICE_NAME + "='" + this.getServiceName()
            + "' and " + JmsConfig.SELF_IDENTITY + "='" + this.selfIdentity + "')";
    receiveSession.createConsumer(receiveQueue, selector)
            .setMessageListener(new MessageReceiveListener());
    connection.start();

    // Periodically discard replies nobody is waiting for any more.
    clearMessagePool.scheduleWithFixedDelay(
            new ClearExpiredMessageTask(),
            getClearMessageDelay(),
            getClearMessageDelay(),
            TimeUnit.SECONDS);

    this.ready = true;
}

/**
* 销毁方法。
* @throws javax.jms.JMSException
*/
public void destroy() throws JMSException {
    // Close the session and the connection, then stop the cleanup scheduler.
    // The finally blocks guarantee that a JMSException from one close() does
    // not leave the connection open or the scheduler thread running; the
    // first exception still propagates to the caller.
    try {
        if (receiveSession != null) {
            receiveSession.close();
        }
    } finally {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            clearMessagePool.shutdown();
        }
    }
}

/**
* 放弃接收消息。
* @param JMSCorrelationID 消息的连接id.
*/
public void abandoned(String JMSCorrelationID) {
    // Record the id so the cleanup task can drop a reply that arrives late.
    try {
        if (!failureQueue.offer(JMSCorrelationID)) {
            // Placeholder kept from the original: no handling is implemented
            // for an id that could not be queued.
        }
    } catch (Exception ex) {
        log.error(ex.getMessage(), ex);
    }
}

/**
* 返回指定ID的消息,如果没有将返回null。
* @param JMSCorrelationID 消息连接ID.
* @return 消息,如果不存在将返回null。
*/
public Map<String, Serializable> findMessage(String JMSCorrelationID) {
    // remove() hands back the stored reply and takes it out of the pool in one
    // step; it yields null when no reply is stored under this id.
    return returnPool.remove(JMSCorrelationID);
}

/**
* 返回对接口的代理实例。
* @return 代理实例。
* @throws Exception 没有正确初始化。
*/
public Object getObject() throws Exception {
    // Refuse to hand out a proxy before init() has completed.
    if (!this.ready) {
        throw new Exception(
                serviceName + " has not been properly initialized.");
    }
    InvocationHandler callHandler = new MethodCallClinetInvocationHandler(this);
    return ProxyFactory.createInterfactProxy(this.getInterfaceClass(), callHandler);
}

/**
* 接收服务端回复消息的监听器,只会将类型为ObjectMessage和有JMSCorrelationID属性
* 的消息放入消息池中。
*/
private class MessageReceiveListener implements MessageListener {

    // Stores every ObjectMessage reply that has a correlation id in returnPool;
    // anything else is ignored. Failures are logged, never rethrown.
    // NOTE(review): a reply arriving after ClearExpiredMessageTask has already
    // processed its id is still stored here, and nothing visible removes it later.
    public void onMessage(Message message) {
        try {
            final String correlationId = message.getJMSCorrelationID();

            if (log.isDebugEnabled()) {
                log.debug(
                        CheckUtil.replaceArgs(
                        "Receive the remote method response message, the message id is {0}.",
                        correlationId));
            }

            if (correlationId == null || !(message instanceof ObjectMessage)) {
                return;
            }
            Map<String, Serializable> reply =
                    (Map<String, Serializable>) ((ObjectMessage) message).getObject();
            returnPool.put(correlationId, reply);
        } catch (Exception ex) {
            // Covers JMSException as well; both were handled identically.
            log.error(ex.getMessage(), ex);
        }
    }
}

/**
* 清理过期失效的消息任务。
*/
private class ClearExpiredMessageTask implements Runnable {

    // Removes from returnPool the replies of every request that was abandoned.
    // The whole failure queue is drained on each run; handling a single id per
    // run lets the queue back up when many requests time out at once.
    public void run() {
        try {
            String jmsID;
            while ((jmsID = failureQueue.poll()) != null) {
                returnPool.remove(jmsID);
            }
        } catch (Exception ex) {
            // Must not escape: an exception thrown from a scheduled task
            // suppresses all of its subsequent executions.
            log.error(ex.getMessage(), ex);
        }
    }
}
}
// Not referenced in the visible code; presumably the service object that
// MethodCallListener dispatches calls to (TODO confirm).
private Object target;
// Factory used by afterPropertiesSet() to create the JMS connection.
private ConnectionFactory connectionFactory;
// Connection created and started in afterPropertiesSet().
private Connection connection;
// Original note: "queue that receives messages". The visible code never reads
// this field; presumably replies are sent to it (TODO confirm against
// MethodCallListener).
private Destination sendQueue;
// Original note: "queue for returned messages". In the visible code this is the
// queue the request consumer is created on in afterPropertiesSet().
private Destination receiveQueue;
// Not used in the visible code; presumably a message time-to-live (TODO confirm).
private long timeLive = 0;
// Session on which the request consumer is created.
private Session receiveSession;
// Service name; becomes part of the message selector.
private String serviceName;
// Delivery mode, non-persistent by default.
private int deliveryMode = DeliveryMode.NON_PERSISTENT;
// Thread pool for executing work; the visible code only shuts it down in destroy().
private final ExecutorService threadPool =
Executors.newCachedThreadPool();
private static final Log log = LogFactory.getLog(JmsServiceExporter.class); /**
* 初始化方法,会检查必要的资源是否已经准备完备。
* @throws java.lang.Exception 初始化异常。
*/
public void init() throws Exception {
    // Bootstrap entry point: the resource checks were omitted from this
    // listing by the original author; what remains simply starts listening.
    // (The unused local "Exception ex" was removed.)
    this.afterPropertiesSet();
}

/**
* 销毁动作,结束所有服务的监听。
* @throws java.lang.Exception 结束服务时出现异常。
*/
public void destroy() throws Exception {
if (receiveSession != null) {
receiveSession.close();
}
if (getConnection() != null) {
getConnection().close();
}
if (getThreadPool() != null) {
getThreadPool().shutdown();
}
} //省略了属性的get/set方法 /**
* 启动消息监听。
* @throws JMSException 启动失败。
*/
protected void afterPropertiesSet() throws JMSException {
    // Open the connection and the session used for receiving requests.
    connection = this.getConnectionFactory().createConnection();
    receiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Selector: only messages whose service-name property equals this service
    // and whose MESSAGE_SELECT_KEY property equals MESSAGE_SELECT_VALUE.
    final String selector = "(" + JmsConfig.SERVICE_NAME + "='" + this.getServiceName() + "'"
            + " and " + JmsConfig.MESSAGE_SELECT_KEY + "='" + JmsConfig.MESSAGE_SELECT_VALUE + "')";

    connection.start();
    receiveSession.createConsumer(receiveQueue, selector)
            .setMessageListener(new MethodCallListener(this));

    log.info("Start the remote service: " + this.getServiceName());
}
}

客户端

public class JmsClientExporter {
// Not used in the visible code; presumably the service interface name (TODO confirm).
private String serviceInterface;
// Connection created and started in init(), closed in destroy().
private Connection connection;
// Factory used by init() to create the connection.
private ConnectionFactory connectionFactory;
// Queue that requests are published to.
private Queue sendQueue;
// Queue that replies are received from.
private Queue receiveQueue;
// Time-to-live of sent messages, in milliseconds.
private long sendTimeLive = 0;
// Timeout for waiting on a reply message, in milliseconds.
private long receiveTimeOut = 60000L;
// Service name; becomes part of the reply selector built in init().
private String serviceName;
// Interface read through getInterfaceClass() when getObject() builds the proxy
// (the getter itself is omitted from this listing).
private Class interfaceClass;
// Set to true at the end of init(); getObject() refuses to work before that.
private boolean ready = false;
// Presumably what getClearMessageDelay() returns; init() uses that value, in
// seconds, as both the initial delay and the period of the cleanup task.
private int clearMessageDelay = 1;
// Identity generated per exporter instance; used in the reply selector.
private final String selfIdentity = IdGenerate.getUUIDString();
// Session on which the reply consumer is created.
private Session receiveSession = null;
// Delivery mode, non-persistent by default.
private int deliveryMode = DeliveryMode.NON_PERSISTENT;
private static final Log log = LogFactory.getLog(JmsClientExporter.class);
// Received replies keyed by JMSCorrelationID. Static, so shared by all exporters.
private static final Map<String, Map<String, Serializable>> returnPool =
new ConcurrentHashMap<String, Map<String, Serializable>>();
// Correlation ids whose callers gave up waiting; consumed by ClearExpiredMessageTask.
private static final java.util.Queue<String> failureQueue =
new ConcurrentLinkedQueue<String>();
// Scheduler (one thread) that runs ClearExpiredMessageTask.
private final ScheduledExecutorService clearMessagePool =
Executors.newScheduledThreadPool(1);
// Property getters/setters omitted.

/**
* 初始化方法。
* @throws Exception
*/
public void init() throws Exception {
    // (Property validation was omitted from the original listing.)
    connection = getConnectionFactory().createConnection();
    receiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Selector: replies for this service that carry this exporter's identity.
    final String selector = "(" + JmsConfig.SERVICE_NAME + "='" + this.getServiceName()
            + "' and " + JmsConfig.SELF_IDENTITY + "='" + this.selfIdentity + "')";
    receiveSession.createConsumer(receiveQueue, selector)
            .setMessageListener(new MessageReceiveListener());
    connection.start();

    // Periodically discard replies nobody is waiting for any more.
    clearMessagePool.scheduleWithFixedDelay(
            new ClearExpiredMessageTask(),
            getClearMessageDelay(),
            getClearMessageDelay(),
            TimeUnit.SECONDS);

    this.ready = true;
}

/**
* 销毁方法。
* @throws javax.jms.JMSException
*/
public void destroy() throws JMSException {
    // Close the session and the connection, then stop the cleanup scheduler.
    // The finally blocks guarantee that a JMSException from one close() does
    // not leave the connection open or the scheduler thread running; the
    // first exception still propagates to the caller.
    try {
        if (receiveSession != null) {
            receiveSession.close();
        }
    } finally {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            clearMessagePool.shutdown();
        }
    }
}

/**
* 放弃接收消息。
* @param JMSCorrelationID 消息的连接id.
*/
public void abandoned(String JMSCorrelationID) {
    // Record the id so the cleanup task can drop a reply that arrives late.
    try {
        if (!failureQueue.offer(JMSCorrelationID)) {
            // Placeholder kept from the original: no handling is implemented
            // for an id that could not be queued.
        }
    } catch (Exception ex) {
        log.error(ex.getMessage(), ex);
    }
}

/**
* 返回指定ID的消息,如果没有将返回null。
* @param JMSCorrelationID 消息连接ID.
* @return 消息,如果不存在将返回null。
*/
public Map<String, Serializable> findMessage(String JMSCorrelationID) {
    // remove() hands back the stored reply and takes it out of the pool in one
    // step; it yields null when no reply is stored under this id.
    return returnPool.remove(JMSCorrelationID);
}

/**
* 返回对接口的代理实例。
* @return 代理实例。
* @throws Exception 没有正确初始化。
*/
public Object getObject() throws Exception {
    // Refuse to hand out a proxy before init() has completed.
    if (!this.ready) {
        throw new Exception(
                serviceName + " has not been properly initialized.");
    }
    InvocationHandler callHandler = new MethodCallClinetInvocationHandler(this);
    return ProxyFactory.createInterfactProxy(this.getInterfaceClass(), callHandler);
}

/**
* 接收服务端回复消息的监听器,只会将类型为ObjectMessage和有JMSCorrelationID属性
* 的消息放入消息池中。
*/
private class MessageReceiveListener implements MessageListener {

    // Stores every ObjectMessage reply that has a correlation id in returnPool;
    // anything else is ignored. Failures are logged, never rethrown.
    // NOTE(review): a reply arriving after ClearExpiredMessageTask has already
    // processed its id is still stored here, and nothing visible removes it later.
    public void onMessage(Message message) {
        try {
            final String correlationId = message.getJMSCorrelationID();

            if (log.isDebugEnabled()) {
                log.debug(
                        CheckUtil.replaceArgs(
                        "Receive the remote method response message, the message id is {0}.",
                        correlationId));
            }

            if (correlationId == null || !(message instanceof ObjectMessage)) {
                return;
            }
            Map<String, Serializable> reply =
                    (Map<String, Serializable>) ((ObjectMessage) message).getObject();
            returnPool.put(correlationId, reply);
        } catch (Exception ex) {
            // Covers JMSException as well; both were handled identically.
            log.error(ex.getMessage(), ex);
        }
    }
}

/**
* 清理过期失效的消息任务。
*/
private class ClearExpiredMessageTask implements Runnable {

    // Removes from returnPool the replies of every request that was abandoned.
    // The whole failure queue is drained on each run; handling a single id per
    // run lets the queue back up when many requests time out at once.
    public void run() {
        try {
            String jmsID;
            while ((jmsID = failureQueue.poll()) != null) {
                returnPool.remove(jmsID);
            }
        } catch (Exception ex) {
            // Must not escape: an exception thrown from a scheduled task
            // suppresses all of its subsequent executions.
            log.error(ex.getMessage(), ex);
        }
    }
}
}
// Exporter that supplies the connection, queues, settings and the reply pool.
private JmsClientExporter clientExporter;
// Logger obtained from JmsClientExporter.getLogger() in the constructor.
private Log log;
// Pause between two polls of the reply pool in receiveMessage(), in milliseconds.
private static final long sleepTime = 30;
/**
* 构造方法,需要客户端构造工厂。
* @param clientExporter 客户端构造工厂。
*/
public MethodCallClinetInvocationHandler(JmsClientExporter clientExporter) {
    // Log through the logger that JmsClientExporter exposes.
    this.log = JmsClientExporter.getLogger();
    this.clientExporter = clientExporter;
}

/**
* 实际的拦截方法,每次调用目标对象方法都会被此方法拦截。转换成JMS通信,获取远程方法的
* 返回结果。
* @param proxy 代理类。
* @param method 调用的方法。
* @param args 调用方法时的参数。
* @return 调用结果。
* @throws java.lang.Throwable
*/
public Object invoke(Object proxy, Method method, Object[] args) throws
        Throwable {
    // Turns one proxied method call into a JMS request/reply round trip:
    // build and send the request on a short-lived session, block until the
    // reply with the same correlation id shows up (or the wait times out),
    // then either return the remote value or rethrow the remote failure.
    Connection connection = clientExporter.getConnection();
    Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);
    try {
        ObjectMessage requestMessage = buildMessage(session, method, args);
        sendMessage(session, requestMessage);

        Map<String, Serializable> returnCollection = receiveMessage(
                requestMessage.getJMSCorrelationID());

        // Cast to Throwable, not Exception: the remote side may report any
        // Throwable, and this method is declared to throw Throwable anyway.
        Throwable returnException =
                (Throwable) returnCollection.get(JmsConfig.RETURN_EXCEPTION);
        if (returnException != null) {
            throw returnException;
        }
        return returnCollection.get(JmsConfig.RETURN_VALUE);
    } catch (Exception ex) {
        // Single handler (JMSException included). Pass the exception as the
        // throwable argument so the stack trace is logged, not just toString().
        log.error(ex.getMessage(), ex);
        throw ex;
    } finally {
        session.close();
    }
}

/**
* 根据指定的方法信息和参数信息,构造合式的Message。
* @param session 当前JMS的Session实例。
* @param method 方法信息。
* @param args 参数信息。
* @return 构造好的ObjectMessage.
* @throws javax.jms.JMSException
*/
private ObjectMessage buildMessage(Session session, Method method,
        Object[] args)
        throws JMSException, Exception {
    final String invokedName = method.getName();
    final Class[] signature = method.getParameterTypes();

    // Payload: method name plus, for methods that take parameters, the
    // argument values and the declared parameter types.
    Map<String, Serializable> payload = new HashMap<String, Serializable>(3);
    payload.put(JmsConfig.METHOD_NAME, invokedName);
    if (signature.length > 0) {
        payload.put(JmsConfig.PARAMETER_LIST_VALUE, args);
        payload.put(JmsConfig.PARAMETER_LIST_CLASS, signature);
    }

    // Properties used for routing: selector marker, a fresh correlation id,
    // the target service name and this client's identity.
    ObjectMessage message = session.createObjectMessage((Serializable) payload);
    message.setStringProperty(JmsConfig.MESSAGE_SELECT_KEY, JmsConfig.MESSAGE_SELECT_VALUE);
    message.setJMSCorrelationID(IdGenerate.getUUIDString());
    message.setStringProperty(JmsConfig.SERVICE_NAME, clientExporter.getServiceName());
    message.setStringProperty(JmsConfig.SELF_IDENTITY, clientExporter.getSelfIdentity());

    if (!log.isDebugEnabled()) {
        return message;
    }
    log.debug("Name of the method call " + invokedName);
    log.debug(args != null
            ? "Parameter type is " + Arrays.toString(args)
            : "Parameter is null.");
    log.debug("Message is " + message);
    return message;
}

/**
* 发送调用信息消息。
* @param session
* @param message
* @throws javax.jms.JMSException
*/
private void sendMessage(Session session, ObjectMessage message) throws
        JMSException {
    // One producer per call, always closed again, configured from the
    // exporter's delivery mode and (when positive) time-to-live.
    final MessageProducer producer =
            session.createProducer(clientExporter.getSendQueue());
    try {
        producer.setDeliveryMode(clientExporter.getDeliveryMode());

        final long timeToLive = clientExporter.getSendTimeLive();
        if (timeToLive > 0) {
            producer.setTimeToLive(timeToLive);
        }

        producer.send(message);

        if (log.isDebugEnabled()) {
            log.debug("Send request message.");
        }
    } finally {
        producer.close();
    }
}

/**
* 阻塞的接收服务端的回复,至到接收到消息或者超时。
* @param messageID 消息的连接ID.
* @return 消息调用结果。
* @throws javax.jms.JMSException
* @throws java.lang.InterruptedException
*/
@SuppressWarnings("unchecked")
private Map<String, Serializable> receiveMessage(String messageID)
        throws JMSException, InterruptedException {
    // Polls the exporter's reply pool every sleepTime milliseconds until the
    // reply for messageID is available or the receive timeout has elapsed.
    //
    // Timeout semantics (unchanged): a positive timeout is a limit in
    // milliseconds, 0 waits indefinitely, and a negative value times out right
    // after the first unsuccessful lookup.
    //
    // The limit is checked against elapsed wall time (System.nanoTime) rather
    // than a count of nominal sleeps, so early wake-ups and the time spent in
    // the lookup itself no longer distort it. When the wait ends without a
    // reply - by timeout or by interruption - the id is handed to abandoned()
    // so that a reply arriving later can be discarded by the cleanup task.
    final long timeOut = clientExporter.getReceiveTimeOut();
    final long startNanos = System.nanoTime();

    while (true) {
        Map<String, Serializable> returnCollection =
                clientExporter.findMessage(messageID);
        if (returnCollection != null) {
            return returnCollection;
        }

        if (timeOut != 0) {
            long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
            if (timeOut < 0 || elapsedMillis > timeOut) {
                clientExporter.abandoned(messageID);
                throw new RuntimeException("Request timed out.");
            }
        }

        try {
            synchronized (this) {
                this.wait(sleepTime);
            }
        } catch (InterruptedException ex) {
            clientExporter.abandoned(messageID);
            throw ex;
        }
    }
}
}

用Spring做远程调用时,测试发现并发到400左右即出现慢速消费者的问题,而且配置也麻烦些。所以自己实现了一下:每一个客户端只保持一个消息消费者来接收并分发消息,根据JMSCorrelationID把回复消息与对应的请求关联起来。