解决方案 »

  1.   

    你的意思是不是这样的:
    例如,5个xml文件,['a.xml', 'b.xml', 'c.xml', 'd.xml', 'e.xml']
    这5个文件放在一个Stack中,5个线程从stack中获取数据???如果是这个意思,我的想法是,5个线程 t-1, t-2, t-3, t-4, t-5在Stack中取数据的时候,先去检测有没有该文件的lock文件,如果有a.lock,说明a.xml在用,要等待, 如果没有,生成文件对应的lock文件,再去取数据,极端情况两个线程同时去生成lock文件,先生成的占先,后生成的覆盖失败,继续等待。一个文件一个锁。或者利用ReentrantLock进行锁的获取和释放,来控制线程的调度问题也许有说法欠佳的地方,互相学习
      

  2.   

    首先,楼主你要弄清楚,单线程执行的瓶颈是什么。
    一. 一个线程读取文件的速度,和多个线程读取文件的速度,在吞吐量上,理论上是差不多的,因为,这取决于硬盘的读取速度。
    二. 读取出来的数据,需要解析和分析,这个操作是否消耗大量的CPU时间,如果消耗的话用多线程比较靠谱。
    三. 与文件的读取相仿,文件的写入也存在硬盘的瓶颈问题,采用多线程提高写入速度,理论上意义不大。其次,要将这个任务拆分成多个步骤,针对某个带有瓶颈的步骤进行优化。
    一. 步骤大致分四步:1.递归遍历目录提取要处理的文件;2.读取文件数据;3.解析文件数据,处理文件数据;4.写入结果数据。
    二.分析上述四个步骤,其中1.2.4都是硬盘操作,多线程提升的性能不会太显著,重点在第3步,那么多线程的内容一定包含第3步
    三.由于2.3.4这三个连续步骤,用一个线程处理一个文件,多个线程可以并发处理,在处理过程中不存在数据冲突或者说互斥现象, 并且,第1步的编程复杂度较大,所以,第1步可以单独一个线程来完成,其余2.3.4建立一个线程来做,后者可以多线程并发234。第三,解决生产者和消费者之间的数据传递问题。
    很显然我们需要一个数据结构来完成这个事情,但是,教科书中的生产者消费者模式已经不太适用这个场景。
    楼主是用的Stack或者HashTable,我们一般会用阻塞队列这种数据结构。第四,阻塞队列的使用技巧。
    1. 为避免生产过剩而爆掉内存,我们可采用带有上限封顶的阻塞队列。
    2. 为了能够在任务完成时平滑的关闭消费线程,我们通常会向队列里放置一些特殊数据来关闭消费线程。
    这些特殊数据,我们称之为“致命毒药”,一般会声明成常量。第五,并发程度。
    1. 多线程并非线程越多,效率越高,他会出现一个峰值,达到峰值后线程再多,反而略有下降。
    2. 多线程一般都是用来处理CPU密集型运算的,这个是最佳的选择。当然,也有用来执行异步操作或者阻塞操作的。
    比如上述的2.3.4步骤中,一个线程执行3的时候,不访问硬盘,这时可以有其他线程执行2步骤。
      

  3.   

    看了回复明白了一些,很详细。
    我这个程序最大的瓶颈不在读文件,而是在解析的时候要从网页上获取信息,这个过程比较慢,程序在实际运行的时候,尤其是运行了一段时间后,经常出现read timed out的错误,查看了一下,就是在从网页链接获取数据的时候产生的。
    试了一下,把数据分开来用比较快,比起互锁线程从同一个栈中取数据来说。
    我就写成这样了,先遍历文件夹,把xml文件路径hash到10个stack中,然后建10个线程来分别取数据。
    traverDirs();
    LOG.info("There are "+count+"xml files!");
    stackList.add(stack0);
    stackList.add(stack1);
    stackList.add(stack2);
    stackList.add(stack3);
    stackList.add(stack4);
    stackList.add(stack5);
    stackList.add(stack6);
    stackList.add(stack7);
    stackList.add(stack8);
    stackList.add(stack9);
    try {
    for (int i = 0; i < 10; i++) {
    //计算最恰当的间隔长度
    final int p = i;
    new Thread(new Runnable() {
    @Override
    public void run() {
    try {
    LOG.info("Thread "+p+" Started!");
    while(!stackList.get(p).isEmpty())
    extractXML(stackList.get(p).pop());
    if(stackList.get(p).isEmpty())
    LOG.info("Stack "+p+" is Empty!");
    } catch (Exception e) {
    // TODO Auto-generated catch block
    LOG.error("",e);
    }

    }
    }).start(); }
    LOG.info("Threads have fineshed!");
    } catch (Exception e) {
    e.printStackTrace();
    }
      

  4.   

    既然瓶颈是网页的抓取操作(IO读取),那么,有两种方案可以解决这个瓶颈:
    一、多线程抓取。这就是我上面说的,用多线程技术来解决阻塞问题。
            就是有些大材小用,我其实并不太赞成这么用,只是编程复杂度要相对低一些。
    二、NIO抓取。异步IO进行数据的抓取,这样的话对线程的使用率是最高的,但由于该技术不常使用,可能稍微困难点。抓网页,一般都用Apache的HttpClient,同步和异步IO都有实现,并且还内置线程池,关键看你会不会用了。
      

  5.   

    简单写了一个例子,代码没进行测试。这BIO方式我是不太赞成的。
    import java.io.File;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;/**
     * 多线程抓取数据的简单程序
     */
    public class MultithreadFetcher { /** 阻塞队列的最大长度,防止内存溢出。  */
    public static final int MAX_QUEUE_SIZE = 100;
    /** 最大递归深度,防止递归死循环  */
    public static final int RECURSION_LEVEL = Integer.MAX_VALUE;
    /** 致命毒药,优雅关闭后续的工作线程  */
    private static final File DEADLY_POISON = new File("./deadly.tmp");

    /**
     * 递归遍历文件夹,将遍历的文件放入队列中。
     * @param folder 目标文件夹
     * @param fileQueue 文件队列
     * @param level 递归深度
     */
    private static void visitFolder(File folder, BlockingQueue<File> fileQueue, int level) throws InterruptedException{
    if(level<=0){//控制递归深度,防止递归死循环。
    return;
    }
    File[] files = folder.listFiles();
    for(File file : files){
    if(file.isDirectory()){
    visitFolder(file,fileQueue,level-1);
    }else if(file.getName().toLowerCase().endsWith(".xml")){
    fileQueue.put(file);
    }else{
    //do nothing ...
    }
    }
    }
    /**
     * 创建目标文件。通过原文件的名称创建一个新的文件。
     * @param file 原始文件
     * @param targetFolder 目标文件夹
     * @return 新的文件,目标文件
     */
    private static File createTargetFile(File file, File targetFolder){
    String targetFileName = file.getName();
    return new File(targetFolder,targetFileName);
    }
    /**
     * 处理文件的操作,可以在这个里面读取文件数据,解析文件,抓取网页,写入备份。
     * @param file 原始文件,待解析的文件
     * @param target 目标文件,备份文件
     */
    private static void travelFile(File file, File target) throws Throwable{
    //详细操作从略
    }

    /** 递归文件夹的线程。不支持多线程并发递归。 */
    static class VisitFolderThread extends Thread{
    private File folder;
    private BlockingQueue<File> fileQueue;
    public VisitFolderThread(File folder, BlockingQueue<File> fileQueue) {
    super("visit-folder-thread");
    this.folder = folder;
    this.fileQueue = fileQueue;
    }
    @Override
    public void run() {
    try {
    visitFolder(folder, fileQueue, RECURSION_LEVEL);
    fileQueue.put(DEADLY_POISON);//放置毒药,优雅关闭
    } catch (InterruptedException e) {
    // 在这里可以做一些异常处理
    e.printStackTrace();
    }
    }
    }

    /** 处理文件的线程,可以多线程并发处理,每个线程处理一个文件  */
    static class TravelFileThread extends Thread{
    private static final AtomicInteger ThreadCount = new AtomicInteger();
    private File targetFolder;
    private BlockingQueue<File> fileQueue;
    public TravelFileThread(File targetFolder, BlockingQueue<File> fileQueue) {
    super("travel-file-thread-"+ThreadCount.incrementAndGet());
    this.targetFolder = targetFolder;
    this.fileQueue = fileQueue;
    }
    @Override
    public void run() {
    File file = null;
    try {
    while((file=fileQueue.take())!=DEADLY_POISON){
    File target = createTargetFile(file, targetFolder);
    try {
    travelFile(file, target);
    } catch (Throwable e) {
    onException(e,file,target);
    }
    }
    fileQueue.put(DEADLY_POISON);//放置毒药,优雅关闭
    } catch (InterruptedException e) {
    // 在这里可以做一些异常处理
    e.printStackTrace();
    }
    }
    /** 在处理文件的过程中,如果抛出异常,则进入下面的处理程序,从略。 */
    private void onException(Throwable e, File file, File target) {
    // 如果travelFile抛出异常,则在此处进行处理。
    e.printStackTrace();
    }
    }

    private BlockingQueue<File> fileQueue = new LinkedBlockingQueue<File>(MAX_QUEUE_SIZE);
    private Thread visitFolderThread;
    private Thread[] travelFileThreads;

    public MultithreadFetcher(File sourceFolder, File targetFolder, int travelThreads) {
    super();
    visitFolderThread = new VisitFolderThread(sourceFolder, fileQueue);
    travelFileThreads = new TravelFileThread[travelThreads];
    for(int i=0;i<travelFileThreads.length;i++){
    travelFileThreads[i] = new TravelFileThread(targetFolder, fileQueue);
    }
    }

    /**
     * 开始执行
     */
    public void start(){
    visitFolderThread.start();
    for(int i=0;i<travelFileThreads.length;i++){
    travelFileThreads[i].start();
    }
    }
    /**
     * 强行终止。请慎用。程序会自动关闭
     */
    public void terminate(){
    visitFolderThread.interrupt();
    for(int i=0;i<travelFileThreads.length;i++){
    travelFileThreads[i].interrupt();
    }
    }

    /**
     * 测试用例
     */
    public static void main(String[] args) {
    final File sourceFolder = new File("");
    final File targetFolder = new File("");
    final int travelThreads = 20;
    MultithreadFetcher fetcher = new MultithreadFetcher(sourceFolder,targetFolder,travelThreads);
    fetcher.start();
    }}
      

  6.   

    Java我不会。我谈谈自己VC++怎么处理吧。也许对你有帮助。1,对 “ 我想的是把获取的xml路径放入stack中,每个线程都从这个stack中获取数据,只到栈空。”这个 stack单独用一个类来管理。记得一定要设计成单例模式(很重要)。 Class MyStack;2,在其他线程中创建一个该类的实例  MyStack m_threadStack;      在线程中即可访问这个栈中数据,判断栈是否为空等等。     每个线程都可以更改该栈的数据,该类数据改变对于每个线程是同步的。这样就能解决楼主的问题了。注:这个是VC的办法。但是技术思想的相通的。希望对楼主有帮助。
      

  7.   

    膜拜一下大神
    用您的方法尝试改写了程序,放到服务器上试了一下,选了一秒的数据来看,
    处理量从23条 per sec 达到 了 241 per sec
    提高了10倍!!!
    原来用单线程的时候处理量大概在3-5条,太神速了,这个代码我也要好好研究,继续学习
      

  8.   


    大神您好,我想问下,你这里会不会出现 VisitFolderThread 还没往BlockingQueue<File> fileQueue 里面放文件对象,TravelFileThread就开始执行fileQueue.take()这个方法呢,是不是要确保VisitFolderThread先执行?
      

  9.   

    先回答楼主的问题:同样是多线程,为什么有的快有的慢。
    这个问题分析起来比较复杂,除了问题瓶颈以外,还有代码质量等因素,手上没有代码做比较,看不出真正的问题。
    还有,Java多线程编程中,我比较习惯用阻塞队列而不是栈这种数据结构,因为两种数据结构的用法也会产生效率问题。之后回答17楼的问题:生产线程和消费线程之间的阻塞队列中,是否要先确保生产线程先放入产品再启动消费线程。
    这个没有必要,因为是阻塞队列,并且,我的示例代码中,是具有最大容量的阻塞队列,
    也就是说,当队列里面没有对象时,调用take方法的线程会被阻塞;当队列里的对象达到最大容量时,调用put方法的线程被阻塞。
      

  10.   

    哦。抱歉,好几天没上论坛了,忘了楼主的帖子上有代码了。
    楼主那个代码的效率,相对比较低的原因,我大致看到两点:
    一、 同步块(synchronized代码块)包含的代码过于宽泛。(主要原因)
          我们知道,被同步原语包含的代码块,是同时只允许一个线程进入执行的,那么,当一个线程进入该块时,其他线程被阻塞。由于同步块涉及的操作太多,拖延的时间过长,造成其他线程一时无法进入代码块执行代码,所以,造成多线程的执行效率降低,因为大部分线程,都处于阻塞状态。
           经验而谈,这种状况,先设置一个局部变量,同步块中将等待处理的数据对象传给局部变量,同步块结束,进入局部变量的处理过程,由于局部变量不存在线程安全的问题,是完全可以并发执行的。楼主的方法,是将数据的读取和处理都放在一个同步块中,从而使得其他线程缺乏执行处理代码的机会。二、其他的一些需要注意的问题。
    wait和notify一般会成对出现在代码里面,楼主的代码中,没发现notify方法的调用之处,容易产生僵尸线程。
    while循环和Thread.sleep配合使用的检测功能,这种效率在多线程中是极其低下的,当然,楼主代码里未发现外层的循环结构,sleep方法使当前线程让出执行权力的同时还会等待一定的延时,之后才能进入runnable状态等待下次被执行的机会,而使用阻塞队列或者使用wait和notify线程不会有等待延时,它会直接被唤醒,进行代码的执行。
      

  11.   


    看你的个性图片就知道是大神,我想问大神,java中生产者生产获得固定数量的产品后(比如30个产品),消费者才开始消费,怎么理解?请给个思路或者实例,结合队列BlockQueue