同步操作: "同步操作"是指在计算机科学和信息技术中的一种操作方式,其中不同的任务或操作按顺序执行,一个操作完成后才能开始下一个操作。在多线程编程中,各个任务通常需要通过同步设施进行相互协调和等待,以确保数据的一致性正确性

等待事件或条件

  • 等待事件发生或者条件成立的三种方式:
    • 忙等待(自旋)
    • 延时等待
    • 条件变量

条件变量

  • C++标准库中提供了条件变量的两种实现方法: std::condition_variablestd::condition_variable_any,其中std::condition_variable必须工作在 unique_lock上面,但是 std::condition_variable_any可以工作与任何可锁定对象上面,但是开销比较大,所以一般情况下使用 std::condition_variable,使用例子如下:
/* 条件变量 condition_variable 的使用 */
#include<iostream>
#include<thread>
#include<condition_variable>
#include<mutex>
using namespace std;

mutex mtx;
condition_variable cv;
bool arrived;

void wait_for_arrival() {
    unique_lock<mutex> lck{mtx};
    cv.wait(lck , []() { return arrived; });
    cout << "到达目的地,准备下车了 ..." << endl;
}

void simulate_arrival() {
    this_thread::sleep_for(chrono::seconds(5));
    {
        lock_guard<mutex> l(mtx);
        arrived = true;
    }

    cv.notify_one();
}


int main() {
    thread t1{wait_for_arrival};
    thread t2{simulate_arrival};
    t1.join();
    t2.join();
}

注意到unique_locklock_guard其实都是对于互斥锁的包装,底层都需要使用mutex初始化,其中条件变量 cv.wait(lck , []{return arrival;}); 会首先释放掉底层的互斥锁,之后等待条件满足重新上锁(这里和POSIX中的条件变量不同它,条件不需要循环等待),其中wait函数的声明如下:

void wait(std::unique_lock<std::mutex>& lock);                 // 1

template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred); // 2

其实就等价于:

while(!pred()) {
	wait(lock);
}

线程安全的队列

  • 利用条件变量可以实现线程安全的队列,并且基于该数据结构实现生产-消费者模型:
#include <chrono>
#include<iostream>
#include<queue>
#include<mutex>
#include<condition_variable>
#include <ratio>
#include<thread>
#include<memory>
using namespace std;

template<typename T>
class threadsafe_queue {
private:
    mutable mutex m;
    condition_variable data_cond;
    queue<T> data_queue;
public:
    void push(T value) {
        {
            lock_guard<mutex> l{m};
            data_queue.push(value);
            cout << "thread: " << this_thread::get_id() << " push " << value << " into the queue " << endl;
        } 
        data_cond.notify_one();
    }

    void pop(T& value) {
        unique_lock<mutex> u{m};
        data_cond.wait(u , [this]{ return !data_queue.empty(); });
        value = data_queue.front();
        cout << "thread: " << this_thread::get_id() << " get " << value << " from queue " << endl;
        data_queue.pop();
    }
    
    shared_ptr<T> pop() {
        unique_lock<mutex> u{m}; 
        data_cond.wait(u , [this]{ return !data_queue.empty(); });
        shared_ptr<T> res{make_shared<T>(data_queue.front())};
        data_queue.pop();
        return res;
    }

    bool empty() const {
        lock_guard<mutex> l{m};
        return data_queue.empty();
    }
};

void producer(threadsafe_queue<int>& q) {
   for(int i = 0 ; i < 5 ; i ++) {
        q.push(i);
        this_thread::sleep_for(chrono::seconds(2));
   } 
}

void consumer(threadsafe_queue<int>& q) {
    for(int i = 0 ; i < 10 ; i ++) {
        int value{};
        q.pop(value);
        this_thread::sleep_for(chrono::seconds(1));
    }
}

int main() {
    threadsafe_queue<int> q{};
    thread t1{producer , ref(q)};
    thread t2{consumer , ref(q)};
    t1.join();
    t2.join();
}

同时使用条件变量可以实现通知的效果,比如一个线程在等待一个结果的时候,另外一个线程完成操作之后可以通过条件变量来通知阻塞的线程

