用JOTM写的分布式事务。 能提交,能回滚,但有个重大问题。在一个业务方法中插入2个表,每表一条记录。多线程并发(10线程 乘以 50次)的访问此业务方法,两个表中的总记录数不一样。一个少10条,一个少2~3条。
业务方法补贴了,贴上事务实现。代码1:业务方法代理类,代理所有的业务方法。package cn.tsb.comm.db.transaction;import java.lang.reflect.Method;
import java.sql.Connection;
import java.util.EmptyStackException;
import java.util.Stack;import javax.transaction.UserTransaction;import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;import org.apache.log4j.Logger;/* 
 * Copyright (C) 2012 TSB
 * All Rights Reserved 
 * Description: 方法拦截,事务管理
 * 
 * Modification History: 
 **********************************************************
 * Date       Author   Comments
 **********************************************************
 * 2012/9/19      Init Version
 */
public abstract class TransactionProxy implements MethodInterceptor {

private static Logger log = Logger.getLogger(TransactionProxy.class); private String ERROR = TMConstants.ERROR+Thread.currentThread().getId()+" | ";
private String DEBUG = TMConstants.DEBUG+Thread.currentThread().getId()+" | ";
//执行堆,以此实现事务的原子性,为空是进入事务开始,再次为空时事务结束
private static final ThreadLocal<Stack<String>> threadLocal = new ThreadLocal<Stack<String>>();

private String methodname = "";



public Object intercept(Object obj, Method method, Object[] args,
MethodProxy proxy) throws Throwable {

methodname = method.getName();

Object ret = null;

if(method.getName().equals("finalize") || method.getName().equals("count")){
ret = proxy.invokeSuper(obj, args);
}else{
try {
// for each new invoke, push stack
if (isEmpty()) {
//begin();
//本来这里应该ut.begin()的。但为了保证事务的开始在获取连接(第一个)之后,所以这里只打印日志。 
//有一种情况会出现:假设一个业务方法需要两个连接,第一个连接打开之后,第二个连接打开之前,会ut.begin,不知道是不是这个原因
preBegin();
}
push(method.getName()); //log.debug(DEBUG+"method="+method.getName()+" os="+(threadLocal.get()==null?"0":threadLocal.get().size())+" ts="+(transactionLocal.get()==null?"0":transactionLocal.get().size())+" cs="+(connLocal.get()==null?"0":connLocal.get().size()));
ret = proxy.invokeSuper(obj, args); // do business // finish once, pop up
methodname = pop();
// check whether commit the transaction
if (isEmpty()) {
commit();
//提交
}
} catch (Throwable e) {
// if it is in the inner of the invoke chain, throw exception
// directly.
methodname = pop(); if (isEmpty()) {
rollback();
//回滚
}
throw e;
} finally {
// check whether close the session
if (isEmpty()) {
close();
//关闭所有连接
threadLocal.remove();
}
}
}
return ret;
} private void push(String name) {
Stack<String> invokingStack = threadLocal.get() == null ? new Stack<String>()
: threadLocal.get();
invokingStack.push(name);
if (threadLocal.get() == null)
threadLocal.set(invokingStack);
} private String pop() {
Stack<String> invokingStack = threadLocal.get();
if (invokingStack != null) {
try {
return invokingStack.pop();
} catch (EmptyStackException e) {
return "Empty invokingStack";
}
}else{
return "Empty invokingStack";
}
} private boolean isEmpty() {
Stack<String> invokingStack = threadLocal.get();
if (invokingStack == null)
return true;
return invokingStack.isEmpty();
}

private void preBegin(){
TransactionManager tm = new TransactionManager();
tm.preBegin(methodname);
}

private void begin(){
TransactionManager tm = new TransactionManager();
tm.begin(methodname);
}

private void commit(){
TransactionManager tm = new TransactionManager();
tm.commit(methodname);
}

private void rollback(){
TransactionManager tm = new TransactionManager();
tm.rollback(methodname);
}

private void close(){
try{
Stack<Connection> stack = getConnection();
for(Connection con:stack){
con.close();
if(log.isDebugEnabled()){
log.debug(DEBUG+"连接已关闭.|"+methodname);
}
}
clear();
}catch(Throwable e){
log.error(ERROR+"**************** ERROR:连接无法正常关闭.  ********************|"+methodname,e);
throw new RuntimeException("连接无法正常关闭.|"+methodname+"|"+e.getMessage(),e);
}
}

private void clear(){
clearConnection(methodname);
}

//三个抽象方法,子类中实现,用户获取事务和连接
public abstract Stack<Connection> getConnection();
public abstract void clearConnection(String methodname);
public abstract UserTransaction getUserTransaction();
}代码2:代理类的子类,只是从工厂中后去事务和连接。连接是存储在事务工厂本地线程中,事务直接从JNDI中获取。package cn.tsb.comm.db.transaction;import java.sql.Connection;
import java.util.Stack;import javax.transaction.UserTransaction;public class WebTransactionProxy extends TransactionProxy{

@Override
public UserTransaction getUserTransaction() {
return ConnectionFactory.getUserTransaction();
} @Override
public void clearConnection(String name) {
ConnectionFactory.clear(name);
} @Override
public Stack<Connection> getConnection() {
return ConnectionFactory.getConnection();
}
}代码3:事务管理器,因为ut是从JNDI中获取的(JOTM实现时,也是存储在本地线程中,所以使用事务管理器的时候,new 一个就行了package cn.tsb.comm.db.transaction;import javax.transaction.UserTransaction;import org.apache.log4j.Logger;public class TransactionManager {

private String ERROR = TMConstants.ERROR+Thread.currentThread().getId()+" | ";
private String DEBUG = TMConstants.DEBUG+Thread.currentThread().getId()+" | ";

private static Logger log = Logger.getLogger(TransactionManager.class);

public TransactionManager(){

}

public void preBegin(String methodname) {
if(log.isDebugEnabled()){
log.debug(DEBUG+"事务准备.|"+methodname);
}
}

public void begin(){
this.begin("");
}

public void begin(String methodname){
try {
UserTransaction ut = ConnectionFactory.getUserTransaction();
if(ut!=null){
ut.setTransactionTimeout(1000);
ut.begin();
if(log.isDebugEnabled()){
log.debug(DEBUG+"事务开始.|"+methodname);
}
}
} catch (Throwable e) {
log.error(ERROR+"**************** ERROR:事务无法正常开始.  ********************|",e);
//throw new RuntimeException("事务无法开始.|"+e.getMessage(),e);
}
}

public void commit(){
this.commit("");
}

public void commit(String methodname){
UserTransaction ut = null;
try{
ut = ConnectionFactory.getUserTransaction();
if(ut!=null){
ut.commit();
if(log.isDebugEnabled()){
log.debug(DEBUG+"事务已提交.|"+methodname);
}
}
} catch (Throwable e) {
log.error(ERROR+"**************** ERROR:事务无法正常提交.  ********************|"+methodname,e);
//throw new RuntimeException("事务无法正常提交.|"+methodname+"|"+e.getMessage(),e);
}
}

public void rollback(){
this.rollback("");
}

public void rollback(String methodname){
UserTransaction ut = null;
try{
ut = ConnectionFactory.getUserTransaction();
if(ut!=null){
ut.rollback();
if(log.isDebugEnabled()){
log.debug(DEBUG+"事务已回滚.|"+methodname);
}
}
} catch (Throwable e) {
log.error(ERROR+"**************** ERROR:事务无法正常回滚.  ********************|"+methodname,e);
//throw new RuntimeException("事务无法正常回滚.|"+methodname+"|"+e.getMessage(),e);
}
}}
代码4:连接工厂,从JNDI获取连接,存储于本地线程package cn.tsb.comm.db.transaction;import java.sql.Connection;
import java.sql.SQLException;
import java.util.Stack;import javax.transaction.UserTransaction;import org.apache.log4j.Logger;import cn.tsb.comm.db.dao.ConnDAO;public final class ConnectionFactory { private static Logger log = Logger.getLogger(ConnectionFactory.class); private static final String DEBUG = TMConstants.DEBUG; private static final ThreadLocal<Stack<Connection>> threadLocal = new ThreadLocal<Stack<Connection>>();
//private static final ThreadLocal<UserTransaction> utLocal = new ThreadLocal<UserTransaction>(); private ConnectionFactory() {
} public   static void clear(String methodname) {
threadLocal.remove();
//utLocal.remove();
if(log.isDebugEnabled()){
log.debug(DEBUG+Thread.currentThread().getId()+" | "+"连接stack已清空.|"+methodname);
}
} public   static Stack<Connection> getConnection(){
Stack<Connection> stack = threadLocal.get();
if (stack == null) {
stack = new Stack<Connection>();
threadLocal.set(stack);
}
return stack;
} public   static Connection getConnection(String dbFlag)
throws SQLException {
Stack<Connection> stack = threadLocal.get();

//从JNDI获取连接
Connection con = ConnDAO.getDBConnection(dbFlag);
con.setAutoCommit(false);

if (stack == null) {
stack = new Stack<Connection>();
threadLocal.set(stack);
}

if(stack.isEmpty()){
TransactionManager tm = new TransactionManager();
tm.begin();
}

if (con == null) {
throw new RuntimeException("空的连接.");
} if (con.isClosed()) {
throw new RuntimeException("关闭的连接.");
}

if (log.isDebugEnabled()) {
log.debug(DEBUG+Thread.currentThread().getId()+" | " + dbFlag +"|连接已获取.");
}

stack.add(con); return con;
}

public  static UserTransaction getUserTransaction() {
//UserTransaction ut = utLocal.get();
//if (ut == null) {
//从JNDI获取ut
UserTransaction ut = ConnDAO.getUserTransaction();
// utLocal.set(ut);
//}
return ut;
}}