粘包

粘包

​ 在本节中,我将会针对于服务器粘包已经对应的处理进行一次解析,这根本上是我针对于粘包该现象进行的自我的分析,如有错误,请告诉我,谢谢。

​ 针对于粘包该现象,其本质就是由于在客户端和服务器端的数据调度不一致导致的,对于客户端来说,由于TCP内部的对于数据包的特殊处理,其内部需要发送出去的数据包是可能会先被积压到一起,等到积攒到一定程序时才进行发送的。对应的TCP发送并不会保证数据包单独的发送。对于底层来说,无论你是要发送什么,在其看来都是一段字节流,所谓的发送,就是将积攒的字节流进行一次整体的发送。而由于该段字节流在实际上可能蕴含了多个子数据包,所以如果服务器端不对其进行对应的解析,其会导致收到的数据失去了其的对应意义。我们这里就是需要处理这一种现象。具体关于粘包出现的解析,可以自己寻找材料了解。

粘包原因

​ 对于粘包,我们需要看到其出现的本质是什么,其本质就是服务器端对于来自客户端的数据包的内部组成没有一种敏感度,或者说,对于客户端发送过来的数据包总是默认识别为一个大数据包,且会从头开始解析到尾。那么这里我们可以猜测一种解决方案,就是在我们的双端通信中去添加一种协议,规定由用户端发送的数据包上需要添加上对应的其发送的数据包的数据信息长度。对于服务器端来说,其在接收解析对应的数据包时其会先对其进行一次解析。其先解析出对应的数据包长度信息,然后根据该信息去进行对应的数据读取以及处理。而这里的这种思想,其实就是对应的TLV青春版,有兴趣可以去了解下。

​ 简单来说,我们在接下来会规定一种协议,在每个数据包的起始位置,都附上在该次发送的数据包的长度。因此,我们需要来重新设计一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class MsgNode
{
friend class CSession;
public:
MsgNode(char * msg, short max_len):_total_len(max_len + HEAD_LENGTH),_cur_len(0){
_data = new char[_total_len+1]();
memcpy(_data, &max_len, HEAD_LENGTH);
memcpy(_data+ HEAD_LENGTH, msg, max_len);
_data[_total_len] = '\0';
}
MsgNode(short max_len):_total_len(max_len),_cur_len(0) {
_data = new char[_total_len +1]();
}
~MsgNode() {
delete[] _data;
}
void Clear() {
::memset(_data, 0, _total_len);
_cur_len = 0;
}
private:
short _cur_len;
short _total_len;
char* _data;
};

​ 如上,这里不会进行分析,我们直接来进入我们对应的服务器端在接收到一段来自底层的数据包是需要进行的处理逻辑吧。

​ 首先对于服务器的结构,我们需要对其接受对应的数据包的逻辑进行一些优化。首先可以先来添加几个独立的结构

1
2
3
4
5
//收到的消息结构
std::shared_ptr<MsgNode> _recv_msg_node;
bool _b_head_parse;
//收到的头部结构
std::shared_ptr<MsgNode> _recv_head_node;

​ 在上述代码中,创建了一个智能指针管理对应的接受头部的节点,另一个智能指针管理对应的信息的节点,还有一个用来标识当前是否已经解析完对应的头部信息。同时需要一个对于客户端和服务器端双端都进行约束的条件,这里设置了对应的数据包的长度和数据头的长度,单位长度为1字节

1
2
#define MAX_LENGTH  1024*2
#define HEAD_LENGTH 2

​ 接下来是一段大长段代码,一点点进行分析吧

1
void CSession::HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self)

​ 首先我们需要知道该函数的调用时机,对于该函数来说,我们在外部注册其为在ASIO框架接受到外部的数据包并进行读取时的回调函数,请注意,我们这里使用的外部注册函数是async_read_some函数,即在每次调用到对应的读取函数后都会进行一次回调函数,这里正是对应的回调逻辑。

​ 再明确一下这里的参数意义。第一个const boost::system::error_code& error代表由框架自行处理时会返回的错误码,无需太过在意,主要是第二个参数size_t bytes_transferred,这个参数是在本次读取中框架在缓冲区中读取到的数据的长度,第三个参数不必在意,是为了解决我们前面的生命周期的问题的。

