1 互斥量mutex
mutex又称互斥量,用于提供对共享变量的互斥访问。C++11中mutex相关的类都在<mutex>头文件中。
- std::mutex,最基本的 Mutex 类。
- std::recursive_mutex,递归 Mutex 类。
- std::timed_mutex,定时 Mutex 类。
- std::recursive_timed_mutex,定时递归 Mutex 类。
1.1 lock, try_lock, unlock
lock
锁住互斥量。调用lock时有三种情况:
- 如果互斥量没有被锁住,则调用线程将该mutex锁住,直到调用线程调用unlock释放。
- 如果mutex已被其它线程lock,则调用线程将被阻塞,直到其它线程unlock该mutex。
try_lock
尝试锁住mutex,调用该函数同样也有三种情况:
- 如果互斥量没有被锁住,则调用线程将该mutex锁住(返回true),直到调用线程调用unlock释放。
- 如果mutex已被其它线程lock,则调用线程将失败,并返回false。
unlock
- 解锁mutex,释放对mutex的所有权。
#include <iostream>
#include <thread>
#include <mutex>
void inc(std::mutex &mutex, int loop, int &counter) {
for (int i = 0; i < loop; i++) {
mutex.lock();
++counter;
mutex.unlock();
}
}
int main() {
std::thread threads[5];
std::mutex mutex;
int counter = 0;
for (std::thread &thr: threads) {
thr = std::thread(inc, std::ref(mutex), 1000, std::ref(counter));
}
for (std::thread &thr: threads) {
thr.join();
}
// 输出:5000,如果inc中调用的是try_lock,则此处可能会<5000
std::cout << counter << std::endl;
return 0;
}
1.2 lock_guard
C++11在提供了常规mutex的基础上,还提供了一些易用性的类,lock_guard利用了C++ RAII的特性,在构造函数中上锁,析构函数中解锁。lock_guard是一个模板类,其原型为:
template <class Mutex> class lock_guard
- 模板参数Mutex代表互斥量,可以是std::mutex, std::timed_mutex, std::recursive_mutex, std::recursive_timed_mutex中的任何一个,也可以是std::unique_lock,这些都提供了lock和unlock的能力。
- lock_guard仅用于上锁、解锁,不对mutex承担供任何生周期的管理,因此在使用的时候,请确保lock_guard管理的mutex一直有效。
- 同其它mutex类一样,locak_guard不允许拷贝,即拷贝构造和赋值函数被声明为delete。
lock_guard的设计保证了即使程序在锁定期间发生了异常,也会安全的释放锁,不会发生死锁。
#include <iostream>
#include <mutex>
std::mutex mutex;
void safe_thread() {
try {
std::lock_guard<std::mutex> _guard(mutex);
throw std::logic_error("logic error");
} catch (std::exception &ex) {
std::cerr << "[caught] " << ex.what() << std::endl;
}
}
int main() {
safe_thread();
// 此处仍能上锁
mutex.lock();
std::cout << "OK, still locked" << std::endl;
mutex.unlock();
return 0;
}
1.3 unique_lock
lock_guard提供了简单上锁、解锁操作,但当我们需要更灵活的操作时便无能为力了。这些就需要unique_lock上场了。unique_lock拥有对Mutex的所有权,一但初始化了unique_lock,其就接管了该mutex, 在unique_lock结束生命周期前(析构前),其它地方就不要再直接使用该mutex了。
template <class Mutex>
class unique_lock
{
public:
typedef Mutex mutex_type;
// 空unique_lock对象
unique_lock() noexcept;
// 管理m, 并调用m.lock进行上锁,如果m已被其它线程锁定,由该构造了函数会阻塞。
explicit unique_lock(mutex_type& m);
// 仅管理m,构造函数中不对m上锁。可以在初始化后调用lock, try_lock, try_lock_xxx系列进行上锁。
unique_lock(mutex_type& m, defer_lock_t) noexcept;
// 管理m, 并调用m.try_lock,上锁不成功不会阻塞当前线程
unique_lock(mutex_type& m, try_to_lock_t);
// 管理m, 该函数假设m已经被当前线程锁定,不再尝试上锁。
unique_lock(mutex_type& m, adopt_lock_t);
// 管理m, 并调用m.try_lock_unitil函数进行加锁
template <class Clock, class Duration>
unique_lock(mutex_type& m, const chrono::time_point<Clock, Duration>& abs_time);
// 管理m,并调用m.try_lock_for函数进行加锁
template <class Rep, class Period>
unique_lock(mutex_type& m, const chrono::duration<Rep, Period>& rel_time);
// 析构,如果此前成功加锁(或通过adopt_lock_t进行构造),并且对mutex拥有所有权,则解锁mutex
~unique_lock();
// 禁止拷贝操作
unique_lock(unique_lock const&) = delete;
unique_lock& operator=(unique_lock const&) = delete;
// 支持move语义
unique_lock(unique_lock&& u) noexcept;
unique_lock& operator=(unique_lock&& u) noexcept;
void lock();
bool try_lock();
template <class Rep, class Period>
bool try_lock_for(const chrono::duration<Rep, Period>& rel_time);
template <class Clock, class Duration>
bool try_lock_until(const chrono::time_point<Clock, Duration>& abs_time);
// 显示式解锁,该函数调用后,除非再次调用lock系列函数进行上锁,否则析构中不再进行解锁
void unlock();
// 与另一个unique_lock交换所有权
void swap(unique_lock& u) noexcept;
// 返回当前管理的mutex对象的指针,并释放所有权
mutex_type* release() noexcept;
// 当前实例是否获得了锁
bool owns_lock() const noexcept;
// 同owns_lock
explicit operator bool () const noexcept;
// 返回mutex指针,便于开发人员进行更灵活的操作
// 注意:此时mutex的所有权仍归unique_lock所有,因此不要对mutex进行加锁、解锁操作
mutex_type* mutex() const noexcept;
};
2 条件变量(std::condition_variable)
前面介绍了互斥量(std::mutex),互斥量是多线程间同时访问某一共享变量时,保证变量可被安全访问的手段。在多线程编程中,还有另一种十分常见的行为:线程同步。线程同步是指线程间需要按照预定的先后次序顺序进行的行为。C++11对这种行为也提供了有力的支持,这就是条件变量。条件变量位于头文件condition_variable下。
条件变量提供了两类操作:wait和notify。这两类操作构成了多线程同步的基础。
2.1 wait
wait是线程的等待动作,直到其它线程将其唤醒后,才会继续往下执行。下面通过伪代码来说明其用法:
std::mutex mutex;
std::condition_variable cv;
// 条件变量与临界区有关,用来获取和释放一个锁,因此通常会和mutex联用。
std::unique_lock lock(mutex);
// 此处会释放lock,然后在cv上等待,直到其它线程通过cv.notify_xxx来唤醒当前线程,cv被唤醒后会再次对lock进行上锁,然后wait函数才会返回。
// wait返回后可以安全的使用mutex保护的临界区内的数据。此时mutex仍为上锁状态
cv.wait(lock)
除wait外, 条件变量还提供了wait_for和wait_until,这两个名称是不是看着有点儿眼熟,std::mutex也提供了_for和_until操作。在C++11多线程编程中,需要等待一段时间的操作,一般情况下都会有xxx_for和xxx_until版本。前者用于等待指定时长,后者用于等待到指定的时间。
2.2 notify
notify就简单多了:唤醒wait在该条件变量上的线程。notify有两个版本:notify_one和notify_all。
- notify_one 唤醒等待的一个线程,注意只唤醒一个。
- notify_all 唤醒所有等待的线程。
std::mutex mutex;
std::condition_variable cv;
std::unique_lock lock(mutex);
// 所有等待在cv变量上的线程都会被唤醒。但直到lock释放了mutex,被唤醒的线程才会从wait返回。
cv.notify_all(lock)
2.3 BufferQueue的实现
#include "anbox/common/small_vector.h"
#include <condition_variable>
#include <memory>
#include <mutex>
namespace anbox {
namespace graphics {
using Buffer = anbox::common::SmallFixedVector<char, 512>;
class BufferQueue {
public:
BufferQueue(size_t capacity);
bool can_push_locked() const { return !closed_ && (count_ < capacity_); }
bool can_pop_locked() const { return count_ > 0U; }
bool is_closed_locked() const { return closed_; }
int wait_until_not_empty_locked(std::unique_lock<std::mutex> &lock);
int try_push_locked(Buffer &&buffer);
int push_locked(Buffer &&buffer, std::unique_lock<std::mutex> &lock);
int try_pop_locked(Buffer *buffer);
int pop_locked(Buffer *buffer, std::unique_lock<std::mutex> &lock);
void close_locked();
private:
size_t capacity_ = 0; //队列大小
size_t pos_ = 0; //pop的位置
size_t count_ = 0; //剩余数据块的数量
bool closed_ = false;
std::unique_ptr<Buffer[]> buffers_;
std::condition_variable can_push_;
std::condition_variable can_pop_;
};
} // namespace graphics
} // namespace anbox
#endif
#include "anbox/graphics/buffer_queue.h"
namespace anbox {
namespace graphics {
BufferQueue::BufferQueue(size_t capacity)
: capacity_(capacity), buffers_(new Buffer[capacity]) {}
int BufferQueue::try_push_locked(Buffer &&buffer) {
if (closed_)
return -EIO;
if (count_ >= capacity_)
return -EAGAIN;
size_t pos = pos_ + count_;
if (pos >= capacity_)
pos -= capacity_;
buffers_[pos] = std::move(buffer);
if (count_++ == 0)
can_pop_.notify_one();
return 0;
}
int BufferQueue::push_locked(
Buffer &&buffer, std::unique_lock<std::mutex> &lock) {
while (count_ == capacity_) { //队列已满
if (closed_)
return -EIO;
can_push_.wait(lock); //lock 锁住
}
return try_push_locked(std::move(buffer));
}
int BufferQueue::wait_until_not_empty_locked(std::unique_lock<std::mutex> &lock) {
while (count_ == 0) {
if (closed_)
// Closed queue is empty.
return -EIO;
can_pop_.wait(lock);
}
return 0;
}
int BufferQueue::try_pop_locked(Buffer *buffer) {
if (count_ == 0)
return closed_ ? -EIO : -EAGAIN;
*buffer = std::move(buffers_[pos_]);
size_t pos = pos_ + 1;
if (pos >= capacity_)
pos -= capacity_;
pos_ = pos;
if (count_-- == capacity_)
can_push_.notify_one();
return 0;
}
int BufferQueue::pop_locked(
Buffer *buffer, std::unique_lock<std::mutex> &lock) {
while (count_ == 0) {
if (closed_)
// Closed queue is empty.
return -EIO;
can_pop_.wait(lock);
}
return try_pop_locked(buffer);
}
// Close the queue, it is no longer possible to push new items
// to it (i.e. push() will always return Result::Error), or to
// read from an empty queue (i.e. pop() will always return
// Result::Error once the queue becomes empty).
void BufferQueue::close_locked() {
closed_ = true;
// Wake any potential waiters.
if (count_ == capacity_) {
can_push_.notify_all();
}
if (count_ == 0) {
can_pop_.notify_all();
}
}
} // namespace graphics
} // namespace anbox
3 原子类型操作
std::atomic<T>
对基本类型(如int、double等)使用std::atomic<T>模板包装成原子类型。
std::atomic<int> sum = 0;
//int sum = 0;
//std::mutex mutex;
void fun()
{
for (int i = 0; i<100000; ++i){
//mutex.lock();
sum ++; //使用原子类型对sum的操作是原子的
//mutex.unlock();
}
}
int main()
{
std::cout << "Before joining,sun = " << sum << std::endl;
std::thread t1(fun);
std::thread t2(fun);
t1.join();
t2.join();
std::cout << "After joining,sun = " << sum << std::endl;
system("pause");
return 0;
}

网友评论