美文网首页
c++11 多线程

c++11 多线程

作者: AI科技智库 | 来源:发表于2020-02-24 11:15 被阅读0次

1 互斥量mutex

mutex又称互斥量,用于提供对共享变量的互斥访问。C++11中mutex相关的类都在<mutex>头文件中。

  • std::mutex,最基本的 Mutex 类。
  • std::recursive_mutex,递归 Mutex 类。
  • std::timed_mutex,定时 Mutex 类。
  • std::recursive_timed_mutex,定时递归 Mutex 类。

1.1 lock, try_lock, unlock

lock

锁住互斥量。调用lock时有三种情况:

  • 如果互斥量没有被锁住,则调用线程将该mutex锁住,直到调用线程调用unlock释放。
  • 如果mutex已被其它线程lock,则调用线程将被阻塞,直到其它线程unlock该mutex。
try_lock

尝试锁住mutex,调用该函数同样也有三种情况:

  • 如果互斥量没有被锁住,则调用线程将该mutex锁住(返回true),直到调用线程调用unlock释放。
  • 如果mutex已被其它线程lock,则调用线程将失败,并返回false。
unlock
  • 解锁mutex,释放对mutex的所有权。
#include <iostream>
#include <thread>
#include <mutex>

void inc(std::mutex &mutex, int loop, int &counter) {
    for (int i = 0; i < loop; i++) {
        mutex.lock();
        ++counter;
        mutex.unlock();
    }
}
int main() {
    std::thread threads[5];
    std::mutex mutex;
    int counter = 0;

    for (std::thread &thr: threads) {
        thr = std::thread(inc, std::ref(mutex), 1000, std::ref(counter));
    }
    for (std::thread &thr: threads) {
        thr.join();
    }

    // 输出:5000,如果inc中调用的是try_lock,则此处可能会<5000
    std::cout << counter << std::endl;

    return 0;
}

1.2 lock_guard

C++11在提供了常规mutex的基础上,还提供了一些易用性的类,lock_guard利用了C++ RAII的特性,在构造函数中上锁,析构函数中解锁。lock_guard是一个模板类,其原型为:

template <class Mutex> class lock_guard
  • 模板参数Mutex代表互斥量,可以是std::mutex, std::timed_mutex, std::recursive_mutex, std::recursive_timed_mutex中的任何一个,也可以是std::unique_lock,这些都提供了lock和unlock的能力。
  • lock_guard仅用于上锁、解锁,不对mutex承担供任何生周期的管理,因此在使用的时候,请确保lock_guard管理的mutex一直有效。
  • 同其它mutex类一样,locak_guard不允许拷贝,即拷贝构造和赋值函数被声明为delete。

lock_guard的设计保证了即使程序在锁定期间发生了异常,也会安全的释放锁,不会发生死锁。

#include <iostream>
#include <mutex>

std::mutex mutex;

void safe_thread() {
    try {
        std::lock_guard<std::mutex> _guard(mutex);
        throw std::logic_error("logic error");
    } catch (std::exception &ex) {
        std::cerr << "[caught] " << ex.what() << std::endl;
    }
}
int main() {
    safe_thread();
    // 此处仍能上锁
    mutex.lock();
    std::cout << "OK, still locked" << std::endl;
    mutex.unlock();

    return 0;
}

1.3 unique_lock

lock_guard提供了简单上锁、解锁操作,但当我们需要更灵活的操作时便无能为力了。这些就需要unique_lock上场了。unique_lock拥有对Mutex的所有权,一但初始化了unique_lock,其就接管了该mutex, 在unique_lock结束生命周期前(析构前),其它地方就不要再直接使用该mutex了。