​ 首先来进入到我们这里面的逻辑,首先就是一个简单的错误检测,略过。接下来是一个循环判断,由其来进行对应的当前剩余未处理的字节长的表示判断。这里为什么会有这个我们之后再说。先进入我们的处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
if (!_b_head_parse) {
//收到的数据不足头部大小
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) {
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+ copy_len, bytes_transferred);
_recv_head_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
//收到的数据比头部多
//头部剩余未复制的长度
int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len;
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+copy_len, head_remain);
//更新已处理的data长度和剩余未处理的长度
copy_len += head_remain;
bytes_transferred -= head_remain;
//获取头部数据
short data_len = 0;
memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
cout << "data_len is " << data_len << endl;
//头部长度非法
if (data_len > MAX_LENGTH) {
std::cout << "invalid data length is " << data_len << endl;
_server->ClearSession(_uuid);
return;
}
_recv_msg_node = make_shared<MsgNode>(data_len);

//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < data_len) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
//头部处理完成
_b_head_parse = true;
return;
}

memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len);
_recv_msg_node->_cur_len += data_len;
copy_len += data_len;
bytes_transferred -= data_len;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;
}

​ 该段函数是我在对应的头数据包未完成对应的头部解析时会进行的逻辑处理,而这个是通过我们前面的头部处理标识量进行控制的。在进入该处理块之后。我们需要做的就是去读取数据包中的数据去进行更多的我们头部数据的读取构建。首先,我们需要去判断当前的本次读取中的数据包中是否能够支持我们本次的头部数据包的构建以此来进行进一步的数据包的分流。

​ 这里只需要一个简单的判断,即判断当前头部数据包中的数据长度以及该次读取中的数据长度总和是否大于规定的头部数据包的长度,这里即通过if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH)来进行控制的。当程序满足当前条件时,意味着当前收到的数据以及已有的数据头不足以构建我们的头部信息,此时我们需要做什么呢?停下来想一想……首先,即使当前数据包不够信息,我们也不能够丢弃,我们需要把当前读取到的所有剩余数据拷贝到对应的头部数据结构中。需要注意的是,在这里我们添加了对应的copy_len偏移,在此时我们看不出其有什么作用,接下来我们会看到的。

​ 在进行对应的数据往头部数据包中的拷贝之后,我们需要进行对应的头部数据结构体的一些信息的更新,在这里即是对应的结构的头部信息的已有长度的更新。接下来想一想处理逻辑应该是什么。既然我们当前的数据包以及被读取处理完毕,那么本次读取就应该作废了,我们接下来应该做的是关闭这次读取并且再次开启一次读取事件的注册(需要注意的是,虽然在这里没有给出,但是在对应的读取时间绑定的回调函数中,还是我们现在的这个HandleRead执行流)。有了前面的基础,你应该也能理解这里的操作会出现怎么样的行为。

​ 简单模拟一下,在这种情况下,在程序成功执行完本次读取的解析以及对应的头部结构体的数据更新之后,该程序会重新注册一个写函数以及对应的回调函数,接着,再次触发写时间,由框架调度再次进入这个HandleRead的执行流当中。此时由于我们前面没有更新对应的头部处理标志,所以这里还是会进入对应的处理逻辑中。

​ 但是在这一次中,我们去假设,if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH)在本次的逻辑处理中不再满足,换句话说,在本次中,读取的字节长度至少能够满足我们接下来的头部信息的构建,也就是所谓的>=HEAD_LENGTH。那么我们先来猜测一下之类的处理逻辑,显然是应该先把我们的头部信息给构建完毕,然后去处理我们接下来可能存在的未处理的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len;
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+copy_len, head_remain);
//更新已处理的data长度和剩余未处理的长度
copy_len += head_remain;
bytes_transferred -= head_remain;
//获取头部数据
short data_len = 0;
memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
cout << "data_len is " << data_len << endl;
//头部长度非法
if (data_len > MAX_LENGTH) {
std::cout << "invalid data length is " << data_len << endl;
_server->ClearSession(_uuid);
return;
}
_recv_msg_node = make_shared<MsgNode>(data_len);

​ 首先,我们先去判断在当前的情况下对应的头部构建的所需字节,这个可以通过简单的约定头部长度减去对应的已有信息长度去进行处理。接下就是一个简单的偏移地址复制和部分信息的更新。这里你可能会疑惑一点,既然这里已经完成了对应的头部信息的构建,为什么这里不直接对于标志进行更新?别急,再等一会。

