02线程同步精要

最后修改:

并发编程有两种基本模型:

  • message passing (消息传递)
  • shared memory (共享内存)

分布式系统中,运行多台机器上的多个进程并发只有一种实用模型:message passing

线程同步的四项原则:

  1. 首先尽量最低限度地共享对象,减少需要同步地场合。一个对象能不暴露给别地线程不要暴露;如果要暴露,优先考虑 immutable 对象,实在不行才暴露可修改地对象,并用同步措施来充分保护它。
  2. 其次使用高级地并发编程构件。如 TaskQueue、Producer-Consumer Queue、CountDownLatch 等等。
  3. 最后不得已必须使用同步原语时,只用非递归地互斥器和条件变量,慎用读写锁,不要用信号量。
  4. 除了使用 atomic 整数之外,不自己编写 lock-free 代码,也不要用“内核级”同步原语。不凭空猜测“那种性能会更好”,比如 spin lock vs mutex.

互斥锁

原则:

  • 使用 RAII 手法操作 mutex
  • 只用不可重入锁
  • 不手动 lock() unlock(),交给 RAII
  • 不用跨进程地锁,进程间只用 TCP sockets。
  • 必要时候可以考虑用 PTHREAD_MUTEX_ERRORCHECK 来排错。

mutex 分为递归和非递归两种,这是 POSIX 地叫法,另外名字是可重入和非可重入,这两种唯一地区别是同一个线程可以重复对 recursive mutex 加锁,但是不能重复对 non-recursive mutex 加锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include "../Mutex.h"
#include "../Thread.h"
#include <vector>
#include <stdio.h>

using namespace muduo;

class Foo
{
 public:
  void doit() const;
};

MutexLock mutex;
std::vector<Foo> foos;

void post(const Foo& f)
{
  MutexLockGuard lock(mutex);
  foos.push_back(f);
}

void traverse()
{
  MutexLockGuard lock(mutex);
  for (std::vector<Foo>::const_iterator it = foos.begin();
      it != foos.end(); ++it)
  {
    it->doit();
  }
}

void Foo::doit() const
{
  Foo f;
  post(f);
}

int main()
{
  Foo f;
  post(f);
  traverse();
}

  1. post() 和 traverse() 加锁都没问题
  2. doit() 间接调用 post() 导致 mutex 非递归的会死锁,如果 mutex 为递归的,当 push_back() 可能会导致 vector 迭代器失效,程序偶尔 crash

死锁调试可以用 gdb 使用 thread apply all bt 指令

post 可以拆成 2 个函数

1
2
3
4
5
6
7
8
void post(const Foo& f) {
    MutexLockGuard lock(mutex);
    postWithLockHold(f);
}

void postWithLockHold(const Foo& f) {
    foos.push_back(f)
}
  1. 误用加锁版本死锁,可以通过调试解决
  2. 误用不加锁版本,数据损坏了
1
2
3
4
void postWithLockHold(const Foo& f) {
    assert(mutex.isLockedByThisThread());  //muduo::MutexLock 提供了这个成员函数
    foos.push_back(f)
}

windows 的 CRITICAL_SECTION 轻量级的,结合 spin lock。在多 cpu 系统上,如果不能立刻拿到锁,会先 spin 一小段时间,如果还不能拿到锁,才挂起当前线程。Linux 的 PTHREAD_MUTEX_ADAPTIVE_NP 与此类似

死锁

死锁的条件

