我想做一个多线程共享的东东主要功能是,一个产生数据的线程放到一个公共的LIST里,另外多个消耗线程各有自己的私有LIST,消耗线程对自己的LIST数据进行消耗,边消边清,然后一个主线程用来控制将生产线程的公共数据均分到多个消耗线程的私有LIST里,哪位大侠能写个简单DEMO供参考下啊。由于分不多。还请大家不吝啬赐教。
解决方案 »
- Java Swing怎么调用系统计算器
- 逆向思维大考验
- 求助:Integer.parseInt(test); 如果test="test"的话。。。。
- [高手请入,已折磨我整整24小时]几个让我极为困惑的问题(涉及setOpaque(),JScrollPane,GridBagLayout(),JTextPane)
- 线程执行中遇到异常会阻塞吗?
- SOS请教如何配置IntelliJ IDEA v3.0.5?
- java的正则表达式的问题.
- 如何使用jsdk类库中的接口?
- 请教:在Java中如何自销毁一个对象,或如何销毁一个对象?
- 如何在Linux(Red Hat Linux 7.2)下卸载JBuilder 6,重装??Linux系统字符集由中文更改为英文后,JBuilder支持的字体极少!!
- 求计算机类Java相关的中英文对照文章(5000字左右)
- 高分求助图片等比缩放算法?
public static void main(String[] args) {
try {
JobThread jobThread = new JobThread();
jobThread.start();
ExecuteThread[] executeThread = new ExecuteThread[10];
for (int i=0; i<executeThread.length; i++) {
executeThread[i] = new ExecuteThread((i+1)*100); //模拟不同的线程处理数据的时间不同
executeThread[i].start();
}
int count = 0;
while (true) {
String job = jobThread.getJob();
executeThread[count%10].addJob(job); //一般的平均分配
/* 按待处理数据最少的分配
int min=0, idx=0;
for (int i=0; i<executeThread.length; i++) {
int tmp = executeThread[i].getJobCount(); //获取当前线程的待处理数据件数
if (min < tmp) {
min = tmp;
idx = i; //找到待处理数据件数最少的线程
}
}
executeThread[idx].addJob(jobThread.getJob());
*/
System.out.printf("%s: assign %s to %s\n",
Thread.currentThread().getName(),
job, executeThread[count%10].getName());
//yield();
Thread.sleep(200); //为了让程序执行缓慢以便查看结果所以让主线程休眠一会,
//不要太快分配任务
count++;
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}class JobThread extends Thread {
List<String> jobQueue = new ArrayList<String>(); public void run () {
int count = 0;
while (true) {
try {
synchronized(jobQueue) {
if (jobQueue.size() > 10) { //为了避免产生的数据过多,
//当数据达到一定时暂时停止产生数据
jobQueue.wait();
}
String job = "job-" + (count++);
jobQueue.add(job);
System.out.println(getName() + ": create " + job);
jobQueue.notifyAll();
}
yield();
} catch (Exception e) {e.printStackTrace();}
}
} public String getJob() {
String job = null;
try {
synchronized(jobQueue) {
if (jobQueue.size() == 0) {
jobQueue.wait();
}
job = jobQueue.remove(0);
jobQueue.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
}
return job;
}
}class ExecuteThread extends Thread {
List<String> jobQueue = new ArrayList<String>();
int sleepTime = 0;
public ExecuteThread(int sleepTime) {
this.sleepTime = sleepTime;
}
public void run() {
while (true) {
try {
synchronized(jobQueue) {
if (jobQueue.size() == 0) {
jobQueue.wait();
}
System.out.println(getName() + ": execute " + jobQueue.remove(0));
}
//yield();
sleep(sleepTime); //模拟不同的线程处理数据的时间不同
} catch (Exception e) {
e.printStackTrace();
}
}
} public void addJob(String job) {
try {
synchronized (jobQueue) {
jobQueue.add(job);
jobQueue.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
}
} public int getJobCount() {
synchronized(jobQueue) {
return jobQueue.size();
}
}
}
public static void main(String[] args) {
try {
JobQueue<String> queue = new JobQueue<String>(10); //设置公共LIST最大容量为10
JobThread jobThread = new JobThread(queue); //创建一个生产线程
jobThread.start();
ExecuteThread[] executeThread = new ExecuteThread[10]; //创建10个消耗线程
for (int i=0; i<executeThread.length; i++) {
executeThread[i] = new ExecuteThread(5, (10-i)*500); //设置私有最大容量为5
executeThread[i].start();
}
int count=0, idx=0;
while (true) { //主线程从公共LIST取数据分配给各消耗线程
String job = queue.getJob();
if (count < 10) { //第一次平均分配
idx = count%10;
executeThread[idx].addJob(job); //一般的平均分配
} else {
//按待处理数据最少的分配
int min = 5;
for (int i=0; i<executeThread.length; i++) {
int tmp = executeThread[i].getJobCount();
if (min > tmp) {
min = tmp;
idx = i; //找到待处理数据件数最少的线程
}
}
executeThread[idx].addJob(job);
}
System.out.printf("%s: assign %s to %s\n",
Thread.currentThread().getName(),
job, executeThread[idx].getName());
Thread.sleep(300);
count++;
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}class JobThread extends Thread { //生产数据线程
private JobQueue<String> queue; public JobThread(JobQueue<String> queue) {
this.queue = queue;
} public void run () {
int count = 0;
while (true) {
try {
String job = "job-" + (count++);
queue.addJob(job);
System.out.println(getName() + ": create " + job);
yield();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}class ExecuteThread extends Thread { //消耗数据线程
JobQueue<String> queue;
int delay; public ExecuteThread(int capacity, int delay) {
queue = new JobQueue<String>(capacity);
this.delay = delay;
} public void run() {
while (true) {
try {
sleep(delay);
String job = queue.getJob();
System.out.println(getName() + ": execute " + job);
} catch (Exception e) {
//
}
}
} public void addJob(String job) {
queue.addJob(job);
} public int getJobCount() {
return queue.getJobCount();
}
}class JobQueue<T> { //自己封装LIST
private List<T> queue = new ArrayList<T>();
private int capacity; public JobQueue(int capacity) {
this.capacity = capacity;
} public void addJob(T job) {
synchronized(queue) {
if (queue.size() >= capacity) {
try {
queue.wait();
} catch (InterruptedException e) {
//
}
}
queue.add(job);
queue.notifyAll();
}
} public T getJob() {
synchronized(queue) {
if (queue.size() == 0) {
try {
queue.wait();
} catch (InterruptedException e) {
//
}
}
T job = queue.remove(0);
queue.notifyAll();
return job;
}
} public int getJobCount() {
synchronized(queue) {
return queue.size();
}
}
}
第一个当主程序调用jobThread.getJob();的时候,getJob方法里有notifyAll
第二个当主程序调用String job = queue.getJob();的时候,getJob有nofityAll
也就是说主程序取出数据分配的时候就会唤醒生产线程,告诉他数据我拿走了,你不要wait了,赶紧生产
JobThread jobThread = new JobThread(queue);它将queue赋给捞取线程的私有queue。如果这个不是私有的。而是public static的。那么还能同步不紊乱么?