template <class Mutex>
class unique_lock
{
public:
    typedef Mutex mutex_type;
    // 空unique_lock对象
    unique_lock() noexcept;
    // 管理m, 并调用m.lock进行上锁,如果m已被其它线程锁定,由该构造了函数会阻塞。
    explicit unique_lock(mutex_type& m);
    // 仅管理m,构造函数中不对m上锁。可以在初始化后调用lock, try_lock, try_lock_xxx系列进行上锁。
    unique_lock(mutex_type& m, defer_lock_t) noexcept;
    // 管理m, 并调用m.try_lock,上锁不成功不会阻塞当前线程
    unique_lock(mutex_type& m, try_to_lock_t);
    // 管理m, 该函数假设m已经被当前线程锁定,不再尝试上锁。
    unique_lock(mutex_type& m, adopt_lock_t);
    // 管理m, 并调用m.try_lock_unitil函数进行加锁
    template <class Clock, class Duration>
        unique_lock(mutex_type& m, const chrono::time_point<Clock, Duration>& abs_time);
    // 管理m,并调用m.try_lock_for函数进行加锁
    template <class Rep, class Period>
        unique_lock(mutex_type& m, const chrono::duration<Rep, Period>& rel_time);
    // 析构,如果此前成功加锁(或通过adopt_lock_t进行构造),并且对mutex拥有所有权,则解锁mutex
    ~unique_lock();

    // 禁止拷贝操作
    unique_lock(unique_lock const&) = delete;
    unique_lock& operator=(unique_lock const&) = delete;

    // 支持move语义
    unique_lock(unique_lock&& u) noexcept;
    unique_lock& operator=(unique_lock&& u) noexcept;

    void lock();
    bool try_lock();

    template <class Rep, class Period>
        bool try_lock_for(const chrono::duration<Rep, Period>& rel_time);
    template <class Clock, class Duration>
        bool try_lock_until(const chrono::time_point<Clock, Duration>& abs_time);

    // 显示式解锁,该函数调用后,除非再次调用lock系列函数进行上锁,否则析构中不再进行解锁
    void unlock();

    // 与另一个unique_lock交换所有权
    void swap(unique_lock& u) noexcept;
    // 返回当前管理的mutex对象的指针,并释放所有权
    mutex_type* release() noexcept;

    // 当前实例是否获得了锁
    bool owns_lock() const noexcept;
    // 同owns_lock
    explicit operator bool () const noexcept;
    // 返回mutex指针,便于开发人员进行更灵活的操作
    // 注意:此时mutex的所有权仍归unique_lock所有,因此不要对mutex进行加锁、解锁操作
    mutex_type* mutex() const noexcept;
};

2 条件变量(std::condition_variable)

前面介绍了互斥量(std::mutex),互斥量是多线程间同时访问某一共享变量时,保证变量可被安全访问的手段。在多线程编程中,还有另一种十分常见的行为:线程同步。线程同步是指线程间需要按照预定的先后次序顺序进行的行为。C++11对这种行为也提供了有力的支持,这就是条件变量。条件变量位于头文件condition_variable下。
条件变量提供了两类操作:wait和notify。这两类操作构成了多线程同步的基础。

2.1 wait

wait是线程的等待动作,直到其它线程将其唤醒后,才会继续往下执行。下面通过伪代码来说明其用法:

std::mutex mutex;
std::condition_variable cv;

// 条件变量与临界区有关,用来获取和释放一个锁,因此通常会和mutex联用。
std::unique_lock lock(mutex);
// 此处会释放lock,然后在cv上等待,直到其它线程通过cv.notify_xxx来唤醒当前线程,cv被唤醒后会再次对lock进行上锁,然后wait函数才会返回。
// wait返回后可以安全的使用mutex保护的临界区内的数据。此时mutex仍为上锁状态
cv.wait(lock)

除wait外, 条件变量还提供了wait_for和wait_until,这两个名称是不是看着有点儿眼熟,std::mutex也提供了_for和_until操作。在C++11多线程编程中,需要等待一段时间的操作,一般情况下都会有xxx_for和xxx_until版本。前者用于等待指定时长,后者用于等待到指定的时间。

2.2 notify

notify就简单多了:唤醒wait在该条件变量上的线程。notify有两个版本:notify_one和notify_all。

  • notify_one 唤醒等待的一个线程,注意只唤醒一个。
  • notify_all 唤醒所有等待的线程。
std::mutex mutex;
std::condition_variable cv;

std::unique_lock lock(mutex);
// 所有等待在cv变量上的线程都会被唤醒。但直到lock释放了mutex,被唤醒的线程才会从wait返回。
cv.notify_all(lock)

2.3 BufferQueue的实现

#include "anbox/common/small_vector.h"

#include <condition_variable>
#include <memory>
#include <mutex>

namespace anbox {
namespace graphics {
using Buffer = anbox::common::SmallFixedVector<char, 512>;

class BufferQueue {
 public:
  BufferQueue(size_t capacity);