条件名称核心定义示例破坏该条件的方案破坏方案示例
资源互斥(Mutual Exclusion)资源只能被一个进程/线程独占,其他请求者必须等待(不可同时使用)。打印机同一时间只能被一个程序使用;Java中的synchronized锁是独占锁。用“共享资源”替代“独占资源”(仅适用于可共享场景)将“独占打印机”改为“打印队列”(多进程共享队列,而非直接独占设备)。
持有并等待(Hold and Wait)进程/线程已持有至少一个资源,同时又在等待其他进程/线程持有的资源。线程A已获取“文件读锁”,又请求“文件写锁”(而写锁被线程B持有)。要求进程/线程一次性获取所有所需资源,否则不持有任何资源线程启动前,先申请“锁A+锁B”,只有两者都获取成功才执行;若缺一个,则释放已申请资源并等待。
不可剥夺(No Preemption)进程/线程持有的资源不能被强制剥夺,只能由持有者主动释放。线程A持有的锁,不能被系统强制收回,必须等A执行完unlock()才释放。允许系统/其他进程强制剥夺已持有的资源(需资源支持“可剥夺”特性)数据库事务中,若事务A等待事务B的锁超时,系统可强制回滚事务A,释放其持有的锁。
循环等待(Circular Wait)多个进程/线程形成环形等待链,每个节点都等待下一个节点的资源。线程1等线程2的锁,线程2等线程3的锁,线程3等线程1的锁,形成闭环。给所有资源编号,要求进程/线程按“从小到大”的固定顺序获取资源给锁A编号1、锁B编号2,所有线程必须先获取编号小的锁A,再获取编号大的锁B,避免循环。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include "../Mutex.h"

class Request
{
 public:
  void process() // __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    print();
  }

  void print() const // __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
  }

 private:
  mutable muduo::MutexLock mutex_;
};

int main()
{
  Request req;
  req.process();
}

process 已经加锁,锁的作用域未整个函数,锁未释放前,调用函数 print 也加锁等待,持续等待无法退出,造成死锁现象。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
#include "../Mutex.h"
#include "../Thread.h"
#include <set>
#include <stdio.h>

class Request;

class Inventory
{
 public:
  void add(Request* req)
  {
    muduo::MutexLockGuard lock(mutex_);
    requests_.insert(req);
  }

  void remove(Request* req) __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    requests_.erase(req);
  }

  void printAll() const;

 private:
  mutable muduo::MutexLock mutex_;
  std::set<Request*> requests_;
};

Inventory g_inventory;

class Request
{
 public:
  void process() // __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    g_inventory.add(this);
    // ...
  }

  ~Request() __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    sleep(1);
    g_inventory.remove(this);
  }

  void print() const __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    // ...
  }

 private:
  mutable muduo::MutexLock mutex_;
};

void Inventory::printAll() const
{
  muduo::MutexLockGuard lock(mutex_);
  sleep(1);
  for (std::set<Request*>::const_iterator it = requests_.begin();
      it != requests_.end();
      ++it)
  {
    (*it)->print();
  }
  printf("Inventory::printAll() unlocked\n");
}

/*
void Inventory::printAll() const
{
  std::set<Request*> requests
  {
    muduo::MutexLockGuard lock(mutex_);
    requests = requests_;
  }
  for (std::set<Request*>::const_iterator it = requests.begin();
      it != requests.end();
      ++it)
  {
    (*it)->print();
  }
}
*/

void threadFunc()
{
  Request* req = new Request;
  req->process();
  delete req;
}

int main()
{
  muduo::Thread thread(threadFunc);
  thread.start();
  usleep(500 * 1000);
  g_inventory.printAll();
  thread.join();
}

这个是线程 threadFunc 调用 ~Request 获取了 Request 的锁, 主线程调用 printAll 获取了 Inventory 实例的锁,~Request 在调用 remove 后才会释放锁,而 remove 又等待 Inventory 实例的锁释放来移除 Request 实例,两个线程间导致死锁。

条件变量

互斥锁是加锁原语,用来排他性的访问共享数据,它不是等待原语。使用 mutex 时,一般期望加锁不阻塞,能立刻拿到锁,尽快访问数据,尽快解锁,这样才不影响并发性和性能。

如果需要等待某个条件成立,则使用条件变量。条件变量是一个或多个线程等待某个布尔表达式为真,即等待线程的唤醒。条件变量的学名为管程(monitor)。Java Object 内置的 wait()、notify()、notifyAll() 是条件变量。这三个函数容易用错,一般建议用 java.util.concurrent 中的同步原语。

条件变量只有一种正确使用方式。对于 wait 端:

  1. 必须与 mutex 一起使用,该布尔表达式的读写受锁的保护。
  2. 在 mutex 已经上锁的情况下,才调用 wait()
  3. 把判断布尔条件和 wait() 放到 while 循环中
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
muduo::MutexLock mutex;
muduo::Condition cond(mutex);
std::deque<int> queue;

