修改了下代码, 可是还是不行:
package ringBuffer;import java.io.BufferedWriter;
import java.io.FileWriter;public class MemConsume { public static void main(String[] args) {
MemConsume mc = new MemConsume() ;
Consumer cr = mc.new Consumer() ;
Producer pr = mc.new Producer(cr) ;
pr.start() ;
cr.start() ;
} class Producer extends Thread {
Consumer cr = null; public Producer(Consumer consumer) {
cr = consumer;
} public void run() {
for (int j = 0; j < 5; j++) {
StringBuffer sb = new StringBuffer();
for (Integer i = 0; i < 1000000; i++) {
sb.append(j+i.toString() + "V");
}
sb.append("S");
byte[] msgs = sb.toString().getBytes();
// System.out.println(msgs.length);
cr.addToMsgBuf(msgs);
}
}
} class Consumer extends Thread {
int bufferSize = 20 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer left =0 ;
String outputFile = "F:\\test\\ringBuf.txt";
private int totalLength = 0;
public Consumer(){
left = bufferSize;
}
public void run() {
while (!Thread.interrupted()) {
try {
synchronized (buffer) {
if (pos > bufferSize * 0.7) { // 大于70%则溢写
BufferedWriter des = new BufferedWriter(new FileWriter(
outputFile));
totalLength = totalLength + pos ;
System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
des.write(new String(buffer)); //
System.out.println("write End Time: " + System.currentTimeMillis());
pos = 0;
left = bufferSize ;
buffer.notifyAll() ; //清空buffer后通知等待的线程
}
buffer.wait();
}
} catch (Exception e) {
e.printStackTrace();
}
}
} public void addToMsgBuf(byte[] buf) {
if (buf == null || buf.length == 0)
return;
try {
synchronized (buffer) {
while ((pos + buf.length)> bufferSize ) {
System.out.println("Wait Until there is enough space! ");
buffer.wait();
}
System.arraycopy(buf, 0, buffer, pos, buf.length);
pos = pos + buf.length ;
left = left - buf.length ;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package ringBuffer;import java.io.BufferedWriter;
import java.io.FileWriter;public class MemConsume { public static void main(String[] args) {
MemConsume mc = new MemConsume() ;
Consumer cr = mc.new Consumer() ;
Producer pr = mc.new Producer(cr) ;
pr.start() ;
cr.start() ;
} class Producer extends Thread {
Consumer cr = null; public Producer(Consumer consumer) {
cr = consumer;
} public void run() {
for (int j = 0; j < 5; j++) {
StringBuffer sb = new StringBuffer();
for (Integer i = 0; i < 1000000; i++) {
sb.append(j+i.toString() + "V");
}
sb.append("S");
byte[] msgs = sb.toString().getBytes();
// System.out.println(msgs.length);
cr.addToMsgBuf(msgs);
}
}
} class Consumer extends Thread {
int bufferSize = 20 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer left =0 ;
String outputFile = "F:\\test\\ringBuf.txt";
private int totalLength = 0;
public Consumer(){
left = bufferSize;
}
public void run() {
while (!Thread.interrupted()) {
try {
synchronized (buffer) {
if (pos > bufferSize * 0.7) { // 大于70%则溢写
BufferedWriter des = new BufferedWriter(new FileWriter(
outputFile));
totalLength = totalLength + pos ;
System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
des.write(new String(buffer)); //
System.out.println("write End Time: " + System.currentTimeMillis());
pos = 0;
left = bufferSize ;
buffer.notifyAll() ; //清空buffer后通知等待的线程
}
buffer.wait();
}
} catch (Exception e) {
e.printStackTrace();
}
}
} public void addToMsgBuf(byte[] buf) {
if (buf == null || buf.length == 0)
return;
try {
synchronized (buffer) {
while ((pos + buf.length)> bufferSize ) {
System.out.println("Wait Until there is enough space! ");
buffer.wait();
}
System.arraycopy(buf, 0, buffer, pos, buf.length);
pos = pos + buf.length ;
left = left - buf.length ;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package ringBuffer;import java.io.BufferedWriter;
import java.io.FileWriter;public class MemConsume { public static void main(String[] args) {
MemConsume mc = new MemConsume() ;
Consumer cr = mc.new Consumer() ;
Producer pr = mc.new Producer(cr) ;
pr.start() ;
cr.start() ;
} class Producer extends Thread {
Consumer cr = null; public Producer(Consumer consumer) {
cr = consumer;
} public void run() {
for (int j = 0; j < 5; j++) {
StringBuffer sb = new StringBuffer();
for (Integer i = 0; i < 1000000; i++) {
sb.append(j+i.toString() + "V");
}
sb.append("S");
byte[] msgs = sb.toString().getBytes();
// System.out.println(msgs.length);
cr.addToMsgBuf(msgs);
}
}
} class Consumer extends Thread {
int bufferSize = 20 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer prePos = 0;
String outputFile = "F:\\test\\ringBuf.txt";
private int totalLength = 0;
public Consumer(){
}
public void run() {
while (!Thread.interrupted()) {
try {
synchronized (buffer) {
if (pos > bufferSize ) {
BufferedWriter des = new BufferedWriter(new FileWriter(
outputFile));
totalLength = totalLength + prePos ;
System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
des.write(new String(buffer)); //
System.out.println("write End Time: " + System.currentTimeMillis());
prePos = pos = 0;
buffer.notifyAll() ; //清空buffer后通知等待的线程
}
buffer.wait();
}
} catch (Exception e) {
e.printStackTrace();
}
}
} public void addToMsgBuf(byte[] buf) {
if (buf == null || buf.length == 0)
return;
try {
synchronized (buffer) {
pos = prePos + buf.length ;
while (pos > bufferSize ) {
System.out.println("Wait Until there is enough space! ");
buffer.wait();
}
System.arraycopy(buf, 0, buffer, prePos, buf.length);
prePos = pos ;
buffer.notifyAll() ;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package ringBuffer;import java.io.BufferedWriter;
import java.io.FileWriter;public class MemConsume { public static void main(String[] args) {
MemConsume mc = new MemConsume() ;
Consumer cr = mc.new Consumer() ;
Producer pr = mc.new Producer(cr) ;
pr.start() ;
cr.start() ;
} class Producer extends Thread {
Consumer cr = null; public Producer(Consumer consumer) {
cr = consumer;
} public void run() {
for (int j = 0; j < 3; j++) {
StringBuffer sb = new StringBuffer();
for (Integer i = 0; i < 1000000; i++) {
sb.append(j+i.toString() + "V");
}
sb.append("S");
byte[] msgs = sb.toString().getBytes();
// System.out.println(msgs.length);
cr.addToMsgBuf(msgs);
}
}
} class Consumer extends Thread {
int bufferSize = 20 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer prePos = 0;
String outputFile = "F:\\test\\ringBuf.txt";
private int totalLength = 0;
public Consumer(){
}
public void run() {
while (!Thread.interrupted()) {
try {
synchronized (buffer) {
if (pos > bufferSize ) {
BufferedWriter des = new BufferedWriter(new FileWriter(
outputFile,true));
totalLength = totalLength + prePos ;
System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
des.write(new String(buffer));
System.out.println("write End Time: " + System.currentTimeMillis());
prePos = pos = 0;
buffer.notifyAll() ; //清空buffer后通知等待的线程
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
} public void addToMsgBuf(byte[] buf) {
if (buf == null || buf.length == 0)
return;
try {
synchronized (buffer) {
pos = prePos + buf.length ;
while (pos > bufferSize ) {
System.out.println("Wait Until there is enough space! ");
buffer.wait();
}
System.arraycopy(buf, 0, buffer, prePos, buf.length);
if(pos==0&&pos==prePos){
pos = prePos + buf.length ;
}
prePos = pos ;
buffer.notifyAll() ;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
import java.io.FileWriter;
import java.io.IOException;//----- 增加引入。
public class MemConsume {
public static void main(String[] args) {
MemConsume mc = new MemConsume() ;
Consumer cr = mc.new Consumer() ;
Producer pr = mc.new Producer(cr) ;
pr.start() ;
cr.start() ;
}
class Producer extends Thread {
Consumer cr = null;
public Producer(Consumer consumer) {
cr = consumer;
}
public void run() {
for (int j = 0; j < 3; j++) {
StringBuffer sb = new StringBuffer();
for (Integer i = 0; i < 1000000; i++) {
sb.append(j+i.toString() + "V");
}
sb.append("S");
byte[] msgs = sb.toString().getBytes();
//System.out.println("The " + j + " times msg length is " + msgs.length);//测试
cr.addToMsgBuf(msgs);
}
cr.interrupt();//------- 循环结束,用中断方式结束线程执行。
}
}
class Consumer extends Thread {
int bufferSize = 20 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer prePos = 0;
//String outputFile = "F:\\test\\ringBuf.txt";
String outputFile = "d:\\test\\ringBuf.txt";//----- 我电脑没有F盘。
private int totalLength = 0;
BufferedWriter des = null;//-------- 定义成成员变量
public Consumer(){
try {
des = new BufferedWriter(new FileWriter(outputFile,true));// ------- 初始化
}
catch(IOException ioe){
ioe.printStackTrace();
}
}
public void run() {
while (!Thread.interrupted()) {
try {
synchronized (buffer) {
if (pos > bufferSize ) {
//BufferedWriter des = new BufferedWriter(new FileWriter(
// outputFile,true));
totalLength = totalLength + prePos ;
System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
//des.write(new String(buffer));
des.append(new String(buffer));//----- 追加写入
System.out.println("write End Time: " + System.currentTimeMillis());
prePos = pos = 0;
buffer.notifyAll() ; //清空buffer后通知等待的线程
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//------ 循环结束,把缓冲区剩余的内容写入文件。
//------
try {
totalLength = totalLength + prePos ;
System.out.println("写入剩余内容");
des.append(new String(buffer, 0, pos));//------- 不是全缓冲区了。
des.flush();//------- 把缓冲区内容都写入文件
des.close();//------- 关闭流
}
catch(IOException ioe1) {
ioe1.printStackTrace();
}
//------- end
}
public void addToMsgBuf(byte[] buf) {
if (buf == null || buf.length == 0)
return;
try {
synchronized (buffer) {
pos = prePos + buf.length ;
while (pos > bufferSize ) {
System.out.println("Wait Until there is enough space! ");
buffer.wait();
}
System.arraycopy(buf, 0, buffer, prePos, buf.length);
if(pos==0&&pos==prePos){
pos = prePos + buf.length ;
}
prePos = pos ;
buffer.notifyAll() ;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void addToMsgBuf(byte[] buf) {
if (buf == null || buf.length == 0)
return;
try {
synchronized (buffer) {//不需要进行wait处理,锁是互斥的
System.arraycopy(buf, 0, buffer, pos, buf.length);
pos = pos + buf.length ;
left = left - buf.length ;
buffer.notify();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}synchronized (buffer) {//块中的代码未执行完时,不会释放锁,addToMsgBuf只能等待
if (pos > bufferSize * 0.7) { // 大于70%则溢写
BufferedWriter des = new BufferedWriter(new FileWriter(
outputFile));
totalLength = totalLength + pos ;
System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
des.write(new String(buffer)); //
System.out.println("write End Time: " + System.currentTimeMillis());
pos = 0;
left = bufferSize ;
continue;
}
buffer.wait();
}
2,Customer.addToMsgBuf()方法里面buffer.notifyAll() ;多余,别的线程没有wait的地方
1. 多余的notify和wait
2. if改成了while
3. 为了避免子线程在主线程前结束,使用了join,现在应该没有什么问题了。
现在还有问题么?package ringBuffer;import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.LinkedHashMap;
import java.util.Map;public class Producer {
public static void main(String[] args) {
long start = System.currentTimeMillis() ;
Producer mc = new Producer() ;
Consumer cr = mc.new Consumer() ;
cr.start() ;
for (Integer j = 0; j < 5; j++) {
LinkedHashMap<Integer, String> msgs = new LinkedHashMap<Integer, String>() ;
for (Integer i = 0; i < 100000; i++) {
msgs.put(i,j.toString()) ;
}
cr.addToMsgBuf(msgs);
}
try {
cr.interrupt() ;
cr.join() ; //这里主线程会否比子线程先跑完? 加上join
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" last : "+ ( System.currentTimeMillis() -start));
}
class Consumer extends Thread {
int bufferSize = 2 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer prePos = 0;
String outputFile = "F:\\test\\ringBuf.txt";
RandomAccessFile raf;
LinkedHashMap<Integer, String> cMsgs = new LinkedHashMap<Integer, String>() ; ; public Consumer(){
try {
raf = new RandomAccessFile(outputFile, "rw");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (cMsgs){
while (cMsgs.isEmpty()) {
cMsgs.wait();
}
StringBuffer sb = new StringBuffer();
for (Map.Entry<Integer, String> v : cMsgs.entrySet()) {
sb.append(v.getKey()).append(v.getValue()).append("V") ;
}
sb.append("S");
byte[] buf = sb.toString().getBytes();
int length = buf.length ;
pos = prePos + length ;
int bufPos = 0;
if (pos > bufferSize && prePos < bufferSize) {
bufPos = bufferSize - prePos;
System.arraycopy(buf, 0, buffer, prePos, bufPos);
prePos = pos = 0;
long fileLength = raf.length();
raf.seek(fileLength);
raf.write(buffer);
}else
System.arraycopy(buf, 0, buffer, prePos, length);
if(bufPos==0){
prePos = pos;
}else{
System.arraycopy(buf, bufPos, buffer, prePos, length-bufPos);
prePos = prePos + length-bufPos ;
pos =prePos ;
}
cMsgs.clear() ;
}
}
long fileLength = raf.length();
raf.seek(fileLength);
raf.write(buffer, 0, prePos);
raf.close();
}catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void addToMsgBuf(LinkedHashMap<Integer, String> currentStepMsgs) {
synchronized (cMsgs) {
cMsgs.putAll(currentStepMsgs);
cMsgs.notify() ;
}
}
}
}package ringBuffer;import java.io.IOException;
import java.io.RandomAccessFile;public class Producer2 {
public static void main(String[] args) {
long start = System.currentTimeMillis() ;
Producer2 pr = new Producer2() ;
Consumer cr = pr.new Consumer() ;
cr.start() ;
for (Integer j = 0; j < 5; j++) {
StringBuffer sb = new StringBuffer();
for (Integer i = 0; i < 100000; i++) {
sb.append(i.toString()+ j.toString() + "V");
}
sb.append("S");
byte[] msgs = sb.toString().getBytes();
// System.out.println(msgs.length);
cr.addToMsgBuf(msgs);
}
try {
cr.interrupt() ;
cr.join() ; //这里主线程会否比子线程先跑完? 加上join
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" last : "+ ( System.currentTimeMillis() -start));
}
class Consumer extends Thread {
int bufferSize = 2 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer prePos = 0;
String outputFile = "F:\\test\\ringBuf.txt";
private int totalLength = 0;
RandomAccessFile raf; public Consumer(){
try {
raf = new RandomAccessFile(outputFile, "rw");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (buffer) {
while (pos > bufferSize ) {
totalLength = totalLength + prePos ;
long fileLength = raf.length();
raf.seek(fileLength);
raf.write(buffer) ;
prePos = prePos % bufferSize ;
pos = prePos;
buffer.notifyAll() ;
}
}
}
totalLength = totalLength + prePos ;
long fileLength = raf.length();
raf.seek(fileLength);
raf.write(buffer, 0, prePos) ;
raf.close() ;
} catch (IOException e) {
e.printStackTrace();
}
} public void addToMsgBuf(byte[] buf) {
if (buf == null || buf.length == 0)
return;
try {
synchronized (buffer) {
pos = prePos + buf.length ;
int bufPos = 0 ;
while (pos > bufferSize && prePos<bufferSize) {
bufPos = bufferSize-prePos ;
System.arraycopy(buf, 0, buffer, prePos, bufPos);
prePos = bufferSize;
System.out.println("Wait Until there is enough space! ");
buffer.wait();
}
System.arraycopy(buf,bufPos, buffer, prePos, buf.length - bufPos);
if(pos==0&&pos==prePos){
pos = prePos + buf.length ;
}
prePos = pos ;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}