多线程下环形缓冲区的时间开销和空间开销差异很大,想不通,求大牛指点原因 本帖最后由 cloudeagle_bupt 于 2015-02-16 12:53:30 编辑 解决方案 » 免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货 为了节省内存空间,我把MsgWriter修改如下:package ringBuffer;import java.io.IOException;import java.io.RandomAccessFile;import java.util.LinkedHashMap;import java.util.Map;public class MsgWriter { /** * @param args */ public static void main(String[] args) { long start = System.currentTimeMillis() ; MsgWriter mc = new MsgWriter() ; Consumer cr = mc.new Consumer() ; cr.start() ; for (Integer j = 0; j < 5; j++) { LinkedHashMap<Integer, String> msgs =cr.cMsgs ; //节省空间开销 for (Integer i = 0; i < 100000; i++) { msgs.put(i,j.toString()) ; } cr.addToMsgBuf(msgs); } cr.interrupt() ; System.out.println(" last : "+ ( System.currentTimeMillis() -start)); } class Consumer extends Thread { int bufferSize = 2 * 1024 * 1024; private byte[] buffer = new byte[bufferSize]; private Integer pos = 0; private Integer prePos = 0; String outputFile = "F:\\test\\ringBuf.txt"; private int totalLength = 0; RandomAccessFile raf; public LinkedHashMap<Integer, String> cMsgs = new LinkedHashMap<Integer, String>() ; ; public Consumer(){ try { raf = new RandomAccessFile(outputFile, "rw"); } catch (IOException e) { e.printStackTrace(); } } public void run() { try { while (!Thread.interrupted()) { synchronized (cMsgs){ while (cMsgs.isEmpty()) { cMsgs.wait(); } StringBuffer sb = new StringBuffer(); for (Map.Entry<Integer, String> v : cMsgs.entrySet()) { sb.append(v.getKey()).append(v.getValue()).append("V") ; } sb.append("S"); byte[] buf = sb.toString().getBytes(); int length = buf.length ; pos = prePos + length ; int bufPos = 0; if (pos > bufferSize && prePos < bufferSize) { bufPos = bufferSize - prePos; System.arraycopy(buf, 0, buffer, prePos, bufPos); // 写满剩余的buf prePos = pos = 0; long fileLength = raf.length(); raf.seek(fileLength); raf.write(buffer); }else System.arraycopy(buf, 0, buffer, prePos, length); // 写满剩余的buf if(bufPos==0){ //说明整段buf被复制 prePos = pos; }else{ //此时 缓冲区应清空,存入后续buf System.arraycopy(buf, bufPos, buffer, prePos, length-bufPos); // 写满剩余的buf prePos = prePos + length-bufPos ; pos =prePos ; } cMsgs.clear() ; } } long fileLength = raf.length(); // 文件长度,字节数 raf.seek(fileLength); // 将写文件指针移到文件尾 long start = System.currentTimeMillis() ; raf.write(buffer, 0, prePos); //溢写剩余数据 raf.close(); }catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public void addToMsgBuf(LinkedHashMap<Integer, String> currentStepMsgs) { synchronized (cMsgs) { cMsgs.putAll(currentStepMsgs); cMsgs.notifyAll(); } } }}如上面红色部分,但是会报java.util.ConcurrentModificationException, 可是我有写同步代码啊,还是不知道咋回事,求指点了 http://blog.csdn.net/it_man/article/details/8225477 又修改了下代码,性能测试结果仍然如此,package ringBuffer;import java.io.IOException;import java.io.RandomAccessFile;import java.util.LinkedHashMap;import java.util.Map;public class Producer { public static void main(String[] args) { long start = System.currentTimeMillis() ; Producer mc = new Producer() ; Consumer cr = mc.new Consumer() ; cr.start() ; for (Integer j = 0; j < 5; j++) { LinkedHashMap<Integer, String> msgs = new LinkedHashMap<Integer, String>() ; for (Integer i = 0; i < 100000; i++) { msgs.put(i,j.toString()) ; } cr.addToMsgBuf(msgs); } try { cr.interrupt() ; cr.join() ; //这里主线程会否比子线程先跑完? 加上join } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(" last : "+ ( System.currentTimeMillis() -start)); } class Consumer extends Thread { int bufferSize = 2 * 1024 * 1024; private byte[] buffer = new byte[bufferSize]; private Integer pos = 0; private Integer prePos = 0; String outputFile = "F:\\test\\ringBuf.txt"; RandomAccessFile raf; LinkedHashMap<Integer, String> cMsgs = new LinkedHashMap<Integer, String>() ; ; public Consumer(){ try { raf = new RandomAccessFile(outputFile, "rw"); } catch (IOException e) { e.printStackTrace(); } } public void run() { try { while (!Thread.interrupted()) { synchronized (cMsgs){ while (cMsgs.isEmpty()) { cMsgs.wait(); } StringBuffer sb = new StringBuffer(); for (Map.Entry<Integer, String> v : cMsgs.entrySet()) { sb.append(v.getKey()).append(v.getValue()).append("V") ; } sb.append("S"); byte[] buf = sb.toString().getBytes(); int length = buf.length ; pos = prePos + length ; int bufPos = 0; if (pos > bufferSize && prePos < bufferSize) { bufPos = bufferSize - prePos; System.arraycopy(buf, 0, buffer, prePos, bufPos); prePos = pos = 0; long fileLength = raf.length(); raf.seek(fileLength); raf.write(buffer); }else System.arraycopy(buf, 0, buffer, prePos, length); if(bufPos==0){ prePos = pos; }else{ System.arraycopy(buf, bufPos, buffer, prePos, length-bufPos); prePos = prePos + length-bufPos ; pos =prePos ; } cMsgs.clear() ; } } long fileLength = raf.length(); raf.seek(fileLength); raf.write(buffer, 0, prePos); raf.close(); }catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public void addToMsgBuf(LinkedHashMap<Integer, String> currentStepMsgs) { synchronized (cMsgs) { cMsgs.putAll(currentStepMsgs); cMsgs.notify() ; } } }}package ringBuffer;import java.io.IOException;import java.io.RandomAccessFile;public class Producer2 { public static void main(String[] args) { long start = System.currentTimeMillis() ; Producer2 pr = new Producer2() ; Consumer cr = pr.new Consumer() ; cr.start() ; for (Integer j = 0; j < 5; j++) { StringBuffer sb = new StringBuffer(); for (Integer i = 0; i < 100000; i++) { sb.append(i.toString()+ j.toString() + "V"); } sb.append("S"); byte[] msgs = sb.toString().getBytes();// System.out.println(msgs.length); cr.addToMsgBuf(msgs); } try { cr.interrupt() ; cr.join() ; //这里主线程会否比子线程先跑完? 加上join } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(" last : "+ ( System.currentTimeMillis() -start)); } class Consumer extends Thread { int bufferSize = 2 * 1024 * 1024; private byte[] buffer = new byte[bufferSize]; private Integer pos = 0; private Integer prePos = 0; String outputFile = "F:\\test\\ringBuf.txt"; private int totalLength = 0; RandomAccessFile raf; public Consumer(){ try { raf = new RandomAccessFile(outputFile, "rw"); } catch (IOException e) { e.printStackTrace(); } } public void run() { try { while (!Thread.interrupted()) { synchronized (buffer) { while (pos > bufferSize ) { totalLength = totalLength + prePos ; long fileLength = raf.length(); raf.seek(fileLength); raf.write(buffer) ; prePos = prePos % bufferSize ; pos = prePos; buffer.notifyAll() ; } } } totalLength = totalLength + prePos ; long fileLength = raf.length(); raf.seek(fileLength); raf.write(buffer, 0, prePos) ; raf.close() ; } catch (IOException e) { e.printStackTrace(); } } public void addToMsgBuf(byte[] buf) { if (buf == null || buf.length == 0) return; try { synchronized (buffer) { pos = prePos + buf.length ; int bufPos = 0 ; while (pos > bufferSize && prePos<bufferSize) { bufPos = bufferSize-prePos ; System.arraycopy(buf, 0, buffer, prePos, bufPos); prePos = bufferSize; System.out.println("Wait Until there is enough space! "); buffer.wait(); } System.arraycopy(buf,bufPos, buffer, prePos, buf.length - bufPos); if(pos==0&&pos==prePos){ pos = prePos + buf.length ; } prePos = pos ; } } catch (InterruptedException e) { e.printStackTrace(); } } }} 大家有啥好的技术论坛推荐下? csdn上面回复太慢了! 理论上说,Producer 把这个步骤:StringBuffer sb = new StringBuffer(); for (Map.Entry<Integer, String> v : cMsgs.entrySet()) { sb.append(v.getKey()).append(v.getValue()).append("V") ; } sb.append("S"); byte[] buf = sb.toString().getBytes();放到Consumer线程中,并行度提高了,应该更快才对,但是明显结果是Producer 慢,且内存消耗大(这个很容易理解),但是性能上差异很大,是内存的原因么?Producer2无论是时间还是空间都比Producer 好,为啥呢? 求大牛指点下 关于Exception in thread "Timer-0" apache mina2 如何发送和接受 字节数组 byte[] JAVA结构问题 本来准备每天散200的,信誉分恢复,散最后100分,散分到此结束! 有关java的两个小问题 那里与jbuilder7免费学习教程下载 又来给分了! 如何解决!JBuider光标定位不准确的问题啊! 作为一个java初学者,有一个问题,请指教! 非常急,安装sqlserver7.0的机子改名后 openfire安装与配置 java制作的MP3播放器,如何实现时间轴功能
package ringBuffer;import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.LinkedHashMap;
import java.util.Map;public class MsgWriter { /**
* @param args
*/
public static void main(String[] args) {
long start = System.currentTimeMillis() ;
MsgWriter mc = new MsgWriter() ;
Consumer cr = mc.new Consumer() ;
cr.start() ;
for (Integer j = 0; j < 5; j++) {
LinkedHashMap<Integer, String> msgs =cr.cMsgs ; //节省空间开销
for (Integer i = 0; i < 100000; i++) {
msgs.put(i,j.toString()) ;
}
cr.addToMsgBuf(msgs);
}
cr.interrupt() ;
System.out.println(" last : "+ ( System.currentTimeMillis() -start));
}
class Consumer extends Thread {
int bufferSize = 2 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer prePos = 0;
String outputFile = "F:\\test\\ringBuf.txt";
private int totalLength = 0;
RandomAccessFile raf;
public LinkedHashMap<Integer, String> cMsgs = new LinkedHashMap<Integer, String>() ; ; public Consumer(){
try {
raf = new RandomAccessFile(outputFile, "rw");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (cMsgs){
while (cMsgs.isEmpty()) {
cMsgs.wait();
}
StringBuffer sb = new StringBuffer();
for (Map.Entry<Integer, String> v : cMsgs.entrySet()) {
sb.append(v.getKey()).append(v.getValue()).append("V") ;
}
sb.append("S");
byte[] buf = sb.toString().getBytes();
int length = buf.length ;
pos = prePos + length ;
int bufPos = 0;
if (pos > bufferSize && prePos < bufferSize) {
bufPos = bufferSize - prePos;
System.arraycopy(buf, 0, buffer, prePos, bufPos); // 写满剩余的buf
prePos = pos = 0;
long fileLength = raf.length();
raf.seek(fileLength);
raf.write(buffer);
}else
System.arraycopy(buf, 0, buffer, prePos, length); // 写满剩余的buf
if(bufPos==0){ //说明整段buf被复制
prePos = pos;
}else{ //此时 缓冲区应清空,存入后续buf
System.arraycopy(buf, bufPos, buffer, prePos, length-bufPos); // 写满剩余的buf
prePos = prePos + length-bufPos ;
pos =prePos ;
}
cMsgs.clear() ;
}
}
long fileLength = raf.length(); // 文件长度,字节数
raf.seek(fileLength); // 将写文件指针移到文件尾
long start = System.currentTimeMillis() ;
raf.write(buffer, 0, prePos); //溢写剩余数据
raf.close();
}catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void addToMsgBuf(LinkedHashMap<Integer, String> currentStepMsgs) {
synchronized (cMsgs) {
cMsgs.putAll(currentStepMsgs);
cMsgs.notifyAll();
}
}
}
}如上面红色部分,但是会报java.util.ConcurrentModificationException, 可是我有写同步代码啊,还是不知道咋回事,求指点了
import java.io.RandomAccessFile;
import java.util.LinkedHashMap;
import java.util.Map;public class Producer {
public static void main(String[] args) {
long start = System.currentTimeMillis() ;
Producer mc = new Producer() ;
Consumer cr = mc.new Consumer() ;
cr.start() ;
for (Integer j = 0; j < 5; j++) {
LinkedHashMap<Integer, String> msgs = new LinkedHashMap<Integer, String>() ;
for (Integer i = 0; i < 100000; i++) {
msgs.put(i,j.toString()) ;
}
cr.addToMsgBuf(msgs);
}
try {
cr.interrupt() ;
cr.join() ; //这里主线程会否比子线程先跑完? 加上join
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" last : "+ ( System.currentTimeMillis() -start));
}
class Consumer extends Thread {
int bufferSize = 2 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer prePos = 0;
String outputFile = "F:\\test\\ringBuf.txt";
RandomAccessFile raf;
LinkedHashMap<Integer, String> cMsgs = new LinkedHashMap<Integer, String>() ; ; public Consumer(){
try {
raf = new RandomAccessFile(outputFile, "rw");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (cMsgs){
while (cMsgs.isEmpty()) {
cMsgs.wait();
}
StringBuffer sb = new StringBuffer();
for (Map.Entry<Integer, String> v : cMsgs.entrySet()) {
sb.append(v.getKey()).append(v.getValue()).append("V") ;
}
sb.append("S");
byte[] buf = sb.toString().getBytes();
int length = buf.length ;
pos = prePos + length ;
int bufPos = 0;
if (pos > bufferSize && prePos < bufferSize) {
bufPos = bufferSize - prePos;
System.arraycopy(buf, 0, buffer, prePos, bufPos);
prePos = pos = 0;
long fileLength = raf.length();
raf.seek(fileLength);
raf.write(buffer);
}else
System.arraycopy(buf, 0, buffer, prePos, length);
if(bufPos==0){
prePos = pos;
}else{
System.arraycopy(buf, bufPos, buffer, prePos, length-bufPos);
prePos = prePos + length-bufPos ;
pos =prePos ;
}
cMsgs.clear() ;
}
}
long fileLength = raf.length();
raf.seek(fileLength);
raf.write(buffer, 0, prePos);
raf.close();
}catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void addToMsgBuf(LinkedHashMap<Integer, String> currentStepMsgs) {
synchronized (cMsgs) {
cMsgs.putAll(currentStepMsgs);
cMsgs.notify() ;
}
}
}
}
package ringBuffer;import java.io.IOException;
import java.io.RandomAccessFile;public class Producer2 {
public static void main(String[] args) {
long start = System.currentTimeMillis() ;
Producer2 pr = new Producer2() ;
Consumer cr = pr.new Consumer() ;
cr.start() ;
for (Integer j = 0; j < 5; j++) {
StringBuffer sb = new StringBuffer();
for (Integer i = 0; i < 100000; i++) {
sb.append(i.toString()+ j.toString() + "V");
}
sb.append("S");
byte[] msgs = sb.toString().getBytes();
// System.out.println(msgs.length);
cr.addToMsgBuf(msgs);
}
try {
cr.interrupt() ;
cr.join() ; //这里主线程会否比子线程先跑完? 加上join
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" last : "+ ( System.currentTimeMillis() -start));
}
class Consumer extends Thread {
int bufferSize = 2 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer prePos = 0;
String outputFile = "F:\\test\\ringBuf.txt";
private int totalLength = 0;
RandomAccessFile raf; public Consumer(){
try {
raf = new RandomAccessFile(outputFile, "rw");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (buffer) {
while (pos > bufferSize ) {
totalLength = totalLength + prePos ;
long fileLength = raf.length();
raf.seek(fileLength);
raf.write(buffer) ;
prePos = prePos % bufferSize ;
pos = prePos;
buffer.notifyAll() ;
}
}
}
totalLength = totalLength + prePos ;
long fileLength = raf.length();
raf.seek(fileLength);
raf.write(buffer, 0, prePos) ;
raf.close() ;
} catch (IOException e) {
e.printStackTrace();
}
} public void addToMsgBuf(byte[] buf) {
if (buf == null || buf.length == 0)
return;
try {
synchronized (buffer) {
pos = prePos + buf.length ;
int bufPos = 0 ;
while (pos > bufferSize && prePos<bufferSize) {
bufPos = bufferSize-prePos ;
System.arraycopy(buf, 0, buffer, prePos, bufPos);
prePos = bufferSize;
System.out.println("Wait Until there is enough space! ");
buffer.wait();
}
System.arraycopy(buf,bufPos, buffer, prePos, buf.length - bufPos);
if(pos==0&&pos==prePos){
pos = prePos + buf.length ;
}
prePos = pos ;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
StringBuffer sb = new StringBuffer();
for (Map.Entry<Integer, String> v : cMsgs.entrySet()) {
sb.append(v.getKey()).append(v.getValue()).append("V") ;
}
sb.append("S");
byte[] buf = sb.toString().getBytes();放到Consumer线程中,并行度提高了,应该更快才对,但是明显结果是Producer 慢,且内存消耗大(这个很容易理解),但是性能上差异很大,是内存的原因么?Producer2无论是时间还是空间都比Producer 好,为啥呢? 求大牛指点下