GateServer
在该栏目中,我将会对于我的项目中的服务器进行一次review,在review中去重新回顾我的服务器架构来巩固我对于一个服务器设计上的理解,话不多说,直接开始。
项目流程
入口
在该文中,进行对应的网关服务器GateServer
的分析。在该服务器中,首先会在一个main入口处启动一个io_context
上下文,该上下文是我们对应的主监听线程所使用的上下文。我们使用该上下文来初始化我们的监听类CServer
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| auto& gCfgMgr=ConfigMgr::Inst() ; std::string gate_port_str = gCfgMgr["GateServer"]["Port"]; unsigned short gate_port = atoi(gate_port_str.c_str());
unsigned short port = static_cast<unsigned short>(8080); net::io_context ioc{ 1 }; boost::asio::signal_set signals(ioc, SIGINT, SIGTERM); signals.async_wait([&ioc](const boost::system::error_code& error, int signal_number) { if (error) { return; } ioc.stop(); }); std::make_shared<CServer>(ioc, port)->Start(); std::cout << "GateServer listen the port " << port << std::endl; ioc.run();
|
在初始化好之后,程序启动我们的CServer的函数,该函数保证了单一职责原则,只负责了整个服务器的连接监听工作,在客户端连接到来时,其会创建一个HttpConnection
对象,该对象是我们的网关服务器中负责一个连接处理的一个基本单位,起到一个类似于Session
的作用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| void CServer::Start() { auto self = shared_from_this(); auto& io_context = AsioIOServicePool::GetInstance()->GetIOService();
std::shared_ptr<HttpConnection> new_con = std::make_shared<HttpConnection>(io_context);
_acceptor.async_accept(new_con->GetSocket(), [self, new_con](beast::error_code ec) { try { if (ec) { std::cout << "Accept error: " << ec.message() << std::endl; return; } new_con->Start();
self->Start();
}catch (std::exception& exp) { std::cerr << "Exception: " << exp.what() << std::endl; } } ); }
|
在该函数中,我们创建了对应的连接请求到达时的回调函数,类会创建一个新的HttpConnection对象,并通过对其属性的一些设置之后启动Start函数来进行我们本次连接的处理。提前注意,我们的该函数是一个异步的行为,利用了一下对应的底层事件异步机制来进行我们的单线程的性能的压榨。接下来我们进入对应的HttpConnection
处理逻辑。
HttpConnection
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| void HttpConnection::Start() { auto self = shared_from_this(); http::async_read(_socket, _buffer, _request, [self](beast::error_code ec,std::size_t bytes_transferred) { try { if (ec) { std::cout << "Http read error is " << ec.what() << std::endl; return; } boost::ignore_unused(bytes_transferred); self->CheckDeadline(); self->HandleReq(); } catch (std::exception& ec) { std::cout << "exception is " << ec.what() << std::endl; } }); }
|
在该方法中,我们使用beast封装好的功能进行对应的事件处理。首先还是存在着异步的机制,不阻塞当前的逻辑。这点是我们之前的能够实现迅速的恢复监听的基础。当然,这里可能会存在一种严重的线程饥饿问题,可以自己分析一下。
我们来回顾一下在beast
库中需要注意的几个点。由于我们使用的这个库是一个面向HTTP协议封装好的框架,所以我们使用了对应的async_read函数来进行我们的协议头的读取,同时我们保证了一个缓冲区能够缓存住我们一个来自客户端的请求包,这是在外面的客户端设计决定的,我们的客户端设计中发送的HTTP始终不会太大,至少现在来说不会超过我们设计的缓冲区大小。因此,我们直接简单的使用
http::async_read(_socket, _buffer, _request, [self](beast::error_code ec,std::size_t bytes_transferred{...});
来辅助我们实现对于HTTP传输过来的数据的处理。略过对应的处理模式,我们接下来看我们的逻辑分发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| void HttpConnection::HandleReq() { try { _response.version(_request.version()); _response.keep_alive(false); if (_request.method() == http::verb::get) { PreParseGetParam(); bool success = LoginSystem::GetInstance()->HandleGet(_get_url, shared_from_this()); if (!success) { _response.result(http::status::not_found); _response.set(http::field::content_type, "type/plain"); beast::ostream(_response.body()) << "url not found\r\n"; WriteResponse(); return; } _response.result(http::status::ok); _response.set(http::field::server, "GateServer"); WriteResponse(); return; } if (_request.method() == http::verb::post) { bool success = LoginSystem::GetInstance()->HandlePost(_request.target(), shared_from_this()); std::cout << "[DEBUG] HandlePost success: " << std::boolalpha << success << std::endl; if (!success) { _response.result(http::status::not_found); _response.set(http::field::content_type, "type/plain"); beast::ostream(_response.body()) << "url not found\r\n"; std::cout << "[HttpConnection] response" << endl; WriteResponse(); return; } std::cout << "[HttpConnection] response" << endl; _response.result(http::status::ok); _response.set(http::field::server, "GateServer"); WriteResponse(); return; } } catch (const std::exception& e) { std::cerr << "Exception caught in HandleReq: " << e.what() << std::endl; _response.result(http::status::internal_server_error); _response.set(http::field::content_type, "text/plain"); beast::ostream(_response.body()) << "internal server error\r\n"; WriteResponse(); } }
|
在解析完对应的头部数据之后,我们开始进行我们的服务器逻辑处理。简单来看,我们会根据我们解析出来的HTTP头部中的方法类型来进行我们的逻辑分发。对应的其实是通过我们更底层的字模块LoginSystem
中编写的事件驱动逻辑来进行处理的。在这里,我们传递了对应的URL资源符和我们本次会话的对象本身过去交予处理。在这里,其实可以进行优化,在后续的服务器中,其实存在着一种更加优雅的模块调用逻辑,遇到再说吧。接下来我们来看对应的模块调用逻辑。
1 2 3 4 5 6 7 8 9
| bool LoginSystem::HandlePost(std::string path, std::shared_ptr<HttpConnection> con) { if (_post_handlers.find(path) == _post_handlers.end()) { return false; } std::cout << "[LoginSystem] 成功处理请求" << std::endl; _post_handlers[path](con); return true; }
|
我们在对应的HttpConnection
调用的函数逻辑其实只是一层代理,用于更加底层的一层逻辑的调用。
LoginSystem
接下来我们对于我们的LoginSystem
模块进行分析
该类我们使用单例来进行设计,事实上,很多设计网络功能的模块的设计都是使用的单例。目的是实现重用,避免重复创建的开销。在该类中,我们使用一种类似的事件注册和事件驱动的机制来提供我们的功能。
1 2 3
| typedef std::function<void(std::shared_ptr<HttpConnection>)>HttpHandler; std::map<std::string, HttpHandler>_post_handlers; std::map<std::string, HttpHandler> _get_handlers;
|
这是我们的事件容器,使用一个接口来进行注册的事件的统一,由于我们的键值对中的值根本上是一个可调用对象,所以我们可以通过键来取出对应的值并提供对应的参数来实现我们的功能以此来实现我们的事件驱动。
接下来我们简单来分析一个我们注册的事件来了解我们本类的实际功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| void LoginSystem::RegPost(std::string url, HttpHandler handler) { _post_handlers.insert(make_pair(url, handler)); } #----------------------------------------------------------------------- RegPost("/reset_pwd", [](std::shared_ptr<HttpConnection> connection) { auto body_str = boost::beast::buffers_to_string(connection->_request.body().data()); std::cout << "receive body is " << body_str << std::endl; connection->_response.set(http::field::content_type, "text/json"); Json::Value root; Json::Reader reader; Json::Value src_root; bool parse_success = reader.parse(body_str, src_root); if (!parse_success) { std::cout << "Failed to parse JSON data!" << std::endl; root["error"] = ErrorCodes::Error_Json; std::string jsonstr = root.toStyledString(); beast::ostream(connection->_response.body()) << jsonstr; return true; }
auto email = src_root["email"].asString(); auto name = src_root["user"].asString(); auto pwd = src_root["passwd"].asString();
std::string varify_code; bool b_get_varify = RedisMgr::GetInstance()->Get(CODEPREFIX + src_root["email"].asString(), varify_code); if (!b_get_varify) { std::cout << " get varify code expired" << std::endl; root["error"] = ErrorCodes::VarifyExpired; std::string jsonstr = root.toStyledString(); beast::ostream(connection->_response.body()) << jsonstr; return true; } if (varify_code != src_root["varifycode"].asString()) { std::cout << " varify code error" << std::endl; root["error"] = ErrorCodes::VarifyCodeErr; std::string jsonstr = root.toStyledString(); beast::ostream(connection->_response.body()) << jsonstr; return true; } bool email_valid = MysqlMgr::GetInstance()->CheckEmail(name, email); if (!email_valid) { std::cout << " user email not match" << std::endl; root["error"] = ErrorCodes::EmailNotMatch; std::string jsonstr = root.toStyledString(); beast::ostream(connection->_response.body()) << jsonstr; return true; } bool b_up = MysqlMgr::GetInstance()->UpdatePwd(name, pwd); if (!b_up) { std::cout << " update pwd failed" << std::endl; root["error"] = ErrorCodes::PasswdUpFailed; std::string jsonstr = root.toStyledString(); beast::ostream(connection->_response.body()) << jsonstr; return true; } std::cout << "succeed to update password" << pwd << std::endl; root["error"] = ErrorCodes::Success; root["email"] = email; root["user"] = name; root["passwd"] = pwd; root["varifycode"] = src_root["varifycode"].asString(); std::string jsonstr = root.toStyledString(); beast::ostream(connection->_response.body()) << jsonstr; return true; });
|
首先我们需要明确一点,该可调用对象由于其的定义在被调用时其会存在着额外的俩个参数,一个是调用的该对象的连接对象,我们可以使用该链接对象来进行我们的一些操作,最直接的就是使用对应的成员,实际上,我们声明了本模块类为我们的会话连接类的友元,所以可以看到在这里对于私有成员对象的读取操作。这是很便捷的。避免一些复杂逻辑的编写。
接着我们来注重于内部的逻辑实现。由于我们前面通过代理来实现了本次需要执行的逻辑的分流,在我们的这个示例中,我们需要做的是对于密码的重置。在逻辑的一开始,我们进行了一些接下来需要使用的内容的初始化,我们取出了对应的会回答给客户端套接字的HTTP
回复结构体来构建我们的缓冲区。接下来我们的所有回答操作直接写入该结构中即可。最后再由上层自由控制发送。又起到一层职责分离的作用。
除此之外,我再补充一点,在本项目中,基本所有的LoginSystem
会处理到的数据都是以JSON
格式传输过来的。这是我们双端的规定。对应与/reset_pwd
请求,我们在客户端报文中发送了对应的邮箱,用户名和密码过来,通过使用JSON
自带的解析器进行简单的解析与使用。再然后,我们开始在这一层去陷入更深的逻辑中去。我们这里像之前的HttpConnection
陷入LoginSystem
模块一样,陷入了一系列的模块。包括但不限于RedisMgr
模块,MysqlMgr
模块,这些不再进行分析,对应的逻辑其实跟这里的陷入逻辑没有什么不同,在该层中,由于我们较为良好的设计,我们可以将深层的子模块看做为更多的子黑盒,直接调用对应的功能即可。
在一层层的子模块的陷入又返回之后,我们的该层会根据最终的逻辑来确定我们的回复报文中到底需要写入什么逻辑,然后进行对应的写入然后逐层的退出。
在最后,该LoginSystem
层将会彻底退出,我们的程序逻辑将会回退到我们的HttpConnection
层
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| if (_request.method() == http::verb::post) { bool success = LoginSystem::GetInstance()->HandlePost(_request.target(), shared_from_this()); std::cout << "[DEBUG] HandlePost success: " << std::boolalpha << success << std::endl; if (!success) { _response.result(http::status::not_found); _response.set(http::field::content_type, "type/plain"); beast::ostream(_response.body()) << "url not found\r\n"; std::cout << "[HttpConnection] response" << endl; WriteResponse(); return; } std::cout << "[HttpConnection] response" << endl; _response.result(http::status::ok); _response.set(http::field::server, "GateServer"); WriteResponse(); return; }
|
例如,在陷入又返回之后,我们会逐层的逻辑推动,到最后,我们始终会执行一个 WriteResponse();
函数,该函数的功能就是调用一次我们的http::async_write
方法,将我们在逐层的调用中产生的回复报文写回到我们的通信套接字中,由于我们是HTTP连接,此时底层自动推送该报文回去给客户端。同时,由于我们采取的是一个短连接的形式,所以该HttpConnection
对象也会在不久之后被销毁,一次会话行为结束,对应的在这同步过程中积攒的更多的连接可以被处理。自此,程序实现一种闭环。通过不断处理客户端请求来实现一个服务器所应该有的功能。