【服务器系统】
Microsoft Windows Server 2003 R2 Enterprise x64  Edition Service Pack2
【问题】
服务器上,我通过用JProfiler直接启动Tomcat,从而启动项目。项目启动时,执行代码:MqttClientUtil.getInstance(),去连接mqtt服务器broker。但项目连接上broker后,立即又会断开。因为我在MqttClientUtil中写了断开重连机制,故看到的问题现象为:项目连接上broker,断开,重连成功,断开,重连成功,断开......一直反复下去。这个问题该如何修复呐???
【问题补充】
1、通过安装包启动项目,可正常连接broker,不会出现断开、重连现象。
2、我自身机器是Win7 32位系统,在我机器上通过JProfiler直接启动Tomcat,可正常连接broker,不会出现断开、重连现象。
MqttClientUtil类:
public class MqttClientUtil {    private static Logger logger = Logger.getLogger(MqttClientUtil.class);
    private final static String CONNECTION_STRING = "tcp://localhost:1883";
    private final static boolean CLEAN_START = true; //true:不记忆连接中的任何状态
    private final static short KEEP_ALIVE = 60;      //设置心跳60s  
    private final static String CLIENT_ID = "client_sqjw";//客户端标识  
//    private final static String[] TOPICS = {
//        "client/keepalive","Test/TestTopics/Topic2", "Test/TestTopics/Topic3"};
    /**
     * 服务质量: 0:“至多一次”,消息发布完全依赖底层 TCP/IP
     * 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
     * 1:“至少一次”,确保消息到达,但消息重复可能会发生。
     * 2:“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
     */
    private final static int[] QOS_VALUES = {0, 1, 2};
    private final static boolean saveflag = false;//true:服务器 broker保存该信息;false:服务器 broker不保存该信息
    private static MqttClientUtil instance = new MqttClientUtil();
    private MqttClient mqttClient;
    private mqttConnectionThread mqttConnectionThread;    private MqttClientUtil() {
        mqttConnectionThread = new mqttConnectionThread();
        mqttConnectionThread.setDaemon(true);
        mqttConnectionThread.start();
    }    public void close() {
        if (mqttConnectionThread != null) {
            mqttConnectionThread.close();
        }
    }    class mqttConnectionThread extends Thread {        //判断线程是否启动
        private boolean isrun = true;        public void close() {
            isrun = false;
        }        @Override
        public void run() {
            int index = 0;
            int sleepTime = 1000;
            while (isrun) {
                try {
                    index++;
                    //创建MqttClient对象,连接到指定的地址  
                    mqttClient = new MqttClient(CONNECTION_STRING);
                    //创建回调处理器
                    SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
                    // 注册接收消息方法
                    mqttClient.registerSimpleHandler(simpleCallbackHandler);
                    //创建连接 
                    mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);                    Thread.sleep(sleepTime);                    logger.debug("后台mqtt客户端连接服务器 broker成功!");                    break;
                } catch (Exception e) {
                    if (index == 50) {
                        sleepTime = 60000;
                        logger.debug("后台mqtt客户端连接服务器,一分钟一次");
                    }
                    if (index == 60) {//退出,不再进行自动连接
                        close();
                    }
                    logger.error("后台mqtt客户端连接服务器 broker失败!", e);
                }
            }
            logger.debug("MqttClient自动连接服务器 broker结束");
        }
    }    /**
     * 返回实例对象
     *
     * @return
     */
    public static MqttClientUtil getInstance() {
        return instance;
    }    /**
     * 发送消息
     *
     * @param clientId
     * @param messageId
     */
    public void sendMessage(String clientId, String message) {
        try {
            logger.debug("发送消息:" + "主题:" + clientId + " 内容:" + message);
            mqttClient.publish(clientId, message.getBytes(),
                    QOS_VALUES[0], saveflag);
        } catch (MqttException e) {
            logger.error("发送消息(" + message + ")失败!", e);
        }
    }    /**
     * 简单回调函数,处理client接收到的主题消息
     */
    class SimpleCallbackHandler implements MqttSimpleCallback {        /**
         * 当客户机和broker意外断开时触发 可以在此处理重新订阅
         */
        @Override
        public void connectionLost() throws Exception {
            logger.error("客户机和服务器broker已经断开,进行重新注册客户端");
            new mqttConnectionThread().start();
        }        /**
         * 客户端订阅消息后,该方法负责回调接收处理消息
         */
        @Override
        public void publishArrived(String topicName, byte[] payload, int Qos,
                boolean retained) throws Exception {
            logger.info("订阅主题: " + topicName);
            logger.info("消息数据: " + new String(payload));
            logger.info("消息级别(0,1,2): " + Qos);
            logger.info("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): " + retained);
        }
    }

解决方案 »

  1.   

    我遇到过 貌似是因为是你的消息队列中消息太多  还有种情况是同一个ID 有多个设备用这个id连接
      

  2.   


    1、我刚启动项目就连不上,我的项目就相当于一个mqttclient,还没有发消息,消息太多该如何理解?
    2、mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE),这个方法中的CLIENT_ID就是mqttclient的id,我已做了修改,保证了CLIENT_ID是唯一的,依然无效
      

  3.   


    1、我刚启动项目就连不上,我的项目就相当于一个mqttclient,还没有发消息,消息太多该如何理解?
    2、mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE),这个方法中的CLIENT_ID就是mqttclient的id,我已做了修改,保证了CLIENT_ID是唯一的,依然无效mqtt支持离线的呀 你之前测试的消息  在这个id订阅了主题之后  就会全部发过来 你broker用的什么?
      

  4.   

    mqtt支持离线的呀 你之前测试的消息  在这个id订阅了主题之后  就会全部发过来 你broker用的什么?
    我的broker是个exe程序“broker.exe”,双击后显示信息框(和cmd指令框类似)。我的整体框架是,一个后台,多个前端。后台注册为“client_sqjw”,多个前端分别使用自己的编号(唯一)注册为客户端。我发消息的方向也是单向的,“client_sqjw”只负责发消息,前端客户端只负责收。这个问题非常奇怪的是:
    1、通过安装包启动后台,可正常连接broker,不会出现断开、重连现象。
    2、我自身机器是Win7 32位系统,在我机器上通过JProfiler直接启动Tomcat,可正常连接broker,不会出现断开、重连现象。
    感觉和系统、启动方式有关,不明白怎么回事???
      

  5.   

    前辈们好 ,本人新手小白做j2ee服务器的框架用的是ssh  这mqtt 搭建在tomcat 怎么实现啊? 我在网上看到的 都是apollo 的例子 我按照步骤实现了向手机推送消息但是感觉了解还是很少 。  希望大家多多关照。如果有dome最好发一份给我邮箱386740421@qq.com 。在此感激不尽。
      

  6.   

    不是指mqttutil内部,主程序中getinstance以及后续操作都重新放入一个新线程处理