下面是小弟的一个数据处理类,运行1个小时左右,run()函数就会死锁,process仍然继续运行!各位老大帮忙看看,急
核心类为run(),process()
具体调用方式请看main()函数package com.mvtech.vasomp.slv.communication;import java.util.*;import org.apache.log4j.*;
import com.mvtech.vasomp.slv.utils.StopWatch;
import com.mvtech.vasomp.slv.datadeal.ParseCPcontent;
import com.mvtech.vasomp.slv.datadeal.ParseCPfrequence;
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import com.mvtech.vasomp.slv.datadeal.ParseCPstat;
import com.mvtech.vasomp.slv.datadeal.ParseFrontAlert;
public class XmlProcessor
    implements Runnable {
    //主要的数据缓存区
    public static LinkedList xmlQueue = new LinkedList();
    public static long indicator = 0;    //the capacity of buf is set to 100M
    public static final long BUF_LIMIT = 500000000;
    public static String notice = "";    //the number of xml file inserted into queue
    static int inXmls = 0;    //the number of xml file removed from queue
    static int outXmls = 0;    String name = "";    static Category log = Category.getInstance(XmlProcessor.class);    public XmlProcessor() {}    public XmlProcessor(String ThreadName) {
        this.name = ThreadName;
    }    public void initName() {
        this.name = Thread.currentThread().getName();
    }    /**
     * 往xmlQueue数据缓存区存入数据
     * 从XML二级缓冲区插入一个XML文件到队列末尾。
     * put xml file content to the end of queue
     * @param data String:xml filecontent
     * @param ip String
     * @param type String
     */
    public void process(String data, String ip, String type) {
        XmlEntity newXml = new XmlEntity();
        newXml.setContent(data);
        newXml.setFrontIP(ip);
        newXml.setFrontType(type);        //if inesert wait operation is wake and the buf is full,
        //keep waiting
        synchronized (xmlQueue) {
            for (; ; ) {
                if (this.indicator > BUF_LIMIT ||
                    this.indicator + data.length() > BUF_LIMIT) {
                    try {
                        xmlQueue.wait();
                    }
                    catch (InterruptedException ie) {
                        log.warn(name + " " + ip + ":" + ie.getMessage());
                    }
                }
                else {
                    //add new xml to the end of queue
                    xmlQueue.add(newXml);
                    //newXml
                    this.indicator += newXml.length();
                    inXmls++;
                    //log.info("nitify xml reader!");
                    xmlQueue.notifyAll();                    break;
                }
            }
        }        log.info("新xml文件进入队列,已经插入的文件个数为:" + inXmls +
                 ",当前xml文件队列大小:" + (float)this.indicator / 1000000 + "M");    }    /**
     * 解析处理xmlQueue数据缓存区的数据
     * 从XML文件处理二级缓冲区取出一个XML文件,并
     * 交由XML文件解析器处理。
     */
    public void run() {
        for (; ; ) {
            XmlEntity xml = null;
            synchronized (xmlQueue) {
                for (; ; ) {
                    try {
                        if (indicator <= 0) {
                            // wait for data writer
                            xmlQueue.wait();
                        }
                        else {
                            xml = (XmlEntity) xmlQueue.removeFirst();
                            this.indicator -= xml.length();
                            outXmls++;
                            xmlQueue.notifyAll();
                            break;
                        }
                    }
                    catch (InterruptedException ex) {
                    }
                }
            }
            if (xml == null) {
                continue;
            }
            StopWatch watch = new StopWatch(true);
            xml.persist();
            watch.stop();
            log.info("线程:" + name + ":" + "解析xml文件开销时间:" +
                     watch);        }
    }    public static void main(String[] args) {        for (int i = 0; i < 30; i++) {
            XmlProcessor xp = new XmlProcessor("Thread" + i);
            Thread t = new Thread(xp);
            t.setPriority(Thread.MAX_PRIORITY);
            t.setName("Thread" + i);            t.start();
        }        byte[] b = null;
        try {
            FileInputStream fi = new FileInputStream(
                "D:\\My Documents\\put_dataCompressed");
            int totalLen = fi.available();
            log.info("totalLen=" + totalLen);
            b = new byte[totalLen];
            fi.read(b);
        }
        catch (Exception ex) {
            ex.printStackTrace();
        }        if (b == null) {
            return;
        }        for (; ; ) {
            StopWatch watch = new StopWatch(true);
            XmlProcessor xp = XmlProcessorFactory.getXmlProcess(
                "192.168.0.1");
            xp.process(new String(b), "192.168.0.1", "cp");
            watch.stop();
            log.info("The time of process a xml " +
                     watch);            try {
                Thread.currentThread().sleep(1000 * 3);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }}

解决方案 »

  1.   

    XmlEntity类为定义的数据格式
    //内部类,数据结果xml文件实体
        class XmlEntity {
            private String frontType = "";
            private String frontIP = "";
            private String content = "";
            void put_dataRaw(String data, String ip, String type) {
                ByteArrayInputStream is = null;
                try {
                    //String str = new String(data.getBytes("ISO8859-1"), "GB2312");
                    //log.debug("put_dataRaw: data is " + data);
                    byte dataByte[] = data.getBytes();                is = new ByteArrayInputStream(dataByte);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                dealData(ip, type, is);        }        private void dealData(String ip, String type, InputStream is) {
                //处理cp探针处理的数据
                if (type.equalsIgnoreCase("cp_normal")) {
                    ParseCPcontent deal = new ParseCPcontent(is, ip);
                    deal.runIt(null);
                }
                else if (type.equalsIgnoreCase("cp_freq")) {
                    //log.info("recieved cp_freq!");
                    ParseCPfrequence deal = new ParseCPfrequence(is, ip);
                    deal.runIt(null);
                }
                else if (type.equalsIgnoreCase("cp_sysdata")) {
                    log.info("recieved cp_sysdata!");
                    ParseCPstat deal = new ParseCPstat(is, ip);
                    deal.runIt(null);
                }
                else if (type.equalsIgnoreCase("cp_alert")) {
                    log.info("recieved cp_alert!");
                    ParseFrontAlert deal = new ParseFrontAlert(is, ip);
                    deal.runIt(null);
                }
            }        void persist() {
                //log.info("persist xml of "+frontIP);
                put_dataRaw(content,
                            this.frontIP,
                            this.frontType);
            }        XmlEntity() {
            }        public void setContent(String content) {
                this.content = content;
            }        public String getContent() {
                return content;
            }        public int length() {
                return content.length();
            }        public void setFrontIP(String frontIP) {
                this.frontIP = frontIP;
            }        public String getFrontIP() {
                return frontIP;
            }        public void setFrontType(String frontType) {
                this.frontType = frontType;
            }        public String getFrontType() {
                return frontType;
            }
        }
      

  2.   

    确实感觉有点乱;要是csdn贴代码也有 代码块 帮忙整理格式就好了
      

  3.   

    核心的内容就是如下:
    public void run() {
            for (; ; ) {
                XmlEntity xml = null;
                synchronized (xmlQueue) {
                    for (; ; ) {
                        try {
                            if (indicator <= 0) {
                                // wait for data writer
                                xmlQueue.wait();
                            }
                            else {
                                xml = (XmlEntity) xmlQueue.removeFirst();
                                this.indicator -= xml.length();
                                outXmls++;
                                xmlQueue.notifyAll();
                                break;
                            }
                        }
                        catch (InterruptedException ex) {
                        }
                    }
                }
                if (xml == null) {
                    continue;
                }
                StopWatch watch = new StopWatch(true);
                xml.persist();
                watch.stop();
                log.info("线程:" + name + ":" + "解析xml文件开销时间:" +
                         watch);        }
        }
      

  4.   

    public void process(String data, String ip, String type) {
            XmlEntity newXml = new XmlEntity();
            newXml.setContent(data);
            newXml.setFrontIP(ip);
            newXml.setFrontType(type);        //if inesert wait operation is wake and the buf is full,
            //keep waiting
            synchronized (xmlQueue) {
                for (; ; ) {
                    if (this.indicator > BUF_LIMIT ||
                        this.indicator + data.length() > BUF_LIMIT) {
                        try {
                            xmlQueue.wait();
                        }
                        catch (InterruptedException ie) {
                            log.warn(name + " " + ip + ":" + ie.getMessage());
                        }
                    }
                    else {
                        //add new xml to the end of queue
                        xmlQueue.add(newXml);
                        //newXml
                        this.indicator += newXml.length();
                        inXmls++;
                        //log.info("nitify xml reader!");
                        xmlQueue.notifyAll();                    break;
                    }
                }
            }        log.info("新xml文件进入队列,已经插入的文件个数为:" + inXmls +
                     ",当前xml文件队列大小:" + (float)this.indicator / 1000000 + "M");    }
      

  5.   

    应该使用join才能确保一个线程等带另一个线程结束以后才开始运行。
    现在的问题是,一个线程还没有进入到wait,另一个就开始了。
      

  6.   

    好长
    JAVA群,欢迎大家加入讨论!QQ群8595489