我想做一个多线程共享的东东主要功能是,一个产生数据的线程放到一个公共的LIST里,另外多个消耗线程各有自己的私有LIST,消耗线程对自己的LIST数据进行消耗,边消边清,然后一个主线程用来控制将生产线程的公共数据均分到多个消耗线程的私有LIST里,哪位大侠能写个简单DEMO供参考下啊。由于分不多。还请大家不吝啬赐教。

解决方案 »

  1.   

    for exampleimport java.util.*;public class MyJob {
        public static void main(String[] args) {
            try {
                JobThread jobThread = new JobThread();
                jobThread.start();
                ExecuteThread[] executeThread = new ExecuteThread[10];
                for (int i=0; i<executeThread.length; i++) {
                    executeThread[i] = new ExecuteThread((i+1)*100); //模拟不同的线程处理数据的时间不同
                    executeThread[i].start();
                }
                int count = 0;
                while (true) {
                    String job = jobThread.getJob();
                    executeThread[count%10].addJob(job); //一般的平均分配
                    /* 按待处理数据最少的分配
                    int min=0, idx=0;
                    for (int i=0; i<executeThread.length; i++) {
                        int tmp = executeThread[i].getJobCount(); //获取当前线程的待处理数据件数
                        if (min < tmp) {
                            min = tmp;
                            idx = i; //找到待处理数据件数最少的线程
                        }
                    }
                    executeThread[idx].addJob(jobThread.getJob());
                    */
                    System.out.printf("%s: assign %s to %s\n", 
                                      Thread.currentThread().getName(), 
                                      job, executeThread[count%10].getName());
                    //yield();
                    Thread.sleep(200); //为了让程序执行缓慢以便查看结果所以让主线程休眠一会,
                                            //不要太快分配任务
                    count++;
                }
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }class JobThread extends Thread {
        List<String> jobQueue = new ArrayList<String>();    public void run () {
            int count = 0;
            while (true) {
                try {
                    synchronized(jobQueue) {
                        if (jobQueue.size() > 10) { //为了避免产生的数据过多,
                                                           //当数据达到一定时暂时停止产生数据
                            jobQueue.wait();
                        }
                        String job = "job-" + (count++);
                        jobQueue.add(job);
                        System.out.println(getName() + ": create " + job);
                        jobQueue.notifyAll();
                    }
                    yield();
                } catch (Exception e) {e.printStackTrace();}
            }
        }    public String getJob() {
            String job = null;
            try {
                synchronized(jobQueue) {
                    if (jobQueue.size() == 0) {
                        jobQueue.wait();
                    }
                    job = jobQueue.remove(0);
                    jobQueue.notifyAll();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return job;
        }
    }class ExecuteThread extends Thread {
        List<String> jobQueue = new ArrayList<String>();
        int sleepTime = 0;
        public ExecuteThread(int sleepTime) {
            this.sleepTime = sleepTime;
        }
        public void run() {
            while (true) {
                try {
                    synchronized(jobQueue) {
                        if (jobQueue.size() == 0) {
                            jobQueue.wait();
                        }
                        System.out.println(getName() + ": execute " + jobQueue.remove(0));
                    }
                    //yield();
                    sleep(sleepTime); //模拟不同的线程处理数据的时间不同
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }    public void addJob(String job) {
            try {
                synchronized (jobQueue) {
                    jobQueue.add(job);
                    jobQueue.notifyAll();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }    public int getJobCount() {
            synchronized(jobQueue) {
                return jobQueue.size();
            }
        }
    }
      

  2.   

    改进一下,模拟按待处理数据最少分配import java.util.*;public class MyJob {
        public static void main(String[] args) {
            try {
                JobQueue<String> queue = new JobQueue<String>(10); //设置公共LIST最大容量为10
                JobThread jobThread = new JobThread(queue); //创建一个生产线程
                jobThread.start();
                ExecuteThread[] executeThread = new ExecuteThread[10]; //创建10个消耗线程
                for (int i=0; i<executeThread.length; i++) {
                    executeThread[i] = new ExecuteThread(5, (10-i)*500); //设置私有最大容量为5
                    executeThread[i].start();
                }
                int count=0, idx=0;
                while (true) { //主线程从公共LIST取数据分配给各消耗线程
                    String job = queue.getJob();
                    if (count < 10) { //第一次平均分配
                        idx = count%10;
                        executeThread[idx].addJob(job); //一般的平均分配
                    } else {
                        //按待处理数据最少的分配
                        int min = 5;
                        for (int i=0; i<executeThread.length; i++) {
                            int tmp = executeThread[i].getJobCount();
                            if (min > tmp) {
                                min = tmp;
                                idx = i; //找到待处理数据件数最少的线程
                            }
                        }
                        executeThread[idx].addJob(job);
                    }
                    System.out.printf("%s: assign %s to %s\n", 
                                      Thread.currentThread().getName(), 
                                      job, executeThread[idx].getName());
                    Thread.sleep(300);
                    count++;
                }
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }class JobThread extends Thread { //生产数据线程
        private JobQueue<String> queue;    public JobThread(JobQueue<String> queue) {
            this.queue = queue;
        }    public void run () {
            int count = 0;
            while (true) {
                try {
                    String job =  "job-" + (count++);
                    queue.addJob(job);
                    System.out.println(getName() + ": create " + job);
                    yield();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }class ExecuteThread extends Thread { //消耗数据线程
        JobQueue<String> queue;
        int delay;    public ExecuteThread(int capacity, int delay) {
            queue = new JobQueue<String>(capacity);
            this.delay = delay;
        }    public void run() {
            while (true) {
                try {
                    sleep(delay);
                    String job = queue.getJob();
                    System.out.println(getName() + ": execute " + job);
                } catch (Exception e) {
                    //
                }
            }
        }    public void addJob(String job) {
            queue.addJob(job);
        }    public int getJobCount() {
            return queue.getJobCount();
        }
    }class JobQueue<T> { //自己封装LIST
        private List<T> queue = new ArrayList<T>();
        private int capacity;    public JobQueue(int capacity) {
            this.capacity = capacity;
        }    public void addJob(T job) {
            synchronized(queue) {
                if (queue.size() >= capacity) {
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        //
                    }
                }
                queue.add(job);
                queue.notifyAll();
            }
        }    public T getJob() {
            synchronized(queue) {
                if (queue.size() == 0) {
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        //
                    }
                }
                T job = queue.remove(0);
                queue.notifyAll();
                return job;
            }
        }    public int getJobCount() {
            synchronized(queue) {
                return queue.size();
            }
        }
    }
      

  3.   

    你说的是第一个还是第二个例子?
    第一个当主程序调用jobThread.getJob();的时候,getJob方法里有notifyAll
    第二个当主程序调用String job = queue.getJob();的时候,getJob有nofityAll
    也就是说主程序取出数据分配的时候就会唤醒生产线程,告诉他数据我拿走了,你不要wait了,赶紧生产
      

  4.   

    嗯,没仔细看。两个还有区别。每一个貌似还比较好懂。在main线程中唤醒捞取线程。第二个线程我一直揪纠的是。  JobQueue<String> queue = new JobQueue<String>(10); //设置公共LIST最大容量为10
    JobThread jobThread = new JobThread(queue);它将queue赋给捞取线程的私有queue。如果这个不是私有的。而是public static的。那么还能同步不紊乱么?
      

  5.   

    还有昨天一直在揪纠,究竟在主线程中是获得queue的锁呢还是获得new JobQueue<String>(10)这个的锁。后来我想想应该是获得后面的锁吧