修改了下代码,  可是还是不行:
package ringBuffer;import java.io.BufferedWriter;
import java.io.FileWriter;public class MemConsume { public static void main(String[] args) {
MemConsume mc = new MemConsume() ;
Consumer cr = mc.new Consumer() ;
Producer pr = mc.new Producer(cr) ;
pr.start() ;
cr.start() ;
} class Producer extends Thread {
Consumer cr = null; public Producer(Consumer consumer) {
cr = consumer;
} public void run() {
for (int j = 0; j < 5; j++) {
StringBuffer sb = new StringBuffer();
for (Integer i = 0; i < 1000000; i++) {
sb.append(j+i.toString() + "V");
}
sb.append("S");
byte[] msgs = sb.toString().getBytes();
// System.out.println(msgs.length);
cr.addToMsgBuf(msgs);
}
}
} class Consumer extends Thread {
int bufferSize = 20 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer left =0  ;
String outputFile = "F:\\test\\ringBuf.txt";
        private int totalLength = 0;

public Consumer(){
   left = bufferSize;
}

public void run() {
while (!Thread.interrupted()) {
try {
synchronized (buffer) {
if (pos > bufferSize * 0.7) { // 大于70%则溢写
BufferedWriter des = new BufferedWriter(new FileWriter(
outputFile));
totalLength = totalLength + pos ;
System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
des.write(new String(buffer));  // 
System.out.println("write End Time: " + System.currentTimeMillis());
pos = 0;
left = bufferSize ;
buffer.notifyAll() ;  //清空buffer后通知等待的线程
}
buffer.wait();
}
} catch (Exception e) {
e.printStackTrace();
}
}
} public void addToMsgBuf(byte[] buf) {
if (buf == null || buf.length == 0)
return;
try {
synchronized (buffer) {
while ((pos + buf.length)> bufferSize ) {  
System.out.println("Wait Until there is enough space! ");
buffer.wait();
}
System.arraycopy(buf, 0, buffer, pos, buf.length);
pos = pos + buf.length ;
left = left - buf.length ;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

解决方案 »

  1.   

    又一次改进代码,可是还是不行:
    package ringBuffer;import java.io.BufferedWriter;
    import java.io.FileWriter;public class MemConsume { public static void main(String[] args) {
    MemConsume mc = new MemConsume() ;
    Consumer cr = mc.new Consumer() ;
    Producer pr = mc.new Producer(cr) ;
    pr.start() ;
    cr.start() ;
    } class Producer extends Thread {
    Consumer cr = null; public Producer(Consumer consumer) {
    cr = consumer;
    } public void run() {
    for (int j = 0; j < 5; j++) {
    StringBuffer sb = new StringBuffer();
    for (Integer i = 0; i < 1000000; i++) {
    sb.append(j+i.toString() + "V");
    }
    sb.append("S");
    byte[] msgs = sb.toString().getBytes();
    // System.out.println(msgs.length);
    cr.addToMsgBuf(msgs);
    }
    }
    } class Consumer extends Thread {
    int bufferSize = 20 * 1024 * 1024;
    private byte[] buffer = new byte[bufferSize];
    private Integer pos = 0;
    private Integer prePos = 0;
    String outputFile = "F:\\test\\ringBuf.txt";
            private int totalLength = 0;

    public Consumer(){
    }

    public void run() {
    while (!Thread.interrupted()) {
    try {
    synchronized (buffer) {
    if (pos > bufferSize ) { 
    BufferedWriter des = new BufferedWriter(new FileWriter(
    outputFile));
    totalLength = totalLength + prePos ;
    System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
    des.write(new String(buffer));  // 
    System.out.println("write End Time: " + System.currentTimeMillis());
    prePos = pos = 0;
    buffer.notifyAll() ;  //清空buffer后通知等待的线程
    }
    buffer.wait();
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    } public void addToMsgBuf(byte[] buf) {
    if (buf == null || buf.length == 0)
    return;
    try {
    synchronized (buffer) {
    pos = prePos + buf.length ;
    while (pos > bufferSize ) {  
    System.out.println("Wait Until there is enough space! ");
    buffer.wait();
    }
    System.arraycopy(buf, 0, buffer, prePos, buf.length);
    prePos = pos ;
    buffer.notifyAll() ;
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }
      

  2.   

    这是目前改进的代码,可是最后的数据还是无法写入:求大神指点了
    package ringBuffer;import java.io.BufferedWriter;
    import java.io.FileWriter;public class MemConsume { public static void main(String[] args) {
    MemConsume mc = new MemConsume() ;
    Consumer cr = mc.new Consumer() ;
    Producer pr = mc.new Producer(cr) ;
    pr.start() ;
    cr.start() ;
    } class Producer extends Thread {
    Consumer cr = null; public Producer(Consumer consumer) {
    cr = consumer;
    } public void run() {
    for (int j = 0; j < 3; j++) {
    StringBuffer sb = new StringBuffer();
    for (Integer i = 0; i < 1000000; i++) {
    sb.append(j+i.toString() + "V");
    }
    sb.append("S");
    byte[] msgs = sb.toString().getBytes();
    // System.out.println(msgs.length);
    cr.addToMsgBuf(msgs);
    }
    }
    } class Consumer extends Thread {
    int bufferSize = 20 * 1024 * 1024;
    private byte[] buffer = new byte[bufferSize];
    private Integer pos = 0;
    private Integer prePos = 0;
    String outputFile = "F:\\test\\ringBuf.txt";
            private int totalLength = 0;

    public Consumer(){
    }

    public void run() {
    while (!Thread.interrupted()) {
    try {
    synchronized (buffer) {
    if (pos > bufferSize ) { 
    BufferedWriter des = new BufferedWriter(new FileWriter(
    outputFile,true));
    totalLength = totalLength + prePos ;
    System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
    des.write(new String(buffer));   
    System.out.println("write End Time: " + System.currentTimeMillis());
    prePos = pos = 0;
    buffer.notifyAll() ;  //清空buffer后通知等待的线程
    }
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    } public void addToMsgBuf(byte[] buf) {
    if (buf == null || buf.length == 0)
    return;
    try {
    synchronized (buffer) {
    pos = prePos + buf.length ;
    while (pos > bufferSize ) {  
    System.out.println("Wait Until there is enough space! ");
    buffer.wait();
    }
    System.arraycopy(buf, 0, buffer, prePos, buf.length);
    if(pos==0&&pos==prePos){
    pos = prePos + buf.length ;
    }
    prePos = pos ;
    buffer.notifyAll() ;
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }
      

  3.   

    楼主参考一下:import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;//----- 增加引入。
     
    public class MemConsume {
     
        public static void main(String[] args) {
            MemConsume mc = new MemConsume() ;
            Consumer cr = mc.new Consumer() ;
            Producer pr = mc.new Producer(cr) ;
            pr.start() ;
            cr.start() ;
        }
     
        class Producer extends Thread {
            Consumer cr = null;
     
            public Producer(Consumer consumer) {
                cr = consumer;
            }
     
            public void run() {
                for (int j = 0; j < 3; j++) {
                    StringBuffer sb = new StringBuffer();
                    for (Integer i = 0; i < 1000000; i++) {
                        sb.append(j+i.toString() + "V");
                    }
                    sb.append("S");
                    byte[] msgs = sb.toString().getBytes();
                    //System.out.println("The " + j + " times msg length is " + msgs.length);//测试
                    cr.addToMsgBuf(msgs);
                }
                cr.interrupt();//------- 循环结束,用中断方式结束线程执行。
            }
        }
     
        class Consumer extends Thread {
            int bufferSize = 20 * 1024 * 1024;
            private byte[] buffer = new byte[bufferSize];
            private Integer pos = 0;
            private Integer prePos = 0;
            //String outputFile = "F:\\test\\ringBuf.txt";
            String outputFile = "d:\\test\\ringBuf.txt";//----- 我电脑没有F盘。
            private int totalLength = 0;
            BufferedWriter des = null;//-------- 定义成成员变量
            
            public Consumer(){
             try {
                 des = new BufferedWriter(new FileWriter(outputFile,true));// ------- 初始化
             }
             catch(IOException ioe){
             ioe.printStackTrace();
             }
            }
             
            public void run() {
                while (!Thread.interrupted()) {
                    try {
                        synchronized (buffer) {
                            if (pos > bufferSize ) { 
                                //BufferedWriter des = new BufferedWriter(new FileWriter(
                                  //      outputFile,true));
                                totalLength = totalLength + prePos ;
                                System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
                                //des.write(new String(buffer));
                                des.append(new String(buffer));//----- 追加写入
                                
                                System.out.println("write End Time: "    + System.currentTimeMillis());
                                prePos = pos = 0;
                                buffer.notifyAll() ;  //清空buffer后通知等待的线程
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                //------ 循环结束,把缓冲区剩余的内容写入文件。
                //------
                try {
                
                    totalLength = totalLength + prePos ;
                    System.out.println("写入剩余内容");
                    des.append(new String(buffer, 0, pos));//------- 不是全缓冲区了。
                    des.flush();//------- 把缓冲区内容都写入文件
                    des.close();//------- 关闭流
                }
                catch(IOException ioe1) {
                 ioe1.printStackTrace();
                }
                //------- end 
            }
     
            public void addToMsgBuf(byte[] buf) {
                if (buf == null || buf.length == 0)
                    return;
                try {
                    synchronized (buffer) {
                        pos = prePos + buf.length ;
                        while (pos > bufferSize ) {  
                            System.out.println("Wait Until there is enough space! ");
                            buffer.wait();
                        }
                        System.arraycopy(buf, 0, buffer, prePos, buf.length);
                        if(pos==0&&pos==prePos){
                            pos = prePos + buf.length ;
                        }
                        prePos = pos ;
                        buffer.notifyAll() ;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
      

  4.   

    自己写啊?厉害!可以去看一下 Disruptor 
      

  5.   

    楼主对synchronized的理解还需要加深。Consumer里面的run方法,你使用了synchronized (buffer),在addToMsgBuf中也同样使用了synchronized (buffer)。这两个同步代码块使用了同一个锁(buffer)其实是互斥的,就说说,run方法里面的synchronized 代码块未执行完成,那么addToMsgBuf方法中的同步代码块其实不能够执行(即处于等待状态)。那么,只需要做简单的修改就可以正常执行了:
    public void addToMsgBuf(byte[] buf) {
                if (buf == null || buf.length == 0)
                    return;
                try {
                    synchronized (buffer) {//不需要进行wait处理,锁是互斥的
                        System.arraycopy(buf, 0, buffer, pos, buf.length);
                        pos = pos + buf.length ;
                        left = left - buf.length ;
                        buffer.notify();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }synchronized (buffer) {//块中的代码未执行完时,不会释放锁,addToMsgBuf只能等待
                            if (pos > bufferSize * 0.7) { // 大于70%则溢写
                                BufferedWriter des = new BufferedWriter(new FileWriter(
                                        outputFile));
                                totalLength = totalLength + pos ;
                                System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
                                des.write(new String(buffer));  // 
                                System.out.println("write End Time: "    + System.currentTimeMillis());
                                pos = 0;
                                left = bufferSize ;
                                continue;
                            }
                            buffer.wait();
                        }
      

  6.   

    1,Customer.run()方法内:写入文件之后close文件,因为你每次循环的时候都在new
    2,Customer.addToMsgBuf()方法里面buffer.notifyAll() ;多余,别的线程没有wait的地方
      

  7.   

    感谢诸位帮助,我改进了下代码,写了两种方式,主要改进了几个地方:
    1. 多余的notify和wait
    2. if改成了while
    3. 为了避免子线程在主线程前结束,使用了join,现在应该没有什么问题了。
    现在还有问题么?package ringBuffer;import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.LinkedHashMap;
    import java.util.Map;public class Producer {
     
    public static void main(String[] args) {
    long start = System.currentTimeMillis() ;
    Producer mc = new Producer() ;
    Consumer cr = mc.new Consumer() ;
    cr.start() ;

    for (Integer j = 0; j < 5; j++) {
    LinkedHashMap<Integer, String> msgs = new LinkedHashMap<Integer, String>() ;
    for (Integer i = 0; i < 100000; i++) {
    msgs.put(i,j.toString()) ;
    }
    cr.addToMsgBuf(msgs);
    }
    try {
    cr.interrupt() ;
    cr.join() ;   //这里主线程会否比子线程先跑完? 加上join
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(" last : "+ ( System.currentTimeMillis() -start));
    }

    class Consumer extends Thread {
    int bufferSize = 2 * 1024 * 1024;
    private byte[] buffer = new byte[bufferSize];
    private Integer pos = 0;
    private Integer prePos = 0;
    String outputFile = "F:\\test\\ringBuf.txt";
         RandomAccessFile raf;
           
         LinkedHashMap<Integer, String> cMsgs = new LinkedHashMap<Integer, String>() ;    ; public Consumer(){
        try {
         raf = new RandomAccessFile(outputFile, "rw");
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    public void run() {
    try {
       while (!Thread.interrupted()) {
       synchronized (cMsgs){
       while (cMsgs.isEmpty()) {
       cMsgs.wait();
                   }
       StringBuffer sb = new StringBuffer();
       for (Map.Entry<Integer, String> v : cMsgs.entrySet()) {
       sb.append(v.getKey()).append(v.getValue()).append("V") ;
       }
       sb.append("S");  
       byte[] buf = sb.toString().getBytes();
       int length = buf.length ;
       pos = prePos + length ;
       int bufPos = 0;
       if (pos > bufferSize && prePos < bufferSize) {   
        bufPos = bufferSize - prePos;
    System.arraycopy(buf, 0, buffer, prePos, bufPos);  
    prePos = pos = 0;
    long fileLength = raf.length();  
    raf.seek(fileLength);  
    raf.write(buffer);  
       }else 
        System.arraycopy(buf, 0, buffer, prePos, length);  
       if(bufPos==0){   
       prePos = pos;
       }else{    
       System.arraycopy(buf, bufPos, buffer, prePos, length-bufPos); 
       prePos = prePos +  length-bufPos ;
       pos =prePos ;
       }
       cMsgs.clear() ;
       }
       }
       long fileLength = raf.length();  
       raf.seek(fileLength);  
       raf.write(buffer, 0, prePos);  
       raf.close();
        }catch (IOException e) {
    e.printStackTrace();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }  
    }

    public void addToMsgBuf(LinkedHashMap<Integer, String> currentStepMsgs) {
     synchronized (cMsgs) {
      cMsgs.putAll(currentStepMsgs);
      cMsgs.notify() ;
            }
    }
    }
    }package ringBuffer;import java.io.IOException;
    import java.io.RandomAccessFile;public class Producer2 {
     
    public static void main(String[] args) {
    long start = System.currentTimeMillis() ;
    Producer2 pr = new Producer2() ;
    Consumer cr = pr.new Consumer() ;
    cr.start() ;
    for (Integer j = 0; j < 5; j++) {
    StringBuffer sb = new StringBuffer();
    for (Integer i = 0; i < 100000; i++) {
    sb.append(i.toString()+ j.toString() + "V");
    }
    sb.append("S");
    byte[] msgs = sb.toString().getBytes();
    // System.out.println(msgs.length);
    cr.addToMsgBuf(msgs);
    }

    try {
    cr.interrupt() ; 
    cr.join() ;  //这里主线程会否比子线程先跑完? 加上join
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(" last : "+ ( System.currentTimeMillis() -start));
    }

    class Consumer extends Thread {
    int bufferSize = 2 * 1024 * 1024;
    private byte[] buffer = new byte[bufferSize];
    private Integer pos = 0;
    private Integer prePos = 0;
    String outputFile = "F:\\test\\ringBuf.txt";
            private int totalLength = 0;
         RandomAccessFile raf; public Consumer(){
        try {
         raf = new RandomAccessFile(outputFile, "rw");
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    public void run() {
    try {
    while (!Thread.interrupted()) {
    synchronized (buffer) {
    while (pos > bufferSize ) { 
    totalLength = totalLength + prePos ;
         long fileLength = raf.length();     
         raf.seek(fileLength);     
         raf.write(buffer) ;  
    prePos = prePos % bufferSize ;
    pos = prePos;  
    buffer.notifyAll() ;   
    }
    }
    }
    totalLength = totalLength + prePos ;
    long fileLength = raf.length();    
         raf.seek(fileLength);    
         raf.write(buffer, 0, prePos) ;  
    raf.close() ;
    } catch (IOException e) {
    e.printStackTrace();
    }
    } public void addToMsgBuf(byte[] buf) {
    if (buf == null || buf.length == 0)
    return;
    try {
    synchronized (buffer) {
    pos = prePos + buf.length ;
    int bufPos = 0 ;
    while (pos > bufferSize && prePos<bufferSize) {  
    bufPos = bufferSize-prePos ;
    System.arraycopy(buf, 0, buffer, prePos, bufPos);  
    prePos = bufferSize;
    System.out.println("Wait Until there is enough space! ");
    buffer.wait();
    }
    System.arraycopy(buf,bufPos, buffer, prePos, buf.length - bufPos);
    if(pos==0&&pos==prePos){
    pos = prePos + buf.length ;
    }
    prePos = pos ;
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }