介绍内存模型和原子操作

  • 内存模型定义了多线程程序中,读写操作如何在不同线程之间可见,以及这些操作在和中顺序下执行,内存模型确保程序的行为在并发环境下是不可预测的
  • 原子操作即不可分割的操作,系统所有的线程不可能观察到原子操作完成了一半(可以吧原子指令理解为一条汇编指令)

原子类型 std::atomic

原子类型的实现可以基于锁或者基于无锁实现(比如CAS) , 每一个原子类型中都含有一个is_lock_free成员函数,表示是否是使用无锁技术实现的,并且在C++17中引入了静态成员 static constexpr数据成员: is_always_lock_free,如果当前环境类型为无锁类型,就会返回true ,并且提供了一组宏定义在编译时对于各种整数原子类型是否无锁进行判断:

// (C++11 起)
#define ATOMIC_BOOL_LOCK_FREE     /* 未指定 */
#define ATOMIC_CHAR_LOCK_FREE     /* 未指定 */
#define ATOMIC_CHAR16_T_LOCK_FREE /* 未指定 */
#define ATOMIC_CHAR32_T_LOCK_FREE /* 未指定 */
#define ATOMIC_WCHAR_T_LOCK_FREE  /* 未指定 */
#define ATOMIC_SHORT_LOCK_FREE    /* 未指定 */
#define ATOMIC_INT_LOCK_FREE      /* 未指定 */
#define ATOMIC_LONG_LOCK_FREE     /* 未指定 */
#define ATOMIC_LLONG_LOCK_FREE    /* 未指定 */
#define ATOMIC_POINTER_LOCK_FREE  /* 未指定 */
// (C++20 起)
#define ATOMIC_CHAR8_T_LOCK_FREE  /* 未指定 */

可以利用预处理指令来判断当前环境是否无锁,宏定义的值和是否有锁的关系如下:

  • 对于一定有锁的内建原子类型是 0
  • 对于有时无锁的内建原子类型是 1
  • 对于一定无锁的内建原子类型是 2
/*  利用预处理指令来确定是否利用锁实现 std::atomic  */
#include<iostream>
#include<atomic>
using namespace std;


int main() {

    if constexpr(atomic<int>::is_always_lock_free) {
        cout << "当前环境 std::atomic<int> 始终无锁 " << endl;
    } else {
        cout << "当前环境 std::atomic<int> 并非无锁 " << endl;
    }

#if ATOMIC_INT_LOCK_FREE == 2
    cout << "int 类型的原子变量一定是无锁的 " << endl;
#elif ATOMIC_INT_LOCK_FREE == 1
    cout << "int 类型的原子变量有时是无锁的 " << endl;
#else 
    cout << "int 类型的原子变量一定是有锁的 " << endl;
#endif
}

注意上面使用了C++17特性: if constexpr, 可以在编译时期进行判断 另外除了使用原子变量本身之外还可以使用原子类型的别名,别名类型如下(其实不使用别名感觉还好记一些):

using atomic_char   = atomic<char>;
using atomic_schar  = atomic<signed char>;
using atomic_uchar  = atomic<unsigned char>;
using atomic_short  = atomic<short>;
using atomic_ushort = atomic<unsigned short>;
using atomic_int    = atomic<int>;
using atomic_uint   = atomic<unsigned int>;
using atomic_long   = atomic<long>;
using atomic_ulong  = atomic<unsigned long>;
using atomic_llong  = atomic<long long>;
using atomic_ullong = atomic<unsigned long long>;

虽然原子变量不可以移动,但是可以通过隐式转换成对应的内置类型,这是由于它具有转换函数,定义如下:

atomic(const atomic&) = delete;
atomic& operator=(const atomic&) = delete;
operator T() const noexcept;  // 转换函数

并且可以使用 load() , store() , exchange() , compare_exchange_weak()compare_exchange_strong() 等成员函数对于 std::atomic进行操作,如果是整数类型年还支持,各种运算符操作,比如 ++ -- += -= &= |= ^= , fetch_add,fetch_sub等操作方式 , 同时自定义类型也可以创建对应的原子对象,不过由于是通用模板,操作仅限于 load() ...等,比如利用元自变量来操作自定义类型的方法如下:

/* 原子类型操作自定义类型 */
#include <future>
#include<iostream>
#include<atomic>
using namespace std;

struct trivial_type {
    int x{};
    float y{};

    trivial_type() {}

    trivial_type(int a, float b) : x{ a }, y{ b } {}

    trivial_type(const trivial_type& other) = default;

