0
2
1
0
专栏/.../

TIKV 源码学习笔记--分布式事务接口 CheckTxnStatus/ ResolveLock

 ylldty  发表于  2024-03-11

前言

本篇文章将会对锁冲突场景下,常用的解决锁冲突的接口,进行个人对代码的理解与解析,希望对大家理解 TIKV 分布式事务有所帮助

CheckTxnStatus

这个接口的作用比较明显,主要是事务中,遇到锁冲突 (一般是悲观事务过程中加悲观锁,或者 prewrite 过程中加的锁),从锁信息中获取到其 Primary KEY ,进而通过这个接口来看 primary key 的当前事务状态。事务状态可能是已经提交,已经回滚,或者还在事务中。

特别的,如果发现 Primary KEY 的锁已经过期了,CheckTxnStatus 还会主动将其进行保护模式的回滚。

参数

假如目前有两个并发事务,t1t2

t1 在事务过程中,发现了 t2 的锁,因此 t1 调用了 CheckTxnStatus 接口来查看 t2 当前的状态

  • primary_key: 需要查看的 t2 主键 KEY
  • lock_ts: t2start_ts
  • caller_start_ts: t1start_ts
  • current_ts: 当前的 ts
  • rollback_if_not_exist : 如果没有发现任何提交记录或者回滚记录的时候,是直接回滚还是返回错误
  • force_sync_commitasync_commit 的场景下,是否强制推进 async_commit 进程,否则返回 uncommitted
  • resolving_pessimistic_lock:false 代表本意是想解析 prewrite locktrue 代表本意是想解析悲观锁。
  • verify_is_primary: 验证主键上面的锁是主键锁 ( issue 42937 ),目前默认开启该校验功能

简化代码

  • 和其他接口一样,首先需要获取 Primary 的锁信息

  • 如果找到的锁是 符合预期t2 的锁,那么调用 check_txn_status_lock_exists

    • 首先需要校验这个 lock 的合法性:如果 verify_is_primary 参数是 true,结果发现这个锁信息中的 primary key 和请求参数的 primary 对应不上,那么需要返回错误 PrimaryMismatch,这种情况可能是 primary key 被替换了。

      • Corner Case:但是存在一个特殊情况 (lock 是悲观锁 && resolving_pessimistic_lockfalse )。就是说本来想要解析的是 prewrite lock,结果发现是悲观锁,而且锁的 primary 主键 key 还对应不上,这种场景下会网开一面并不会报错,而是会清理悲观锁,并且使用 check_txn_status_missing_lock 来进行进一步查看事务状态。这种场景的出现可能是因为悲观事务的 acquire_pessimistic_lock 接口被 stale 调用导致的
    • 如果是 use_async_commit 类型的 lock,非强制模式下 ( force_sync_commitfalse),直接返回 Ok(TxnStatus::uncommitted) 后续可能会进行重试。否则的话,继续执行

    • 如果 lockprewrite lock 的话,是符合预期的进一步判断 lock 的过期时间

      • 如果已经过期,那么直接回滚,返回 TxnStatus::TtlExpire。(这里的回滚好像是非保护模式的?按理说应该对 primary lock 进行保护模式的回滚,笔者比较疑惑)
      • 如果还未过期,那么更新 lockmin_commit_ts,并且返回 TxnStatus::uncommitted
    • 如果 lock 是悲观锁的话,需要使用 check_txn_status_from_pessimistic_primary_lock 进一步处理

      • 特别地,如果悲观锁信息显示该 lock 是通过公平锁功能写入的,那么这个 lock 需要进一步进行检查验证,防止 issue 43540 ,进一步查看该 lock 对应的事务是已经提交或者回滚,防止其是 stalelock。如果确实没有任何提交记录或者回滚记录,那么可以才可以认为该悲观锁是可用的有效的。否则的话,直接清楚该悲观锁,返回事务状态即可。

      • 如果悲观锁已经过期

        • 如果预期检查的 lock 就是悲观锁,那么只需要清理悲观锁,返回 Ok(TxnStatus::PessimisticRollBack) 即可,无需回滚
        • 如果预期检查的 lockprewrite lock,我们就需要清理悲观锁的同时,还需要留下回滚的记录 (非保护模式下,笔者目前不太了解为何还是非保护模式)
      • 如果悲观锁未过期,那么更新 lockmin_commit_ts,并且返回 TxnStatus::uncommitted

  • 如果没有找到锁,或者找到的锁是不是符合预期的 t2 的锁,属于非预期场景,那么调用 check_txn_status_missing_lock

