第02章:OpenMIT-系统架构
- author: zhouyongsdzh@foxmail.com
- date: 2017-08-05
- weibo: @周永_52ML
目录
0. 写在前面
1. 参数服务器架构
2. PS系统启动过程
前面了解到,PS任务集群由1个scheduler节点、m个server节点和n个worker节点组成。这些节点中,首先启动的是scheduler节点,然后启动worker和server节点,这些节点的资源分配是由tracker中的PSTracker完成。这里我们说的PS启动过程包括以下几个核心问题:
- scheduler节点如何管理worker/server节点的?
- scheduler节点如何做心跳管理和容错的?
PS启动是从调用PS::Start()
或PS::StartAsync()
方法开始的,两个方法的区别在于前者启动是阻塞的,即启动完自身节点后会一直等待直至所有节点都启动完成;后者则不等待其它节点。对应的方法:
1 | // This function will block until every nodes are started. |
从代码可以看到,它们都是调用了Postoffice
类中的Start
方法。我们看这个方法的实现过程。 Postoffice
是一个单例类,Postoffice::Get()
生成一个单例对象。我们先看下Get
过程发生了什么?核心是Postoffice
的构造函数:
1 | Postoffice::Postoffice() { |
邮局Postoffice
构造函数比较简单,它首先创建自己的“货车”成员van_
(用于传送接收消息用),然后获取环境变量,并从环境变量中知道了集群worker个数(DMLC_NUM_WORKER
)、server个数(DMLC_NUM_SERVER
)以及当前节点的角色(DMLC_ROLE
,是”worker”或”server”或”scheduler”),最后初始化日志等级verbose_
。
在Start
方法中做了哪些事情呢? 初始化log没什么好说的,在初始化节点信息方面以worker为例:
1 | void Postoffice::Start(const char* argv0, const bool do_barrier) { |
构造函数我们知道了worker的个数,那么这里每个worker节点的序号(rank)为{0, .., num_worker-1}
。但是对于sever节点的序号来说也是从0开始的,从全局看rank=0的节点分不清属于worker还是server。这里Postoffice对全局节点进行了统一的编号,对于worker和server节点分别根据WorkerRankToID
和ServerRankToID
方法进行节点编号转化。转化过程也比较简单:
1 | static inline int WorkerRankToID(int rank) { |
我们可以看到转化后的worker ID都是奇数(从9开始),server ID都是偶数(从8开始)。那么问题来了:为什么从8开始而不是从1开始编号?1到7的ID干嘛用了? 这些ID由专门的用途,具体见下表:
ID | 用途 | ID | 用途 |
---|---|---|---|
1 | kScheduler |
3 | kScheduler + kServerGroup |
2 | kServerGroup |
5 | kScheduler + kWorkerGroup |
4 | kWorkerGroup |
6 | kServerGroup + kWorkerGroup |
- | - | 7 | kScheduler + kServerGroup + kWorkerGroup |
我们可以看到,1到7的ID用于表示scheduler节点、组节点以及跨组的组合ID了。初始化节点信息主要是在为不同的组ID对应的ID集合,成员变量std::unordered_map<int, std::vector<int>> node_ids_;
存储的就是每个ID对应的与之相关的ID集合。
注意:到这里为止,划分节点信息的归属,但是并没有将每个节点的信息(hostname/port等) 与ID“绑定”起来。怎么完成“绑定”过程?这时候需要启动“货车”ZMQVan::Start
来完成。
zmq的初始化比较简单,new一个zmq对象,并设置最大的sockets。
1 | ZMQVan::Start() override { |
“货车”Van::Start()
启动会完成以下功能:
获取scheduler信息和获取当前节点信息。主要是从环境变量中获取,包括hostname, port等,保存在
Node
对象中。之所以每个节点都要获取scheduler信息,是因为都需要与scheduler主动通信,第一次通信获取该节点对应的ID。
绑定端口,并与scheduler节点建立连接;
- 开一个线程用于接收消息。
new thread(&Van::Receiving, this)
; - 向scheduler发送消息(scheduler除外)。主动向scheduler发送消息,让scheduler知道自己(当前节点)的存在.
- 初始化消息重发功能(
resender_
)和建立(与scheduler的)心跳线程;
下面的代码与上述5个功能是对应着的:
1 | void Van::Start() { |
这里要注意的是,第22行,这时候当前节点的ID还不知道,等着后续scheduler分配,这里先赋值Node::kEmpty
;第41行~49行,表示当前节点第一次向scheduler主动通信的过程,把自身的节点信息放在msg里,发送出去。其中,消息接收者recver=kScheduler; 控制命令为添加节点control.cmd=Control::ADD_NODE;而scheduler收到控制命令为Control::ADD_NODE
时,会为收到的节点分配ID。这部分后面会详细介绍。
至此,我们知道每个节点启动时要做的事情,现在回到”邮局”Postoffice::Get()->Start
部分:
1 | void Postoffice::Start(const char* argv0, const bool do_barrier) { |
最后一步Barrier
方法是阻塞操作,直至所有的节点都完成启动。下面分析下这个方法:
1 | void Postoffice::Barrier(int node_group) { |
我们可以看到,Barrier函数里,当前节点向scheduler请求Control::BARRIER
命令,等待kscheduler直到barrier_done_
为true。这个过程用ulk
互斥量将barrier_mu_
锁住。
scheduler收到各个节点命令为Control::BARRIER
的请求时,它的处理过程如下(src/van.cc Receiving方法
):
1 | // Receiving其它功能 ... |
barrier_count_
是一个size为8的数组,存储每个group对应的node数量,scheduler每收到一个Control::BARRIER
请求,都会在相应的group下标上累加++barrier_count_[group]
。
当收到的请求为group的数量等于全局初始化时每个group对应的节点数相同时(即所有的group相关的节点Control::BARRIER
请求均已收到):barrier_count_[group]=static_cast<int>(Postoffice::Get()->GetNodeIDs(group).size())
,这时scheduler将barrier_count_
归0,并向所有的请求节点发送Control::BARRIER
命令收到信号CHECK_GT(Send(res), 0)
。所谓的Barrier阻塞直至所有节点完成启动,核心在于所有计算节点都向scheduler发送BARRIER
信号,由scheduler统一控制节奏。
对于当前节点来说,收到scheduler发来的Control::BARRIER
命令后(对应的request=false
),就知道其它所有节点也完成启动了,此时会执行自己的Manage操作(调用Postoffice::Get()->Manage(msg)
)。我们看Manage的处理逻辑:
1 | void Postoffice::Manage(const Message& recv) { |
Manage处理逻辑比较简单,就是对变量barrier_done
置为true,唤醒其它等待该变量(对象锁)的线程,在启动阶段对应Postoffice::Barrier
里面的barrier_cond_.wait(ulk, [this] {return barrier_done_; })
。这一步对于所有的非scheduler节点处理逻辑都一样。
到这里为止,PS整体初始化过程就已经完成了,这里简要概括一下主要工作过程:
- 首先,节点信息初始化,包括全局的组ID、节点ID等;
- 其次,节点绑定全局ID,scheduler统一分配ID,
Control::ADD_NODE
信号; - 最后,阻塞过程,由scheduler控制节奏,
Control::BARRIER
信号;
3. PS任务启动过程
PS机器学习任务计算过程需要scheduler、worker、server三方节点参与,这三方的计算任务均要启动。我们先看scheduler节点的启动过程。
3.1. Scheduler节点启动过程
scheduler启动过程比较简单,首先创建一个SimpleApp
对象,用于App层面的请求响应处理。具体的请求响应处理逻辑由对应的计算任务决定,与SimpleApp
无关,开发者只需要按照约定接口实现自己的请求响应逻辑,然后注册给SimpleApp
即可。
请求响应约定接口:using Handle = std::function<void(const SimpleData& recved, SimpleApp* app)>;
假如开发者实现的请求响应逻辑如下(仅用于示例,与具体项目无关):
1 | void Scheduler::Handle(const ps::SimpleData & recved, ps::SimpleApp * app) { |
创建一个SimpleApp
对象,并将自定义的请求响应逻辑注册到SimpleApp
中,示例如下:
1 | scheduler_app_.reset(new ps::SimpleApp(0)); |
上述代码可以放在Scheduler任务的构造函数中,在启动Scheduler任务时,只需要创建一下Scheduler任务即可。如何保证Scheduler任务在整个计算过程常驻并能在任务结束时退出?这是可以借助互斥量和条件变量来控制。
我们重点关注创建一个SimpleApp
对象的过程中,需要做哪些事情?
首先调用SimpleApp
一个默认的构造函数,用于初始化请求和响应变量的默认执行逻辑。
1 | inline SimpleApp() : obj_(nullptr) { |
obj_
变量是SimpleApp
的成员变量,需要用“顾客”类-Customer
-来创建,创建时需要绑定SimpleApp
整体的处理逻辑。
1 | inline SimpleApp::SimpleApp(int app_id) : SimpleApp() { |
其中SimpleApp::Process
逻辑是固定的,仅需要根据msg.meta.request
字段判断是请求还是响应,然后执行开发者自定义的处理逻辑(如果开发者没有注册自定义处理逻辑,会调用默认逻辑)。
1 | inline void SimpleApp::Process(const Message& msg) { |
此外,SimpleApp
中还提供了Request
和Response
接口,供开发者用于节点之间的消息请求和响应反馈。接口格式为:
1 | inline int SimpleApp::Request(int req_head, const std::string& req_body, int recv_id) { |
下面我们重点分析创建并绑定了SimpleApp::Process
的顾客Customer
对象obj_
干什么用的?
我们先看obj_
在SimpleApp
类中都做了什么?
int ts = obj_->NewRequest(recv_id);
:为新请求生成一个时间戳;msg.meta.customer_id = obj_->id();
:在Request/Response中填充customer_id
信息;obj_->WaitRequest(timestamp);
:同步请求操作,以ts为key;
一句话概括Customer
类的功能:针对每一次请求/响应进行控制, 包括WaitRequest,NewRequest,NumResponse,Receiving等。Receiving方法的功能是等待取出消息队列recv_queue_
中的消息供Customer
中的recv_handle_
“消费”,而recv_handle_
是在创建SimpleApp
或KVWorker
或KVServer
时传入Customer
构造函数中的。
尤其要说明的是:消息队列recv_queue_
中的消息是由货车Van
通过调用Customer::Accept
函数“放到”队列中的。
1 | void Van::Receiving() { |
至此,我们搞清了scheduler任务节点的工作。这里在梳理下思路:
- step1: 创建一个
SimpleApp
,过程包括用SimpleApp::Process
方法创建一个Customer
对象,Process
会调用开发者自定义的请求/响应处理逻辑; - step2: 当向其它节点发送等待请求时,
Customer::Receiving
方法会循环取出消息队列中的消息(message),该消息可能是Request类型或者Response类型,最后都会交由recv_handler_
来处理(处理逻辑由用户注册)。 - step3: 消息队列中的消息是由
Van::Receiving
收到后直接给到指定顾客手里(调用Customer::Accept
方法),由顾客调用用户自定义的Handle来进行进一步“消费”。 - step4: 消息处理完后,要向对方发送响应。该响应要么是ack消息(应答),要么是对方请求的数据消息。
简单描述消息传输过程:
$$
\text{Van::Receiving} \xrightarrow{\text{msg}} \text{Customer::Accept} \xrightarrow{\text{msg}} \text{recv_queue_.WaitAndPop} \xrightarrow{\text{msg}} \text{recv_handle_} \xrightarrow{\text{(updated) msg}} \text{Response/Send}
$$
3.2. Server节点启动过程
Server节点启动过程与Scheduler节点启动非常类似,最大的区别在于后者用SimpleApp
来创建对象,Server节点启动用的是KVServer
来创建对象。从KVServer
的实现可以看到,KVServer
继承了SimpleApp
,在子类中基础上添加Server节点的逻辑,主要是与Worker节点信息交互方面的处理逻辑(pull/push
操作)。
1 | template <typename Val> |
我们可以看到,KVServer
与SimpleApp
很像,构造函数中都会创建一个Customer
类,用于处理每一次的pull
/push
请求。Server<Val>::Process
方法中的request_handle_
仍由用户自定义并注册给KVServer
。
在
SimpleApp
中,Customer
用于处理request
/response
请求。
KVServer
中的Customer
对象工作过程在Scheduler节点启动过程中已经详细介绍,这里不在赘述。
值得注意的是,KVServer
在整个任务中消息处理主要分两种:
simple_app
类型消息,多是控制命令消息,用于系统中节点之间的协同工作,对应Request/Response
操作;- 非
simple_app
类型消息,多是Server与Worker之间的数据通信,数据封装在KVPairs
中,对应Pull/Push
操作;
3.3. Worker节点启动过程
Worker节点的启动与Server的区别在于前者是调用KVWorker
来完成初始化。KVWorker
相比SimpleApp
和KVServer
,主要多了两个关键操作:Pull
和Push
.
1 | template<typename Val> |
总结:scheduler、server、worker任务启动
序号 | 角色 | 初始化对象 | 任务注册 | 备注 |
---|---|---|---|---|
1 | scheduler | SimpleApp |
Request/Response |
|
2 | server | KVServer |
Request/Response 和ReqHandle |
ReqHandle 用于Push/Pull处理 |
3 | worrker | KVWorker |
Request/Response |
KVWorker 提供了Push/Pull操作 |
Worker节点启动过程与Scheduler、Server一样。唯一的区别在于,Worker启动时不需要自定义请求处理逻辑。对于PS机器学习任务来讲,Worker端的行为都是主动行为,比如数据通信则主动向Server端执行Pull/Push请求,(控制)消息传输向Server端或Scheduler执行Request/Response请求。
至此,我们把PS机器学习任务中的节点启动过程以及内部的消息传递过程介绍完毕。下面我们重点关注任务启动之后的计算过程,节点之间是如何“对话”的?
4. PS节点通信与消息处理
PS机器学习任务启动之后,计算过程中节点之间是如何协同工作的?协同工作主要依靠通信来完成。不同节点类型承载着不同的任务,分布式系统和计算逻辑要把相应的消息发送给相应的节点去执行。
下面我们分别看Worker、Server、Scheduler两两之间的具体通信过程。
4.1. Worker与Server通信
4.2. Worker与Scheduler通信
4.3. Server与Scheduler通信
5. PS系统结束过程
上面我们详细介绍了参数服务器系统启动、节点任务启动以及计算过程中如何通信和处理消息的。当PS完成整个计算过程时,每个节点是如何结束并退出的呢?
我们知道在节点任务启动时,里面用ps::RegisterExitCallback()
方法来注册任务退出时用户自定义的方法(退出时的回调函数)。各个节点的退出回调函数注册如下(示例):
1 | ps::RegisterExitCallback([scheduler]() { delete scheduler; }); |
在调用回调函数之前,PS还做了一些事情,我们还是从PS::Finalize()
开始。PS::Finalize()
调用了Postoffice::Get()->Finalize(do_barrier)
,参数do_barrier
指的是是否需要等待直至所有节点都结束(阻塞过程)。
1 | void Postoffice::Finalize(const bool do_barrier) { |
如果do_barrier
为true,执行阻塞函数Barrier
,等待全部节点完成计算,然后进入终止“货车”的功能。参数的含义是覆盖全部计算节点。该过程与PS::Start
中的逻辑是一样的。
终止“货车”的过程如下:
1 | // ZMQVan::Stop |
由于van默认由zmq
初始化的,它首先调用父类的Stop方法,结束任务层面对象,然后结束自己的通信层面对象。父类Stop方法首先向自己发送命令为Control::TERMINATE
的消息,目的是从Van::Receiving
消息接收函数中的while(true)
跳出来。然后释放接收线程,如果当前节点是scheduler节点的话,还要释放心跳线程heartbeat_thread_
。Van中还有一个resender
对象(消息重发和监控告警用),此时也要释放掉。
Postoffice::Finalize
方法的最后一步是执行用户自定义的退出回调函数exit_callback_()
。
至此,PS系统的结束过程介绍完了。
6. PS心跳管理与系统容错
7. PS系统与Yarn环境交互
节点角色 | rank | 全局ID |
---|---|---|
worker | 0 ~ num_worker_ |
从9开始的奇数 |
server | 0 ~ num_server_ |
从8开始的偶数 |