同步操作: "同步操作"是指在计算机科学和信息技术中的一种操作方式,其中不同的任务或操作按顺序执行,一个操作完成后才能开始下一个操作。在多线程编程中,各个任务通常需要通过同步设施进行相互协调和等待,以确保数据的一致性和正确性。
等待事件或条件
- 等待事件发生或者条件成立的三种方式:
- 忙等待(自旋)
- 延时等待
- 条件变量
条件变量
C++
标准库中提供了条件变量的两种实现方法:std::condition_variable
和std::condition_variable_any
,其中std::condition_variable
必须工作在unique_lock
上面,但是std::condition_variable_any
可以工作与任何可锁定对象上面,但是开销比较大,所以一般情况下使用std::condition_variable
,使用例子如下:
/* 条件变量 condition_variable 的使用 */
#include<iostream>
#include<thread>
#include<condition_variable>
#include<mutex>
using namespace std;
mutex mtx;
condition_variable cv;
bool arrived;
void wait_for_arrival() {
unique_lock<mutex> lck{mtx};
cv.wait(lck , []() { return arrived; });
cout << "到达目的地,准备下车了 ..." << endl;
}
void simulate_arrival() {
this_thread::sleep_for(chrono::seconds(5));
{
lock_guard<mutex> l(mtx);
arrived = true;
}
cv.notify_one();
}
int main() {
thread t1{wait_for_arrival};
thread t2{simulate_arrival};
t1.join();
t2.join();
}
注意到unique_lock
和lock_guard
其实都是对于互斥锁的包装,底层都需要使用mutex
初始化,其中条件变量 cv.wait(lck , []{return arrival;});
会首先释放掉底层的互斥锁,之后等待条件满足重新上锁(这里和POSIX
中的条件变量不同它,条件不需要循环等待),其中wait
函数的声明如下:
void wait(std::unique_lock<std::mutex>& lock); // 1
template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred); // 2
其实就等价于:
while(!pred()) {
wait(lock);
}
线程安全的队列
- 利用条件变量可以实现线程安全的队列,并且基于该数据结构实现生产-消费者模型:
#include <chrono>
#include<iostream>
#include<queue>
#include<mutex>
#include<condition_variable>
#include <ratio>
#include<thread>
#include<memory>
using namespace std;
template<typename T>
class threadsafe_queue {
private:
mutable mutex m;
condition_variable data_cond;
queue<T> data_queue;
public:
void push(T value) {
{
lock_guard<mutex> l{m};
data_queue.push(value);
cout << "thread: " << this_thread::get_id() << " push " << value << " into the queue " << endl;
}
data_cond.notify_one();
}
void pop(T& value) {
unique_lock<mutex> u{m};
data_cond.wait(u , [this]{ return !data_queue.empty(); });
value = data_queue.front();
cout << "thread: " << this_thread::get_id() << " get " << value << " from queue " << endl;
data_queue.pop();
}
shared_ptr<T> pop() {
unique_lock<mutex> u{m};
data_cond.wait(u , [this]{ return !data_queue.empty(); });
shared_ptr<T> res{make_shared<T>(data_queue.front())};
data_queue.pop();
return res;
}
bool empty() const {
lock_guard<mutex> l{m};
return data_queue.empty();
}
};
void producer(threadsafe_queue<int>& q) {
for(int i = 0 ; i < 5 ; i ++) {
q.push(i);
this_thread::sleep_for(chrono::seconds(2));
}
}
void consumer(threadsafe_queue<int>& q) {
for(int i = 0 ; i < 10 ; i ++) {
int value{};
q.pop(value);
this_thread::sleep_for(chrono::seconds(1));
}
}
int main() {
threadsafe_queue<int> q{};
thread t1{producer , ref(q)};
thread t2{consumer , ref(q)};
t1.join();
t2.join();
}
同时使用条件变量可以实现通知的效果,比如一个线程在等待一个结果的时候,另外一个线程完成操作之后可以通过条件变量来通知阻塞的线程
使用 std::future
C++
中的std::future
用于处理线程中需要等待的某一个事件的情况,线程直到预期结果,等待的同时也可以执行其他的任务(好相类似于js
中的promise
),标准库中有两种future
, 包含独占的std::future
和共享的std::shared_future
,其中std::future
只可以与单个指定事件关联,而std::shared_future
可以关联多个事件,并且当多个线程需要访问一个独立的future
对象的时候,必须使用互斥量或者类似的同步机制来保护,但是当多个线程访问同一个共享状态,若每一个线程都是通过自身的shared_futrue
对象副本进行访问,则是线程安全的,注意std::futrue<T>
中的模板参数是一部任务函数的返回值,一个简单的使用例子如下:
/* std::future 的基本使用 */
#include <chrono>
#include <ios>
#include<iostream>
#include<thread>
#include<future>
using namespace std;
int task(int n) {
cout << "异步任务 ID: " << this_thread::get_id() << endl;
this_thread::sleep_for(chrono::seconds(5));
return n * n;
}
int main() {
future<int> future = async(task , 10);
cout << "main: " << this_thread::get_id() << endl;
cout << boolalpha << future.valid() << endl;
cout << future.get() << endl;
cout << boolalpha << future.valid() << endl;
}
同时,std::async
也支持各种可调用对象类型,实例如下:
/* async 参数测试 */
#include<iostream>
#include<future>
using namespace std;
class X {
public:
int operator()(int n) const {
return n * n;
}
};
class Y {
public:
int task_y(int n) {
return n * n * n;
}
};
int f(int& p) {
return p * p;
}
int main(int argc , char** argv) {
Y y;
int n = 9;
auto t1 = async(X{} , 10);
auto t2 = async(&Y::task_y , &y , 8);
auto t3 = async([]{return -1;});
auto t4 = async(f , ref(n));
cout << "t1: " << t1.get() << endl;
cout << "t2: " << t2.get() << endl;
cout << "t3: " << t3.get() << endl;
cout << "t4: " << t4.get() << endl;
return 0;
}
考虑如下情况:
void f(const int& p) {}
void f2(int& p ){}
int n = 0;
std::async(f, n); // OK! 可以通过编译,不过引用的并非是局部的n
std::async(f2, n); // Error! 无法通过编译
由于临时对象可以绑定在const
修饰的左值上面,而不可以绑定在非const
修饰的左值上面,所以下面不可以通过编译
- 同时对于只支持移动构造或者移动赋值的对象,需要使用移动构造函数
std::async的执行策略
- 三种执行策略:
std::launch::async
在不同线程上执行异步任务std::launch::deferred
惰性求值,不创建线程,等待future
对象调用wait
或get
成员函数的时候执行任务std::launch::async | std::launch::deferred
根据实际情况选择执行方法,比如可以根据系统的资源剩余情况选择
- 使用举例如下:
/* std::async 的执行策略 */
#include<iostream>
#include<future>
#include <thread>
using namespace std;
void f1() {
cout << "异步任务 f1 开始执行 ..." << "thread: " << this_thread::get_id() << endl;
this_thread::sleep_for(chrono::seconds(3));
}
int main() {
auto ff1 = async(launch::deferred , f1);
cout << "没有开始异步任务 f1" << "main: " << this_thread::get_id() << endl;
ff1.wait();
auto ff2 = async(launch::async , f1);
auto ff3 = async(launch::async | launch::deferred , f1);
}
其中std::launch::deferred
相当于不会开始新的线程执行任务,相当于回调函数
std::async的常见问题
- 如果使用
std::async
构造的对象没有移动或者没有绑定到引用,那么完整表达式的结尾std::future
的析构函数就会被阻塞,直到异步任务完成,所以导致这样无法创建异步任务:
std::async(std::launch::async, []{ f(); }); // 临时量的析构函数等待 f()
std::async(std::launch::async, []{ g(); }); // f() 完成前不开始
- 被移动的
std::future
没有所有权,失去了共享状态,不可以调用get
和wait
成员函数,举例如下:
auto t = std::async([] {});
std::future<void> future{ std::move(t) };
t.wait(); // Error! 抛出异常
future 和 std::packaged_task
std::packaged_task
std::packaged_task
用于包装任何可调用目标(函数,lambda
表达时,bind
表达时或者其他函数对象) ,使得可以异步调用它,其返回值或者抛出的异常可被存储于能够通过std::future
对象访问的共享状态中- 使用
std::packaged_task
并且获取返回值,需要首先拿到std::packaged_task
关联的std::future
对象之后调用get()
方法就可以拿到了,举例如下:
/* packaged_task 实例代码 */
#include<iostream>
#include<future>
#include <thread>
using namespace std;
int main() {
packaged_task<int(int,int)> task([](int a, int b) {
cout << "thread: " << this_thread::get_id() << endl;
this_thread::sleep_for(chrono::seconds(2));
return a * b * b;
});
future<int> f = task.get_future();
task(10 , 2);
cout << "main: " << this_thread::get_id() << endl;
cout << "task: " << f.get() << endl;
}
注意这里没有创建新的线程执行异步任务,如果需要创建新的线程执行异步任务,需要关联执行任务的线程,使用方法如下:
/* packaged_task 实例代码 */
#include<iostream>
#include<future>
#include <thread>
using namespace std;
int main() {
packaged_task<int(int,int)> task([](int a, int b) {
cout << "thread: " << this_thread::get_id() << endl;
this_thread::sleep_for(chrono::seconds(2));
return a * b * b;
});
future<int> f = task.get_future();
/*task(10 , 2);*/
thread t{std::move(task) , 10 , 2};
t.join();
cout << "main: " << this_thread::get_id() << endl;
cout << "task: " << f.get() << endl;
}
这是由于packaged_task<T>
对象重载了operator()
,是可调用对象,所以可以传递给std::thread
执行,并且可以传递调用参数,并且注意到std::packaged_task
对象是之可移动的,不可以赋值的,所以需要使用到移动语义
有了packaged_task
,那么在使用多线程并行执行任务的时候就可以首先创建任务,之后关联std::future
对象,最后创建线程执行了,最后就可以通过std::future
获取到执行结果
使用 std::promise
- 类模板
std::promise
用于存储一个值或者一个异常,之后通过std::promise
对象创建的std::future
对象异步获得,举例如下:
/* std::promise 使用教程 */
#include<iostream>
#include<future>
#include<thread>
using namespace std;
void calculate_square(promise<int> pj , int num) {
this_thread::sleep_for(chrono::seconds(2));
pj.set_value(num * num * num);
}
int main() {
promise<int> promise;
future<int> ff = promise.get_future();
int num = 6;
thread t{calculate_square , std::move(promise) , num};
cout << "promise 中存储的结果是: " << ff.get() << endl;
t.join();
}
处理可以设置线程运行时期的产生的值之外,还可以把子线程运行过程产生的异常传递到主线程中抛出并且处理,一般是通过set_exception
函数来设置异常,同时并且可以通过cur_exception
来设置当前产生的异常,处理代码如下:
void throw_function(std::promise<int> prom) {
try {
throw std::runtime_error("一个异常");
}
catch (...) {
prom.set_exception(std::current_exception());
}
}
int main() {
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t(throw_function, std::move(prom));
try {
std::cout << "等待线程执行,抛出异常并设置\n";
fut.get();
}
catch (std::exception& e) {
std::cerr << "来自线程的异常: " << e.what() << '\n';
}
t.join();
}
注意如果共享状态的promise
已经存储值得或者异常,再次调用set_value
(set_exception
)会抛出异常std::future_error
异常,并且将错误码设置为 promise_already_satisfied
, 这是由于 promise
中只可以存储值或者异常中的其中一种,无法共存,也就是set_value
和set_exception
二选一,不可以共存,用于不做异常的实例如下:
/* std::promise同时存储异常和值的时候出现错误的情况 */
#include <exception>
#include<iostream>
#include<future>
#include <stdexcept>
#include<thread>
using namespace std;
void throw_function(promise<int> prom) {
// prom.set_value(100);
try {
throw std::runtime_error("a exception");
} catch(exception& e) {
try {
prom.set_exception(current_exception());
} catch(exception& e) {
cerr << "来自 set_exception的异常: " << e.what() << endl;
}
}
}
int main() {
promise<int> prom;
future<int> fut = prom.get_future();
thread t{throw_function , std::move(prom)};
cout << "value = " << fut.get() << endl;
t.join();
}
future 的状态变化
需要注意的是,future
是一次性的,所以需要注意移动,并且调用get
函数之后,future
对象就会失去共享状态
- 移动语义:这一点很好理解并且常见,因为移动操作标志着所有权的转移,意味着
future
不再拥有共享状态(如之前所提到)。get
和wait
函数要求future
对象拥有共享状态,否则会抛出异常。 - 共享状态失效:调用
get
成员函数时,future
对象必须拥有共享状态,但调用完成后,它就会失去共享状态,不能再次调用get
。这是我们在本节需要特别讨论的内容。 其中future
中get
函数的源代码如下:
// std::future<void>
void get() {
// block until ready then return or throw the stored exception
future _Local{_STD move(*this)};
_Local._Get_value();
}
// std::future<T>
_Ty get() {
// block until ready then return the stored result or throw the stored exception
future _Local{_STD move(*this)};
return _STD move(_Local._Get_value());
}
// std::future<T&>
_Ty& get() {
// block until ready then return the stored result or throw the stored exception
future _Local{_STD move(*this)};
return *_Local._Get_value();
}
可以发现,底层通过RAII
的方式,把std::future
对象转移到局部变量身上,并且在函数退出的时候局部变量就会析构,如果需要多次get
调用,可以使用 shared_future
多个线程等待的 std::shared_future
std::shared_future
和std::future
的区别类似于std::unique_ptr
和std::shared_ptr
之间的区别,后者可以复制并且多个对象可以指代同一个共享状态- 多个线程中对同一个
std::shared_future
对象进行操作时(如果没有进行同步保护)存在条件竞争。而从多个线程访问同一共享状态,若每个线程都是通过其自身的shared_future
对象副本**进行访问,则是安全的。 - 可以通过
future
对象的share
方法获取到std::shared_future
对象,使用实例如下:
/* std::shared_future 的使用 */
#include<iostream>
#include<future>
using namespace std;
string fetch_data() {
return "hello world!";
}
int main() {
future<string> fut = async(fetch_data);
shared_future<string> fts = fut.share();
thread t1 {
[fts]() {
cout << "thread1 wait ..." << endl;
fts.wait();
cout << "thread1 get data: " << fts.get() << endl;
}
};
thread t2 {
[fts]() {
cout << "thread2 wait ... " << endl;
fts.wait();
cout << "thread2 get data: " << fts.get() << endl;
}
};
t1.join();
t2.join();
}
同样,还可以通过shared_future
的构造函数构造这一个对象,其中的一个构造函数的参数就是std::future<T>&&
可以使用函数的返回值作为参数构建 std::shared_future
限时等待
超时等待包含两种方式,一种是 "时间段" , 另外一种是 "时间点" , 也就是: this_thread::sleep_for
和this_thread::sleep_until
的区别,前者需要等待一段时间,后者需要等待到某一个时间点,处理持续时间的函数使用_for
结尾,处理绝对时间的函数使用_until
作为后缀,比如条件变量std::condition_variable
的等待函数有两个超时的版本wait_for
和wait_until
时间库(chrono)
时钟(clock)
C++
标准库中,时钟被时为时间信号的来源,C++
中定义了时钟类型,每一种时钟类型都提供了四种不同的信息:
- 当前时间: 通过静态成员
now
获取,比如std::chrono::system_clock::now()
会返回系统的当前时间,特定的时间点可以通过time_point
指定,system_clock::now()
的返回类型就是time_point
- 时间类型
- 时钟节拍: 被指定为
1/x
秒,有时间周期决定,其中std::ratio<m,n>
表示m
秒内n
次,其中std::chrono::duration
表示时间间隔,定义如下:
template<class Rep , class Period = std::ratio<1>>
class duration; // 相当于固定 分母
注意标准库定义了很多时间类型,比如std::chrono::minutes
就是分钟类型,定义如下:
using minutes = duration<int,ratio<60>>;
- 稳定时钟: 稳定时钟的主要优点在于,它可以提供相对于起始时间的稳定的递增时间,因此适用于需要保持时间顺序和不受系统时间变化影响的应用场景。相比之下,像
std::chrono::system_clock
这样的系统时钟可能会受到系统时间调整或变化的影响,因此在某些情况下可能不适合对时间间隔进行精确测量。 但是在C++
中,可以把任何时间类型转换为time_t
类型,转换和输出的方法如下:
/* std::chrono 表示时钟的方式 */
#include <ctime>
#include<iostream>
#include<chrono>
#include<iomanip>
using namespace std;
int main() {
auto now = std::chrono::system_clock::now();
time_t now_time = chrono::system_clock::to_time_t(now);
cout << "Current time: " << put_time(localtime(&now_time) , "%H:%M:%S`\n");
}
时间段
注意使用类模板: std::chrono::duration
,它用于对时间段进行处理,第一个参数表示类型,第二个参数表示节拍,需要传递一个std::ratio
类型,并且标准库在std::chrono
命名空间中为时间段提供了一系列的类型,都是通过std::chrono::duration
定义的别名:
using nanoseconds = duration<long long, nano>;
using microseconds = duration<long long, micro>;
using milliseconds = duration<long long, milli>;
using seconds = duration<long long>;
using minutes = duration<int, ratio<60>>;
using hours = duration<int, ratio<3600>>;
// CXX20
using days = duration<int, ratio_multiply<ratio<24>, hours::period>>;
using weeks = duration<int, ratio_multiply<ratio<7>, days::period>>;
using years = duration<int, ratio_multiply<ratio<146097, 400>, days::period>>;
using months = duration<int, ratio_divide<years::period, ratio<12>>>;
并且在C++14
中引入了时间字面量,存在于std::chrono_literals
命名空间中,如下:
using namespace std::chrono_literals;
auto one_nanosecond = 1ns;
auto one_microsecond = 1us;
auto one_millisecond = 1ms;
auto one_second = 1s;
auto one_minute = 1min;
auto one_hour = 1h;
进行时钟类型的转换可以使用std::chrono::duration_cast<>
来完成,使用例子如下:
std::chrono::milliseconds ms{ 3999 };
std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms);
std::cout << s.count() << '\n';
注意到其实std::chrono::seconds
类型和 std::chrono::duration<long long>
类型是一样的,所以可以写成如下的形式:
std::chrono::duration<double> s = std::chrono::duration_cast<std::chrono::duration<double>>(ms);
同时也可以进行隐式类型转换,并且时间库可以进行运算,也就是首先根据count
取得对象,之后进行运算再次构造对象即可,std::future
对象可以通过wait_for
方法来等待结果,使用方式如下:
std::future<int> future = std::async([] {return 6; });
if (future.wait_for(35ms) == std::future_status::ready)
std::cout << future.get() << '\n';
三种状态如下:
deferred
: 共享状态持有的函数正在延迟运行,结果将仅在明确请求时计算ready
: 共享状态就绪timeout
: 共享状态在经过指定的等待时间内仍没有就绪 举例如下:
auto future = std::async(std::launch::deferred, []{});
if (future.wait_for(35ms) == std::future_status::deferred)
std::cout << "future_status::deferred " << "正在延迟执行\n";
future.wait(); // 在 wait() 或 get() 调用时执行,不创建线程
时间点
时间点可以使用 std::chrono::time_point<>
来表示,第一个模板参数用来指定使用的时钟,第二个模板参数用来表示时间单位(std::chrono::duration<>
) , 可以使用 now()
成员函数获取到当前时间,返回类型为 std::chrono::time_point
,类型定义如下:
template<
class Clock,
class Duration = typename Clock::duration
> class time_point;
默认根据第一个类型得到,比如:
std::chrono::time_point<std::chrono::system_clock>
// 上面等价于
std::chrono::time_point<std:::chrono::system_clock , std::chrono::system_clock::duration>
// 并且第二个参数的实际类型为
std::chrono::duration<long long,std::ratio<1,10000000>> // 100 ns
并且时间点支持加减操作和比较操作,使用方式如下,时间点的计算方式如下:
/* std::chrono 中的 time_point 对象 */
#include <iomanip>
#include<iostream>
#include<chrono>
using namespace std;
int main() {
auto now = chrono::system_clock::now();
auto now_plus_one = now + chrono::hours(24);
time_t now_t = chrono::system_clock::to_time_t(now);
time_t now_after_day = chrono::system_clock::to_time_t(now_plus_one);
cout << "现在的时间为: " << put_time(localtime(&now_t), "%Y-%m-%d %H:%M:%S") << endl;
cout << "之后的时间为: " << put_time(localtime(&now_after_day), "%Y-%m-%d %H:%M:%S") << endl;
return 0;
}
同时也可以利用时间点来统计代码执行的时间:
#include<iostream>
#include<thread>
#include<chrono>
using namespace std;
int main() {
auto start = chrono::steady_clock::now();
/*int j = 0;*/
/*for(int i = 0 ; i < 10 ; i ++) {*/
/* j ++; */
/*}*/
this_thread::sleep_for(chrono::seconds(1));
auto end = chrono::steady_clock::now();
auto result = chrono::duration_cast<chrono::milliseconds>(end - start);
cout << "花费时间为: " << result.count() << " ns" << endl;
}
同时超时时间可以配合条件变量或者std::futrue
使用,注意超时的返回值:
#include <chrono>
#include<condition_variable>
#include<iostream>
#include<thread>
using namespace std;
condition_variable cv;
mutex m;
bool done = false;
bool wait_loop() {
const auto timeout = chrono::steady_clock::now() + 500ms;
unique_lock<mutex> lk{m};
while(!done) {
if(cv.wait_until(lk , timeout) == cv_status::timeout) {
cout << "thread: " << this_thread::get_id() << " continue ..." << endl;
return false;
}
}
cout << "thread: " << this_thread::get_id() << "successfully exit ... " << endl;
return true;
}
void notify_task() {
this_thread::sleep_for(chrono::seconds(4));
unique_lock<mutex> lk{m};
done = true;
cv.notify_one();
}
int main() {
thread t1{[](){
while(!wait_loop());
}};
thread t2{notify_task};
t1.join();
t2.join();
}
C++20信号量
信号量是一个非常轻量简单的同步设施,它维护一个计数,这个计数不能小于 0
。信号量提供两种基本操作:释放(增加计数)和等待(减少计数)。如果当前信号量的计数值为 0
,那么执行“等待”操作的线程将会一直阻塞,直到计数大于 0
,也就是其它线程执行了“释放”操作。
C++20
中提供了两个信号量类型,std::counting_semaphore
和std::binary_semaphore
,定义与<semaphore>
头文件中,并且binary_semaphore
只是std::counting_semaphore
的一个别名,也就是一个二值信号量:
using binary_semaphore = counting_semaphore<1>;
使用例子如下:
// 全局二元信号量对象
// 设置对象初始计数为 0
std::binary_semaphore smph_signal_main_to_thread{ 0 };
std::binary_semaphore smph_signal_thread_to_main{ 0 };
void thread_proc() {
smph_signal_main_to_thread.acquire();
std::cout << "[线程] 获得信号" << std::endl;
std::this_thread::sleep_for(3s);
std::cout << "[线程] 发送信号\n";
smph_signal_thread_to_main.release();
}
int main() {
std::jthread thr_worker{ thread_proc };
std::cout << "[主] 发送信号\n";
smph_signal_main_to_thread.release();
smph_signal_thread_to_main.acquire();
std::cout << "[主] 获得信号\n";
}
注意在利用信号量实现等待-消费者模型的时候一定需要互斥锁一定是需要保护共享数据,所以加上互斥锁的位置就是共享数据区操作的位置
C++20 闩和屏障
std::latch
闩(latch
)和屏障(barrier
)是线程协调机制,允许任何数量的线程阻塞直到期待数量的线程到达,闩不可以重复使用,但是屏障可以
std::latch
: 单词使用的线程屏障std::barrier
: 可以复用的线程屏障
/* 闩 std::latch 的基本使用方法 */
#include <iostream>
#include <thread>
using namespace std;
std::latch work_start{3};
void work() {
cout << "等待闩阻塞取消..." << endl;
work_start.wait();
cout << "闩的阻塞取消..." << endl;
}
int main() {
std::jthread t{work};
this_thread::sleep_for(3s);
cout << "休眠结束..." << endl;
work_start.count_down();
work_start.count_down(2);
}
闩的作用,比如可以划分线程的工作区间,并且可以同步线程到同一个位置,例子如下:
std::latch latch{ 10 };
void f(int id) {
//todo.. 脑补任务
std::this_thread::sleep_for(1s);
std::cout << std::format("线程 {} 执行完任务,开始等待其它线程执行到此处\n", id);
latch.arrive_and_wait();
std::cout << std::format("线程 {} 彻底退出函数\n", id);
}
int main() {
std::vector<std::jthread> threads;
for (int i = 0; i < 10; ++i) {
threads.emplace_back(f,i);
}
}
其中arrive_and_wait
, 等价于count_down() 和 wait()
std::barrier
std::barrier
和 std::latch
最大的不同是,前者可以在阶段完成之后将计数重置为构造时传递的值,而后者只能减少计数,使用例子如下:
std::barrier barrier{ 10,
[n = 1]()mutable noexcept {std::cout << "\t第" << n++ << "轮结束\n"; }
};
void f(int start, int end){
for (int i = start; i <= end; ++i) {
std::osyncstream{ std::cout } << i << ' ';
barrier.arrive_and_wait(); // 减少计数并等待 解除阻塞时就重置计数并调用函数对象
std::this_thread::sleep_for(300ms);
}
}
int main(){
std::vector<std::jthread> threads;
for (int i = 0; i < 10; ++i) {
threads.emplace_back(f, i * 10 + 1, (i + 1) * 10);
}
}
其中std::osyncstream
也是C++20
引入的,为了确保输出流在多线程环境中同步,避免除了数据竞争,而且将不使用任何方式穿插或者截断
同时标准库中提供了另外一个函数arrive_and_drop
的作用,把当前计数和重置之后的技术减少 1
注意每一次循环结束调用的lambda
函数(也就是第二个参数),必须声明为noexcept
也就是不可以抛出异常
总结
- 条件变量:
std::condition_variable
, 用于等待某一个特定条件满足,可以用于实现消费者-生产者模型 std::future
对象可以看作一个在未来可能发生但是直到预期结果的时间,和下面这些对象同时使用:std::async
: 类似一个异步任务,可以把这一个对象和std::future
绑定,并且支持延时调用和直接创建新线程调用两种方式,注意使用std::launch::deferred
不会创建新线程,但是使用std::launch::async
会创建新的线程执行任务std::paskaged_task
: 相当于对于某一个任务的包装,并且可以利用get_future
获取到关联的future
对象,本质上也是一个可调用对象,调用结束之后把结果放在future
中,注意本身不会创建线程,除非放在线程中执行,注意这一个对象是move-only
的std::promise
的作用是存储线程运行过程中产生的变量和异常,可以使用get_future
方法获取到关联的std::future
对象,并且通过get
方法获取到存储的值std::shared_future
可以多次使用,但是一定需要注意操作共享的shared_future
的时候的线程安全问题(如果是复制的不用考虑) ,可以使用std::future
的share()
方法获取到这一个对象
- 时间库
std::chrono
明确时间点time_point
和时间间隔:std::chrono::duration<m , ratio<n , k>>
的含义即可 C++20
中的信号量和闩屏障了解即可