使用 std::future

  • C++中的std::future用于处理线程中需要等待的某一个事件的情况,线程直到预期结果,等待的同时也可以执行其他的任务(好相类似于js中的promise),标准库中有两种 future , 包含独占的std::future和共享的 std::shared_future,其中std::future只可以与单个指定事件关联,而std::shared_future可以关联多个事件,并且当多个线程需要访问一个独立的 future对象的时候,必须使用互斥量或者类似的同步机制来保护,但是当多个线程访问同一个共享状态,若每一个线程都是通过自身的shared_futrue对象副本进行访问,则是线程安全的,注意std::futrue<T>中的模板参数是一部任务函数的返回值,一个简单的使用例子如下:
/* std::future 的基本使用 */
#include <chrono>
#include <ios>
#include<iostream>
#include<thread>
#include<future>
using namespace std;

int task(int n) {
    cout << "异步任务 ID: " << this_thread::get_id() << endl;
    this_thread::sleep_for(chrono::seconds(5));
    return n * n;
}

int main() {
    future<int> future = async(task , 10);    
    cout << "main: " << this_thread::get_id() << endl;
    cout << boolalpha << future.valid() << endl;
    cout << future.get() << endl;
    cout << boolalpha << future.valid() << endl;
}

同时,std::async也支持各种可调用对象类型,实例如下:

/* async 参数测试 */
#include<iostream>
#include<future>

using namespace std;

class X {
public:
    int operator()(int n) const {
        return n * n;
    }
};

class Y {
public:
    int task_y(int n) {
        return n * n * n;
    } 
};

int f(int& p) {
    return p * p;
}

int main(int argc , char** argv) {
    Y y;
    int n = 9;
    auto t1 = async(X{} , 10);
    auto t2 = async(&Y::task_y , &y , 8);
    auto t3 = async([]{return -1;});
    auto t4 = async(f , ref(n));

    cout << "t1: " << t1.get() << endl;
    cout << "t2: " << t2.get() << endl;
    cout << "t3: " << t3.get() << endl;
    cout << "t4: " << t4.get() << endl;
    return 0;
}

考虑如下情况:

void f(const int& p) {}
void f2(int& p ){}

int n = 0;
std::async(f, n);   // OK! 可以通过编译,不过引用的并非是局部的n
std::async(f2, n);  // Error! 无法通过编译

由于临时对象可以绑定在const修饰的左值上面,而不可以绑定在非const修饰的左值上面,所以下面不可以通过编译

  • 同时对于只支持移动构造或者移动赋值的对象,需要使用移动构造函数

std::async的执行策略

  • 三种执行策略:
    • std::launch::async 在不同线程上执行异步任务
    • std::launch::deferred 惰性求值,不创建线程,等待future对象调用waitget成员函数的时候执行任务
    • std::launch::async | std::launch::deferred 根据实际情况选择执行方法,比如可以根据系统的资源剩余情况选择
  • 使用举例如下:
/* std::async 的执行策略 */
#include<iostream>
#include<future>
#include <thread>
using namespace std;

void f1() {
    cout << "异步任务 f1 开始执行 ..." << "thread: " << this_thread::get_id() <<  endl;
    this_thread::sleep_for(chrono::seconds(3));
}
int main() {
    auto ff1 = async(launch::deferred , f1);    
    cout << "没有开始异步任务 f1" << "main: " << this_thread::get_id() << endl;
    ff1.wait();
    auto ff2 = async(launch::async , f1);
    auto ff3 = async(launch::async | launch::deferred , f1);
}

其中std::launch::deferred相当于不会开始新的线程执行任务,相当于回调函数

std::async的常见问题

  1. 如果使用std::async构造的对象没有移动或者没有绑定到引用,那么完整表达式的结尾std::future的析构函数就会被阻塞,直到异步任务完成,所以导致这样无法创建异步任务:
std::async(std::launch::async, []{ f(); }); // 临时量的析构函数等待 f()
std::async(std::launch::async, []{ g(); }); // f() 完成前不开始
  1. 被移动的std::future没有所有权,失去了共享状态,不可以调用getwait成员函数,举例如下:
auto t = std::async([] {});
std::future<void> future{ std::move(t) };
t.wait();   // Error! 抛出异常

future 和 std::packaged_task

std::packaged_task
  • std::packaged_task用于包装任何可调用目标(函数,lambda表达时,bind表达时或者其他函数对象) ,使得可以异步调用它,其返回值或者抛出的异常可被存储于能够通过std::future对象访问的共享状态中
  • 使用std::packaged_task 并且获取返回值,需要首先拿到 std::packaged_task关联的std::future对象之后调用get()方法就可以拿到了,举例如下:
/* packaged_task 实例代码 */
#include<iostream>
#include<future>
#include <thread>
using namespace std;
int main() {
    packaged_task<int(int,int)> task([](int a, int b) {
            cout << "thread: " << this_thread::get_id() << endl;
            this_thread::sleep_for(chrono::seconds(2));
            return a * b * b;
            });

    future<int> f = task.get_future();
    task(10 , 2);
    cout << "main: " << this_thread::get_id() << endl;
    cout << "task: " << f.get() << endl;
}

注意这里没有创建新的线程执行异步任务,如果需要创建新的线程执行异步任务,需要关联执行任务的线程,使用方法如下:

/* packaged_task 实例代码 */
#include<iostream>
#include<future>
#include <thread>
using namespace std;
int main() {
    packaged_task<int(int,int)> task([](int a, int b) {
            cout << "thread: " << this_thread::get_id() << endl;
            this_thread::sleep_for(chrono::seconds(2));
            return a * b * b;
            });

    future<int> f = task.get_future();
    /*task(10 , 2);*/
    thread t{std::move(task) , 10 , 2};
    t.join();
    cout << "main: " << this_thread::get_id() << endl;
    cout << "task: " << f.get() << endl;
}

这是由于packaged_task<T>对象重载了operator(),是可调用对象,所以可以传递给std::thread执行,并且可以传递调用参数,并且注意到std::packaged_task对象是之可移动的,不可以赋值的,所以需要使用到移动语义

有了packaged_task,那么在使用多线程并行执行任务的时候就可以首先创建任务,之后关联std::future对象,最后创建线程执行了,最后就可以通过std::future获取到执行结果

使用 std::promise

  • 类模板std::promise用于存储一个值或者一个异常,之后通过std::promise对象创建的std::future对象异步获得,举例如下:
/* std::promise 使用教程 */
#include<iostream>
#include<future>
#include<thread>
using namespace std;

void calculate_square(promise<int> pj , int num) {
    this_thread::sleep_for(chrono::seconds(2));
    pj.set_value(num * num * num);
}

int main() {
    promise<int> promise;
    future<int> ff = promise.get_future();
    int num = 6;
    thread t{calculate_square , std::move(promise) , num};
    cout << "promise 中存储的结果是: " << ff.get() << endl;
    t.join();
}

处理可以设置线程运行时期的产生的值之外,还可以把子线程运行过程产生的异常传递到主线程中抛出并且处理,一般是通过set_exception函数来设置异常,同时并且可以通过cur_exception来设置当前产生的异常,处理代码如下:

void throw_function(std::promise<int> prom) {
    try {
        throw std::runtime_error("一个异常");
    }
    catch (...) {
        prom.set_exception(std::current_exception());
    }
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    std::thread t(throw_function, std::move(prom));

    try {
        std::cout << "等待线程执行,抛出异常并设置\n";
        fut.get();
    }
    catch (std::exception& e) {
        std::cerr << "来自线程的异常: " << e.what() << '\n';
    }
    t.join();
}

注意如果共享状态的promise已经存储值得或者异常,再次调用set_value(set_exception)会抛出异常std::future_error异常,并且将错误码设置为 promise_already_satisfied , 这是由于 promise中只可以存储值或者异常中的其中一种,无法共存,也就是set_valueset_exception二选一,不可以共存,用于不做异常的实例如下:

