需求:
现有一个List,List中的数据作为线程所要调用的方法中的参数,现在需要通过多线程来运行调用方法,方法执行完后返回参数然后再去跑List中的其他数据,每次跑的时候最多4个线程,直到List中数据迭代完!
求思路,求代码,求示例!
现有一个List,List中的数据作为线程所要调用的方法中的参数,现在需要通过多线程来运行调用方法,方法执行完后返回参数然后再去跑List中的其他数据,每次跑的时候最多4个线程,直到List中数据迭代完!
求思路,求代码,求示例!
解决方案 »
- ConcurrentMap
- 请帮我看一下,这段servlet为什么不能访问oracle10g数据库,差在哪里??
- 请教大家一个问题?
- 监控线程 急
- 求一个flex与java servlet交互!
- hql语句查询问题
- 急救!weblogic启动有误sci.isResourceStale("/index.jsp", 1119690976000L, "8.1.2.0", "Asia/Shanghai
- 大家帮我看看操作数据源的BEAN,错误在那里,先行谢谢了
- JB数据库程序两问题,请教高手,谢谢!
- 请问在WEBLOGIC下jsp如何调用EJB?
- Hibernate 自身一对多的映射, hql级联查询的问题..
- 关于一个sql问题啊,其实也不是很难,只是本人太菜啦
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;public class ThreadTaskManager {
/** the total number of task **/
private static final int TASK_NUMBER = 1000;
/** the total number of thread **/
private static final int THREAD_NUMBER = 100;
/** the task list that will be execute **/
private static final List<TaskInfomation> allTasks = new ArrayList<TaskInfomation>(
TASK_NUMBER); /**
* the entrance of main
*
* @param args
*/
public static void main(String[] args) {
// build the instance
final ThreadTaskManager t = new ThreadTaskManager(); // build the task
t.buildTask(); // thread pool
ExecutorService service = null;
try {
service = Executors.newFixedThreadPool(THREAD_NUMBER);
service.invokeAll(t.buildTaskExecution()); } catch (InterruptedException ex) {
System.out.println(ex.getMessage());
} finally {
if (service != null) {
service.shutdown();
}
}
} /**
* build the task
*/
private void buildTask() {
for (int i = 0; i < TASK_NUMBER; i++) {
allTasks.add(new TaskInfomation(i));
}
Collections.synchronizedList(allTasks);
} /**
* buildTaskExecution
*/
private List<TaskExecution> buildTaskExecution() {
List<TaskExecution> executions = new ArrayList<TaskExecution>();
for (int i = 0; i < THREAD_NUMBER; i++) {
executions.add(new TaskExecution(allTasks));
}
return executions;
} /**
* TaskInfomation
*/
static class TaskInfomation implements Serializable {
private static final long serialVersionUID = 9189633146186577956L;
private int taskIndex;
private volatile TaskStaus status;
private transient Thread ownerThread; public TaskInfomation(int taskIndex) {
this.taskIndex = taskIndex;
this.status = TaskStaus.QUEUE;
} public int getTaskIndex() {
return taskIndex;
} public boolean isStatus(TaskStaus status) {
return (this.status == status);
} public void updateStatus(TaskStaus statusPara) {
this.status = statusPara;
} public void registeTheThread(final Thread t) {
if (ownerThread == null) {
ownerThread = t;
}
}
} /**
*
*/
static enum TaskStaus {
QUEUE, WORKING, DONE
} /**
* TaskExecution
*/
static class TaskExecution implements Callable<Object> {
private List<TaskInfomation> tasks;
/** the number of already done task **/
private static final AtomicInteger alreadyDoneTasks = new AtomicInteger();
private static final Random R = new Random();
private static final int MAX_TLEEP_TIME = 1; /**
*
* @param allTasks
* @param alreadyDoneTasks
*/
public TaskExecution(final List<TaskInfomation> allTasks) {
this.tasks = allTasks;
} @Override
public Object call() {
final Thread ct = Thread.currentThread(); // do loop until all the task is completed successfully
while (true) { // if isInterrupted, return directly
if (ct.isInterrupted()) {
System.out
.println("the current thread is Interrupted..thread id:"
+ ct.getId());
return null;
} // if the all job is done, return directly
if (isAllTaskDone()) {
System.out.println("all the task is already done..");
return null;
} // loop each task and do task
for (final TaskInfomation task : tasks) {
try {
synchronized (task) {
// if task queue ,update the status and do job
if (task.isStatus(TaskStaus.QUEUE)) {
// update the task status to working
task.updateStatus(TaskStaus.WORKING);
} else {
continue;
}
} // at same time, register the owner thread
task.registeTheThread(ct); // output the log
System.out.println("i am ready to do task index:"
+ task.getTaskIndex() + " and thread id is:"
+ ct.getId()); // do task logic using thread sleep instead...
doTaskLogic(); // after task completed successfully
notifyDoneTask(); } catch (Exception ex) {
System.err.println(ex.getMessage());
} finally {
// output the log
// System.out
// .println("i have already do task success completely index:"
// + task.getTaskIndex()
// + " and thread id is:" + ct.getId());
}
}
}
} /**
* notifyDoneTask
*/
private void notifyDoneTask() {
alreadyDoneTasks.getAndIncrement();
} /**
* isAllTaskDone
*
* @return is All Task Done
*/
private boolean isAllTaskDone() {
return (alreadyDoneTasks.get() >= TASK_NUMBER);
} /**
* doTaskLogic
*
* @throws InterruptedException
*/
private void doTaskLogic() throws InterruptedException {
Thread.sleep(R.nextInt(MAX_TLEEP_TIME));
}
}}
如何将TaskInfomation定义成我传入的List集合对应的实体类!
->private static final int TASK_NUMBER = 10000;这里定义了一个task的总数
t.buildTask();只是模拟构建了任务列表。要构建多少的任务,只要修改TASK_NUMBER这个变量就可以了如何将TaskInfomation定义成我传入的List集合对应的实体类!
->
TaskInfomation只是构建这个task对象最基本的几个成员,比如说这个task的状态。这边只是例子,对于实际要灵活运用。
-> 以下修改了代码package cn.sax.test;import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;public class ThreadTaskManager {
/** the total number of thread **/
private static final int THREAD_NUMBER = 10;
/** the task list that will be execute **/
private static final Queue<TaskInfomation> allTasks = new ConcurrentLinkedQueue<TaskInfomation>();
/** taskIndex **/
private static final AtomicInteger taskIndex = new AtomicInteger();
/** the period of add task to queue **/
private static final int PERIOD_MILLISECONDS = 10; /**
* the entrance of main
*
* @param args
*/
public static void main(String[] args) {
long start = System.currentTimeMillis();
try {
// build the instance
final ThreadTaskManager t = new ThreadTaskManager(); // this thread is add the task periodicity
ScheduledExecutorService addTaskService = null;
// thread pool using to invoke all task list
ExecutorService service = null;
try {
addTaskService = Executors.newScheduledThreadPool(1);
addTaskService.scheduleAtFixedRate(new DynamicAddTask(t), 0,
PERIOD_MILLISECONDS, TimeUnit.MILLISECONDS); service = Executors.newFixedThreadPool(THREAD_NUMBER);
service.invokeAll(t.buildTaskExecution()); service.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
} catch (InterruptedException ex) {
System.out.println(ex.getMessage());
} finally {
if (service != null) {
service.shutdown();
} if (addTaskService != null) {
addTaskService.shutdown();
}
}
} finally {
long end = System.currentTimeMillis();
System.out.println("execute time " + (end - start) + "ms");
}
} /**
* build the task
*/
private void buildTask() {
final int index = taskIndex.getAndIncrement();
System.out.println("add task and task id is:" + index);
allTasks.add(new TaskInfomation(index));
} /**
* buildTaskExecution
*/
private List<TaskExecution> buildTaskExecution() {
List<TaskExecution> executions = new ArrayList<TaskExecution>();
for (int i = 0; i < THREAD_NUMBER; i++) {
executions.add(new TaskExecution(allTasks));
}
return executions;
} /**
* Dynamic Add Task
*/
static class DynamicAddTask implements Runnable {
private ThreadTaskManager manager; public DynamicAddTask(ThreadTaskManager parameter) {
this.manager = parameter;
} @Override
public void run() {
// create a new thread for build the task dynamicly
manager.buildTask();
}
} /**
* TaskInfomation
*/
static class TaskInfomation implements Serializable {
private static final long serialVersionUID = 9189633146186577956L;
private int taskIndex;
private volatile TaskStaus status;
private transient Thread ownerThread; public TaskInfomation(int taskIndex) {
this.taskIndex = taskIndex;
this.status = TaskStaus.QUEUE;
} public int getTaskIndex() {
return taskIndex;
} public boolean isStatus(TaskStaus status) {
return (this.status == status);
} public void updateStatus(TaskStaus statusPara) {
this.status = statusPara;
} public void registeTheThread(final Thread t) {
if (ownerThread == null) {
ownerThread = t;
}
}
} /**
*
*/
static enum TaskStaus {
QUEUE, WORKING, DONE
} /**
* TaskExecution
*/
static class TaskExecution implements Callable<Object> {
private Queue<TaskInfomation> tasks;
/** the number of already done task **/
private static final AtomicInteger alreadyDoneTasks = new AtomicInteger();
private static final Random R = new Random();
private static final int MAX_TLEEP_TIME = 1000; /**
*
* @param allTasks
* @param alreadyDoneTasks
*/
public TaskExecution(final Queue<TaskInfomation> allTasks) {
this.tasks = allTasks;
} @Override
public Object call() {
final Thread ct = Thread.currentThread(); // do loop until all the task is completed successfully
while (true) { // if isInterrupted, return directly
if (ct.isInterrupted()) {
System.out
.println("the current thread is Interrupted..thread id:"
+ ct.getId());
return null;
} // loop each task and do task
for (final TaskInfomation task : tasks) {
try {
synchronized (task) {
// if task queue ,update the status and do job
if (task.isStatus(TaskStaus.QUEUE)) {
// update the task status to working
task.updateStatus(TaskStaus.WORKING);
} else {
continue;
}
} // at same time, register the owner thread
task.registeTheThread(ct); // output the log
System.out.println("i am ready to do task index:"
+ task.getTaskIndex() + " and thread id is:"
+ ct.getId()); // do task logic using thread sleep instead...
doTaskLogic(); // after task completed successfully
notifyDoneTask(); } catch (Exception ex) {
System.err.println(ex.getMessage());
} finally {
// output the log
// System.out
// .println("i have already do task success completely index:"
// + task.getTaskIndex()
// + " and thread id is:" + ct.getId());
}
}
}
} /**
* notifyDoneTask
*/
private void notifyDoneTask() {
alreadyDoneTasks.getAndIncrement();
} /**
* doTaskLogic
*
* @throws InterruptedException
*/
private void doTaskLogic() throws InterruptedException {
Thread.sleep(R.nextInt(MAX_TLEEP_TIME));
}
}}修改点:
1 一个线程周期动态加入task, 另外的几个线程不停的poll and do task
2 arrayList -> concurrentLinkedQueue, arrayList线程不安全,迭代有问题。
->
传入该方法所在class的instance,就可以了,就像这样:static class DynamicAddTask implements Runnable {
private ThreadTaskManager manager; public DynamicAddTask(ThreadTaskManager parameter) {
this.manager = parameter;
} @Override
public void run() {
// create a new thread for build the task dynamicly
manager.buildTask();
}
}
我加你下,QQ上问你,看了代码还是不明白!
private ThreadTaskManager manager;
public DynamicAddTask(ThreadTaskManager parameter) {
this.manager = parameter;
}
@Override
public void run() {
// create a new thread for build the task dynamicly
manager.buildTask();
}
}
->
拿到这个队列的对象,add一个TaskInformation的对象就可以了。我这边的话,将这个队列做为参数传进去后该如何操作呢
->
如何操作是你自己决定,我这边只是模拟了另外一个线程周期性向这个列队增加任务的操作。
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.njedc.dssportal.meta.rmi.server.iagent.IEtlAgent;public class MyExecutor extends Thread {
String name; public MyExecutor(String s) throws MalformedURLException, RemoteException,
NotBoundException {
this.name = s;
} public void run() {
try {
System.out.println(name);
System.out.println(Thread.currentThread().getName());
System.out.println(" start....");
System.out.println("result:" + s);
Thread.sleep((int) (Math.random() * 1000));
System.out.println(" end...");
} catch (Exception e) {
e.printStackTrace();
}
} public static void main(String args[]) throws MalformedURLException,
RemoteException, NotBoundException {
ExecutorService service = Executors.newFixedThreadPool(4); Queue<String> allTasks = new ConcurrentLinkedQueue<String>();
allTasks.offer("1111");
allTasks.offer("2222");
allTasks.offer("3333");
allTasks.offer("4444");
allTasks.offer("5555");
allTasks.offer("6666"); String str; while ((str = allTasks.poll()) != null) {
service.execute(new MyExecutor(str));
System.out.println(str);
} System.out.println("submit finish");
service.shutdown();
}
}