技术菜鸟一枚,现在接到一个需求,用Modbus协议去读取底层设备数据,比如说现有100台设备,每个设备的端口和ip都不一样,就是每个设备是独立的然后会不断产生数据需要不断读取,这个肯定是要用到多线程的,但是对多线程仅仅知道片面,这个需要用线程池吗?我觉得好像不需要,我每个线程对应一个设备是要不断循环的,也就是不会销毁一直读,现在遇到的问题是------------------------------其中一个线程抛出一个异常,异常也捕获了,但整个程序会停两秒左右再运行,就是其他线程要求是要不断读取,但是现在的情况是会停两秒左右再读,这个是怎么回事。
 
线程代码如下:
package SlaveReadTest;import java.util.ArrayList;
import java.util.Date;
import java.util.List;import com.serotonin.modbus4j.ModbusFactory;
import com.serotonin.modbus4j.ModbusMaster;
import com.serotonin.modbus4j.exception.ModbusInitException;
import com.serotonin.modbus4j.exception.ModbusTransportException;
import com.serotonin.modbus4j.ip.IpParameters;
import com.serotonin.modbus4j.msg.ModbusRequest;
import com.serotonin.modbus4j.msg.ModbusResponse;
import com.serotonin.modbus4j.msg.ReadCoilsRequest;
import com.serotonin.modbus4j.msg.ReadDiscreteInputsRequest;
import com.serotonin.modbus4j.msg.ReadHoldingRegistersRequest;
import com.serotonin.modbus4j.msg.ReadInputRegistersRequest;
import com.serotonin.util.queue.ByteQueue;import entity.ModbusParams;public class ThreadModbusTCPRead extends Thread { ModbusParams modbusParam;
static Object obj = new Object();
// 存放每个设备的掉线时刻和再次连上时刻
List<Long> timeList = new ArrayList<>(); public ThreadModbusTCPRead(ModbusParams modbusParam) {
super();
this.modbusParam = modbusParam;
} public ModbusParams getModbusParam() {
return modbusParam;
} public void setModbusParam(ModbusParams modbusParam) {
this.modbusParam = modbusParam;
} public void run() { ModbusMaster tcpMaster = null;
ByteQueue byteQueue = null; while (GlobalVariable.restart == 82) {
// 获取连接
tcpMaster = modbusTCPinit(modbusParam); // 判断是否连接上,如果不为空表示已连接进入读取循环
if (tcpMaster != null) { // 循环请求和获得响应
while (GlobalVariable.restart == 82) { synchronized (obj) { // 先检测timeList中是否有掉线时间,再存入连上时刻,单数表示断线,双数表示已连接
if (timeList.size() % 2 == 1) {
timeList.add(new Date().getTime());
long Duration = timeList.get(0) - timeList.get(1);
timeList.clear();
} // 获取请求响应的返回值
byteQueue = TCPRequestaResponse(tcpMaster, modbusParam); // 判断是否中途断开
if (byteQueue != null) { // 把在线设备的id添加进集合
GlobalVariable.onlineSet.add(modbusParam.getId()); // 保存从byteQueue取出的值
List<Integer> list = new ArrayList<Integer>(); // byteQueue前3个值为从站地址,功能码和响应值大小,需排除 for (int i = 3; i < byteQueue.size(); i++) {
int val = getUnsignedByte(byteQueue.peek(i));
list.add(val);
} // 功能码01和02保存的值是线圈和开关的状态,值是0或1,byteQueue检出的值将每八位进行了合并
if (modbusParam.getFunction() == 1 || modbusParam.getFunction() == 2) {
// 保存转换后的0,1值
List<Integer> resultList = new ArrayList<>(); for (int a = 0; a < list.size(); a++) {
int value = list.get(a);
// 将每八位合并成的数转成2进制
for (int b = 0; b < 8; b++) {
int value01 = value & (0x01 << b);
if (value01 != 0) {
value01 = 1;
}
resultList.add(value01);
} } GlobalVariable.valuemap.put(modbusParam.getId(), resultList); if (GlobalVariable.consoleLog == 80) {
System.out.println(Thread.currentThread().getName() + ":" + resultList);
} } // 功能码03和04检出的值转换
if (modbusParam.getFunction() == 3 || modbusParam.getFunction() == 4) { List<Integer> resultList = new ArrayList<>();
// byteQueue中的两位合并成一位,前面为高位<<8+后面一位低位
for (int i = 0; i < list.size(); i = i + 2) {
int res16 = (list.get(i) << 8) + list.get(i + 1);
resultList.add(res16); } GlobalVariable.valuemap.put(modbusParam.getId(), resultList); if (GlobalVariable.consoleLog == 80) {
System.out.println(Thread.currentThread().getName() + ":" + resultList);
}
} } else {
break;
} } try {
Thread.currentThread();
// 线程休眠,每多少秒读取一次
Thread.sleep(modbusParam.getScanRate()); } catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
} } else {
// 设备掉线时刻
if (timeList.size() % 2 == 0) {
timeList.add(new Date().getTime());
} // 从在线设备集合中移除掉线设备
GlobalVariable.onlineSet.remove(modbusParam.getId());
// 把valuemap中断线设备的最后一次结果清除
GlobalVariable.valuemap.remove(modbusParam.getId()); int errorport = modbusParam.getPort();
System.out.println(Thread.currentThread().getName() + "---" + errorport + "端口断开______________两秒后重连");
try {
Thread.currentThread();
// 线程休眠,七秒后重连
Thread.sleep(7000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "+++++++++++++++++++++++++线程睡醒"); }
}


if (GlobalVariable.restart == 83) {
// 从在线设备集合中移除所有设备
GlobalVariable.onlineSet.clear();
// 把valuemap中断线设备的最后一次结果清除
GlobalVariable.valuemap.remove(modbusParam.getId());
this.modbusParam = null;

//摧毁连接
tcpMaster.destroy();
} } // 获取连接
public static ModbusMaster modbusTCPinit(ModbusParams modbusParam) { ModbusFactory modbusFactory = new ModbusFactory();
// 设备ModbusTCP的Ip与端口,如果不设定端口则默认为502
IpParameters params = new IpParameters(); params.setHost(modbusParam.getIp());

// 设置端口,默认502
params.setPort(modbusParam.getPort());
ModbusMaster tcpMaster = null;
tcpMaster = modbusFactory.createTcpMaster(params, true);
try {
tcpMaster.init();
} catch (ModbusInitException e) {
// 出现连接异常把tcpMaster置为空表示未连接上
tcpMaster = null;
}
return tcpMaster;
}
// 发送请求,获得响应
public static ByteQueue TCPRequestaResponse(ModbusMaster tcpMaster, ModbusParams modbusParam) { ModbusRequest modbusRequest = null; // 判断功能码
if (modbusParam.getFunction() == 1) {
try {
// 功能码01
modbusRequest = new ReadCoilsRequest(modbusParam.getSlaveID(), modbusParam.getAddress(),
modbusParam.getQuantity());
} catch (ModbusTransportException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
if (modbusParam.getFunction() == 2) {
try {
// 功能码02
modbusRequest = new ReadDiscreteInputsRequest(modbusParam.getSlaveID(), modbusParam.getAddress(),
modbusParam.getQuantity());
} catch (ModbusTransportException e2) {
// TODO Auto-generated catch block
e2.printStackTrace();
}
}
if (modbusParam.getFunction() == 3) {
try {
// 功能码03
modbusRequest = new ReadHoldingRegistersRequest(modbusParam.getSlaveID(), modbusParam.getAddress(),
modbusParam.getQuantity());
} catch (ModbusTransportException e) { e.printStackTrace();
}
}
if (modbusParam.getFunction() == 4) {
try {
// 功能码04
modbusRequest = new ReadInputRegistersRequest(modbusParam.getSlaveID(), modbusParam.getAddress(),
modbusParam.getQuantity());
} catch (ModbusTransportException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
} ModbusResponse modbusResponse = null;
ByteQueue byteQueue = new ByteQueue(1024);
try {
modbusResponse = tcpMaster.send(modbusRequest);
} catch (ModbusTransportException e) {
e.printStackTrace();
// 响应出现问题,把byteQueue置为空,表示中途断开
tcpMaster.destroy();
byteQueue = null; }
if (byteQueue != null) {
// 响应值保存到字节队列
modbusResponse.write(byteQueue);
}
return byteQueue; } public static int getUnsignedByte(byte data) { // 将data字节型数据转换为0~255 (0xFF
// 即BYTE)。
return data & 0x0FF;
}}

解决方案 »

  1.   

    额,初次发帖,直接从eclipse中拷进来的,没有注意。麻烦帮忙看一下,有思路吗?
      

  2.   

    是怎么调用thread,把这部分代码贴出来
      

  3.   

    调用的思路是解析设备参数,有多少设备就开启多少个线程,然后就让每个线程循环执行。
    下面是代码:
    //开启每个模拟机的线程
    public static void startThreadModbus(){
    List<ModbusParams> mpList=new ArrayList<>();
    ParamsObj paramsObj=new ParamsObj();
    //获得参数对象的集合
    mpList=paramsObj.getParamsObj();
    for (int a = 0; a < mpList.size(); a++) {
    ThreadModbusTCPRead thead = new ThreadModbusTCPRead(mpList.get(a));
    thead.start();
    }
    }
      

  4.   


    package SlaveReadTest;import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;import com.serotonin.modbus4j.ModbusFactory;
    import com.serotonin.modbus4j.ModbusMaster;
    import com.serotonin.modbus4j.exception.ModbusInitException;
    import com.serotonin.modbus4j.exception.ModbusTransportException;
    import com.serotonin.modbus4j.ip.IpParameters;
    import com.serotonin.modbus4j.msg.ModbusRequest;
    import com.serotonin.modbus4j.msg.ModbusResponse;
    import com.serotonin.modbus4j.msg.ReadCoilsRequest;
    import com.serotonin.modbus4j.msg.ReadDiscreteInputsRequest;
    import com.serotonin.modbus4j.msg.ReadHoldingRegistersRequest;
    import com.serotonin.modbus4j.msg.ReadInputRegistersRequest;
    import com.serotonin.util.queue.ByteQueue;import entity.ModbusParams;public class ThreadModbusTCPRead extends Thread {    ModbusParams modbusParam;
        static Object obj = new Object();
        // 存放每个设备的掉线时刻和再次连上时刻
        List<Long> timeList = new ArrayList<>();    public ThreadModbusTCPRead(ModbusParams modbusParam) {
            super();
            this.modbusParam = modbusParam;
        }    public ModbusParams getModbusParam() {
            return modbusParam;
        }    public void setModbusParam(ModbusParams modbusParam) {
            this.modbusParam = modbusParam;
        }    public void run() {        ModbusMaster tcpMaster = null;
            ByteQueue byteQueue = null;        while (GlobalVariable.restart == 82) {
    // 获取连接
                tcpMaster = modbusTCPinit(modbusParam);// 判断是否连接上,如果不为空表示已连接进入读取循环
                if (tcpMaster != null) {// 循环请求和获得响应
                    while (GlobalVariable.restart == 82) {                    synchronized (obj) {// 先检测timeList中是否有掉线时间,再存入连上时刻,单数表示断线,双数表示已连接
                            if (timeList.size() % 2 == 1) {
                                timeList.add(new Date().getTime());
                                long Duration = timeList.get(0) - timeList.get(1);
                                timeList.clear();
                            }// 获取请求响应的返回值
                            byteQueue = TCPRequestaResponse(tcpMaster, modbusParam);// 判断是否中途断开
                            if (byteQueue != null) {// 把在线设备的id添加进集合
                                GlobalVariable.onlineSet.add(modbusParam.getId());// 保存从byteQueue取出的值
                                List<Integer> list = new ArrayList<Integer>();// byteQueue前3个值为从站地址,功能码和响应值大小,需排除                            for (int i = 3; i < byteQueue.size(); i++) {
                                    int val = getUnsignedByte(byteQueue.peek(i));
                                    list.add(val);
                                }// 功能码01和02保存的值是线圈和开关的状态,值是0或1,byteQueue检出的值将每八位进行了合并
                                if (modbusParam.getFunction() == 1 || modbusParam.getFunction() == 2) {
    // 保存转换后的0,1值
                                    List<Integer> resultList = new ArrayList<>();                                for (int a = 0; a < list.size(); a++) {
                                        int value = list.get(a);
    // 将每八位合并成的数转成2进制
                                        for (int b = 0; b < 8; b++) {
                                            int value01 = value & (0x01 << b);
                                            if (value01 != 0) {
                                                value01 = 1;
                                            }
                                            resultList.add(value01);
                                        }                                }                                GlobalVariable.valuemap.put(modbusParam.getId(), resultList);                                if (GlobalVariable.consoleLog == 80) {
                                        System.out.println(Thread.currentThread().getName() + ":" + resultList);
                                    }                            }// 功能码03和04检出的值转换
                                if (modbusParam.getFunction() == 3 || modbusParam.getFunction() == 4) {                                List<Integer> resultList = new ArrayList<>();
    // byteQueue中的两位合并成一位,前面为高位<<8+后面一位低位
                                    for (int i = 0; i < list.size(); i = i + 2) {
                                        int res16 = (list.get(i) << 8) + list.get(i + 1);
                                        resultList.add(res16);                                }                                GlobalVariable.valuemap.put(modbusParam.getId(), resultList);                                if (GlobalVariable.consoleLog == 80) {
                                        System.out.println(Thread.currentThread().getName() + ":" + resultList);
                                    }
                                }                        } else {
                                break;
                            }                    }                    try {
                            Thread.currentThread();
    // 线程休眠,每多少秒读取一次
                            Thread.sleep(modbusParam.getScanRate());                    } catch (InterruptedException e1) {
    // TODO Auto-generated catch block
                            e1.printStackTrace();
                        }
                    }            } else {
    // 设备掉线时刻
                    if (timeList.size() % 2 == 0) {
                        timeList.add(new Date().getTime());
                    }// 从在线设备集合中移除掉线设备
                    GlobalVariable.onlineSet.remove(modbusParam.getId());
    // 把valuemap中断线设备的最后一次结果清除
                    GlobalVariable.valuemap.remove(modbusParam.getId());                int errorport = modbusParam.getPort();
                    System.out.println(Thread.currentThread().getName() + "---" + errorport + "端口断开______________两秒后重连");
                    try {
                        Thread.currentThread();
    // 线程休眠,七秒后重连
                        Thread.sleep(7000);
                    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "+++++++++++++++++++++++++线程睡醒");            }
            }
            if (GlobalVariable.restart == 83) {
    // 从在线设备集合中移除所有设备
                GlobalVariable.onlineSet.clear();
    // 把valuemap中断线设备的最后一次结果清除
                GlobalVariable.valuemap.remove(modbusParam.getId());
                this.modbusParam = null;//摧毁连接
                tcpMaster.destroy();
            }    }    // 获取连接
        public static ModbusMaster modbusTCPinit(ModbusParams modbusParam) {        ModbusFactory modbusFactory = new ModbusFactory();
    // 设备ModbusTCP的Ip与端口,如果不设定端口则默认为502
            IpParameters params = new IpParameters();        params.setHost(modbusParam.getIp());// 设置端口,默认502
            params.setPort(modbusParam.getPort());
            ModbusMaster tcpMaster = null;
            tcpMaster = modbusFactory.createTcpMaster(params, true);
            try {
                tcpMaster.init();
            } catch (ModbusInitException e) {
    // 出现连接异常把tcpMaster置为空表示未连接上
                tcpMaster = null;
            }
            return tcpMaster;
        }
        // 发送请求,获得响应
        public static ByteQueue TCPRequestaResponse(ModbusMaster tcpMaster, ModbusParams modbusParam) {        ModbusRequest modbusRequest = null;// 判断功能码
            if (modbusParam.getFunction() == 1) {
                try {
    // 功能码01
                    modbusRequest = new ReadCoilsRequest(modbusParam.getSlaveID(), modbusParam.getAddress(),
                            modbusParam.getQuantity());
                } catch (ModbusTransportException e1) {
    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            }
            if (modbusParam.getFunction() == 2) {
                try {
    // 功能码02
                    modbusRequest = new ReadDiscreteInputsRequest(modbusParam.getSlaveID(), modbusParam.getAddress(),
                            modbusParam.getQuantity());
                } catch (ModbusTransportException e2) {
    // TODO Auto-generated catch block
                    e2.printStackTrace();
                }
            }
            if (modbusParam.getFunction() == 3) {
                try {
    // 功能码03
                    modbusRequest = new ReadHoldingRegistersRequest(modbusParam.getSlaveID(), modbusParam.getAddress(),
                            modbusParam.getQuantity());
                } catch (ModbusTransportException e) {                e.printStackTrace();
                }
            }
            if (modbusParam.getFunction() == 4) {
                try {
    // 功能码04
                    modbusRequest = new ReadInputRegistersRequest(modbusParam.getSlaveID(), modbusParam.getAddress(),
                            modbusParam.getQuantity());
                } catch (ModbusTransportException e1) {
    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            }        ModbusResponse modbusResponse = null;
            ByteQueue byteQueue = new ByteQueue(1024);
            try {
                modbusResponse = tcpMaster.send(modbusRequest);
            } catch (ModbusTransportException e) {
                e.printStackTrace();
    // 响应出现问题,把byteQueue置为空,表示中途断开
                tcpMaster.destroy();
                byteQueue = null;        }
            if (byteQueue != null) {
    // 响应值保存到字节队列
                modbusResponse.write(byteQueue);
            }
            return byteQueue;    }    public static int getUnsignedByte(byte data) { // 将data字节型数据转换为0~255 (0xFF
    // 即BYTE)。
            return data & 0x0FF;
        }}
      

  5.   

    但整个程序会停两秒左右再运行
    说一下我对你这段代码的理解:
    因为obj是定义为static,则obj被所有的线程共享。所有的线程执行到了synchronized时将会只有一个线程进行同步块
    基它的线程都在等待获取锁,线程释放锁之后会进入睡眠状态。整个程序会停两秒左右,你是怎么知道,是不是通过观察同步块的代码修改变量值来发现的?
    推测synchronized同步块的代码执行的非常耗时,线程执行这部分代码是排队执行的,导致观察好像要停两秒左右。
      

  6.   


    每个线程读取一个设备,如果连接上设备就循环读取,执行同步代码块确实是排队执行的。观察到停两秒左右是我在控制台打印读取到的值,正常连接上,从线程1到100这样轮询打印读取值是没问题的不会有停顿,现在问题是我断开一个设备,然后断开设备的线程会抛出异常,我对线程异常try catch了然后把ByteQueue置为null 表示连接断开,然后控制台会暂停两秒左右才打印,就好像是当前线程出现异常后,其他线程也停了一样。