本帖最后由 paullbm 于 2009-11-04 18:13:23 编辑

解决方案 »

  1.   

    元素每次增长的时候 都判断一下是否超过200个元素  如果超过就调用一下poll操作 这样不是很简单吗?
    也不用线程去监视了 
      

  2.   

    咕~~(╯﹏╰)b,你还挺聪明的!
    那你看看Scala的Actor类,如果你做的是项目,建议直接用Scala,反正可以随便调用java的class。你说的方式,应该是处理并发比较好的办法了。轮询肯定是效率最差的那种了。----------------------------------------------------------------------
    目前Java,你可以使用wait和notifyAll来模拟我说的Actor,不知道效率会怎么样。
    其实考虑到实际操作系统的特性,Windowss的消息机制,就是一个很好的例子。
      

  3.   


      队列类持有控制线程的引用,每put一个数据到队列中时都检查一下当前队列的大小,如果发现
    达到200,则调用控制线程去处理。  其实就是将你上面轮询的做法反过来,你原来用线程轮询监视队列中的大小,现在是队列自己去
    监视自己,达到一定要求就通知处理线程去处理。
      

  4.   

    如果并发量不是很大的话就简单的wait和notify机制就可以实现
      

  5.   


    我按照你的方法进行了试验,发现这种方法理论上行得通。但实践一下就出问题了
    具体问题所在:比如这个促使元素增长的方法名为insert,然后每一次在insert方法里进行队列元素个数的判断,超过了则poll操作。但是由此而会引发insert方法改变队列元素个数与队列的poll方法不同步的问题,这样就会导致很多数据会被重复读取。然而一旦将insert方法设为synchronized时,效率就明显下降了!!
      

  6.   

    我是楼主,补充8楼:
     就算将insert方法设为同步的,仍然会有重复读取的问题!!!
      

  7.   


    private synchronized void addData(Object value)
    {
    if (m_receivedData == null)
    m_receivedData = new ConcurrentLinkedQueue(); ((ConcurrentLinkedQueue) m_receivedData).add(value); while (((ConcurrentLinkedQueue) m_receivedData).size() > 200)
    {
    DOSOMETHING (((ConcurrentLinkedQueue) m_receivedData).poll()); }
    }
      

  8.   

    给个意见 
    看看阻塞队列的知识java.util.concurrent 
    接口 BlockingQueue<E>
    类型参数:
    E - 在此 collection 中保持的元素类型
    所有超级接口: 
    Collection<E>, Iterable<E>, Queue<E> 
    所有已知子接口: 
    BlockingDeque<E> 
    所有已知实现类: 
    ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, 
    PriorityBlockingQueue, SynchronousQueue take
    E take()
           throws InterruptedException获取并移除此队列的头部,在元素变得可用之前一直等待(如果有
    必要)。 返回:
    此队列的头部 
    抛出: 
    InterruptedException - 如果在等待时被中断--------------------------------------------------------------------------------put
    void put(E e)
             throws InterruptedException将指定元素插入此队列中,将等待可用的空间(如果有必要)。
     
    参数:
    e - 要添加的元素 
    抛出: 
    InterruptedException - 如果在等待时被中断 
    ClassCastException - 如果指定元素的类不允许将其添加到此队列 
    NullPointerException - 如果指定元素为 null 
    IllegalArgumentException - 如果指定元素的某些属性不允许将其添加到此队列
    写一个类继承或包含一个实现了 BlockingQueue接口的类 重写或包装一下 take() put()
    实现put()达到200阻塞 take() 在达到200() 时可用
     
      

  9.   

    我只知道要效率高的不能轮询和sleep。加锁吧。
    sleep是很耗系统资源的,不能释放已经占有的资源。
      

  10.   

    触发方式就用wait()和notify() 
    处理线程(A线程)里面object.wait()队列增加线程(B线程),当>200是酒object.notifyAll(),唤醒A线程进行处理就可以了。
      

  11.   

    这段代码其实就是3#说的方法的一个实现。
    1. 队列在加元素的时候去判断是否已有200个元素在队列里,并调用其他线程处理。
    2. 方法体还是需要synchronized同步,效率应该没有太大问题。这段代码中我在Socket应用过。
    3. 楼主需要考虑当队列中的元素最终不超过200个时如何处理,确保不丢失数据。
      

  12.   


    我在常量池中定义的队列就是ArrayBlockingQueue类型的啊!!
      

  13.   


    客户端提交时是一条一条的发送过来,但是客户端的数量是比较大的,而且每个客户端的发送频率都比较高。
    也就是说客户端只负责提交数据,关于如何处理那是它不需要关心的,而服务端则是根据其所维护的队列的大小(比如超过200时)进行批量保存。而对队列元素的个数的增加操作则是服务端对客户端提供的一个RPC方法调用接口来执行的。
      

  14.   


    用一个function去做这个判断  在判断的时候把insert停掉
    直到作出判断(或者没有超过200或者poll掉) 然后再开启insert
      

  15.   


    你这样试试看 应该效率会提高很多private volatile Object 200个的队列 = new Object();
    public synchronized void insert(Object obj){
             200个的队列.add(obj);
    }
    public boolean isFull(){
    //isfull?
    }
      

  16.   

    public @interface Guarded {
    String value();
    }
    @Guarded("this") private volatile Object 200个的队列 = new Object();
    public synchronized void insert(Object obj){
             200个的队列.add(obj);
    }
    public boolean isFull(){
    //isfull?
    }
      

  17.   


    按照搂主描述,在服务器端似乎是有一个队被所有的并发访问共有
    为了效率和并发问题,我个人认为,是否追加缓冲队,该队可无限增加,当原队满的时候使用,然后定时调用插入线程,判断缓冲队是否为空来确定执行插入。
    此次调用会直接将原队200条以及缓冲队n条全部保存。
    需要注意的就是定时调用的周期应该根据并发提交的频率确定,为了保证缓冲队不会过大,定时肯定不能太长(如果有条件,这里可以通过统计并发量与time的函数),另外就是执行插入的时候,原队是否需要syn的考虑,个人认为可以先将原队导入缓冲队,然后释放原队给客户端,插入线程去操作缓冲队。
      

  18.   


    还有一个办法是在你那200个数据的队列外面在包一个队列也就是需要2个Queue  第一个Queue用来接收客户端发过来的东西 异步
    第二个Queue用来处理第一个Queue并且从第二个Queue里面判断isFull并插入数据库因为你数据的队列始终是要同步的  所以同步方法总是要用到的
      

  19.   

    按照楼主原话,应该是想从原来无限循环的监视+插入改为外部触发+调用插入,外部触发依然需要监视原队的数量如果不是手动,又不用定时,那只有使用插入1条监检测一次的方法。根据楼上其他人想法,楼主在考虑是否在插入的时候syn原队的效率问题。
    这个时候,如果有一个缓冲队呢?无论何时,客户端都可以进行提交,首选原队,当原队被判断出大于200,则syn原队并执行插入,此时客户端提交的数据暂存缓冲队。等原队的数据库操作完成后释放原队给客户端,然后后台保存缓冲队。这样,客户端不会有任何效率问题。当然,客户端传输过程中,必然会存在服务器端syn原队导致其他并发失败的情况,这种数据传送必然需要备份系统作为支持
      

  20.   


    package com.home;import java.util.LinkedList;public class Queue<E> { /** 队列的最大个数 */
    public static final int MAX_SIZE = 200; public static final Object object = new Object(); /** 队列 */
    private LinkedList<E> list = new LinkedList<E>(); /** 添加至队尾 */
    public synchronized void add(E e) {
    list.addLast(e); // 添加至队尾 // 大于最大个数
    if (list.size() > MAX_SIZE) {
    synchronized (object) {
    object.notifyAll(); // 唤醒处理线程
    }
    }
    } /** 移除第一个 */
    public synchronized E remove() {
    return list.removeFirst();
    } public synchronized int size() {
    return list.size();
    } public synchronized boolean isProcess() {
    return list.size() > MAX_SIZE;
    }
    }
    package com.home;public class Business implements Runnable { private Queue<?> queue; public Business(Queue<?> q) {
    this.queue = q;
    } @Override
    public void run() {
    while (true) {
    try {
    process(); // 业务处理
    synchronized (Queue.object) {
    Queue.object.wait(); // 处理完一直处于等待状态
    }
    } catch (Exception e) {
    e.printStackTrace(); //保证出异常线程不会死掉,除非出现error
    }
    } } private void process() {
    // 业务处理......
    while (queue.isProcess()) {
    System.out.println((String) queue.remove());
    }
    }
    }
    package com.home;import java.util.concurrent.TimeUnit;public class QueueTest { /**
     * @param args
     * @throws InterruptedException 
     */
    public static void main(String[] args) throws InterruptedException {
    Queue<String> queue = new Queue<String>();

    Business bus = new Business(queue);
    new Thread(bus).start(); // 启动业务线程

    //添加队列
    for (int i = 0; i <= 300; i++)
    {
    queue.add(String.valueOf(i));
    }

    TimeUnit.SECONDS.sleep(60);
    System.out.println("Queue size=" + queue.size());
    }}如果程序运行两天了,列队一直都是198个,就是不满,这时用户重启了,数据丢失,死翘翘,不知道你设计这个用在什么方面
      

  21.   


    我要批量处理的目的就是为了减少对数据库的I/O操作,这样才能提高程序的效率。
    而且有一点我想需要指出,我定义的队列是ArrayBlockingQueue类型的,它定义时需要指定一个容量参数capacity。而这个capacity指的是这个队列能够存放多少个元素,而非我在上面所说的200。我将capacity定义为1500. 200指的是一旦发现队列中存放的元素达到这么多的时候,开始一次批处理。
    而之所以将容量capacity定义为1500就是为了考虑在多客户端高频率给服务端发送数据(每个客户端一次只会发送一条)时,使得队列不会溢出!
      

  22.   

    如果要一次处理200个,就改下噻,刚好现在很闲

    /** 一次移除200个 */
    public synchronized List<E> removeSub() {
    if (list.size() > MAX_SIZE) {
    List<E> result = new ArrayList<E>(MAX_SIZE);
    for (int i = 0; i <= MAX_SIZE - 1; i++) {
    result.add(list.removeFirst());
    }
    return result;
    } else {
    return null;
    }
    }
      

  23.   

    ArrayBlockingQueue是个阻塞队列,有一条就会取一条出来,如果要用一次处理很多条的话,你还得改造一下,那还不如自己写个队列
      

  24.   

    不知道lz是怎么区分轮询的,看到while就是轮训?
    那就用线程池吧,如果队列大于200个,就取出200个,构造业务处理对象,扔进线程池处理吧,这个不是轮询,是触发方式了么?
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;public class ThreadTool {
    private static final ExecutorService tp = Executors.newCachedThreadPool();        /**
             * @param list 那200个
             */
    public static void execute(List<?> list) {
    tp.execute(new 实现Runnable接口的业务类(list));
    }
    }
      

  25.   

    去看一下 JDK6/JDK7 对线程做的优化。。
      

  26.   

    请教gao11811
    44楼的方法,哪里中止了并发了??
      

  27.   


    不明白吗?当你使用remove的时候,linkedList对象被锁住,这里是你提供的process方式,所用时间忽略,但是按照楼主的实际情况,这里对队的操作需要连接数据库,哪怕连接一次,插入一条,你认为需要多少时间,假设1秒,如果在这1秒内,有其他客户端提交新的数据呢?此时的队是被锁住的,那就是数据丢失,客户端报错。就拿你这个例子来说,你在remove的时候加入延迟,你看看数据还正确不?
    第2个问题就是效率问题,1次处理一条。。不现实。如果用你的方式批处理,则丢失的数据还要多,因为锁的时间更长
      

  28.   


    remove方法调用完就释放对象锁了啊,remove方法本来就是很快的,process方法也没有锁住队列啊,连接数据库也是移除了队列里面的对象并释放了对象锁以后的事情的处理了
    而且队列被锁住,添加数据时不会丢失,会等待释放对象锁,这里都是服务器端的处理,不存在网络问题第二点倒是同意,取200条的那方法效率真不咋地,
    不知道有什么高效率的方法移除队列中的前200条
      

  29.   

    不知道楼主用没有 hibernate ,不妨去看看它的源码的实现。
      

  30.   

    楼主再改一下,是pull吧,不是poll. :)
      

  31.   


    每put一个数据到队列中时都检查一下当前队列的大小,这跟轮询有啥区别,差不多~~
      

  32.   


    process中调用了remove方法,按照楼主的要求,这里在业务中就应该是连接数据库了,如果数据库没有操作结束,remove是不会释放锁的。在这样长时间的数据库操作中,队列将不能接收客户端的数据。
      

  33.   

    你在程序中只写了process业务处理,只包括的revome,并没有说明白阿。而且就这200个元素移除也需要时间(这段时间中队列被锁),此时的多例模式+高并发,数据肯定丢失(我用单例高速提交,都已经丢失了).还有,我对单从这个程序的理解是:当队列大于200的时候将会调用removeFirst(),但是这个方法只移动一条,现实中肯定不可能.
    另外,构造动态数组不就是我在43楼说的增加缓冲队吗?
      

  34.   


    一点看法。首先移除数据时,不要一个一个移除,要锁定lock object,然后重新构造队列。
    比如:/** 一次移除200或更多个 */
    public List<E> removeSub() {
        synchronized (object) {
            List<E> result = new LinkedList<E>();
            result.addAll(list);        list.clear();
            
            object.notifyAll();
        }
        
        return result;
    }
    另外,新的队列应该用新建的线程或者线程池的线程处理。附带讲一点,44楼的代码synchronized有问题。正确做法可参考上面的代码。
      

  35.   

    回ls
    那个object是用来唤醒线程用的,不是用来锁队列的,移除队列肯定要锁住队列
    改了下,不移除队列的前200个,瓶颈,不知道jvm分配空间快还是一个个移除快,要出差了,没时间上网了package com.home;import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;public class ArrayQueue<E> {
    private int MAX_SIZE = 200; private List<E> list; public ArrayQueue() { } public synchronized void add(E e) {
    list.add(e);
    if (list.size() >= MAX_SIZE) {
    ThreadToolUtil.execute(list);
    list = new ArrayList<E>(MAX_SIZE);
    }
    }
    }class ThreadToolUtil {
    private static final ExecutorService tp = Executors.newCachedThreadPool(); public static void execute(List<?> list) {
    tp.execute(new BusinessProcess(list));
    }
    }class BusinessProcess implements Runnable {
    private List<?> list; public BusinessProcess(List<?> list) {
    this.list = list;
    } @Override
    public void run() {
    // 备份,业务处理等......
    }
    }
      

  36.   

    这样写就实现了部分缓冲队的原理了,不过还需要考虑
    result.addAll(list);
    list.clear();
    这2句执行的需要时间,如果元素较大,或者保存类型比较复杂,200个数量跟随业务变大,都会导致数据丢失,所以应该从健壮性上增强对该队的维护(比如备份、2级备份、在remove的时候给与客户端2级队列作为数据缓冲)
      

  37.   

    这个很简单啊你可以用C#中的一个信号量的变量 AutoResetEvent  (通知正在等待的线程已发生事件)当然一个类变量 AutoResetEvent mutex = new AutoResetEvent();线程1                             线程2
    while(true)                     while(true)
    {                               {
        获取数据                       mutex.WaitOne();
         加入到队列                           取队列元素。。
         满200就执行 mutex.Set();         处理数据
    }                               }   
      

  38.   


    很有道理,当对队列里的数据进行保存操作的时候,那客户端的数据岂不是无法提交到服务器了?
    我觉得楼主应该用一个缓冲器,用ArrayList等链表集合去保存客户端传过来的数据,然后有一个定时线程去扫缓冲链表中的数据入队列
    队列自己判断是否达到200上限,达到的时候,进行一次批处理这样就可以解决客户端高并发的传输数据的问题了,同时数据也不回丢失,除非服务器重启
      

  39.   

    这个帖子不错   DDDDDDDDDDD