01多线程下对象生命周期

最后修改:

多线程下析构函数

race condition

  • 析构对象时,是否有其他线程正在执行这个对象的其他成员函数等操作。
  • 对象执行操作,没有其他线程析构。
  • 对象操作或者析构之前,确定对象存活,没有其他线程操作。

shared_ptr 解决。

线程安全定义

一个线程安全的 class 满足的三个条件:

  1. 多线程同时访问,表现正确
  2. 不管操作系统如何调度、线程执行顺序如何
  3. 调用端代码无须额外的同步或其他协调动作

根据此要求,c++ 标准库中的 string、vector、map 等类型都不满足要求,需要加锁使用。

MutexLock 和 MutexLockGuard

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)

#ifndef MUDUO_BASE_MUTEX_H
#define MUDUO_BASE_MUTEX_H

#include "Thread.h"

#include <boost/noncopyable.hpp>
#include <assert.h>
#include <pthread.h>

namespace muduo
{

class MutexLock : boost::noncopyable  // C++ linux 端封装 mutex
{
 public:
  MutexLock()
    : holder_(0)
  {
    pthread_mutex_init(&mutex_, NULL);
  }

  ~MutexLock()
  {
    assert(holder_ == 0);
    pthread_mutex_destroy(&mutex_);
  }

  bool isLockedByThisThread()
  {
    return holder_ == CurrentThread::tid();
  }

  void assertLocked()
  {
    assert(isLockedByThisThread());
  }

  // internal usage

  void lock()
  {
    pthread_mutex_lock(&mutex_);
    holder_ = CurrentThread::tid();
  }

  void unlock()
  {
    holder_ = 0;
    pthread_mutex_unlock(&mutex_);
  }

  pthread_mutex_t* getPthreadMutex() /* non-const */
  {
    return &mutex_;
  }

 private:

  pthread_mutex_t mutex_;
  pid_t holder_;
};

class MutexLockGuard : boost::noncopyable   //RAII 技术
{
 public:
  explicit MutexLockGuard(MutexLock& mutex) : mutex_(mutex)
  {
    mutex_.lock();
  }

  ~MutexLockGuard()
  {
    mutex_.unlock();
  }

 private:

  MutexLock& mutex_;
};

}

// Prevent misuse like:
// MutexLockGuard(mutex_);
// A tempory object doesn't hold the lock for long!
#define MutexLockGuard(x) error "Missing guard object name"

//#define MutexLockGuard(x) static_assert(false, "Missing guard object name") //书中用得断言方式

//这样写成宏,只要这样写就会报错,必须 MutexLockGuard  lock(mutex); 定义,不能匿名定义

#endif  // MUDUO_BASE_MUTEX_H
特性重入锁(递归锁)不可重入锁(默认)
同一线程加锁次数允许多次加锁仅允许一次加锁
死锁风险需保证解锁次数与加锁次数一致,否则仍会死锁同一线程多次加锁直接死锁
创建方式需显式设置属性默认即支持
性能开销略高(需维护锁的递归计数)开销低
适用场景递归函数、同一线程多步骤加锁简单互斥场景,避免逻辑错误

c++11 实现的自旋锁,也是不可重入的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Copyright (c) 2020 by Chrono

#ifndef _SPIN_LOCK_HPP
#define _SPIN_LOCK_HPP

#include "cpplang.hpp"

BEGIN_NAMESPACE(cpp_study)

// atomic spinlock with TAS
class SpinLock final
{
public:
    using this_type   = SpinLock;
    using atomic_type = std::atomic_flag;

public:
    SpinLock() = default;
   ~SpinLock() = default;

    SpinLock(const this_type&) = delete;
    SpinLock& operator=(const this_type&) = delete;
public:
    void lock() noexcept
    {
        for(;;) {
            if (!m_lock.test_and_set()) {
                return;
            }

            std::this_thread::yield();
        }
    }

    bool try_lock() noexcept
    {
        return !m_lock.test_and_set();
    }

    void unlock() noexcept
    {
        m_lock.clear();
    }
private:
    atomic_type m_lock {false};
};

// RAII for lock
// you can change it to a template class
class SpinLockGuard final
{
public:
    using this_type      = SpinLockGuard;
    using spin_lock_type = SpinLock;
public:
    SpinLockGuard(spin_lock_type& lock) noexcept
        : m_lock(lock)
    {
        m_lock.lock();
    }