int dequeue() {
    MutexLockGuard lock(mutex);
    while (queue.empty) { //必须用循环;必须在判断之后再 wait() ; if 可能会 spurious wakeup 
        cond.wait(); //这一步会原子地 unlock mutex 并进入等待,不会与 enqueue 死锁, wait() 执行完毕时会自动重新加锁
    }
}

对于 singal/broadcast 端:

  1. 不一定要在 mutex 已上锁地情况下调用 signal。(理论上)
  2. 在 singal 之前一定要修改布尔表达式。
  3. 修改布尔表达式通常要用 mutex 保护(至少用在 full memory barrier)。
  4. 注意区分 singal 与 broadcast:“broadcast 通常用于表明状态变化,signal 通常用于表示资源可用。”
1
2
3
4
5
void enqueue(int x) {
    MutexLockGuard lock(mutex);
    queue.push_back(x);
    cond.notify();  //可用移出临界区之外
}

一般使用类模板:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)

#ifndef MUDUO_BASE_BLOCKINGQUEUE_H
#define MUDUO_BASE_BLOCKINGQUEUE_H

#include "Condition.h"
#include "Mutex.h"

#include <boost/noncopyable.hpp>
#include <deque>
#include <assert.h>

namespace muduo
{

template<typename T>
class BlockingQueue : boost::noncopyable
{
 public:
  BlockingQueue()
    : mutex_(),
      notEmpty_(mutex_),
      queue_()
  {
  }

  void put(const T& x)
  {
    MutexLockGuard lock(mutex_);
    queue_.push_back(x);
    notEmpty_.notify();
  }

  T take()
  {
    MutexLockGuard lock(mutex_);
    // always use a while-loop, due to spurious wakeup
    while (queue_.empty())
    {
      notEmpty_.wait();
    }
    assert(!queue_.empty());
    T front(queue_.front());
    queue_.pop_front();
    return front;
  }

  size_t size() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.size();
  }

 private:
  mutable MutexLock mutex_;
  Condition         notEmpty_;
  std::deque<T>     queue_;
};

}

#endif  // MUDUO_BASE_BLOCKINGQUEUE_H

倒计时(CountDownLatch)一种常见地同步手段,主要两个用途:

  1. 主线程发起多个子线程,等待这些子线程各自完成一定地任务之后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化。
  2. 主线程发起多个子线程,子线程都等待主线程,主线程完成其他的一下任务之后,通知所有子线程开始执行。通常用于多个子线程等待主线程发起“起跑”的命令。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)

#ifndef MUDUO_BASE_COUNTDOWNLATCH_H
#define MUDUO_BASE_COUNTDOWNLATCH_H

#include "Mutex.h"
#include "Condition.h"

#include <boost/noncopyable.hpp>

namespace muduo
{

class CountDownLatch : boost::noncopyable
{
 public:

  explicit CountDownLatch(int count)
    : mutex_(),
      condition_(mutex_),
      count_(count)
  {
  }

  void wait()
  {
    MutexLockGuard lock(mutex_);
    while (count_ > 0)
    {
      condition_.wait();
    }
  }

  void countDown()
  {
    MutexLockGuard lock(mutex_);
    --count_;
    if (count_ == 0)
    {
      condition_.notifyAll();
    }
  }

  int getCount() const
  {
    MutexLockGuard lock(mutex_);
    return count_;
  }

 private:
  mutable MutexLock mutex_;
  Condition condition_;
  int count_;
};

}
#endif  // MUDUO_BASE_COUNTDOWNLATCH_H

不要用读写锁和信号量

  • 正确性来说,可能修改代码读锁函数调用了修改状态的函数,导致共享数据被修改。
  • 性能方面,读写锁不见得比 mutex 高效。
  • read lock 可能可以提升为 write lock。
  • read lock 可重入,write lock 不可重入,但为了防止 writer 饥饿, writer lock 通常会阻塞后来得 reader lock,因此 read locker 在重入得时候可能死锁。在追求低延迟读写得场合也不适合读写锁。对读写有极高得性能要求,可考虑 read-copy-update。

