tcpParserHandler()是对对日志的处理,如果tcpParserHandler()里面放一个耗时比较久的运算,问题就出来了。下面这行String tcpString = tcpParserHandler(entry)放在同步里面如就不会丢数据,但是极慢,简直如单线程一样。放在同步外面就很快,但是会丢掉一些话单。请分析一下什么原因。 
主线程: 
Java代码 
import java.io.*;
import java.util.*;import org.logicalcobwebs.proxool.configuration.JAXPConfigurator;import com.macfaq.io.SafeBufferedReader;
import java.sql.*;
import org.apache.commons.io.FileUtils;public class PooledTcplog { private BufferedReader in; private BufferedWriter out; private int numberOfThreads; private List entries = Collections.synchronizedList(new LinkedList()); private boolean finished = false; private int test = 0; public PooledTcplog(InputStream in, OutputStream out, int numberOfThreads) {
this.in = new BufferedReader(new InputStreamReader(in));
this.out = new BufferedWriter(new OutputStreamWriter(out));
this.numberOfThreads = numberOfThreads; } public boolean isFinished() {
return this.finished;
} public int getNumberOfThreads() {
return numberOfThreads;
} public void processLogFile() {
try {
for (int i = 0; i < numberOfThreads; i++) {
Thread t = new TcpParser(entries, this);
t.start();
}

try { String entry = null;
while ((entry = in.readLine()) != null) {

if (entries.size() > numberOfThreads) {
try {
Thread.sleep((long) (1000.0 / numberOfThreads));
} catch (InterruptedException e) {
System.out.println(e);
e.printStackTrace();
}
continue;
} synchronized (entries) {
//System.out.println("test.test:"+(++com.cm.idc.test.test));
entries.add(0, entry);
entries.notifyAll();
} Thread.yield(); } // end while } catch (IOException e) {
System.out.println("Exception: " + e);
} this.finished = true; // finish any threads that are still waiting
synchronized (entries) {
entries.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
this.in.close();
} catch (Exception e) {
e.printStackTrace();
}
} } public void log(String entry) throws IOException {
out.write(entry + System.getProperty("line.separator", "\r\n"));
out.flush();
} public static void main(String[] args) {
int innumberOfThreads = Integer.parseInt(args[0]);
new ConnectionProvider();
File dtnFileDirectory = new File(args[2]); // 制定目标文件路径以及文件名 try {
//while (true) {
try {
File[] fileList = new File(args[1]).listFiles();
if (fileList != null) {
for (int i = 0; i < fileList.length; i++) {
try { FileInputStream fileInputStream = new FileInputStream(
fileList[i]);
PooledTcplog tw = new PooledTcplog(
fileInputStream, System.out,
innumberOfThreads);
tw.processLogFile(); while (!tw.isFinished()) {
try {
Thread
.sleep((long) (1000.0 / innumberOfThreads));
} catch (InterruptedException e) {
System.out.println(e);
e.printStackTrace();
}
continue;
} FileUtils.copyFileToDirectory(fileList[i],
dtnFileDirectory);
fileList[i].delete();
// File dtnFile=new
// File(args[2]+"//"+fileList[i].getName());
// fileList[i].renameTo(dtnFile); } catch (Exception e) {
System.out.println(e);
e.printStackTrace();
}
}
}
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
} //}
} catch (ArrayIndexOutOfBoundsException e) {
System.out.println("Usage: java PooledWeblog log文件目录 bak文件目录");
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
} } // end main}
  子线程: 
Java代码 import java.net.*;   
import java.io.*;   
import java.util.*;   
  
import org.logicalcobwebs.proxool.configuration.JAXPConfigurator;   
  
import java.sql.Date;   
import java.sql.*;   
  
import net.sourceforge.jpcap.net.LinkLayers;   
import net.sourceforge.jpcap.net.Packet;   
import net.sourceforge.jpcap.net.PacketFactory;   
import net.sourceforge.jpcap.util.Timeval;   
import net.sourceforge.jpcap.net.TCPPacket;   
import net.sourceforge.jpcap.net.EthernetPacket;   
  
public class TcpParser extends Thread {   
  
    private List entries;   
  
    PooledTcplog log; // used for callbacks   
  
    public TcpParser(List entries, PooledTcplog log) {   
        this.entries = entries;   
        this.log = log;   
    }   
  
    public void run() {   
  
        String entry;   
  
        while (true) {   
  
            synchronized (entries) {   
                while (entries.size() == 0) {   
                    if (log.isFinished())   
                        return;   
                    try {   
                        entries.wait();   
                    } catch (InterruptedException e) {   
                        System.out.println(e);   
                        e.printStackTrace();   
                    }   
                }   
                entry = (String) entries.remove(entries.size() - 1);   
                // System.out.println("sub " + entries.size());   
                /* tcpParserHandler()是对对日志的处理,下面这行放在同步里面如(下面蓝色代码所示)就不会丢数据,但是极慢。放在同步外面(如下面红色代码所示)就不慢,但是会丢掉一些话单。*/  
                                    // String tcpString = tcpParserHandler(entry);   
            }   
            String tcpString = tcpParserHandler(entry);   
            /*  
             * try { log.log(tcpString);  
             *   
             *  } catch (IOException e) { System.out.println(e);  
             * e.printStackTrace(); }  
             */  
            this.yield();   
  
        }   
  
    }  

解决方案 »

  1.   

    在tcpParserHandler()中使用了entry没?
      

  2.   

    在tcpParserHandler()中使用了entries没?
      

  3.   

    代码太长,我就不仔细看了,分析一下多线程日志问题吧
    1 不要每个线程都能直接写文件日志,那样你会出现并发的文件锁定问题。
    2 制作日志队列,FIFO 的那种,做成static的。每个线程只调用增加日志的方法,然后返回
    3 一个单独的线程负责检查队列是否需要写入到文件中,策略你自己定
      a) 一旦发现,马上写,这样不会丢数据
      b) 到10行,或者30秒没有新日志,写一次。
      c) 。 这样,多线程不会有性能问题,也不会造成日志丢失。 
    当然,如果不是一条一写,还是会掉电的时候,可能丢失几个日志的。
    自己权衡吧!
      

  4.   

    对于这段多线程代码说实话不敢恭维,看起来混乱,性能也不是最佳,可以考虑用java.util.concurrent包里的工具来简化你的代码。然后对于你这个问题出错的主原因也应该是tcpParserHandler方法的同步没有处理好,但你却没有这个方法的代码。真是浪费了我看你上面代码的时间。
      

  5.   

    代码太长,不方便读。支持java200_net回复
      

  6.   

    tcpParserHandler()中使用了entry,但是没有用到entries。tcpParserHandler()代码如下,就是做了一个耗时比较长的计算:public String tcpParserHandler(String stringPacket) {
    test.primenumber();//计算2000以内质数。
    }
      

  7.   

    To:youfly,我的tcpParserHandler(entry)中只传入了entry,未涉及对entries同步的操作。里面即使只放一个最简单的计算质数的运算,打印执行次数,发现都是丢话单的,也就是说tcpParserHandler(entry)的执行次数少于话单的总条数。
      

  8.   

    比如你的日志是不是指调用了public void log(String entry) throws IOException这个方法进行了打印,如果是,这个方法里面的out对象就应该考虑同步了。还有你的可考虑研究一下java.util.concurrent.ThreadPoolExecutor类,感觉这个线程池挺符合你的应用的,用了以后,代码会简单很多,而且可以省掉spleep方法。它对线程的调度比较高效的。