解决方案 »

  1.   

    代码我就不看了,直接给你两个网站自己参考吧http://blog.csdn.net/dengqiaodey/article/details/7856814http://wgslucky.blog.163.com/blog/static/97562532201332324639689/睡觉
      

  2.   

    嘿,这种大文件倒是用hadoop大数据处理的好例子
      

  3.   

    思路是对的,按照首字母hash之后分成若干个小文件排序之后再合起来。不过读文件没这么慢的,你可以先把你的逻辑部分去掉,看看到底是哪一步太慢
      

  4.   

    代码有问题
    1.不要在while里面打开关闭流   
     
      while (scaner.hasNext()) {
                buf.flip();
                String line =  scaner.nextLine();
                int no;
                if (line.split(",")[0].length() == 1) {
                    no = Integer.valueOf(line.split(",")[0]);
                } else {
                    no = Integer.valueOf(line.split(",")[0].substring(0, 2));
                }
                FileChannel fr =  new RandomAccessFile(createFilePath + no,"rw").getChannel();
                fr.write(ByteBuffer.wrap(line.getBytes()), fr.size());
                fr.write(ByteBuffer.wrap("\r\n".getBytes()),fr.size());
                buf.clear();
                fr.close();
            }
      

  5.   

    你写的读取方式FileChannel显然是最正确的。但是问题出在:
    FileChannel fr =  new RandomAccessFile(createFilePath + no,"rw").getChannel();
                fr.write(ByteBuffer.wrap(line.getBytes()), fr.size());
                fr.write(ByteBuffer.wrap("\r\n".getBytes()),fr.size());
                buf.clear();
                fr.close();
    FileChannel fr2 = new RandomAccessFile(createFilePath + i,"rw").getChannel();
                Scanner sc = new Scanner(fr2);
                // 将每个文件每行存入到内存中
                while (sc.hasNext()) {
                    list.add(sc.nextLine());
                }
                // 排序
                Collections.sort(list);
                for (String line : list) {
                    fw2.write(ByteBuffer.wrap(line.getBytes()),fw2.size());
                    fw2.write(ByteBuffer.wrap("\r\n".getBytes()),fw2.size());
                }
                list = null;
                System.gc();
                fr2.close();
    在这两个循环里面,打开关闭FileChannel很不效率,尤其是循环次数很多的时候,会浪费大量时间。
      

  6.   

    我要对N个文件进行读写操作,每次不开一个新的FileChannel不行啊。有什么解决方案吗?
      

  7.   

    修改了一下读写方式,思路完全不变。测试了一下1G大小的文件读写时间为20秒左右。
    我发现你使用FileChannel和RandomAccessFile的方式不太正确,使效率大大降低了。
    public void readFile() {
    BufferedReader reader = null;
    Map<String, FileOutputStream> map = null;
    try {
    reader = new BufferedReader(new FileReader(filePath));
    map = new HashMap<String, FileOutputStream>();
    String key = null, line = null;
    int count = 0;
    while ((line = reader.readLine()) != null) {
    int idx = line.indexOf(","), len = line.length();
    if (idx + len <= 1) {
    key = line;
    } else {
    key = line.substring(0, (idx == 1 ? idx : 2));
    }

    if (!map.containsKey(key)) {
    map.put(key, new FileOutputStream(createFilePath + key));
    }
    map.get(key).write((line + "\r\n").getBytes());

    if (++count <= 10000) continue;
    try {
    for (Iterator<String> it = map.keySet().iterator(); it.hasNext();) {
    FileOutputStream out = map.get(it.next());
    if (out != null) out.flush();
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    count = 0;
    }
    } catch (FileNotFoundException e) {
    e.printStackTrace();
    } catch (IOException e) {
    e.printStackTrace();
    } finally {
    try {
    if (reader != null)
    reader.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    if (map != null && !map.isEmpty()) {
    try {
    for (Iterator<String> it = map.keySet().iterator(); it.hasNext();) {
    FileOutputStream out = map.get(it.next());
    if (out != null) out.close();
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }
    }
      

  8.   

    您用心了,能否指点我一下,我的用法哪里有问题吗?
    我刚才摆弄了一下,找到核心的东西,就是
                    for (Iterator<String> it = map.keySet().iterator(); it.hasNext();) {
                        FileOutputStream out = map.get(it.next());
                        if (out != null) out.close();
                    }
    在外边循环做流的关闭,按这个思路改了之后,确实比之前的要快了许多。
    但 FileChannel和RandomAccessFile 这两个的用法,请指教一下。
      

  9.   

    sorry! 周末就是玩的时间,所以没有关注哦,周一上班等手里的工作完成后再修改一个FileChannel和RandomAccessFile的版本
      

  10.   

    木有找到满意的答案,感觉NIO 如果不是在SOCKET的环境下,好像发挥不出非阻塞的优势。
    单纯的读写文档,Channel的模式,比起BufferReader等,没觉得有速度上的优势。
    准备结贴,谢谢每一位为我解答的朋友们。
      

  11.   

    不好意思,说做个nio读写的,结果没有完成就被另一个小组拉过去做紧急ios开发了。
    发一个未完成的事例,你可以继续研究一下。不好意思代码没有注释,忙过了这段时间继续研究一下nio包(ps:nio之前我也没用过)import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;public class SplitFile { public static final String FILEPATH = "./src/a_1g.txt";
    public static final String CREATEFILEPATH = "./src/file/";
    // 存放文件映射
    Queue<ItemBuff> itemBuffQueue = new ArrayDeque<ItemBuff>();
    final static int MAX_ITEMS_SIZE = 6;
    // 存放分割好的ByteBuffer
    Queue<ByteBuffer> items = new ArrayDeque<ByteBuffer>();
    AtomicInteger buffSize = new AtomicInteger(), writeThreadCnt = new AtomicInteger();
    AtomicLong fileSize = new AtomicLong();
    private long startTime = 0;

    public long getStartTime() {
    return startTime;
    } public void setStartTime(long startTime) {
    this.startTime = startTime;
    } public Map<String, BufferedOutputStream> map = null;
    public BufferedOutputStream getStreamByName(String name) {
    BufferedOutputStream stream = null;
    if (!map.containsKey(name)) {
    try {
    stream = new BufferedOutputStream(new FileOutputStream(
    CREATEFILEPATH + name));
    map.put(name, stream);
    } catch (FileNotFoundException e) {
    System.out.println("file path:" + CREATEFILEPATH + name
    + " not exists!");
    }
    } else {
    stream = map.get(name);
    }
    return stream;
    }

    public boolean flush() {
    if (map == null || map.isEmpty()) return true;
    try {
    for (Iterator<String> it = map.keySet().iterator(); it.hasNext();) {
    BufferedOutputStream out = map.get(it.next());
    if (out != null) out.flush();
    }
    } catch (IOException e) {
    return false;
    }
    return true;
    }

    public boolean close() {
    if (map == null || map.isEmpty()) return true;
    try {
    for (Iterator<String> it = map.keySet().iterator(); it.hasNext();) {
    BufferedOutputStream out = map.get(it.next());
    if (out != null) out.close();
    map.remove(it);
    }
    } catch (IOException e) {
    return false;
    } finally {
    map.clear();
    map = null;
    System.out.println("total times: " + (System.currentTimeMillis() - getStartTime()));
    }
    return true;
    }

    public void split() {
    RandomAccessFile raf = null;
    try {
    deleteFiles();
    raf = new RandomAccessFile(FILEPATH, "r");
    FileChannel fw = raf.getChannel();
    fileSize.set(fw.size());
    map = new HashMap<String, BufferedOutputStream>();
    MappedByteBuffer mapBuf = null;
    long position = 0, length = fw.size();
    int size = 1 << 25, _size = size, endIdx = 0;
    while (true) {
    mapBuf = fw.map(FileChannel.MapMode.READ_ONLY, position, size);
    endIdx = size - 1;
    while ((mapBuf.get(endIdx--) != 10) && (size == _size) && endIdx > 0);
    itemBuffQueue.add(this.new ItemBuff(mapBuf, endIdx + 2));

    if ((position += size) >= length) {
    break;
    }
    position -= (size - endIdx - 2);
    if (position + size > length) {
    size = (int) (length - position);
    }
    fw.position(position);
    } if (size != _size) {
    itemBuffQueue.add(this.new ItemBuff(mapBuf, endIdx + 1));
    }
    new Thread(this.new SynchRead(), "read").start();
    try {
    Thread.sleep(10);
    } catch (InterruptedException e1) {
    e1.printStackTrace();
    } new Thread(this.new SynchWrite()).start();
    new Thread(this.new SynchWrite()).start();
    new Thread(this.new SynchWrite()).start();
    writeThreadCnt.set(3);

    } catch (FileNotFoundException e) {
    e.printStackTrace();
    } catch (IOException e) {
    e.printStackTrace();
    } finally {
    try {
    if (raf != null)
    raf.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }

    public void deleteFiles() {
    File file = new File(CREATEFILEPATH);
    if (!file.isDirectory()) return;
    File[] childrenFiles = file.listFiles();
    for (File f : childrenFiles) {
    f.delete();
    }
    }

    class ItemBuff {
    private MappedByteBuffer mapBuf;
    private int endIdx;
    static final int LIMIT = 1 << 22;
    private int position = 0;
    public ItemBuff() {
    this(null, -1);
    }
    public ItemBuff(MappedByteBuffer mapBuf, int endIdx) {
    this.mapBuf = mapBuf;
    this.endIdx = endIdx;
    }
    public ByteBuffer getBuff() {
    if (empty()) {
    mapBuf.clear();
    return mapBuf = null;
    }
    int _limit = (position + LIMIT > endIdx || endIdx - position - LIMIT < 1 << 10 ? endIdx - position : LIMIT);
    int index = position + _limit;
    mapBuf.limit(index);
    mapBuf.position(position);
    while (mapBuf.get(--index) != 10 && index > 0);
    mapBuf.limit(++index);
    position += (index - position);
    return mapBuf.slice();
    }
    public boolean empty() {
    return position >= endIdx;
    }
    }

    class SynchRead implements Runnable { private ItemBuff itemBuff;

    public boolean readEnd() {
    return itemBuffQueue.isEmpty();
    }

    public final int read() {
    if (itemBuff == null) {
    itemBuff = itemBuffQueue.poll();
    }
    ByteBuffer buff = itemBuff.getBuff();
    if (buff == null) {
    itemBuff = itemBuffQueue.poll();
    buff = itemBuff.getBuff();
    }
    items.add(buff);
    buffSize.incrementAndGet();
    return buff.capacity();
    }

    public boolean buffFull() {
    return buffSize.get() == MAX_ITEMS_SIZE;
    }

    @Override
    public void run() {
    try {
    while (!readEnd()) {
    synchronized (items) {
    if (buffFull()) {
    Thread.sleep(1000);
    } else {
    read();
    items.notifyAll();
    }
    }
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    }

    class SynchWrite implements Runnable {
    ByteBuffer byteBuf; public boolean buffEmpty() {
    return buffSize.get() == 0;
    }

    public void write() {
    byteBuf = items.poll();
    int index = -1, capacity = byteBuf.capacity();
    byte[] wb = new byte[100];
    try {
    while (capacity-- > 0) {
    wb[++index] = byteBuf.get();
    if (wb[index] == 10) {
    String key = new String(wb, 0, (index == 1 | wb[1] == 44) ? 1 : 2);
    getStreamByName(key).write(wb, 0, index + 1);
    index = -1;
    }
    }
    } catch (IOException e) {
    e.printStackTrace();
    } finally {
    buffSize.decrementAndGet();
    fileSize.getAndAdd(0 - byteBuf.capacity());
    }
    }

    @Override
    public void run() {
    try {
    while (fileSize.get() > 0.0) {
    synchronized (items) {
    if (buffEmpty()) {
    items.wait(500);
    } else {
    write();
    }
    }
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    writeThreadCnt.decrementAndGet();
    if (writeThreadCnt.get() == 0) {
    close();
    }
    }
    }
    }

    /**
     * 测试类
     * @param args
     */
    public static void main(String[] args) {
    SplitFile sf = new SplitFile();
    sf.setStartTime(System.currentTimeMillis());
    sf.split();
    }
    }