    trivial_type& operator=(const trivial_type& other) = default;

    ~trivial_type() = default;
};

int main() {
    atomic<trivial_type> atomic_my_type {trivial_type{10 , 20.5f}};    
    trivial_type new_value {30 , 40.5f};
    atomic_my_type.store(new_value);
    trivial_type loadedValue = atomic_my_type.load();
    cout << "x = " << loadedValue.x << " y = " << loadedValue.y << endl;
    // 使用 exchange 操作 
    trivial_type exchanged_value = atomic_my_type.exchange(trivial_type{50 , 60.5f});
    cout << "交换之前 x = " << exchanged_value.x << " 交换之前 y = " << exchanged_value.y << endl;  
    cout << "交换之后 x = " << atomic_my_type.load().x << " 交换之后 y = " << atomic_my_type.load().y << endl;
}

感觉atomic<T>存储一个数字,利用exchange可以把数字交换进去(好像这才是正确的理解),当成一个普通的类型只是支持原子操作而已

注意每一个原子操作的每一个操作函数都有一个内存序参数,用于指定执行顺序,操作的类型分为如下三种:

  1. Store 操作(存储操作):可选的内存序包括 memory_order_relaxedmemory_order_releasememory_order_seq_cst
  2. Load 操作(加载操作):可选的内存序包括 memory_order_relaxedmemory_order_consumememory_order_acquirememory_order_seq_cst
  3. Read-modify-write(读-改-写)操作:可选的内存序包括 memory_order_relaxedmemory_order_consumememory_order_acquirememory_order_releasememory_order_acq_relmemory_order_seq_cst。 并且注意到任何 std::atomic类型,初始化都不是原子操作(可以使用 std::call_once来保证初始化的一致性)

std::atomic_flag

std::atomic_flag是最简单的原子类型,这一个类型的对象可以在两种状态之前切换,设置(true)或者清除(false)
初始化方式: 在 C++20之后可以使用默认构造函数初始化,之前需要使用 ATOMIC_FLAG_INIT宏定义来初始化为清除状态 ,并且常用的初始化方法如下(初始化为 清除 false 类型):

std::atomic_flag f ATOMIC_FLAG_INIT;
std::atomic_flag f2 = {};
std::atomic_flag f3{};
std::atomic_flag f4{ 0 };

对象初始化完成之后就只能做三件事:

  • clear(清除): 将标志对象的状态原子地更改为清除(false)
  • test_and_set(测试并设置): 将标志对象的状态原子地更改为设置,并且返回他原来保存的值
  • 销毁: 对象的生命周期结束自动调用析构函数进行销毁操作 并且注意到所有的原子类型的变量都是不可复制,不可移动不可赋值的,这是由于对于两个对象的移动复制等操作需要涉及多个操作,这些操作组合之后破坏了原子性

同时可以使用 std::atomic_flag来实现自旋锁,也就是在等待锁的过程中不会放弃CPU,而是持续检查锁的状态,自旋锁的响应更快但是睡眠锁更加节约资源高效

/* 自旋锁的实现 */
#include<iostream>
#include<mutex>
#include<thread>
#include<atomic>
using namespace std;

class spinlock_mutex {
private:
    atomic_flag flag; 
public:
    spinlock_mutex() noexcept = default;
    void lock() noexcept {
        while(flag.test_and_set(std::memory_order_acquire));
    }

    void unlock() noexcept {
        flag.clear(std::memory_order_release);
    }
};


spinlock_mutex m{};

void test() {
    lock_guard<spinlock_mutex> ls{m};
    cout << "thread: " << this_thread::get_id() << " get the thread!" << endl;
}

int main() {
    // g + C-A 可以递增数字
    thread t1{test};
    thread t2{test};
    thread t3{test};
    thread t4{test};
    thread t5{test};
    thread t6{test};

    t1.join();
    t2.join();
    t3.join();
    t4.join();
    t5.join();
    t6.join();
}

但是最好不要使用自旋锁,否则就会导致资源浪费严重的问题,但是std::atomic_flag的局限性比较强,所以一般使用 std::atomic<bool> 代替 std::atomic_flag

std::atomic<bool>

最基本的原子类型,相比于std::atomic_flag提供了更加完善的布尔标志,虽然同样不可复制不可移动,但是可以使用非原子的bool类型进行构造,初始化为 true或者 false, 并且可以从非原子的bool赋值给 std::atomic<bool>

std::atomic<bool> b{true};
b = false;  // b --> 普通的 bool 类型,底层的转换运算符

