/**
 * Deletes the direct children of the directory named by CREATEFILEPATH.
 *
 * Does nothing when that path is not a directory. Deletion is best-effort:
 * entries that cannot be removed are left in place.
 */
public void deleteFiles() {
    File dir = new File(CREATEFILEPATH);
    if (!dir.isDirectory()) {
        return;
    }
    File[] children = dir.listFiles();
    // listFiles() returns null when the directory cannot be read (I/O error).
    if (children == null) {
        return;
    }
    for (File child : children) {
        child.delete();
    }
}
/**
 * Cursor over one mapped file region that hands the region out as a sequence
 * of slices, each ending right after a line-feed byte (10).
 */
class ItemBuff {
    /** Target size of one slice: 1 << 22 bytes (4 MiB). */
    static final int LIMIT = 1 << 22;
    /** When less than LIMIT plus this many bytes remain, the rest is handed out in one slice. */
    private static final int TAIL_SLACK = 1 << 10;

    private MappedByteBuffer mapBuf;
    private int endIdx;
    private int position = 0;

    /** Creates an empty instance: no buffer, so {@link #getBuff()} always returns null. */
    public ItemBuff() {
        this(null, -1);
    }

    /**
     * @param mapBuf the mapped region to slice
     * @param endIdx exclusive end index of the usable bytes inside mapBuf
     */
    public ItemBuff(MappedByteBuffer mapBuf, int endIdx) {
        this.mapBuf = mapBuf;
        this.endIdx = endIdx;
    }

    /**
     * Returns the next slice of the region, or null once the region is used up.
     *
     * The slice starts at the current cursor and ends right after the last
     * line feed found inside the current window. If the window holds no line
     * feed at all, the whole window is returned so the cursor always advances.
     * Calling this again after null was returned keeps returning null.
     *
     * @return a slice of the mapped buffer, or null when nothing is left
     */
    public ByteBuffer getBuff() {
        if (mapBuf == null) {
            // Default-constructed, or already exhausted and released.
            return null;
        }
        if (empty()) {
            mapBuf.clear();
            mapBuf = null;
            return null;
        }
        int remaining = endIdx - position;
        int window = (remaining < LIMIT + TAIL_SLACK) ? remaining : LIMIT;
        int end = position + window;
        mapBuf.limit(end);
        mapBuf.position(position);
        // Walk back from the window end to just after the last line feed,
        // but never past the cursor itself.
        int cut = end;
        while (cut > position && mapBuf.get(cut - 1) != 10) {
            cut--;
        }
        if (cut == position) {
            // No line feed inside the window: hand out the whole window.
            cut = end;
        }
        mapBuf.limit(cut);
        position = cut;
        return mapBuf.slice();
    }

