解决方案 »
- 正则表达式的问题
- executeBatch()的问题,急啊!!望高手指教
- JTable使用TableRowSorter排序后增加行越界问题
- if(a=='A').... 与 if('A'==a).... 有区别吗?那个好?
- 这个最简单的applet用appletview能显示,但在html上就显示不了了?
- Eclipse运行输出“Hello world”小程序出错
- 哪里有wsad5的侠载地址?
- 哪个大哥大姐能帮我一下?一定加分!
- java文档说TreeMap使用的储存结构是Red-Black树,请问什么是Red-Black树,和Btree一样吗?
- 求SQLServer的JDBC驱动程序
- protected 在 继承中 的理解问题
- 经典算法 看大家的基本功是否扎实
例如,5个xml文件,['a.xml', 'b.xml', 'c.xml', 'd.xml', 'e.xml']
这5个文件放在一个Stack中,5个线程从stack中获取数据???如果是这个意思,我的想法是,5个线程 t-1, t-2, t-3, t-4, t-5在Stack中取数据的时候,先去检测有没有该文件的lock文件,如果有a.lock,说明a.xml在用,要等待, 如果没有,生成文件对应的lock文件,再去取数据,极端情况两个线程同时去生成lock文件,先生成的占先,后生成的覆盖失败,继续等待。一个文件一个锁。或者利用ReentrantLock进行锁的获取和释放,来控制线程的调度问题也许有说法欠佳的地方,互相学习
一. 一个线程读取文件的速度,和多个线程读取文件的速度,在吞吐量上,理论上是差不多的,因为,这取决于硬盘的读取速度。
二. 读取出来的数据,需要解析和分析,这个操作是否消耗大量的CPU时间,如果消耗的话用多线程比较靠谱。
三. 与文件的读取相仿,文件的写入也存在硬盘的瓶颈问题,采用多线程提高写入速度,理论上意义不大。其次,要将这个任务拆分成多个步骤,针对某个带有瓶颈的步骤进行优化。
一. 步骤大致分四步:1.递归遍历目录提取要处理的文件;2.读取文件数据;3.解析文件数据,处理文件数据;4.写入结果数据。
二.分析上述四个步骤,其中1.2.4都是硬盘操作,多线程提升的性能不会太显著,重点在第3步,那么多线程的内容一定包含第3步
三.由于2.3.4这三个连续步骤,用一个线程处理一个文件,多个线程可以并发处理,在处理过程中不存在数据冲突或者说互斥现象, 并且,第1步的编程复杂度较大,所以,第1步可以单独一个线程来完成,其余2.3.4建立一个线程来做,后者可以多线程并发234。第三,解决生产者和消费者之间的数据传递问题。
很显然我们需要一个数据结构来完成这个事情,但是,教科书中的生产者消费者模式已经不太适用这个场景。
楼主是用的Stack或者HashTable,我们一般会用阻塞队列这种数据结构。第四,阻塞队列的使用技巧。
1. 为避免生产过剩而爆掉内存,我们可采用带有上限封顶的阻塞队列。
2. 为了能够在任务完成时平滑的关闭消费线程,我们通常会向队列里放置一些特殊数据来关闭消费线程。
这些特殊数据,我们称之为“致命毒药”,一般会声明成常量。第五,并发程度。
1. 多线程并非线程越多,效率越高,他会出现一个峰值,达到峰值后线程再多,反而略有下降。
2. 多线程一般都是用来处理CPU密集型运算的,这个是最佳的选择。当然,也有用来执行异步操作或者阻塞操作的。
比如上述的2.3.4步骤中,一个线程执行3的时候,不访问硬盘,这时可以有其他线程执行2步骤。
我这个程序最大的瓶颈不在读文件,而是在解析的时候要从网页上获取信息,这个过程比较慢,程序在实际运行的时候,尤其是运行了一段时间后,经常出现read timed out的错误,查看了一下,就是在从网页链接获取数据的时候产生的。
试了一下,把数据分开来用比较快,比起互锁线程从同一个栈中取数据来说。
我就写成这样了,先遍历文件夹,把xml文件路径hash到10个stack中,然后建10个线程来分别取数据。
traverDirs();
LOG.info("There are "+count+"xml files!");
stackList.add(stack0);
stackList.add(stack1);
stackList.add(stack2);
stackList.add(stack3);
stackList.add(stack4);
stackList.add(stack5);
stackList.add(stack6);
stackList.add(stack7);
stackList.add(stack8);
stackList.add(stack9);
try {
for (int i = 0; i < 10; i++) {
//计算最恰当的间隔长度
final int p = i;
new Thread(new Runnable() {
@Override
public void run() {
try {
LOG.info("Thread "+p+" Started!");
while(!stackList.get(p).isEmpty())
extractXML(stackList.get(p).pop());
if(stackList.get(p).isEmpty())
LOG.info("Stack "+p+" is Empty!");
} catch (Exception e) {
// TODO Auto-generated catch block
LOG.error("",e);
}
}
}).start(); }
LOG.info("Threads have fineshed!");
} catch (Exception e) {
e.printStackTrace();
}
一、多线程抓取。这就是我上面说的,用多线程技术来解决阻塞问题。
就是有些大材小用,我其实并不太赞成这么用,只是编程复杂度要相对低一些。
二、NIO抓取。异步IO进行数据的抓取,这样的话对线程的使用率是最高的,但由于该技术不常使用,可能稍微困难点。抓网页,一般都用Apache的HttpClient,同步和异步IO都有实现,并且还内置线程池,关键看你会不会用了。
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;/**
* 多线程抓取数据的简单程序
*/
public class MultithreadFetcher { /** 阻塞队列的最大长度,防止内存溢出。 */
public static final int MAX_QUEUE_SIZE = 100;
/** 最大递归深度,防止递归死循环 */
public static final int RECURSION_LEVEL = Integer.MAX_VALUE;
/** 致命毒药,优雅关闭后续的工作线程 */
private static final File DEADLY_POISON = new File("./deadly.tmp");
/**
* 递归遍历文件夹,将遍历的文件放入队列中。
* @param folder 目标文件夹
* @param fileQueue 文件队列
* @param level 递归深度
*/
private static void visitFolder(File folder, BlockingQueue<File> fileQueue, int level) throws InterruptedException{
if(level<=0){//控制递归深度,防止递归死循环。
return;
}
File[] files = folder.listFiles();
for(File file : files){
if(file.isDirectory()){
visitFolder(file,fileQueue,level-1);
}else if(file.getName().toLowerCase().endsWith(".xml")){
fileQueue.put(file);
}else{
//do nothing ...
}
}
}
/**
* 创建目标文件。通过原文件的名称创建一个新的文件。
* @param file 原始文件
* @param targetFolder 目标文件夹
* @return 新的文件,目标文件
*/
private static File createTargetFile(File file, File targetFolder){
String targetFileName = file.getName();
return new File(targetFolder,targetFileName);
}
/**
* 处理文件的操作,可以在这个里面读取文件数据,解析文件,抓取网页,写入备份。
* @param file 原始文件,待解析的文件
* @param target 目标文件,备份文件
*/
private static void travelFile(File file, File target) throws Throwable{
//详细操作从略
}
/** 递归文件夹的线程。不支持多线程并发递归。 */
static class VisitFolderThread extends Thread{
private File folder;
private BlockingQueue<File> fileQueue;
public VisitFolderThread(File folder, BlockingQueue<File> fileQueue) {
super("visit-folder-thread");
this.folder = folder;
this.fileQueue = fileQueue;
}
@Override
public void run() {
try {
visitFolder(folder, fileQueue, RECURSION_LEVEL);
fileQueue.put(DEADLY_POISON);//放置毒药,优雅关闭
} catch (InterruptedException e) {
// 在这里可以做一些异常处理
e.printStackTrace();
}
}
}
/** 处理文件的线程,可以多线程并发处理,每个线程处理一个文件 */
static class TravelFileThread extends Thread{
private static final AtomicInteger ThreadCount = new AtomicInteger();
private File targetFolder;
private BlockingQueue<File> fileQueue;
public TravelFileThread(File targetFolder, BlockingQueue<File> fileQueue) {
super("travel-file-thread-"+ThreadCount.incrementAndGet());
this.targetFolder = targetFolder;
this.fileQueue = fileQueue;
}
@Override
public void run() {
File file = null;
try {
while((file=fileQueue.take())!=DEADLY_POISON){
File target = createTargetFile(file, targetFolder);
try {
travelFile(file, target);
} catch (Throwable e) {
onException(e,file,target);
}
}
fileQueue.put(DEADLY_POISON);//放置毒药,优雅关闭
} catch (InterruptedException e) {
// 在这里可以做一些异常处理
e.printStackTrace();
}
}
/** 在处理文件的过程中,如果抛出异常,则进入下面的处理程序,从略。 */
private void onException(Throwable e, File file, File target) {
// 如果travelFile抛出异常,则在此处进行处理。
e.printStackTrace();
}
}
private BlockingQueue<File> fileQueue = new LinkedBlockingQueue<File>(MAX_QUEUE_SIZE);
private Thread visitFolderThread;
private Thread[] travelFileThreads;
public MultithreadFetcher(File sourceFolder, File targetFolder, int travelThreads) {
super();
visitFolderThread = new VisitFolderThread(sourceFolder, fileQueue);
travelFileThreads = new TravelFileThread[travelThreads];
for(int i=0;i<travelFileThreads.length;i++){
travelFileThreads[i] = new TravelFileThread(targetFolder, fileQueue);
}
}
/**
* 开始执行
*/
public void start(){
visitFolderThread.start();
for(int i=0;i<travelFileThreads.length;i++){
travelFileThreads[i].start();
}
}
/**
* 强行终止。请慎用。程序会自动关闭
*/
public void terminate(){
visitFolderThread.interrupt();
for(int i=0;i<travelFileThreads.length;i++){
travelFileThreads[i].interrupt();
}
}
/**
* 测试用例
*/
public static void main(String[] args) {
final File sourceFolder = new File("");
final File targetFolder = new File("");
final int travelThreads = 20;
MultithreadFetcher fetcher = new MultithreadFetcher(sourceFolder,targetFolder,travelThreads);
fetcher.start();
}}
用您的方法尝试改写了程序,放到服务器上试了一下,选了一秒的数据来看,
处理量从23条 per sec 达到 了 241 per sec
提高了10倍!!!
原来用单线程的时候处理量大概在3-5条,太神速了,这个代码我也要好好研究,继续学习
大神您好,我想问下,你这里会不会出现 VisitFolderThread 还没往BlockingQueue<File> fileQueue 里面放文件对象,TravelFileThread就开始执行fileQueue.take()这个方法呢,是不是要确保VisitFolderThread先执行?
这个问题分析起来比较复杂,除了问题瓶颈以外,还有代码质量等因素,手上没有代码做比较,看不出真正的问题。
还有,Java多线程编程中,我比较习惯用阻塞队列而不是栈这种数据结构,因为两种数据结构的用法也会产生效率问题。之后回答17楼的问题:生产线程和消费线程之间的阻塞队列中,是否要先确保生产线程先放入产品再启动消费线程。
这个没有必要,因为是阻塞队列,并且,我的示例代码中,是具有最大容量的阻塞队列,
也就是说,当队列里面没有对象时,调用take方法的线程会被阻塞;当队列里的对象达到最大容量时,调用put方法的线程被阻塞。
楼主那个代码的效率,相对比较低的原因,我大致看到两点:
一、 同步块(synchronized代码块)包含的代码过于宽泛。(主要原因)
我们知道,被同步原语包含的代码块,是同时只允许一个线程进入执行的,那么,当一个线程进入该块时,其他线程被阻塞。由于同步块涉及的操作太多,拖延的时间过长,造成其他线程一时无法进入代码块执行代码,所以,造成多线程的执行效率降低,因为大部分线程,都处于阻塞状态。
经验而谈,这种状况,先设置一个局部变量,同步块中将等待处理的数据对象传给局部变量,同步块结束,进入局部变量的处理过程,由于局部变量不存在线程安全的问题,是完全可以并发执行的。楼主的方法,是将数据的读取和处理都放在一个同步块中,从而使得其他线程缺乏执行处理代码的机会。二、其他的一些需要注意的问题。
wait和notify一般会成对出现在代码里面,楼主的代码中,没发现notify方法的调用之处,容易产生僵尸线程。
while循环和Thread.sleep配合使用的检测功能,这种效率在多线程中是极其低下的,当然,楼主代码里未发现外层的循环结构,sleep方法使当前线程让出执行权力的同时还会等待一定的延时,之后才能进入runnable状态等待下次被执行的机会,而使用阻塞队列或者使用wait和notify线程不会有等待延时,它会直接被唤醒,进行代码的执行。
看你的个性图片就知道是大神,我想问大神,java中生产者生产获得固定数量的产品后(比如30个产品),消费者才开始消费,怎么理解?请给个思路或者实例,结合队列BlockQueue