下面是小弟的一个数据处理类,运行1个小时左右,run()函数就会死锁,process仍然继续运行!各位老大帮忙看看,急
核心类为run(),process()
具体调用方式请看main()函数package com.mvtech.vasomp.slv.communication;import java.util.*;import org.apache.log4j.*;
import com.mvtech.vasomp.slv.utils.StopWatch;
import com.mvtech.vasomp.slv.datadeal.ParseCPcontent;
import com.mvtech.vasomp.slv.datadeal.ParseCPfrequence;
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import com.mvtech.vasomp.slv.datadeal.ParseCPstat;
import com.mvtech.vasomp.slv.datadeal.ParseFrontAlert;
public class XmlProcessor
implements Runnable {
//主要的数据缓存区
public static LinkedList xmlQueue = new LinkedList();
public static long indicator = 0; //the capacity of buf is set to 100M
public static final long BUF_LIMIT = 500000000;
public static String notice = ""; //the number of xml file inserted into queue
static int inXmls = 0; //the number of xml file removed from queue
static int outXmls = 0; String name = ""; static Category log = Category.getInstance(XmlProcessor.class); public XmlProcessor() {} public XmlProcessor(String ThreadName) {
this.name = ThreadName;
} public void initName() {
this.name = Thread.currentThread().getName();
} /**
* 往xmlQueue数据缓存区存入数据
* 从XML二级缓冲区插入一个XML文件到队列末尾。
* put xml file content to the end of queue
* @param data String:xml filecontent
* @param ip String
* @param type String
*/
public void process(String data, String ip, String type) {
XmlEntity newXml = new XmlEntity();
newXml.setContent(data);
newXml.setFrontIP(ip);
newXml.setFrontType(type); //if inesert wait operation is wake and the buf is full,
//keep waiting
synchronized (xmlQueue) {
for (; ; ) {
if (this.indicator > BUF_LIMIT ||
this.indicator + data.length() > BUF_LIMIT) {
try {
xmlQueue.wait();
}
catch (InterruptedException ie) {
log.warn(name + " " + ip + ":" + ie.getMessage());
}
}
else {
//add new xml to the end of queue
xmlQueue.add(newXml);
//newXml
this.indicator += newXml.length();
inXmls++;
//log.info("nitify xml reader!");
xmlQueue.notifyAll(); break;
}
}
} log.info("新xml文件进入队列,已经插入的文件个数为:" + inXmls +
",当前xml文件队列大小:" + (float)this.indicator / 1000000 + "M"); } /**
* 解析处理xmlQueue数据缓存区的数据
* 从XML文件处理二级缓冲区取出一个XML文件,并
* 交由XML文件解析器处理。
*/
public void run() {
for (; ; ) {
XmlEntity xml = null;
synchronized (xmlQueue) {
for (; ; ) {
try {
if (indicator <= 0) {
// wait for data writer
xmlQueue.wait();
}
else {
xml = (XmlEntity) xmlQueue.removeFirst();
this.indicator -= xml.length();
outXmls++;
xmlQueue.notifyAll();
break;
}
}
catch (InterruptedException ex) {
}
}
}
if (xml == null) {
continue;
}
StopWatch watch = new StopWatch(true);
xml.persist();
watch.stop();
log.info("线程:" + name + ":" + "解析xml文件开销时间:" +
watch); }
} public static void main(String[] args) { for (int i = 0; i < 30; i++) {
XmlProcessor xp = new XmlProcessor("Thread" + i);
Thread t = new Thread(xp);
t.setPriority(Thread.MAX_PRIORITY);
t.setName("Thread" + i); t.start();
} byte[] b = null;
try {
FileInputStream fi = new FileInputStream(
"D:\\My Documents\\put_dataCompressed");
int totalLen = fi.available();
log.info("totalLen=" + totalLen);
b = new byte[totalLen];
fi.read(b);
}
catch (Exception ex) {
ex.printStackTrace();
} if (b == null) {
return;
} for (; ; ) {
StopWatch watch = new StopWatch(true);
XmlProcessor xp = XmlProcessorFactory.getXmlProcess(
"192.168.0.1");
xp.process(new String(b), "192.168.0.1", "cp");
watch.stop();
log.info("The time of process a xml " +
watch); try {
Thread.currentThread().sleep(1000 * 3);
}
catch (Exception e) {
e.printStackTrace();
}
}
}}
核心类为run(),process()
具体调用方式请看main()函数package com.mvtech.vasomp.slv.communication;import java.util.*;import org.apache.log4j.*;
import com.mvtech.vasomp.slv.utils.StopWatch;
import com.mvtech.vasomp.slv.datadeal.ParseCPcontent;
import com.mvtech.vasomp.slv.datadeal.ParseCPfrequence;
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import com.mvtech.vasomp.slv.datadeal.ParseCPstat;
import com.mvtech.vasomp.slv.datadeal.ParseFrontAlert;
public class XmlProcessor
implements Runnable {
//主要的数据缓存区
public static LinkedList xmlQueue = new LinkedList();
public static long indicator = 0; //the capacity of buf is set to 100M
public static final long BUF_LIMIT = 500000000;
public static String notice = ""; //the number of xml file inserted into queue
static int inXmls = 0; //the number of xml file removed from queue
static int outXmls = 0; String name = ""; static Category log = Category.getInstance(XmlProcessor.class); public XmlProcessor() {} public XmlProcessor(String ThreadName) {
this.name = ThreadName;
} public void initName() {
this.name = Thread.currentThread().getName();
} /**
* 往xmlQueue数据缓存区存入数据
* 从XML二级缓冲区插入一个XML文件到队列末尾。
* put xml file content to the end of queue
* @param data String:xml filecontent
* @param ip String
* @param type String
*/
public void process(String data, String ip, String type) {
XmlEntity newXml = new XmlEntity();
newXml.setContent(data);
newXml.setFrontIP(ip);
newXml.setFrontType(type); //if inesert wait operation is wake and the buf is full,
//keep waiting
synchronized (xmlQueue) {
for (; ; ) {
if (this.indicator > BUF_LIMIT ||
this.indicator + data.length() > BUF_LIMIT) {
try {
xmlQueue.wait();
}
catch (InterruptedException ie) {
log.warn(name + " " + ip + ":" + ie.getMessage());
}
}
else {
//add new xml to the end of queue
xmlQueue.add(newXml);
//newXml
this.indicator += newXml.length();
inXmls++;
//log.info("nitify xml reader!");
xmlQueue.notifyAll(); break;
}
}
} log.info("新xml文件进入队列,已经插入的文件个数为:" + inXmls +
",当前xml文件队列大小:" + (float)this.indicator / 1000000 + "M"); } /**
* 解析处理xmlQueue数据缓存区的数据
* 从XML文件处理二级缓冲区取出一个XML文件,并
* 交由XML文件解析器处理。
*/
public void run() {
for (; ; ) {
XmlEntity xml = null;
synchronized (xmlQueue) {
for (; ; ) {
try {
if (indicator <= 0) {
// wait for data writer
xmlQueue.wait();
}
else {
xml = (XmlEntity) xmlQueue.removeFirst();
this.indicator -= xml.length();
outXmls++;
xmlQueue.notifyAll();
break;
}
}
catch (InterruptedException ex) {
}
}
}
if (xml == null) {
continue;
}
StopWatch watch = new StopWatch(true);
xml.persist();
watch.stop();
log.info("线程:" + name + ":" + "解析xml文件开销时间:" +
watch); }
} public static void main(String[] args) { for (int i = 0; i < 30; i++) {
XmlProcessor xp = new XmlProcessor("Thread" + i);
Thread t = new Thread(xp);
t.setPriority(Thread.MAX_PRIORITY);
t.setName("Thread" + i); t.start();
} byte[] b = null;
try {
FileInputStream fi = new FileInputStream(
"D:\\My Documents\\put_dataCompressed");
int totalLen = fi.available();
log.info("totalLen=" + totalLen);
b = new byte[totalLen];
fi.read(b);
}
catch (Exception ex) {
ex.printStackTrace();
} if (b == null) {
return;
} for (; ; ) {
StopWatch watch = new StopWatch(true);
XmlProcessor xp = XmlProcessorFactory.getXmlProcess(
"192.168.0.1");
xp.process(new String(b), "192.168.0.1", "cp");
watch.stop();
log.info("The time of process a xml " +
watch); try {
Thread.currentThread().sleep(1000 * 3);
}
catch (Exception e) {
e.printStackTrace();
}
}
}}
//内部类,数据结果xml文件实体
class XmlEntity {
private String frontType = "";
private String frontIP = "";
private String content = "";
void put_dataRaw(String data, String ip, String type) {
ByteArrayInputStream is = null;
try {
//String str = new String(data.getBytes("ISO8859-1"), "GB2312");
//log.debug("put_dataRaw: data is " + data);
byte dataByte[] = data.getBytes(); is = new ByteArrayInputStream(dataByte);
}
catch (Exception e) {
e.printStackTrace();
}
dealData(ip, type, is); } private void dealData(String ip, String type, InputStream is) {
//处理cp探针处理的数据
if (type.equalsIgnoreCase("cp_normal")) {
ParseCPcontent deal = new ParseCPcontent(is, ip);
deal.runIt(null);
}
else if (type.equalsIgnoreCase("cp_freq")) {
//log.info("recieved cp_freq!");
ParseCPfrequence deal = new ParseCPfrequence(is, ip);
deal.runIt(null);
}
else if (type.equalsIgnoreCase("cp_sysdata")) {
log.info("recieved cp_sysdata!");
ParseCPstat deal = new ParseCPstat(is, ip);
deal.runIt(null);
}
else if (type.equalsIgnoreCase("cp_alert")) {
log.info("recieved cp_alert!");
ParseFrontAlert deal = new ParseFrontAlert(is, ip);
deal.runIt(null);
}
} void persist() {
//log.info("persist xml of "+frontIP);
put_dataRaw(content,
this.frontIP,
this.frontType);
} XmlEntity() {
} public void setContent(String content) {
this.content = content;
} public String getContent() {
return content;
} public int length() {
return content.length();
} public void setFrontIP(String frontIP) {
this.frontIP = frontIP;
} public String getFrontIP() {
return frontIP;
} public void setFrontType(String frontType) {
this.frontType = frontType;
} public String getFrontType() {
return frontType;
}
}
public void run() {
for (; ; ) {
XmlEntity xml = null;
synchronized (xmlQueue) {
for (; ; ) {
try {
if (indicator <= 0) {
// wait for data writer
xmlQueue.wait();
}
else {
xml = (XmlEntity) xmlQueue.removeFirst();
this.indicator -= xml.length();
outXmls++;
xmlQueue.notifyAll();
break;
}
}
catch (InterruptedException ex) {
}
}
}
if (xml == null) {
continue;
}
StopWatch watch = new StopWatch(true);
xml.persist();
watch.stop();
log.info("线程:" + name + ":" + "解析xml文件开销时间:" +
watch); }
}
XmlEntity newXml = new XmlEntity();
newXml.setContent(data);
newXml.setFrontIP(ip);
newXml.setFrontType(type); //if inesert wait operation is wake and the buf is full,
//keep waiting
synchronized (xmlQueue) {
for (; ; ) {
if (this.indicator > BUF_LIMIT ||
this.indicator + data.length() > BUF_LIMIT) {
try {
xmlQueue.wait();
}
catch (InterruptedException ie) {
log.warn(name + " " + ip + ":" + ie.getMessage());
}
}
else {
//add new xml to the end of queue
xmlQueue.add(newXml);
//newXml
this.indicator += newXml.length();
inXmls++;
//log.info("nitify xml reader!");
xmlQueue.notifyAll(); break;
}
}
} log.info("新xml文件进入队列,已经插入的文件个数为:" + inXmls +
",当前xml文件队列大小:" + (float)this.indicator / 1000000 + "M"); }
现在的问题是,一个线程还没有进入到wait,另一个就开始了。
JAVA群,欢迎大家加入讨论!QQ群8595489