    /** @return true when the cursor has reached endIdx */
    public boolean empty() {
        return position >= endIdx;
    }
}
class SynchRead implements Runnable { private ItemBuff itemBuff;
/**
 * Reports whether itemBuffQueue currently holds no entries.
 *
 * @return true when the queue is empty
 */
public boolean readEnd() {
    final boolean queueDrained = itemBuffQueue.isEmpty();
    return queueDrained;
}
/**
 * Takes the next slice from the current ItemBuff (moving on to the next
 * queued ItemBuff when the current one is used up), appends it to items and
 * increments buffSize.
 *
 * @return the capacity of the slice that was queued, or -1 when
 *         itemBuffQueue has nothing left to read
 */
public final int read() {
    ByteBuffer buff = (itemBuff == null) ? null : itemBuff.getBuff();
    // Skip over exhausted or empty entries instead of dereferencing null.
    while (buff == null) {
        itemBuff = itemBuffQueue.poll();
        if (itemBuff == null) {
            return -1;
        }
        buff = itemBuff.getBuff();
    }
    items.add(buff);
    buffSize.incrementAndGet();
    return buff.capacity();
}
/**
 * Reports whether the number of queued slices equals MAX_ITEMS_SIZE.
 *
 * @return true when buffSize is exactly MAX_ITEMS_SIZE
 */
public boolean buffFull() {
    final int queued = buffSize.get();
    return MAX_ITEMS_SIZE == queued;
}
1. 不要在 while 循环里面反复打开、关闭流。
// Reads the input line by line and appends every line to a file whose name is
// createFilePath followed by a number taken from the start of the line.
while (scaner.hasNext()) {
buf.flip();
String line = scaner.nextLine();
int no;
// The text before the first comma selects the target: a one-character prefix
// is parsed as is, otherwise only its first two characters are parsed.
if (line.split(",")[0].length() == 1) {
no = Integer.valueOf(line.split(",")[0]);
} else {
no = Integer.valueOf(line.split(",")[0].substring(0, 2));
}
// A channel is opened here for every single line and closed again below.
FileChannel fr = new RandomAccessFile(createFilePath + no,"rw").getChannel();
// Writing at offset fr.size() appends the line, then a CRLF terminator.
fr.write(ByteBuffer.wrap(line.getBytes()), fr.size());
fr.write(ByteBuffer.wrap("\r\n".getBytes()),fr.size());
buf.clear();
fr.close();
}
// Per-line append: open a channel on createFilePath + no, write the line and a
// CRLF terminator at the current end of the file, then close the channel.
FileChannel fr = new RandomAccessFile(createFilePath + no,"rw").getChannel();
fr.write(ByteBuffer.wrap(line.getBytes()), fr.size());
fr.write(ByteBuffer.wrap("\r\n".getBytes()),fr.size());
buf.clear();
fr.close();
// Opens the file createFilePath + i, sorts its lines and appends them to fw2.
FileChannel fr2 = new RandomAccessFile(createFilePath + i,"rw").getChannel();
Scanner sc = new Scanner(fr2);
// Load every line of the file into memory
while (sc.hasNext()) {
list.add(sc.nextLine());
}
// Sort
Collections.sort(list);
// Each write targets offset fw2.size(), i.e. the current end of fw2.
for (String line : list) {
fw2.write(ByteBuffer.wrap(line.getBytes()),fw2.size());
fw2.write(ByteBuffer.wrap("\r\n".getBytes()),fw2.size());
}
// Drop the reference to the list and request a garbage collection.
list = null;
System.gc();
fr2.close();
在这两个循环里面,反复打开、关闭 FileChannel 的效率很低,尤其是循环次数很多的时候,会浪费大量时间。
我发现你使用FileChannel和RandomAccessFile的方式不太正确,使效率大大降低了。
/**
 * Reads the file at filePath line by line and appends each line, followed by
 * CRLF, to an output file named createFilePath + key.
 *
 * The key is the whole line when the line is very short (see the condition
 * below); otherwise it is the first character when the first comma sits at
 * index 1, and the first two characters in every other case. One output
 * stream per key is kept open for the whole run and closed at the end.
 * I/O errors are reported with printStackTrace().
 */
public void readFile() {
    BufferedReader reader = null;
    Map<String, FileOutputStream> map = new HashMap<String, FileOutputStream>();
    try {
        reader = new BufferedReader(new FileReader(filePath));
        String line;
        while ((line = reader.readLine()) != null) {
            int idx = line.indexOf(","), len = line.length();
            String key;
            if (idx + len <= 1) {
                // Lines too short to carry a two-character prefix.
                key = line;
            } else {
                key = line.substring(0, (idx == 1 ? idx : 2));
            }
            // Single lookup; streams are only ever stored non-null.
            FileOutputStream out = map.get(key);
            if (out == null) {
                out = new FileOutputStream(createFilePath + key);
                map.put(key, out);
            }
            // FileOutputStream does no buffering of its own, so there is
            // nothing to flush periodically; every write goes straight out.
            out.write((line + "\r\n").getBytes());
        }
    } catch (IOException e) {
        // Also covers FileNotFoundException, which is an IOException.
        e.printStackTrace();
    } finally {
        try {
            if (reader != null) {
                reader.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Close every stream on its own so that one failure cannot leave
        // the remaining streams open.
        for (FileOutputStream out : map.values()) {
            if (out == null) {
                continue;
            }
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
我刚才摆弄了一下,找到核心的东西,就是
// Walks over every key of the map and closes the stream stored under it,
// skipping null entries.
for (Iterator<String> it = map.keySet().iterator(); it.hasNext();) {
FileOutputStream out = map.get(it.next());
if (out != null) out.close();
}
在外边循环做流的关闭,按这个思路改了之后,确实比之前的要快了许多。
但 FileChannel和RandomAccessFile 这两个的用法,请指教一下。
单纯的读写文档,Channel 的模式比起 BufferedReader 等,没觉得有速度上的优势。
准备结贴,谢谢每一位为我解答的朋友们。
发一个未完成的示例,你可以继续研究一下。不好意思代码没有注释,忙过了这段时间继续研究一下nio包(ps:nio之前我也没用过)
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;public class SplitFile { public static final String FILEPATH = "./src/a_1g.txt";
// Prefix (directory) under which the per-key output files are created.
public static final String CREATEFILEPATH = "./src/file/";
// Holds the file mappings (one ItemBuff per mapped region)
Queue<ItemBuff> itemBuffQueue = new ArrayDeque<ItemBuff>();
// buffFull() reports "full" when buffSize equals this value.
final static int MAX_ITEMS_SIZE = 6;
// Holds the ByteBuffer slices that have already been split off
Queue<ByteBuffer> items = new ArrayDeque<ByteBuffer>();
// buffSize: incremented per queued slice in read(), decremented in write().
// writeThreadCnt: set in split(), decremented when a SynchWrite thread ends.
AtomicInteger buffSize = new AtomicInteger(), writeThreadCnt = new AtomicInteger();
// Set to the input size in split(); write() subtracts each slice's capacity.
AtomicLong fileSize = new AtomicLong();
// Timestamp used by close() to print the elapsed time.
private long startTime = 0;
/**
 * @return the timestamp last stored with {@link #setStartTime(long)}
 */
public long getStartTime() {
    return this.startTime;
}

/**
 * Stores the timestamp that close() subtracts from the current time when it
 * prints the elapsed time.
 *
 * @param startTime the timestamp to remember
 */
public void setStartTime(long startTime) {
    this.startTime = startTime;
}

/** Output streams by name; assigned in split(), cleared and nulled in close(). */
public Map<String, BufferedOutputStream> map = null;
/**
 * Looks up the output stream registered under the given name, creating and
 * registering a new one on CREATEFILEPATH + name when none exists yet.
 *
 * @param name key of the stream and suffix of the output file path
 * @return the stream for that name, or null when the file could not be
 *         opened (a message is printed to standard output in that case)
 */
public BufferedOutputStream getStreamByName(String name) {
    if (map.containsKey(name)) {
        return map.get(name);
    }
    try {
        final FileOutputStream target = new FileOutputStream(CREATEFILEPATH + name);
        final BufferedOutputStream created = new BufferedOutputStream(target);
        map.put(name, created);
        return created;
    } catch (FileNotFoundException e) {
        System.out.println("file path:" + CREATEFILEPATH + name + " not exists!");
        return null;
    }
}
/**
 * Flushes every registered output stream.
 *
 * @return true when there was nothing to flush or every flush succeeded;
 *         false as soon as one flush throws an IOException (streams after
 *         the failing one are not flushed)
 */
public boolean flush() {
    final boolean nothingToFlush = (map == null) || map.isEmpty();
    if (nothingToFlush) {
        return true;
    }
    boolean ok = true;
    try {
        for (String name : map.keySet()) {
            final BufferedOutputStream stream = map.get(name);
            if (stream == null) {
                continue;
            }
            stream.flush();
        }
    } catch (IOException e) {
        ok = false;
    }
    return ok;
}
/**
 * Closes every registered output stream, then clears the map, sets it to
 * null and prints the elapsed time since getStartTime().
 *
 * Each stream is closed independently, so a failure on one stream does not
 * leave the others open.
 *
 * @return true when the map was null/empty or every stream closed cleanly;
 *         false when at least one close() threw an IOException
 */
public boolean close() {
    if (map == null || map.isEmpty()) return true;
    boolean allClosed = true;
    try {
        for (BufferedOutputStream out : map.values()) {
            if (out == null) {
                continue;
            }
            try {
                out.close();
            } catch (IOException e) {
                // Remember the failure but keep closing the rest.
                allClosed = false;
            }
        }
    } finally {
        map.clear();
        map = null;
        System.out.println("total times: " + (System.currentTimeMillis() - getStartTime()));
    }
    return allClosed;
}
/**
 * Maps the file at FILEPATH into memory in regions of at most 1 << 25 bytes
 * (32 MiB), queues one ItemBuff per region, and then starts one SynchRead
 * thread and three SynchWrite threads.
 *
 * A full-size region is cut right after the last line feed (byte 10) found
 * by scanning backwards from its end; the next region starts at that cut.
 * Existing output files are removed first via deleteFiles(). An empty input
 * file is a no-op. I/O errors are reported with printStackTrace().
 */
public void split() {
    RandomAccessFile raf = null;
    try {
        deleteFiles();
        raf = new RandomAccessFile(FILEPATH, "r");
        FileChannel fw = raf.getChannel();
        long position = 0, length = fw.size();
        fileSize.set(length);
        map = new HashMap<String, BufferedOutputStream>();
        if (length == 0) {
            // Nothing to map and nothing for the worker threads to do.
            return;
        }
        MappedByteBuffer mapBuf = null;
        final int _size = 1 << 25;
        // Never map beyond the end of the file: the channel is read-only and
        // cannot be extended, so files smaller than one region need a
        // smaller first mapping.
        int size = (int) Math.min((long) _size, length);
        int endIdx = 0;
        while (true) {
            mapBuf = fw.map(FileChannel.MapMode.READ_ONLY, position, size);
            endIdx = size - 1;
            // For a full-size region, walk back to the last line feed. For
            // the shorter final region the loop stops after one step, so
            // the whole region is used.
            while ((mapBuf.get(endIdx--) != 10) && (size == _size) && endIdx > 0);
            // endIdx + 2 is the index just past the byte the scan stopped on.
            itemBuffQueue.add(this.new ItemBuff(mapBuf, endIdx + 2));
            if ((position += size) >= length) {
                break;
            }
            // Rewind so the next region starts right after the cut.
            position -= (size - endIdx - 2);
            if (position + size > length) {
                size = (int) (length - position);
            }
            fw.position(position);
        }
        // NOTE(review): the final short region is queued a second time here.
        // SynchRead.run() stops as soon as itemBuffQueue is empty, so this
        // extra entry presumably keeps the queue non-empty while the real
        // final region is being read - confirm, since its first slice may
        // end up written twice.
        if (size != _size) {
            itemBuffQueue.add(this.new ItemBuff(mapBuf, endIdx + 1));
        }
        // Publish the writer count before any writer can finish and
        // decrement it.
        writeThreadCnt.set(3);
        new Thread(this.new SynchRead(), "read").start();
        try {
            Thread.sleep(10);
        } catch (InterruptedException e1) {
            // Keep the interrupt visible to callers, then carry on.
            Thread.currentThread().interrupt();
            e1.printStackTrace();
        }
        new Thread(this.new SynchWrite()).start();
        new Thread(this.new SynchWrite()).start();
        new Thread(this.new SynchWrite()).start();
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            if (raf != null)
                raf.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Deletes the direct children of the directory named by CREATEFILEPATH.
 *
 * Does nothing when that path is not a directory. Deletion is best-effort:
 * entries that cannot be removed are left in place.
 */
public void deleteFiles() {
    File dir = new File(CREATEFILEPATH);
    if (!dir.isDirectory()) {
        return;
    }
    File[] children = dir.listFiles();
    // listFiles() returns null when the directory cannot be read (I/O error).
    if (children == null) {
        return;
    }
    for (File child : children) {
        child.delete();
    }
}
/**
 * Cursor over one mapped file region that hands the region out as a sequence
 * of slices, each ending right after a line-feed byte (10).
 */
class ItemBuff {
    /** Target size of one slice: 1 << 22 bytes (4 MiB). */
    static final int LIMIT = 1 << 22;
    /** When less than LIMIT plus this many bytes remain, the rest is handed out in one slice. */
    private static final int TAIL_SLACK = 1 << 10;

    private MappedByteBuffer mapBuf;
    private int endIdx;
    private int position = 0;

    /** Creates an empty instance: no buffer, so {@link #getBuff()} always returns null. */
    public ItemBuff() {
        this(null, -1);
    }

    /**
     * @param mapBuf the mapped region to slice
     * @param endIdx exclusive end index of the usable bytes inside mapBuf
     */
    public ItemBuff(MappedByteBuffer mapBuf, int endIdx) {
        this.mapBuf = mapBuf;
        this.endIdx = endIdx;
    }

    /**
     * Returns the next slice of the region, or null once the region is used up.
     *
     * The slice starts at the current cursor and ends right after the last
     * line feed found inside the current window. If the window holds no line
     * feed at all, the whole window is returned so the cursor always advances.
     * Calling this again after null was returned keeps returning null.
     *
     * @return a slice of the mapped buffer, or null when nothing is left
     */
    public ByteBuffer getBuff() {
        if (mapBuf == null) {
            // Default-constructed, or already exhausted and released.
            return null;
        }
        if (empty()) {
            mapBuf.clear();
            mapBuf = null;
            return null;
        }
        int remaining = endIdx - position;
        int window = (remaining < LIMIT + TAIL_SLACK) ? remaining : LIMIT;
        int end = position + window;
        mapBuf.limit(end);
        mapBuf.position(position);
        // Walk back from the window end to just after the last line feed,
        // but never past the cursor itself.
        int cut = end;
        while (cut > position && mapBuf.get(cut - 1) != 10) {
            cut--;
        }
        if (cut == position) {
            // No line feed inside the window: hand out the whole window.
            cut = end;
        }
        mapBuf.limit(cut);
        position = cut;
        return mapBuf.slice();
    }

    /** @return true when the cursor has reached endIdx */
    public boolean empty() {
        return position >= endIdx;
    }
}
/**
 * Producer task: pulls slices out of the queued ItemBuff entries and appends
 * them to items, using the items queue as the monitor.
 */
class SynchRead implements Runnable {
    private ItemBuff itemBuff;

    /**
     * Reports whether itemBuffQueue currently holds no entries.
     *
     * NOTE(review): this does not look at the ItemBuff currently being read,
     * so run() can stop while that entry still has data - confirm against
     * the extra tail entry that split() queues.
     */
    public boolean readEnd() {
        return itemBuffQueue.isEmpty();
    }

    /**
     * Takes the next slice from the current ItemBuff (moving on to the next
     * queued ItemBuff when the current one is used up), appends it to items
     * and increments buffSize.
     *
     * @return the capacity of the slice that was queued, or -1 when
     *         itemBuffQueue has nothing left to read
     */
    public final int read() {
        ByteBuffer buff = (itemBuff == null) ? null : itemBuff.getBuff();
        // Skip over exhausted or empty entries instead of dereferencing null.
        while (buff == null) {
            itemBuff = itemBuffQueue.poll();
            if (itemBuff == null) {
                return -1;
            }
            buff = itemBuff.getBuff();
        }
        items.add(buff);
        buffSize.incrementAndGet();
        return buff.capacity();
    }

    /** @return true when buffSize has reached MAX_ITEMS_SIZE */
    public boolean buffFull() {
        return buffSize.get() >= MAX_ITEMS_SIZE;
    }

    /**
     * Keeps queueing slices until readEnd() reports true. While the slice
     * queue is full the thread waits on the items monitor for up to one
     * second, which releases the monitor so that writers can drain the queue.
     */
    @Override
    public void run() {
        try {
            while (!readEnd()) {
                synchronized (items) {
                    if (buffFull()) {
                        // wait(), unlike sleep(), gives up the monitor.
                        items.wait(1000);
                    } else {
                        read();
                        items.notifyAll();
                    }
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
/**
 * Consumer task: takes slices from items, splits them into lines and appends
 * each line to the output stream chosen by the line's leading characters.
 */
class SynchWrite implements Runnable {
    ByteBuffer byteBuf;

    /** @return true when buffSize is zero */
    public boolean buffEmpty() {
        return buffSize.get() == 0;
    }

    /**
     * Polls one slice from items and writes every line-feed-terminated line
     * in it to the stream returned by getStreamByName(key). The key is the
     * first byte of the line when the line feed is at index 1 or the second
     * byte is a comma (44); otherwise it is the first two bytes.
     *
     * Afterwards buffSize is decremented and the slice's capacity is
     * subtracted from fileSize. Lines whose stream could not be opened are
     * skipped.
     */
    public void write() {
        byteBuf = items.poll();
        if (byteBuf == null) {
            // Nothing queued; leave the counters untouched.
            return;
        }
        int index = -1, capacity = byteBuf.capacity();
        byte[] wb = new byte[100];
        try {
            while (capacity-- > 0) {
                if (index + 1 == wb.length) {
                    // Line longer than the buffer: double it, don't overflow.
                    byte[] grown = new byte[wb.length * 2];
                    System.arraycopy(wb, 0, grown, 0, wb.length);
                    wb = grown;
                }
                wb[++index] = byteBuf.get();
                if (wb[index] == 10) {
                    String key = new String(wb, 0, (index == 1 | wb[1] == 44) ? 1 : 2);
                    BufferedOutputStream out = getStreamByName(key);
                    if (out != null) {
                        out.write(wb, 0, index + 1);
                    }
                    index = -1;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            buffSize.decrementAndGet();
            fileSize.getAndAdd(0 - byteBuf.capacity());
        }
    }

    /**
     * Writes slices until fileSize drops to zero or below. While no slice is
     * queued the thread waits on the items monitor for up to 500 ms. The
     * thread that brings writeThreadCnt down to zero calls close().
     */
    @Override
    public void run() {
        try {
            while (fileSize.get() > 0) {
                synchronized (items) {
                    if (buffEmpty()) {
                        items.wait(500);
                    } else {
                        write();
                        // A slot was freed; wake anything waiting on items.
                        items.notifyAll();
                    }
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Use the decrement's own result so exactly one thread sees
            // zero and closes the streams.
            if (writeThreadCnt.decrementAndGet() == 0) {
                close();
            }
        }
    }
}
/**
 * Test entry point: creates a SplitFile, records the current time as its
 * start time and runs split().
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    final SplitFile splitter = new SplitFile();
    final long startedAt = System.currentTimeMillis();
    splitter.setStartTime(startedAt);
    splitter.split();
}
}