==========求最优解决方案~!!!!!!!!!!! =============== 现有三台WEB服务器:A、B、C,两台做了集群的数据库服务器DB1、DB2。三台WEB服务器都部署了相同的程序,用于运行后台任务(比如数据校验、数据统计等)。数据库中有个任务分配表TASK_TAB(ID,任务类型,锁控制字段…),每个任务对应一个任务线程。
要求
1、避免三台服务器处理TASK_TAB表中同一个任务,且每个任务都能确保执行。
2、尽量做到负载均衡,避免有些服务器忙,有些服务器却闲置。
3、最好能用线程池管理任务线程。 目前我的设计是这样。后台程序有一主线程,主线程负责创建一任务生产者线程和一任务消费者线程。
生产者线程负责不断从数据库任务分配表TASK_TAB中读出未被锁定(锁控制字段=0)的记录,根据任务类型反射生产任务类,更新数据库锁控制字段=1,然后将任务类放入任务队列。
消费者线程维护一线程池,将任务队列中的任务交由线程池管理。问:
1、对要求1,通过任务表中TASK_TAB加锁控制字段来控制是否合理?是否有更好的方法?
2、对要求3,应该用什么样的线程池最好?是否需要生产消费者模式?
下面是我初步设计的代码。/**
* 主线程
* @author
*
*/
public class MainThread { /**
* @param args
*/
public void schedue() { // 任务队列
BlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(10, true); // 线程池
ExecutorService excutor = new ThreadPoolExecutor(5, 20, 30,
TimeUnit.MINUTES, new SynchronousQueue<Runnable>(),
new ThreadPoolExecutor.CallerRunsPolicy()); new Thread(new TaskProducer(queue)).start(); new Thread(new TaskCustomer(excutor, queue)).start(); }}
/**
* 任务生产者
* @author
*
*/
public class TaskProducer implements Runnable { BlockingQueue<Task> queue;
TaskSevice taskSevice;
public TaskProducer(BlockingQueue<Task> queue){
this.queue = queue;
}
public void run () {
while (true) {
// 从数据库中获取未被锁定任务
List<Task> list = taskSevice.getTaskAndInstance();
for (Task task : list) {
if (task.lock()) {
try {
queue.put(task);// 放入队列
} catch (InterruptedException e) {
task.unLock();
}
}
}
}
}
}/**
* 任务消费者
* @author
*
*/
public class TaskCustomer implements Runnable {
Executor executor;
BlockingQueue<Task> queue;
public TaskCustomer(Executor executor,BlockingQueue<Task> queue){
this.executor = executor;
this.queue = queue;
} public void run() {
// 从任务队列中取任务
while (true) {
Task task = null;
try {
while ((task = queue.take()) != null) {
executor.execute(task);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
if (task != null)
task.unLock();
}
}
}
}
要求
1、避免三台服务器处理TASK_TAB表中同一个任务,且每个任务都能确保执行。
2、尽量做到负载均衡,避免有些服务器忙,有些服务器却闲置。
3、最好能用线程池管理任务线程。 目前我的设计是这样。后台程序有一主线程,主线程负责创建一任务生产者线程和一任务消费者线程。
生产者线程负责不断从数据库任务分配表TASK_TAB中读出未被锁定(锁控制字段=0)的记录,根据任务类型反射生产任务类,更新数据库锁控制字段=1,然后将任务类放入任务队列。
消费者线程维护一线程池,将任务队列中的任务交由线程池管理。问:
1、对要求1,通过任务表中TASK_TAB加锁控制字段来控制是否合理?是否有更好的方法?
2、对要求3,应该用什么样的线程池最好?是否需要生产消费者模式?
下面是我初步设计的代码。/**
* 主线程
* @author
*
*/
public class MainThread { /**
* @param args
*/
public void schedue() { // 任务队列
BlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(10, true); // 线程池
ExecutorService excutor = new ThreadPoolExecutor(5, 20, 30,
TimeUnit.MINUTES, new SynchronousQueue<Runnable>(),
new ThreadPoolExecutor.CallerRunsPolicy()); new Thread(new TaskProducer(queue)).start(); new Thread(new TaskCustomer(excutor, queue)).start(); }}
/**
* 任务生产者
* @author
*
*/
public class TaskProducer implements Runnable { BlockingQueue<Task> queue;
TaskSevice taskSevice;
public TaskProducer(BlockingQueue<Task> queue){
this.queue = queue;
}
public void run () {
while (true) {
// 从数据库中获取未被锁定任务
List<Task> list = taskSevice.getTaskAndInstance();
for (Task task : list) {
if (task.lock()) {
try {
queue.put(task);// 放入队列
} catch (InterruptedException e) {
task.unLock();
}
}
}
}
}
}/**
* 任务消费者
* @author
*
*/
public class TaskCustomer implements Runnable {
Executor executor;
BlockingQueue<Task> queue;
public TaskCustomer(Executor executor,BlockingQueue<Task> queue){
this.executor = executor;
this.queue = queue;
} public void run() {
// 从任务队列中取任务
while (true) {
Task task = null;
try {
while ((task = queue.take()) != null) {
executor.execute(task);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
if (task != null)
task.unLock();
}
}
}
}
解决方案 »
- hibernate annotation 怎么设置主键自动增长?mysql数据库
- javax.servlet.ServletException: Request[/admin/user/user] does not contain handl
- Spring结合MVC类加载的问题
- 请问jsp中,如何设置上传文件的类型?
- 一段JS代码问题.
- 求分页的代码 最好是用Hibernate+jsp来实现
- 关于JSP中使用javaBean的一个不明白的问题,请各位指点~
- 用到List.add(object)方法,怎么NullPointerException
- 怎样解决JSP下中文显示乱码问题呢?
- servlet能调用Activex控件吗?
- 大家帮忙看看
- 请教个通过链接分类显示信息的问题,请看内容
恐怕这个状态表不好定义。我现在只要求能做到,每一台服务器固定线程池大小,有多少空闲线程就从任务分配表中拿多少任务。尽量避免服务器缓冲过多任务,比如A服务器线程满后,将过多任务线程放入缓冲队列。所以我这里定制了SynchronousQueue作为线程池的缓冲队列。
但是我现在发现个问题,我希望线程池满后,自动阻塞消费者线程,所以我采用了ThreadPoolExecutor.CallerRunsPolicy策略,但我发现当线程池满时,最后一个添加的任务把消费者线程阻塞了,直到该线程任务结束后才能唤醒消费者线程,导致当其他任务线程结束后无法再添加任务线程到线程池。