new Thread(read, "线程1").start(); new Thread(read, "线程2").start(); new Thread(read, "线程3").start(); new Thread(read, "线程4").start(); new Thread(read, "线程5").start(); 这几个线程执行顺序我们是无法控制的,线程并发也是无法避免的。要实现你的功能那就只有一个办法让这5个线程之间有一个共识,就是让他们都知道除他之外的线程是否已经对某个文件进行了读取操作,如果已读取就不再读了。这就要用到线程之间的通信来完在。这里最好的办法就是定义一个全局变量,当一个线程一开始读取文件就把这个文件名信息存入这个全局变量中,别的线程读取时来判断一下那文件是否己经存到了全局变量中,如果已存在就不在讯取这个文件了。大体思路是这样的。所以你这个代码就得改一下在main中建一个全局变量LIST,然后把这个LIST传进这5个线程中来达到效果,当然也可以把list设成static这个好做一些。需要注意的是在每个线程中需要给这个全局的list加上线程锁,当任意一个线程对list进行操作时,让其他线程都处理等待状态。
这是在你代码基础上进行最小改动的结果,你看看 import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.util.ArrayList; import java.util.List;import org.jdom2.Document; import org.jdom2.Element; import org.jdom2.input.SAXBuilder;public class CompareTest { public static void main(String args[]) { read read = new read(); new Thread(read, "线程1").start(); new Thread(read, "线程2").start(); new Thread(read, "线程3").start(); new Thread(read, "线程4").start(); new Thread(read, "线程5").start(); } }class read implements Runnable { List<File> filePathsList = new ArrayList<File>(); int index = 0; public read() { File f = new File("d:" + File.separator + "tmp"); getFileList(f); } private void getFileList(File f) { File[] filePaths = f.listFiles(); for (File s : filePaths) { if (s.isDirectory()) { getFileList(s); } else { if (s.isFile() && -1 != s.getName().lastIndexOf(".xml")) { filePathsList.add(s); } } } } @Override public void run() { File file = null; while (index < filePathsList.size()) { synchronized (this) { file = filePathsList.get(index); if (file == null) { continue; } index++; } // 解析xml SAXBuilder builder = new SAXBuilder(); List<String> xmlList = new ArrayList<String>(); try { Thread.sleep(300); } catch (InterruptedException e2) { e2.printStackTrace(); } try { InputStream is = new FileInputStream(file.getPath()); System.out.println("当前使用的线程是:" + Thread.currentThread().getName() + ",正在读文件:" + filePathsList.indexOf(file) + ",列表当前长度:" + filePathsList.size()); Document doc = builder.build(is); Element root = doc.getRootElement(); List<Element> list = root.getChildren(); for (Element e : list) { xmlList.add(e.getChildTextTrim("ERROR_FEEDBACK_ID")); } xmlList.add("--------------------------"); } catch (Exception e1) { e1.printStackTrace(); } } } }
上面代码还有个小bug,如下是在你基础上进行最小改动的 public class CompareTest { public static void main(String args[]) { read read = new read(); new Thread(read, "线程1").start(); new Thread(read, "线程2").start(); new Thread(read, "线程3").start(); new Thread(read, "线程4").start(); new Thread(read, "线程5").start(); } }class read implements Runnable { List<File> filePathsList = new ArrayList<File>(); int index = 0; public read() { File f = new File("d:" + File.separator + "tmp"); getFileList(f); } private void getFileList(File f) { File[] filePaths = f.listFiles(); for (File s : filePaths) { if (s.isDirectory()) { getFileList(s); } else { if (-1 != s.getName().lastIndexOf(".xml")) { filePathsList.add(s); } } } } @Override public void run() { File file = null; while (index < filePathsList.size()) { synchronized (this) { if (index >= filePathsList.size()) { continue; } file = filePathsList.get(index); index++; } // 解析xml SAXBuilder builder = new SAXBuilder(); List<String> xmlList = new ArrayList<String>(); try { Thread.sleep(300); } catch (InterruptedException e2) { // TODO Auto-generated catch block e2.printStackTrace(); } try { InputStream is = new FileInputStream(file.getPath()); System.out.println("当前使用的线程是:" + Thread.currentThread().getName() + ",正在读文件:" + filePathsList.indexOf(file) + ",列表当前长度:" + filePathsList.size()); Document doc = builder.build(is); Element root = doc.getRootElement(); List<Element> list = root.getChildren(); for (Element e : list) { xmlList.add(e.getChildTextTrim("ERROR_FEEDBACK_ID")); } xmlList.add("--------------------------"); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } }
import java.io.File; import java.util.List; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue;import org.jdom2.Document; import org.jdom2.Element; import org.jdom2.input.SAXBuilder; public class Test_15 { public static final int THREAD_POOL_SIZE = 5; private final File POISON = new File(""); private final BlockingQueue<File> files = new LinkedBlockingQueue<File>(1000); private final ConcurrentLinkedQueue<String> infos = new ConcurrentLinkedQueue<String>(); private Thread[] pool = null; private volatile boolean running = false;
public Test_15() { this(THREAD_POOL_SIZE); }
public Test_15(int poolSize) { pool = new Thread[poolSize]; FileWorker worker = new FileWorker(); for(int i=0;i<pool.length;i++){ pool[i] = new Thread(worker,"线程"+(i+1)); pool[i].start(); } running = true; }
new Thread(read, "线程2").start();
new Thread(read, "线程3").start();
new Thread(read, "线程4").start();
new Thread(read, "线程5").start();
这几个线程执行顺序我们是无法控制的,线程并发也是无法避免的。要实现你的功能那就只有一个办法让这5个线程之间有一个共识,就是让他们都知道除他之外的线程是否已经对某个文件进行了读取操作,如果已读取就不再读了。这就要用到线程之间的通信来完在。这里最好的办法就是定义一个全局变量,当一个线程一开始读取文件就把这个文件名信息存入这个全局变量中,别的线程读取时来判断一下那文件是否己经存到了全局变量中,如果已存在就不在讯取这个文件了。大体思路是这样的。所以你这个代码就得改一下在main中建一个全局变量LIST,然后把这个LIST传进这5个线程中来达到效果,当然也可以把list设成static这个好做一些。需要注意的是在每个线程中需要给这个全局的list加上线程锁,当任意一个线程对list进行操作时,让其他线程都处理等待状态。
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;import org.jdom2.Document;
import org.jdom2.Element;
import org.jdom2.input.SAXBuilder;public class CompareTest {
public static void main(String args[]) {
read read = new read();
new Thread(read, "线程1").start();
new Thread(read, "线程2").start();
new Thread(read, "线程3").start();
new Thread(read, "线程4").start();
new Thread(read, "线程5").start();
}
}class read implements Runnable {
List<File> filePathsList = new ArrayList<File>();
int index = 0; public read() {
File f = new File("d:" + File.separator + "tmp");
getFileList(f);
} private void getFileList(File f) { File[] filePaths = f.listFiles();
for (File s : filePaths) {
if (s.isDirectory()) {
getFileList(s);
} else {
if (s.isFile() && -1 != s.getName().lastIndexOf(".xml")) {
filePathsList.add(s);
}
}
}
} @Override
public void run() {
File file = null;
while (index < filePathsList.size()) {
synchronized (this) {
file = filePathsList.get(index);
if (file == null) {
continue;
}
index++;
} // 解析xml
SAXBuilder builder = new SAXBuilder();
List<String> xmlList = new ArrayList<String>(); try {
Thread.sleep(300);
} catch (InterruptedException e2) {
e2.printStackTrace();
}
try {
InputStream is = new FileInputStream(file.getPath());
System.out.println("当前使用的线程是:"
+ Thread.currentThread().getName() + ",正在读文件:"
+ filePathsList.indexOf(file) + ",列表当前长度:"
+ filePathsList.size());
Document doc = builder.build(is);
Element root = doc.getRootElement();
List<Element> list = root.getChildren();
for (Element e : list) {
xmlList.add(e.getChildTextTrim("ERROR_FEEDBACK_ID"));
}
xmlList.add("--------------------------");
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
public class CompareTest {
public static void main(String args[]) {
read read = new read();
new Thread(read, "线程1").start();
new Thread(read, "线程2").start();
new Thread(read, "线程3").start();
new Thread(read, "线程4").start();
new Thread(read, "线程5").start();
}
}class read implements Runnable {
List<File> filePathsList = new ArrayList<File>();
int index = 0; public read() {
File f = new File("d:" + File.separator + "tmp");
getFileList(f);
} private void getFileList(File f) {
File[] filePaths = f.listFiles();
for (File s : filePaths) {
if (s.isDirectory()) {
getFileList(s);
} else {
if (-1 != s.getName().lastIndexOf(".xml")) {
filePathsList.add(s);
}
}
}
} @Override
public void run() {
File file = null;
while (index < filePathsList.size()) {
synchronized (this) {
if (index >= filePathsList.size()) {
continue;
}
file = filePathsList.get(index);
index++;
}
// 解析xml
SAXBuilder builder = new SAXBuilder();
List<String> xmlList = new ArrayList<String>(); try {
Thread.sleep(300);
} catch (InterruptedException e2) {
// TODO Auto-generated catch block
e2.printStackTrace();
}
try {
InputStream is = new FileInputStream(file.getPath());
System.out.println("当前使用的线程是:"
+ Thread.currentThread().getName() + ",正在读文件:"
+ filePathsList.indexOf(file) + ",列表当前长度:"
+ filePathsList.size());
Document doc = builder.build(is);
Element root = doc.getRootElement();
List<Element> list = root.getChildren();
for (Element e : list) {
xmlList.add(e.getChildTextTrim("ERROR_FEEDBACK_ID"));
}
xmlList.add("--------------------------");
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
}
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;import org.jdom2.Document;
import org.jdom2.Element;
import org.jdom2.input.SAXBuilder;
public class Test_15 { public static final int THREAD_POOL_SIZE = 5;
private final File POISON = new File("");
private final BlockingQueue<File> files = new LinkedBlockingQueue<File>(1000);
private final ConcurrentLinkedQueue<String> infos = new ConcurrentLinkedQueue<String>();
private Thread[] pool = null;
private volatile boolean running = false;
public Test_15() {
this(THREAD_POOL_SIZE);
}
public Test_15(int poolSize) {
pool = new Thread[poolSize];
FileWorker worker = new FileWorker();
for(int i=0;i<pool.length;i++){
pool[i] = new Thread(worker,"线程"+(i+1));
pool[i].start();
}
running = true;
}
private class FileWorker implements Runnable {
@Override
public void run() {
File file = null;
try {
while((file=files.take()) != POISON){
try {
doWork(file);
} catch (Exception e) {
onException(e,file);
}
}
files.put(POISON);
} catch (InterruptedException e) {
}
} private void onException(Exception e, File file) {
e.printStackTrace();
} private void doWork(File file) {
// 解析XML
SAXBuilder builder = new SAXBuilder();
try {
System.out.println("当前使用的线程是:"
+ Thread.currentThread().getName() + ",正在读文件:"
+ file.getName() + ",列表当前长度:"
+ files.size());
Document doc = builder.build(file);
Element root = doc.getRootElement();
List<Element> list = root.getChildren();
for (Element e : list) {
infos.add(e.getChildTextTrim("ERROR_FEEDBACK_ID"));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void addFile(File file) throws InterruptedException{
files.put(file);
}
public void addFilesIgnoreInterrupted(File[] files) {
for(File file : files){
try {
this.files.put(file);
} catch (InterruptedException e) {
}
}
}
public void shutdown(){
try {
if(running){
running = false;
files.put(POISON);
}
} catch (InterruptedException e) {
}
}
public void waiting(){
if(running || !files.contains(POISON)){
throw new IllegalStateException("You must call shutdown() function before.");
}
for(Thread t : pool){
try {
t.join();
} catch (InterruptedException e) {
}
}
}
public Queue<String> getInfos(){
return infos;
}
public static void main(String[] args) {
Test_15 instance = new Test_15();
File folder = new File("d:\\comparetest");
instance.addFilesIgnoreInterrupted(folder.listFiles());
instance.shutdown();
instance.waiting();
for(String info : instance.getInfos()){
System.out.println(info);
}
}}