信号量:不是必备同步原语,条件变量加互斥锁完全可替代。同时信号量有自己得计数值,通常数据结构中也会有长度值,需要时刻保持一致,增加复杂度,控制并发度,可考虑 ThreadPool

信号量(Semaphore)与互斥锁(Mutex)特性对比表

特性信号量(Semaphore)互斥锁(Mutex)
所有权无(任何线程可获取/释放许可)有(仅持有线程可释放锁)
典型操作示例线程A获取许可后,线程B可释放该许可(语法合法)线程A加锁后,线程B尝试解锁会触发错误(如死锁)
核心用途控制并发访问数量(如限制5个线程同时访问资源)保证资源独占访问(如同一时间仅1个线程操作共享数据)

封装 MutexLock 、MutexLockGuard 、Condition

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)

#ifndef MUDUO_BASE_MUTEX_H
#define MUDUO_BASE_MUTEX_H

#include "Thread.h"

#include <boost/noncopyable.hpp>
#include <assert.h>
#include <pthread.h>

namespace muduo
{

class MutexLock : boost::noncopyable  // C++ linux 端封装 mutex
{
 public:
  MutexLock()
    : holder_(0)
  {
    pthread_mutex_init(&mutex_, NULL);
  }

  ~MutexLock()
  {
    assert(holder_ == 0);
    pthread_mutex_destroy(&mutex_);
  }

  bool isLockedByThisThread()
  {
    return holder_ == CurrentThread::tid();
  }

  void assertLocked()
  {
    assert(isLockedByThisThread());
  }

  // internal usage

  void lock()
  {
    pthread_mutex_lock(&mutex_);
    holder_ = CurrentThread::tid();
  }

  void unlock()
  {
    holder_ = 0;
    pthread_mutex_unlock(&mutex_);
  }

  pthread_mutex_t* getPthreadMutex() /* non-const */
  {
    return &mutex_;
  }

 private:

  pthread_mutex_t mutex_;
  pid_t holder_;
};

class MutexLockGuard : boost::noncopyable   //RAII 技术
{
 public:
  explicit MutexLockGuard(MutexLock& mutex) : mutex_(mutex)
  {
    mutex_.lock();
  }

  ~MutexLockGuard()
  {
    mutex_.unlock();
  }

 private:

  MutexLock& mutex_;
};

}

// Prevent misuse like:
// MutexLockGuard(mutex_);
// A tempory object doesn't hold the lock for long!
#define MutexLockGuard(x) error "Missing guard object name"

//#define MutexLockGuard(x) static_assert(false, "Missing guard object name") //书中用得断言方式

//这样写成宏,只要这样写就会报错,必须 MutexLockGuard  lock(mutex); 定义,不能匿名定义

#endif  // MUDUO_BASE_MUTEX_H

以上代码达不到工业强度:

  1. mutex 创建用PTHREAD_MUTEX_DEFAULT 类型,不是 PTHREAD_MUTEX_NORMAL 类型,严格说使用 mutexattr 指定锁类型
  2. 没有检测返回值。assert 使用 release build 会被优化成空语句。goolge-glog 的 CHECK() 宏是个思路。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)

#ifndef MUDUO_BASE_CONDITION_H
#define MUDUO_BASE_CONDITION_H

#include "Mutex.h"

#include <boost/noncopyable.hpp>
#include <pthread.h>
#include <errno.h>

namespace muduo
{

class Condition : boost::noncopyable
{
 public:
  explicit Condition(MutexLock& mutex) : mutex_(mutex)
  {
    pthread_cond_init(&pcond_, NULL);
  }

  ~Condition()
  {
    pthread_cond_destroy(&pcond_);
  }

  void wait()
  {
    pthread_cond_wait(&pcond_, mutex_.getPthreadMutex());
  }

  // returns true if time out, false otherwise.
  bool waitForSeconds(int seconds)
  {
    struct timespec abstime;
    clock_gettime(CLOCK_REALTIME, &abstime);
    abstime.tv_sec += seconds;
    return ETIMEDOUT == pthread_cond_timedwait(&pcond_, mutex_.getPthreadMutex(), &abstime);
  }

