一站式网站建设行业,青锐成长计划网站开发过程,第三方营销平台有哪些,广州网站建设好做吗并发编程有两种基本模型#xff0c;一种是message passing#xff0c;另一种是shared memory。在分布式系统中#xff0c;运行在多台机器上的多个进程的并行编程只有一种实用模型#xff1a;message passing。在单机上#xff0c;我们也可以照搬message passing作为多个进…并发编程有两种基本模型一种是message passing另一种是shared memory。在分布式系统中运行在多台机器上的多个进程的并行编程只有一种实用模型message passing。在单机上我们也可以照搬message passing作为多个进程的并发模型。这样整个分布式系统的架构的一致性很强扩容scale out起来也较容易。在多线程编程中message passing更容易保证程序的正确性有的语音只提供这一种模型。
线程同步的四项原则按重要性排列 1.首要原则是尽量最低限度地共享对象减少需要同步的场合。一个对象能不暴露给别的线程就不要暴露如果要暴露优先考虑immutable对象实在不行才暴露可修改的对象并用同步措施来充分保护它。
2.其次是使用高级的并发编程构件如TaskQueue、Producer-Consumer Queue、CountDownLatch计数器闭锁它是一种计数器初始时设置一个计数值并在多个线程完成各自任务后递减这个计数值当计数值达到零时允许某个等待的线程继续执行等。
3.最后不得已必须使用底层同步原语primitives时只用非递归的互斥器和条件变量慎用读写锁它们适用于读操作频繁而写操作较少的情况使用不当可能会导致性能问题需要仔细评估使用场景不要用信号量。
4.除了使用atomic整数外不自己编写lock-free代码编写无锁lock-free代码是一项极具挑战性的任务需要深入了解硬件和多线程编程的细节也不要用内核级同步原语如memory barrier。不凭空猜测哪种做法性能会更好如spin lock vs. mutex。
互斥器mutex可能是使用得最多的同步原语它保护了临界区任何时刻最多只能有一个线程在此mutex划出的临界区内活动。单独使用mutex时主要为了保护共享数据。作者的原则是 1.用RAII手法封装mutex的创建、销毁、加锁、解锁四个操作。用RAII封装这几个操作是通行的做法几乎是C的标准实践。Java里的synchronized语句和C#的using语句也有类似效果即保证锁额生效期间等于一个作用域scope不会因异常而忘记解锁。
2.只用非递归的mutex即不可重入的mutex。
3.不手工调用lock()和unlock()函数一切交给栈上的Guard对象的构造和析构函数负责。Guard对象的生命期正好等于临界区这样我们保证始终在同一个函数同一个scope里对某个mutex加锁和解锁。避免在foo()里加锁然后跑到bar()里解锁也避免在不同的语句分支中分别加锁、解锁。这种做法称为Scoped Locking。
4.在每次构造Guard对象时思考一路上调用栈上已经持有的锁防止因加锁顺序不同而导致死锁。由于Guard对象时栈上对象看函数调用栈就能分析用锁情况非常便利。
次要原则有 1.不使用跨进程的mutex进程间通信只用TCP sockets。
2.加锁、解锁在同一个线程线程a不能去unlock线程b已经锁住的mutexRAII自动保证。
3.别忘了解锁RAII自动保证。
4.不重复解锁RAII自动保证。
5.必要的时候考虑用PTHREAD_MUTEX_ERRORCHECK来排错。这种类型的互斥锁在锁定和解锁操作时会执行额外的错误检查如果违反了互斥锁的使用规则例如线程尝试解锁未锁定的互斥锁或者尝试嵌套锁定同一个互斥锁互斥锁会返回错误代码而不是阻塞或成功。它的性能更慢通常用于调试和测试阶段。
mutex恐怕是最简单的同步原语按照上面的规则几乎不可能用错。
mutex分为递归recursive和非递归non-recursive两种这是POSIX叫法另外的名字是可重入reentrant与非可重入。这两种mutex作为线程间inter-thread的同步工具时没有区别唯一的区别在于同一个线程可以重复对recursive mutex加锁但不能重复对non-recursive mutex加锁。
首选非递归mutex不是为了性能而是为了体现设计意图。non-recursive和recursive的性能差别其实不大因为少用一个计数器前者略快一点点而已。在同一个线程里多次对non-recursive mutex加锁会立刻导致死锁这是它的有点能帮我们思考代码对锁的需求且及早在编码阶段发现问题。
毫无疑问recursive mutex使用起来方便一些因为不用考虑一个线程自己把自己锁死作者猜测这也是Java和Windows默认提供revursive mutex的原因。Java自带的intrinsic lock是可重入的它的util.concurrent库里提供ReentrantLockWindows的CRITICAL_SECTION也是可重入的似乎它们都不提供轻量级的non-recursive mutex。
正因为recursive mutex方便它可能会隐藏代码里的一些问题。典型情况是你以为拿到一个锁就能修改对象了没想到外层代码已经拿到了锁正在修改或读取同一个对象下面是一个具体的例子
MutexLock mutex;
std::vectorFoo foos;void post(const Foo f)
{MutexLockGuard lock(mutex);foos.push_back(f);
}void traverse()
{MutexLockGuard lock(mutex);for (std::vectorFoo::const_iterator it foos.begin(); it ! foos.end(); it){it-doit();}
}post()加锁然后修改foos对象traverse()加锁然后遍历foos向量。这些都正确。
将来有一天Foo::doit()间接调用了post()可能出现以下结果 1.mutex是非递归的于是死锁了。
2.mutex是递归的由于push_back()可能不总是导致vector迭代器失效程序偶尔会crash。
这是就能体现出non-recursive的优越性把程序的逻辑错误暴露出来。死锁比较容易debug把各个线程的调用栈打出来通过gdb的命令thread apply all bt其中thread apply all告诉调试器要对所有线程执行操作bt告诉调试器执行bt操作只要每个函数不是特别长很容易看出是怎么死锁的。或者用PTHREAD_MUTEX_ERRORCHECK一下就能找到错误前提是自定义MutexLock类中带debug选项。程序反正要死不如死得有意义一点留个全尸让验尸post-mortem更容易些。
如果确实需要在遍历时修改vector有两种做法一是把修改推后记住循环中试图添加或删除哪些元素等循环结束了再依记录修改foos二是用copy-on-write下面会介绍。
如果一个函数既可能在已加锁的情况下调用又可能在未加锁的情况下调用那就拆成两个函数 1.跟原来的函数同名函数加锁转而调用第2个函数。
2.给函数名加上后缀WithLockHold不加锁把原来的函数体搬过来。
考虑一个情景成员函数A需要递归调用在A中会访问共享数据因此需要加锁此时可以加递归锁但最好不用递归锁而是将A函数写两个版本一个是加锁版本一个是不加锁版本递归时调用不加锁版本即可。如果成员函数A和B都访问同一共享数据两者都需要加锁而A也要调用B时类似地也调用B的不加锁版本。
就像这样
void post(const Foo f)
{MutexLockGuard lock(mutex);// 不用担心开销编译器会自动内联的postWithLockHold(f);
}// 引入这个函数是为了体现代码作者的意图此处的push_back通常可以手动内联即不用函数postWithLockHold
void postWithLockHold(const Foo f)
{foos.push_back(f);
}这有可能出现两个问题 1.误用了加锁版本死锁了。
2.误用了不加锁版本数据损坏了。
对于1后面讲到的方法能比较容易地排错。对于2如果Pthread提供isLockedByThisThread()就好办了可以写成
void postWithLockHold(const Foo f)
{// muduo::MutexLock提供了这个成员函数assert(mutex.isLockedByThisThread();
}另外WithLockHold这个显眼的后缀也让程序中的误用容易暴露出来。
C没有annotation注解不能像Java那样给method或field标上GuardedBy注解。在Java中GuardedBy可以标明字段或方法被哪个锁所保护从而可以用辅助工具而非编译器生成警告或建议这些工具可以检查你的代码以确保锁的正确使用。
对于需要使用recursive mutex的情况可以借助wrapper封装可能指的是将一段代码封装一个加锁版本这段代码本身是一个函数其中没有加锁这样就有加锁版本和非加锁版本了改用non-recursive mutex代码只会更清晰。
Pthreads的权威专家《Programming with POSIX Threads》的作者David Butenhof也排斥使用recursive mutex他说 Linux的Pthreads mutex采用futex实现futex是fast user-space mutex的缩写是一种优化了的用户空间互斥锁机制不必每次加锁、解锁都陷入系统调用效率不错。Windows的CRITICAL_SECTION也类似不过它可以嵌入一小段spin lock在多CPU系统上如果不能立刻拿到锁它会先spin一小段时间如果还不能拿到锁才挂起当前线程。
如果坚持只使用Scoped Locking那么在出现死锁时很容易定位考虑下面这个线程自己与自己死锁的例子
class Request
{
public:// __attribute__((noinline))void process(){muduo::MutexLockGuard lock(mutex_);// ...// 原本没有print函数某人为了调试程序不小心添加了print();}// __attribute__((noinline))void print() const {muduo::MutexLockGuard lock(mutex_);// ...}private:mutable muduo::MutexLock mutex_;
};int main()
{Request req;req.process();
}上例中process()里加上print()后会出现死锁要调试定位这种死锁很容易只要把函数调用栈打印出来结合源码一看立刻就会发现第六帧Request::process()和第五帧Request::print()先后对同一个mutex上锁引发了死锁。必要的时候可以在函数前加上__attribute__GCC的扩展来防止函数inline展开因为被内联的函数不会打印在调用栈中。 上图中命令的含义是分析self_deadlock程序的核心转储文件core的内容此时不需要对self_deadlock程序加任何参数。
要修复以上错误也很容易按前面的办法从Request::print()抽取出Request::printWithLockHold()并让Request::print()和Request::process()都调用它即可。
再看一个更真实的两个线程死锁的例子。有一个Inventory清单class记录当前的Request对象
class Inventory
{
public:void add(Request *req){muduo::MutexLockGuard lock(mutex_);requests_.insert(req);}// __attribute__((noinline))void remove(Request *req) {moduo::MutexLockGuard lock(mutex_);requests_.erase(req);}void printAll() const;private:mutable muduo::MutexLock mutex_;std::setRequest * requests_;
};// 简单起见这里使用了全局对象
Inventory g_inventory;可见以上Inventory class的add()和remove()成员函数都是线程安全的它使用了mutex来保护共享数据requests_。
Request class与Inventory class的交互逻辑很简单在处理process请求的时候往g_inventory中添加自己在析构的时候从g_inventory中移除自己目前来看整个程序还是线程安全的。
class Request
{
public:// __attribute__((noinline))void process(){muduo::MutexLockGuard lock(mutex_);g_inventory.add(this);// ...}~Request() __attribute__((noinline)) {muduo::MutexLockGuard lock(mutex_);// 为了复现死锁这里用了延时sleep(1);g_inventory.remove(this);}void print() const __attribute__((noinline)){muduo::MutexLockGuard lock(mutex_);// ...}private:mutable muduo::MutexLock mutex_;
};Inventory class还有一个功能是打印全部已知的Request对象。Inventory::printfAll()里的逻辑单独看是没问题的但它可能引发死锁
void Inventory::printAll() const
{muduo::MutexLockGuard lock(mutex_);// 为了容易复现死锁这里用了延时sleep(1);for (std::setRequest *::const_iterator it requests_.begin(); it ! requests_.end(); it){(*it)-print();}printf(Inventory::printAll() unlocked\n);
}下面这个程序运行起来会发生死锁
void threadFunc()
{Request *req new Request;req-process();delete req;
}int main()
{muduo::Thread thread(threadFunc);thread.start();// 为了让另一个线程等待在Request::~Request()的sleep上usleep(500 * 1000);g_inventory.printAll();thread.join();
}通过gdb查看两个线程的函数调用栈我们发现两个线程都等在mutex上__lll_lock_wait估计是发生了死锁因为一个程序中的线程一般只会等在condition variable或epoll_wait上 注意到main()线程是先调用Inventory::printAll#6再调用Request::print#5而threadFunc()线程是先调用Request::~Request#6再调用Inventory::remove#5。这两个调用序列对两个mutex的加锁顺序正好相反于是造成了经典的死锁。如下图 Inventory class的mutex的临界区由灰底表示Request class的mutex的临界区由斜纹表示一旦main()线程中的printAll()在另一个线程的~Request()和remove()之间开始执行死锁就不可避免。
如果printAll()晚于remove()执行此时threadFunc()已经获取到了两个锁就不会再出现死锁了。
让~Request()在PrintAll()和print()之间执行也会出现死锁。
这里也出现了第一章中的race condition即一个线程正在析构对象另一个线程却在调用它的成员函数。
为了解决死锁要么把print()移除pringAll()的临界区这可以用下面介绍的方法要么把remove()移出~Request()的临界区如交换Request的析构函数中加锁和调用remove的顺序。但这没有解决对象析构的race condition。
Inventory::printAll-Request::print不会与Request::process-Inventory::add发生死锁因为如果Inventory::printAll先获得了Inventory的锁则Request::process在将自身放入Inventory前获取不到Inventory的锁此时Inventory::printAll不会打印出当前正在竞争的Request如果Request::process先获得了Inventory的锁则Inventory::printAll需要等待将Request加入requests_后才能继续打印而Inventory::printAll获得Inventory的锁时Request已加入Inventory完毕正在或已经解锁了两个锁。
死锁会让程序行为失常而其他锁使用不当会影响性能。编写高性能多线程程序还需了解false sharing和CPU cache效应 1.false sharing这是一种性能问题它发生在多线程程序中的不同线程同时访问共享内存中不同变量或数据结构的不同部分而这些变量或数据结构被映射到相同的缓存行cache line。因为现代CPU通常以缓存行为单位来加载和存储数据当不同线程访问同一缓存行时会导致缓存一致性开销可能会使程序的性能大幅下降。False sharing会引发不必要的缓存失效因为一个线程的写操作会导致其他线程的缓存无效尽管它们实际上并没有修改相同的数据。
解决False Sharing的方法通常包括使用线程本地存储Thread-Local Storage或者对共享数据进行对齐以确保不同线程访问的数据不会映射到相同的缓存行。
缓存行cache line用于缓存数据以提高内存访问性能。缓存行是缓存的最小单位通常由一组相邻的字节或字组成。当处理器从主内存中加载数据时它会将一整个缓存行的数据复制到高速缓存中。这个缓存行中的数据可以是指令或数据具体取决于缓存的类型指令缓存或数据缓存。
2.CPU Cache效应指利用CPU的缓存来提高程序性能的现象。CPU缓存是一种用于存储频繁访问的数据的高速存储器以减少对主内存的访问延迟。当多线程程序访问相同的数据时合理地利用缓存可以显著提高性能。但同时不恰当的缓存使用也可能导致性能下降。
CPU Cache效应包括缓存命中数据在缓存中和缓存未命中数据不在缓存中。为了获得最佳性能我们需要考虑数据的局部性以减少缓存未命中并避免False Sharing。
互斥器是加锁原语用来排他性地访问共享数据它不是等待原语。在使用mutex时我们一般都会期望加锁不要阻塞总是能立刻拿到锁然后尽快访问数据用完后尽快解锁这样才能不影响并发性和性能。
如果需要等待某个条件成立我们应使用条件变量condition variable。条件变量顾名思义是一个或多个线程等待某个布尔表达式为真即等待别的线程唤醒它。条件变量的学名叫管程monitor。Java Object内置的wait()、notify()、notifyAll是条件变量。
条件变量只有一种正确使用方式对于wait端 1.必须与mutex一起使用该布尔表达式的读写需要受到此mutex的保护。
2.在mutex已上锁的时候才能调用wait()。
3.把判断布尔条件和wait()放到while循环中。
写成代码是
muduo::MutexLock mutex;
muduo::Condition cond(mutex);
std::dequeint queue;int dequeue()
{MutexLockGuard lock(mutex);// 必须用循环必须在判断后再waitwhile (queue.empty()){// 这一步会原子地unlock mutex并进入等待不会与enqueue死锁cond.wait();// wait()执行完毕会自动重新加锁}assert(!queue.empty());int top queue.front();queue.pop_front();return top;
}上面的代码必须用while循环来等待条件变量而不能用if语句原因是spurious wakeup。Spurious wakeup是多线程编程中一个可能会发生的现象特别是在使用条件变量等线程同步机制时。当一个线程在等待某个条件满足时它可能在没有明确信号的情况下被唤醒。这种唤醒是“虚假的”或“伪唤醒”因为它没有基于条件的实际变化来触发。
Spurious wakeup通常是由操作系统或编程语言库的内部实现引起的。虽然spurious wakeup在理论上是可能的但在实践中发生的频率相对较低通常情况下不会引起严重问题。
对于signal/boradcast端 1.不一定要在mutex已上锁的情况下调用signal理论上。
2.在signal前一般要修改布尔表达式。
3.修改布尔表达式通常要用mutex保护至少用作full memory barrier。内存屏障是一种同步机制用于确保线程之间的内存访问顺序和可见性。Full memory barrier提供了以下保证 1保证顺序性Full memory barrier确保在内存屏障之前的所有读写操作都在内存屏障之前执行而在内存屏障之后的所有读写操作都在内存屏障之后执行。这防止了指令重排序确保了操作的顺序性。指令重排序是一种CPU优化技术用于提高指令执行效率在处理器内部有多个执行单元它们可以同时执行不依赖于彼此结果的指令以充分利用硬件资源。指令重排序允许处理器在不违反程序语义的前提下重新排列指令的执行顺序以加速执行。多线程程序的正确性通常依赖于特定指令的执行顺序如果处理器进行了不合理的重排序可能导致数据竞争、不一致的状态和程序错误。Full memory barrier阻止处理器在内存屏障之前的写入操作与内存屏障之后的读取操作之间发生重排序这确保了写入操作在读取操作之前完成防止了数据不一致性。Full memory barrier也禁止了读取操作之间的重排序以及读取操作和写入操作之间的重排序这有助于确保操作之间的顺序性。
2保证可见性Full memory barrier确保在内存屏障之前的写操作对于其他线程可见。这意味着在内存屏障之前的写入将被刷新到主内存并在内存屏障之后的读操作将从主内存中获取最新的值。这有助于确保线程之间对共享数据的正确访问。
4.注意区分signal和broadcastbroadcast通常用于表明状态变化signal通常用于表示资源可用。
写成代码是
void enqueue(int x)
{MutexLockGuard lock(mutex);queue.push_back(x);// notify可以移出临界区之外cond.notify();
}上面的dequeue()/enqueue()实际上实现了一个简单的容量无限的unboundedBlockingQueue阻塞队列。
如果以上代码中enqueue()每次添加元素都会调用Condition::notify()如果改成只在queue.size()从0变1的时候才调用Condition::notify()会造成只有一个线程工作的情形例如当前dequeue中有10个元素且由于只有从0变1时才调用过一次Condition::notify()因此当前只会有一个线程被唤醒取出一个元素然后处理在处理这个元素的过程中如果又有5个元素加入dequeue则加入过程不会唤醒其他线程从而只有一个线程处理完所有元素。
条件变量是非常底层的同步原语很少直接使用一般都是用它来实现高层的同步措施如BlockingQueueT或CountDownLatch。
倒计时CountDownLatch是一种常用且易用的同步手段它的用途有 1.主线程发起多个子线程等这些子线程各自都完成一定的任务后主线程才继续执行。通常用于主线程等待多个子线程完成初始化。
2.主线程发起多个子线程子线程都等待主线程主线程完成一些任务后通知所有子线程开始执行。通常用于多个子线程等待主线程发出起跑命令。
当然我们可以直接用条件变量来实现以上两种同步但如果用CountDownLatch程序的逻辑更清晰。CountDownLatch的接口很简单
class CountDownLatch : boost::noncopyable
{
public:// 倒数几次explicit CountDownLatch(int count);// 等待计数值变为0void wait();// 计数减一void countDown();private:mutable MutexLock mutex_;Condition condition_;int count_;
};CountDownLatch的实现也很简单几乎是条件变量的教科书式应用
void CountDownLatch::wait()
{MutexLockGuard lock(mutex_);while (count_ 0){condition_.wait();}
}void CountDownLatch::countDown()
{MutexLockGuard lock(mutex_);--count_;if (count_ 0) {condition_.notifyAll();}
}注意到CountDownLatch::countDown()使用的是Condition::notifyAll()而前面的enqueue()使用的是Condition::notify()这是因为CountDownLatch::countDown()将计数减少到0时需要通知所有阻塞线程运行而enqueue()只添加了一个元素只需通知一个线程运行即可。
互斥器和条件变量构成了多线程编程的全部必备原语用它们即可完成任何多线程同步任务二者不能相互替代。
读写锁Readers-Writer lock简写为rwlock看上去是个很美的抽象它明确区分了read和write两种行为。
初学者常干的一件事是一见到某个共享数据结构频繁读而很少写就把mutex替换为rwlock甚至首选rwlock来保护共享状态这不见得是正确的。
1.从正确性方面说一种典型的易犯错误是在持有read lock时修改了共享数据。这通常发生在程序的维护阶段为了新增功能程序员不小心在原来read lock保护的函数中调用了会修改状态的函数。这种错误的后果跟无保护并发读写共享数据是一样的。
2.从性能方面说读写锁不见得比mutex更高效无论如何reader lock加锁的开销不会比mutex lock小因为它要更新当前reader的数目。如果临界区很小锁竞争不激烈那么mutex往往更快。
3.reader lock可能允许提升upgrade为writer lock指的是已经获取读锁的情况下尝试再加写锁也可能不允许提升Pthread rwlock不允许提升。考虑到上面的post()和traverse()示例如果用读写锁来保护foos对象那么post()应该持有写锁而traverse()应该持有读锁。如果允许把读锁提升为写锁后果跟使用recursive mutex一样会造成迭代器失效例如获得写锁后如果删除或添加了某个位置的元素会导致这个位置后面的所有迭代器失效程序崩溃。如果不允许提升后果跟使用non-recursive mutex一样会造成死锁先加读锁同一线程再加写锁从而导致死锁。最好程序死锁留个全尸好查验。
4.通常reader lock是可重入的writer lock是不可重入的。但为了防止writer饥饿writer lock通常会阻塞后来的reader lock因此reader lock在重入的时候可能死锁线程A先加了读锁然后线程B加写锁然后线程A再加读锁此时B会等待A释放第一个读锁而A会等待B获得写锁然后解锁写锁另外在追求低延迟读取的场合也不适用读写锁。
muduo线程库有意不提供读写锁的封装因为作者还未在工作中遇到过用rwlock替换普通mutex会显著提高性能的例子相反作者建议首选mutex。
遇到并发读写如果条件合适作者通常用shared_ptr实现copy-on-write见下文而不用读写锁同时避免reader比writer阻塞。如果确实对并发读写有极高的性能要求可以考虑read-copy-updateRCU的基本思想是在写操作时不会立即修改共享数据而是创建一个副本copy并在副本上进行修改。同时读操作会继续访问原始数据而写操作则继续在副本上进行修改。一旦写操作完成它会更新共享数据的引用以便读操作可以开始访问新的数据。
作者没有遇到过需要使用信号量的情况且认为信号量不是必备的同步原语因为条件变量配合互斥器完全可以替代其功能且更不容易用错。信号量的一个问题在于它有自己的计数值而通常我们自己的数据结构也有长度值这就造成了同样的信息存了两份需要时刻保持一致这增加了程序员的负担和出错的可能。
作者认为如果程序里需要解决哲学家就餐之类的复杂IPC问题应首先检讨这个设计为什么线程之间会有如此复杂的资源争抢一个线程要同时抢到两个资源一个资源可以被两个线程争夺。如果在工作中遇到作者会把想吃饭这个事情专门交给一个为各位哲学家分派餐具的线程来做然后每个哲学家等在一个简单的condition variable上到时间了有人通知他去吃饭。从哲学上说教科书上的解决方案是平权每个哲学家有自己的线程自己去拿筷子作者宁愿用集权方式用一个线程专门管餐具的分配让其他哲学家线程拿个号等在食堂门口好了这样不损失多少效率却让程序简单很多。虽然Windows的WaitForMultipleObjects它允许程序等待多个对象中的一个或多个对象达到可等待状态这个函数通常用于多线程编程和进程间通信以协调多个操作的执行顺序让这个问题trivial化但在Linux下正确模拟WaitForMultipleObjects不是普通程序员该干的。
Pthreads还提供了barrier这个同步原语但不如CountDownLatch实用。
MutexLock、MutexLockGuard、Condition等class都不允许拷贝构造和赋值。
MutexLock和MutexLockGuard这两个class应该能在纸上默写出来没有太多需要解释的。MutexLock的附加值在于提供了isLockedByThisThread()函数用于程序断言它用到的CurrentThread::tid()在第四章介绍。
以下是MutexLock类
class MutexLock : boost::noncopyable
{
public:MutexLock() : holder_(0){pthread_mutex_init(mutex_, NULL);}~MutexLock(){assert(holder_ 0);pthread_mutex_destory(mutex_);}bool isLockedByThisThread(){return holder_ CurrentThread::tid();}void assertLocked(){assert(isLockedByThisThread();}// 仅供MutexLockGuard调用严禁用户代码调用void lock(){// 这两行顺序不能反pthread_mutex_lock(mutex_);holder_ CurrentThread::tid();}// 仅供MutexLockGuard调用严禁用户代码调用void unlock(){// 这两行顺序不能反holder_ 0;pthread_mutex_unlock(mutex_);}// 仅供Condition调用严禁用户代码调用pthread_mutex_t *getPthreadMutex(){return mutex_;}private:pthread_mutex_t mutex_;pid_t holder_;
};class MutexLockGuard : boost::noncopyable
{
public:explicit MutexLockGuard(MutexLock mutex) : mutex_(mutex){mutex_.lock();}~MutexLockGuard(){mutex_.unlock();}private:MutexLock mutex_;
};#define MutexLockGuard(x) static_assert(false, missing mutex guard var name);以上代码最后定义了一个宏作用是防止程序里出现以下错误
void doit()
{// 遗漏变量名产生一个临时对象又马上销毁了MutexLockGuard(mutex);// 没有锁住临界区// 正确写法是MutexLockGuard lock(mutex);// 临界区
}有人把MutexLockGuard写成template以上没有这么做是因为它的模板类型参数只有MutexLock一种可能没有必要随意增加灵活性于是就手工把模板具现化instantiate了。此外一种更激进的写法是把lock/unlock放到private区然后把MutexLockGuard设为MutexLock的friend。以上MutexLock类中给lock()和unlock()加上注释即可在check-in将代码或更改提交到版本控制系统前的code review也很容易发现误用情况grep getPthreadMutex。
以上代码没有达到工业强度 1.mutex创建为PTHREAD_MUTEX_DEFAULT类型而不是我们预想的PTHREAD_MUTEX_NORMAL类型实际上这两者很可能是等同的严格的做法是用mutexattr来显示指定mutex的类型。
2.没有检查锁相关系统调用的返回值。这里不能用assert()检查返回值因为assert()在release build里加上#define NDEBUG宏是空语句。检查返回值的意义在于防止ENOMEM之类的资源不足情况这一般只可能在负载很重的产品程序中出现一旦出现这种情况程序必须立刻清理现场并主动退出否则会莫名其妙地崩溃给事后调查造成困难。这里我们需要non-debug的assert或许google-glog的CHECK()宏是个不错的思路。CHECK宏类似CHECK(x 0) x is not greater than 0!;根据返回值即x的值进行某个行为可以将宏的行为修改。
muduo库的一个特点是只提供最常用、最基本的功能特别有意避免提供多种功能近似的选择删繁就简举重若轻减少选择余地。
MutexLock没有提供tryLock()因为作者没有在代码中用过它不知道什么时候程序需要试着去锁一锁。truyock()可用于观察lock contention。
Pthreads condition variable允许在wait()的时候指定一个mutex但想不出有什么理由一个condition variable会和不同的mutex配合使用两个线程使用同一个条件变量的wait()时可传不同的mutex参数给条件变量的wait()。Java的intrinsic condition和Condition class都不支持这么做因此可以放弃这一灵活性老老实实地一对一。
相反boost::thread的condition_variable是在wait()时指定mutex的。以下是boost的同步原语的庞杂设计 1.Concept有Lockable、TimedLockable、SharedLockable、UpgradeLockable。
2.Lock有lock_guard、unique_lock、shared_lock、upgrade_lock、upgrade_to_unique_lock、scoped_try_lock。
3.Mutex有mutex、try_mutex、timed_mutex、recursive_mutex、recursive_try_mutex、recursive_timed_mutex、shared_mutex。
见到boost::thread这样如Rube Goldberg Machine鲁布·戈尔德伯格机是一种机械装置或装置链旨在以非常复杂和不必要的方式执行相对简单的任务一样让人眼花缭乱的库只得绕道而行。C11的线程库也采纳了这套方案这些class名字也很无厘头不老老实实用readers_writer_lock这样通俗名字非得增加精神负担自己发明新名字。作者不愿为这样的灵活性付出代价宁愿自己做几个简单的一看就明白的class来用这种简单的几行代码的轮子造造也无妨。
以下muduo::Condition class简单地封装了Pthreads condition variable用起来也容易这里用notify/notifyAll作为函数名因为signal有别的含义C里的signal/slot、C里的signal handler等就不overload这个术语了。
class Condition : boost::noncopyable
{
public:explicit Condition(MutexLock mutex) : mutex_(mutex){pthread_cond_init(pcond_, NULL);}~Condition(){pthread_cond_destory(pcond_);}void wait(){pthread_cond_wait(pcond_, mutex_.getPthreadMutex());}void notify(){pthread_cond_signal(pcond_);}void notifyAll(){pthread_cond_broadcast(pcond_);}private:MutexLock mutex_;pthread_cond_t pcond_;
};如果一个class要包含MutexLock和Condition要注意它们的生命顺序和初始化顺序mutex_应先于condition_构造并作为后者的构造参数
class CountDownLatch
{
public:// 初始化列表中初始化的顺序与成员声明的顺序一致CountDownLatch(int count) : mutex_(), condition_(mutex_), count_(count) { }private:// 顺序很重要先mutex后conditionmutable MutexLock mutex_;Condition condition_;int count_;
};虽然本章花了大量篇幅介绍如何正确使用mutex和condition variable但不代表鼓励到处使用它们这两者都是非常底层的同步原语主要用来实现更高级的并发编程工具。一个多线程程序里如果大量使用mutex和condition variable来同步基本跟用铅笔刀锯大树一样。
在程序里使用Pthreads库有一个额外好处分析工具认得它们懂得其语意。线程分析工具如Intel Thread Checker和Valgrind-Helgrind能识别Pthreads调用并依据happens-before关系分析程序有无data race。
研究Singleton的线程安全实现的历史会发现很多有意思的事人们一度认为double checked lockingDCL是王道兼顾了效率和正确性。后来有“神牛”指出由于乱序执行的影响DCL是靠不住的。Java开发者还算幸运可以借助内部静态类的装载来实现。C就比较惨要么次次锁要么eager initializeEager initialization急切初始化是一种初始化数据结构或对象的方法它在程序启动或对象创建时立即进行初始化这意味着在数据结构或对象第一次使用之前它已经包含了所需的初始化值相对应的是lazy initialization懒惰初始化它是一种延迟初始化的方式仅在首次访问时才进行初始化或动用memory barrier这样的“大杀器”。接下来Java 5修订了内存模型并给volatile赋予了acquire/release语义这下DCLwith volatile又是安全的了。但C的内存模型还在修订中C的volatile目前还不能将来也难说保证DCL的正确性。
注acquire和release是一种内存操作语义用于确保对共享数据的读取和写入操作在多线程环境下的正确同步 1.Acquire (获取)当一个线程执行acquire操作时它确保之前的所有读取和写入操作都已经完成。这意味着在acquire操作之前的所有写入都将在该操作之前对其他线程可见。Acquire语义通常与读取操作相关联以确保在读取之前所有必要的写入都已完成。
2.Release (释放)当一个线程执行release操作时它确保之后的所有读取和写入操作都不会提前执行。这意味着在release操作之后的所有写入都将在该操作之后对其他线程可见。Release语义通常与写入操作相关联以确保在写入之后所有必要的读取都不会提前执行。
其实实践中直接用pthread_once就行
template typename T
class Singleton : boost::noncopyable
{
public:static T instance(){pthread_once(ponce_, Singleton::init);return *value_;}private:Singleton();~Singleton();static void init(){value_ new T();}private:static pthread_once_t ponce_;static T* value_;
};// 必须在头文件中定义static变量
templatetypename T
pthread_once_t SingletonT::ponce_ PTHREAD_ONCE_INIT;templatetypename T
T* SingletonT::value_ NULL;上面这个Singleton没有任何花哨的地方它用pthread_once_t来保证lazy-initialization的线程安全。线程安全性由Pthreads库保证如果系统的Pthreads库有bug那就认命吧。
使用方法也很简单
Foo foo SingletonFoo::instance();这个Singleton没有考虑对象的销毁。在长时间运行的服务器程序里这不是一个问题反正进程也不打算正常退出。在短期运行的程序中程序退出的时候自然就释放所有资源了前提是程序里不使用不能由操作系统自动关闭的资源如跨进程的mutex。在实际的muduo::Singleton class中通过atexit函数提供了销毁功能聊胜于无罢了。
另外以上Singleton只能调用默认构造函数。如果用户想要指定T的构造方式可以用模板特化template specialization技术来提供一个定制点这需要引入另一层间接another level of indirection。类似这样
template typename T
class Singleton : boost::noncopyable
{
public:// 模板特化定制构造函数template typename... Argsstatic void initWithArgs(Args... args) {value_ new T(args...);}/* ... */
};// 模板特化版本的Singleton类允许定制构造方式
// 特例化一个函数模板时必须为原模板中每个模板参数都提供实参为指出我们在实例化一个模板
// 应使用template后跟一个空尖括号对空尖括号对指出我们将为原模板的所有模板参数提供实参
template
template typename... Args
void SingletonYourCustomType::initWithArgs(Args... args) {value_ new YourCustomType(args...);
}作者认为sleep()/usleep()/nanosleep()只能出现在测试代码中如写单元测试时或用于有意延长临界区加速复现死锁的情况。sleep不具备memory barrier语义不能保证内存的可见性。
生产代码中线程的等待可分为两种一种是等待资源可用要么等在select/poll/epoll_wait上要么等在条件变量上一种是等着进入临界区等在mutex上以便读写共享数据。后一种等待通常极短否则程序性能和伸缩性指程序能够有效地应对不同规模的工作负载和数据集而不需要重大的修改或性能下降。就会有问题。
在程序的正常执行中如果需要等待一段已知的时间应该往event loop里注册一个timer然后在timer的回调函数里接着干活因为县城是个珍贵的共享资源不能轻易浪费阻塞也是浪费。如果等待某个事件发生那么应该采用条件变量或IO事件回调不能用sleep来轮询。不要使用以下这种业余做法
while (true)
{if (!dataAvailable){sleep(some_time);} else{consumeData();}
}如果多线程的安全性和效率要靠代码主动调用sleep来保证这显然是设计出了问题。等待某个事件发生正确的做法是使用select()等价物或Condition抑或更理想地高层同步工具在用户态做轮询是低效的。
用好以上介绍的内容基本上就能应付多线程服务端开发的各种场合可能有人觉得性能没有发挥到极致但应该先把程序写正确并尽量保持清晰和简单然后再考虑性能优化如果确实还有必要优化的话。让一个正确的程序变快远比让一个快的程序变正确要容易。
在现代的多核计算背景下多线程是不可避免的。尽管在一定程度上可通过framework框架来屏蔽让你感觉像是在写单线程程序如Java Servlet。了解under the hood“在引擎盖下面”通常用来描述某事物的内部工作或机制发生了什么对于编写这种程序也会有帮助。
通篇来看效率不是作者的主要考虑点作者提倡正确加锁而不是自己编写lock-free算法使用原子整数除外更不要想当然地自己发明同步设施。在没有实测数据支持的情况下妄谈哪种做法效率更高是靠不住的不能听信传言或凭感觉优化。很多人误认为用锁会让程序变慢其实真正影响性能的不是锁而是锁争用lock contention。在程序的复杂度和性能之间取得平衡并考虑未来两三年扩容的可能无论是CPU变快、核数变多、机器数量增加、网络升级。在分布式系统中多机伸缩性scale out比单机的性能优化更值得投入精力。
借shared_ptr实现copy-on-write可以解决本章中的几个未决问题 1.post()和traverse()死锁。
2.把Request::print()移出Inventory::printAll()临界区。
3.解决Request对象析构的race condition。
之后再示范用普通mutex替换读写锁。解决办法都基于同一个思路就是用shared_ptr来管理共享数据原理如下 1.shared_ptr是引用计数型智能指针如果当前只有一个观察者那么引用计数的值为1。
2.对于write端如果发现引用计数为1这是可以安全地修改共享对象不必担心有人正在读它。
3.对于read端在读之前把引用计数加1读完之后减1这样保证在读的期间其引用计数大于1可以阻止并发写。
4.比较难的是对于write端如果发现引用计数大于1该如何处理sleep()一小段时间肯定是错的。
先看一个简单的例子解决post()和traverse()死锁。数据结构改成
typedef std::vectorFoo FooList;
typedef boost::shared_ptrFooList FooListPtr;
MutexLock mutex;
FooListPtr g_foos;在read端用一个栈上局部FooListPtr变量当作观察者它使得g_foos的引用计数增加
void traverse()
{FooListPtr foos;// 以下块作用域是traverse函数的临界区临界区里只读了一次共享变量g_foos// 这里多线程并发读写shared_ptr g_foos因此必须用mutex保护{MutexLockGuard lock(mutex);// 由于我们要进行读操作读之前把引用计数加1这样可以保证在读期间其引用计数大于1以阻止并发写foos g_foos;// shared_ptr的unique()成员函数用来判断自己是否是资源的唯一所有者// 此处至少有g_foos和foos在引用该资源一定不是唯一所有者assert(!g_foos.unique());}// assert(!foos.unique()); 这个断言不成立for (std::vectorFoo::const_iterator it foos-begin(); it ! foos-end(); it){it-doit();}
}关键是write端的post()如何写按前面的描述如果g_foos.unique()为true我们可以放心地在原地in-place修改FooList。如果g_foos.unique()为false说明这时别的线程正在读取FooList我们不能原地修改而是复制一份在副本上修改这样就避免了死锁
void post(const Foo f)
{printf(post\n);MutexLockGuardLock(mutex);if (!g_foos.unique()){g_foos.reset(new FooList(*g_foos));printf(copy the whole list\n);}assert(g_foos.unique());g_foos-push_back(f);
}以上post函数的临界区是整个函数其他写法都是错的。找出以下几种写法的错误
// 直接修改g_foos所指的FooList
void post(const Foo f)
{MutexLockGuard lock(mutex);// 此处的push_back和traverse中的遍历过程是并发的可能导致迭代器失效g_foos-push_back(f);
}// 试图缩小临界区把copying移出临界区这样每次都复制整个FooList有性能问题
// 还有逻辑错误如果有两个线程同时写那么两个写线程中第二个写的会把第一个写的覆盖
void post(const Foo f)
{FooListPtr newFoos(new FooList(*g_foos));newFoos-push_back(f);MutexLockGuard lock(mutex);g_foos newFoos; // 或g_foos.swap(newFoos);
}// 把临界区拆成两个小的把copying放到临界区外
void post(const Foo f)
{FooListPtr oldFoos;{MutexLockGuard lock(mutex);oldFoos g_foos;}// 第一次解锁后第二次加锁前可能其他线程改变了oldFoos会覆盖其他线程的修改FooListPtr newFoos(new FooList(*oldFoos));newFoos-push_back(f);MutexLockGuard lock(mutex);g_foos newFoos; // 或g_foos.swap(newFoos);
}解决把Request::print()移出Inventory::printAll()临界区有两个做法其一很简单把requests_复制一份在临界区外遍历这个副本
void Inventory::printAll() const
{std::setRequest* requests;{muduo::MutexLockGuard lock(mutex_);requests requests_;}// 遍历局部变量requests调用Request::print()
}这样做有一个明显的缺点它复制了整个std::set中的每个元素开销可能会比较大。如果遍历期间没有其他人修改requests_那么我们可以减小开销这就引入了第二种做法。
第二种做法的要点是用shared_ptr管理std::set在遍历时先增加引用计数阻止并发修改。当然Inventory::add()和Inventory::remove()也要相应修改采用前面post()和traverse()的方案。
用普通mutex替换读写锁的一个例子有以下场景一个多线程C程序24h x 5.5d运行。有几个工作线程Thread Worker{0,1,2,3}处理客户发过来的交易请求另外有一个背景线程ThreadBackground不定期更新程序内部的参考数据。这些线程都跟一个hash表打交道工作线程只读背景线程读写必然要用到一些同步机制防止数据损坏。以下示例代码用std::map代替hash表意思是一样的
using namespace std;
typedef mapstring, vectorpairstring, int Map;Map的key是用户名value是一个vector里边存的是不同stock的最小交易间隔vector已经排好序可以用二分查找。
我们的系统要求工作线程的延迟尽可能小可以容忍背景线程的延迟略大。一天之内背景线程对数据更新的次数屈指可数最多一小时一次更新的数据来自于网络所以对更新的及时性不敏感。Map的数据量也不大大约一千多条数据。
最简单的同步办法是用读写锁工作线程加读锁背景线程加写锁。但是读写锁的开销比普通mutex要大而且是写锁有限会阻塞后面的读锁。如果工作线程能用最普通的非重入mutex实现同步就不必用读写锁这能减轻工作线程延迟。我们借助shared_ptr做到了这一点
class CustomerData : boost::noncopyable
{
public:CustomerData() : data_(new Map) { }int query(const string customer, const string stock) const;private:typedef std::pairstring, int Entry;typedef std::vectorEntry EntryList;typedef std::mapstring, EntryList Map;typedef boost::shared_ptrMap MapPtr;void update(const string customer, const EntryList entries);// 用lower_bound函数在entries里找stockentries是有序的否则不能用lower_bound函数static int findEntry(const EntryList entries, const string stock);MapPtr getData() const{MutexLockGuard lock(mutex_);return data_;}mutable MutexLock mutex_;MapPtr data_;
}CustomerData::query()就用前面说的引用计数加1的办法用局部MapPtr data变量来持有Map防止并发修改
int CustomerData::query(const string customer, const string stock) const
{MapPtr data getData();// data一旦拿到就不再需要锁了// 取数据的时候只有getData()内部有锁多线程并发读的性能很好// 查找特定键是否存在于map中Map::const_iterator entries data-find(customer);if (entries ! data-end()){return findEntry(entries-second, stock);}else{return -1;}
}关键看CustomerData::update()怎么写。既然要更新数据那肯定得加锁如果此时其他线程正在读那么不能再原来的数据上修改得创建一个副本在副本上修改修改完了再替换。如果没有用户在读那么就能直接修改节约一次Map拷贝
// 每次收到一个customer的数据更新
void CustomerData::update(const string customer, const EntryList entries)
{// update必须全程持锁MutexLockGuard lock(mutex_);if (!data_.unique()){MapPtr newData(new Map(*data_));// 在这里打印日志然后统计日志来判断worst case发生的次数data_.swap(newData);}assert(data_.unique());(*data_)[customer] entries;
}CustomerData::update()中使用了shared_ptr::unique()来判断是不是有人在读如果有人在读那么我们不能直接修改因为query()并没有全程加锁只在getData()内部有锁。shared_ptr::swap()把data_替换为新副本而且我们还在锁里不会有别的线程来读可以放心地更新。如果别的reader线程已经刚刚通过getData()拿到了MapPtr它会读到稍旧的数据这不是问题因为数据更新来自网络如果网络稍有延迟反而正reader线程也会读到旧数据。
如果每次都更新全部数据且始终是在同一个线程更新数据临界区还可以进一步缩小
// 解析收到的消息返回新的MapPtr
MapPtr parseData(const string message);// 函数原型有变此时网络上传来的是完整的Map数据
void CustomerData::update(const string message)
{// 解析新数据在临界区之外MapPtr newData parseData(message);if (newData){MutexLockGuard lock(mutex_);// 不要用data_ newData;它会导致没有读者时在临界区内析构旧数据data_.swap(newData); }// 旧数据的析构也在临界区外进一步缩短了临界区
}根据作者线上环境测试大多情况下更新都是在原来数据上进行的拷贝的比例不到1%很高效。准确地说这不是copy-on-write而是copy-on-other-reading。
将来作者可能会采用无锁数据结构但目前这个实现已经非常好可以满足作者的要求。