   ~SpinLockGuard() noexcept
   {
       m_lock.unlock();
   }

public:
    SpinLockGuard(const this_type&) = delete;
    SpinLockGuard& operator=(const this_type&) = delete;
private:
    spin_lock_type& m_lock;
};


END_NAMESPACE(cpp_study)

#endif  //_SPIN_LOCK_HPP

锁的分类与核心特性

1. 按「线程等待机制」分类

锁类型核心特征(等待机制)适用场景典型示例
自旋锁忙等待:获取锁失败时,线程循环重试(不挂起),通过 yield() 让出 CPU,但保持运行态。锁持有时间极短(微秒级),避免线程阻塞/唤醒的内核开销。std::atomic_flag 实现的自旋锁、Linux 内核 spinlock_t
互斥锁阻塞等待:获取锁失败时,线程被内核挂起(阻塞态),锁释放时由内核唤醒。锁持有时间较长(毫秒级),避免自旋导致的 CPU 空耗。std::mutexpthread_mutex_t(默认类型)。

2. 按「重入性」分类

锁类型核心特征(同一线程重复加锁)典型示例
不可重入锁同一线程对已持有的锁再次加锁会导致死锁。默认 std::mutex、基础自旋锁实现。
可重入锁同一线程可多次加锁,需通过「线程 ID + 递归计数」实现(解锁次数需匹配)。std::recursive_mutexPTHREAD_MUTEX_RECURSIVE 类型的 pthread_mutex_t

3. 按「竞争范围」分类

锁类型核心特征(竞争资源的范围)典型示例
线程锁仅同一进程内的线程竞争,锁状态存储在进程地址空间(用户态)。std::mutex、自旋锁、pthread_mutex_t
进程锁多进程间竞争,锁状态存储在共享资源(文件、共享内存)。文件锁(fcntl)、System V 信号量。

4. 按「功能特性」分类(特定场景优化)

锁类型核心特征(功能优化)适用场景典型示例
读写锁区分读写操作:多线程同时读(共享),写操作独占(互斥)。读多写少场景(如缓存、配置文件)。std::shared_mutex(C++17)、pthread_rwlock_t
条件变量配合互斥锁实现「等待-通知」机制,解决盲目等待问题。生产者-消费者模型、线程同步信号。std::condition_variablepthread_cond_t
信号量通过计数器控制资源访问数量(支持 N 个并发访问)。限制资源并发数(如连接池、线程池)。std::counting_semaphore(C++20)、POSIX 信号量。

对象的创建

对象构造要做到线程安全,就不能在构造期间泄露 this 指针

  • 不要再构造函数中注册任何回调
  • 不要再构造函数中把 this 传递给跨线程的对象

对象析构

避免悬空指针和野指针

悬空指针:指向已经消毁的对象或已经回收的地址 野指针:未初始化的指针

面向对象设计中,对象的关系主要有三种:composition(组合/复合)、aggregation(聚合)、asscoiation(关联/联系)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <algorithm>
#include <vector>
#include <stdio.h>

class Observable;

class Observer
{
 public:
  virtual ~Observer();
  virtual void update() = 0;

  void observe(Observable* s);

 protected:
  Observable* subject_;
};

class Observable
{
 public:
  void register_(Observer* x);
  void unregister(Observer* x);

  void notifyObservers()
  {
    for (size_t i = 0; i < observers_.size(); ++i)
    {
      Observer* x = observers_[i];
      if (x) {
        x->update(); // (3)
      }
    }
  }

 private:
  std::vector<Observer*> observers_;
};

Observer::~Observer()
{
  subject_->unregister(this);
}

void Observer::observe(Observable* s)
{
  s->register_(this);
  subject_ = s;
}

void Observable::register_(Observer* x)
{
  observers_.push_back(x);
}

void Observable::unregister(Observer* x)
{
  std::vector<Observer*>::iterator it = std::find(observers_.begin(), observers_.end(), x);
  if (it != observers_.end())
  {
    std::swap(*it, observers_.back());
    observers_.pop_back();
  }
}

// ---------------------

class Foo : public Observer
{
  virtual void update()
  {
    printf("Foo::update() %p\n", this);
  }
};

