现在公司项目中碰到这样一个需求,需要设计一个类似生产者-消费者的模块,负责数据的分发。
1、入口只有一个:可能是一条一条写入,也可能是一批一批写入。目前设计了一个List暂存这些数据。
2、数据量很大,所以目前我们限制了List的大小,当达到一定容量时,会把当前的暂存List临时存到一个消费队列中,重新new一个新的暂存list继续接受数据。
3、消费者有多个,并且每个消费者都需要消费队列中List中的所有数据。每个消费者是一个线程。
4、当多个消费者线程都消费掉队列中的一个暂存list后,该暂存的list会从队列中清理。比我们平常讨论的生产者-消费者程序复杂很多。想了很久,没想出思路,请教各位大牛,在线急等。

解决方案 »

  1.   

    LinkedBlockingQueue看下这个类的API
      

  2.   

    同1楼,也觉得BlockingQueue可以楼主,如果是考虑到内存问题才使用暂存List的话,不是等于没解决问题么?
      

  3.   

    我也有类似的场景,我准备使用BlockingQueue
      

  4.   

    楼主的有一点描述看不懂,为什么要“当达到一定容量时,会把当前的暂存List临时存到一个消费队列中,重新new一个新的暂存list继续接受数据。”?
    数据你是可以放到多个List中,但是都是在使用内存。使用一个还是多个List,都耗费几乎同样多的内存。
    也就是说你这样设计数据Keeper是画蛇添足,完全没有必要的。除非你把多的数据放到文件或者数据库中,这样设计才有意义。
    你可能是担心ArrayList每次删掉前面的数据时,后面的数据会搬动。但是实际上这样老是需要在前面删除数据,在后面加入数据的应用需求,你本来就不能用ArrayList而应该用LinkedList。
      

  5.   

    如果你对以上我说的没有异议,那么我可以给你出一个方案:
    1、使用BlockingQueue作为数据容器。
    2、数据中增加一个属性,用于保存使用过该数据的消费者
    3、消费者每次都从BlockingQueue取数据,当发现该数据已经使用过了,就等待其他消费者使用完该数据
    4、当最后一个消费者消费完该数据时,把这个数据从BlockingQueue中remove
      

  6.   

    貌似楼主的资源一个只能被消费一次,如有资源A,B,C,如果消费者I消费了A,那就只剩下B,C,消费者II只能去消费B,C了,是这样的吗?
      

  7.   

    写了个例子,看看合适不合适:package csdn;import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;/**
     * data
     */
    class Data{
    public Data(String message){
    this.message=message;
    }

    private String message; public String getMessage() {
    return message;
    } public void setMessage(String message) {
    this.message = message;
    }
    }
    /**
     * Data Keeper
     */
    class DataKeeper<T>{
    //最大队列长度
    public static final int MAXQUEUELENGTH=100;
    //消费者个数
    public static final int MAXCONSUMER=5;

    //用于存储数据的队列
    private BlockingQueue<T> queue=new ArrayBlockingQueue<T>(MAXQUEUELENGTH);
    //用于暂存队列保存不下的数据的容器
    private List<T> tempdata=new LinkedList<T>();

    //记录哪些消费者已经消费当前消息的容器
    private List<Consumer> currentDataConsumer=new ArrayList<Consumer>();

    //消费消息
    public T getData(Consumer consumer){
    if (currentDataConsumer.contains(consumer))
    return null;
    if (queue.isEmpty())
    return null; if (currentDataConsumer.size()==0){
    System.out.println();
    printStatus();
    }
    currentDataConsumer.add(consumer);

    T d=null;
    if (MAXCONSUMER==currentDataConsumer.size()){
    d=queue.poll();
    currentDataConsumer.clear();
    copyDataFromTemp();
    //System.out.println();

    } else {
    d=queue.peek();
    }

    return d;
    }

    //保存单条消息
    public void putData(T d){
    if (queue.size()<MAXQUEUELENGTH){
    queue.add(d);
    } else {
    tempdata.add(d);
    }
    }

    //保存多条消息
    public void putData(List<T> datas){
    while ((queue.size()<MAXQUEUELENGTH)&&datas.size()>0){
    queue.add(datas.remove(0));
    }
    for (int i=0;i<datas.size();i++){
    tempdata.add(datas.get(i));
    }
    }

    private void copyDataFromTemp(){
    synchronized(tempdata){
    if (queue.size()<MAXQUEUELENGTH&&tempdata.size()>0){
    for (int i=0;i<min(MAXQUEUELENGTH-queue.size(),tempdata.size());i++){
    queue.add(tempdata.remove(i));
    }
    }
    }
    }

    private int min(int n1,int n2){
    return (n1>n2?n2:n1);
    }

    private void printStatus(){
    System.out.println("queue.size()="+queue.size()+"\ttempdata.size()="+tempdata.size());
    }
    }/**
     * 生产者
     */
    class Producer implements Runnable {
    static Random rand=new Random();
    private final DataKeeper<Data> dataKeeper;

    private int producttimes=0;
    Producer(DataKeeper<Data> dataKeeper) {
    this.dataKeeper = dataKeeper;
    } public void run() {
    while (true){
    producttimes++;
    int count=rand.nextInt(200)+1;
    List<Data> datas=new LinkedList<Data>();
    for (int i=0;i<count;i++){
    datas.add(new Data("第"+producttimes+"次产生的第"+(i+1)+"条数据"));
    }
    if (datas.size()==1){
    dataKeeper.putData(datas.get(0));
    } else {
    dataKeeper.putData(datas);
    }
    //dataKeeper.printStatus();
    try {
    Thread.sleep(2500);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    }
    }/**
     * 消费者
     */
    class Consumer implements Runnable {
    private final DataKeeper<Data> dataKeeper;
    private int id;
    public int getId() {
    return id;
    }
    public void setId(int id) {
    this.id = id;
    }
    Consumer(DataKeeper<Data> dataKeeper,int id) {
    this.dataKeeper = dataKeeper;
    this.id=id;
    }
    @Override
    public boolean equals(Object o){
    if (!(o instanceof Consumer))
    return false;

    return this.id==((Consumer)o).id;
    }

    static Random rand=new Random();

    public void run() {
    while (true) {
    Data d=dataKeeper.getData(this);
    if (d!=null){
    // ToDo 此处是消费代码,本演示只打印消息
    System.out.print("["+id+"] ");
    }

    int sleep=rand.nextInt(30);
    try {
    Thread.sleep(sleep);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    }
    }/**
     * 一个生产者、5个消费者
     */
    public class BlockingQueueExample {
    public static void main(String[] args) {
    DataKeeper<Data> dataKeeper = new DataKeeper<Data>();
    Producer p = new Producer(dataKeeper);
    new Thread(p).start();

    for (int i=1;i<=DataKeeper.MAXCONSUMER;i++){
    Consumer c = new Consumer(dataKeeper,i);
    new Thread(c).start();
    }
    }}
      

  8.   

    非常感谢各位。后面忙于项目就没有上论坛了,目前已经解决。
    现在结贴,感谢各位支持,特别是hbwhwang同学。