美文网首页
brpc之mutex源码分析

brpc之mutex源码分析

作者: fooboo | 来源:发表于2019-08-04 23:36 被阅读0次

这篇分析得不完整,有些地方还没有彻底弄明白。在分析brpc之前,基本所有使用过或者分析过的mutex都只是对接口的简单封装,使用类似scope_mutex这种raii手法:构造时加锁,析构时解锁。

早期分析过一篇brpc中关于定时器和futex实现的文章《brpc中定时器的实现》,其中对于成员变量internal::FastPthreadMutex _mutex只是简单跳过,未作详细分析,因为都知道mutex用于同步临界区,使得同一时刻只有一个工作线程在里面操作,不至于出现数据的不一致。对于linux版本的mutex实现,可以参考《linux 2.6 互斥锁的实现-源码分析》;网上大部分文章都是转来转去、抄来抄去,有兴趣的可以再对着源码分析下mutex。这里主要分析brpc中的mutex实现。

从声明开始,这里有两种实现,先看第二种,第一种后面再简单介绍下,futex相关的实现参考上面的链接:

 71 namespace internal {
 72 #ifdef BTHREAD_USE_FAST_PTHREAD_MUTEX
 73 class FastPthreadMutex {
 74 public:
 75     FastPthreadMutex() : _futex(0) {}
 76     ~FastPthreadMutex() {}
 77     void lock();
 78     void unlock();
 79     bool try_lock();
 80 private:
 81     DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
 82     int lock_contended();
 83     unsigned _futex;
 84 };  
 85 #else
 86 typedef butil::Mutex FastPthreadMutex;
 87 #endif
 88 }

 26 #if !defined(BUTIL_CXX11_ENABLED)
 27 #define BUTIL_DELETE_FUNCTION(decl) decl
 28 #else
 29 #define BUTIL_DELETE_FUNCTION(decl) decl = delete
 30 #endif

 42 #define DISALLOW_COPY_AND_ASSIGN(TypeName)                      \
 43     BUTIL_DELETE_FUNCTION(TypeName(const TypeName&));            \
 44     BUTIL_DELETE_FUNCTION(void operator=(const TypeName&))

mutex是禁止拷贝和赋值的。由于编译器会在某些情况下(它认为需要的时候)合成拷贝构造、拷贝赋值等几个特殊成员函数,c++11及以上版本使用delete关键字将其删除即可;否则只能声明为private且不定义,误用时在编译或链接阶段报错以示错误。

 45 // NOTE: Not aligned to cacheline as the container of Mutex is practically aligned
 46 class Mutex {
 47 public:
 48     typedef bthread_mutex_t* native_handler_type;
 49     Mutex() {
 50         int ec = bthread_mutex_init(&_mutex, NULL);
 51         if (ec != 0) {
 52             throw std::system_error(std::error_code(ec, std::system_category()), "Mutex construc    tor failed");
 53         }
 54     }
 55     ~Mutex() { CHECK_EQ(0, bthread_mutex_destroy(&_mutex)); }
 56     native_handler_type native_handler() { return &_mutex; }
 57     void lock() {
 58         int ec = bthread_mutex_lock(&_mutex);
 59         if (ec != 0) {
 60             throw std::system_error(std::error_code(ec, std::system_category()), "Mutex lock fai    led");
 61         }
 62     }
 63     void unlock() { bthread_mutex_unlock(&_mutex); }
 64     bool try_lock() { return !bthread_mutex_trylock(&_mutex); }
 66 private:
 67     DISALLOW_COPY_AND_ASSIGN(Mutex);
 68     bthread_mutex_t _mutex;
 69 };

155 typedef struct {
156     int64_t duration_ns;
157     size_t sampling_range;
158 } bthread_contention_site_t;
159 
160 typedef struct {
161     unsigned* butex;
162     bthread_contention_site_t csite;
163 } bthread_mutex_t;

以上便是mutex类声明,对象池ObjectPool实现后期分析一下,初始化如下bthread_mutex_init

701 int bthread_mutex_init(bthread_mutex_t* __restrict m,
702                        const bthread_mutexattr_t* __restrict) {
703     bthread::make_contention_site_invalid(&m->csite);
704     m->butex = bthread::butex_create_checked<unsigned>();
705     if (!m->butex) {
706         return ENOMEM;
707     }   
708     *m->butex = 0; //初始化为0
709     return 0;
710 } 

363 BUTIL_FORCE_INLINE void
364 make_contention_site_invalid(bthread_contention_site_t* cs) {
365     cs->sampling_range = 0;
366 }

 39 // Check width of user type before casting.
 40 template <typename T> T* butex_create_checked() {
 41     BAIDU_CASSERT(sizeof(T) == sizeof(int), sizeof_T_must_equal_int);
 42     return static_cast<T*>(butex_create());
 43 }

243 void* butex_create() {
244     Butex* b = butil::get_object<Butex>();
245     if (b) {
246         return &b->value;
247     } 
248     return NULL;
249 }

116 struct BAIDU_CACHELINE_ALIGNMENT Butex {
117     Butex() {}
118     ~Butex() {}
119     
120     butil::atomic<int> value;
121     ButexWaiterList waiters;//typedef butil::LinkedList<ButexWaiter> ButexWaiterList;
122     internal::FastPthreadMutex waiter_lock;
123 };

上面的Butex类声明其实和linux内核中的struct mutex差不多,都由一个原子值、一个等待队列和一把保护等待队列的锁组成:

struct mutex {
   atomic_t  count; //引用计数器,1: 锁可以利用,小于等于0:该锁已被获取,需要等待
   spinlock_t  wait_lock;//自旋锁类型,保证多cpu下,对等待队列访问是安全的。
   struct list_head wait_list;//等待队列,如果该锁被获取,任务将挂在此队列上,等待调度。
};

其中ButexWaiterList为双向链表,简单起见这里使用ButexPthreadWaiter

 86 struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
 87     // tids of pthreads are 0
 88     bthread_t tid;
 89 
 90     // Erasing node from middle of LinkedList is thread-unsafe, we need
 91     // to hold its container's lock.
 92     butil::atomic<Butex*> container;
 93 };

106 // pthread_task or main_task allocates this structure on stack and queue it
107 // in Butex::waiters. 
108 struct ButexPthreadWaiter : public ButexWaiter {
109     butil::atomic<int> sig;
110 };

 84 template <typename T>
 85 class LinkNode {
154  private:
155   LinkNode<T>* previous_;
156   LinkNode<T>* next_;
158   DISALLOW_COPY_AND_ASSIGN(LinkNode);
159 };
729 int bthread_mutex_lock(bthread_mutex_t* m) { 
730     bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
731     if (!split->locked.exchange(1, butil::memory_order_acquire)) {
732         return 0;
733     }   
734     // Don't sample when contention profiler is off.
735     if (!bthread::g_cp) {
736         return bthread::mutex_lock_contended(m);
737     }
738     //sample code...
753 }

615 // Implement bthread_mutex_t related functions
616 struct MutexInternal {
617     butil::static_atomic<unsigned char> locked;
618     butil::static_atomic<unsigned char> contended;
619     unsigned short padding;
620 };

以上是加锁实现:先把unsigned* butex强转为struct MutexInternal*类型。由于初始值为0,故第一次加锁是成功的,把locked设置为1并返回0。代码738到752行是统计相关的,这里直接分析mutex_lock_contended,即当锁不可用时做哪些工作。

622 const MutexInternal MUTEX_CONTENDED_RAW = {{1},{1},0};
623 const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};
624 // Define as macros rather than constants which can't be put in read-only
625 // section and affected by initialization-order fiasco.
626 #define BTHREAD_MUTEX_CONTENDED (*(const unsigned*)&bthread::MUTEX_CONTENDED_RAW)
627 #define BTHREAD_MUTEX_LOCKED (*(const unsigned*)&bthread::MUTEX_LOCKED_RAW)

632 inline int mutex_lock_contended(bthread_mutex_t* m) {
633     butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
634     while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
635         if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
636             errno != EWOULDBLOCK && errno != EINTR/*note*/) {
637             // a mutex lock should ignore interrruptions in general since
638             // user code is unlikely to check the return value.
639             return errno;
640         } 
641     }
642     return 0;
643 }

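以上while循环会一直尝试加锁:exchange把整个值置为BTHREAD_MUTEX_CONTENDED并返回旧值,若旧值的locked位不为0,说明锁仍被占用(加锁失败),则调用butex_wait等待;如果butex_wait失败且errno既不是EWOULDBLOCK也不是EINTR,则提前返回errno,否则继续循环重试;直到exchange返回的旧值locked位为0,才表示加锁成功并返回0。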

611 int butex_wait(void* arg, int expected_value, const timespec* abstime) {
612     Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);//根据三个参数求出结构体首地址
613     if (b->value.load(butil::memory_order_relaxed) != expected_value) {
614         errno = EWOULDBLOCK;
615         // Sometimes we may take actions immediately after unmatched butex,
616         // this fence makes sure that we see changes before changing butex.
617         butil::atomic_thread_fence(butil::memory_order_acquire);
618         return -1;
619     }
620     TaskGroup* g = tls_task_group;
621     if (NULL == g || g->is_current_pthread_task()) {
622         return butex_wait_from_pthread(g, b, expected_value, abstime);
623     }
624     //ButexBthreadWaiter bbw code...;
693     return 0;
694 }

butex_wait在真正等待之前会再次load对应的值:如果与期望值BTHREAD_MUTEX_CONTENDED不相等,说明值已经变化,于是设置errno为EWOULDBLOCK并返回-1,由调用方重新尝试加锁;否则,当调用者是pthread(没有TaskGroup或当前是pthread task)时进入butex_wait_from_pthread。不过在真正wait_pthread前,还会再判断一次。

542 static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
543                                    const timespec* abstime) {
544     // sys futex needs relative timeout.
545     // Compute diff between abstime and now.
546     timespec* ptimeout = NULL;
547     timespec timeout;
548     if (abstime != NULL) {
549         //more code...
557     }       
558             
559     TaskMeta* task = NULL;
560     ButexPthreadWaiter pw;
561     pw.tid = 0;
562     pw.sig.store(PTHREAD_NOT_SIGNALLED, butil::memory_order_relaxed);
563     int rc = 0;
564 
565     if (g) {
566         //
568     }
569     b->waiter_lock.lock();
570     if (b->value.load(butil::memory_order_relaxed) != expected_value) {
571         b->waiter_lock.unlock();
572         errno = EWOULDBLOCK;
573         rc = -1;
574     } else if (task != NULL && task->interrupted) {
575         //
580     } else {
581         b->waiters.Append(&pw);
582         pw.container.store(b, butil::memory_order_relaxed);
583         b->waiter_lock.unlock();
584 
589         rc = wait_pthread(pw, ptimeout);
593     }
594     if (task) {
595         //more code...
607     }
608     return rc;
609 }

这里在把自己加到waiters之前,会在持有waiter_lock的情况下再判断一次b->value.load(butil::memory_order_relaxed) != expected_value;如果值没有变化,接着:

581         b->waiters.Append(&pw);
582         pw.container.store(b, butil::memory_order_relaxed);
583         b->waiter_lock.unlock();

589         rc = wait_pthread(pw, ptimeout);
140 int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
141     while (true) {
142         const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
143         if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) {
144             // If `sig' is changed, wakeup_pthread() must be called and `pw'
145             // is already removed from the butex.
146             // Acquire fence makes this thread sees changes before wakeup.
147             return rc;
148         }   
149         if (rc != 0 && errno == ETIMEDOUT) {
150             //more code...
165         }
166     }
167 }

下面是bthread_mutex_unlock实现:

787 int bthread_mutex_unlock(bthread_mutex_t* m) {
788     butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
789     bthread_contention_site_t saved_csite = {0, 0};
790     if (bthread::is_contention_site_valid(m->csite)) {
791         saved_csite = m->csite;
792         bthread::make_contention_site_invalid(&m->csite);
793     }
794     const unsigned prev = whole->exchange(0, butil::memory_order_release);
795     // CAUTION: the mutex may be destroyed, check comments before butex_create
796     if (prev == BTHREAD_MUTEX_LOCKED) {
797         return 0;//就自己不需要唤醒
798     }
799     // Wakeup one waiter 
800     if (!bthread::is_contention_site_valid(saved_csite)) {
801         bthread::butex_wake(whole);
802         return 0;
803     }
804     //more code...
809     return 0;
810 }

这里解锁:通过exchange(0)取回旧值prev。如果prev等于BTHREAD_MUTEX_LOCKED,说明之前只有自己加锁,即没有其他线程在mutex_lock_contended的while循环中把值改成BTHREAD_MUTEX_CONTENDED,不需要进行唤醒;否则旧值是BTHREAD_MUTEX_CONTENDED,表示至少有两个线程(bthread)竞争过这把锁,需要调用butex_wake尝试唤醒一个等待者。

265 int butex_wake(void* arg) { 
266     Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
267     ButexWaiter* front = NULL;
268     {   
269         BAIDU_SCOPED_LOCK(b->waiter_lock);
270         if (b->waiters.empty()) {
271             return 0;
272         }
273         front = b->waiters.head()->value();
274         front->RemoveFromList();                                              
275         front->container.store(NULL, butil::memory_order_relaxed);
276     } 
277     if (front->tid == 0) {
278         wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
279         return 1;
280     }
281     //ButexBthreadWaiter code...
290 }

这里从waiters取出第一个等待者进行唤醒,由于ButexPthreadWaiter的tid为0,所以会走wakeup_pthread分支:

128 static void wakeup_pthread(ButexPthreadWaiter* pw) {                          
129     // release fence makes wait_pthread see changes before wakeup.
130     pw->sig.store(PTHREAD_SIGNALLED, butil::memory_order_release);
131     // At this point, wait_pthread() possibly has woken up and destroyed `pw'.
132     // In which case, futex_wake_private() should return EFAULT.
133     // If crash happens in future, `pw' can be made TLS and never destroyed
134     // to solve the issue.
135     futex_wake_private(&pw->sig, 1);
136 }

另外,上面的分析为了简化起见并没有考虑超时参数,这里还是列一下带超时参数时、wait超过该时长后超时的处理:

140 int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
141     while (true) {
142         const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
143         //more code...
149         if (rc != 0 && errno == ETIMEDOUT) {
156             if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
159                 if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) {
160                     ptimeout = NULL; // already timedout, ptimeout is expired.
161                     continue;
162                 }
163             }
164             return rc;
165         }
166     }
167 }

462 inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
463     // `bw' is guaranteed to be valid inside this function because waiter
464     // will wait until this function being cancelled or finished.
465     // NOTE: This function must be no-op when bw->container is NULL.
466     bool erased = false;
467     Butex* b;
468     int saved_errno = errno;
469     while ((b = bw->container.load(butil::memory_order_acquire))) {
470         // b can be NULL when the waiter is scheduled but queued.
471         BAIDU_SCOPED_LOCK(b->waiter_lock);
472         if (b == bw->container.load(butil::memory_order_relaxed)) {
473             bw->RemoveFromList();
474             bw->container.store(NULL, butil::memory_order_relaxed);
475             if (bw->tid) {
476                 //ButexBthreadWaiter code...
477             }
478             erased = true;
479             break;
480         }
481     }
482     if (erased && wakeup) {
483         if (bw->tid) {
484             //ButexBthreadWaiter code...
486         } else {
487             ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
488             wakeup_pthread(pw);
489         }
490     }
491     errno = saved_errno;
492     return erased;
493 }

如文章开始所述,通过编译选项来决定使用哪一种mutex;如果定义了宏BTHREAD_USE_FAST_PTHREAD_MUTEX,则使用第一种实现(直接基于futex的FastPthreadMutex),此时可能更快一些。

662 int FastPthreadMutex::lock_contended() {
663     butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
664     while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
665         if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0
666             && errno != EWOULDBLOCK) {
667             return errno;
668         }
669     }
670     return 0;
671 }
672 
673 void FastPthreadMutex::lock() {
674     bthread::MutexInternal* split = (bthread::MutexInternal*)&_futex;
675     if (split->locked.exchange(1, butil::memory_order_acquire)) {
676         (void)lock_contended();
677     }
678 }

685 void FastPthreadMutex::unlock() {
686     butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
687     const unsigned prev = whole->exchange(0, butil::memory_order_release);
688     // CAUTION: the mutex may be destroyed, check comments before butex_create
689     if (prev != BTHREAD_MUTEX_LOCKED) {
690         futex_wake_private(whole, 1);//尝试唤醒
691     }   
692 }

_futex共有三种可能的值:0表示未加锁;1(即{1,0,0},BTHREAD_MUTEX_LOCKED)表示无竞争时的正常加锁;{1,1,0}(BTHREAD_MUTEX_CONTENDED)表示发生过竞争,即先加锁失败、后来在lock_contended中加锁成功;用以区分后面unlock时是否要进行唤醒。

总结,感觉有些复杂,有一些疑问,会在后面分析其他部分代码时再回过头来想想为什么;还没体会到这种实现的原因,还是要多结合项目经历,同时也要对比着linux's mutex的实现源码分析。

要学(掌握)的基础知识太多太多,brpc值得学习的东西也太多太多。

参考资料:
atomic_instructions.md
brpc的研发经历

相关文章

  • brpc之rpc流程分析(上)

    之前关于brpc的几篇分析:brpc之mutex源码分析brpc之ResourcePool源码分析brpc之bth...

  • brpc之mutex源码分析

    这篇分析的不完整,没有彻底弄明白。在此brpc之前,基本所有使用过或者分析过的mutex只是纯粹的封装下接口,使用...

  • brpc之ResourcePool源码分析

    该类在多个模块中使用到,是一种资源预分配的获取及回收,value是uint64_t一般作为index。从memor...

  • Golang之Mutex

    引用 sync.mutex 源代码分析 Golang 中 Mutex 的源码实现 建议阅读 Golang中Mute...

  • brpc之负载均衡源码分析

    在分析bthread相关实现时,从使用中发现有LoadBalancerWithNaming类的使用,这块也比较独立...

  • Linux 之mutex 源码分析

    mutex相关的函数并不是linux kernel实现的,而是glibc实现的,源码位于nptl目录下。 http...

  • brpc之Most Recently Used

    好久没有分析brpc相关的源码了,这次分析下mru相关的实现,在分析此之前,先分析下lru,以leveldb中的l...

  • brpc之消息处理流程

    中间大约有段时间没有继续分析brpc源码,因为有些其他事情,这里分析下当client发送消息后,server收到请...

  • golang源码阅读-sync.Mutex

    【golang源码分析】sync.Mutex 概述 Mutex只有两个阻塞方法Lock和Unlock,有两种工作模...

  • brpc之bthread_id源码分析

    在之前分析rpc时,发现在代码中使用到此结构,当时看到时候感觉挺复杂的,从名字上看还以为仅仅是个uint64_t的...

网友评论

      本文标题:brpc之mutex源码分析

      本文链接:https://www.haomeiwen.com/subject/gheedctx.html