check_txn_status_missing_lock 这个函数我们应该很熟悉了,这个函数在 rollbackcleanup 函数中也会被调用,但是由于 MissingLockAction 的不同,逻辑稍微有些变化:

  • 如果发现有本事务的 OverlappedRollback 的记录或者回滚记录 (SingleRecord::Rollback),说明已经回滚完成,直接返回 OK

  • 如果发现有本事务提交记录的话,返回 ErrorInner::Committed

  • 如果没有找到任何本事务 write 记录的话,属于非预期场景

    • 如果 rollback_if_not_existfalse,那么直接返回 ErrorInner::TxnNotFound

    • 如果 resolving_pessimistic_lock 参数为 true 的话,就是说目标是解析悲观锁,结果并没有发现该锁,这时候会返回 Ok(TxnStatus::LockNotExistDoNothing)

    • 如果 rollback_if_not_existtrue,那么需要进行保护模式的回滚操作:

      • 调用 mark_rollback_on_mismatching_lock 在这个 LOCK 上面添加回滚 LockTS 标记,这样这个 lock 所涉及的事务在提交后,如果发现自己的 commitTSLockTS 重叠的话,需要设置一下 overlap 标记
      • 调用 make_rollback 写入保护模式的 rollback 记录,确保这个回滚记录不会被删除
      • 删除 collapse 以前的非保护rollback 记录

fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
        ...

        let (txn_status, released) = match reader.load_lock(&self.primary_key)? {
            Some(lock) if lock.ts == self.lock_ts => check_txn_status_lock_exists(
                &mut txn,
                &mut reader,
                self.primary_key,
                lock,
                self.current_ts,
                self.caller_start_ts,
                self.force_sync_commit,
                self.resolving_pessimistic_lock,
                self.verify_is_primary,
                self.rollback_if_not_exist,
            )?,
            l => (
                check_txn_status_missing_lock(
                    &mut txn,
                    &mut reader,
                    self.primary_key,
                    l,
                    MissingLockAction::rollback(self.rollback_if_not_exist),
                    self.resolving_pessimistic_lock,
                )?,
                None,
            ),
        };
        
        ...
        Ok(WriteResult {
            ...
        })
    }


pub fn check_txn_status_lock_exists(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<impl Snapshot>,
    primary_key: Key,
    mut lock: Lock,
    current_ts: TimeStamp,
    caller_start_ts: TimeStamp,
    force_sync_commit: bool,
    resolving_pessimistic_lock: bool,
    verify_is_primary: bool,
    rollback_if_not_exist: bool,
) -> Result<(TxnStatus, Option<ReleasedLock>)> {
    if verify_is_primary && !primary_key.is_encoded_from(&lock.primary) {
        return match (resolving_pessimistic_lock, lock.is_pessimistic_lock()) {
            (false, true) => {
                ...
                let txn_status = check_txn_status_missing_lock(
                    ...
                    MissingLockAction::rollback(rollback_if_not_exist),
                    resolving_pessimistic_lock,
                )?;

                Ok((txn_status, released))
            }
            _ => {
                Err(
                    ErrorInner::PrimaryMismatch...                )
            }
        };
    }

    // Never rollback or push forward min_commit_ts in check_txn_status if it's
    // using async commit. Rollback of async-commit locks are done during
    // ResolveLock.
    if lock.use_async_commit {
        if force_sync_commit {
            ...
        } else {
            return Ok((TxnStatus::uncommitted(lock, false), None));
        }
    }

    let is_pessimistic_txn = !lock.for_update_ts.is_zero();
    if lock.is_pessimistic_lock() {
        let check_result = check_txn_status_from_pessimistic_primary_lock(
            ...
            resolving_pessimistic_lock,
        )?;
        ...
    } else if lock.ts.physical() + lock.ttl < current_ts.physical() {
        let released = rollback_lock(txn, reader, primary_key, &lock, is_pessimistic_txn, true)?;
        return Ok((TxnStatus::TtlExpire, released));
    }


    if !lock.min_commit_ts.is_zero()
        && !caller_start_ts.is_max()
        // Push forward the min_commit_ts so that reading won't be blocked by locks.
        && caller_start_ts >= lock.min_commit_ts
    {
        lock.min_commit_ts = ...
    }

    Ok((TxnStatus::uncommitted(lock, min_commit_ts_pushed), None))
}