/* std::promise同时存储异常和值的时候出现错误的情况 */
#include <exception>
#include<iostream>
#include<future>
#include <stdexcept>
#include<thread>
using namespace std;

void throw_function(promise<int> prom) {
   // prom.set_value(100);
    try {
        throw std::runtime_error("a exception");
    } catch(exception& e) {
        try {
            prom.set_exception(current_exception());
        } catch(exception& e) {
            cerr << "来自 set_exception的异常: " << e.what() << endl;
        }
    }
}

int main() {
    promise<int> prom;
    future<int> fut = prom.get_future();
    thread t{throw_function , std::move(prom)};
    cout << "value = " << fut.get() << endl;
    t.join();
}

future 的状态变化

需要注意的是,future是一次性的,所以需要注意移动,并且调用get函数之后,future对象就会失去共享状态

  • 移动语义:这一点很好理解并且常见,因为移动操作标志着所有权的转移,意味着 future 不再拥有共享状态(如之前所提到)。get 和 wait 函数要求 future 对象拥有共享状态,否则会抛出异常。
  • 共享状态失效:调用 get 成员函数时,future 对象必须拥有共享状态,但调用完成后,它就会失去共享状态,不能再次调用 get。这是我们在本节需要特别讨论的内容。 其中futureget函数的源代码如下:
// std::future<void>
void get() {
    // block until ready then return or throw the stored exception
    future _Local{_STD move(*this)};
    _Local._Get_value();
}
// std::future<T>
_Ty get() {
    // block until ready then return the stored result or throw the stored exception
    future _Local{_STD move(*this)};
    return _STD move(_Local._Get_value());
}
// std::future<T&>
_Ty& get() {
    // block until ready then return the stored result or throw the stored exception
    future _Local{_STD move(*this)};
    return *_Local._Get_value();
}

可以发现,底层通过RAII的方式,把std::future对象转移到局部变量身上,并且在函数退出的时候局部变量就会析构,如果需要多次get调用,可以使用 shared_future

多个线程等待的 std::shared_future

  • std::shared_futurestd::future的区别类似于 std::unique_ptrstd::shared_ptr之间的区别,后者可以复制并且多个对象可以指代同一个共享状态
  • 多个线程中对同一个 std::shared_future 对象进行操作时(如果没有进行同步保护)存在条件竞争。而从多个线程访问同一共享状态,若每个线程都是通过其自身的 shared_future 对象副本**进行访问,则是安全的。
  • 可以通过future对象的share方法获取到std::shared_future对象,使用实例如下:
/* std::shared_future 的使用 */
#include<iostream>
#include<future>
using namespace std;

string fetch_data() {
    return "hello world!";
}

int main() {
    future<string> fut = async(fetch_data);
    shared_future<string> fts = fut.share();

    thread t1 {
        [fts]() {
            cout << "thread1 wait ..." << endl;   
            fts.wait();
            cout << "thread1 get data: " << fts.get() << endl;
        }
    };

    thread t2 {
        [fts]() {
            cout << "thread2 wait ... " << endl;     
            fts.wait();
            cout << "thread2 get data: " << fts.get() << endl;
        }
    };

    t1.join();
    t2.join();
}

同样,还可以通过shared_future的构造函数构造这一个对象,其中的一个构造函数的参数就是std::future<T>&&可以使用函数的返回值作为参数构建 std::shared_future

限时等待

超时等待包含两种方式,一种是 "时间段" , 另外一种是 "时间点" , 也就是: this_thread::sleep_forthis_thread::sleep_until的区别,前者需要等待一段时间,后者需要等待到某一个时间点,处理持续时间的函数使用_for结尾,处理绝对时间的函数使用_until作为后缀,比如条件变量std::condition_variable的等待函数有两个超时的版本wait_forwait_until

时间库(chrono)

时钟(clock)

C++标准库中,时钟被时为时间信号的来源,C++中定义了时钟类型,每一种时钟类型都提供了四种不同的信息:

  • 当前时间: 通过静态成员now获取,比如std::chrono::system_clock::now()会返回系统的当前时间,特定的时间点可以通过time_point指定,system_clock::now()的返回类型就是time_point
  • 时间类型
  • 时钟节拍: 被指定为 1/x秒,有时间周期决定,其中 std::ratio<m,n>表示 m秒内n次,其中std::chrono::duration表示时间间隔,定义如下:
template<class Rep , class Period = std::ratio<1>>
class duration;   // 相当于固定 分母

注意标准库定义了很多时间类型,比如std::chrono::minutes就是分钟类型,定义如下:

using minutes = duration<int,ratio<60>>;
  • 稳定时钟: 稳定时钟的主要优点在于,它可以提供相对于起始时间的稳定的递增时间,因此适用于需要保持时间顺序和不受系统时间变化影响的应用场景。相比之下,像 std::chrono::system_clock 这样的系统时钟可能会受到系统时间调整或变化的影响,因此在某些情况下可能不适合对时间间隔进行精确测量。 但是在C++中,可以把任何时间类型转换为 time_t类型,转换和输出的方法如下:
/* std::chrono 表示时钟的方式 */
#include <ctime>
#include<iostream>
#include<chrono>
#include<iomanip>
using namespace std;

int main() {
    auto now = std::chrono::system_clock::now();
    time_t now_time = chrono::system_clock::to_time_t(now);
    cout << "Current time: " << put_time(localtime(&now_time) , "%H:%M:%S`\n");
}
时间段

注意使用类模板: std::chrono::duration,它用于对时间段进行处理,第一个参数表示类型,第二个参数表示节拍,需要传递一个std::ratio类型,并且标准库在std::chrono命名空间中为时间段提供了一系列的类型,都是通过std::chrono::duration定义的别名:

using nanoseconds  = duration<long long, nano>;
using microseconds = duration<long long, micro>;
using milliseconds = duration<long long, milli>;
using seconds      = duration<long long>;
using minutes      = duration<int, ratio<60>>;
using hours        = duration<int, ratio<3600>>;
// CXX20
using days   = duration<int, ratio_multiply<ratio<24>, hours::period>>;
using weeks  = duration<int, ratio_multiply<ratio<7>, days::period>>;
using years  = duration<int, ratio_multiply<ratio<146097, 400>, days::period>>;
using months = duration<int, ratio_divide<years::period, ratio<12>>>;

并且在C++14中引入了时间字面量,存在于std::chrono_literals命名空间中,如下:

using namespace std::chrono_literals;

auto one_nanosecond = 1ns;
auto one_microsecond = 1us;
auto one_millisecond = 1ms;
auto one_second = 1s;
auto one_minute = 1min;
auto one_hour = 1h;

进行时钟类型的转换可以使用std::chrono::duration_cast<>来完成,使用例子如下:

std::chrono::milliseconds ms{ 3999 };
std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms);
std::cout << s.count() << '\n';

注意到其实std::chrono::seconds类型和 std::chrono::duration<long long>类型是一样的,所以可以写成如下的形式:

std::chrono::duration<double> s = std::chrono::duration_cast<std::chrono::duration<double>>(ms);

同时也可以进行隐式类型转换,并且时间库可以进行运算,也就是首先根据count取得对象,之后进行运算再次构造对象即可,std::future对象可以通过wait_for方法来等待结果,使用方式如下:

std::future<int> future = std::async([] {return 6; });
if (future.wait_for(35ms) == std::future_status::ready)
    std::cout << future.get() << '\n';

三种状态如下:

  • deferred: 共享状态持有的函数正在延迟运行,结果将仅在明确请求时计算
  • ready: 共享状态就绪
  • timeout: 共享状态在经过指定的等待时间内仍没有就绪 举例如下:
auto future = std::async(std::launch::deferred, []{});
if (future.wait_for(35ms) == std::future_status::deferred)
    std::cout << "future_status::deferred " << "正在延迟执行\n";
future.wait(); // 在 wait() 或 get() 调用时执行,不创建线程
时间点

时间点可以使用 std::chrono::time_point<>来表示,第一个模板参数用来指定使用的时钟,第二个模板参数用来表示时间单位(std::chrono::duration<>) , 可以使用 now()成员函数获取到当前时间,返回类型为 std::chrono::time_point,类型定义如下:

