Linux rcu经典实现GP宽限期检测与quiescent state
2026/6/22 7:12:36 网站建设 项目流程

Linux rcu经典实现GP宽限期检测与quiescent state

`kernel/rcu/tree.c`中的grace period(GP)检测是Tree RCU最核心的闭环逻辑。GP宽限期是指从`rcu_gp_init()`设置`->gp_seq`到`rcu_gp_cleanup()`递增`->gp_seq`的整个区间。期间所有RCU读者都必须经过一个quiescent state,GP才被认为完成。整个检测链路涉及四个层次的交互:per-CPU的`rcu_data`、中间节点的`rcu_node`、全局的`rcu_state`,以及GP kthread的主循环`rcu_gp_kthread()`。

---

## 一、Grace Period生命周期的状态机

GP kthread的主循环由`rcu_gp_kthread()`驱动,定义在`kernel/rcu/tree.c`。其核心是一个四阶段状态机:

```c
static int __noreturn rcu_gp_kthread(void *unused)
{
for (;;) {
/* 阶段1: 等待GP请求 */
wait_event_interruptible(rsp->gp_wq,
READ_ONCE(rsp->gp_flags) & RCU_GP_FLAG_INIT);
/* 阶段2: 初始化GP */
rcu_gp_init(rsp);
/* 阶段3: 等待QS + FQS循环 */
rcu_gp_fqs_loop(rsp);
/* 阶段4: 清理 */
rcu_gp_cleanup(rsp);
}
}
```

状态转换通过`rsp->gp_state`追踪,取值包括`RCU_GP_DONE_GPS`、`RCU_GP_ONOFF`、`RCU_GP_INIT`、`RCU_GP_WAIT_GPS`和`RCU_GP_WAIT_FQS`。其中`RCU_GP_WAIT_FQS`是耗时最长的阶段,等待所有CPU报告QS或FQS定时器超时触发强制扫描。

## 二、GP初始化:qsmask的建立

`rcu_gp_init()`的核心工作是将当前在线CPU集合的快照写入每个`rcu_node`的`->qsmask`字段,作为GP完成的判定依据。

```c
static bool rcu_gp_init(struct rcu_state *rsp)
{
unsigned long oldmask;
struct rcu_node *rnp_root = rcu_get_root(rsp);

WRITE_ONCE(rsp->gp_state, RCU_GP_ONOFF);

/* 阶段A: 应用热插拔缓冲 */
rcu_for_each_leaf_node(rsp, rnp) {
raw_spin_lock_irq_rcu_node(rnp);
rnp->qsmaskinit = rnp->qsmaskinitnext;
WRITE_ONCE(rnp->gp_seq, rsp->gp_seq);
raw_spin_unlock_irq_rcu_node(rnp);
}

/* 阶段B: 将qsmaskinit复制到qsmask */
rcu_for_each_node_breadth_first(rsp, rnp) {
raw_spin_lock_irq_rcu_node(rnp);
WRITE_ONCE(rnp->qsmask, rnp->qsmaskinit);
rnp->rcu_gp_init_mask = rnp->qsmaskinit;
if (rnp == rnp_root)
break;
raw_spin_unlock_irq_rcu_node(rnp);
}

WRITE_ONCE(rsp->gp_state, RCU_GP_WAIT_FQS);
return true;
}
```

这里存在一个关键的竞态条件:`rcu_gp_init()`运行时可能正好有CPU hotplug事件发生。`rcutree_report_cpu_dead()`会设置`rnp->qsmaskinitnext`,而`rcu_gp_init()`通过读取`qsmaskinitnext`来初始化`qsmask`。如果在线CPU在`rcu_gp_init()`之后立即下线,其位仍在`qsmask`中,但该CPU永远不会再主动报告QS。为此,`rcu_gp_init()`在复制`qsmask`之后立即处理已离线CPU:

```c
mask = rnp->qsmask & ~rnp->qsmaskinitnext;
if (mask) {
raw_spin_lock_irq_rcu_node(rnp);
rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
raw_spin_unlock_irq_rcu_node(rnp);
}
```

如果不执行这一步,离线CPU的位会永远卡死在`qsmask`中,导致GP永远无法完成,最终触发RCU stall。

## 三、QS检测路径与传播协议

QS的检测发生在以下四个上下文:

1. **调度器切换**:`rcu_qctr_help()`在`__schedule()`末尾被调用,递增当前CPU的`rdp->rcu_qs_ctr_snap`。
2. **用户模式退出**:`rcu_user_enter()`/`rcu_user_exit()`在`context_tracking`框架中标记QS。
3. **idle进入/退出**:`rcu_dynticks_eqs_enter()`/`rcu_dynticks_eqs_exit()`通过dynticks counter标记扩展QS(EQS)。
4. **RCU_SOFTIRQ**:`rcu_core()`中的`rcu_check_quiescent_state()`按tick周期性检查。

