需求:
现有一个List,List中的数据作为线程所要调用的方法中的参数,现在需要通过多线程来运行调用方法,方法执行完后返回参数然后再去跑List中的其他数据,每次跑的时候最多4个线程,直到List中数据迭代完!
求思路,求代码,求示例!

解决方案 »

  1.   

    自己写的samplepackage cn.sax.test;import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;public class ThreadTaskManager {
        /** the total number of task **/
        private static final int TASK_NUMBER = 1000;
        /** the total number of thread **/
        private static final int THREAD_NUMBER = 100;
        /** the task list that will be execute **/
        private static final List<TaskInfomation> allTasks = new ArrayList<TaskInfomation>(
                TASK_NUMBER);    /**
         * the entrance of main
         * 
         * @param args
         */
        public static void main(String[] args) {
            // build the instance
            final ThreadTaskManager t = new ThreadTaskManager();        // build the task
            t.buildTask();        // thread pool
            ExecutorService service = null;
            try {
                service = Executors.newFixedThreadPool(THREAD_NUMBER);
                service.invokeAll(t.buildTaskExecution());        } catch (InterruptedException ex) {
                System.out.println(ex.getMessage());
            } finally {
                if (service != null) {
                    service.shutdown();
                }
            }
        }    /**
         * build the task
         */
        private void buildTask() {
            for (int i = 0; i < TASK_NUMBER; i++) {
                allTasks.add(new TaskInfomation(i));
            }
            Collections.synchronizedList(allTasks);
        }    /**
         * buildTaskExecution
         */
        private List<TaskExecution> buildTaskExecution() {
            List<TaskExecution> executions = new ArrayList<TaskExecution>();
            for (int i = 0; i < THREAD_NUMBER; i++) {
                executions.add(new TaskExecution(allTasks));
            }
            return executions;
        }    /**
         * TaskInfomation
         */
        static class TaskInfomation implements Serializable {
            private static final long serialVersionUID = 9189633146186577956L;
            private int taskIndex;
            private volatile TaskStaus status;
            private transient Thread ownerThread;        public TaskInfomation(int taskIndex) {
                this.taskIndex = taskIndex;
                this.status = TaskStaus.QUEUE;
            }        public int getTaskIndex() {
                return taskIndex;
            }        public boolean isStatus(TaskStaus status) {
                return (this.status == status);
            }        public void updateStatus(TaskStaus statusPara) {
                this.status = statusPara;
            }        public void registeTheThread(final Thread t) {
                if (ownerThread == null) {
                    ownerThread = t;
                }
            }
        }    /**
         * 
         */
        static enum TaskStaus {
            QUEUE, WORKING, DONE
        }    /**
         * TaskExecution
         */
        static class TaskExecution implements Callable<Object> {
            private List<TaskInfomation> tasks;
            /** the number of already done task **/
            private static final AtomicInteger alreadyDoneTasks = new AtomicInteger();
            private static final Random R = new Random();
            private static final int MAX_TLEEP_TIME = 1;        /**
             * 
             * @param allTasks
             * @param alreadyDoneTasks
             */
            public TaskExecution(final List<TaskInfomation> allTasks) {
                this.tasks = allTasks;
            }        @Override
            public Object call() {
                final Thread ct = Thread.currentThread();            // do loop until all the task is completed successfully
                while (true) {                // if isInterrupted, return directly
                    if (ct.isInterrupted()) {
                        System.out
                                .println("the current thread is Interrupted..thread id:"
                                        + ct.getId());
                        return null;
                    }                // if the all job is done, return directly
                    if (isAllTaskDone()) {
                        System.out.println("all the task is already done..");
                        return null;
                    }                // loop each task and do task
                    for (final TaskInfomation task : tasks) {
                        try {
                            synchronized (task) {
                                // if task queue ,update the status and do job
                                if (task.isStatus(TaskStaus.QUEUE)) {
                                    // update the task status to working
                                    task.updateStatus(TaskStaus.WORKING);
                                } else {
                                    continue;
                                }
                            }                        // at same time, register the owner thread
                            task.registeTheThread(ct);                        // output the log
                            System.out.println("i am ready to do task index:"
                                    + task.getTaskIndex() + "   and thread id is:"
                                    + ct.getId());                        // do task logic using thread sleep instead...
                            doTaskLogic();                        // after task completed successfully
                            notifyDoneTask();                    } catch (Exception ex) {
                            System.err.println(ex.getMessage());
                        } finally {
                            // output the log
                            // System.out
                            // .println("i have already do task success completely index:"
                            // + task.getTaskIndex()
                            // + " and thread id is:" + ct.getId());
                        }
                    }
                }
            }        /**
             * notifyDoneTask
             */
            private void notifyDoneTask() {
                alreadyDoneTasks.getAndIncrement();
            }        /**
             * isAllTaskDone
             * 
             * @return is All Task Done
             */
            private boolean isAllTaskDone() {
                return (alreadyDoneTasks.get() >= TASK_NUMBER);
            }        /**
             * doTaskLogic
             * 
             * @throws InterruptedException
             */
            private void doTaskLogic() throws InterruptedException {
                Thread.sleep(R.nextInt(MAX_TLEEP_TIME));
            }
        }}
      

  2.   

    麻烦问下,我想将任务的总数定义为我传入List集合的大小,该如何去定义?
    如何将TaskInfomation定义成我传入的List集合对应的实体类!
      

  3.   

    我想将任务的总数定义为我传入List集合的大小,该如何去定义?
    ->private static final int TASK_NUMBER = 10000;这里定义了一个task的总数
    t.buildTask();只是模拟构建了任务列表。要构建多少的任务,只要修改TASK_NUMBER这个变量就可以了如何将TaskInfomation定义成我传入的List集合对应的实体类!
    ->
    TaskInfomation只是构建这个task对象最基本的几个成员,比如说这个task的状态。这边只是例子,对于实际要灵活运用。
      

  4.   

    我的意思是说TASK_NUMBER是根据我传入的List集合的size来动态变化的。
      

  5.   

    我的意思是说TASK_NUMBER是根据我传入的List集合的size来动态变化的。
    -> 以下修改了代码package cn.sax.test;import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;public class ThreadTaskManager {
        /** the total number of thread **/
        private static final int THREAD_NUMBER = 10;
        /** the task list that will be execute **/
        private static final Queue<TaskInfomation> allTasks = new ConcurrentLinkedQueue<TaskInfomation>();
        /** taskIndex **/
        private static final AtomicInteger taskIndex = new AtomicInteger();
        /** the period of add task to queue **/
        private static final int PERIOD_MILLISECONDS = 10;    /**
         * the entrance of main
         * 
         * @param args
         */
        public static void main(String[] args) {
            long start = System.currentTimeMillis();
            try {
                // build the instance
                final ThreadTaskManager t = new ThreadTaskManager();            // this thread is add the task periodicity
                ScheduledExecutorService addTaskService = null;
                // thread pool using to invoke all task list
                ExecutorService service = null;
                try {
                    addTaskService = Executors.newScheduledThreadPool(1);
                    addTaskService.scheduleAtFixedRate(new DynamicAddTask(t), 0,
                            PERIOD_MILLISECONDS, TimeUnit.MILLISECONDS);                service = Executors.newFixedThreadPool(THREAD_NUMBER);
                    service.invokeAll(t.buildTaskExecution());                service.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
                } catch (InterruptedException ex) {
                    System.out.println(ex.getMessage());
                } finally {
                    if (service != null) {
                        service.shutdown();
                    }                if (addTaskService != null) {
                        addTaskService.shutdown();
                    }
                }
            } finally {
                long end = System.currentTimeMillis();
                System.out.println("execute time " + (end - start) + "ms");
            }
        }    /**
         * build the task
         */
        private void buildTask() {
            final int index = taskIndex.getAndIncrement();
            System.out.println("add task and task id is:" + index);
            allTasks.add(new TaskInfomation(index));
        }    /**
         * buildTaskExecution
         */
        private List<TaskExecution> buildTaskExecution() {
            List<TaskExecution> executions = new ArrayList<TaskExecution>();
            for (int i = 0; i < THREAD_NUMBER; i++) {
                executions.add(new TaskExecution(allTasks));
            }
            return executions;
        }    /**
         * Dynamic Add Task
         */
        static class DynamicAddTask implements Runnable {
            private ThreadTaskManager manager;        public DynamicAddTask(ThreadTaskManager parameter) {
                this.manager = parameter;
            }        @Override
            public void run() {
                // create a new thread for build the task dynamicly
                manager.buildTask();
            }
        }    /**
         * TaskInfomation
         */
        static class TaskInfomation implements Serializable {
            private static final long serialVersionUID = 9189633146186577956L;
            private int taskIndex;
            private volatile TaskStaus status;
            private transient Thread ownerThread;        public TaskInfomation(int taskIndex) {
                this.taskIndex = taskIndex;
                this.status = TaskStaus.QUEUE;
            }        public int getTaskIndex() {
                return taskIndex;
            }        public boolean isStatus(TaskStaus status) {
                return (this.status == status);
            }        public void updateStatus(TaskStaus statusPara) {
                this.status = statusPara;
            }        public void registeTheThread(final Thread t) {
                if (ownerThread == null) {
                    ownerThread = t;
                }
            }
        }    /**
         * 
         */
        static enum TaskStaus {
            QUEUE, WORKING, DONE
        }    /**
         * TaskExecution
         */
        static class TaskExecution implements Callable<Object> {
            private Queue<TaskInfomation> tasks;
            /** the number of already done task **/
            private static final AtomicInteger alreadyDoneTasks = new AtomicInteger();
            private static final Random R = new Random();
            private static final int MAX_TLEEP_TIME = 1000;        /**
             * 
             * @param allTasks
             * @param alreadyDoneTasks
             */
            public TaskExecution(final Queue<TaskInfomation> allTasks) {
                this.tasks = allTasks;
            }        @Override
            public Object call() {
                final Thread ct = Thread.currentThread();            // do loop until all the task is completed successfully
                while (true) {                // if isInterrupted, return directly
                    if (ct.isInterrupted()) {
                        System.out
                                .println("the current thread is Interrupted..thread id:"
                                        + ct.getId());
                        return null;
                    }                // loop each task and do task
                    for (final TaskInfomation task : tasks) {
                        try {
                            synchronized (task) {
                                // if task queue ,update the status and do job
                                if (task.isStatus(TaskStaus.QUEUE)) {
                                    // update the task status to working
                                    task.updateStatus(TaskStaus.WORKING);
                                } else {
                                    continue;
                                }
                            }                        // at same time, register the owner thread
                            task.registeTheThread(ct);                        // output the log
                            System.out.println("i am ready to do task index:"
                                    + task.getTaskIndex() + "    and thread id is:"
                                    + ct.getId());                        // do task logic using thread sleep instead...
                            doTaskLogic();                        // after task completed successfully
                            notifyDoneTask();                    } catch (Exception ex) {
                            System.err.println(ex.getMessage());
                        } finally {
                            // output the log
                            // System.out
                            // .println("i have already do task success completely index:"
                            // + task.getTaskIndex()
                            // + " and thread id is:" + ct.getId());
                        }
                    }
                }
            }        /**
             * notifyDoneTask
             */
            private void notifyDoneTask() {
                alreadyDoneTasks.getAndIncrement();
            }        /**
             * doTaskLogic
             * 
             * @throws InterruptedException
             */
            private void doTaskLogic() throws InterruptedException {
                Thread.sleep(R.nextInt(MAX_TLEEP_TIME));
            }
        }}修改点:
    1 一个线程周期动态加入task, 另外的几个线程不停的poll and do task
    2 arrayList -> concurrentLinkedQueue, arrayList线程不安全,迭代有问题。
      

  6.   

    还有我想问下如果在线程中调用外部方法,该如何实现?
    ->
    传入该方法所在class的instance,就可以了,就像这样:static class DynamicAddTask implements Runnable {
            private ThreadTaskManager manager;        public DynamicAddTask(ThreadTaskManager parameter) {
                this.manager = parameter;
            }        @Override
            public void run() {
                // create a new thread for build the task dynamicly
                manager.buildTask();
            }
        }
      

  7.   

    shnulaa,大哥 你有QQ么?
    我加你下,QQ上问你,看了代码还是不明白!
      

  8.   

    static class DynamicAddTask implements Runnable {
            private ThreadTaskManager manager;
     
            public DynamicAddTask(ThreadTaskManager parameter) {
                this.manager = parameter;
            }
     
            @Override
            public void run() {
                // create a new thread for build the task dynamicly
                manager.buildTask();
            }
        }
      

  9.   

    现在我已经将arrayList中数据转为concurrentLinkedQueue,在上述的代码中我该如何去操作这个队列数据。我看代码中是先new了个任务的队列对象然后是不断往里面add,我这边的话,将这个队列做为参数传进去后该如何操作呢?
      

  10.   

    现在我已经将arrayList中数据转为concurrentLinkedQueue,在上述的代码中我该如何去操作这个队列数据
    ->
    拿到这个队列的对象,add一个TaskInformation的对象就可以了。我这边的话,将这个队列做为参数传进去后该如何操作呢
    ->
    如何操作是你自己决定,我这边只是模拟了另外一个线程周期性向这个列队增加任务的操作。
      

  11.   

    我自己写了段代码,麻烦看下有问题没!import java.net.MalformedURLException;
    import java.rmi.NotBoundException;
    import java.rmi.RemoteException;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.njedc.dssportal.meta.rmi.server.iagent.IEtlAgent;public class MyExecutor extends Thread {
    String name; public MyExecutor(String s) throws MalformedURLException, RemoteException,
    NotBoundException {
    this.name = s;
    } public void run() {
    try {
                            System.out.println(name);
    System.out.println(Thread.currentThread().getName());
    System.out.println(" start....");
    System.out.println("result:" + s);
    Thread.sleep((int) (Math.random() * 1000));
    System.out.println(" end...");
    } catch (Exception e) {
    e.printStackTrace();
    }
    } public static void main(String args[]) throws MalformedURLException,
    RemoteException, NotBoundException {
    ExecutorService service = Executors.newFixedThreadPool(4); Queue<String> allTasks = new ConcurrentLinkedQueue<String>();
    allTasks.offer("1111");
    allTasks.offer("2222");
    allTasks.offer("3333");
    allTasks.offer("4444");
    allTasks.offer("5555");
    allTasks.offer("6666"); String str; while ((str = allTasks.poll()) != null) {
    service.execute(new MyExecutor(str));
    System.out.println(str);
    } System.out.println("submit finish");
    service.shutdown();
    }
    }