public abstract class Task { public enum State { /* 新建 */NEW, /* 执行中 */RUNNING, /* 已完成 */FINISHED; } // 任务状态 private State state = State.NEW; public void setState(State state) { this.state = state; } public State getState() { return state; } public abstract void deal(); } public class TaskQueue {
private List<Task> queue = new LinkedList<Task>(); // 添加一项任务 public synchronized void addTask(Task task) { if (task != null) { queue.add(task); } } // 完成任务后将它从任务队列中删除 public synchronized void finishTask(Task task) { if (task != null) { task.setState(Task.State.FINISHED); queue.remove(task); } } // 取得一项待执行任务 public synchronized Task getTask() { Iterator<Task> it = queue.iterator(); Task task; while (it.hasNext()) { task = it.next(); // 寻找一个新建的任务 if (Task.State.NEW.equals(task.getState())) { // 把任务状态置为运行中 task.setState(Task.State.RUNNING); return task; } } return null; } } public class ThreadPoolService { // 线程数 public static final int THREAD_COUNT = 5; // 线程池状态 private Status status = Status.NEW; private TaskQueue queue = new TaskQueue(); public enum Status { /* 新建 */NEW, /* 提供服务中 */RUNNING, /* 停止服务 */TERMINATED, } private List<Thread> threads = new ArrayList<Thread>(); public ThreadPoolService() { for (int i = 0; i < THREAD_COUNT; i++) { Thread t = new TaskThread(this); threads.add(t); } } // 启动服务 public void start() { this.status = Status.RUNNING; for (int i = 0; i < THREAD_COUNT; i++) { threads.get(i).start(); } } // 停止服务 public void stop() { this.status = Status.TERMINATED; } // 是否正在运行 public boolean isRunning() { return status == Status.RUNNING; } // 执行任务 public void runTask(Task task) { queue.addTask(task); } protected TaskQueue getTaskQueue() { return queue; } } public class TaskThread extends Thread { // 该线程所属的线程池 private ThreadPoolService service; public TaskThread(ThreadPoolService tps) { service = tps; } public void run() { // 在线程池运行的状态下执行任务队列中的任务 while (service.isRunning()) { TaskQueue queue = service.getTaskQueue(); Task task = queue.getTask(); if (task != null) { task.deal(); } queue.finishTask(task); } } }public class SimpleTaskTest extends Task { @Override public void deal() { // do something System.out.println(Thread.currentThread().getName() + " is running...."); try { Thread.sleep(100); } catch (Exception e) { } } public static void main(String[] args) throws InterruptedException { ThreadPoolService service = new ThreadPoolService(); service.start(); // 执行十次任务 for (int i = 0; i < 10; i++) { service.runTask(new SimpleTaskTest()); } // 睡眠1秒钟,等待所有任务执行完毕 Thread.sleep(1000); service.stop(); } }
public abstract class Task {
public enum State {
/* 新建 */NEW,
/* 执行中 */RUNNING,
/* 已完成 */FINISHED;
}
// 任务状态
private State state = State.NEW; public void setState(State state) {
this.state = state;
} public State getState() {
return state;
} public abstract void deal();
}
public class TaskQueue {
private List<Task> queue = new LinkedList<Task>(); // 添加一项任务
public synchronized void addTask(Task task) {
if (task != null) {
queue.add(task);
}
} // 完成任务后将它从任务队列中删除
public synchronized void finishTask(Task task) { if (task != null) {
task.setState(Task.State.FINISHED);
queue.remove(task);
}
} // 取得一项待执行任务
public synchronized Task getTask() {
Iterator<Task> it = queue.iterator();
Task task;
while (it.hasNext()) {
task = it.next();
// 寻找一个新建的任务
if (Task.State.NEW.equals(task.getState())) {
// 把任务状态置为运行中
task.setState(Task.State.RUNNING);
return task;
}
}
return null;
}
}
public class ThreadPoolService {
// 线程数
public static final int THREAD_COUNT = 5;
// 线程池状态
private Status status = Status.NEW;
private TaskQueue queue = new TaskQueue(); public enum Status {
/* 新建 */NEW,
/* 提供服务中 */RUNNING,
/* 停止服务 */TERMINATED,
} private List<Thread> threads = new ArrayList<Thread>(); public ThreadPoolService() {
for (int i = 0; i < THREAD_COUNT; i++) {
Thread t = new TaskThread(this);
threads.add(t);
}
} // 启动服务
public void start() {
this.status = Status.RUNNING;
for (int i = 0; i < THREAD_COUNT; i++) {
threads.get(i).start();
}
} // 停止服务
public void stop() {
this.status = Status.TERMINATED;
} // 是否正在运行
public boolean isRunning() {
return status == Status.RUNNING;
} // 执行任务
public void runTask(Task task) {
queue.addTask(task);
} protected TaskQueue getTaskQueue() {
return queue;
}
}
public class TaskThread extends Thread {
// 该线程所属的线程池
private ThreadPoolService service; public TaskThread(ThreadPoolService tps) {
service = tps;
} public void run() {
// 在线程池运行的状态下执行任务队列中的任务
while (service.isRunning()) {
TaskQueue queue = service.getTaskQueue();
Task task = queue.getTask();
if (task != null) {
task.deal();
}
queue.finishTask(task);
}
}
}public class SimpleTaskTest extends Task {
@Override
public void deal() {
// do something
System.out.println(Thread.currentThread().getName() + " is running....");
try {
Thread.sleep(100);
} catch (Exception e) {
}
} public static void main(String[] args) throws InterruptedException {
ThreadPoolService service = new ThreadPoolService();
service.start();
// 执行十次任务
for (int i = 0; i < 10; i++) {
service.runTask(new SimpleTaskTest());
}
// 睡眠1秒钟,等待所有任务执行完毕
Thread.sleep(1000);
service.stop();
}
}