有一个读取数据库的线程(product),一次读取一定数量数据放到内存列表里面,然后这个线程等待 ;有多个消费线程,来将这些数据同步到别的服务器上面去,然后更新数据库状态,当内存内的数据消化完了,唤醒product线程,product线程再去读同样数量的数据, 一直这样重复。下面是伪代码。
productpublic class Product implements Runnable { private volatile boolean running = true;
/**
* 用于每次读取出来的数据的消耗计数
*/
public static CountDownLatch countDownLatch = null; public static final BlockingQueue<T> blockList = new LinkedBlockingQueue<T>(); @Override
public void run() { while (running) {
try {
List<T> list = //数据库读取数据,状态0的数据 , 100条
if (list.size() == 0) {
//没有数据等待20秒
TimeUnit.SECONDS.sleep(20);
} else {
countDownLatch = new CountDownLatch(list.size());
blockList.addAll(list);
countDownLatch.await();
} } catch (InterruptedException e) {
// 不考虑被打断的情况
}
}
}
consume:public class Consume implements Runnable { private boolean running = true;
@Override
public void run() {
while (running) {
try {
T t= Product.blockList.take();
if (running) {
//更新数据库t数据的状态,更新为1
}
Product.countDownLatch.countDown();
} catch (InterruptedException e) {// 忽略 }
}
}
}就剩下91分啦,全上了,大家看看有啥问题不。
productpublic class Product implements Runnable { private volatile boolean running = true;
/**
* 用于每次读取出来的数据的消耗计数
*/
public static CountDownLatch countDownLatch = null; public static final BlockingQueue<T> blockList = new LinkedBlockingQueue<T>(); @Override
public void run() { while (running) {
try {
List<T> list = //数据库读取数据,状态0的数据 , 100条
if (list.size() == 0) {
//没有数据等待20秒
TimeUnit.SECONDS.sleep(20);
} else {
countDownLatch = new CountDownLatch(list.size());
blockList.addAll(list);
countDownLatch.await();
} } catch (InterruptedException e) {
// 不考虑被打断的情况
}
}
}
consume:public class Consume implements Runnable { private boolean running = true;
@Override
public void run() {
while (running) {
try {
T t= Product.blockList.take();
if (running) {
//更新数据库t数据的状态,更新为1
}
Product.countDownLatch.countDown();
} catch (InterruptedException e) {// 忽略 }
}
}
}就剩下91分啦,全上了,大家看看有啥问题不。
一个简单的coutdown的应用吧,
我没实际中用过这个,
等大神来点评
/**
*
*/
package com.zuxiang.countdown;import java.util.concurrent.CountDownLatch;/**
* @author zuxiang
*
*/
public class Product implements Runnable {
private CountDownLatch downLatch;
public Product(CountDownLatch downLatch){
this.downLatch = downLatch;
}
public void run() {
System.out.println("正等待数据消费完......");
try {
this.downLatch.await();
} catch (InterruptedException e) {
}
System.out.println("所有数据都消费完了,再到数据库查询数据 ....!");
}
}
/**
*
*/
package com.zuxiang.countdown;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/**
* @author zuxiang
*
*/
public class ProductToConsumptionTest { public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Dao dao = new Dao();
List<String> datas = dao.getData();
if (datas != null && !datas.isEmpty()){
CountDownLatch latch = new CountDownLatch(datas.size());
List<Consumption> consumptions = new ArrayList<Consumption>();
Consumption consumption;
Product product = new Product(latch);
for (String str : datas){
consumption = new Consumption(latch,str);
consumptions.add(consumption);
}
product = new Product(latch);
for (Consumption c : consumptions){
executor.execute(c);
}
executor.execute(product);
executor.shutdown();
}
}
}/**
*
*/
package com.zuxiang.countdown;import java.util.ArrayList;
import java.util.List;/**
* @author zuxiang
*
*/
public class Dao { /**
* 查询数据
* @return
*/
public List<String> getData(){
List<String> list = new ArrayList<String>();
for (int i = 0; i < 10 ; i++)
{
list.add("data---" + i);
}
System.out.println("数据库查询数据 完成!");
return list;
}
}
/**
*
*/
package com.zuxiang.countdown;import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/**
* @author zuxiang
*
*/
public class Consumption implements Runnable{
private CountDownLatch downLatch;
private String name;
public Consumption(CountDownLatch downLatch, String name){
this.downLatch = downLatch;
this.name = name;
}
public void run() {
this.doConsumption();
try{
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
}catch(InterruptedException ie){
}
System.out.println( this.name + "数据同步到服务器完毕!");
this.downLatch.countDown();
}
private void doConsumption(){
System.out.println(this.name + "正在同步数据到服务器.....");
}
}
Java的队列支持
wait();等待
notify();唤醒的
所以你的程序里其实缺少的是线程同步锁。