### 3.1 `rcu_report_qs_rdp()` - per-CPU上报入口

这是CPU级QS上报的单一入口点:

```c
static void rcu_report_qs_rdp(struct rcu_data *rdp)
{
unsigned long flags;
unsigned long mask;
bool needwake = false;
struct rcu_node *rnp;

rnp = rdp->mynode;
raw_spin_lock_irqsave_rcu_node(rnp, flags);

/* 边界条件1: GP已经过期 */
if (rdp->gp_seq != rnp->gp_seq) {
rdp->passed_quiesc = 0;
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
return;
}

mask = rdp->grpmask;
if ((rnp->qsmask & mask) == 0) {
/* 边界条件2: 其他CPU已经代为上报 */
rdp->core_needs_qs = false;
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
return;
}

WRITE_ONCE(rnp->qsmask, rnp->qsmask & ~mask);
rdp->passed_quiesc = 0;
rdp->core_needs_qs = false;

if (rnp->qsmask != 0 || rcu_preempt_blocked_readers_cgp(rnp)) {
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
return;
}

/* 向父节点传播 */
needwake = rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
if (needwake)
rcu_gp_kthread_wake();
}
```

边界条件1处理的是典型的GP过期场景:当前CPU的GP序列号落后于`rcu_node`的`->gp_seq`,说明该CPU之前错过了某个GP,现在才补报QS——必须丢弃。边界条件2处理的是`force_qs_rnp()`已经抢先清除了该位的情况,此时`rdp->core_needs_qs`如果不清零,会导致后续不必要的RCU softirq重入。

### 3.2 `rcu_report_qs_rnp()` - 树形传播

`rcu_report_qs_rnp()`负责将QS从叶子节点逐级向根节点传播:

```c
static bool rcu_report_qs_rnp(unsigned long mask, struct rcu_node *rnp,
unsigned long gp_seq, unsigned long flags)
{
bool wake = false;

raw_spin_lock_irqsave_rcu_node(rnp, flags);
if (rnp->gp_seq != gp_seq) {
/* GP已经前进,当前上报作废 */
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
return false;
}

rnp->qsmask &= ~mask;
if (rnp->qsmask != 0) {
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
return false;
}

/* 检查是否有阻塞的preemptible RCU读者需要优先提升 */
if (rcu_preempt_blocked_readers_cgp(rnp)) {
if (rnp->gp_tasks != NULL)
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
return false;
}

/* 向父节点传播 */
if (rnp->parent != NULL) {
return rcu_report_qs_rnp(rnp->grpmask, rnp->parent,
gp_seq, flags);
}

/* 到达根节点:GP完成 */
WRITE_ONCE(rsp->gp_state, RCU_GP_DONE_GPS);
/* 唤醒GP kthread进入清理 */
wake = true;
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
return wake;
}
```

传播过程中`rnp->gp_seq != gp_seq`检查是关键同步边界。如果父节点上的GP序号已经前进(即嵌套GP已经开始),当前QS上报的掩码引用的是旧GP的位图,必须丢弃。这防止了`qsmask`在新旧GP之间交叉访问导致的无限等待。

### 3.3 `rcu_report_qs_rsp()` - GP终止信号

当根节点的`qsmask`归零后,`rcu_gp_fqs_loop()`退出等待,进入`rcu_gp_cleanup()`。清理函数会检查:

```c
static void rcu_gp_cleanup(struct rcu_state *rsp)
{
unsigned long gp_duration;
struct rcu_node *rnp = rcu_get_root(rsp);
struct rcu_data *rdp;

WRITE_ONCE(rsp->gp_state, RCU_GP_CLEANUP);

/* 诊断检查:所有qsmask必须归零 */
rcu_for_each_node_breadth_first(rsp, rnp) {
if (WARN_ON_ONCE(rcu_preempt_blocked_readers_cgp(rnp)))
continue;
if (WARN_ON_ONCE(rnp->qsmask))
continue;
}

/* 记录GP持续时间 */
gp_duration = rsp->gp_seq - rdp->gp_seq;
trace_rcu_grace_period(rsp->name, gp_duration, "end");

/* 递增GP序列号 */
WRITE_ONCE(rsp->gp_seq, rsp->gp_seq + 1);
WRITE_ONCE(rsp->gp_state, RCU_GP_DONE_GPS);
}
```