​ 接下来就到了我们的下一个处理环节,在我们构建完对应的头部节点之后,我们需要对其中蕴含的信息进行处理,这里的由于我们基础的设计就只是在头部中附上对应的数据块的大小,所以我们这里只需要将对应的数据进行读取然后进行解析即可。我们来看一下这里的处理方式。我们这里考虑直接将其拷贝到一个地址中,该地址所代表的是一个short变量,由于我们的头部长度只有2字节,所以我们的short长度是完全可以容纳的。在这里拷贝之后,我们需要了解的一点只是这里的数据读取其实就是对应的数据解析形式的不同,当我们完成拷贝之后,我们就已经完成了一个变量的构造,可以对其进行一个正常的使用。

​ 接下来我们对其进行了一个简单的错误检测,判断当前的数据包长度是否超出了我们约定的长度,如果非法,直接删除回话。否则我们就可以继续进行我们接下来的网络数据包的处理了。

1
2
3
4
5
6
7
8
9
10
11
12
13
_recv_msg_node = make_shared<MsgNode>(data_len);

//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < data_len) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
//头部处理完成
_b_head_parse = true;
return;
}

​ 在成功确定我们需要对数据包中的剩余数据进行处理之后,我们首先要创建一个根据我们前面的头部长度中储存的大小来储存的消息结构,即这里的_recv_msg_node = make_shared<MsgNode>(data_len);再接下来的处理逻辑其实就跟我们一开始的整个读取逻辑的进入没有什么区别了,这里就简单自己了解一下即可。不再进行说明。

​ 接下来跟头部处理一样,如果此时剩余的数据能够完全构建我们的数据包结构,那么我们还需要对于剩余的数据进行一次再次的HandleRead对调处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len);
_recv_msg_node->_cur_len += data_len;
copy_len += data_len;
bytes_transferred -= data_len;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;

​ 在这里有一点你可能在初次看到时会感到疑惑的点,就是这里的拷贝memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len);为什么会按照固定的长度data_len进行拷贝。停下来想一想……

​ 原因在于这个执行流所在的外部一层执行流中,回想一下此时处在哪里,是处在对应的if判断成立,也就是说当前已经处理了对应的头部文件并且执行到这里的bytes_transferred已经被更新为了当前剩余的未读取数据长度了。此时还能进入这里的执行流意味着剩余的长度一定能够支持我们的整段的数据段的读取,而且由于我们这里是位于头部信息为完成构造的大前提下的,这个前提的隐藏信息就是我们当前的数据段是一个为空的情况。这也是为什么我们这里的拷贝直接使用这种构造的原因。当然,这里保留了对应的

+ _recv_msg_node->_cur_len操作,从我个人观点来看其是可以删去的,但是为了一种可读性可以进行保留,至于删除该操作是否会导致一些问题,我在之后再确认一下。

​ 在这里,我们进行了对应的内存信息的数据块拷贝之后,我们需要进行一些额外的处理,还记得现在应该是一个什么状态吗。此时对应的数据包已经被读取了对应的头部信息。同时,其剩余的信息能够支撑起对应的数据包的构建,在这里就同时支撑起了对应的数据包的构建。那么在此时我们来考虑一下当前剩余的缓存区的数据的状态。其只会存在俩种情况,剩余未读取数据为空,或者在构建完我们此次的数据块之后还留存着一些数据未进行处理。但是本质上,这俩种情况都应该导致一种状态的转移,就是此时应该转移成一个头数据块和内容数据块均未构建的情况,即我们在一开始进入该HandleRead函数的情况。我们这里的节点清理以及对应的回调函数绑定可以提现该行为。

​ 接下来注意另一种情况,即这里会进入continue执行流的情况。当进入该执行流时,就意味着当前剩余的缓存区数据还不为空,换句话说,就是在处理完一个完整的数据头的构建以及对应的数据包的处理之后,还存在未处理的数据,此时在逻辑上该段数据是一个新数据包的数据头信息。那么此时的状态就像什么呢,没错,就像我们在一个数据包刚进行对应的HandleRead处理一致,所以我们这里直接使用了对应的continue使得程序直接跳回到了程序入口处的处理逻辑省去了退出去重新注册以及对应的调度开销。

-------------本文结束 感谢阅读-------------