我的mysql是5.5驱动是5.1.9用JOTM跑分布式事务,插入2个表,同库,但不同的datasource我是20个线程并发,一共执行200多次。日志里该提交的都提交了,该回滚的都回滚了,就是DB里的记录不对。某一个表里边比另外一个表里少一条记录。是不是MYSQL对于分布式事务就是不靠谱的?
解决方案 »
- 为什么mysql子查询中不能用limit而临时表能呢?
- c# 向mysql 存储一张图片 为何报错呢?
- 相匹配6位字母加2位数字怎么写正则表达式
- 在mysql复制同步数据时*.err的内容如下,数据没有同步到,请高手帮帮忙。
- sql语句求助
- 建表失败,原因是有个字段名为 describe 该怎么办?
- 怎样知道一个ResultSet rs 实例中有多少条记录,不许用rs.next()加循环来求,因为记录集指针改变会影响我的应用
- 帮个忙,日期查询条件
- mysql varbinary和varchar如何相互转化
- MySQL5.7怎么开启TLSv1.2
- 关于mysql源码安装的问题
- MySQL高并发问题
换成ORACLE也这问题。可能还是并发的问题。 sql和事务都正常。可能还是并发的时候,connection使用的时候串笼子了
private static Logger log = Logger.getLogger(TransactionProxy.class); private String ERROR = TMConstants.ERROR+Thread.currentThread().getId()+" | ";
private String DEBUG = TMConstants.DEBUG+Thread.currentThread().getId()+" | ";
private static final ThreadLocal<Stack<String>> threadLocal = new ThreadLocal<Stack<String>>();
private String methodname = "";
public Object intercept(Object obj, Method method, Object[] args,
MethodProxy proxy) throws Throwable {
methodname = method.getName();
Object ret = null;
if(method.getName().equals("finalize") || method.getName().equals("count")){
ret = proxy.invokeSuper(obj, args);
}else{
try {
// for each new invoke, push stack
if (isEmpty()) {
//begin();
preBegin();
}
push(method.getName()); //log.debug(DEBUG+"method="+method.getName()+" os="+(threadLocal.get()==null?"0":threadLocal.get().size())+" ts="+(transactionLocal.get()==null?"0":transactionLocal.get().size())+" cs="+(connLocal.get()==null?"0":connLocal.get().size()));
ret = proxy.invokeSuper(obj, args); // do business // finish once, pop up
methodname = pop();
// check whether commit the transaction
if (isEmpty()) {
commit();
}
} catch (Throwable e) {
// if it is in the inner of the invoke chain, throw exception
// directly.
methodname = pop(); if (isEmpty()) {
rollback();
}
throw e;
} finally {
// check whether close the session
if (isEmpty()) {
close();
threadLocal.remove();
}
}
}
return ret;
} private void push(String name) {
Stack<String> invokingStack = threadLocal.get() == null ? new Stack<String>()
: threadLocal.get();
invokingStack.push(name);
if (threadLocal.get() == null)
threadLocal.set(invokingStack);
} private String pop() {
Stack<String> invokingStack = threadLocal.get();
if (invokingStack != null) {
try {
return invokingStack.pop();
} catch (EmptyStackException e) {
return "Empty invokingStack";
}
}else{
return "Empty invokingStack";
}
} private boolean isEmpty() {
Stack<String> invokingStack = threadLocal.get();
if (invokingStack == null)
return true;
return invokingStack.isEmpty();
}
private void preBegin(){
TransactionManager tm = new TransactionManager();
tm.preBegin(methodname);
}
private void begin(){
TransactionManager tm = new TransactionManager();
tm.begin(methodname);
}
private void commit(){
TransactionManager tm = new TransactionManager();
tm.commit(methodname);
}
private void rollback(){
TransactionManager tm = new TransactionManager();
tm.rollback(methodname);
}
private void close(){
try{
Stack<Connection> stack = getConnection();
for(Connection con:stack){
con.close();
if(log.isDebugEnabled()){
log.debug(DEBUG+"连接已关闭.|"+methodname);
}
}
clear();
}catch(Throwable e){
log.error(ERROR+"**************** ERROR:连接无法正常关闭. ********************|"+methodname,e);
throw new RuntimeException("连接无法正常关闭.|"+methodname+"|"+e.getMessage(),e);
}
}
private void clear(){
clearConnection(methodname);
}
public abstract Stack<Connection> getConnection();
public abstract void clearConnection(String methodname);
public abstract UserTransaction getUserTransaction();
代理类的子类,用于获取事务(用于提交回滚)和连接(用于关闭)package cn.tsb.comm.db.transaction;import java.sql.Connection;
import java.util.Stack;import javax.transaction.UserTransaction;public class WebTransactionProxy extends TransactionProxy{
@Override
public UserTransaction getUserTransaction() {
return ConnectionFactory.getUserTransaction();
} @Override
public void clearConnection(String name) {
ConnectionFactory.clear(name);
} @Override
public Stack<Connection> getConnection() {
return ConnectionFactory.getConnection();
}
}
代码2: 事务管理器。package cn.tsb.comm.db.transaction;import javax.transaction.UserTransaction;import org.apache.log4j.Logger;public class TransactionManager {
private String ERROR = TMConstants.ERROR+Thread.currentThread().getId()+" | ";
private String DEBUG = TMConstants.DEBUG+Thread.currentThread().getId()+" | ";
private static Logger log = Logger.getLogger(TransactionManager.class);
public TransactionManager(){
}
public void preBegin(String methodname) {
if(log.isDebugEnabled()){
log.debug(DEBUG+"事务准备.|"+methodname);
}
}
public void begin(){
this.begin("");
}
public void begin(String methodname){
try {
UserTransaction ut = ConnectionFactory.getUserTransaction();
if(ut!=null){
ut.setTransactionTimeout(1000);
ut.begin();
if(log.isDebugEnabled()){
log.debug(DEBUG+"事务开始.|"+methodname);
}
}
} catch (Throwable e) {
log.error(ERROR+"**************** ERROR:事务无法正常开始. ********************|",e);
//throw new RuntimeException("事务无法开始.|"+e.getMessage(),e);
}
}
public void commit(){
this.commit("");
}
public void commit(String methodname){
UserTransaction ut = null;
try{
ut = ConnectionFactory.getUserTransaction();
if(ut!=null){
ut.commit();
if(log.isDebugEnabled()){
log.debug(DEBUG+"事务已提交.|"+methodname);
}
}
} catch (Throwable e) {
log.error(ERROR+"**************** ERROR:事务无法正常提交. ********************|"+methodname,e);
//throw new RuntimeException("事务无法正常提交.|"+methodname+"|"+e.getMessage(),e);
}
}
public void rollback(){
this.rollback("");
}
public void rollback(String methodname){
UserTransaction ut = null;
try{
ut = ConnectionFactory.getUserTransaction();
if(ut!=null){
ut.rollback();
if(log.isDebugEnabled()){
log.debug(DEBUG+"事务已回滚.|"+methodname);
}
}
} catch (Throwable e) {
log.error(ERROR+"**************** ERROR:事务无法正常回滚. ********************|"+methodname,e);
//throw new RuntimeException("事务无法正常回滚.|"+methodname+"|"+e.getMessage(),e);
}
}}代码3:用于获取Connection 和UserTransaction的工厂package cn.tsb.comm.db.transaction;import java.sql.Connection;
import java.sql.SQLException;
import java.util.Stack;import javax.transaction.UserTransaction;import org.apache.log4j.Logger;import cn.tsb.comm.db.dao.ConnDAO;public final class ConnectionFactory { private static Logger log = Logger.getLogger(ConnectionFactory.class); private static final String DEBUG = TMConstants.DEBUG; private static final ThreadLocal<Stack<Connection>> threadLocal = new ThreadLocal<Stack<Connection>>();
//private static final ThreadLocal<UserTransaction> utLocal = new ThreadLocal<UserTransaction>(); private ConnectionFactory() {
} public static void clear(String methodname) {
threadLocal.remove();
//utLocal.remove();
if(log.isDebugEnabled()){
log.debug(DEBUG+Thread.currentThread().getId()+" | "+"连接stack已清空.|"+methodname);
}
} public static Stack<Connection> getConnection(){
Stack<Connection> stack = threadLocal.get();
if (stack == null) {
stack = new Stack<Connection>();
threadLocal.set(stack);
}
return stack;
} public static Connection getConnection(String dbFlag)
throws SQLException {
Stack<Connection> stack = threadLocal.get();
Connection con = ConnDAO.getDBConnection(dbFlag);
con.setAutoCommit(false);
if (stack == null) {
stack = new Stack<Connection>();
threadLocal.set(stack);
}
if(stack.isEmpty()){
TransactionManager tm = new TransactionManager();
tm.begin();
}
if (con == null) {
throw new RuntimeException("空的连接.");
} if (con.isClosed()) {
throw new RuntimeException("关闭的连接.");
}
if (log.isDebugEnabled()) {
log.debug(DEBUG+Thread.currentThread().getId()+" | " + dbFlag +"|连接已获取.");
}
stack.add(con); return con;
}
public static UserTransaction getUserTransaction() {
//UserTransaction ut = utLocal.get();
//if (ut == null) {
UserTransaction ut = ConnDAO.getUserTransaction();
// utLocal.set(ut);
//}
return ut;
}}
不是有日志吗?看log.debug(DEBUG+"事务已提交.|"+methodname);
加上Thread.getCurrentThread().getName(),把它打出来,统计一下,看看到底提交了没有?这种问题应该可以解决。
1 利用JOTM实现分布式事务,我总结如下步骤:事务打开->[获取连接1->执行SQL->关闭连接1]->[获取连接2->执行SQL->关闭连接2]->[获取连接N->执行SQL->关闭连接N]->提交或者回滚
3 事务的获取,每一次事务的使用,直接从JNDI中获取即可。如此就带来一个新的问题,因为根据JTA+XA规范2 所有连接不需要保存,用的时候获取,用完之后关闭即可。
,从得有个地方start,end,prepared commit rollback等步骤,这些东西有JOTM+XAPool实现的。有时间我在整理整理这部分。
关于XA规范,详见:http://blog.csdn.net/kangojian/article/details/6780305httphttp://blog.csdn.net/kangojian/article/details/6780305://blog.csdn.net/kangojian/article/details/6780305http://blog.csdn.net/kangojian/article/details/6780305
,从得有个地方start,end,prepared commit rollback等步骤,这些东西有JOTM+XAPool实现的。有时间我在整理整理这部分。
关于XA规范,详见:http://blog.csdn.net/kangojian/article/details/6780305httphttp://blog.csdn.net/kangojian/article/details/6780305://blog.csdn.net/kangojian/article/details/6780305http://blog.csdn.net/kangojian/article/details/6780305