其中的`WARN_ON_ONCE`是捕获严重Bug的诊断手段。如果存在因race condition导致的`qsmask`残留,或者`rcu_preempt_blocked_readers_cgp(rnp)`非空(preemptible RCU中还有读者在临界区内阻塞),`rcu_gp_cleanup()`不能简单归零——它会尝试通过RCU priority boosting让阻塞读者退出。

## 四、Force Quiescent State的退化路径

当CPU处于长时间idle状态时,tick可能被停止(dynticks),CPU不会主动检查QS。`rcu_gp_fqs_loop()`通过hrtimer定期进入FQS路径。

```c
static void rcu_gp_fqs_loop(struct rcu_state *rsp)
{
unsigned long gf;
unsigned long jiffies_start = jiffies;

for (;;) {
/* 等待FQS定时器超时或QS全部到达 */
wait_event_idle_timeout(xxx, xxx, jiffies_till_next_fqs);

/* 检查是否有紧急FQS请求 */
gf = READ_ONCE(rsp->gp_flags);
if (gf & RCU_GP_FLAG_FQS) {
WRITE_ONCE(rsp->gp_flags, gf & ~RCU_GP_FLAG_FQS);
cond_resched_tasks_rcu_qs(&longterm);
force_qs_rnp(rsp, dyntick_save_progress_counter);
rcu_gp_fqs(rsp, jiffies_start, jiffies);
}

/* GP完成条件 */
if (!READ_ONCE(rnp->qsmask) &&
!rcu_preempt_blocked_readers_cgp(rnp))
break;
}
}
```

### 4.1 `force_qs_rnp()` - 遍历扫描

```c
static void force_qs_rnp(struct rcu_state *rsp,
int (*f)(struct rcu_data *rdp))
{
int cpu;
unsigned long flags;
unsigned long mask;
struct rcu_node *rnp;

rcu_for_each_leaf_node(rsp, rnp) {
raw_spin_lock_irqsave_rcu_node(rnp, flags);
if (rnp->qsmask == 0) {
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
continue;
}

for_each_leaf_node_possible_cpu(rnp, cpu) {
unsigned long bit = leaf_node_cpu_bit(rnp, cpu);
if ((rnp->qsmask & bit) == 0)
continue;
if (f(per_cpu_ptr(rsp->rda, cpu))) {
mask |= bit;
}
}

if (mask) {
rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
} else {
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
}
}
}
```

### 4.2 `dyntick_save_progress_counter()` - idle CPU检测

对于dynticks idle的CPU,不能依赖它主动上报QS。`dyntick_save_progress_counter()`通过dynticks counter的快照判断CPU是否经历了dyntick idle状态:

```c
static int dyntick_save_progress_counter(struct rcu_data *rdp)
{
unsigned long cbs;

cbs = READ_ONCE(rdp->dynticks->dynticks);
if (rdp->dynticks_snap == cbs) {
/* dynticks未变化,CPU可能在长时间idle */
return 0;
}

/* dynticks已变化,CPU经过了EQS,即等同于QS */
rdp->dynticks_snap = cbs;
return 1;
}
```

### 4.3 `rcu_implicit_dynticks_qs()` - 强制探测

当dynticks counter始终不变时(即CPU在idle中完全没有中断活动),判断逻辑升级:

```c
static int rcu_implicit_dynticks_qs(struct rcu_data *rdp)
{
unsigned long jtsq;
int *rcrmp;
unsigned long rjlmc;
struct rcu_node *rnp = rdp->mynode;

/* 已过信号量检查 */
if (rdp->rcu_qs_ctr_snap != per_cpu(rcu_qs_ctr, rdp->cpu))
return 1;

/* 如果CPU已下线,直接上报QS */
if (!rcu_cpu_online(rdp->cpu)) {
rcu_report_qs_rdp(rdp);
return 1;
}

/* CPU长时间无响应 -> 触发RCU stall警告 */
jtsq = jiffies - rdp->gp_start;

/* 检查是否已经超时 */
if (jtsq > jiffies_till_stall_check) {
rcu_dump_cpu_stacks(rnp);
if (jtsq > urc) {
/* 最终:触发紧急RCU stall */
rcu_dump_cpu_stacks(rnp);
return 0;
}
}

return 0;
}
```

这里存在一个隐蔽的边界条件:`rcu_qs_ctr_snap`在调度点通过`rcu_qsctr_help()`递增,但如果CPU处于完全无调度的idle循环中(如`ARCH_HAS_TICK_BROADCAST`下的deep idle),QS永远无法通过调度触发。此时dynticks EQS是唯一的QS来源。