  void notify()
  {
    pthread_cond_signal(&pcond_);
  }

  void notifyAll()
  {
    pthread_cond_broadcast(&pcond_);
  }

 private:
  MutexLock& mutex_;
  pthread_cond_t pcond_;
};

}
#endif  // MUDUO_BASE_CONDITION_H

像上面 CountDownLatch 代码,使用 mutex 和 condition 同时时,注意初始化顺序,先 mutex 后 condition

线程安全的 Singleton 实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)

#ifndef MUDUO_BASE_SINGLETON_H
#define MUDUO_BASE_SINGLETON_H

#include <boost/noncopyable.hpp>
#include <pthread.h>
#include <stdlib.h> // atexit

namespace muduo
{

template<typename T>
class Singleton : boost::noncopyable
{
 public:
  static T& instance()
  {
    pthread_once(&ponce_, &Singleton::init);
    return *value_;
  }

 private:
  Singleton();
  ~Singleton();

  static void init()
  {
    value_ = new T();
    ::atexit(destroy);
  }

  static void destroy()
  {
    delete value_;
  }

 private:
  static pthread_once_t ponce_;
  static T*             value_;
};

template<typename T>
pthread_once_t Singleton<T>::ponce_ = PTHREAD_ONCE_INIT;

template<typename T>
T* Singleton<T>::value_ = NULL;

}
#endif

sleep(3) 不是同步原语

生产代码中线程的等待分两种:一种是等待资源可用(要么等待在 select/poll/epoll_wait上,要么等待条件变量上);一种是等待进入临界区(mutex 上)以便读取数据。后一种等待通常极短,否则程序的性能和伸缩性有问题。

归纳总结

  • 线程的四项原则,尽量使用高层同步设施(线程池、队列、倒计时);
  • 使用普通的互斥锁和条件变量完成剩余的同步任务,使用 RAII 手法和 Scoped Locking。

借 shared_ptr 实现 copy-on-write

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include "../Mutex.h"
#include "../Thread.h"
#include <vector>
#include <boost/shared_ptr.hpp>
#include <stdio.h>

using namespace muduo;

class Foo
{
 public:
  void doit() const;
};

typedef std::vector<Foo> FooList;
typedef boost::shared_ptr<FooList> FooListPtr;
FooListPtr g_foos;
MutexLock mutex;

void post(const Foo& f)
{
  printf("post\n");
  MutexLockGuard lock(mutex);
  if (!g_foos.unique())
  {
    g_foos.reset(new FooList(*g_foos)); //复制当前容器内容创建新容器,再让共享指针指向新容器
    printf("copy the whole list\n");
  }
  assert(g_foos.unique()); //新的为 1 写入数据。另一份通过 shared_ptr 自动管理生命周期释放。
  g_foos->push_back(f);
}

void traverse()
{
  FooListPtr foos;
  {
    MutexLockGuard lock(mutex); //多线程读写 shared_ptr ,必须用 mutex 保护
    foos = g_foos;
    assert(!g_foos.unique());
  }

  // assert(!foos.unique()); this may not hold

  for (std::vector<Foo>::const_iterator it = foos->begin();
      it != foos->end(); ++it)
  {
    it->doit();
  }
}

void Foo::doit() const
{
  Foo f;
  post(f);
}

int main()
{
  g_foos.reset(new FooList);
  Foo f;
  post(f);
  traverse();
}

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#include "../Mutex.h"
#include "../Thread.h"
#include <set>
#include <boost/shared_ptr.hpp>
#include <stdio.h>

class Request;

class Inventory
{
 public:
  Inventory()
    : requests_(new RequestList)
  {
  }

  void add(Request* req)
  {
    muduo::MutexLockGuard lock(mutex_);
    if (!requests_.unique())
    {
      requests_.reset(new RequestList(*requests_));
      printf("Inventory::add() copy the whole list\n");
    }
    assert(requests_.unique());
    requests_->insert(req);
  }

