这个程序的功能主要是监控服务器上的表,如果有需要下发的表的话,就把对应的表的内容打印出来,并且把相关信息记录一下。每个表只打印一次。这样循环执行。
这个程序是跑的通的,但是吧publish()方法的哪个for循环加上就不行了,要报Exception in thread "pool-1-thread-3" java.lang.IllegalMonitorStateException异常。先上代码,大家有空的帮忙分析分析原因。本人初学线程池,可能在锁的应用上不是很熟练,请大家指点。package com.delochi.hao.jiankong;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class DataBasePut implements Runnable{
public static int currExec = 0;       //当前正在运行的线程数
public static int maxExec = 4;  //线程池的最大容量
private static boolean certain = true;
private static String sql = null;
private static String pro_Name = "";
private static String areaName = "marid";
private static Connection conn = null;
public static Map<String,String> currExecMap = new HashMap<String,String>();
 
public DataBasePut(){
conn = new DbConnection().createConn(); 
//st = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_UPDATABLE);
}
    
public List<String> checkRun(){
List<String> table = new ArrayList<String>();
if(!certain){
try {
Thread.currentThread().wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else{
certain = false;
String sql ="select t_name from A_abc where isrun=1"+
                "and t_name not in(select pro_name from pro_table)";
try {
Statement st = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_UPDATABLE);
ResultSet rs = null;
    rs = st.executeQuery(sql);
    if(!rs.next()){
System.out.println("没有检测到需要下发的表!"+Thread.currentThread().getName());
    }else{
     rs.beforeFirst();
     System.out.println("状态为1且未下发的表有:"+Thread.currentThread().getName());
    while(rs.next()){
table.add(rs.getString("t_name"));
System.out.println(rs.getString("t_name"));
    }
    }
}catch(SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("*********************************");
}
return table;
}

public boolean checkIsrun(String pro_Name){
String sql = "select pro_name from pro_table where pro_name='"+pro_Name+"'";
PreparedStatement ps;
try {
ps = conn.prepareStatement(sql);
ResultSet rs = ps.executeQuery();
if(rs.next()){
return false;
}
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return true;
}
public synchronized void addExec(String pro_Name){
currExec++;
currExecMap.put(pro_Name, Thread.currentThread().getName());
certain = true;
notify();
}
public static void reduceExec(String pro_Name){
currExec--;
currExecMap.remove(pro_Name);
}

public void run(){
List<String> table = new ArrayList<String>();
table = checkRun();
System.out.println("table.size = "+table.size());
if(table.size() > 0){
sql = "select * from "+ table.get(0);
pro_Name = String.valueOf(table.get(0));
if(!currExecMap.containsKey(pro_Name) &&  checkIsrun(pro_Name))
{
addExec(pro_Name);
    //notify();
System.out.println("正在下发:"+table.get(0));
    publish(sql,pro_Name);
}
}else{
synchronized(this){
certain = true;
notify();
}
}
}

public static void addPro(String pro_Name){
String sql = "insert into pro_table values('"+pro_Name+"')";
PreparedStatement ps;
try {
ps = new DbConnection().createConn().prepareStatement(sql);
ps.executeUpdate();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
public static void currExecMap(){
if(currExecMap.isEmpty()){
System.out.println("--------正在下发 "+ currExecMap.size() +"个---------");
for(String procName:currExecMap.keySet()){
System.out.println("["+areaName+"]\t["+currExecMap.get(procName)+"]\t"+procName);
}
}
}
public static void publish(String sql, String pro_Name){
String sqla = null;
int ida = 0;
try{
Statement ps = conn.createStatement();
//ResultSet rsService = ps.executeQuery("select SYS_CONTEXT('USERENV','HOST') as clientname,SYS_CONTEXT('USERENV', 'IP_ADDRESS') as clientip from dual");
//rsService.next();
ResultSet rs1 = ps.executeQuery("select max(tid) as id from put_log");
if(rs1.next()){
ida = (rs1.getInt("id")) + 1;
sqla = "insert into put_log(tid,area,pro,startdate,clientname,clientip) values(" + ida + ",'" + areaName + "','" + pro_Name + "',SYSDATE,'xxxxxxxx','127.0.0.1')";
}else{
sqla = "insert into put_log(tid,area,pro,startdate,clientname,clientip) values(" + ida + ",'" + areaName + "','" + pro_Name + "',SYSDATE,'xxxxxxxx','127.0.0.1')";
}
ResultSet rs = ps.executeQuery(sql);
while(rs.next()){
System.out.println(rs.getString("u_name")+rs.getString("u_course")+rs.getInt("u_score")+"------"+Thread.currentThread().getName());
ps.executeUpdate(sqla);
System.out.println("下发过程日志存储成功!");
}
for(int i = 0; i<1000000; i++){
System.out.println(pro_Name);
}
System.out.println(pro_Name+"-------下发完毕");
         addPro(pro_Name);
        reduceExec(pro_Name);
        rs.close();
}catch(Exception e){
e.printStackTrace();
}
}

public static void main(String[] args){
ExecutorService exec = Executors.newFixedThreadPool(maxExec);
DataBasePut dbp = new DataBasePut();
while(true){
//create logfile
for(;DataBasePut.currExec < maxExec;){
exec.execute(dbp);
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(currExec == maxExec){
currExecMap();
//noPutTable();
try {
TimeUnit.SECONDS.sleep(10*1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}

解决方案 »

  1.   

    发现三个问题。第一个:
      Thread.currentThread().wait();
    当你要针对某对象调用 wait(),必须先对该对象进行 synchronized,具体参见API说明:The current thread must own this object's monitor. The thread releases ownership of this monitor and waits until another thread notifies threads waiting on this object's monitor to wake up either through a call to the notify method or the notifyAll method. The thread then waits until it can re-obtain ownership of the monitor and resumes execution.As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop:synchronized (obj) {
       while (<condition does not hold>) obj.wait();
       ... // Perform action appropriate to condition
    }
    第二个问题:
    是一个神奇的逻辑问题,客观地说我没有细看,因为你没有用代码排版,这看起来比较辛苦。
    但是你对当前线程执行 wait() 操作,那么还能有谁去notify它呢?第三个问题:
    你notify()的都是 this 对象;跟所wait的当前线程也没有任何关系。
    所以,神奇的觉得你的代码还能运行真是太奇怪了。
      

  2.   

    没排版,大概看了下首先,问题1如2L所说,程序能跑起来,是因为 Thread.currentThread().wait();时抛出异常被捕捉,但是程序不会中断其次,对于一些static方法,直接修改static变量,在多线程的情况下,这些变量还能保证同步吗?