int main()
{
  Foo* p = new Foo;
  Observable subject;
  p->observe(&subject);
  subject.notifyObservers();
  delete p;
  subject.notifyObservers();
}

shared_ptr 和 weak_ptr

  • shared_ptr 强引用控制对象生命周期,指向的对象,直到最后一个没有析构或者reset()
  • weak_ptr 弱引用不控制对象生命周期,但是知道对象是否还活着,可以提升为 shared_ptr, 提升/lock() 是线程安全的
  • shared_ptr、weak_ptr 主流平台实现都没有用所,用原子能力,性能可以。
  • shared_ptr、weak_ptr、scoped_ptr 都是值语义

c++ 中内存错误大致几个方面:

  • 缓冲区溢出 : 用std::vector/std::string 或自己编写的 Buffer Class 做缓冲区,自动记住缓冲区长度,并通过成员函数而不是裸指针来修改缓冲区
  • 悬空指针/野指针:用 shared_ptr/weak_ptr
  • 重复释放:用 scoped_ptr, 只对对象析构时候释放一次
  • 内存泄漏:用 scoped_ptr, 对象析构时候自动释放内存
  • 不配对的 new[]/delete:把 net[]/delete 改为std::vector/scoped_array
  • 内存碎片
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#include <algorithm>
#include <vector>
#include <stdio.h>
#include "../Mutex.h"
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>

class Observable;

class Observer : public boost::enable_shared_from_this<Observer>
{
 public:
  virtual ~Observer();
  virtual void update() = 0;

  void observe(Observable* s);

 protected:
  Observable* subject_;
};

class Observable
{
 public:
  void register_(boost::weak_ptr<Observer> x);
  // void unregister(boost::weak_ptr<Observer> x);

  void notifyObservers()
  {
    muduo::MutexLockGuard lock(mutex_);
    Iterator it = observers_.begin();
    while (it != observers_.end())
    {
      boost::shared_ptr<Observer> obj(it->lock());
      if (obj)
      {
        obj->update();
        ++it;
      }
      else
      {
        printf("notifyObservers() erase\n");
        it = observers_.erase(it);
      }
    }
  }

 private:
  mutable muduo::MutexLock mutex_;
  std::vector<boost::weak_ptr<Observer> > observers_;
  typedef std::vector<boost::weak_ptr<Observer> >::iterator Iterator;
};

Observer::~Observer()
{
  // subject_->unregister(this);
}

void Observer::observe(Observable* s)
{
  s->register_(shared_from_this());
  subject_ = s;
}

void Observable::register_(boost::weak_ptr<Observer> x)
{
  observers_.push_back(x);
}

//void Observable::unregister(boost::weak_ptr<Observer> x)
//{
//  Iterator it = std::find(observers_.begin(), observers_.end(), x);
//  observers_.erase(it);
//}

// ---------------------

class Foo : public Observer
{
  virtual void update()
  {
    printf("Foo::update() %p\n", this);
  }
};

int main()
{
  Observable subject;
  {
    boost::shared_ptr<Foo> p(new Foo);
    p->observe(&subject);
    subject.notifyObservers();
  }
  subject.notifyObservers();
}

多线程中同时访问同一个 shared_ptr,正确的做法加锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
MutexLock mutex;
shared_ptr<Foo> globalPtr;

void doit(const shared_ptr<Foo>& pFoo);

void read()
{
shared_ptr<Foo> ptr;
  {
    MutexLock lock(mutex);
    ptr = globalPtr;  // read globalPtr
  }
  // use ptr since here
  doit(ptr);
}
 
 //写入的时候也要加锁:
void write()
{
  shared_ptr<Foo> newptr(new Foo);
  {
    MutexLock lock(mutex);
    globalPtr = newptr;  // write to globalPtr
  } 
  // use newptr since here
  doit(newptr);
}

对象池

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
#include <map>

#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>

#include "../Mutex.h"

#include <assert.h>
#include <stdio.h>

using std::string;

class Stock : boost::noncopyable
{
 public:
  Stock(const string& name)
    : name_(name)
  {
    printf(" Stock[%p] %s\n", this, name_.c_str());
  }

  ~Stock()
  {
    printf("~Stock[%p] %s\n", this, name_.c_str());
  }

  const string& key() const { return name_; }

 private:
  string name_;
};

namespace version1
{

// questionable code
class StockFactory : boost::noncopyable
{
 public:

  boost::shared_ptr<Stock> get(const string& key)
  {
    muduo::MutexLockGuard lock(mutex_);
    boost::shared_ptr<Stock>& pStock = stocks_[key];
    if (!pStock)
    {
      pStock.reset(new Stock(key));
    }
    return pStock;
  }


 private:
  mutable muduo::MutexLock mutex_;
  std::map<string, boost::shared_ptr<Stock> > stocks_;
};

}

namespace version2
{

class StockFactory : boost::noncopyable
{
 public:
  boost::shared_ptr<Stock> get(const string& key)
  {
    boost::shared_ptr<Stock> pStock;
    muduo::MutexLockGuard lock(mutex_);
    boost::weak_ptr<Stock>& wkStock = stocks_[key];
    pStock = wkStock.lock();
    if (!pStock)
    {
      pStock.reset(new Stock(key));
      wkStock = pStock;
    }
    return pStock;
  }

 private:
  mutable muduo::MutexLock mutex_;
  std::map<string, boost::weak_ptr<Stock> > stocks_;
};

}

namespace version3
{

class StockFactory : boost::noncopyable
{
 public:

  boost::shared_ptr<Stock> get(const string& key)
  {
    boost::shared_ptr<Stock> pStock;
    muduo::MutexLockGuard lock(mutex_);
    boost::weak_ptr<Stock>& wkStock = stocks_[key];
    pStock = wkStock.lock();
    if (!pStock)
    {
      pStock.reset(new Stock(key),
                   boost::bind(&StockFactory::deleteStock, this, _1));
      wkStock = pStock;
    }
    return pStock;
  }

 private:

  void deleteStock(Stock* stock)
  {
    printf("deleteStock[%p]\n", stock);
    if (stock)
    {
      muduo::MutexLockGuard lock(mutex_);
      stocks_.erase(stock->key());  // This is wrong, see removeStock below for correct implementation.
    }
    delete stock;  // sorry, I lied
  }
  mutable muduo::MutexLock mutex_;
  std::map<string, boost::weak_ptr<Stock> > stocks_;
};

}

namespace version4
{

class StockFactory : public boost::enable_shared_from_this<StockFactory>,
                     boost::noncopyable
{
 public:

  boost::shared_ptr<Stock> get(const string& key)
  {
    boost::shared_ptr<Stock> pStock;
    muduo::MutexLockGuard lock(mutex_);
    boost::weak_ptr<Stock>& wkStock = stocks_[key];
    pStock = wkStock.lock();
    if (!pStock)
    {
      pStock.reset(new Stock(key),
                   boost::bind(&StockFactory::deleteStock,
                               shared_from_this(),  // shared_ptr<StockFactory>
                               _1));
      wkStock = pStock;
    }
    return pStock;
  }

 private:

  void deleteStock(Stock* stock)
  {
    printf("deleteStock[%p]\n", stock);
    if (stock)
    {
      muduo::MutexLockGuard lock(mutex_);
      stocks_.erase(stock->key());  // This is wrong, see removeStock below for correct implementation.
    }
    delete stock;  // sorry, I lied
  }
  mutable muduo::MutexLock mutex_;
  std::map<string, boost::weak_ptr<Stock> > stocks_;
};

}

class StockFactory : public boost::enable_shared_from_this<StockFactory>,
                     boost::noncopyable
{
 public:
  boost::shared_ptr<Stock> get(const string& key)
  {
    boost::shared_ptr<Stock> pStock;
    muduo::MutexLockGuard lock(mutex_);
    boost::weak_ptr<Stock>& wkStock = stocks_[key];
    pStock = wkStock.lock();
    if (!pStock)
    {
      pStock.reset(new Stock(key),
                   boost::bind(&StockFactory::weakDeleteCallback,
                               boost::weak_ptr<StockFactory>(shared_from_this()),
                               _1)); //必须显示将 shared_from_this() 转换为 weak_ptr 类型,bind 拷贝的是实参类型,不是形参类型,只负责拷贝实参本身(无论其类型是否与函数形参匹配),因此实参必须是可拷贝的(或可移动的)
      wkStock = pStock;
    }
    return pStock;
  }

 private:
  static void weakDeleteCallback(const boost::weak_ptr<StockFactory>& wkFactory,
                                 Stock* stock)
  {
    printf("weakDeleteStock[%p]\n", stock);
    boost::shared_ptr<StockFactory> factory(wkFactory.lock());
    if (factory)
    {
      factory->removeStock(stock);
    }
    else
    {
      printf("factory died.\n");
    }
    delete stock;  // sorry, I lied
  }

  void removeStock(Stock* stock)
  {
    if (stock)
    {
      muduo::MutexLockGuard lock(mutex_);
      auto it = stocks_.find(stock->key());
      if (it != stocks_.end() && it->second.expired())
      {
        stocks_.erase(stock->key());
      }
    }
  }

 private:
  mutable muduo::MutexLock mutex_;
  std::map<string, boost::weak_ptr<Stock> > stocks_;
};