## 五、竞态场景深度分析

### 5.1 GP边界上的QS误报

当GP kthread完成`rcu_gp_init()`后,有一个时间窗口:`rsp->gp_seq`已经递增,但`rnp->qsmask`还没有在全部`rcu_node`上写完成。如果此时某个CPU在softirq中调用`rcu_report_qs_rdp()`,它会比较`rdp->gp_seq != rnp->gp_seq`——相等才继续。但`rcu_gp_init()`在内核遍历`rcu_node`树时是广度优先,存在部分节点已经更新`->gp_seq`而部分节点尚未更新的时刻。这种不对称会导致CPU对未初始化完成的树节点上报QS,收到错误的`qsmask`判断。解决方法是在`rcu_gp_init()`结束前设置`rsp->gp_state = RCU_GP_WAIT_FQS`,而`rcu_check_quiescent_state()`检查`gp_state >= RCU_GP_WAIT_FQS`才允许上报。

### 5.2 并发GP请求的合并

多个`call_rcu()`在同一时间触发`rcu_gp_kthread_wake()`时,`rsp->gp_flags`的`RCU_GP_FLAG_INIT`位通过`__call_rcu_core()`原子置位。如果GP kthread已经开始初始化,后续的`call_rcu()`通过`rcu_accelerate_cbs()`将回调迁移至当前GP的等待队列中,不再触发新的GP启动。

### 5.3 热插拔与qsmask的再同步

当CPU hotplug发生时,`rcu_report_dead()`在CPU death announcement路径中被调用,该CPU在`rcu_node`中的位会被从`->qsmaskinitnext`中清除。但如果此时GP已经在进行中,清除`->qsmaskinitnext`并不影响正在运行的GP的`->qsmask`。处理方式分为两个路径:

- 如果`rcu_gp_init()`尚未运行(GP处于wait阶段),`rcutree_report_cpu_dead()`直接清除`rnp->qsmask`的对应位。
- 如果`rcu_gp_init()`已经完成了`qsmask`复制,则等待FQS路径中的`force_qs_rnp()`在发现该CPU不在线时通过`rcu_implicit_dynticks_qs()`自动上报。

### 5.4 expedited GP的short-circuit

当`rsp->gp_exp_help`置位时,expedited GP可以截断正在运行的normal GP。在`rcu_gp_fqs_loop()`中,额外检查`rcu_exp_gp_seq_done(rsp)`:

```c
if (rcu_exp_gp_seq_done(rsp)) {
/* expedited GP已经完成,normal GP可以提前终止 */
rcu_gp_fqs(rsp, jiffies_start, jiffies);
WRITE_ONCE(rsp->gp_exp_help, false);
}
```

在`rcu_gp_cleanup()`中,若`gp_exp_help == true`,`WARN_ON`被压制,`qsmask`和`gp_tasks`被显式清零而非触发诊断警告。

## 六、Cache line与性能影响

`rnp->lock`是RCU子系统中最热门的锁之一。每个QS上报都需要对叶子`rcu_node`加锁,当系统有数百个CPU时,树形层次可以显著缓解锁竞争:

```
(CPU0) rcu_data -> rnp_leaf[0] --|
(CPU1) rcu_data -> rnp_leaf[1] --+-- rnp_internal[0] -- rnp_root
... |
(CPU255) -> rnp_leaf[15] ---------|
```

每个叶子节点只覆盖16个CPU(`CONFIG_RCU_FANOUT_LEAF`),锁的粒度被控制在该范围内。现代内核还引入了`rcu_node`的`->lock`的raw spinlock优先级的优化:在`force_qs_rnp()`中,如果发现`rnp->qsmask`全零,直接跳过而不加锁,这避免了大量空轮询时的缓存行写入。

`rnp->qsmask`本身是一个`unsigned long`,其读写使用`WRITE_ONCE`/`READ_ONCE`而非原子操作,因为所有操作都在`->lock`保护下进行。唯一的无锁读取是`rcu_gp_kthread()`主循环中检查`!READ_ONCE(rnp->qsmask)`——这只是一个快速路径的"peek",后续确认仍需要加锁。

`rdp->passed_quiesc`的写操作分布是另一个性能陷阱。在每个tick的`rcu_check_quiescent_state()`中,如果`rdp->passed_quiesc`已经为true,则跳过后续流程。但`force_qs_rnp()`可能通过`dyntick_save_progress_counter()`间接置位`rdp->passed_quiesc`,导致`rcu_report_qs_rdp()`中的`passed_quiesc = 0`写回与tick路径产生竞争。由于`passed_quiesc`没有用`->lock`保护(这是一个单线程读写优化),其可见性依赖于`rcu_node`锁的间接同步。