template<
    class Clock,
    class Duration = typename Clock::duration
> class time_point;

默认根据第一个类型得到,比如:

std::chrono::time_point<std::chrono::system_clock>
// 上面等价于
std::chrono::time_point<std:::chrono::system_clock , std::chrono::system_clock::duration>
// 并且第二个参数的实际类型为
std::chrono::duration<long long,std::ratio<1,10000000>> // 100 ns 

并且时间点支持加减操作和比较操作,使用方式如下,时间点的计算方式如下:

/* std::chrono 中的 time_point 对象 */
#include <iomanip>
#include<iostream>
#include<chrono>
using namespace std;

int main() {
   auto now = chrono::system_clock::now();
   auto now_plus_one = now + chrono::hours(24);
   time_t now_t = chrono::system_clock::to_time_t(now);
   time_t now_after_day = chrono::system_clock::to_time_t(now_plus_one);
   cout << "现在的时间为: " << put_time(localtime(&now_t), "%Y-%m-%d %H:%M:%S") << endl;
   cout << "之后的时间为: " << put_time(localtime(&now_after_day), "%Y-%m-%d %H:%M:%S") << endl;
   return 0;
}

同时也可以利用时间点来统计代码执行的时间:

#include<iostream>
#include<thread>
#include<chrono>
using namespace std;

int main() {
    auto start = chrono::steady_clock::now();
    /*int j = 0;*/
    /*for(int i = 0 ; i < 10 ; i ++) {*/
    /*    j ++; */
    /*}*/
    this_thread::sleep_for(chrono::seconds(1));
    auto end = chrono::steady_clock::now();
    auto result = chrono::duration_cast<chrono::milliseconds>(end - start);
    cout << "花费时间为: " << result.count() << " ns" << endl;
}

同时超时时间可以配合条件变量或者std::futrue使用,注意超时的返回值:

#include <chrono>
#include<condition_variable>
#include<iostream>
#include<thread>
using namespace std;

condition_variable cv;
mutex m;
bool done = false;

bool wait_loop() {
    const auto timeout = chrono::steady_clock::now() + 500ms; 
    unique_lock<mutex> lk{m};
    while(!done) {
        if(cv.wait_until(lk , timeout) == cv_status::timeout) {
            cout << "thread: " << this_thread::get_id() << " continue ..." << endl;
            return false;
        }
    }
    cout << "thread: " << this_thread::get_id() << "successfully exit ... " << endl;
    return true;
}

void notify_task() {
    this_thread::sleep_for(chrono::seconds(4));
    unique_lock<mutex> lk{m};
    done = true;
    cv.notify_one();
}

int main() {
    thread t1{[](){
        while(!wait_loop()); 
    }};  

    thread t2{notify_task};

    t1.join();
    t2.join();
}

C++20信号量

信号量是一个非常轻量简单的同步设施,它维护一个计数,这个计数不能小于 0。信号量提供两种基本操作:释放(增加计数)和等待(减少计数)。如果当前信号量的计数值为 0,那么执行“等待”操作的线程将会一直阻塞,直到计数大于 0,也就是其它线程执行了“释放”操作。

  • C++20中提供了两个信号量类型,std::counting_semaphorestd::binary_semaphore,定义与<semaphore>头文件中,并且binary_semaphore只是std::counting_semaphore的一个别名,也就是一个二值信号量:
using binary_semaphore = counting_semaphore<1>;

使用例子如下:

// 全局二元信号量对象
// 设置对象初始计数为 0
std::binary_semaphore smph_signal_main_to_thread{ 0 };
std::binary_semaphore smph_signal_thread_to_main{ 0 };

void thread_proc() {
    smph_signal_main_to_thread.acquire();
    std::cout << "[线程] 获得信号" << std::endl;

    std::this_thread::sleep_for(3s);

    std::cout << "[线程] 发送信号\n";
    smph_signal_thread_to_main.release();
}

int main() {
    std::jthread thr_worker{ thread_proc };

    std::cout << "[主] 发送信号\n";
    smph_signal_main_to_thread.release();

    smph_signal_thread_to_main.acquire();
    std::cout << "[主] 获得信号\n";
}

