需求是要匀速输出某list的对象,比如每秒50个,或者20个我目前想到的方法:某个Thread里做个死循环,不停的检查list是否为空,然后用Thread.sleep()停一下比如一下:public class DispatchCenter extends Thread {
static Logger logger = Logger.getLogger(DispatchCenter.class);
public static Vector<String> tmpCache = null;
public static int cacheSize = 0;
public DispatchCenter(){
tmpCache = new Vector<String>();
cacheSize = 20; //每秒取20个
}
public void run(){
while(true){
int counter = 0;
Iterator<String> itr = tmpCache.iterator();
long stime = System.currentTimeMillis();
while(itr.hasNext()){
String txt = itr.next();
logger.info("do something about "+txt);
itr.remove();
counter++;
if(counter == cacheSize)
break;
}
long etime = System.currentTimeMillis();
try {
//System.out.println("sleep "+(1000 - (etime - stime))+"ms");
Thread.sleep(1000 - (etime - stime));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}tmpCache 是vector,其他thread可以直接往其插入对象,vector本身又是同步的,理论上也许没问题,但现在一个实际应用里,似乎这方法有点问题,有10个thread在调用 DispatchCenter.tmpCache.add()方法,但这个循环在开始几秒的输出却没有按照每秒几个的输出,而是全部输出了谁有更好的办法做到匀速输出,或者怎么改进这个办法。。
static Logger logger = Logger.getLogger(DispatchCenter.class);
public static Vector<String> tmpCache = null;
public static int cacheSize = 0;
public DispatchCenter(){
tmpCache = new Vector<String>();
cacheSize = 20; //每秒取20个
}
public void run(){
while(true){
int counter = 0;
Iterator<String> itr = tmpCache.iterator();
long stime = System.currentTimeMillis();
while(itr.hasNext()){
String txt = itr.next();
logger.info("do something about "+txt);
itr.remove();
counter++;
if(counter == cacheSize)
break;
}
long etime = System.currentTimeMillis();
try {
//System.out.println("sleep "+(1000 - (etime - stime))+"ms");
Thread.sleep(1000 - (etime - stime));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}tmpCache 是vector,其他thread可以直接往其插入对象,vector本身又是同步的,理论上也许没问题,但现在一个实际应用里,似乎这方法有点问题,有10个thread在调用 DispatchCenter.tmpCache.add()方法,但这个循环在开始几秒的输出却没有按照每秒几个的输出,而是全部输出了谁有更好的办法做到匀速输出,或者怎么改进这个办法。。
public class Test {
public static void main(String[] args) {
final Cache cache = new Cache(20);
AddThread[] at = new AddThread[10];
for (int i=0; i<at.length; i++) {
at[i] = new AddThread(cache);
at[i].start();
}
DispatchThread dt = new DispatchThread(cache);
dt.start(); Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
public void run () {
catch.setFlag(false);
}
}, 1000, 1000);
}
}class Cache {
Vector<String> cache = new Vector<String>();
int cacheSize = 0;
boolean flag = true; public Cache(int cacheSize) {
this.cacheSize = cacheSize;
}
public void setFlag (boolean flag) {
this.flag = flag;
} public void add(String value) {
synchornized (cache) {
if (! flag) {
cache.wait();
}
cache.addElement(value);
cache.notifyAll();
}
} public void dispatch() {
synchronized (cache) {
if (flag) {
cache.wait();
}
int counter = 0;
Iterator<String> itr = tmpCache.iterator();
while(itr.hasNext()){
String txt = itr.next();
itr.remove();
counter++;
if(counter == cacheSize)
break;
}
}
flag = true;
catch.notifyAll();
}
}
}class AddThread extends Thread {
Cache cache;
public AddThread (Cache cache) {this.cache = cache;} public void run () {
int counter = 0;
while (true) {
cache.add(getName() + ":" + counter);
counter++
}
}
}class DispatchThread extends Thread {
Cache cache;
public DispatchThread (Cache cache) {this.cache = cache;} public void run () {
while (true) {
catch.dispatch();
}
}
}
没看懂,怎么换?不过上面代码在dispatch方法里的cache.notifyAll();报错还有关系时间控制上我原来代码里 list输出的过程是计算了时间的,之后等待的时间是1秒减去这个时间。。比如list输出用了100ms,那么等待900现在用这个timer的话似乎有可能误差就累计起来了?。。
是什么意思?按你现在的逻辑,如果刚开始几秒,vector里的元素没有达到20,当然是全部都被输出了,如果你想让它满20以后才开始输出,那么可以做个if判断,当size大于等于20时才开始打印。
首先Vertor的同步不代表Iterator的同步,你在迭代的过程中修改Vertor,Iterator去remove时就会出现ConcurrentModificationException;
其次线程调度器是非确定的,在多线程中,Thread.sleep()指定时间后一定会执行当前线程吗?
还有这个匀速输出某list的对象后,这些对象是不保留在list中的,队列是否是更合适的存储结构?java中有个计时器Timer的概念,而线程池中有个延迟后运行或定期执行的预定执行器ScheduledExecutorService,你可以是这个来 匀速输出一个List里对象的方法(示例为了显示输出效果,每2秒执行一次,至于并发对list操作的线程,楼主自己思考):import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class DispatchCenter implements Runnable{ static ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();//这是线程安全队列,返回的迭代器是一个“弱一致”迭代器,它不会抛出 ConcurrentModificationException
static ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor();//定时执行器
static int count = 1;
public static void main(String[] args) {
for(int i = 1; i <= 100; i++){
queue.add("Object-"+i);
}
schedule.scheduleAtFixedRate(new DispatchCenter(), 0, 2, TimeUnit.SECONDS);//这里是每隔2秒执行一次输出任务
} @Override
public void run() {
System.out.println("第"+(count++)+"次执行:");
int size = 20;
while((size-- > 0) && (!queue.isEmpty())){
System.out.println(queue.poll());
}
System.out.println("-----------");
if(queue.isEmpty()){
schedule.shutdownNow();
}
}
}
我顶楼的代码应该理论上是每秒20个输出,但实际中在第一秒里输出了100多个。。if(counter == cacheSize)这就是判断代码啊
原来有个ConcurrentLinkedQueue类,学习了另外Iterator去remove不会出现异常,直接用vector.remove才会产生异常。。这个都试验过了
线程调度器是只有一个在运行的,所以Thread.sleep()肯定会当前线程,是其他多线程在往list里加东西,这个并发操作应该就按照楼上那个兄弟的思路,在list取东西出来做事的时候不能让其他线程添加吧。。关于timer我也一再强调了,输出list里对象不仅仅是打印这个语句,实际中是调用其他一些方法的,而这些调用需要耗用一定时间,如果输出100个对象,总共花费了200ms,那用timer的这个方法实际上是1.2s里输出了100个,不符合要求,所以还得通过计算输出这个过程的时间
伪代码:
while(true)
{
if(curTime-lastTime>20)//20毫秒一个,每秒50个
{
outObject();
}
sleep(1);
}
if(counter == cacheSize)
break;
import java.util.concurrent.ConcurrentLinkedQueue;import org.apache.log4j.Logger;public class DispatchCenter extends Thread {
static Logger logger = Logger.getLogger(DispatchCenter.class);
public static ConcurrentLinkedQueue<String> tmpCache = null;
public static int cacheSize = 0;
public static boolean addFlag = true;
public DispatchCenter(){
tmpCache = new ConcurrentLinkedQueue<String>();
cacheSize = 100;
}
public static void addElement(String input){
if(addFlag){
try {
synchronized(tmpCache){
tmpCache.wait();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
tmpCache.add(input);
}
public void run(){
while(true){
addFlag = false;
int counter = cacheSize;
long stime = System.currentTimeMillis();
while(counter-- >0 && !tmpCache.isEmpty()){
String txt = tmpCache.poll();
logger.info("do something about "+txt);
}
addFlag = true;
synchronized(tmpCache){
tmpCache.notifyAll();
}
long etime = System.currentTimeMillis();
try {
System.out.println("sleep "+(1000 - (etime - stime))+"ms");
Thread.sleep(1000 - (etime - stime));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
} 其他线程调用DispatchCenter.addElement()往list里添加东西即可,并且第1秒内的输出也正常了,感谢以上