本篇是《Linux内核学习笔记》系列的第十篇。
并发(Concurrency)是操作系统需要集中关注的问题之一。作为资源的管理者,OS有一些共享资源供多个执行线程访问,而这些线程对资源的同时访问包含着使系统不稳定的隐患。此类错误还通常难以跟踪调试——可以说能够稳定地复现bug就已经做到了debug的一半。
如果学过OS,你会对颇多经典的并发问题有了解,比如生产者-消费者问题、订票问题、哲学家♂吃饭问题等。这里不多赘述,主要侧重于内核对同步机制的实现。
1. 内核同步问题
1.1. 并发执行
内核中经常会出现并发执行,比如对称多处理器导致的真并发(并行),或者在单核上抢占式调度的伪并发。总结下来,并发访问有以下可能的原因:
- 中断。它是异步发生的,不管执行到哪里都可以被中断打断。
- 软中断和tasklet。内核可能在任意时机调度下半部,例如定时事件。
- 内核抢占或睡眠。它们都与处理器上的进程替换有关。
- 对称多处理。这是完全挡不住的真并发。
可以想象,基本上如果内核代码在临界区被中断、抢占或进入睡眠,那么竞争条件的基础就完全具备了。要是有多处理器同时访问一个资源,那用不着前面三种yield,也会发生竞争。其实解决办法相对简单,就是给可能产生并发访问的数据加锁,但是对于已经写好的内核代码,为其追加锁,有点麻烦。所以最好一开始就考虑周到,不要等亡羊补牢。
内核针对上述并发产生的原因,设计了一些机制用以写出避免并发访问的安全的代码,比如中断安全代码、SMP安全代码、抢占安全代码等。后面我们会仔细介绍。
1.2. 死锁
死锁就是硬等。可以是在等别人的资源释放,但别人也在等你释放;还可以是等自己的资源释放。
如果内核运行时出现死锁那一般是有大问题了,不是说杀个线程什么的就可以万事大吉,而是要在写代码时就力加避免死锁。有一些技巧可以帮助避免死锁:
- 如果某一资源要持有多个锁才能访问,那就保证按顺序获取锁。例如某资源有锁A、B、C,大家都按ABC的顺序获取锁就没有问题。但要是有哪怕一个线程非要先获取C,那早晚会出事。这种要求得直接写在注释里面,越醒目越好。释放锁就没那么高的要求了,但最好还是按相反顺序释放。
- 思考:当我等一个锁的时候,有没有无限等待的可能?我必须一直这么等下去吗?还是见好就收?
- 不要重复请求同一个锁。Linux内核不提供可重入锁,请求一个自己持有的锁==死亡。
- 设计得简单点。
Linux内核提供了简单的调试工具用以帮助发现死锁,后面会介绍。但这不等于内核应当、或可以在运行时解除死锁。
1.3. 争用和可扩展性
锁的争用就是说其它线程等待获取锁。高度争用就是说有很多线程在等,这会影响系统性能。可扩展性是对系统可扩展程度的度量。说一个OS的可扩展性如何,就是在谈论它在更多处理器、更多内核、更多进程上的表现如何。
如果加锁粒度过大,那么在线程数增加时,会有很多线程忙等(或阻塞)在锁上,这可能是一个sound的实现,但严重降低可扩展性。为了提高可扩展性,内核需要提供更细致的加锁机制,但锁细就意味着实现复杂,小型机器可能用不到这么细粒度的锁,这是一种浪费。
锁的选择合适与否,往往只在一线之间。一般的设计原则是:越简单越好,需要时再去做细化。
2. 原子操作
原子就是不可分割的东西。试想,如果临界区都变成了一个原子操作,那么可以解决很多资源竞争的问题。这是一个sound的办法,但显然会带来可扩展性的问题。内核只在小规模的数据上实现了原子操作接口。
2.1. 原子整数
atomic_t是一个看起来平平无奇的类型。就是这个东西:
typedef struct {
int counter;
} atomic_t;
#define ATOMIC_INIT(i) { (i) }
#ifdef CONFIG_64BIT
typedef struct {
s64 counter;
} atomic64_t;
#endif
各种各样的原子函数在<linux/atomic.h>中声明,在体系结构对应的<asm/atomic.h>中实现。
非要用一个atomic_t类型做wrapper,有两个好处。其一,原子函数只接受原子操作数,从而避免原子函数滥用。其二,编译器将不对原子更新的操作进行优化,这点是由原子操作的实现(大量使用volatile)决定的。
原子操作的使用很简单。
atomic_t v = ATOMIC_INIT(7);
atomic_sub(3, &v);
atomic_add(2, &v);
printk("%d\n", atomic_read(&v)); // 6
很容易想象,用这样的方法就可以维护一个计数器,轻量,方便。
2.2. 原子位操作
原子位操作和原子整数一样,也是在<linux/bitops.h>下有声明,具体实现在体系结构相关的头文件中。这里不再展开讲述。
3. 排斥方法
原子操作不是万能的。临界区一长,可以跨越多个函数,再把它看成是原子操作就不合理了。这时我们就需要锁,各种各样的锁。
一般推崇给数据上锁,而不是给代码上锁,因为这才是并发访问问题的关键所在。
3.1. 自旋锁spinlock
3.1.1. 简介
自旋锁就是:交换、交换、不停地交换,直到交换成功。我用0去换锁里的那个1,换到了,就是持有了锁;否则(即拿到手里的是0),说明刚才犹豫的一瞬间锁已经被别人拿走了,那么就不停尝试持有,直到别人释放锁,然后自己拿到锁。
自旋锁最大的问题是忙等。这一段期间进程就是非常纯粹地除了反复尝试以外什么都不做,空占着处理器。考虑到这种情况,让进程在自旋锁上忙等的时间不能太久。除此之外,自旋锁使用期间禁止内核抢占。
相比之下,互斥体采用阻塞机制,可以让等待的进程释放出处理器,但是起码会额外耗费两个上下文切换时间,即几十纳秒到几微秒。出于这样的考虑,如果进程等待的时间少于两次上下文切换,那么采用自旋锁是更好的。
自旋锁就是spinlock_t,声明于<linux/spinlock_types.h>。
typedef struct spinlock {
union {
struct raw_spinlock rlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
struct {
u8 __padding[LOCK_PADSIZE];
struct lockdep_map dep_map;
};
#endif
};
} spinlock_t;
就是这东西。在我的机子上,CONFIG_DEBUG_LOCK_ALLOC没有被定义。lockdep模块是用来检测死锁的模块,比较方便debug,所以对于稳定、日常使用的内核当然没必要打开这一功能,因此spin_lock就是raw_spin_lock的一层wrapper。
要使用自旋锁需要#include <linux/spinlock.h>,方法如下:
DEFINE_SPINLOCK(lock);
spin_lock(&lock);
/* Critical Section */
spin_unlock(&lock);
3.1.2. 中断上下文使用锁的注意事项
自旋锁当然也可以用在中断处理程序中,但是要多做一点考虑。设想张三线程正在处理器上拿着锁做些事情,我们知道中断是异步的,这时候张三突然被中断打断了,事情还没做完。不巧的是,中断正好要争用这个锁,所以就自旋了,可在同一核上张三完全没有回到处理器把事情做完的机会,因此锁不可能被释放,张三和中断处理程序就🔒死了。
内核提供的解决方案是:张三可以事先用这样的方式来占有和释放锁:
spin_lock_irqsave(&lock);
/* Critical Section */
spin_unlock_irqrestore(&lock);
占有锁的时候就禁止当前处理器上的中断,这样张三就不会被打断了;同时保存先前的中断状态,以便在释放锁的时候进行恢复。进一步说,如果占有锁前处理器禁止中断,那么释放锁后处理器还是禁止中断。不用禁止其它处理器的中断,因为其他处理器的中断不会打断张三的执行,不会产生死锁。
3.1.3. 中断下半部使用锁的注意事项
在中断下半部使用锁存在和上面类似的问题,禁止中断固然是一种解决方式,但还可以只禁止下半部的执行。
spin_lock_bh(&lock);
/* Critical Section */
spin_unlock_bh(&lock);
3.1.3. 其他一些絮叨
Linux的自旋锁是不可重入锁,即不能反复持有,否则会把自己锁♂死。
自旋锁一个非常经典的应用是jiffies,它采用raw_spin_lock。不再将对应的宏展开了,学到这里应该也能对它实际代表的内容猜出个八九。
__cacheline_aligned_in_smp DEFINE_RAW_SPINLOCK(jiffies_lock);
3.2. 读写锁rwlock
rwlock_t和spinlock_t的使用方法基本类似,需要包含头文件<linux/rwlock.h>。
3.3. 互斥体mutex
互斥体和自旋锁从使用上看最大的不同就是:如果获取锁失败,就阻塞在锁上,放出CPU。它只能由上锁的进程释放。此外,使用互斥体时允许抢占。下面是它的定义。
struct mutex {
atomic_long_t owner;
spinlock_t wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
struct list_head wait_list;
};
mutex当然也可以包括用于debug的lockdep成员,上面没有包含进来。
CONFIG_MUTEX_SPIN_ON_OWNER的意思就是:如果mutex的持有者正在处理器上运行(而不是正在休眠),那可以乐观地期望这个锁不久就会被释放,所以spin一下,稍微等等就好!
3.4. 互斥体和自旋锁的选择
在互斥体上阻塞是会休眠的,所以切忌在中断上下文、软中断处理程序等位置使用。这样的地方需改用自旋锁,不过一定要记得关中断,或禁止下半部。
相反,如果我们一定需要等待的进程陷入休眠,那就用互斥体吧。
除以上情况之外,具体选择哪一个,看预计的等待时间就行。不要让可能会等很长时间(几微秒以上)的进程用自旋锁。
3.5. 大内核锁BKL
回到一切的开始,假如你的内核马上需要支持SMP(对称多处理器),你能想到的第一个办法是什么?解决数据竞争的本质就是保护执行流不同时进入临界区,最简单的办法就是给临界区上锁,离开时释放。这就是大内核锁的基本思想。
但是很明显,这种办法的可扩展性不好,所以后面也就废弃了。在今天的内核中已经完全无法再看到它的踪影。
4. 协同方法
还有一些锁机制允许睡眠,从而使不同进程更加高效地轮流利用处理器,接下来进行介绍。
4.1. 信号量semaphore
信号量和互斥锁具有相似的特点:都可能睡眠。但不同的是,信号量可以在任意位置进行PV操作。下面是它的定义:
/* Please don't access any members of this structure directly */
struct semaphore {
raw_spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
它不是基于自旋锁实现,只是用自旋锁保护信号量结构内的各变量,比如对于down操作:
int down_interruptible(struct semaphore *sem)
{
unsigned long flags;
int result = 0;
raw_spin_lock_irqsave(&sem->lock, flags);
if (likely(sem->count > 0))
sem->count--;
else
result = __down_interruptible(sem);
raw_spin_unlock_irqrestore(&sem->lock, flags);
return result;
}
实际上它的实现非常简单,我们仍以down操作为例,看关键部分的代码。这一步将当前进程加到信号量的等待队列中,由current宏我们可以非常肯定它只能在进程上下文使用。
struct semaphore_waiter waiter;
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = current;
waiter.up = false;
这一步是释放锁,激活本地中断,阻塞当前进程,以及判断是否可以恢复运行。一般来说,如果进程阻塞在了信号量上,那么必须由up操作唤醒,才算正常地获取锁。循环中每次都会检查进程是否被外部信号打断,以及是否超时,如果是的话,表示不能再等信号量了,应走goto分支,主动将自己从等待队列上删除。如果waiter.up标志位被设起,说明信号量有了,是时候回去掌管大业了!
for (;;) {
if (signal_pending_state(state, current))
goto interrupted;
if (unlikely(timeout <= 0))
goto timed_out;
__set_current_state(state);
raw_spin_unlock_irq(&sem->lock);
timeout = schedule_timeout(timeout);
raw_spin_lock_irq(&sem->lock);
if (waiter.up)
return 0;
}
__set_current_state宏用于指定进程状态,如果调用down,就设成不可中断状态;如果调用down_interruptible,就是可中断状态。这命名方式是不是很怪?更怪的是,一般鼓励使用看起来更不自然的后者,因为信号量不像互斥体那样,它太灵活了所以没有lockdep机制帮助debug,故而产生死锁时需要有一种亡羊补牢的方式。
在使用schedule_timeout让出处理器之前,先释放用于保护信号量的锁,并无条件启用中断。这里有一个问题,从启用中断到正式被调度下处理器还有一段距离,这段代码可能会被中断。如果线程原本是在关中断时使用信号量,这是否会违背线程的初衷?我的看法是,使用信号量就意味着线程已经接受被调度的可能性,线程也不太可能会在使用信号量期间完成一件只有在关中断时才能完成的事情。在线程被调度回处理器后,还要再上锁关中断,从而避免竞争条件,并保持锁和中断状态的一致性。
4.2. 读写信号量rw_semaphore
和读写自旋锁有差不多的感觉,略去。
4.3. 完成变量completion
完成变量和信号量有相似的思想,进程阻塞在完成变量上,等待某一时刻被唤醒。它的定义如下:
struct completion {
unsigned int done;
struct swait_queue_head wait;
};
下面以vfork系统调用为例讲解它的使用。在fork系列函数的主干函数kernel_clone中,完成一系列创建后,如果是vfork调用,那么会使用完成变量挂起父进程:
if (clone_flags & CLONE_VFORK) {
p->vfork_done = &vfork;
init_completion(&vfork);
get_task_struct(p);
}
...
if (clone_flags & CLONE_VFORK) {
if (!wait_for_vfork_done(p, &vfork))
...
}
static int wait_for_vfork_done(struct task_struct *child,
struct completion *vfork)
{
int killed;
...
killed = wait_for_completion_killable(vfork);
...
if (killed) {
task_lock(child);
child->vfork_done = NULL;
task_unlock(child);
}
...
return killed;
}
父进程会阻塞在完成变量上等待子进程唤醒。子进程结束时,在mm_release函数中,执行唤醒:
if (tsk->vfork_done)
complete_vfork_done(tsk);
static void complete_vfork_done(struct task_struct *tsk)
{
struct completion *vfork;
task_lock(tsk);
vfork = tsk->vfork_done;
if (likely(vfork)) {
tsk->vfork_done = NULL;
complete(vfork);
}
task_unlock(tsk);
}
if分支语句的含义是,如果期间父进程还没有被杀死,那么才去唤醒父进程。
5. 顺序锁seqlock
下面是顺序锁的定义。
typedef struct {
/*
* Make sure that readers don't starve writers on PREEMPT_RT: use
* seqcount_spinlock_t instead of seqcount_t. Check __SEQ_LOCK().
*/
seqcount_spinlock_t seqcount;
spinlock_t lock;
} seqlock_t;
顺序锁颇有点乐观并发控制的意味,但有所不同。顺序锁通常用于读写共享数据,写者写入数据时必须拿到锁,但读者读入数据时不会等锁,而是直接获取,通过判断读取前后顺序锁中的序列号是否相同,来判断期间是否发生了写入操作。如果是的话,会重新读取。
顺序锁是一个偏向写者的锁。它最有力的例子是jiffies_64。在32位机上,读取64位长的jiffies_64不是原子操作,所以需要同步机制。
#if (BITS_PER_LONG < 64)
u64 get_jiffies_64(void)
{
unsigned int seq;
u64 ret;
do {
seq = read_seqcount_begin(&jiffies_seq);
ret = jiffies_64;
} while (read_seqcount_retry(&jiffies_seq, seq));
return ret;
}
EXPORT_SYMBOL(get_jiffies_64);
#endif
6. percpu数据
在多处理器上,不仅有CPU自身并发访问的问题,还有多处理器并发的可能。针对后者,可以规定一个特定的数据块在某一时刻为某处理器所特有;针对前者,可以进一步禁止该处理器的抢占。
6.1. 编译时percpu数据
<linux/percpu-defs.h>定义了一些相关的接口。首先可以定义percpu变量:
DEFINE_PER_CPU(type, name);
接下来是它的使用,例如为一个int型的percpu变量自增:
get_cpu_var(name)++;
put_cpu_var(name);
前者在返回变量的同时禁止抢占,后者则启用抢占。
/*
* Must be an lvalue. Since @var must be a simple identifier,
* we force a syntax error here if it isn't.
*/
#define get_cpu_var(var) \
(*({ \
preempt_disable(); \
this_cpu_ptr(&var); \
}))
/*
* The weird & is necessary because sparse considers (void)(var) to be
* a direct dereference of percpu variable (var).
*/
#define put_cpu_var(var) \
do { \
(void)&(var); \
preempt_enable(); \
} while (0)
值得注意的是,编译时的percpu数据被链接器链到全局唯一的.data.percpu段中,因而无法在模块中使用。相反,应使用下面所述的动态创建的方式。
6.2. 运行时percpu数据
#define alloc_percpu(type) \
(typeof(type) __percpu *)__alloc_percpu(sizeof(type), \
__alignof__(type))
extern void __percpu *__alloc_percpu(size_t size, size_t align);
extern void free_percpu(void __percpu *__pdata);
用这样的操作就能产生percpu对象实例,alloc_percpu使得变量按给定类型的自然边界对齐。接下来可以对返回的指针进行操作,切记使用get_cpu_ptr和put_cpu_ptr函数,方法和上面的**_var函数很类似。
6.3. 使用percpu数据的好处
首先,不用上锁了。一方面约定该数据只有单CPU可访问,另一方面禁止自身抢占,因而杜绝了并发访问的可能性。关抢占的代价比上锁要小很多。不过,只是约定。还是那句话,内核完全信赖自己,编程的时候一定要自觉。
其次,缓存变得更加稳定。设想一个数据在多个CPU上有缓存,在一个CPU上改动后,其他CPU上的缓存都会失效。有时,这样的缓存可能只是无意带入其它CPU的,却影响了其它CPU的性能。对此,percpu数据创建时按照缓存对齐,确保不会有多个处理器的数据位于同一cacheline上。
综上,percpu数据可以省去不少上锁的工夫,并提高程序性能,而且不管在中断上下文还是进程上下文使用都很安全。但是不能在访问percpu数据的时候睡眠——否则醒来的时候可能已经到了其他处理器上了,数据会不同步,就像这样:进程睡眠的时候还在4号CPU,醒来的时候到了6号,数据和原来就不一样了。

参考资料
朴素,但真的很管用的内核文档:mutex-design
When should you disable all interrupts in a program? – Stack Exchange