本篇内容介绍了“C#中怎么使用CAS实现无锁算法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
CAS 的基本概念
CAS(Compare-and-Swap)是一种多线程并发编程中常用的原子操作,用于实现多线程间的同步和互斥访问。 它操作通常包含三个参数:一个内存地址(通常是一个共享变量的地址)、期望的旧值和新值。
CompareAndSwap(内存地址,期望的旧值,新值)
CAS 操作会比较内存地址处的值与期望的旧值是否相等,如果相等,则将新值写入该内存地址; 如果不相等,则不进行任何操作。这个比较和交换的操作是一个原子操作,不会被其他线程中断。
CAS 通常是通过硬件层面的CPU指令实现的,其原子性是由硬件保证的。具体的实现方式根据环境会有所不同。
CAS 操作通常会有一个返回值,用于表示操作是否成功。返回结果可能是true或false,也可能是内存地址处的旧值。
相比于传统的锁机制,CAS 有一些优势:
原子性:CAS 操作是原子的,不需要额外的锁来保证多线程环境下的数据一致性,避免了锁带来的性能开销和竞争条件。
无阻塞:CAS 操作是无阻塞的,不会因为资源被锁定而导致线程的阻塞和上下文切换,提高了系统的并发性和可伸缩性。
适用性:CAS 操作可以应用于广泛的数据结构和算法,如自旋锁、计数器、队列等,使得它在实际应用中具有较大的灵活性和适用性。
C# 中如何使用 CAS
在 C# 中,我们可以使用 Interlocked 类来实现 CAS 操作。
Interlocked 类提供了一组 CompareExchange 的重载方法,用于实现不同类型的数据的 CAS 操作。
public static int CompareExchange(ref int location1, int value, int comparand); public static long CompareExchange(ref long location1, long value, long comparand); // ... 省略其他重载方法 public static object CompareExchange(ref object location1, object value, object comparand); public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;
CompareExchange 方法将 location1 内存地址处的值与 comparand 比较,如果相等,则将 value 写入 location1 内存地址处,否则不进行任何操作。
该方法返回 location1 内存地址处的值。
通过判断方法返回值与 comparand 是否相等,我们就可以知道 CompareExchange 方法是否执行成功。
算法示例
在使用 CAS 实现无锁算法时,通常我们不光是为了比较和更新一个数据,还需要在更新成功后进行下一步的操作。结合 while(true) 循环,我们可以不断地尝试更新数据,直到更新成功为止。
伪代码如下:
while (true) { // 读取数据 oldValue = ...; // 计算新值 newValue = ...; // CAS 更新数据 result = CompareExchange(ref location, newValue, oldValue); // 判断 CAS 是否成功 if (result == oldValue) { // CAS 成功,执行后续操作 break; } }
在复杂的无锁算法中,因为每一步操作都是独立的,连续的操作并非原子,所以我们不光要借助 CAS,每一步操作前都应判断是否有其他线程已经修改了数据。
示例1:计数器
下面是一个简单的计数器类,它使用 CAS 实现了一个线程安全的自增操作。
public class Counter { private int _value; public int Increment() { while (true) { int oldValue = _value; int newValue = oldValue + 1; int result = Interlocked.CompareExchange(ref _value, newValue, oldValue); if (result == oldValue) { return newValue; } } } }
CLR 底层源码中,我们也会经常看到这样的代码,比如 ThreadPool 增加线程时的计数器。https://github.com/dotnet/runtime/blob/release/6.0/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs#L446
internal void EnsureThreadRequested() { // // If we have not yet requested #procs threads, then request a new thread. // // CoreCLR: Note that there is a separate count in the VM which has already been incremented // by the VM by the time we reach this point. // int count = _separated.numOutstandingThreadRequests; while (count < Environment.ProcessorCount) { int prev = Interlocked.CompareExchange(ref _separated.numOutstandingThreadRequests, count + 1, count); if (prev == count) { ThreadPool.RequestWorkerThread(); break; } count = prev; } }
示例2:队列
下面是一个简单的队列类,它使用 CAS 实现了一个线程安全的入队和出队操作。相较于上面的计数器,这里的操作更加复杂,我们每一步都需要考虑是否有其他线程已经修改了数据。
这样的算法有点像薛定谔的猫,你不知道它是死是活,只有当你试图去观察它的时候,它才可能会变成死或者活。
public class ConcurrentQueue<T> { // _head 和 _tail 是两个伪节点,_head._next 指向队列的第一个节点,_tail 指向队列的最后一个节点。 // _head 和 _tail 会被多个线程修改和访问,所以要用 volatile 修饰。 private volatile Node _head; private volatile Node _tail; public ConcurrentQueue() { _head = new Node(default); // _tail 指向 _head 时,队列为空。 _tail = _head; } public void Enqueue(T item) { var node = new Node(item); while (true) { Node tail = _tail; Node next = tail._next; // 判断给 next 赋值的这段时间,是否有其他线程修改过 _tail if (tail == _tail) { // 如果 next 为 null,则说明从给 tail 赋值到给 next 赋值这段时间,没有其他线程修改过 tail._next, if (next == null) { // 如果 tail._next 为 null,则说明从给 tail 赋值到这里,没有其他线程修改过 tail._next, // tail 依旧是队列的最后一个节点,我们就可以直接将 node 赋值给 tail._next。 if (Interlocked.CompareExchange(ref tail._next, node, null) == null) { // 如果_tail == tail,则说明从上一步 CAS 操作到这里,没有其他线程修改过 _tail,也就是没有其他线程执行过 Enqueue 操作。 // 那么当前线程 Enqueue 的 node 就是队列的最后一个节点,我们就可以直接将 node 赋值给 _tail。 Interlocked.CompareExchange(ref _tail, node, tail); break; } } // 如果 next 不为 null,则说明从给 tail 赋值到给 next 赋值这段时间,有其他线程修改过 tail._next, else { // 如果没有其他线程修改过 _tail,那么 next 就是队列的最后一个节点,我们就可以直接将 next 赋值给 _tail。 Interlocked.CompareExchange(ref _tail, next, tail); } } } } public bool TryDequeue(out T item) { while (true) { Node head = _head; Node tail = _tail; Node next = head._next; // 判断 _head 是否被修改过 // 如果没有被修改过,说明从给 head 赋值到给 next 赋值这段时间,没有其他线程执行过 Dequeue 操作。 if (head == _head) { // 如果 head == tail,说明队列为空 if (head == tail) { // 虽然上面已经判断过队列是否为空,但是在这里再判断一次 // 是为了防止在给 tail 赋值到给 next 赋值这段时间,有其他线程执行过 Enqueue 操作。 if (next == null) { item = default; return false; } // 如果 next 不为 null,则说明从给 tail 赋值到给 next 赋值这段时间,有其他线程修改过 tail._next,也就是有其他线程执行过 Enqueue 操作。 // 那么 next 就可能是队列的最后一个节点,我们尝试将 next 赋值给 _tail。 Interlocked.CompareExchange(ref _tail, next, tail); } // 如果 head != tail,说明队列不为空 else { item = next._item; if (Interlocked.CompareExchange(ref _head, next, head) == head) { // 如果 _head 没有被修改过 // 说明从给 head 赋值到这里,没有其他线程执行过 Dequeue 操作,上面的 item 就是队列的第一个节点的值。 // 我们就可以直接返回。 break; } // 如果 _head 被修改过 // 说明从给 head 赋值到这里,有其他线程执行过 Dequeue 操作,上面的 item 就不是队列的第一个节点的值。 // 我们就需要重新执行 Dequeue 操作。 } } } return true; } private class Node { public readonly T _item; public Node _next; public Node(T item) { _item = item; } } }
我们可以通过以下代码来进行测试
using System.Collections.Concurrent; var queue = new ConcurrentQueue<int>(); var results = new ConcurrentBag<int>(); int dequeueRetryCount = 0; var enqueueTask = Task.Run(() => { // 确保 Enqueue 前 dequeueTask 已经开始运行 Thread.Sleep(10); Console.WriteLine("Enqueue start"); Parallel.For(0, 100000, i => queue.Enqueue(i)); Console.WriteLine("Enqueue done"); }); var dequeueTask = Task.Run(() => { Thread.Sleep(10); Console.WriteLine("Dequeue start"); Parallel.For(0, 100000, i => { while (true) { if (queue.TryDequeue(out int result)) { results.Add(result); break; } Interlocked.Increment(ref dequeueRetryCount); } }); Console.WriteLine("Dequeue done"); }); await Task.WhenAll(enqueueTask, dequeueTask); Console.WriteLine( $"Enqueue and dequeue done, total data count: {results.Count}, dequeue retry count: {dequeueRetryCount}"); var hashSet = results.ToHashSet(); for (int i = 0; i < 100000; i++) { if (!hashSet.Contains(i)) { Console.WriteLine("Error, missing " + i); break; } } Console.WriteLine("Done");
输出结果:
Dequeue start
Enqueue start
Enqueue done
Dequeue done
Enqueue and dequeue done, total data count: 100000, dequeue retry count: 10586
Done
上述的 retry count 为 797,说明在 100000 次的 Dequeue 操作中,有 10586 次的 Dequeue 操作需要重试,那是因为在 Dequeue 操作中,可能暂时没有数据可供 Dequeue,需要等待其他线程执行 Enqueue 操作。
当然这个 retry count 是不稳定的,因为在多线程环境下,每次执行的结果都可能不一样。