背景:
    文本文件a.txt,里面每行存放了一个URL。
 需求:
    计算出出现频率最多的TOP100个URL。
 NOTE:简单写了个demo ,处理逻辑 1、先把大数据文件按行数分割为多个小文件 2、每个文件启动一个线程分析文件内容
 HELP:100W条数据以下效率1分钟以内,200W以上数据效率很慢,多线程读取文件时出现内存溢出package test;import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SortURL {

public static String FilePath = "a.txt";
public static int rows = 10000*10;
public static List<String> fileList = null;
public static ConcurrentHashMap<String, Integer> urlMap = new ConcurrentHashMap<>();

public static void main(String[] args) {

//拆分文件
cutFile(FilePath,rows);
//多线程处理文件,排序文件内容
threadFile();
}

/**
 *  排序文件内容
 */
public static void sortMap() {
// 频率排序
List<Map.Entry<String, Integer>> sortList = new ArrayList<>(urlMap.entrySet());
Collections.sort(sortList, new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
// TODO Auto-generated method stub
return o2.getValue().compareTo(o1.getValue());
}
});
//取前100 最多频率
// List<String> url = new ArrayList<>();
for(int i=0;i<sortList.size();i++) {
if (i > 100) {
break;
}
System.out.println("URL:"+sortList.get(i).getKey() +" ---- 出现频率:"+sortList.get(i).getValue());
}
}

/**
 *  创建线程池 ,启动线程处理文件
 */
public static void threadFile() {
System.out.println("多线程分析文件....");
long begin = System.currentTimeMillis();

//线程数 最好依据cpu 分配
ExecutorService es = Executors.newCachedThreadPool();
SortURL su = new SortURL();
for (int i=0 ; i< fileList.size();i++) {
File file = new File(fileList.get(i));
if (file.isFile()) {
es.execute(su.new readFile(file));
} else {
System.out.println("未找到文件");
}
}
es.shutdown();
while(true) {
if (es.isTerminated()) {
long end = System.currentTimeMillis();
System.out.println("解析 "+fileList.size()+" 个文件  耗时:"+(end-begin)+" 毫秒");
sortMap();
break;
}
}
}

/**
 * 线程 解析文件汇总url
 * @author ThinkPad
 *
 */
private class readFile implements Runnable {

File tFile = null;

public readFile(File f) {
// TODO Auto-generated constructor stub
this.tFile = f;
} @Override
public void run() {
resolverFile();
}

/**
 * 解析文件
 */
private void resolverFile() {
FileInputStream fis = null;
Scanner sc = null;

Map<String, Integer> map = new HashMap<>();
try {
fis = new FileInputStream(tFile);
sc = new Scanner(fis);
while(sc.hasNextLine()) {
String len = sc.nextLine().trim();
if (map.containsKey(len)) {
int mValue = map.get(len);
map.put(len, mValue+1);
}else {
map.put(len, 1);
}
}
sc.close();
//合并总得urlMap   ConcurrentHashMap 线程安全
if(urlMap.size() == 0) {
urlMap.putAll(map);
} else {
for (String key : map.keySet()) {
if (urlMap.containsKey(key)) {
int mValue = urlMap.get(key) + map.get(key);
urlMap.put(key, mValue);
}else {
urlMap.put(key, 1);
}
}
}
// //清空临时map内存
// map.clear();
System.out.println(tFile.getName()+"   mapsize:"+map.size() + "   urlmapSize:"+urlMap.size());
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

}


/**
 * 大文件切割
 * @param sourceFile
 * @param curRows
 */
public static void cutFile(String sourceFile,int curRows) {
System.out.println("开始拆每  "+curRows+"  行,拆分文件");
FileInputStream inputstream = null;
Scanner sc = null;
StringBuilder sbu = null;
BufferedWriter bw = null;
//
try {
inputstream = new FileInputStream(sourceFile);
sbu = new StringBuilder();
fileList = new ArrayList<>();

long begin = System.currentTimeMillis();
// Scanner 方法消耗内存低
sc = new Scanner(inputstream);
int i = 1;
while(sc.hasNextLine()) {
sbu.append(sc.nextLine()).append("\r\n");
if ((i % curRows) == 0) {
bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(sourceFile+i+".txt")),"UTF-8"));
bw.write(sbu.toString());
fileList.add(sourceFile+i+".txt");
bw.close();
sbu.setLength(0);
}
i++;
}
// 余下行数生成文件
if(((i-1) % curRows) != 0 ) {
bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File(sourceFile+i+".txt")),"UTF-8"));
bw.write(sbu.toString());
fileList.add(sourceFile+i+".txt");
bw.close();
sbu.setLength(0);
}
long end = System.currentTimeMillis();
System.out.println("切割文件耗时: "+(end - begin)+" 毫秒");
inputstream.close();
sc.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {

}
}
}

