服务端public class JmsServiceExporter {
    private Object target;
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Destination sendQueue;//接收消息的队列。
    private Destination receiveQueue;//返回消息的队列。
    private long timeLive = 0;
    private Session receiveSession;//接收消息的Session
    private String serviceName;//服务名称
    //发送模式,默认不持久
    private int deliveryMode = DeliveryMode.NON_PERSISTENT;
    //执行的线程池
    private final ExecutorService threadPool =
            Executors.newCachedThreadPool();
    private static final Log log = LogFactory.getLog(JmsServiceExporter.class);    /**
     * 初始化方法,会检查必要的资源是否已经准备完备。
     * @throws java.lang.Exception 初始化异常。
     */
    public void init() throws Exception {
        Exception ex;
        //省略必须的资源检查
        this.afterPropertiesSet();
    }    /**
     * 销毁动作,结束所有服务的监听。
     * @throws java.lang.Exception 结束服务时出现异常。
     */
    public void destroy() throws Exception {
        if (receiveSession != null) {
            receiveSession.close();
        }
        if (getConnection() != null) {
            getConnection().close();
        }
        if (getThreadPool() != null) {
            getThreadPool().shutdown();
        }
    }    //省略了属性的get/set方法    /**
     * 启动消息监听。
     * @throws JMSException 启动失败。
     */
    protected void afterPropertiesSet() throws JMSException {
        connection = this.getConnectionFactory().createConnection();
        receiveSession = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);        StringBuilder serviceSelect = new StringBuilder();
        serviceSelect.append("(");
        serviceSelect.append(JmsConfig.SERVICE_NAME);
        serviceSelect.append("='");
        serviceSelect.append(this.getServiceName());
        serviceSelect.append("'");
        serviceSelect.append(" and ");
        serviceSelect.append(JmsConfig.MESSAGE_SELECT_KEY);
        serviceSelect.append("='");
        serviceSelect.append(JmsConfig.MESSAGE_SELECT_VALUE);
        serviceSelect.append("')");        connection.start();        receiveSession.createConsumer(receiveQueue,
                serviceSelect.toString()).setMessageListener(
                new MethodCallListener(this));        log.info("Start the remote service: " + this.getServiceName());
    }
}客户端public class JmsClientExporter {
    private String serviceInterface;
    private Connection connection;
    private ConnectionFactory connectionFactory;
    private Queue sendQueue;//发布队列。
    private Queue receiveQueue;//接收的队列。
    private long sendTimeLive = 0;//发送消息的生命周期,单位毫秒。
    private long receiveTimeOut = 60000L;//接收回复消息的超时时间,单位毫秒。
    private String serviceName;
    private Class interfaceClass;
    private boolean ready = false;
    private int clearMessageDelay = 1;
    private final String selfIdentity = IdGenerate.getUUIDString();
    private Session receiveSession = null;
    private int deliveryMode = DeliveryMode.NON_PERSISTENT;
    private static final Log log = LogFactory.getLog(JmsClientExporter.class);
    private static final Map<String, Map<String, Serializable>> returnPool =
            new ConcurrentHashMap<String, Map<String, Serializable>>();
    private static final java.util.Queue<String> failureQueue =
            new ConcurrentLinkedQueue<String>();
    private final ScheduledExecutorService clearMessagePool =
            Executors.newScheduledThreadPool(1);
    
    //省略了属性的get/set方法    /**
     * 初始化方法。
     * @throws Exception 
     */
    public void init() throws Exception {
        //省略属性的检查
        connection = getConnectionFactory().createConnection();
        receiveSession = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        StringBuilder buff = new StringBuilder();
        buff.append("(");
        buff.append(JmsConfig.SERVICE_NAME);
        buff.append("='");
        buff.append(this.getServiceName());
        buff.append("' and ");
        buff.append("");
        buff.append(JmsConfig.SELF_IDENTITY);
        buff.append("='");
        buff.append(this.selfIdentity);
        buff.append("')");
        receiveSession.createConsumer(receiveQueue,
                buff.toString()).setMessageListener(new MessageReceiveListener());
        connection.start();
        clearMessagePool.scheduleWithFixedDelay(
                new ClearExpiredMessageTask(),
                getClearMessageDelay(),
                getClearMessageDelay(),
                TimeUnit.SECONDS);
        this.ready = true;
    }    /**
     * 销毁方法。
     * @throws javax.jms.JMSException
     */
    public void destroy() throws JMSException {
        if (receiveSession != null) { receiveSession.close(); }
        if (connection != null) { connection.close(); }
        clearMessagePool.shutdown();
    }    /**
     * 放弃接收消息。
     * @param JMSCorrelationID 消息的连接id.
     */
    public void abandoned(String JMSCorrelationID) {
        boolean action = true;
        try {
            action = failureQueue.offer(JMSCorrelationID);
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }        if (!action) {
            //没有成功进入放弃消息列表的处理
        }
    }    /**
     * 返回指定ID的消息,如果没有将返回null。
     * @param JMSCorrelationID 消息连接ID.
     * @return 消息,如果不存在将返回null。
     */
    public Map<String, Serializable> findMessage(String JMSCorrelationID) {
        if (returnPool.containsKey(JMSCorrelationID)) {
            return returnPool.remove(JMSCorrelationID);
        } else {
            return null;
        }
    }    /**
     * 返回对接口的代理实例。
     * @return 代理实例。
     * @throws Exception 没有正确初始化。
     */
    public Object getObject() throws Exception {
        if (this.ready) {
            InvocationHandler handler = new MethodCallClinetInvocationHandler(
                    this);
            return ProxyFactory.createInterfactProxy(this.getInterfaceClass(),
                    handler);
        } else {
            throw new Exception(
                    serviceName + " has not been properly initialized.");
        }
    }    /**
     * 接收服务端回复消息的监听器,只会将类型为ObjectMessage和有JMSCorrelationID属性
     * 的消息放入消息池中。
     */
    private class MessageReceiveListener implements MessageListener {        public void onMessage(Message message) {
            try {
                String jmsId = message.getJMSCorrelationID();                if (log.isDebugEnabled()) {
                    log.debug(
                            CheckUtil.replaceArgs(
                            "Receive the remote method response message, the message id is {0}.",
                            jmsId));
                }                if (jmsId != null && ObjectMessage.class.isInstance(message)) {
                    ObjectMessage objMessage = (ObjectMessage) message;
                    Map<String, Serializable> returnCollection =
                            (Map<String, Serializable>) objMessage.getObject();
                    returnPool.put(jmsId, returnCollection);                }
            } catch (JMSException ex) {
                log.error(ex.getMessage(), ex);
            } catch (Exception ex) {
                log.error(ex.getMessage(), ex);
            }
        }
    }    /**
     * 清理过期失效的消息任务。
     */
    private class ClearExpiredMessageTask implements Runnable {        public void run() {
            String jmsID = null;
            try {
                jmsID = failureQueue.poll();
                if (jmsID != null) {
                    returnPool.remove(jmsID);
                }
            } catch (Exception ex) {
                log.error(ex.getMessage(), ex);
            }
        }
    }
}

