我的mysql是5.5驱动是5.1.9用JOTM跑分布式事务,插入2个表,同库,但不同的datasource我是20个线程并发,一共执行200多次。日志里该提交的都提交了,该回滚的都回滚了,就是DB里的记录不对。某一个表里边比另外一个表里少一条记录。是不是MYSQL对于分布式事务就是不靠谱的?

解决方案 »

  1.   

    不可能,最起码mysql也成功运行了数十年,若有问题?则何数据库可胜任之??请放心好了。
      

  2.   

    额,看来还是代码或者配置的问题。换到oracle试试
      

  3.   

    查找原因,打开通用日志,看看里面的内容,看看SQL语句、事务都 是否正常
      

  4.   


    换成ORACLE也这问题。可能还是并发的问题。 sql和事务都正常。可能还是并发的时候,connection使用的时候串笼子了
      

  5.   

    表中有主键,从日志中看,所有的事务头提交了。贴代码1: 这个是个代理类。所有业务方法都被此类代理。public abstract class TransactionProxy implements MethodInterceptor {

    private static Logger log = Logger.getLogger(TransactionProxy.class); private String ERROR = TMConstants.ERROR+Thread.currentThread().getId()+" | ";
    private String DEBUG = TMConstants.DEBUG+Thread.currentThread().getId()+" | ";

    private static final ThreadLocal<Stack<String>> threadLocal = new ThreadLocal<Stack<String>>();

    private String methodname = "";



    public Object intercept(Object obj, Method method, Object[] args,
    MethodProxy proxy) throws Throwable {

    methodname = method.getName();

    Object ret = null;

    if(method.getName().equals("finalize") || method.getName().equals("count")){
    ret = proxy.invokeSuper(obj, args);
    }else{
    try {
    // for each new invoke, push stack
    if (isEmpty()) {
    //begin();
    preBegin();
    }
    push(method.getName()); //log.debug(DEBUG+"method="+method.getName()+" os="+(threadLocal.get()==null?"0":threadLocal.get().size())+" ts="+(transactionLocal.get()==null?"0":transactionLocal.get().size())+" cs="+(connLocal.get()==null?"0":connLocal.get().size()));
    ret = proxy.invokeSuper(obj, args); // do business // finish once, pop up
    methodname = pop();
    // check whether commit the transaction
    if (isEmpty()) {
    commit();
    }
    } catch (Throwable e) {
    // if it is in the inner of the invoke chain, throw exception
    // directly.
    methodname = pop(); if (isEmpty()) {
    rollback();
    }
    throw e;
    } finally {
    // check whether close the session
    if (isEmpty()) {
    close();
    threadLocal.remove();
    }
    }
    }
    return ret;
    } private void push(String name) {
    Stack<String> invokingStack = threadLocal.get() == null ? new Stack<String>()
    : threadLocal.get();
    invokingStack.push(name);
    if (threadLocal.get() == null)
    threadLocal.set(invokingStack);
    } private String pop() {
    Stack<String> invokingStack = threadLocal.get();
    if (invokingStack != null) {
    try {
    return invokingStack.pop();
    } catch (EmptyStackException e) {
    return "Empty invokingStack";
    }
    }else{
    return "Empty invokingStack";
    }
    } private boolean isEmpty() {
    Stack<String> invokingStack = threadLocal.get();
    if (invokingStack == null)
    return true;
    return invokingStack.isEmpty();
    }

    private void preBegin(){
    TransactionManager tm = new TransactionManager();
    tm.preBegin(methodname);
    }

    private void begin(){
    TransactionManager tm = new TransactionManager();
    tm.begin(methodname);
    }

    private void commit(){
    TransactionManager tm = new TransactionManager();
    tm.commit(methodname);
    }

    private void rollback(){
    TransactionManager tm = new TransactionManager();
    tm.rollback(methodname);
    }

    private void close(){
    try{
    Stack<Connection> stack = getConnection();
    for(Connection con:stack){
    con.close();
    if(log.isDebugEnabled()){
    log.debug(DEBUG+"连接已关闭.|"+methodname);
    }
    }
    clear();
    }catch(Throwable e){
    log.error(ERROR+"**************** ERROR:连接无法正常关闭.  ********************|"+methodname,e);
    throw new RuntimeException("连接无法正常关闭.|"+methodname+"|"+e.getMessage(),e);
    }
    }

    private void clear(){
    clearConnection(methodname);
    }

    public abstract Stack<Connection> getConnection();
    public abstract void clearConnection(String methodname);
    public abstract UserTransaction getUserTransaction();
    代理类的子类,用于获取事务(用于提交回滚)和连接(用于关闭)package cn.tsb.comm.db.transaction;import java.sql.Connection;
    import java.util.Stack;import javax.transaction.UserTransaction;public class WebTransactionProxy extends TransactionProxy{

    @Override
    public UserTransaction getUserTransaction() {
    return ConnectionFactory.getUserTransaction();
    } @Override
    public void clearConnection(String name) {
    ConnectionFactory.clear(name);
    } @Override
    public Stack<Connection> getConnection() {
    return ConnectionFactory.getConnection();
    }
    }
    代码2: 事务管理器。package cn.tsb.comm.db.transaction;import javax.transaction.UserTransaction;import org.apache.log4j.Logger;public class TransactionManager {

    private String ERROR = TMConstants.ERROR+Thread.currentThread().getId()+" | ";
    private String DEBUG = TMConstants.DEBUG+Thread.currentThread().getId()+" | ";

    private static Logger log = Logger.getLogger(TransactionManager.class);

    public TransactionManager(){

    }

    public void preBegin(String methodname) {
    if(log.isDebugEnabled()){
    log.debug(DEBUG+"事务准备.|"+methodname);
    }
    }

    public void begin(){
    this.begin("");
    }

    public void begin(String methodname){
    try {
    UserTransaction ut = ConnectionFactory.getUserTransaction();
    if(ut!=null){
    ut.setTransactionTimeout(1000);
    ut.begin();
    if(log.isDebugEnabled()){
    log.debug(DEBUG+"事务开始.|"+methodname);
    }
    }
    } catch (Throwable e) {
    log.error(ERROR+"**************** ERROR:事务无法正常开始.  ********************|",e);
    //throw new RuntimeException("事务无法开始.|"+e.getMessage(),e);
    }
    }

    public void commit(){
    this.commit("");
    }

    public void commit(String methodname){
    UserTransaction ut = null;
    try{
    ut = ConnectionFactory.getUserTransaction();
    if(ut!=null){
    ut.commit();
    if(log.isDebugEnabled()){
    log.debug(DEBUG+"事务已提交.|"+methodname);
    }
    }
    } catch (Throwable e) {
    log.error(ERROR+"**************** ERROR:事务无法正常提交.  ********************|"+methodname,e);
    //throw new RuntimeException("事务无法正常提交.|"+methodname+"|"+e.getMessage(),e);
    }
    }

    public void rollback(){
    this.rollback("");
    }

    public void rollback(String methodname){
    UserTransaction ut = null;
    try{
    ut = ConnectionFactory.getUserTransaction();
    if(ut!=null){
    ut.rollback();
    if(log.isDebugEnabled()){
    log.debug(DEBUG+"事务已回滚.|"+methodname);
    }
    }
    } catch (Throwable e) {
    log.error(ERROR+"**************** ERROR:事务无法正常回滚.  ********************|"+methodname,e);
    //throw new RuntimeException("事务无法正常回滚.|"+methodname+"|"+e.getMessage(),e);
    }
    }}代码3:用于获取Connection 和UserTransaction的工厂package cn.tsb.comm.db.transaction;import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.Stack;import javax.transaction.UserTransaction;import org.apache.log4j.Logger;import cn.tsb.comm.db.dao.ConnDAO;public final class ConnectionFactory { private static Logger log = Logger.getLogger(ConnectionFactory.class); private static final String DEBUG = TMConstants.DEBUG; private static final ThreadLocal<Stack<Connection>> threadLocal = new ThreadLocal<Stack<Connection>>();
    //private static final ThreadLocal<UserTransaction> utLocal = new ThreadLocal<UserTransaction>(); private ConnectionFactory() {
    } public   static void clear(String methodname) {
    threadLocal.remove();
    //utLocal.remove();
    if(log.isDebugEnabled()){
    log.debug(DEBUG+Thread.currentThread().getId()+" | "+"连接stack已清空.|"+methodname);
    }
    } public   static Stack<Connection> getConnection(){
    Stack<Connection> stack = threadLocal.get();
    if (stack == null) {
    stack = new Stack<Connection>();
    threadLocal.set(stack);
    }
    return stack;
    } public   static Connection getConnection(String dbFlag)
    throws SQLException {
    Stack<Connection> stack = threadLocal.get();

    Connection con = ConnDAO.getDBConnection(dbFlag);
    con.setAutoCommit(false);

    if (stack == null) {
    stack = new Stack<Connection>();
    threadLocal.set(stack);
    }

    if(stack.isEmpty()){
    TransactionManager tm = new TransactionManager();
    tm.begin();
    }

    if (con == null) {
    throw new RuntimeException("空的连接.");
    } if (con.isClosed()) {
    throw new RuntimeException("关闭的连接.");
    }

    if (log.isDebugEnabled()) {
    log.debug(DEBUG+Thread.currentThread().getId()+" | " + dbFlag +"|连接已获取.");
    }

    stack.add(con); return con;
    }

    public  static UserTransaction getUserTransaction() {
    //UserTransaction ut = utLocal.get();
    //if (ut == null) {
    UserTransaction ut = ConnDAO.getUserTransaction();
    // utLocal.set(ut);
    //}
    return ut;
    }}
      

  6.   


    不是有日志吗?看log.debug(DEBUG+"事务已提交.|"+methodname);
    加上Thread.getCurrentThread().getName(),把它打出来,统计一下,看看到底提交了没有?这种问题应该可以解决。 
      

  7.   

    结贴,解决了。
    1 利用JOTM实现分布式事务,我总结如下步骤:事务打开->[获取连接1->执行SQL->关闭连接1]->[获取连接2->执行SQL->关闭连接2]->[获取连接N->执行SQL->关闭连接N]->提交或者回滚
    3 事务的获取,每一次事务的使用,直接从JNDI中获取即可。如此就带来一个新的问题,因为根据JTA+XA规范2 所有连接不需要保存,用的时候获取,用完之后关闭即可。
    ,从得有个地方start,end,prepared commit rollback等步骤,这些东西有JOTM+XAPool实现的。有时间我在整理整理这部分。
    关于XA规范,详见:http://blog.csdn.net/kangojian/article/details/6780305httphttp://blog.csdn.net/kangojian/article/details/6780305://blog.csdn.net/kangojian/article/details/6780305http://blog.csdn.net/kangojian/article/details/6780305
      

  8.   

    1 利用JOTM实现分布式事务,我总结如下步骤:事务打开->[获取连接1->执行SQL->关闭连接1]->[获取连接2->执行SQL->关闭连接2]->[获取连接N->执行SQL->关闭连接N]->提交或者回滚2 所有连接不需要保存,用的时候获取,用完之后关闭即可。3 事务的获取,每一次事务的使用,直接从JNDI中获取即可。如此就带来一个新的问题,因为根据JTA+XA规范
    ,从得有个地方start,end,prepared commit rollback等步骤,这些东西有JOTM+XAPool实现的。有时间我在整理整理这部分。
    关于XA规范,详见:http://blog.csdn.net/kangojian/article/details/6780305httphttp://blog.csdn.net/kangojian/article/details/6780305://blog.csdn.net/kangojian/article/details/6780305http://blog.csdn.net/kangojian/article/details/6780305