fn check_txn_status_from_pessimistic_primary_lock(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<impl Snapshot>,
    primary_key: Key,
    lock: &Lock,
    current_ts: TimeStamp,
    resolving_pessimistic_lock: bool,
) -> Result<(Option<TxnStatus>, Option<ReleasedLock>)> {
    if lock.is_pessimistic_lock_with_conflict() {
        if let Some(txn_status) = check_determined_txn_status(reader, &primary_key)? {
            ...
            let released = txn.unlock_key(primary_key, true, TimeStamp::zero());
            return Ok((Some(txn_status), released));
        }
    }

    if lock.ts.physical() + lock.ttl < current_ts.physical() {
        return if resolving_pessimistic_lock {
            let released = txn.unlock_key(primary_key, true, TimeStamp::zero());
            Ok((Some(TxnStatus::PessimisticRollBack), released))
        } else {
            let released = rollback_lock(txn, reader, primary_key, lock, true, true)?;
            Ok((Some(TxnStatus::TtlExpire), released))
        };
    }

    Ok((None, None))
}

CheckSecondaryLocks

CheckSecondaryLocks 接口主要是应用与 Async Commit 所用,用来查看异步 commit 的过程中,通过 primary lock 上面的 secondary 来查看所有的 prewrite lock,进而分析事务到底是否提交。

/// Check secondary locks of an async commit transaction.

///

/// If all prewritten locks exist, the lock information is returned.

/// Otherwise, it returns the commit timestamp of the transaction.

///

/// If the lock does not exist or is a pessimistic lock, to prevent the

/// status being changed, a rollback may be written.

参数

  • keys:事务涉及到被加锁的 keys
  • start_ts: 事务的开始 ts

简化代码

  • 对每个 key 查询所对应的 lock

    • 如果通过某一个 key 发现了提交或者回滚记录,那么直接可以 break,返回结果。
    • 如果没有记录,也没有找到锁的话,那么就需要回滚,并且是以保护模式下进行回滚,然后 break,返回结果。
    • 否则的话需要遍历所有的 key,收集 lock 信息
  • 如果如预期一样查询到了事务的 lock,那么就会使用 check_status_from_lock 进行进一步检查

    • 如果 lock 是悲观锁

      • checkTxnStatus 一样,如果 lock 是公平锁冲突加锁的话,需要进一步查看提交、回滚、无记录状态。如果是提交或者回滚状态,那么直接可以终止 CheckSecondaryLocks 返回结果。如果是无记录状态的话,可以将其当做普通的悲观锁
      • 悲观锁是非预期状态,这个时候需要清理悲观锁,将其当做无记录也没有找到 lock 的场景来看,也就是执行回滚操作,然后终止 CheckSecondaryLocks
    • 如果 lockprewrite lock,符合预期,返回锁信息,继续检查其他 key 的状态

  • 如果没有 lock,或者没有查询到预期事务的 lock,那么就会 check_determined_txn_status 进一步查询提交或者回滚的记录

fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
        ...
        let mut released_locks = ReleasedLocks::new();
        let mut result = SecondaryLocksStatus::Locked(Vec::new());

        for key in self.keys {
            let mut released_lock = None;
            let mut mismatch_lock = None;
            // Checks whether the given secondary lock exists.
            let (status, need_rollback, rollback_overlapped_write) = match reader.load_lock(&key)? {
                // The lock exists, the lock information is returned.
                Some(lock) if lock.ts == self.start_ts => {
                    let (status, need_rollback, rollback_overlapped_write, lock_released) =
                        check_status_from_lock(&mut txn, &mut reader, lock, &key, region_id)?;
                    released_lock = lock_released;
                    (status, need_rollback, rollback_overlapped_write)
                }
                // Searches the write CF for the commit record of the lock and returns the commit
                // timestamp (0 if the lock is not committed).
                l => {
                    mismatch_lock = l;
                    check_determined_txn_status(&mut reader, &key)?
                }
            };
            
            if need_rollback {
                if let Some(l) = mismatch_lock {
                    txn.mark_rollback_on_mismatching_lock(&key, l, true);
                }
                // We must protect this rollback in case this rollback is collapsed and a stale
                // acquire_pessimistic_lock and prewrite succeed again.
                if let Some(write) = make_rollback(self.start_ts, true, rollback_overlapped_write) {
                    txn.put_write(key.clone(), self.start_ts, write.as_ref().to_bytes());
                    collapse_prev_rollback(&mut txn, &mut reader, &key)?;
                }
            }
            released_locks.push(released_lock);
            match status {
                SecondaryLockStatus::Locked(lock) => {
                    result.push(lock.into_lock_info(key.to_raw()?));
                }
                SecondaryLockStatus::Committed(commit_ts) => {
                    result = SecondaryLocksStatus::Committed(commit_ts);
                    break;
                }
                SecondaryLockStatus::RolledBack => {
                    result = SecondaryLocksStatus::RolledBack;
                    break;
                }
            }
        }

        ...
    }
}


