Channel实现分析

Channel实现思路

本文接TinyCoro的契机,来分析一下如何使用一些相对底层的同步原语来组合成相对高级的同步原语,来分析一种端到端的实践,FSM的设计等等值得推敲的地方。

设计动机

对于一个相对底层的组件来说,我们必须需要了解我们设计其的动机,来约束住我们后续的实现,明确出外面的边界,使得后续的分析过程有一个参考,不至于过分的发散。就我个人理解上来看,我实现这个channel的基础想法是为了来封装一些并发场景的复杂性,特别是对于多生产者多消费者场景来说,我如果能够减少使用多个同步组件(互斥锁、条件变量等)所带来的心智负担。这时候实际上可以参考条件变量本身的出现动机,其本身就是对于一般的生产者消费者场景中mutex这种基础原语的实现复杂性。

没有channel的生产者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 用 mutex + condition_variable + queue
mutex mtx;
condition_variable cv;
std::queue<int> queue;

task<> producer() {
auto lock = co_await mtx.lock();
queue.push(42);
cv.notify_one();
}

task<> consumer() {
auto lock = co_await mtx.lock();
co_await cv.wait(mtx, [&](){ return !queue.empty(); });
auto data = queue.pop();
}

这种实现的生产者消费者存在几个问题

  • 手动管理互斥锁、条件变量、队列等并发危险资源
  • 关闭语义不清晰(什么时候生产者能够告诉消费者没有数据),其与等待语义相融合了
  • 缓冲区的大小受容器队列限制,无法清晰的端到端控制

使用channel的生产者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
channel<int, 10> ch;  // capacity=10 的缓冲 channel

task<> producer() {
co_await ch.send(42); // 自动阻塞(缓冲满)
}

task<> consumer() {
auto data = co_await ch.recv(); // 自动阻塞(缓冲空)
if (data) {
// 有数据
} else {
// channel 已关闭且为空
}
}

channel的直观意义:

  • 封装使用互斥锁、条件变量、队列等组件的复杂性
  • 相对清晰的关闭语义:在一个channel进行close操作之后,recv将会直接返回nullopt,不必用户再进行处理
  • 容量控制:由用户由其使用场景来定制channel的容量大小

Channel内部组件

对于一个Channel来说,其需要管理整个多生产者多消费者场景中的竞态,基本的,其需要至少维护一下几个内部组件。

  • 消费者队列:暂存当前由于无法获取数据而陷入阻塞态的消费者协程
  • 生产者队列:暂存当前由于无法推入数据而陷入阻塞态的生产者协程
  • 数据队列:储存当前Channel中已经被储存的数据,用于被生产者推入数据和消费者拉取数据
  • 状态flag:用于标识当前Channel所处状态,即开启/关闭

这些组件各自都会存在几个状态,由这些状态本身又会交织产生一系列的状态,从小点出发,一点点来剖析整个Channel的状态机模型。

基本状态

对于整个Channel来说,其最简单的状态就是其的flag,即是否关闭,其是一种0/1的状态,即其要么是活跃的,要么是已经被关闭了的。但是存在一个设计上的问题,在Go的Channel设计中,即使一个Channel显式的close了,但是其还是会存活一段时间直到其中的数据彻底被消耗完。

实际上类比网络中TCP的一个概念更加清晰,就是半关闭的状态。在我的理解中,对于一个Channel来说,其的关闭状态本身会被分割为多个阶段,可以将其分割为初始关闭状态,半关闭状态,关闭完全状态。类比TCP全双工的设计,所谓的初始关闭状态,只是简单的设置一个flag标识当前已经关闭然后就进入了半关闭状态。在这个半关闭状态中,Channel将不会接受新的数据的传入,但是此时仍然可以处理数据的消耗。在半关闭状态中数据队列为空时,此时将转移到最后一个状态,可以将其视为“完全关闭状态”。在这个状态中,该Channel可以被视为“死亡”,此时无论是往其中推数据,或者从其中拉数据,都是一种无效的操作。

-------------本文结束 感谢阅读-------------