  bool can_push_locked() const { return !closed_ && (count_ < capacity_); }
  bool can_pop_locked() const { return count_ > 0U; }
  bool is_closed_locked() const { return closed_; }

  int wait_until_not_empty_locked(std::unique_lock<std::mutex> &lock);

  int try_push_locked(Buffer &&buffer);
  int push_locked(Buffer &&buffer, std::unique_lock<std::mutex> &lock);
  int try_pop_locked(Buffer *buffer);
  int pop_locked(Buffer *buffer, std::unique_lock<std::mutex> &lock);
  void close_locked();

 private:
  size_t capacity_ = 0;    //队列大小
  size_t pos_ = 0;             //pop的位置
  size_t count_ = 0;        //剩余数据块的数量
  bool closed_ = false;
  std::unique_ptr<Buffer[]> buffers_;

  std::condition_variable can_push_;
  std::condition_variable can_pop_;
};

}  // namespace graphics
}  // namespace anbox

#endif
#include "anbox/graphics/buffer_queue.h"

namespace anbox {
namespace graphics {
BufferQueue::BufferQueue(size_t capacity)
    : capacity_(capacity), buffers_(new Buffer[capacity]) {}

int BufferQueue::try_push_locked(Buffer &&buffer) {
  if (closed_)
    return -EIO;

  if (count_ >= capacity_)
    return -EAGAIN;

  size_t pos = pos_ + count_;
  if (pos >= capacity_)
    pos -= capacity_;

  buffers_[pos] = std::move(buffer);
  if (count_++ == 0)
    can_pop_.notify_one();

  return 0;
}

int BufferQueue::push_locked(
    Buffer &&buffer, std::unique_lock<std::mutex> &lock) {
  while (count_ == capacity_) {     //队列已满
    if (closed_)
      return -EIO;
    can_push_.wait(lock);   //lock 锁住
  }
  return try_push_locked(std::move(buffer));
}

int BufferQueue::wait_until_not_empty_locked(std::unique_lock<std::mutex> &lock) {
  while (count_ == 0) {
    if (closed_)
      // Closed queue is empty.
      return -EIO;
    can_pop_.wait(lock);
  }

  return 0;
}

int BufferQueue::try_pop_locked(Buffer *buffer) {
  if (count_ == 0)
    return closed_ ? -EIO : -EAGAIN;

  *buffer = std::move(buffers_[pos_]);
  size_t pos = pos_ + 1;
  if (pos >= capacity_)
    pos -= capacity_;

  pos_ = pos;
  if (count_-- == capacity_)
    can_push_.notify_one();

  return 0;
}

int BufferQueue::pop_locked(
    Buffer *buffer, std::unique_lock<std::mutex> &lock) {
  while (count_ == 0) {
    if (closed_)
      // Closed queue is empty.
      return -EIO;
    can_pop_.wait(lock);
  }
  return try_pop_locked(buffer);
}

// Close the queue, it is no longer possible to push new items
// to it (i.e. push() will always return Result::Error), or to
// read from an empty queue (i.e. pop() will always return
// Result::Error once the queue becomes empty).
void BufferQueue::close_locked() {
  closed_ = true;

  // Wake any potential waiters.
  if (count_ == capacity_) {
    can_push_.notify_all();
  }
  if (count_ == 0) {
    can_pop_.notify_all();
  }
}
}  // namespace graphics
}  // namespace anbox

3 原子类型操作

std::atomic<T>
对基本类型(如int、double等)使用std::atomic<T>模板包装成原子类型。

std::atomic<int> sum = 0; 
//int sum = 0;
//std::mutex mutex;
 
void fun()
{
    for (int i = 0; i<100000; ++i){
         //mutex.lock();
         sum ++;                   //使用原子类型对sum的操作是原子的
         //mutex.unlock();
    }
}
 
int main()
{
    std::cout << "Before joining,sun = " << sum << std::endl;
    std::thread t1(fun);
    std::thread t2(fun);
    t1.join();
    t2.join();
    std::cout << "After joining,sun = " << sum << std::endl;
    system("pause");
    return 0;
}
原子类型.png

参考:

https://www.jianshu.com/p/a31d4fb5594f

相关文章

网友评论

      本文标题:c++11 多线程

      本文链接:https://www.haomeiwen.com/subject/umrnfhtx.html