  void remove(Request* req) // __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    if (!requests_.unique())
    {
      requests_.reset(new RequestList(*requests_));
      printf("Inventory::remove() copy the whole list\n");
    }
    assert(requests_.unique());
    requests_->erase(req);
  }

  void printAll() const;

 private:
  typedef std::set<Request*> RequestList;
  typedef boost::shared_ptr<RequestList> RequestListPtr;

  RequestListPtr getData() const
  {
    muduo::MutexLockGuard lock(mutex_);
    return requests_;
  }

  mutable muduo::MutexLock mutex_;
  RequestListPtr requests_;
};

Inventory g_inventory;

class Request
{
 public:
  Request()
    : x_(0)
  {
  }

  ~Request() __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    x_ = -1;
    sleep(1);
    g_inventory.remove(this);
  }

  void process() // __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    g_inventory.add(this);
    // ...
  }

  void print() const __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    // ...
    printf("print Request %p x=%d\n", this, x_);
  }

 private:
  mutable muduo::MutexLock mutex_;
  int x_;
};

void Inventory::printAll() const
{
  RequestListPtr requests = getData();
  sleep(1);
  for (std::set<Request*>::const_iterator it = requests->begin();
      it != requests->end();
      ++it)
  {
    (*it)->print();
  }
}

void threadFunc()
{
  Request* req = new Request;
  req->process();
  delete req;
}

int main()
{
  muduo::Thread thread(threadFunc);
  thread.start();
  usleep(500*1000);
  g_inventory.printAll();
  thread.join();
}

Request 析构的 race condition 解决

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#include "../Mutex.h"
#include "../Thread.h"
#include <set>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <stdio.h>

class Request;
typedef boost::shared_ptr<Request> RequestPtr;

class Inventory
{
 public:
  Inventory()
    : requests_(new RequestList)
  {
  }

  void add(const RequestPtr& req)
  {
    muduo::MutexLockGuard lock(mutex_);
    if (!requests_.unique())
    {
      requests_.reset(new RequestList(*requests_));
      printf("Inventory::add() copy the whole list\n");
    }
    assert(requests_.unique());
    requests_->insert(req);
  }

  void remove(const RequestPtr& req) // __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    if (!requests_.unique())
    {
      requests_.reset(new RequestList(*requests_));
      printf("Inventory::remove() copy the whole list\n");
    }
    assert(requests_.unique());
    requests_->erase(req);
  }

  void printAll() const;

 private:
  typedef std::set<RequestPtr> RequestList;
  typedef boost::shared_ptr<RequestList> RequestListPtr;

  RequestListPtr getData() const
  {
    muduo::MutexLockGuard lock(mutex_);
    return requests_;
  }

  mutable muduo::MutexLock mutex_;
  RequestListPtr requests_;
};

Inventory g_inventory;

class Request : public boost::enable_shared_from_this<Request>
{
 public:
  Request()
    : x_(0)
  {
  }

  ~Request()
  {
    x_ = -1;
  }

  void cancel() __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    x_ = 1;
    sleep(1);
    printf("cancel()\n");
    g_inventory.remove(shared_from_this());
  }

  void process() // __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    g_inventory.add(shared_from_this());
    // ...
  }

  void print() const __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    // ...
    printf("print Request %p x=%d\n", this, x_);
  }

 private:
  mutable muduo::MutexLock mutex_;
  int x_;
};

void Inventory::printAll() const
{
  RequestListPtr requests = getData();
  printf("printAll()\n");
  sleep(1);
  for (std::set<RequestPtr>::const_iterator it = requests->begin();
      it != requests->end();
      ++it)
  {
    (*it)->print();
  }
}

void threadFunc()
{
  RequestPtr req(new Request);
  req->process();
  req->cancel();
}

int main()
{
  muduo::Thread thread(threadFunc);
  thread.start();
  usleep(500*1000);
  g_inventory.printAll();
  thread.join();
}

普通 mutex 替换读写锁

背景线程读写,工作线程只读。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include <map>
#include <string>
#include <vector>

#include <boost/shared_ptr.hpp>

#include "../Mutex.h"

using std::string;