如果原子变量的赋值操作返回了一个引用,那么依赖这一个结果的代码需要显示的进行加早,从而保证数据的正确性,举例如下:

std::atomic<bool>b {true};
auto& ref = (b = false);  // 假设返回 atomic 引用
bool flag = ref.load();   // 那就必须显式调用 load() 加载

可以使用 load方法获取到原子变量的值,可以使用 store或者exchange来获取到替代std::atomic_flag中的test_and_set函数,使用实例如下:

/* std::atomic<bool> 类型的基本使用 */
#include<atomic>
using namespace std;

int main() {
    atomic<bool> b{false};
    bool x = b.load(std::memory_order_acquire);
    b.store(true);
    x = b.exchange(false , std::memory_order_acq_rel); 
}

同时std::atomic<bool>提供了多个读改写的操作,exchange只是其中之一,它还提供了另外一种存储方式,当前值和预期一致的时候存储新值,这一种操作叫做 "比较/交换" 表现形式为: compare_exchange_weakcompare_exchange_strong,两种类型的不同之处在于:

  • compare_exchange_weak:尝试将原子对象的当前值与预期值进行比较如果相等则将其更新为新值并返回 true;否则,将原子对象的值加载进 expected(进行加载操作)并返回 false此操作可能会由于某些硬件的特性而出现_假失败,需要在循环中重试
  • compare_exchange_strong:类似于 compare_exchange_weak但不会出现假失败,因此不需要重试。适用于需要确保操作成功的场合。 利用std::atomic<bool>实现的自旋锁如下:
/* 利用 std::atomic 实现自旋锁 */
#include<iostream>
#include<mutex>
#include<thread>
#include<atomic>
using namespace  std;

class spin_lock {
private:
    atomic<bool> lock_flag;
public:
    spin_lock(): lock_flag(false) {}
    spin_lock(const spin_lock& sl) = delete;
    spin_lock&  operator=(const spin_lock& sl) = delete;
    void lock() noexcept {
        while(lock_flag.exchange(true , std::memory_order_acquire));
    } 

    void unlock() noexcept {
        lock_flag.store(false , std::memory_order_release);
    }
};

spin_lock sl{};

void f() {
   lock_guard<spin_lock> lg{sl}; 
   cout << "thread: " << this_thread::get_id() << " get the controller!" << endl;
}

int main() {
    thread t1{f}; 
    thread t2{f}; 
    thread t3{f}; 
    thread t4{f}; 
    thread t5{f}; 
    thread t6{f}; 
    thread t7{f}; 
    thread t8{f}; 


    t1.join();
    t2.join();
    t3.join();
    t4.join();
    t5.join();
    t6.join();
    t7.join();
    t8.join();
}

std::atomic<T*>

std::atomic<T*>是一个原子指针类型,T是指针所指向的对象类型,操作是针对于T类型的指针进行的,不可以被拷贝或者移动,但是可以通过复合类型的指针进行构造和移动,成员函数如下:

  • load(): 以原子方式读取指针值
  • store(): 以原子方式存储指针值
  • exchange(): 以原子方式交换指针值
  • compare_exchange_weak()compare_exchange_strong(): 以原子方式比较并且交换指针值(这里可以参考 std::atomic<bool> 中的 compare and swap 操作) 这些函数接受并且返回的类型都是T*,另外还提供如下的操作:
  • fetch_add: 使用原子方式增加的指针的值,比如 p.fetch_add(1) 会将指针向前面移动一个元素,并且返回操作之前的指针值
  • fetch_sub: 以原子方式减少指针的值,并且返回之前的指针值
  • operator+= operator-=: 使用原子方式增加或者减少指针的值,返回操作之前的指针的值
/* std::atomic<T*> 类型的指针 */
#include<iostream>
#include<cassert>
#include<atomic>
using namespace std;

struct Foo {};

int main() {
    Foo array[5]{};
    atomic<Foo*> p{array};  
    Foo* x= p.fetch_add(2);  // 注意返回操作之前的指针值
    // 用于断言
    assert(x == array); 
    assert(p.load() == &array[2]);
    x = (p -= 1);  // 相当于 x = p - 1
    assert(x == &array[1]); 
    assert(p.load() == &array[1]);
}