void testLongLifeFactory()
{
  boost::shared_ptr<StockFactory> factory(new StockFactory);
  {
    boost::shared_ptr<Stock> stock = factory->get("NYSE:IBM");
    boost::shared_ptr<Stock> stock2 = factory->get("NYSE:IBM");
    assert(stock == stock2);
    // stock destructs here
  }
  // factory destructs here
}

void testShortLifeFactory()
{
  boost::shared_ptr<Stock> stock;
  {
    boost::shared_ptr<StockFactory> factory(new StockFactory);
    stock = factory->get("NYSE:IBM");
    boost::shared_ptr<Stock> stock2 = factory->get("NYSE:IBM");
    assert(stock == stock2);
    // factory destructs here
  }
  // stock destructs here
}

int main()
{
  version1::StockFactory sf1;
  version2::StockFactory sf2;
  version3::StockFactory sf3;
  boost::shared_ptr<version3::StockFactory> sf4(new version3::StockFactory);
  boost::shared_ptr<StockFactory> sf5(new StockFactory);

  {
  boost::shared_ptr<Stock> s1 = sf1.get("stock1");
  }

  {
  boost::shared_ptr<Stock> s2 = sf2.get("stock2");
  }

  {
  boost::shared_ptr<Stock> s3 = sf3.get("stock3");
  }

  {
  boost::shared_ptr<Stock> s4 = sf4->get("stock4");
  }

  {
  boost::shared_ptr<Stock> s5 = sf5->get("stock5");
  }

  testLongLifeFactory();
  testShortLifeFactory();
}

enable_shared_from_this 以派生类为模板类型实参的基类模板,继承它,this 指针变为 shared_ptr类型

shared_from_this() 不能在构造函数中调用,因为类型的构造函数中还没有交给 shared_ptr 管理生命周期

把 shared_ptr 邦(std::bind)到 boost::function 里,那么回调的时候 StockFactory 对象始终存在,是安全的,延长了对象的生命周期,使得不短于邦的 boost::function 对象。

使用 weak_ptr 邦到 boost::function 这样,对象的生命周期不会延长,然后再回调的时候尝试提升为 shared_ptr,如果提升成功,说明接受回调的对象还健在,那么执行回调,如果失败,则不处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// excerpts from http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (giantchen at gmail dot com)

#ifndef MUDUO_BASE_WEAKCALLBACK_H
#define MUDUO_BASE_WEAKCALLBACK_H

#include <functional>
#include <memory>

namespace muduo
{

// A barely usable WeakCallback

template<typename CLASS, typename... ARGS>
class WeakCallback
{
 public:

  WeakCallback(const std::weak_ptr<CLASS>& object,
               const std::function<void (CLASS*, ARGS...)>& function)
    : object_(object), function_(function)
  {
  }

  // Default dtor, copy ctor and assignment are okay

  void operator()(ARGS&&... args) const
  {
    std::shared_ptr<CLASS> ptr(object_.lock());
    if (ptr)
    {
      function_(ptr.get(), std::forward<ARGS>(args)...);
    }
  }

 private:

  std::weak_ptr<CLASS> object_;
  std::function<void (CLASS*, ARGS...)> function_;
};

template<typename CLASS, typename... ARGS>
WeakCallback<CLASS, ARGS...> makeWeakCallback(const std::shared_ptr<CLASS>& object,
                                              void (CLASS::*function)(ARGS...))
{
  return WeakCallback<CLASS, ARGS...>(object, function);
}

template<typename CLASS, typename... ARGS>
WeakCallback<CLASS, ARGS...> makeWeakCallback(const std::shared_ptr<CLASS>& object,
                                              void (CLASS::*function)(ARGS...) const)
{
  return WeakCallback<CLASS, ARGS...>(object, function);
}

}