## 七、边界条件备忘录

- **CPU 0的特殊性**:`rcu_gp_kthread()`运行在CPU 0上,但触发QS后`rcu_report_qs_rdp()`不会特殊处理CPU 0的QS。不过`rcu_gp_fqs_loop()`中调用`cond_resched_tasks_rcu_qs()`时,GP kthread本身会为运行它的CPU隐式上报一个QS。
- **NMI上下文中的QS**:`rcu_nmi_enter()`/`rcu_nmi_exit()`会在dynticks counter上增加偏移量,即使idle CPU处理NMI时也不会被误判为"始终idle"。
- **nocb(no-CB)callback offloading**:当`rcu_nocbs`启用了特定CPU的callback offloading,`rdp->cblist`的管理权移交给`rcuog`/`rcuop` kthread,但QS上报仍然通过相同的`rcu_report_qs_rdp()`路径,只是`rcu_accelerate_cbs()`的调用被绕过,以避免GP kthread在callback处理上产生额外唤醒。
- **bypass list**:对于offloaded CPU,`call_rcu()`先写入`rdp->nocb_bypass`链表,当该链表超过阈值或经过一个GP后,再刷新到`rdp->cblist`。这减少了`rcu_node`锁的竞争频率。
- **RCU stall**:当`jiffies - rdp->gp_start > jiffies_till_stall_check`且`rnp->qsmask`仍有未清位时,`rcu_implicit_dynticks_qs()`调用`rcu_dump_cpu_stacks()`打印堆栈并触发hard lockup检测。这是一条"最后一公里"的兜底逻辑,不应该在正常系统中被触发。
- **kfree_rcu()的特殊性**:`kfree_rcu()`背后使用`kvfree_rcu_bulk()`,在新的`rcu_tasks`等变体中被隔离管理,不直接参与`rcu_gp_fqs_loop()`的传统GP生命周期。
- **SMP memory ordering**:`rcu_gp_init()`中`WRITE_ONCE(rnp->qsmask, ...)`之后必须有`smp_mb__after_unlock_lock()`(内嵌于`raw_spin_unlock`之后的隐含屏障),以保证其他CPU读取`qsmask`时能看到完整的GP序列号更新。
- **tick_nohz_full的干扰**:在adaptive-ticks模式下,housekeeping CPU之外的CPU在用户态运行时不产生tick。这些CPU只能通过`rcu_user_exit()`路径上报QS,而`force_qs_rnp()`必须依赖`dyntick_save_progress_counter()`来检测,这导致GP完成时间被FQS定时器间隔`jiffies_till_next_fqs`(默认约3ms)钳制。

```c
/* kernel/rcu/tree_plugin.h: rcu_qsctr_help - 调度路径QS计数器 */
static void rcu_qsctr_help(struct rcu_data *rdp)
{
rdp->rcu_qs_ctr_snap = __this_cpu_read(rcu_qs_ctr);
WRITE_ONCE(rdp->passed_quiesc, 1);
}

/* kernel/rcu/tree.c: __note_gp_changes - GP边界检查 */
static bool __note_gp_changes(struct rcu_state *rsp, struct rcu_node *rnp,
struct rcu_data *rdp)
{
if (rdp->gp_seq == rnp->gp_seq)
return false;

rdp->gp_seq = rnp->gp_seq;
/* 如果当前GP需要当前CPU的QS */
if (rnp->qsmask & rdp->grpmask) {
rdp->qs_pending = true;
rdp->core_needs_qs = true;
}
return true;
}
```

`kernel/rcu/tree_plugin.h`中的`rcu_qsctr_help()`是调度路径上的QS辅助函数,在`__schedule()`的末尾被调用。它递增per-CPU的`rcu_qs_ctr`变量,该变量被`rcu_implicit_dynticks_qs()`作为判断依据之一。这个per-CPU计数器没有原子操作开销,因为它是单写者(仅当前CPU写)、多读者(FQS扫描时其他CPU读)的模式。

`kernel/rcu/tree.c`是整个Grace Period & QS检测逻辑的主战场。其状态机设计保证了每个CPU在GP宽限期内恰好报告一次QS,而树形`qsmask`传播在`O(log N)`的锁获取次数内完成GP终止判定。当CPU长时间处于dynticks idle状态时,FQS路径通过dynticks counter快照对比实现无侵入检测;当dynticks counter不变化时,`rcu_implicit_dynticks_qs()`逐步升级到RCU stall警告——这一整套机制保证了GP检测在极端条件下的完备性。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询