Event实现分析

Event实现思路

本文借TinyCoro的Lab4a的契机,来分析一下一些底层的同步原语的一些实现细节以及其中能够学习的地方

实现目标

实现一种同步原语event,其的核心在于其的状态管理以及不同状态时其触发的行为。其的状态很简单,只考虑俩种状态,一种是未被set的状态,另外一种是已经被set了的状态。在未被set时,所有对于这个event调用wait操作时都会陷入阻塞态,如果当前的event已经被set,那么在当前尝试wait的,将不会阻塞而是直接执行下一条语句;对于那些已经wait阻塞在event中的,将会接触阻塞而进入运行态;对于那些后续尝试wait的,将不会阻塞而直接执行下一条语句。

额外需要注意的是,由于我们需要基于协程这个场景来实现,所以我们需要考虑我们后续会涉及到的线程安全实现,在这种基础组件的实现中,一般都不优先考虑线程级别的互斥锁,可以考虑一下为什么以及由此需要怎么来实现这种同步原语。

实现思路

状态建模

前文说道,event本身涉及到的状态很简单,其中就是俩个,一个是已经Set,另外一个是还未Set。而且其中只可能存在一个从未Set向已经Set的状态之间的转移,但是没有办法进行从已经Set向未Set的状态之间的转移,也就是说本次设计中考虑的状态转移路径只存在一条。除此之外,对于状态,我们需要明确其的起始状态,这种初始状态的差异也可能导致event表现出不同的行为,如下俩图,你应该能够很轻松的了解到底是为什么需要这么来做。

初始为未Set的状态转移

初始为已Set的状态转移

接下来,关于event的分析,实际上就已经结束了,接下来就需要来进行一些细节上的实现了,我们需要考虑到底如何来实现这种FSM的建模。考虑到我懒,直接来给出对应的代码,自己一点点看吧。

1
2
using awaiter_ptr = uintptr_t;
std::atomic<awaiter_ptr> m_status;

首先,我们使用一种机制来进行状态的压缩,即C++中支持的uintptr_t类型,实际上你可以将其理解为一种更加强大的类型擦除的容器,在其中的数据,外部访问将会将其视为一系列单纯的bit,我们可以对于这些bit通过那些类型解释语法来进行重解释。不过需要注意这个类型的大小,实际上其在语言中的初步定义为unsigned long int,所以在使用的时候需要保证不超过这种类型的长度。而这种长度对于常规的标准数据类型,还有特别是指针类型,其都是能够完全装入的,所以通过这个变量类型,我们就能够实现将各种状态压缩到一个变量中进行观测的效果。

接下来,我们需要来进行规定,既然我们将状态都压缩到一个无状态的变量中,我们需要来规定这个变量的不同取值情况都对应的什么状态。在后续的设计中,我们考虑按照如下的规定来进行状态的设置。

1
2
3
nullptr -> 当前未被Set并且当前没有等待的awaiter
this -> 当前已经被Set,这种this指针理论上可以由任何魔数替换,但是使用this实际上是一种更加优雅的设计
other -> 当前未被Set,但是已经存在等待的awaiter,当前的值就是该等待链表中的头结点

wait核心

对于wait操作,我们首先明确其的调用场景,在外面的期望中,其可以由任意协程来发起,且可以在同一时间中发起,由于其涉及到一些共享状态间的修改,所以其势必需要考虑并发的竞态场景。而对于其的核心逻辑,简单可以概述为以下几条

  • 当一个event还没有被set时,需要将本协程挂起并且注册到内部的等待队列中等待唤醒

  • 当一个event已经被set时,后续所有尝试进行wait的协程都不应该被该event阻塞住

而对应着几个实际的状态值,我们可以简单的考虑如下的程序流程

  • 使用CAS操作获取尝试获取当前的状态
  • 判断当前的状态,如果处于this,代表当前event已经被set,此时就不必再阻塞,直接返回即可
  • 若当前状态不属于this,代表此次操作会导致该协程的阻塞,此时就得考虑是否修改当前event的状态以及如何来挂载当前的协程

实际上实现起来很简单,现有的一个wait实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 本函数的调用场景是当一个协程尝试进行一次event.wait()时被调用
// 其的返回值决定当前的协程是否需要挂起等待
// 当event已经被set时,直接返回false,表示不需要挂起
// 当event未被set时,将当前协程的awaiter加入等待链表,并返回
// 当当前状态为非this时,其的status中的
inline auto event_base::register_awaiter(awaiter_base* awaiter) noexcept -> bool
{
auto status = m_status.load(std::memory_order_acquire);

do{
if(status == reinterpret_cast<awaiter_ptr>(this))
{
return false;
}
// 此时是当前的event没有被set,需要将当前协程的awaiter加入等待链表
// 但是实际上当前可能存在俩种情况,一种是nullptr,一种是实际的awaiter指针
// 对于这俩种场景,实际上的处理逻辑可以统一,只要构建出一个链表即可
// 我们接下来的期望是交换出当前的status,并将当前的awaiter构建进链表中
awaiter->m_next = reinterpret_cast<awaiter_base*>(status);
}while(!m_status.compare_exchange_weak(status,reinterpret_cast<awaiter_ptr>(awaiter),std::memory_order_release,std::memory_order_acquire));
return true;
}

set核心

对于set操作,其的逻辑实际上很简洁,就是将当前的状态进行修改,并将当前所有的阻塞的协程都恢复执行,其的程序流程也很简单,就是如下的考虑

  • 直接原子的修改当前的状态为this,代表当前已经被set,阻挡后续的wait操作阻塞
  • 将等待队列中的系列等待协程按照一定的次序进行释放,提供一种保障

实现起来也并不复杂,一个可能的实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
inline auto event_base::set_state() noexcept -> void
{
auto status = m_status.exchange(reinterpret_cast<awaiter_ptr>(this), std::memory_order_acquire);
if (status != reinterpret_cast<awaiter_ptr>(this))
{
auto header = reinterpret_cast<awaiter_base*>(status);
resume_all_awaiters(header);
}
}

inline auto event_base::resume_all_awaiters(awaiter_base *head) noexcept -> void
{
auto current = head;
while(current)
{
auto next = current->get_next();
// 投递回原线程操作避免跨线程唤醒
current->m_ctx.submit_task(current->to_resume);
current = next;
}
}
-------------本文结束 感谢阅读-------------