class CustomerData : boost::noncopyable
{
 public:
  CustomerData()
    : data_(new Map)
  { }

  int query(const string& customer, const string& stock) const;

 private:
  typedef std::pair<string, int> Entry;
  typedef std::vector<Entry> EntryList;
  typedef std::map<string, EntryList> Map;
  typedef boost::shared_ptr<Map> MapPtr;
  void update(const string& customer, const EntryList& entries);
  void update(const string& message);

  static int findEntry(const EntryList& entries, const string& stock);
  static MapPtr parseData(const string& message);

  MapPtr getData() const
  {
    muduo::MutexLockGuard lock(mutex_);
    return data_;
  }

  mutable muduo::MutexLock mutex_;
  MapPtr data_;
};

int CustomerData::query(const string& customer, const string& stock) const
{
  MapPtr data = getData();

  Map::const_iterator entries = data->find(customer);
  if (entries != data->end())
    return findEntry(entries->second, stock);
  else
    return -1;
}

void CustomerData::update(const string& customer, const EntryList& entries)
{
  muduo::MutexLockGuard lock(mutex_);
  if (!data_.unique())
  {
    MapPtr newData(new Map(*data_));
    data_.swap(newData);
  }
  assert(data_.unique());
  (*data_)[customer] = entries;
}

void CustomerData::update(const string& message)
{
  MapPtr newData = parseData(message);
  if (newData)
  {
    muduo::MutexLockGuard lock(mutex_);
    data_.swap(newData);
  }
}

int main()
{
  CustomerData data;
}

Reference

http://queue.acm.org/detail.cfm?id=1454462

https://computing.llnl.gov/tutorials/pthreads

http://www.thinkingparallel.com/2007/02/19/please-dont-rely-on-memory-barriers-for-synchronization/

http://zaitcev.livejournal.com/144041.html

http://www.kernel.org/doc/Documentation/volatile-considered-harmful.txt

http://www.cs.wustl.edu/~schrnidt/PDF/locking-patterns.pdf Douglas Schmidt 论文

http://llvm.org/devmtg/2011-11/#talk3

http://zaval.org/resources/library/butenhof1.html

http://www.akkadia.org/drepper/futex.pdf

http://msdn.microsoft.com/en-us/library/windows/desktop/ms682530(v=vs.85).aspx

http://blog.csdn.net/panaimin/article/details/5981766

http://www.nwcpp.org/Downloads/2007/Machine_Architecture_-_NWCPP.pdf

http://www.aristeia.com/TalkNotes/ACCU2011_CPUCaches.pdf

http://igoro.com/archive/gallery-of-processor-cache-effects/

http://simplygenius.net/Article/FalseSharing

http://blog.csdn.net/solstice/article/details/11432817 http://blog.csdn.net/solstice/article/details/5829421#comments

http://en.wikipedia.org/wiki/Spurious_wakeup

http://en.wikipedia.org/wiki/Read-copy-update

https://www.artima.com/shop/effective_cpp_in_an_embedded_environment

http://www.drdobbs.com/cpp/lock-free-code-a-false-sense-of-security/210600279

Intel Thread Ckecker

Valgrind-Helgrind

http://valgrind.org/docs/manual/hg-manual.html/#hg-manual.data-races.algorithm

http://research.microsoft.com/en-us/um/people/lamport/pubs/time-clocks.pdf

http://www.cs.wustl.edu/~schmidt/PDF/DC-Locking.pdf

http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html

http://www.javaworld.com/jw-02-2001/jw-0209-double.html

http://www.aristeia.com/Papers/DDJ_Jul_Aug_2004_revised.pdf

http://scottmeyers.blogspot.com/2012/04/information-on-c11-memory-model.html

《Ad Hoc Synchronization Considered Harmful》 http://www.usenix.org/events/osdi10/tech/full_papers/Xiong.pdf

http://pdos.csail.mit.edu/papers/linux:osdi10.pdf

http://preshing.com/20111118/locks-arent-slow-lock-contention-is

《Programming with POSIX Threads》

RWC

山重水复疑无路,柳暗花明又一村
使用 Hugo 构建
主题 StackJimmy 设计