#endif

在 C++ 中替换 Observer 可以用 Signal/Slots 方式,指完全靠标准库实现的 thread safe、race condition free、thread contention free 的 Signal/Slots

特性核心要求实现技术(C++ 标准库)
Thread Safe多线程并发并发调用「绑定槽函数」「触发信号」「解绑绑槽函数」时,无内存访问错误(如野指针、数据越界)。1. std::atomic:原子管理槽函数列表的头指针及节点状态(is_valid);
2. 无锁单向链表:节点通过原子指针(std::atomic<Node*>)链接,避免指针访问冲突。
Race Condition Free多线程操作共享数据(如槽函数列表)时,通过原子操作或无锁设计,确保操作结果唯一且可预测(无“读-写”“写-写”冲突)。1. CAS 操作(std::atomic::compare_exchange_weak):原子替换头节点,避免并发修改冲突;
2. 快照遍历:触发信号时加载头节点快照,避免遍历中受其他线程更新影响。
Thread Contention Free线程操作时无需等待其他线程释放锁(无阻塞),即所有共享操作通过原子指令完成,无锁竞争开销。1. 完全无锁设计:摒弃 std::mutex 等锁机制,所有共享操作依赖用户态原子指令;
2. 轻量级重试:CAS 操作失败时仅重试,无内核态线程切换开销。

一对多回调:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#ifndef MUDUO_BASE_SIGNALSLOTTRIVIAL_H
#define MUDUO_BASE_SIGNALSLOTTRIVIAL_H

#include <memory>
#include <vector>

template<typename Signature>
class SignalTrivial;

template <typename RET, typename... ARGS>
class SignalTrivial<RET(ARGS...)>
{
 public:
  typedef std::function<void (ARGS...)> Functor;

  void connect(Functor&& func)
  {
    functors_.push_back(std::forward<Functor>(func));
  }

  void call(ARGS&&... args)
  {
    // gcc 4.6 supports
    //for (const Functor& f: functors_)
    typename std::vector<Functor>::iterator it = functors_.begin();
    for (; it != functors_.end(); ++it)
    {
      (*it)(args...);
    }
  }

 private:
  std::vector<Functor> functors_;
};

#endif // MUDUO_BASE_SIGNALSLOTTRIVIAL_H

thread safe 实现:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#ifndef MUDUO_BASE_SIGNALSLOT_H
#define MUDUO_BASE_SIGNALSLOT_H

#include "Mutex.h"

#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>

#include <vector>

namespace muduo
{

namespace detail
{

template<typename Callback>
struct SlotImpl;

template<typename Callback>
struct SignalImpl : boost::noncopyable
{
  typedef std::vector<boost::weak_ptr<SlotImpl<Callback> > > SlotList;

  SignalImpl()
    : slots_(new SlotList)
  {
  }

  void copyOnWrite()
  {
    mutex_.assertLocked();
    if (!slots_.unique())
    {
      slots_.reset(new SlotList(*slots_));
    }
    assert(slots_.unique());
  }

  void clean()
  {
    MutexLockGuard lock(mutex_);
    copyOnWrite();
    SlotList& list(*slots_);
    typename SlotList::iterator it(list.begin());
    while (it != list.end())
    {
      if (it->expired())
      {
        it = list.erase(it);
      }
      else
      {
        ++it;
      }
    }
  }

  MutexLock mutex_;
  boost::shared_ptr<SlotList> slots_;
};

template<typename Callback>
struct SlotImpl : boost::noncopyable
{
  typedef SignalImpl<Callback> Data;
  SlotImpl(const boost::shared_ptr<Data>& data, Callback&& cb)
    : data_(data), cb_(cb), tie_(), tied_(false)
  {
  }

  SlotImpl(const boost::shared_ptr<Data>& data, Callback&& cb,
           const boost::shared_ptr<void>& tie)
    : data_(data), cb_(cb), tie_(tie), tied_(true)
  {
  }

  ~SlotImpl()
  {
    boost::shared_ptr<Data> data(data_.lock());
    if (data)
    {
      data->clean();
    }
  }

