背景:
文本文件a.txt,里面每行存放了一个URL。
需求:
计算出出现频率最高的 TOP 100 个 URL。
NOTE:简单写了个 demo,处理逻辑:1、先把大数据文件按行数分割为多个小文件;2、为每个小文件启动一个线程分析文件内容。
HELP:100W条数据以下效率1分钟以内,200W以上数据效率很慢,多线程读取文件时出现内存溢出package test;import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Finds the most frequent URLs in a large text file that holds one URL per line.
 *
 * <p>The input is first split into smaller files of {@link #rows} lines each
 * ({@link #cutFile}); the split files are then counted in parallel and merged into
 * {@link #urlMap} ({@link #threadFile}); finally the most frequent URLs are printed
 * ({@link #sortMap}).
 */
public class SortURL {
    /** Path of the input file (one URL per line). */
    public static String FilePath = "a.txt";
    /** Number of lines written to each split file. */
    public static int rows = 10000 * 10;
    /** Paths of the split files produced by the most recent {@link #cutFile} call. */
    public static List<String> fileList = null;
    /** URL to occurrence count, merged from all split files. */
    public static ConcurrentHashMap<String, Integer> urlMap = new ConcurrentHashMap<>();

    /** How many of the most frequent URLs {@link #sortMap()} prints. */
    private static final int TOP_N = 100;
    /** Charset the split files are written in, and therefore must be read back in. */
    private static final String SPLIT_CHARSET = "UTF-8";

    /**
     * Splits {@link #FilePath} and then counts and prints the most frequent URLs.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Split the input file.
        cutFile(FilePath, rows);
        // Count the split files in parallel, then print the ranking.
        threadFile();
    }

    /**
     * Prints the {@value #TOP_N} most frequent URLs in {@link #urlMap}, most frequent first.
     */
    public static void sortMap() {
        for (Map.Entry<String, Integer> entry : topEntries(TOP_N)) {
            System.out.println("URL:" + entry.getKey() + " ---- 出现频率:" + entry.getValue());
        }
    }

    /**
     * Returns up to {@code limit} entries of {@link #urlMap}, ordered by descending count.
     * Entries with equal counts are ordered by URL so the result is deterministic.
     *
     * @param limit maximum number of entries to return; values below 1 yield an empty list
     * @return a new list holding at most {@code limit} entries
     */
    static List<Map.Entry<String, Integer>> topEntries(int limit) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(urlMap.entrySet());
        Collections.sort(sorted, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Entry<String, Integer> left, Entry<String, Integer> right) {
                int byCount = right.getValue().compareTo(left.getValue());
                return byCount != 0 ? byCount : left.getKey().compareTo(right.getKey());
            }
        });
        int count = Math.min(Math.max(limit, 0), sorted.size());
        return new ArrayList<>(sorted.subList(0, count));
    }

    /**
     * Counts every file listed in {@link #fileList} on a bounded thread pool, waits for all
     * of them to finish, and then prints the ranking via {@link #sortMap()}.
     *
     * <p>A missing {@link #fileList} is treated as an empty list.
     */
    public static void threadFile() {
        System.out.println("多线程分析文件....");
        long begin = System.currentTimeMillis();
        List<String> files = fileList == null ? new ArrayList<String>() : fileList;

        // Bound the pool by the CPU count: every running task holds its own per-file map,
        // so one thread per split file makes memory use grow with the input size.
        int cpus = Runtime.getRuntime().availableProcessors();
        int threads = Math.max(1, Math.min(cpus, files.size()));
        ExecutorService es = Executors.newFixedThreadPool(threads);
        for (String path : files) {
            File file = new File(path);
            if (file.isFile()) {
                es.execute(new UrlCountTask(file));
            } else {
                System.out.println("未找到文件");
            }
        }
        es.shutdown();
        try {
            // Block until the tasks are done instead of spinning on isTerminated().
            es.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.DAYS);
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
            return;
        }
        long end = System.currentTimeMillis();
        System.out.println("解析 " + files.size() + " 个文件 耗时:" + (end - begin) + " 毫秒");
        sortMap();
    }

    /**
     * Task that counts the URLs of one split file and merges the counts into {@link #urlMap}.
     */
    private static final class UrlCountTask implements Runnable {
        private final File file;

        UrlCountTask(File file) {
            this.file = file;
        }

        @Override
        public void run() {
            // Count locally first so the shared map is touched once per distinct URL.
            Map<String, Integer> local = new HashMap<>();
            try (FileInputStream in = new FileInputStream(file);
                 Scanner lines = new Scanner(in, SPLIT_CHARSET)) {
                while (lines.hasNextLine()) {
                    String url = lines.nextLine().trim();
                    if (url.isEmpty()) {
                        continue; // a blank line is not a URL
                    }
                    Integer seen = local.get(url);
                    local.put(url, seen == null ? 1 : seen + 1);
                }
            } catch (IOException e) {
                e.printStackTrace();
                return;
            }
            // merge() is atomic per key and adds this file's full count for the URL.
            for (Map.Entry<String, Integer> entry : local.entrySet()) {
                urlMap.merge(entry.getKey(), entry.getValue(), Integer::sum);
            }
            System.out.println(file.getName() + " mapsize:" + local.size() + " urlmapSize:" + urlMap.size());
        }
    }

    /**
     * Splits {@code sourceFile} into files of {@code curRows} lines each and records their
     * paths in {@link #fileList}.
     *
     * <p>A full chunk is named {@code sourceFile + <number of its last line> + ".txt"}; a
     * trailing partial chunk is named {@code sourceFile + <total lines + 1> + ".txt"}.
     * The source is decoded with the platform default charset; split files are written
     * as UTF-8 with CRLF line endings. I/O failures are printed and leave
     * {@link #fileList} holding the chunks written so far.
     *
     * @param sourceFile path of the file to split
     * @param curRows number of lines per split file; must be positive
     * @throws IllegalArgumentException if {@code curRows} is not positive
     */
    public static void cutFile(String sourceFile, int curRows) {
        if (curRows <= 0) {
            throw new IllegalArgumentException("curRows must be positive: " + curRows);
        }
        System.out.println("开始拆每 " + curRows + " 行,拆分文件");
        // Publish the list up front so callers never see null when the source is missing.
        List<String> parts = new ArrayList<>();
        fileList = parts;
        long begin = System.currentTimeMillis();
        try (FileInputStream in = new FileInputStream(sourceFile);
             Scanner lines = new Scanner(in)) {
            StringBuilder chunk = new StringBuilder();
            long lineNo = 0;
            while (lines.hasNextLine()) {
                chunk.append(lines.nextLine()).append("\r\n");
                lineNo++;
                if (lineNo % curRows == 0) {
                    parts.add(writeChunk(sourceFile + lineNo + ".txt", chunk));
                }
            }
            // Write whatever is left after the last full chunk.
            if (chunk.length() > 0) {
                parts.add(writeChunk(sourceFile + (lineNo + 1) + ".txt", chunk));
            }
            long end = System.currentTimeMillis();
            System.out.println("切割文件耗时: " + (end - begin) + " 毫秒");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Writes {@code chunk} to {@code path} as UTF-8, empties the buffer, and returns {@code path}. */
    private static String writeChunk(String path, StringBuilder chunk) throws IOException {
        try (BufferedWriter out = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(new File(path)), SPLIT_CHARSET))) {
            out.write(chunk.toString());
        }
        chunk.setLength(0);
        return path;
    }
}
文本文件a.txt,里面每行存放了一个URL。
需求:
计算出出现频率最高的 TOP 100 个 URL。
NOTE:简单写了个 demo,处理逻辑:1、先把大数据文件按行数分割为多个小文件;2、为每个小文件启动一个线程分析文件内容。
HELP:100W条数据以下效率1分钟以内,200W以上数据效率很慢,多线程读取文件时出现内存溢出package test;import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Finds the most frequent URLs in a large text file that holds one URL per line.
 *
 * <p>The input is first split into smaller files of {@link #rows} lines each
 * ({@link #cutFile}); the split files are then counted in parallel and merged into
 * {@link #urlMap} ({@link #threadFile}); finally the most frequent URLs are printed
 * ({@link #sortMap}).
 */
public class SortURL {
    /** Path of the input file (one URL per line). */
    public static String FilePath = "a.txt";
    /** Number of lines written to each split file. */
    public static int rows = 10000 * 10;
    /** Paths of the split files produced by the most recent {@link #cutFile} call. */
    public static List<String> fileList = null;
    /** URL to occurrence count, merged from all split files. */
    public static ConcurrentHashMap<String, Integer> urlMap = new ConcurrentHashMap<>();

    /** How many of the most frequent URLs {@link #sortMap()} prints. */
    private static final int TOP_N = 100;
    /** Charset the split files are written in, and therefore must be read back in. */
    private static final String SPLIT_CHARSET = "UTF-8";

    /**
     * Splits {@link #FilePath} and then counts and prints the most frequent URLs.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Split the input file.
        cutFile(FilePath, rows);
        // Count the split files in parallel, then print the ranking.
        threadFile();
    }

    /**
     * Prints the {@value #TOP_N} most frequent URLs in {@link #urlMap}, most frequent first.
     */
    public static void sortMap() {
        for (Map.Entry<String, Integer> entry : topEntries(TOP_N)) {
            System.out.println("URL:" + entry.getKey() + " ---- 出现频率:" + entry.getValue());
        }
    }

    /**
     * Returns up to {@code limit} entries of {@link #urlMap}, ordered by descending count.
     * Entries with equal counts are ordered by URL so the result is deterministic.
     *
     * @param limit maximum number of entries to return; values below 1 yield an empty list
     * @return a new list holding at most {@code limit} entries
     */
    static List<Map.Entry<String, Integer>> topEntries(int limit) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(urlMap.entrySet());
        Collections.sort(sorted, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Entry<String, Integer> left, Entry<String, Integer> right) {
                int byCount = right.getValue().compareTo(left.getValue());
                return byCount != 0 ? byCount : left.getKey().compareTo(right.getKey());
            }
        });
        int count = Math.min(Math.max(limit, 0), sorted.size());
        return new ArrayList<>(sorted.subList(0, count));
    }

    /**
     * Counts every file listed in {@link #fileList} on a bounded thread pool, waits for all
     * of them to finish, and then prints the ranking via {@link #sortMap()}.
     *
     * <p>A missing {@link #fileList} is treated as an empty list.
     */
    public static void threadFile() {
        System.out.println("多线程分析文件....");
        long begin = System.currentTimeMillis();
        List<String> files = fileList == null ? new ArrayList<String>() : fileList;

        // Bound the pool by the CPU count: every running task holds its own per-file map,
        // so one thread per split file makes memory use grow with the input size.
        int cpus = Runtime.getRuntime().availableProcessors();
        int threads = Math.max(1, Math.min(cpus, files.size()));
        ExecutorService es = Executors.newFixedThreadPool(threads);
        for (String path : files) {
            File file = new File(path);
            if (file.isFile()) {
                es.execute(new UrlCountTask(file));
            } else {
                System.out.println("未找到文件");
            }
        }
        es.shutdown();
        try {
            // Block until the tasks are done instead of spinning on isTerminated().
            es.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.DAYS);
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
            return;
        }
        long end = System.currentTimeMillis();
        System.out.println("解析 " + files.size() + " 个文件 耗时:" + (end - begin) + " 毫秒");
        sortMap();
    }

    /**
     * Task that counts the URLs of one split file and merges the counts into {@link #urlMap}.
     */
    private static final class UrlCountTask implements Runnable {
        private final File file;

        UrlCountTask(File file) {
            this.file = file;
        }

        @Override
        public void run() {
            // Count locally first so the shared map is touched once per distinct URL.
            Map<String, Integer> local = new HashMap<>();
            try (FileInputStream in = new FileInputStream(file);
                 Scanner lines = new Scanner(in, SPLIT_CHARSET)) {
                while (lines.hasNextLine()) {
                    String url = lines.nextLine().trim();
                    if (url.isEmpty()) {
                        continue; // a blank line is not a URL
                    }
                    Integer seen = local.get(url);
                    local.put(url, seen == null ? 1 : seen + 1);
                }
            } catch (IOException e) {
                e.printStackTrace();
                return;
            }
            // merge() is atomic per key and adds this file's full count for the URL.
            for (Map.Entry<String, Integer> entry : local.entrySet()) {
                urlMap.merge(entry.getKey(), entry.getValue(), Integer::sum);
            }
            System.out.println(file.getName() + " mapsize:" + local.size() + " urlmapSize:" + urlMap.size());
        }
    }

    /**
     * Splits {@code sourceFile} into files of {@code curRows} lines each and records their
     * paths in {@link #fileList}.
     *
     * <p>A full chunk is named {@code sourceFile + <number of its last line> + ".txt"}; a
     * trailing partial chunk is named {@code sourceFile + <total lines + 1> + ".txt"}.
     * The source is decoded with the platform default charset; split files are written
     * as UTF-8 with CRLF line endings. I/O failures are printed and leave
     * {@link #fileList} holding the chunks written so far.
     *
     * @param sourceFile path of the file to split
     * @param curRows number of lines per split file; must be positive
     * @throws IllegalArgumentException if {@code curRows} is not positive
     */
    public static void cutFile(String sourceFile, int curRows) {
        if (curRows <= 0) {
            throw new IllegalArgumentException("curRows must be positive: " + curRows);
        }
        System.out.println("开始拆每 " + curRows + " 行,拆分文件");
        // Publish the list up front so callers never see null when the source is missing.
        List<String> parts = new ArrayList<>();
        fileList = parts;
        long begin = System.currentTimeMillis();
        try (FileInputStream in = new FileInputStream(sourceFile);
             Scanner lines = new Scanner(in)) {
            StringBuilder chunk = new StringBuilder();
            long lineNo = 0;
            while (lines.hasNextLine()) {
                chunk.append(lines.nextLine()).append("\r\n");
                lineNo++;
                if (lineNo % curRows == 0) {
                    parts.add(writeChunk(sourceFile + lineNo + ".txt", chunk));
                }
            }
            // Write whatever is left after the last full chunk.
            if (chunk.length() > 0) {
                parts.add(writeChunk(sourceFile + (lineNo + 1) + ".txt", chunk));
            }
            long end = System.currentTimeMillis();
            System.out.println("切割文件耗时: " + (end - begin) + " 毫秒");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Writes {@code chunk} to {@code path} as UTF-8, empties the buffer, and returns {@code path}. */
    private static String writeChunk(String path, StringBuilder chunk) throws IOException {
        try (BufferedWriter out = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(new File(path)), SPLIT_CHARSET))) {
            out.write(chunk.toString());
        }
        chunk.setLength(0);
        return path;
    }
}
解决方案 »
- jdbc连接informix返回中文是乱码
- 请问为什么感觉代码木有问题,而且数据库中也有数据,可是却查不出来啊?
- 项目由jbuilder迁移到myeclipse上的GUI问题
- JAVA的小问题
- 大虾们好啊,小弟刚入java,有一菜鸟问题,请大家帮忙,谢谢阿!
- 紧急求助,我想获得当前的日期值,和三个月后的日期值,请各位帮忙,谢谢
- 关于ViewPort的问题,最后20分了,大家一定要帮我啊
- 早来早得
- 如何从string中提取自己需要的一部分字符
- 帮帮妹妹吧!!!
- Curator的DistributedAtomicValue是否提供绝对原子性?
- 为什么我的bianli方法进去就直接跳到最后一句了啊?还有为什么的主方法中s只能写具体文件名,而不能写文件夹名?一写就报错,哪位大神能教教我
大文件建议用nio操作
我这边用nio写了个例子。package com.test.nio;import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Counts how often each line (URL) occurs in {@link #FILE} and prints the most frequent ones.
 *
 * <p>Each instance is a task that counts one window of lines, {@code limit} lines starting
 * after the first {@code skip} lines, and adds them to a shared map. Because the file is
 * read as a stream of lines, every task has to read past the lines it skips.
 */
public class UrlCount implements Runnable {
    private final static String FILE = "/source";
    /** Number of lines handled by one task. */
    private final static int SIZE = 10000 * 10;
    /** How many of the most frequent URLs {@link #sortMap()} prints. */
    private final static int TOP_N = 100;
    private final static ConcurrentHashMap<String, Integer> urlMap = new ConcurrentHashMap<>();

    private final long skip;
    private final long limit;

    /**
     * @param skip number of leading lines of {@link #FILE} to pass over
     * @param limit maximum number of lines to count after skipping
     */
    public UrlCount(long skip, long limit) {
        this.skip = skip;
        this.limit = limit;
    }

    /** Prints the {@value #TOP_N} most frequent URLs counted so far, most frequent first. */
    public static void sortMap() {
        for (Map.Entry<String, Integer> entry : topEntries(urlMap, TOP_N)) {
            System.out.println("URL:" + entry.getKey() + " ---- 出现频率:" + entry.getValue());
        }
    }

    /**
     * Returns up to {@code limit} entries of {@code counts}, ordered by descending count.
     * Entries with equal counts are ordered by key so the result is deterministic.
     *
     * @param counts map from URL to occurrence count; not modified
     * @param limit maximum number of entries to return; values below 1 yield an empty list
     * @return a new list holding at most {@code limit} entries
     */
    static List<Map.Entry<String, Integer>> topEntries(Map<String, Integer> counts, int limit) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts.entrySet());
        Collections.sort(sorted, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> left, Map.Entry<String, Integer> right) {
                int byCount = right.getValue().compareTo(left.getValue());
                return byCount != 0 ? byCount : left.getKey().compareTo(right.getKey());
            }
        });
        int count = Math.min(Math.max(limit, 0), sorted.size());
        return new ArrayList<>(sorted.subList(0, count));
    }

    /** Counts this task's window of lines into the shared map. */
    @Override
    public void run() {
        System.out.println(String.format("分析行数:%s - %s", skip, skip + limit - 1));
        // Files.lines keeps the file open until the stream is closed.
        try (java.util.stream.Stream<String> lines = Files.lines(Paths.get(FILE))) {
            // merge() is atomic per key, so no lock around the whole map is needed.
            lines.skip(skip).limit(limit).forEach(line -> urlMap.merge(line, 1, Integer::sum));
        } catch (IOException | java.io.UncheckedIOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Counts the lines of {@link #FILE} in windows of {@value #SIZE} lines on a thread pool
     * sized to the CPU count, then prints the ranking and the elapsed time.
     *
     * @param args unused
     * @throws Exception if the file cannot be read or the wait for the tasks is interrupted
     */
    public static void main(String[] args) throws Exception {
        long lines;
        try (java.util.stream.Stream<String> all = Files.lines(Paths.get(FILE))) {
            lines = all.count();
        }
        long begin = System.currentTimeMillis();
        int threads = Math.max(1, Runtime.getRuntime().availableProcessors());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // long arithmetic: SIZE * i in int would overflow on very large files.
        for (long start = 0; start < lines; start += SIZE) {
            pool.execute(new UrlCount(start, Math.min(SIZE, lines - start)));
        }
        pool.shutdown();
        // Block until the tasks are done instead of spinning on isTerminated().
        pool.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.DAYS);
        long end = System.currentTimeMillis();
        sortMap();
        System.out.println("耗时:" + (end - begin) + " 毫秒");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.test.nio;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Counts how often each line (URL) occurs in {@link #FILE} and prints the most frequent ones.
 *
 * <p>Each instance is a task that counts one window of lines, {@code limit} lines starting
 * after the first {@code skip} lines, and adds them to a shared map. Because the file is
 * read as a stream of lines, every task has to read past the lines it skips.
 */
public class UrlCount implements Runnable {
    private final static String FILE = "/source";
    /** Number of lines handled by one task. */
    private final static int SIZE = 10000 * 10;
    /** How many of the most frequent URLs {@link #sortMap()} prints. */
    private final static int TOP_N = 100;
    private final static ConcurrentHashMap<String, Integer> urlMap = new ConcurrentHashMap<>();

    private final long skip;
    private final long limit;

    /**
     * @param skip number of leading lines of {@link #FILE} to pass over
     * @param limit maximum number of lines to count after skipping
     */
    public UrlCount(long skip, long limit) {
        this.skip = skip;
        this.limit = limit;
    }

    /** Prints the {@value #TOP_N} most frequent URLs counted so far, most frequent first. */
    public static void sortMap() {
        for (Map.Entry<String, Integer> entry : topEntries(urlMap, TOP_N)) {
            System.out.println("URL:" + entry.getKey() + " ---- 出现频率:" + entry.getValue());
        }
    }

    /**
     * Returns up to {@code limit} entries of {@code counts}, ordered by descending count.
     * Entries with equal counts are ordered by key so the result is deterministic.
     *
     * @param counts map from URL to occurrence count; not modified
     * @param limit maximum number of entries to return; values below 1 yield an empty list
     * @return a new list holding at most {@code limit} entries
     */
    static List<Map.Entry<String, Integer>> topEntries(Map<String, Integer> counts, int limit) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts.entrySet());
        Collections.sort(sorted, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> left, Map.Entry<String, Integer> right) {
                int byCount = right.getValue().compareTo(left.getValue());
                return byCount != 0 ? byCount : left.getKey().compareTo(right.getKey());
            }
        });
        int count = Math.min(Math.max(limit, 0), sorted.size());
        return new ArrayList<>(sorted.subList(0, count));
    }

    /** Counts this task's window of lines into the shared map. */
    @Override
    public void run() {
        System.out.println(String.format("分析行数:%s - %s", skip, skip + limit - 1));
        // Files.lines keeps the file open until the stream is closed.
        try (java.util.stream.Stream<String> lines = Files.lines(Paths.get(FILE))) {
            // merge() is atomic per key, so no lock around the whole map is needed.
            lines.skip(skip).limit(limit).forEach(line -> urlMap.merge(line, 1, Integer::sum));
        } catch (IOException | java.io.UncheckedIOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Counts the lines of {@link #FILE} in windows of {@value #SIZE} lines on a thread pool
     * sized to the CPU count, then prints the ranking and the elapsed time.
     *
     * @param args unused
     * @throws Exception if the file cannot be read or the wait for the tasks is interrupted
     */
    public static void main(String[] args) throws Exception {
        long lines;
        try (java.util.stream.Stream<String> all = Files.lines(Paths.get(FILE))) {
            lines = all.count();
        }
        long begin = System.currentTimeMillis();
        int threads = Math.max(1, Runtime.getRuntime().availableProcessors());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // long arithmetic: SIZE * i in int would overflow on very large files.
        for (long start = 0; start < lines; start += SIZE) {
            pool.execute(new UrlCount(start, Math.min(SIZE, lines - start)));
        }
        pool.shutdown();
        // Block until the tasks are done instead of spinning on isTerminated().
        pool.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.DAYS);
        long end = System.currentTimeMillis();
        sortMap();
        System.out.println("耗时:" + (end - begin) + " 毫秒");
    }
}