并发编程有两种基本模型:
- message passing (消息传递)
- shared memory (共享内存)
分布式系统中,运行多台机器上的多个进程并发只有一种实用模型:message passing
线程同步的四项原则:
- 首先尽量最低限度地共享对象,减少需要同步地场合。一个对象能不暴露给别地线程不要暴露;如果要暴露,优先考虑 immutable 对象,实在不行才暴露可修改地对象,并用同步措施来充分保护它。
- 其次使用高级地并发编程构件。如 TaskQueue、Producer-Consumer Queue、CountDownLatch 等等。
- 最后不得已必须使用同步原语时,只用非递归地互斥器和条件变量,慎用读写锁,不要用信号量。
- 除了使用 atomic 整数之外,不自己编写 lock-free 代码,也不要用“内核级”同步原语。不凭空猜测“那种性能会更好”,比如 spin lock vs mutex.
互斥锁
原则:
- 使用 RAII 手法操作 mutex
- 只用不可重入锁
- 不手动 lock() unlock(),交给 RAII
- 不用跨进程地锁,进程间只用 TCP sockets。
- 必要时候可以考虑用 PTHREAD_MUTEX_ERRORCHECK 来排错。
mutex 分为递归和非递归两种,这是 POSIX 地叫法,另外名字是可重入和非可重入,这两种唯一地区别是同一个线程可以重复对 recursive mutex 加锁,但是不能重复对 non-recursive mutex 加锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| #include "../Mutex.h"
#include "../Thread.h"
#include <vector>
#include <stdio.h>
using namespace muduo;
class Foo
{
public:
void doit() const;
};
MutexLock mutex;
std::vector<Foo> foos;
void post(const Foo& f)
{
MutexLockGuard lock(mutex);
foos.push_back(f);
}
void traverse()
{
MutexLockGuard lock(mutex);
for (std::vector<Foo>::const_iterator it = foos.begin();
it != foos.end(); ++it)
{
it->doit();
}
}
void Foo::doit() const
{
Foo f;
post(f);
}
int main()
{
Foo f;
post(f);
traverse();
}
|
- post() 和 traverse() 加锁都没问题
- doit() 间接调用 post() 导致 mutex 非递归的会死锁,如果 mutex 为递归的,当 push_back() 可能会导致 vector 迭代器失效,程序偶尔 crash
死锁调试可以用 gdb 使用 thread apply all bt 指令
post 可以拆成 2 个函数
1
2
3
4
5
6
7
8
| void post(const Foo& f) {
MutexLockGuard lock(mutex);
postWithLockHold(f);
}
void postWithLockHold(const Foo& f) {
foos.push_back(f)
}
|
- 误用加锁版本死锁,可以通过调试解决
- 误用不加锁版本,数据损坏了
1
2
3
4
| void postWithLockHold(const Foo& f) {
assert(mutex.isLockedByThisThread()); //muduo::MutexLock 提供了这个成员函数
foos.push_back(f)
}
|
windows 的 CRITICAL_SECTION 轻量级的,结合 spin lock。在多 cpu 系统上,如果不能立刻拿到锁,会先 spin 一小段时间,如果还不能拿到锁,才挂起当前线程。Linux 的 PTHREAD_MUTEX_ADAPTIVE_NP 与此类似
死锁
死锁的条件
| 条件名称 | 核心定义 | 示例 | 破坏该条件的方案 | 破坏方案示例 |
|---|
| 资源互斥(Mutual Exclusion) | 资源只能被一个进程/线程独占,其他请求者必须等待(不可同时使用)。 | 打印机同一时间只能被一个程序使用;Java中的synchronized锁是独占锁。 | 用“共享资源”替代“独占资源”(仅适用于可共享场景) | 将“独占打印机”改为“打印队列”(多进程共享队列,而非直接独占设备)。 |
| 持有并等待(Hold and Wait) | 进程/线程已持有至少一个资源,同时又在等待其他进程/线程持有的资源。 | 线程A已获取“文件读锁”,又请求“文件写锁”(而写锁被线程B持有)。 | 要求进程/线程一次性获取所有所需资源,否则不持有任何资源 | 线程启动前,先申请“锁A+锁B”,只有两者都获取成功才执行;若缺一个,则释放已申请资源并等待。 |
| 不可剥夺(No Preemption) | 进程/线程持有的资源不能被强制剥夺,只能由持有者主动释放。 | 线程A持有的锁,不能被系统强制收回,必须等A执行完unlock()才释放。 | 允许系统/其他进程强制剥夺已持有的资源(需资源支持“可剥夺”特性) | 数据库事务中,若事务A等待事务B的锁超时,系统可强制回滚事务A,释放其持有的锁。 |
| 循环等待(Circular Wait) | 多个进程/线程形成环形等待链,每个节点都等待下一个节点的资源。 | 线程1等线程2的锁,线程2等线程3的锁,线程3等线程1的锁,形成闭环。 | 给所有资源编号,要求进程/线程按“从小到大”的固定顺序获取资源 | 给锁A编号1、锁B编号2,所有线程必须先获取编号小的锁A,再获取编号大的锁B,避免循环。 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| #include "../Mutex.h"
class Request
{
public:
void process() // __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
print();
}
void print() const // __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
}
private:
mutable muduo::MutexLock mutex_;
};
int main()
{
Request req;
req.process();
}
|
process 已经加锁,锁的作用域未整个函数,锁未释放前,调用函数 print 也加锁等待,持续等待无法退出,造成死锁现象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
| #include "../Mutex.h"
#include "../Thread.h"
#include <set>
#include <stdio.h>
class Request;
class Inventory
{
public:
void add(Request* req)
{
muduo::MutexLockGuard lock(mutex_);
requests_.insert(req);
}
void remove(Request* req) __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
requests_.erase(req);
}
void printAll() const;
private:
mutable muduo::MutexLock mutex_;
std::set<Request*> requests_;
};
Inventory g_inventory;
class Request
{
public:
void process() // __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
g_inventory.add(this);
// ...
}
~Request() __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
sleep(1);
g_inventory.remove(this);
}
void print() const __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
// ...
}
private:
mutable muduo::MutexLock mutex_;
};
void Inventory::printAll() const
{
muduo::MutexLockGuard lock(mutex_);
sleep(1);
for (std::set<Request*>::const_iterator it = requests_.begin();
it != requests_.end();
++it)
{
(*it)->print();
}
printf("Inventory::printAll() unlocked\n");
}
/*
void Inventory::printAll() const
{
std::set<Request*> requests
{
muduo::MutexLockGuard lock(mutex_);
requests = requests_;
}
for (std::set<Request*>::const_iterator it = requests.begin();
it != requests.end();
++it)
{
(*it)->print();
}
}
*/
void threadFunc()
{
Request* req = new Request;
req->process();
delete req;
}
int main()
{
muduo::Thread thread(threadFunc);
thread.start();
usleep(500 * 1000);
g_inventory.printAll();
thread.join();
}
|
这个是线程 threadFunc 调用 ~Request 获取了 Request 的锁, 主线程调用 printAll 获取了 Inventory 实例的锁,~Request 在调用 remove 后才会释放锁,而 remove 又等待 Inventory 实例的锁释放来移除 Request 实例,两个线程间导致死锁。
条件变量
互斥锁是加锁原语,用来排他性的访问共享数据,它不是等待原语。使用 mutex 时,一般期望加锁不阻塞,能立刻拿到锁,尽快访问数据,尽快解锁,这样才不影响并发性和性能。
如果需要等待某个条件成立,则使用条件变量。条件变量是一个或多个线程等待某个布尔表达式为真,即等待线程的唤醒。条件变量的学名为管程(monitor)。Java Object 内置的 wait()、notify()、notifyAll() 是条件变量。这三个函数容易用错,一般建议用 java.util.concurrent 中的同步原语。
条件变量只有一种正确使用方式。对于 wait 端:
- 必须与 mutex 一起使用,该布尔表达式的读写受锁的保护。
- 在 mutex 已经上锁的情况下,才调用 wait()
- 把判断布尔条件和 wait() 放到 while 循环中
1
2
3
4
5
6
7
8
9
10
| muduo::MutexLock mutex;
muduo::Condition cond(mutex);
std::deque<int> queue;
int dequeue() {
MutexLockGuard lock(mutex);
while (queue.empty) { //必须用循环;必须在判断之后再 wait() ; if 可能会 spurious wakeup
cond.wait(); //这一步会原子地 unlock mutex 并进入等待,不会与 enqueue 死锁, wait() 执行完毕时会自动重新加锁
}
}
|
对于 singal/broadcast 端:
- 不一定要在 mutex 已上锁地情况下调用 signal。(理论上)
- 在 singal 之前一定要修改布尔表达式。
- 修改布尔表达式通常要用 mutex 保护(至少用在 full memory barrier)。
- 注意区分 singal 与 broadcast:“broadcast 通常用于表明状态变化,signal 通常用于表示资源可用。”
1
2
3
4
5
| void enqueue(int x) {
MutexLockGuard lock(mutex);
queue.push_back(x);
cond.notify(); //可用移出临界区之外
}
|
一般使用类模板:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
| // excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)
#ifndef MUDUO_BASE_BLOCKINGQUEUE_H
#define MUDUO_BASE_BLOCKINGQUEUE_H
#include "Condition.h"
#include "Mutex.h"
#include <boost/noncopyable.hpp>
#include <deque>
#include <assert.h>
namespace muduo
{
template<typename T>
class BlockingQueue : boost::noncopyable
{
public:
BlockingQueue()
: mutex_(),
notEmpty_(mutex_),
queue_()
{
}
void put(const T& x)
{
MutexLockGuard lock(mutex_);
queue_.push_back(x);
notEmpty_.notify();
}
T take()
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty())
{
notEmpty_.wait();
}
assert(!queue_.empty());
T front(queue_.front());
queue_.pop_front();
return front;
}
size_t size() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
private:
mutable MutexLock mutex_;
Condition notEmpty_;
std::deque<T> queue_;
};
}
#endif // MUDUO_BASE_BLOCKINGQUEUE_H
|
倒计时(CountDownLatch)一种常见地同步手段,主要两个用途:
- 主线程发起多个子线程,等待这些子线程各自完成一定地任务之后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化。
- 主线程发起多个子线程,子线程都等待主线程,主线程完成其他的一下任务之后,通知所有子线程开始执行。通常用于多个子线程等待主线程发起“起跑”的命令。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
| // excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)
#ifndef MUDUO_BASE_COUNTDOWNLATCH_H
#define MUDUO_BASE_COUNTDOWNLATCH_H
#include "Mutex.h"
#include "Condition.h"
#include <boost/noncopyable.hpp>
namespace muduo
{
class CountDownLatch : boost::noncopyable
{
public:
explicit CountDownLatch(int count)
: mutex_(),
condition_(mutex_),
count_(count)
{
}
void wait()
{
MutexLockGuard lock(mutex_);
while (count_ > 0)
{
condition_.wait();
}
}
void countDown()
{
MutexLockGuard lock(mutex_);
--count_;
if (count_ == 0)
{
condition_.notifyAll();
}
}
int getCount() const
{
MutexLockGuard lock(mutex_);
return count_;
}
private:
mutable MutexLock mutex_;
Condition condition_;
int count_;
};
}
#endif // MUDUO_BASE_COUNTDOWNLATCH_H
|
不要用读写锁和信号量
- 正确性来说,可能修改代码读锁函数调用了修改状态的函数,导致共享数据被修改。
- 性能方面,读写锁不见得比 mutex 高效。
- read lock 可能可以提升为 write lock。
- read lock 可重入,write lock 不可重入,但为了防止 writer 饥饿, writer lock 通常会阻塞后来得 reader lock,因此 read locker 在重入得时候可能死锁。在追求低延迟读写得场合也不适合读写锁。对读写有极高得性能要求,可考虑 read-copy-update。
信号量:不是必备同步原语,条件变量加互斥锁完全可替代。同时信号量有自己得计数值,通常数据结构中也会有长度值,需要时刻保持一致,增加复杂度,控制并发度,可考虑 ThreadPool
信号量(Semaphore)与互斥锁(Mutex)特性对比表
| 特性 | 信号量(Semaphore) | 互斥锁(Mutex) |
|---|
| 所有权 | 无(任何线程可获取/释放许可) | 有(仅持有线程可释放锁) |
| 典型操作示例 | 线程A获取许可后,线程B可释放该许可(语法合法) | 线程A加锁后,线程B尝试解锁会触发错误(如死锁) |
| 核心用途 | 控制并发访问数量(如限制5个线程同时访问资源) | 保证资源独占访问(如同一时间仅1个线程操作共享数据) |
封装 MutexLock 、MutexLockGuard 、Condition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
| // excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)
#ifndef MUDUO_BASE_MUTEX_H
#define MUDUO_BASE_MUTEX_H
#include "Thread.h"
#include <boost/noncopyable.hpp>
#include <assert.h>
#include <pthread.h>
namespace muduo
{
class MutexLock : boost::noncopyable // C++ linux 端封装 mutex
{
public:
MutexLock()
: holder_(0)
{
pthread_mutex_init(&mutex_, NULL);
}
~MutexLock()
{
assert(holder_ == 0);
pthread_mutex_destroy(&mutex_);
}
bool isLockedByThisThread()
{
return holder_ == CurrentThread::tid();
}
void assertLocked()
{
assert(isLockedByThisThread());
}
// internal usage
void lock()
{
pthread_mutex_lock(&mutex_);
holder_ = CurrentThread::tid();
}
void unlock()
{
holder_ = 0;
pthread_mutex_unlock(&mutex_);
}
pthread_mutex_t* getPthreadMutex() /* non-const */
{
return &mutex_;
}
private:
pthread_mutex_t mutex_;
pid_t holder_;
};
class MutexLockGuard : boost::noncopyable //RAII 技术
{
public:
explicit MutexLockGuard(MutexLock& mutex) : mutex_(mutex)
{
mutex_.lock();
}
~MutexLockGuard()
{
mutex_.unlock();
}
private:
MutexLock& mutex_;
};
}
// Prevent misuse like:
// MutexLockGuard(mutex_);
// A tempory object doesn't hold the lock for long!
#define MutexLockGuard(x) error "Missing guard object name"
//#define MutexLockGuard(x) static_assert(false, "Missing guard object name") //书中用得断言方式
//这样写成宏,只要这样写就会报错,必须 MutexLockGuard lock(mutex); 定义,不能匿名定义
#endif // MUDUO_BASE_MUTEX_H
|
以上代码达不到工业强度:
- mutex 创建用PTHREAD_MUTEX_DEFAULT 类型,不是 PTHREAD_MUTEX_NORMAL 类型,严格说使用 mutexattr 指定锁类型
- 没有检测返回值。assert 使用 release build 会被优化成空语句。goolge-glog 的 CHECK() 宏是个思路。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
| // excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)
#ifndef MUDUO_BASE_CONDITION_H
#define MUDUO_BASE_CONDITION_H
#include "Mutex.h"
#include <boost/noncopyable.hpp>
#include <pthread.h>
#include <errno.h>
namespace muduo
{
class Condition : boost::noncopyable
{
public:
explicit Condition(MutexLock& mutex) : mutex_(mutex)
{
pthread_cond_init(&pcond_, NULL);
}
~Condition()
{
pthread_cond_destroy(&pcond_);
}
void wait()
{
pthread_cond_wait(&pcond_, mutex_.getPthreadMutex());
}
// returns true if time out, false otherwise.
bool waitForSeconds(int seconds)
{
struct timespec abstime;
clock_gettime(CLOCK_REALTIME, &abstime);
abstime.tv_sec += seconds;
return ETIMEDOUT == pthread_cond_timedwait(&pcond_, mutex_.getPthreadMutex(), &abstime);
}
void notify()
{
pthread_cond_signal(&pcond_);
}
void notifyAll()
{
pthread_cond_broadcast(&pcond_);
}
private:
MutexLock& mutex_;
pthread_cond_t pcond_;
};
}
#endif // MUDUO_BASE_CONDITION_H
|
像上面 CountDownLatch 代码,使用 mutex 和 condition 同时时,注意初始化顺序,先 mutex 后 condition
线程安全的 Singleton 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
| // excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)
#ifndef MUDUO_BASE_SINGLETON_H
#define MUDUO_BASE_SINGLETON_H
#include <boost/noncopyable.hpp>
#include <pthread.h>
#include <stdlib.h> // atexit
namespace muduo
{
template<typename T>
class Singleton : boost::noncopyable
{
public:
static T& instance()
{
pthread_once(&ponce_, &Singleton::init);
return *value_;
}
private:
Singleton();
~Singleton();
static void init()
{
value_ = new T();
::atexit(destroy);
}
static void destroy()
{
delete value_;
}
private:
static pthread_once_t ponce_;
static T* value_;
};
template<typename T>
pthread_once_t Singleton<T>::ponce_ = PTHREAD_ONCE_INIT;
template<typename T>
T* Singleton<T>::value_ = NULL;
}
#endif
|
sleep(3) 不是同步原语
生产代码中线程的等待分两种:一种是等待资源可用(要么等待在 select/poll/epoll_wait上,要么等待条件变量上);一种是等待进入临界区(mutex 上)以便读取数据。后一种等待通常极短,否则程序的性能和伸缩性有问题。
归纳总结
- 线程的四项原则,尽量使用高层同步设施(线程池、队列、倒计时);
- 使用普通的互斥锁和条件变量完成剩余的同步任务,使用 RAII 手法和 Scoped Locking。
借 shared_ptr 实现 copy-on-write
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
| #include "../Mutex.h"
#include "../Thread.h"
#include <vector>
#include <boost/shared_ptr.hpp>
#include <stdio.h>
using namespace muduo;
class Foo
{
public:
void doit() const;
};
typedef std::vector<Foo> FooList;
typedef boost::shared_ptr<FooList> FooListPtr;
FooListPtr g_foos;
MutexLock mutex;
void post(const Foo& f)
{
printf("post\n");
MutexLockGuard lock(mutex);
if (!g_foos.unique())
{
g_foos.reset(new FooList(*g_foos)); //复制当前容器内容创建新容器,再让共享指针指向新容器
printf("copy the whole list\n");
}
assert(g_foos.unique()); //新的为 1 写入数据。另一份通过 shared_ptr 自动管理生命周期释放。
g_foos->push_back(f);
}
void traverse()
{
FooListPtr foos;
{
MutexLockGuard lock(mutex); //多线程读写 shared_ptr ,必须用 mutex 保护
foos = g_foos;
assert(!g_foos.unique());
}
// assert(!foos.unique()); this may not hold
for (std::vector<Foo>::const_iterator it = foos->begin();
it != foos->end(); ++it)
{
it->doit();
}
}
void Foo::doit() const
{
Foo f;
post(f);
}
int main()
{
g_foos.reset(new FooList);
Foo f;
post(f);
traverse();
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
| #include "../Mutex.h"
#include "../Thread.h"
#include <set>
#include <boost/shared_ptr.hpp>
#include <stdio.h>
class Request;
class Inventory
{
public:
Inventory()
: requests_(new RequestList)
{
}
void add(Request* req)
{
muduo::MutexLockGuard lock(mutex_);
if (!requests_.unique())
{
requests_.reset(new RequestList(*requests_));
printf("Inventory::add() copy the whole list\n");
}
assert(requests_.unique());
requests_->insert(req);
}
void remove(Request* req) // __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
if (!requests_.unique())
{
requests_.reset(new RequestList(*requests_));
printf("Inventory::remove() copy the whole list\n");
}
assert(requests_.unique());
requests_->erase(req);
}
void printAll() const;
private:
typedef std::set<Request*> RequestList;
typedef boost::shared_ptr<RequestList> RequestListPtr;
RequestListPtr getData() const
{
muduo::MutexLockGuard lock(mutex_);
return requests_;
}
mutable muduo::MutexLock mutex_;
RequestListPtr requests_;
};
Inventory g_inventory;
class Request
{
public:
Request()
: x_(0)
{
}
~Request() __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
x_ = -1;
sleep(1);
g_inventory.remove(this);
}
void process() // __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
g_inventory.add(this);
// ...
}
void print() const __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
// ...
printf("print Request %p x=%d\n", this, x_);
}
private:
mutable muduo::MutexLock mutex_;
int x_;
};
void Inventory::printAll() const
{
RequestListPtr requests = getData();
sleep(1);
for (std::set<Request*>::const_iterator it = requests->begin();
it != requests->end();
++it)
{
(*it)->print();
}
}
void threadFunc()
{
Request* req = new Request;
req->process();
delete req;
}
int main()
{
muduo::Thread thread(threadFunc);
thread.start();
usleep(500*1000);
g_inventory.printAll();
thread.join();
}
|
Request 析构的 race condition 解决
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
| #include "../Mutex.h"
#include "../Thread.h"
#include <set>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <stdio.h>
class Request;
typedef boost::shared_ptr<Request> RequestPtr;
class Inventory
{
public:
Inventory()
: requests_(new RequestList)
{
}
void add(const RequestPtr& req)
{
muduo::MutexLockGuard lock(mutex_);
if (!requests_.unique())
{
requests_.reset(new RequestList(*requests_));
printf("Inventory::add() copy the whole list\n");
}
assert(requests_.unique());
requests_->insert(req);
}
void remove(const RequestPtr& req) // __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
if (!requests_.unique())
{
requests_.reset(new RequestList(*requests_));
printf("Inventory::remove() copy the whole list\n");
}
assert(requests_.unique());
requests_->erase(req);
}
void printAll() const;
private:
typedef std::set<RequestPtr> RequestList;
typedef boost::shared_ptr<RequestList> RequestListPtr;
RequestListPtr getData() const
{
muduo::MutexLockGuard lock(mutex_);
return requests_;
}
mutable muduo::MutexLock mutex_;
RequestListPtr requests_;
};
Inventory g_inventory;
class Request : public boost::enable_shared_from_this<Request>
{
public:
Request()
: x_(0)
{
}
~Request()
{
x_ = -1;
}
void cancel() __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
x_ = 1;
sleep(1);
printf("cancel()\n");
g_inventory.remove(shared_from_this());
}
void process() // __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
g_inventory.add(shared_from_this());
// ...
}
void print() const __attribute__ ((noinline))
{
muduo::MutexLockGuard lock(mutex_);
// ...
printf("print Request %p x=%d\n", this, x_);
}
private:
mutable muduo::MutexLock mutex_;
int x_;
};
void Inventory::printAll() const
{
RequestListPtr requests = getData();
printf("printAll()\n");
sleep(1);
for (std::set<RequestPtr>::const_iterator it = requests->begin();
it != requests->end();
++it)
{
(*it)->print();
}
}
void threadFunc()
{
RequestPtr req(new Request);
req->process();
req->cancel();
}
int main()
{
muduo::Thread thread(threadFunc);
thread.start();
usleep(500*1000);
g_inventory.printAll();
thread.join();
}
|
普通 mutex 替换读写锁
背景线程读写,工作线程只读。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
| #include <map>
#include <string>
#include <vector>
#include <boost/shared_ptr.hpp>
#include "../Mutex.h"
using std::string;
class CustomerData : boost::noncopyable
{
public:
CustomerData()
: data_(new Map)
{ }
int query(const string& customer, const string& stock) const;
private:
typedef std::pair<string, int> Entry;
typedef std::vector<Entry> EntryList;
typedef std::map<string, EntryList> Map;
typedef boost::shared_ptr<Map> MapPtr;
void update(const string& customer, const EntryList& entries);
void update(const string& message);
static int findEntry(const EntryList& entries, const string& stock);
static MapPtr parseData(const string& message);
MapPtr getData() const
{
muduo::MutexLockGuard lock(mutex_);
return data_;
}
mutable muduo::MutexLock mutex_;
MapPtr data_;
};
int CustomerData::query(const string& customer, const string& stock) const
{
MapPtr data = getData();
Map::const_iterator entries = data->find(customer);
if (entries != data->end())
return findEntry(entries->second, stock);
else
return -1;
}
void CustomerData::update(const string& customer, const EntryList& entries)
{
muduo::MutexLockGuard lock(mutex_);
if (!data_.unique())
{
MapPtr newData(new Map(*data_));
data_.swap(newData);
}
assert(data_.unique());
(*data_)[customer] = entries;
}
void CustomerData::update(const string& message)
{
MapPtr newData = parseData(message);
if (newData)
{
muduo::MutexLockGuard lock(mutex_);
data_.swap(newData);
}
}
int main()
{
CustomerData data;
}
|
Reference
http://queue.acm.org/detail.cfm?id=1454462
https://computing.llnl.gov/tutorials/pthreads
http://www.thinkingparallel.com/2007/02/19/please-dont-rely-on-memory-barriers-for-synchronization/
http://zaitcev.livejournal.com/144041.html
http://www.kernel.org/doc/Documentation/volatile-considered-harmful.txt
http://www.cs.wustl.edu/~schrnidt/PDF/locking-patterns.pdf Douglas Schmidt 论文
http://llvm.org/devmtg/2011-11/#talk3
http://zaval.org/resources/library/butenhof1.html
http://www.akkadia.org/drepper/futex.pdf
http://msdn.microsoft.com/en-us/library/windows/desktop/ms682530(v=vs.85).aspx
http://blog.csdn.net/panaimin/article/details/5981766
http://www.nwcpp.org/Downloads/2007/Machine_Architecture_-_NWCPP.pdf
http://www.aristeia.com/TalkNotes/ACCU2011_CPUCaches.pdf
http://igoro.com/archive/gallery-of-processor-cache-effects/
http://simplygenius.net/Article/FalseSharing
http://blog.csdn.net/solstice/article/details/11432817
http://blog.csdn.net/solstice/article/details/5829421#comments
http://en.wikipedia.org/wiki/Spurious_wakeup
http://en.wikipedia.org/wiki/Read-copy-update
https://www.artima.com/shop/effective_cpp_in_an_embedded_environment
http://www.drdobbs.com/cpp/lock-free-code-a-false-sense-of-security/210600279
Intel Thread Ckecker
Valgrind-Helgrind
http://valgrind.org/docs/manual/hg-manual.html/#hg-manual.data-races.algorithm
http://research.microsoft.com/en-us/um/people/lamport/pubs/time-clocks.pdf
http://www.cs.wustl.edu/~schmidt/PDF/DC-Locking.pdf
http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html
http://www.javaworld.com/jw-02-2001/jw-0209-double.html
http://www.aristeia.com/Papers/DDJ_Jul_Aug_2004_revised.pdf
http://scottmeyers.blogspot.com/2012/04/information-on-c11-memory-model.html
《Ad Hoc Synchronization Considered Harmful》 http://www.usenix.org/events/osdi10/tech/full_papers/Xiong.pdf
http://pdos.csail.mit.edu/papers/linux:osdi10.pdf
http://preshing.com/20111118/locks-arent-slow-lock-contention-is
《Programming with POSIX Threads》
RWC