  boost::weak_ptr<Data> data_;
  Callback cb_;
  boost::weak_ptr<void> tie_;
  bool tied_;
};

}

/// This is the handle for a slot
///
/// The slot will remain connected to the signal fot the life time of the
/// returned Slot object (and its copies).
typedef boost::shared_ptr<void> Slot;

template<typename Signature>
class Signal;

template <typename RET, typename... ARGS>
class Signal<RET(ARGS...)> : boost::noncopyable
{
 public:
  typedef std::function<void (ARGS...)> Callback;
  typedef detail::SignalImpl<Callback> SignalImpl;
  typedef detail::SlotImpl<Callback> SlotImpl;

  Signal()
    : impl_(new SignalImpl)
  {
  }

  ~Signal()
  {
  }

  Slot connect(Callback&& func)
  {
    boost::shared_ptr<SlotImpl> slotImpl(
        new SlotImpl(impl_, std::forward<Callback>(func)));
    add(slotImpl);
    return slotImpl;
  }

  Slot connect(Callback&& func, const boost::shared_ptr<void>& tie)
  {
    boost::shared_ptr<SlotImpl> slotImpl(new SlotImpl(impl_, func, tie));
    add(slotImpl);
    return slotImpl;
  }

  void call(ARGS&&... args)
  {
    SignalImpl& impl(*impl_);
    boost::shared_ptr<typename SignalImpl::SlotList> slots;
    {
      MutexLockGuard lock(impl.mutex_);
      slots = impl.slots_;
    }
    typename SignalImpl::SlotList& s(*slots);
    for (typename SignalImpl::SlotList::const_iterator it = s.begin(); it != s.end(); ++it)
    {
      boost::shared_ptr<SlotImpl> slotImpl = it->lock();
      if (slotImpl)
      {
        boost::shared_ptr<void> guard;
        if (slotImpl->tied_)
        {
          guard = slotImpl->tie_.lock();
          if (guard)
          {
            slotImpl->cb_(args...);
          }
        }
        else
        {
          slotImpl->cb_(args...);
        }
      }
    }
  }

 private:

  void add(const boost::shared_ptr<SlotImpl>& slot)
  {
    SignalImpl& impl(*impl_);
    {
      MutexLockGuard lock(impl.mutex_);
      impl.copyOnWrite();
      impl.slots_->push_back(slot);
    }
  }

  const boost::shared_ptr<SignalImpl> impl_;
};

}

#endif // MUDUO_BASE_SIGNALSLOT_H

Reference

http://en.wikipedia.org/wiki/Abstraction_layer

http://www.cs.utexas.edu/~EWD/transcriptions/EWD03xx/EWD340html

http://blog.csdn.net/myan/article/details/1906

http://blog.csdn.net/myan/article/details/1482614

http://blog.csdn.net/myan/article/details/3144661

http://trac.nginx.org/nginx/ticket/134

http://trac.nginx.org/nginx/ticket/135

http://trac.nginx.org/nginx/ticket/162

https://www.boost.org/doc/libs/latest/libs/smart_ptr/shared_ptr.htm#ThreadSafety

http://blog.csdn.net/Solstice/article/details/8547547

http://www.artima.com/cppsource/top_cpp_aha_moments.html

http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern

http://blog.csdn.net/Solstice/article/details/5238671

http://files.cppblog.com/Solstice/dtor_meets_mt.pdf

https://blog.csdn.net/Solstice/article/details/5950190

http://en.wikipedia.org/wiki/Reinventing_the_wheel

https://blog.csdn.net/ilvu999/article/details/8095009?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-8095009-blog-5950190.235^v43^pc_blog_bottom_relevance_base1&spm=1001.2101.3001.4242.1&utm_relevant_index=3

《Java Concurrency in Practice》 《操作系统设计与实现》《现代操作系统》《操作系统概念》任选一本 《C++ 沉思录》 muduo、leveldb、redis、libco 算法的学习,推荐《算法导论》,《剑指offer》 leetcode与牛客网 linux网络编程相关,推荐《UNIX网络编程》 对于C++,推荐的书是《C++ Primer》 《异类》 基础知识+项目+算法+个人理解=offer https://mp.weixin.qq.com/s/Tfa2bpTJbALf4lQ2o6i3bw

山重水复疑无路,柳暗花明又一村
使用 Hugo 构建
主题 StackJimmy 设计