tcpParserHandler()是对对日志的处理,如果tcpParserHandler()里面放一个耗时比较久的运算,问题就出来了。下面这行String tcpString = tcpParserHandler(entry)放在同步里面如就不会丢数据,但是极慢,简直如单线程一样。放在同步外面就很快,但是会丢掉一些话单。请分析一下什么原因。
主线程:
Java代码
import java.io.*;
import java.util.*;import org.logicalcobwebs.proxool.configuration.JAXPConfigurator;import com.macfaq.io.SafeBufferedReader;
import java.sql.*;
import org.apache.commons.io.FileUtils;public class PooledTcplog { private BufferedReader in; private BufferedWriter out; private int numberOfThreads; private List entries = Collections.synchronizedList(new LinkedList()); private boolean finished = false; private int test = 0; public PooledTcplog(InputStream in, OutputStream out, int numberOfThreads) {
this.in = new BufferedReader(new InputStreamReader(in));
this.out = new BufferedWriter(new OutputStreamWriter(out));
this.numberOfThreads = numberOfThreads; } public boolean isFinished() {
return this.finished;
} public int getNumberOfThreads() {
return numberOfThreads;
} public void processLogFile() {
try {
for (int i = 0; i < numberOfThreads; i++) {
Thread t = new TcpParser(entries, this);
t.start();
}
try { String entry = null;
while ((entry = in.readLine()) != null) {
if (entries.size() > numberOfThreads) {
try {
Thread.sleep((long) (1000.0 / numberOfThreads));
} catch (InterruptedException e) {
System.out.println(e);
e.printStackTrace();
}
continue;
} synchronized (entries) {
//System.out.println("test.test:"+(++com.cm.idc.test.test));
entries.add(0, entry);
entries.notifyAll();
} Thread.yield(); } // end while } catch (IOException e) {
System.out.println("Exception: " + e);
} this.finished = true; // finish any threads that are still waiting
synchronized (entries) {
entries.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
this.in.close();
} catch (Exception e) {
e.printStackTrace();
}
} } public void log(String entry) throws IOException {
out.write(entry + System.getProperty("line.separator", "\r\n"));
out.flush();
} public static void main(String[] args) {
int innumberOfThreads = Integer.parseInt(args[0]);
new ConnectionProvider();
File dtnFileDirectory = new File(args[2]); // 制定目标文件路径以及文件名 try {
//while (true) {
try {
File[] fileList = new File(args[1]).listFiles();
if (fileList != null) {
for (int i = 0; i < fileList.length; i++) {
try { FileInputStream fileInputStream = new FileInputStream(
fileList[i]);
PooledTcplog tw = new PooledTcplog(
fileInputStream, System.out,
innumberOfThreads);
tw.processLogFile(); while (!tw.isFinished()) {
try {
Thread
.sleep((long) (1000.0 / innumberOfThreads));
} catch (InterruptedException e) {
System.out.println(e);
e.printStackTrace();
}
continue;
} FileUtils.copyFileToDirectory(fileList[i],
dtnFileDirectory);
fileList[i].delete();
// File dtnFile=new
// File(args[2]+"//"+fileList[i].getName());
// fileList[i].renameTo(dtnFile); } catch (Exception e) {
System.out.println(e);
e.printStackTrace();
}
}
}
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
} //}
} catch (ArrayIndexOutOfBoundsException e) {
System.out.println("Usage: java PooledWeblog log文件目录 bak文件目录");
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
} } // end main}
子线程:
Java代码 import java.net.*;
import java.io.*;
import java.util.*;
import org.logicalcobwebs.proxool.configuration.JAXPConfigurator;
import java.sql.Date;
import java.sql.*;
import net.sourceforge.jpcap.net.LinkLayers;
import net.sourceforge.jpcap.net.Packet;
import net.sourceforge.jpcap.net.PacketFactory;
import net.sourceforge.jpcap.util.Timeval;
import net.sourceforge.jpcap.net.TCPPacket;
import net.sourceforge.jpcap.net.EthernetPacket;
public class TcpParser extends Thread {
private List entries;
PooledTcplog log; // used for callbacks
public TcpParser(List entries, PooledTcplog log) {
this.entries = entries;
this.log = log;
}
public void run() {
String entry;
while (true) {
synchronized (entries) {
while (entries.size() == 0) {
if (log.isFinished())
return;
try {
entries.wait();
} catch (InterruptedException e) {
System.out.println(e);
e.printStackTrace();
}
}
entry = (String) entries.remove(entries.size() - 1);
// System.out.println("sub " + entries.size());
/* tcpParserHandler()是对对日志的处理,下面这行放在同步里面如(下面蓝色代码所示)就不会丢数据,但是极慢。放在同步外面(如下面红色代码所示)就不慢,但是会丢掉一些话单。*/
// String tcpString = tcpParserHandler(entry);
}
String tcpString = tcpParserHandler(entry);
/*
* try { log.log(tcpString);
*
* } catch (IOException e) { System.out.println(e);
* e.printStackTrace(); }
*/
this.yield();
}
}
主线程:
Java代码
import java.io.*;
import java.util.*;import org.logicalcobwebs.proxool.configuration.JAXPConfigurator;import com.macfaq.io.SafeBufferedReader;
import java.sql.*;
import org.apache.commons.io.FileUtils;public class PooledTcplog { private BufferedReader in; private BufferedWriter out; private int numberOfThreads; private List entries = Collections.synchronizedList(new LinkedList()); private boolean finished = false; private int test = 0; public PooledTcplog(InputStream in, OutputStream out, int numberOfThreads) {
this.in = new BufferedReader(new InputStreamReader(in));
this.out = new BufferedWriter(new OutputStreamWriter(out));
this.numberOfThreads = numberOfThreads; } public boolean isFinished() {
return this.finished;
} public int getNumberOfThreads() {
return numberOfThreads;
} public void processLogFile() {
try {
for (int i = 0; i < numberOfThreads; i++) {
Thread t = new TcpParser(entries, this);
t.start();
}
try { String entry = null;
while ((entry = in.readLine()) != null) {
if (entries.size() > numberOfThreads) {
try {
Thread.sleep((long) (1000.0 / numberOfThreads));
} catch (InterruptedException e) {
System.out.println(e);
e.printStackTrace();
}
continue;
} synchronized (entries) {
//System.out.println("test.test:"+(++com.cm.idc.test.test));
entries.add(0, entry);
entries.notifyAll();
} Thread.yield(); } // end while } catch (IOException e) {
System.out.println("Exception: " + e);
} this.finished = true; // finish any threads that are still waiting
synchronized (entries) {
entries.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
this.in.close();
} catch (Exception e) {
e.printStackTrace();
}
} } public void log(String entry) throws IOException {
out.write(entry + System.getProperty("line.separator", "\r\n"));
out.flush();
} public static void main(String[] args) {
int innumberOfThreads = Integer.parseInt(args[0]);
new ConnectionProvider();
File dtnFileDirectory = new File(args[2]); // 制定目标文件路径以及文件名 try {
//while (true) {
try {
File[] fileList = new File(args[1]).listFiles();
if (fileList != null) {
for (int i = 0; i < fileList.length; i++) {
try { FileInputStream fileInputStream = new FileInputStream(
fileList[i]);
PooledTcplog tw = new PooledTcplog(
fileInputStream, System.out,
innumberOfThreads);
tw.processLogFile(); while (!tw.isFinished()) {
try {
Thread
.sleep((long) (1000.0 / innumberOfThreads));
} catch (InterruptedException e) {
System.out.println(e);
e.printStackTrace();
}
continue;
} FileUtils.copyFileToDirectory(fileList[i],
dtnFileDirectory);
fileList[i].delete();
// File dtnFile=new
// File(args[2]+"//"+fileList[i].getName());
// fileList[i].renameTo(dtnFile); } catch (Exception e) {
System.out.println(e);
e.printStackTrace();
}
}
}
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
} //}
} catch (ArrayIndexOutOfBoundsException e) {
System.out.println("Usage: java PooledWeblog log文件目录 bak文件目录");
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
} } // end main}
子线程:
Java代码 import java.net.*;
import java.io.*;
import java.util.*;
import org.logicalcobwebs.proxool.configuration.JAXPConfigurator;
import java.sql.Date;
import java.sql.*;
import net.sourceforge.jpcap.net.LinkLayers;
import net.sourceforge.jpcap.net.Packet;
import net.sourceforge.jpcap.net.PacketFactory;
import net.sourceforge.jpcap.util.Timeval;
import net.sourceforge.jpcap.net.TCPPacket;
import net.sourceforge.jpcap.net.EthernetPacket;
public class TcpParser extends Thread {
private List entries;
PooledTcplog log; // used for callbacks
public TcpParser(List entries, PooledTcplog log) {
this.entries = entries;
this.log = log;
}
public void run() {
String entry;
while (true) {
synchronized (entries) {
while (entries.size() == 0) {
if (log.isFinished())
return;
try {
entries.wait();
} catch (InterruptedException e) {
System.out.println(e);
e.printStackTrace();
}
}
entry = (String) entries.remove(entries.size() - 1);
// System.out.println("sub " + entries.size());
/* tcpParserHandler()是对对日志的处理,下面这行放在同步里面如(下面蓝色代码所示)就不会丢数据,但是极慢。放在同步外面(如下面红色代码所示)就不慢,但是会丢掉一些话单。*/
// String tcpString = tcpParserHandler(entry);
}
String tcpString = tcpParserHandler(entry);
/*
* try { log.log(tcpString);
*
* } catch (IOException e) { System.out.println(e);
* e.printStackTrace(); }
*/
this.yield();
}
}
1 不要每个线程都能直接写文件日志,那样你会出现并发的文件锁定问题。
2 制作日志队列,FIFO 的那种,做成static的。每个线程只调用增加日志的方法,然后返回
3 一个单独的线程负责检查队列是否需要写入到文件中,策略你自己定
a) 一旦发现,马上写,这样不会丢数据
b) 到10行,或者30秒没有新日志,写一次。
c) 。 这样,多线程不会有性能问题,也不会造成日志丢失。
当然,如果不是一条一写,还是会掉电的时候,可能丢失几个日志的。
自己权衡吧!
test.primenumber();//计算2000以内质数。
}