解决方案 »

  1.   

    是不是文件流没关闭的原因?用mat工具分析一下。还有你这多线程有问题,每次统计结果都不一样。
    大文件建议用nio操作
    我这边用nio写了个例子。package com.test.nio;import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;public class UrlCount implements Runnable {    private final static String FILE = "/source";
        private final static int SIZE = 10000 * 10;
        private final static ConcurrentHashMap<String, Integer> urlMap = new ConcurrentHashMap<>();
        private final static ExecutorService execute = Executors.newCachedThreadPool();    private long skip;
        private long limit;
        public UrlCount(long skip, long limit) {
            this.skip = skip;
            this.limit = limit;
        }    public static void sortMap() {
            // 频率排序
            List<Map.Entry<String, Integer>> sortList = new ArrayList<>(urlMap.entrySet());
            Collections.sort(sortList, new Comparator<Map.Entry<String, Integer>>() {
                @Override
                public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                    // TODO Auto-generated method stub
                    return o2.getValue().compareTo(o1.getValue());
                }
            });
            //取前100 最多频率
            for (int i = 0; i < sortList.size(); i++) {
                if (i > 100) {
                    break;
                }
                System.out.println("URL:" + sortList.get(i).getKey() + " ---- 出现频率:" + sortList.get(i).getValue());
            }
        }    @Override
        public void run() {
            System.out.println(String.format("分析行数:%s - %s", skip, (skip / SIZE + 1) * SIZE - 1));
            try {
                Files.lines(Paths.get(FILE)).skip(skip).limit(limit).forEach(line -> {
                    synchronized (urlMap) {
                        urlMap.put(line, urlMap.containsKey(line) ? urlMap.get(line) + 1 : 1);
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }    public static void main(String[] args) throws Exception {
            long lines = Files.lines(Paths.get(FILE)).count();        long begin = System.currentTimeMillis();
            for (int i = 0; i <= lines / SIZE; i++) {
                if (i < lines / SIZE) {
                    execute.execute(new UrlCount(SIZE * i, SIZE));
                } else if (lines % SIZE > 0) {
                    execute.execute(new UrlCount(SIZE * i, lines % SIZE));
                }
            }
            execute.shutdown();        while (true) {
                if (execute.isTerminated()) {
                    long end = System.currentTimeMillis();
                    sortMap();
                    System.out.println("耗时:" + (end - begin) + " 毫秒");
                    break;
                }
            }
        }
    }
      

  2.   

    Java code?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    package com.test.nio;
     
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     
    public class UrlCount implements Runnable {
     
        private final static String FILE = "/source";
        private final static int SIZE = 10000 * 10;
        private final static ConcurrentHashMap<String, Integer> urlMap = new ConcurrentHashMap<>();
        private final static ExecutorService execute = Executors.newCachedThreadPool();
     
        private long skip;
        private long limit;
        public UrlCount(long skip, long limit) {
            this.skip = skip;
            this.limit = limit;
        }
     
        public static void sortMap() {
            // 频率排序
            List<Map.Entry<String, Integer>> sortList = new ArrayList<>(urlMap.entrySet());
            Collections.sort(sortList, new Comparator<Map.Entry<String, Integer>>() {
                @Override
                public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                    // TODO Auto-generated method stub
                    return o2.getValue().compareTo(o1.getValue());
                }
            });
            //取前100 最多频率
            for (int i = 0; i < sortList.size(); i++) {
                if (i > 100) {
                    break;
                }
                System.out.println("URL:" + sortList.get(i).getKey() + " ---- 出现频率:" + sortList.get(i).getValue());
            }
        }
     
        @Override
        public void run() {
            System.out.println(String.format("分析行数:%s - %s", skip, (skip / SIZE + 1) * SIZE - 1));
            try {
                Files.lines(Paths.get(FILE)).skip(skip).limit(limit).forEach(line -> {
                    synchronized (urlMap) {
                        urlMap.put(line, urlMap.containsKey(line) ? urlMap.get(line) + 1 : 1);
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
     
        public static void main(String[] args) throws Exception {
            long lines = Files.lines(Paths.get(FILE)).count();
     
            long begin = System.currentTimeMillis();
            for (int i = 0; i <= lines / SIZE; i++) {
                if (i < lines / SIZE) {
                    execute.execute(new UrlCount(SIZE * i, SIZE));
                } else if (lines % SIZE > 0) {
                    execute.execute(new UrlCount(SIZE * i, lines % SIZE));
                }
            }
            execute.shutdown();
     
            while (true) {
                if (execute.isTerminated()) {
                    long end = System.currentTimeMillis();
                    sortMap();
                    System.out.println("耗时:" + (end - begin) + " 毫秒");
                    break;
                }
            }
        }
    }