fn check_status_from_lock<S: Snapshot>(
    txn: &mut MvccTxn,
    reader: &mut ReaderWithStats<'_, S>,
    lock: Lock,
    key: &Key,
    region_id: u64,
) -> Result<(
    SecondaryLockStatus,
    bool,
    Option<OverlappedWrite>,
    Option<ReleasedLock>,
)> {
    let mut overlapped_write = None;
    if lock.is_pessimistic_lock_with_conflict() {
        let (status, need_rollback, rollback_overlapped_write) =
            check_determined_txn_status(reader, key)?;

        if !need_rollback {
            let released_lock = txn.unlock_key(key.clone(), true, TimeStamp::zero());
            return Ok((
                ...
            ));
        }
        overlapped_write = rollback_overlapped_write;
    }

    if lock.is_pessimistic_lock() {
        let released_lock = txn.unlock_key(key.clone(), true, TimeStamp::zero());
        let overlapped_write_res = if lock.is_pessimistic_lock_with_conflict() {
            overlapped_write
        } else {
            reader.get_txn_commit_record(key)?.unwrap_none(region_id)
        };
        Ok((
            ...
        ))
    } else {
        Ok((SecondaryLockStatus::Locked(lock), false, None, None))
    }
}


fn check_determined_txn_status<S: Snapshot>(
    reader: &mut ReaderWithStats<'_, S>,
    key: &Key,
) -> Result<(SecondaryLockStatus, bool, Option<OverlappedWrite>)> {
    match reader.get_txn_commit_record(key)? {
        TxnCommitRecord::SingleRecord { commit_ts, write } => {
            let status = if write.write_type != WriteType::Rollback {
                SecondaryLockStatus::Committed(commit_ts)
            } else {
                SecondaryLockStatus::RolledBack
            };
            // We needn't write a rollback once there is a write record for it:
            // If it's a committed record, it cannot be changed.
            // If it's a rollback record, it either comes from another
            // check_secondary_lock (thus protected) or the client stops commit
            // actively. So we don't need to make it protected again.
            Ok((status, false, None))
        }
        TxnCommitRecord::OverlappedRollback { .. } => {
            Ok((SecondaryLockStatus::RolledBack, false, None))
        }
        TxnCommitRecord::None { overlapped_write } => {
            Ok((SecondaryLockStatus::RolledBack, true, overlapped_write))
        }
    }
}

ResolveLock

通过 checkTxnStatus 查询到 primary key 的事务状态后,就需要 ResolveLocksecondary key 进行提交或者回滚。如果 primary key 已经提交了,那么 ResolveLocksecondary key 进行提交。如果 primary key 已经回滚了,那么 ResolveLocksecondary key 进行回滚。

参数

  • start_ts:事务的开始 ts
  • commit_ts: 事务的提交 ts。当需要回滚的时候,该值为 0;否则的话,该值不为 0
  • resolve_keys: 需要提交或者回滚的 secondary keys

简化代码

代码非常简单了,直接调用提交或者回滚的函数即可。注意对于 secondary key 来说,回滚是非保护模式的。

impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for ResolveLockLite {
    fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
        ...
        for key in self.resolve_keys {
            released_locks.push(if !self.commit_ts.is_zero() {
                commit(&mut txn, &mut reader, key, self.commit_ts)?
            } else {
                cleanup(&mut txn, &mut reader, key, TimeStamp::zero(), false)?
            });
        }

        
        Ok(WriteResult {
            ...
        })
    }
}

0
2
1
0

版权声明:本文为 TiDB 社区用户原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接和本声明。

评论
暂无评论