`std::atomicstd::shared_ptr

std::shared_ptr不是线程安全的,在C++20中原子模板std::atomic引入了一个偏特化版本的 std::atomic<std::shared_ptr> ,允许用于原子操纵shared_ptr对象,并且对于它的所有操作都是原子操作

若多个执行线程不同步地同时访问同一个 std::shared_ptr对象,并且任何这些访问使用了 shared_ptr的非const成员函数,就会出现数据竞争,除非通过 std::atomic<std::shared_ptr> 的实例进行所有访问

产生竞争的原因就是std::shared_ptr底层包含两个指针,一个指向实例的指针,另外一个指向控制块的指针(包含执行管理对象的指针,删除器,分配器,持有被管理对象的 shared_ptr数量, 设计被管理对象的 weak_ptr的数量) , 指向控制块的指针是线程安全的,但是指向实例的指针和指向控制块的指针,atomic<shared_ptr<T>>的使用方式如下:

std::atomic<std::shared_ptr<Data>> data = std::make_shared<Data>();

void writer() {
    for (int i = 0; i < 10; ++i) {
        std::shared_ptr<Data> new_data = std::make_shared<Data>(i);
        data.store(new_data); // 原子地替换所保有的值
        std::this_thread::sleep_for(10ms);
    }
}

void reader() {
    for (int i = 0; i < 10; ++i) {
        if (auto sp = data.load()) {
            std::cout << "读取线程值: " << sp->get_value() << std::endl;
        }
        else {
            std::cout << "没有读取到数据" << std::endl;
        }
        std::this_thread::sleep_for(10ms);
    }
}

注意在使用 atomic<shared_ptr<T>>的时候,一定需要注意对于共享数据,一定不要把受保护的指针或者引用传递到互斥量作用域之外,如果需要对于其中的内容进行操作,可以使用 atomic_ref<T>类型来操作 atomic<shared_ptr<T>> 指向的数据

/* std::atomic_ref 的基本使用 */
#include<iostream>
#include<atomic>
#include<memory>

int main() {
    std::atomic<std::shared_ptr<int>> ptr = std::make_shared<int>(10);
    std::atomic_ref<int> ref{*ptr.load()}; 
    ref = 100;
    std::cout << *ptr.load() << std::endl;
}

但是好像在 C++20中才可以成功编译上面的代码

另外在 C++20 中,任何 atomic的特化都拥有这些成员函数,使用方式类似于条件变量,使用方式如下:

std::atomic<std::shared_ptr<int>> ptr = std::make_shared<int>();

void wait_for_wake_up(){
    std::osyncstream{ std::cout }
        << "线程 "
        << std::this_thread::get_id()
        << " 阻塞,等待更新唤醒\n";

    // 等待 ptr 变为其它值
    ptr.wait(ptr.load());

    std::osyncstream{ std::cout }
        << "线程 "
        << std::this_thread::get_id()
        << " 已被唤醒\n";
}

void wake_up(){
    std::this_thread::sleep_for(5s);

    // 更新值并唤醒
    ptr.store(std::make_shared<int>(10));
    ptr.notify_one();
}

内存次序

  • 指令重排:
    • 编译器的指令重排
    • CPU的指令重排
  • 内存次序指定了线程中的变量对于另外一个变量是否可见(也就是一个线程中的变量的修改对于另外一个线程是否可见) , 通过规定内存次序来指定,C++标准通过内存次序来规定两个线程中的数据的可见性,利用这一种可见性来防止数据竞争

std::memory_order

规定了指定原子操作的内存顺序,影响了这些操作的行为:

typedef enum memory_order {
    memory_order_relaxed,
    memory_order_consume,
    memory_order_acquire,
    memory_order_release,
    memory_order_acq_rel,
    memory_order_seq_cst
} memory_order;

// C++20 起则为:

enum class memory_order : /* 未指明 */ {
    relaxed, consume, acquire, release, acq_rel, seq_cst
};
inline constexpr memory_order memory_order_relaxed = memory_order::relaxed;
inline constexpr memory_order memory_order_consume = memory_order::consume;
inline constexpr memory_order memory_order_acquire = memory_order::acquire;
inline constexpr memory_order memory_order_release = memory_order::release;
inline constexpr memory_order memory_order_acq_rel = memory_order::acq_rel;
inline constexpr memory_order memory_order_seq_cst = memory_order::seq_cst;
  1. memory_order_relaxed 宽松定序:不是定序约束,仅对此操作要求原子性
  2. memory_order_seq_cst 序列一致定序,这是库中所有原子操作的默认行为,也是最严格的内存次序,是绝对安全的。

剩下的就是第三类。

另外不同的指令集架构中使用的内存模型也不一样,比如 x86架构中使用强一致模型,但是 ARM , RISC-V架构中使用弱序内存模型