解决方案 »

  1.   

    这是客户端的代理实现public class MethodCallClinetInvocationHandler implements InvocationHandler {
        private JmsClientExporter clientExporter;
        private Log log;
        private static final long sleepTime = 30;
        /**
         * 构造方法,需要客户端构造工厂。
         * @param clientExporter 客户端构造工厂。
         */
        public MethodCallClinetInvocationHandler(JmsClientExporter clientExporter) {
            this.clientExporter = clientExporter;
            log = JmsClientExporter.getLogger();
        }    /**
         * 实际的拦截方法,每次调用目标对象方法都会被此方法拦截。转换成JMS通信,获取远程方法的
         * 返回结果。
         * @param proxy 代理类。
         * @param method 调用的方法。
         * @param args 调用方法时的参数。
         * @return 调用结果。
         * @throws java.lang.Throwable
         */
        public Object invoke(Object proxy, Method method, Object[] args) throws
                Throwable {
            Connection connection = clientExporter.getConnection();
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            try {
                ObjectMessage requestMessage = buildMessage(session, method, args);
                sendMessage(session, requestMessage);            Map<String, Serializable> returnCollection = receiveMessage(
                        requestMessage.getJMSCorrelationID());            Exception returnException =
                        (Exception) returnCollection.get(JmsConfig.RETURN_EXCEPTION);            Object returnValue = returnCollection.get(JmsConfig.RETURN_VALUE);            if (returnException != null) {
                    throw returnException;
                } else {
                    return returnValue;
                }        } catch (JMSException ex) {
                log.error(ex);
                throw ex;
            } catch (Exception ex) {
                log.error(ex);
                throw ex;
            } finally {
                if (session != null) {
                    session.close();
                }
            }
        }    /**
         * 根据指定的方法信息和参数信息,构造合式的Message。
         * @param session 当前JMS的Session实例。
         * @param method 方法信息。
         * @param args 参数信息。
         * @return 构造好的ObjectMessage.
         * @throws javax.jms.JMSException
         */
        private ObjectMessage buildMessage(Session session, Method method,
                Object[] args)
                throws JMSException, Exception {
            String methodName = method.getName();
            Map<String, Serializable> callInfo = new HashMap<String, Serializable>(3);
            callInfo.put(JmsConfig.METHOD_NAME, methodName);
            Class[] paramsClass = method.getParameterTypes();
            if (paramsClass.length > 0) {
                callInfo.put(JmsConfig.PARAMETER_LIST_VALUE, args);
                callInfo.put(JmsConfig.PARAMETER_LIST_CLASS,
                        method.getParameterTypes());
            }        ObjectMessage message = session.createObjectMessage(
                    (Serializable) callInfo);
            message.setStringProperty(JmsConfig.MESSAGE_SELECT_KEY,
                    JmsConfig.MESSAGE_SELECT_VALUE);
            message.setJMSCorrelationID(IdGenerate.getUUIDString());
            message.setStringProperty(JmsConfig.SERVICE_NAME,
                    clientExporter.getServiceName());
            message.setStringProperty(JmsConfig.SELF_IDENTITY,
                    clientExporter.getSelfIdentity());        if (log.isDebugEnabled()) {
                log.debug("Name of the method call " + methodName);
                if (args != null) {
                    log.debug("Parameter type is " + Arrays.toString(args));
                } else {
                    log.debug("Parameter is null.");
                }
                log.debug("Message is " + message);
            }        return message;
        }    /**
         * 发送调用信息消息。
         * @param session
         * @param message
         * @throws javax.jms.JMSException
         */
        private void sendMessage(Session session, ObjectMessage message) throws
                JMSException {
            MessageProducer producer = null;
            try {
                producer = session.createProducer(clientExporter.getSendQueue());
                producer.setDeliveryMode(clientExporter.getDeliveryMode());            if (clientExporter.getSendTimeLive() > 0) {
                    producer.setTimeToLive(clientExporter.getSendTimeLive());
                }            producer.send(message);            if (log.isDebugEnabled()) {
                    log.debug("Send request message.");
                }
            } finally {
                if (producer != null) {
                    producer.close();
                }
            }
        }    /**
         * 阻塞的接收服务端的回复,至到接收到消息或者超时。
         * @param messageID 消息的连接ID.
         * @return 消息调用结果。
         * @throws javax.jms.JMSException
         * @throws java.lang.InterruptedException
         */
        @SuppressWarnings("unchecked")
        private Map<String, Serializable> receiveMessage(String messageID)
                throws JMSException, InterruptedException {
            Map<String, Serializable> returnCollection = null;
            long count = 0;//已经过去毫秒数计时,此值大于timeOut即退出循环。
            long timeOut = clientExporter.getReceiveTimeOut();
            //开始阻塞方法,直到超时或者接收到消息。
            RuntimeException timeOutException = null;
            while (true) {
                returnCollection = clientExporter.findMessage(messageID);
                if (returnCollection == null) {
                    if (count > timeOut) {
                        timeOutException = new RuntimeException("Request timed out.");
                        break;
                    } else {
                        if (timeOut > 0) {
                            count += sleepTime;
                        }
                        synchronized (this) {
                            this.wait(sleepTime);
                        }
                    }
                } else {
                    break;
                }
            }
            if (timeOutException != null) {
                clientExporter.abandoned(messageID);
                throw timeOutException;
            }
            return returnCollection;
        }
    }用Spring来远程调用时测试了并发到400左右即出现慢速消费者的问题,而且配置也麻烦些。所以自己实现了一下,每一个客户端保持一个消息消费者来接收分发消息根据JMSCorrelationID来连接消息。