首先有一个主线程,是做事情的,里面有个全局静态变量Vector list这个主线程的run方法里如下示例:public void run(){while(true){
do something...
}}总之就是遍历这个list,具体如何遍历后面有说明list里放的对象示例如下:class Block{
String userID;
String content;
..........
}现在要求把list里的Block对象按照userID分组,相同的userID的对象都放到一个queue里去,这样子假设总共有3个user,那就是3个queue,每个queu里很多个Block在主线程里的run方法里,实际上已经是要求分组的queue已经完成了,然后具体对每个queue的对象进行操作 其他的多个线程可以对list插入Block对象,现在问题是,如何用一种比较好的方法,实现:1.其他线程不停地往list里添加对象
2.list里的对象要分组
3.主线程对list里的每个queue进行遍历注意1.2.3都不是顺序的,都可能是并行的
顺序的话是很简单的。。
do something...
}}总之就是遍历这个list,具体如何遍历后面有说明list里放的对象示例如下:class Block{
String userID;
String content;
..........
}现在要求把list里的Block对象按照userID分组,相同的userID的对象都放到一个queue里去,这样子假设总共有3个user,那就是3个queue,每个queu里很多个Block在主线程里的run方法里,实际上已经是要求分组的queue已经完成了,然后具体对每个queue的对象进行操作 其他的多个线程可以对list插入Block对象,现在问题是,如何用一种比较好的方法,实现:1.其他线程不停地往list里添加对象
2.list里的对象要分组
3.主线程对list里的每个queue进行遍历注意1.2.3都不是顺序的,都可能是并行的
顺序的话是很简单的。。
多个线程分别往list添加,一个或多个线程负责分配,主线程负责遍历分配好的queue
for example
添加
public void run() {
while (true) {
create_or_get_block;
list.add(some_block);
yield();
}
}分配
public void run() {
while (true) {
Block b = null;
synchronized (list) {
if (list.size() > 0) {
block = list.remove(0);
}
}
if (b != null) {
Queue q = null;
synchronized (map) { //如果是同步安全的map,这里不需要锁
q = map.get(b.getUserId());
if (q == null) {q = new SomeQueue();}
}
synchronized (q) { //如果是同步安全的queue,这里不需要锁
q.add(b);
}
}
}
}主线程遍历
String[] queueId = {userId1, userId2, userId3};
for count = 0;
while (true) {
Queue q = null;
Block b = null;
synchronized (map) {
q = map.get(queueId[count]);
}
if (q != null) {
synchronized(q) {
b = q.poll();
}
}
if (b != null) {
//do something
}
count = (count+1)%queueId.length;
}
如果用map的话,分配这里倒是简单,但是主线程的遍历是要求能用list的get(index)这种方法的,用map的遍历不行,如果用map的values()倒是可以转成list,不过这里一方面map在不断添加,一方面如果这样转的话可能有问题?。。
synchronized (map) {
q = map.get(b.getUserId());
if (q == null) {q = new SomeQueue();}
map.put(b.getUserId, q); //在这里会发生map的values的改变,
//但是这里的map被同步的,所以不可能两个线程同时put
}主程序
while (true) {
Block b = null;
for (Queue q : new ArrayList(map.values())) { //这里拿到一个list后就和map没关系了
//就算map的values发生了改变,那也是下一次while循环后能反映
//所以如果有影响,最多不过是第一次while的时候,有可能queue增加了,
//但是没被遍历到,不过下一次while循环,就能遍历到了
synchronized(q) {
b = q.poll();
}
if (b != null) {
//do something
}
}
}