注意在利用信号量实现等待-消费者模型的时候一定需要互斥锁一定是需要保护共享数据,所以加上互斥锁的位置就是共享数据区操作的位置

C++20 闩和屏障

std::latch

闩(latch)和屏障(barrier)是线程协调机制,允许任何数量的线程阻塞直到期待数量的线程到达,闩不可以重复使用,但是屏障可以

  • std::latch: 单词使用的线程屏障
  • std::barrier: 可以复用的线程屏障
/* 闩 std::latch 的基本使用方法 */
#include <iostream>
#include <thread>
using namespace std;

std::latch work_start{3};
void work() {
  cout << "等待闩阻塞取消..." << endl;
  work_start.wait();
  cout << "闩的阻塞取消..." << endl;
}

int main() {
  std::jthread t{work};
  this_thread::sleep_for(3s);
  cout << "休眠结束..." << endl;
  work_start.count_down();
  work_start.count_down(2);
}

闩的作用,比如可以划分线程的工作区间,并且可以同步线程到同一个位置,例子如下:

std::latch latch{ 10 };

void f(int id) {
    //todo.. 脑补任务
    std::this_thread::sleep_for(1s);
    std::cout << std::format("线程 {} 执行完任务,开始等待其它线程执行到此处\n", id);
    latch.arrive_and_wait();
    std::cout << std::format("线程 {} 彻底退出函数\n", id);
}

int main() {
    std::vector<std::jthread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(f,i);
    }
}

其中arrive_and_wait , 等价于count_down() 和 wait()

std::barrier

std::barrier 和 std::latch 最大的不同是,前者可以在阶段完成之后将计数重置为构造时传递的值,而后者只能减少计数,使用例子如下:

std::barrier barrier{ 10,
    [n = 1]()mutable noexcept {std::cout << "\t第" << n++ << "轮结束\n"; }
};

void f(int start, int end){
    for (int i = start; i <= end; ++i) {
        std::osyncstream{ std::cout } << i << ' '; 
        barrier.arrive_and_wait(); // 减少计数并等待 解除阻塞时就重置计数并调用函数对象
        
        std::this_thread::sleep_for(300ms);
    }
}

int main(){
    std::vector<std::jthread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(f, i * 10 + 1, (i + 1) * 10);
    }
}

其中std::osyncstream也是C++20引入的,为了确保输出流在多线程环境中同步,避免除了数据竞争,而且将不使用任何方式穿插或者截断

同时标准库中提供了另外一个函数arrive_and_drop的作用,把当前计数和重置之后的技术减少 1

注意每一次循环结束调用的lambda函数(也就是第二个参数),必须声明为noexcept也就是不可以抛出异常

总结

  • 条件变量: std::condition_variable , 用于等待某一个特定条件满足,可以用于实现消费者-生产者模型
  • std::future对象可以看作一个在未来可能发生但是直到预期结果的时间,和下面这些对象同时使用:
    • std::async: 类似一个异步任务,可以把这一个对象和std::future绑定,并且支持延时调用和直接创建新线程调用两种方式,注意使用std::launch::deferred不会创建新线程,但是使用 std::launch::async会创建新的线程执行任务
    • std::paskaged_task: 相当于对于某一个任务的包装,并且可以利用get_future获取到关联的future对象,本质上也是一个可调用对象,调用结束之后把结果放在future中,注意本身不会创建线程,除非放在线程中执行,注意这一个对象是 move-only
    • std::promise的作用是存储线程运行过程中产生的变量和异常,可以使用get_future方法获取到关联的std::future对象,并且通过get方法获取到存储的值
    • std::shared_future可以多次使用,但是一定需要注意操作共享的shared_future的时候的线程安全问题(如果是复制的不用考虑) ,可以使用std::futureshare()方法获取到这一个对象
  • 时间库std::chrono明确时间点 time_point和时间间隔: std::chrono::duration<m , ratio<n , k>>的含义即可
  • C++20中的信号量和闩屏障了解即可