本帖最后由 cloudeagle_bupt 于 2015-02-16 12:53:30 编辑

解决方案 »

  1.   

    为了节省内存空间,我把MsgWriter修改如下:
    package ringBuffer;import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.LinkedHashMap;
    import java.util.Map;public class MsgWriter { /**
     * @param args
     */
    public static void main(String[] args) {
    long start = System.currentTimeMillis() ;
    MsgWriter mc = new MsgWriter() ;
    Consumer cr = mc.new Consumer() ;
    cr.start() ;

    for (Integer j = 0; j < 5; j++) {
    LinkedHashMap<Integer, String> msgs =cr.cMsgs ; //节省空间开销 
    for (Integer i = 0; i < 100000; i++) {
    msgs.put(i,j.toString()) ;
    }
    cr.addToMsgBuf(msgs);
    }
    cr.interrupt() ;
    System.out.println(" last : "+ ( System.currentTimeMillis() -start));
    }

    class Consumer extends Thread {
    int bufferSize = 2 * 1024 * 1024;
    private byte[] buffer = new byte[bufferSize];
    private Integer pos = 0;
    private Integer prePos = 0;
    String outputFile = "F:\\test\\ringBuf.txt";
            private int totalLength = 0;
         RandomAccessFile raf;
           
         public LinkedHashMap<Integer, String> cMsgs = new LinkedHashMap<Integer, String>() ;    ; public Consumer(){
        try {
         raf = new RandomAccessFile(outputFile, "rw");
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    public void run() {
    try {
       while (!Thread.interrupted()) {
       synchronized (cMsgs){
       while (cMsgs.isEmpty()) {
       cMsgs.wait();
                   }
       StringBuffer sb = new StringBuffer();
       for (Map.Entry<Integer, String> v : cMsgs.entrySet()) {
       sb.append(v.getKey()).append(v.getValue()).append("V") ;
       }
       sb.append("S");  
       byte[] buf = sb.toString().getBytes();
       int length = buf.length ;
       pos = prePos + length ;
       int bufPos = 0;
       if (pos > bufferSize && prePos < bufferSize) {   
        bufPos = bufferSize - prePos;
    System.arraycopy(buf, 0, buffer, prePos, bufPos); // 写满剩余的buf
    prePos = pos = 0;
    long fileLength = raf.length();  
    raf.seek(fileLength);  
    raf.write(buffer);  
       }else 
        System.arraycopy(buf, 0, buffer, prePos, length); // 写满剩余的buf
       if(bufPos==0){  //说明整段buf被复制
       prePos = pos;
       }else{   //此时 缓冲区应清空,存入后续buf
       System.arraycopy(buf, bufPos, buffer, prePos, length-bufPos); // 写满剩余的buf
       prePos = prePos +  length-bufPos ;
       pos =prePos ;
       }
       cMsgs.clear() ;
       }
       }
       long fileLength = raf.length(); // 文件长度,字节数
       raf.seek(fileLength); // 将写文件指针移到文件尾
       long start =  System.currentTimeMillis() ;
       raf.write(buffer, 0, prePos);  //溢写剩余数据
       raf.close();
        }catch (IOException e) {
    e.printStackTrace();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }  
    }

    public void addToMsgBuf(LinkedHashMap<Integer, String> currentStepMsgs) {
     synchronized (cMsgs) {
      cMsgs.putAll(currentStepMsgs);
      cMsgs.notifyAll();
            }
    }
    }
    }如上面红色部分,但是会报java.util.ConcurrentModificationException, 可是我有写同步代码啊,还是不知道咋回事,求指点了
      

  2.   

    http://blog.csdn.net/it_man/article/details/8225477
      

  3.   

    又修改了下代码,性能测试结果仍然如此,package ringBuffer;import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.LinkedHashMap;
    import java.util.Map;public class Producer {
     
    public static void main(String[] args) {
    long start = System.currentTimeMillis() ;
    Producer mc = new Producer() ;
    Consumer cr = mc.new Consumer() ;
    cr.start() ;

    for (Integer j = 0; j < 5; j++) {
    LinkedHashMap<Integer, String> msgs = new LinkedHashMap<Integer, String>() ;
    for (Integer i = 0; i < 100000; i++) {
    msgs.put(i,j.toString()) ;
    }
    cr.addToMsgBuf(msgs);
    }
    try {
    cr.interrupt() ;
    cr.join() ;   //这里主线程会否比子线程先跑完? 加上join
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(" last : "+ ( System.currentTimeMillis() -start));
    }

    class Consumer extends Thread {
    int bufferSize = 2 * 1024 * 1024;
    private byte[] buffer = new byte[bufferSize];
    private Integer pos = 0;
    private Integer prePos = 0;
    String outputFile = "F:\\test\\ringBuf.txt";
         RandomAccessFile raf;
           
         LinkedHashMap<Integer, String> cMsgs = new LinkedHashMap<Integer, String>() ;    ; public Consumer(){
        try {
         raf = new RandomAccessFile(outputFile, "rw");
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    public void run() {
    try {
       while (!Thread.interrupted()) {
       synchronized (cMsgs){
       while (cMsgs.isEmpty()) {
       cMsgs.wait();
                   }
       StringBuffer sb = new StringBuffer();
       for (Map.Entry<Integer, String> v : cMsgs.entrySet()) {
       sb.append(v.getKey()).append(v.getValue()).append("V") ;
       }
       sb.append("S");  
       byte[] buf = sb.toString().getBytes();
       int length = buf.length ;
       pos = prePos + length ;
       int bufPos = 0;
       if (pos > bufferSize && prePos < bufferSize) {   
        bufPos = bufferSize - prePos;
    System.arraycopy(buf, 0, buffer, prePos, bufPos);  
    prePos = pos = 0;
    long fileLength = raf.length();  
    raf.seek(fileLength);  
    raf.write(buffer);  
       }else 
        System.arraycopy(buf, 0, buffer, prePos, length);  
       if(bufPos==0){   
       prePos = pos;
       }else{    
       System.arraycopy(buf, bufPos, buffer, prePos, length-bufPos); 
       prePos = prePos +  length-bufPos ;
       pos =prePos ;
       }
       cMsgs.clear() ;
       }
       }
       long fileLength = raf.length();  
       raf.seek(fileLength);  
       raf.write(buffer, 0, prePos);  
       raf.close();
        }catch (IOException e) {
    e.printStackTrace();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }  
    }

    public void addToMsgBuf(LinkedHashMap<Integer, String> currentStepMsgs) {
     synchronized (cMsgs) {
      cMsgs.putAll(currentStepMsgs);
      cMsgs.notify() ;
            }
    }
    }
    }
    package ringBuffer;import java.io.IOException;
    import java.io.RandomAccessFile;public class Producer2 {
     
    public static void main(String[] args) {
    long start = System.currentTimeMillis() ;
    Producer2 pr = new Producer2() ;
    Consumer cr = pr.new Consumer() ;
    cr.start() ;
    for (Integer j = 0; j < 5; j++) {
    StringBuffer sb = new StringBuffer();
    for (Integer i = 0; i < 100000; i++) {
    sb.append(i.toString()+ j.toString() + "V");
    }
    sb.append("S");
    byte[] msgs = sb.toString().getBytes();
    // System.out.println(msgs.length);
    cr.addToMsgBuf(msgs);
    }

    try {
    cr.interrupt() ; 
    cr.join() ;  //这里主线程会否比子线程先跑完? 加上join
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(" last : "+ ( System.currentTimeMillis() -start));
    }

    class Consumer extends Thread {
    int bufferSize = 2 * 1024 * 1024;
    private byte[] buffer = new byte[bufferSize];
    private Integer pos = 0;
    private Integer prePos = 0;
    String outputFile = "F:\\test\\ringBuf.txt";
            private int totalLength = 0;
         RandomAccessFile raf; public Consumer(){
        try {
         raf = new RandomAccessFile(outputFile, "rw");
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    public void run() {
    try {
    while (!Thread.interrupted()) {
    synchronized (buffer) {
    while (pos > bufferSize ) { 
    totalLength = totalLength + prePos ;
         long fileLength = raf.length();     
         raf.seek(fileLength);     
         raf.write(buffer) ;  
    prePos = prePos % bufferSize ;
    pos = prePos;  
    buffer.notifyAll() ;   
    }
    }
    }
    totalLength = totalLength + prePos ;
    long fileLength = raf.length();    
         raf.seek(fileLength);    
         raf.write(buffer, 0, prePos) ;  
    raf.close() ;
    } catch (IOException e) {
    e.printStackTrace();
    }
    } public void addToMsgBuf(byte[] buf) {
    if (buf == null || buf.length == 0)
    return;
    try {
    synchronized (buffer) {
    pos = prePos + buf.length ;
    int bufPos = 0 ;
    while (pos > bufferSize && prePos<bufferSize) {  
    bufPos = bufferSize-prePos ;
    System.arraycopy(buf, 0, buffer, prePos, bufPos);  
    prePos = bufferSize;
    System.out.println("Wait Until there is enough space! ");
    buffer.wait();
    }
    System.arraycopy(buf,bufPos, buffer, prePos, buf.length - bufPos);
    if(pos==0&&pos==prePos){
    pos = prePos + buf.length ;
    }
    prePos = pos ;
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }
      

  4.   

    大家有啥好的技术论坛推荐下? csdn上面回复太慢了!
      

  5.   

    理论上说,Producer 把这个步骤:
    StringBuffer sb = new StringBuffer();
                               for (Map.Entry<Integer, String> v : cMsgs.entrySet()) {
                                   sb.append(v.getKey()).append(v.getValue()).append("V") ;
                               }
                               sb.append("S");  
                               byte[] buf = sb.toString().getBytes();放到Consumer线程中,并行度提高了,应该更快才对,但是明显结果是Producer 慢,且内存消耗大(这个很容易理解),但是性能上差异很大,是内存的原因么?Producer2无论是时间还是空间都比Producer 好,为啥呢?   求大牛指点下