/** * {@link Transaction} that makes use of the JDBC commit and rollback facilities directly. * It relies on the connection retrieved from the dataSource to manage the scope * of the transaction. * Delays connection retrieval until getConnection() is called. * Ignores commit or rollback requests when autocommit is on. * * @see JdbcTransactionFactory */ /** * Jdbc事务 * @author Clinton Begin */ publicclassJdbcTransactionimplementsTransaction{
/** * @description 设置是否自动提交事务 * @author xudj * @date 2016年7月15日 下午5:37:37 * @param desiredAutoCommit */ protectedvoidsetDesiredAutoCommit(boolean desiredAutoCommit){ try { //和之前的不相等时进行设置 if (connection.getAutoCommit() != desiredAutoCommit) { if (log.isDebugEnabled()) { log.debug("Setting autocommit to " + desiredAutoCommit + " on JDBC Connection [" + connection + "]"); } connection.setAutoCommit(desiredAutoCommit); } } catch (SQLException e) { // Only a very poorly implemented driver would fail here, // and there's not much we can do about that. thrownew TransactionException("Error configuring AutoCommit. " + "Your driver may not support getAutoCommit() or setAutoCommit(). " + "Requested setting: " + desiredAutoCommit + ". Cause: " + e, e); } }
/** * @description 重置事务自动提交 * @author xudj * @date 2016年7月15日 下午5:41:40 */ protectedvoidresetAutoCommit(){ try { if (!connection.getAutoCommit()) { // MyBatis does not call commit/rollback on a connection if just selects were performed. // Some databases start transactions with select statements // and they mandate a commit/rollback before closing the connection. // A workaround is setting the autocommit to true before closing the connection. // Sybase throws an exception here. if (log.isDebugEnabled()) { log.debug("Resetting autocommit to true on JDBC Connection [" + connection + "]"); } connection.setAutoCommit(true); } } catch (SQLException e) { if (log.isDebugEnabled()) { log.debug("Error resetting autocommit to true " + "before closing the connection. Cause: " + e); } } }
coordinator接收完participant的反馈(vote)之后,进入阶段2,给各个participant发送准备提交(prepare to commit)指令。participant接到准备提交指令后可以锁资源,但要求相关操作必须可回滚。coordinator接收完确认(ACK)后进入阶段3、进行commit/abort,3PC的阶段3与2PC的阶段2无异。协调者备份(coordinator watchdog)、状态记录(logging)同样应用在3PC。
example: 服务a(小明从招行转出100元): try: update cmb_account set balance=balance-100, freeze=freeze+100 where acc_id=1 and balance>100; confirm: update cmb_account set freeze=freeze-100 where acc_id=1; cancel: update cmb_account set balance=balance+100, freeze=freeze-100 where acc_id=1; 服务b(小明往广发银行汇入100元): try: update cgb_account set freeze=freeze+100 where acc_id=1; confirm: update cgb_account set balance=balance+100, freeze=freeze-100 where acc_id=1; cancel: update cgb_account set freeze=freeze-100 where acc_id=1; 具体说明: a的try阶段,服务做了两件事,1:业务检查,这里是检查小明的帐户里的钱是否多余100元;2:预留资源,将100元从余额中划入冻结资金。 a的confirm阶段,这里不再进行业务检查,因为try阶段已经做过了,同时由于转账已经成功,将冻结资金扣除。 a的cancel阶段,释放预留资源,既100元冻结资金,并恢复到余额。 b的try阶段进行,预留资源,将100元冻结。 b的confirm阶段,使用try阶段预留的资源,将100元冻结资金划入余额。 b的cancel阶段,释放try阶段的预留资源,将100元从冻结资金中减去。 从上面的简单例子可以看出,TCC模式比纯业务补偿模式更加复杂,所以在实现上每个服务都需要实现Cofirm和Cancel两个接口。
voidParker::park(bool isAbsolute, jlong time){ // Ideally we'd do something useful while spinning, such // as calling unpackTime(). // Optional fast-path check: // Return immediately if a permit is available. // We depend on Atomic::xchg() having full barrier semantics // since we are doing a lock-free update to _counter. // 原子操作 if (Atomic::xchg(0, &_counter) > 0) return;
/** * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED * / private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ privatevolatile Thread runner; /** Treiber stack of waiting threads */ privatevolatile WaitNode waiters;
state 是运行的状态, callable 是要执行的逻辑, 它会在其run方法中被执行 outcome 是结果. waiters字段则代表阻塞在该Future上的线程链表.
publicvoidrun(){ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protectedvoidset(V v){ if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } // 通过CAS的方式唤醒每一个被pack的线程,将他们unpack privatevoidfinishCompletion(){ // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } }