在 muduo 中实现 protobuf 编解码器与消息分发器

简介:

陈硕 (giantchen_AT_gmail)

Blog.csdn.net/Solstice  t.sina.com.cn/giantchen

Muduo 全系列文章列表: http://blog.csdn.net/Solstice/category/779646.aspx

本文是《一种自动反射消息类型的 Google Protobuf 网络传输方案》的延续,介绍如何将前文介绍的打包方案与 muduo::net::Buffer 结合,实现了 protobuf codec 和 dispatcher。

Muduo 的下载地址: http://muduo.googlecode.com/files/muduo-0.1.9-alpha.tar.gz ,SHA1 dc0bb5f7becdfc0277fb35f6dfaafee8209213bc ,本文的完整代码可在线阅读 http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/ 。

考虑到不是每个人都安装了 Google Protobuf,muduo 中的 protobuf 相关示例默认是不 build 的,如果你的机器上安装了 protobuf 2.3.0 或 2.4.0a,那么可以用 ./build.sh protobuf_all 来构建 protobuf 相关的 examples。

在介绍 codec 和 dispatcher 之前,先讲讲前文的一个未决问题。

为什么 Protobuf 的默认序列化格式没有包含消息的长度与类型?

Protobuf 是经过深思熟虑的消息打包方案,它的默认序列化格式没有包含消息的长度与类型,自然有其道理。哪些情况下不需要在 protobuf 序列化得到的字节流中包含消息的长度和(或)类型?我能想到的答案有:

  • 如果把消息写入文件,一个文件存一个消息,那么序列化结果中不需要包含长度和类型,因为从文件名和文件长度中可以得知消息的类型与长度。
  • 如果把消息写入文件,一个文件存多个消息,那么序列化结果中不需要包含类型,因为文件名就代表了消息的类型。
  • 如果把消息存入数据库(或者 NoSQL),以 VARBINARY 字段保存,那么序列化结果中不需要包含长度和类型,因为从字段名和字段长度中可以得知消息的类型与长度。
  • 如果把消息以 UDP 方式发生给对方,而且对方一个 UDP port 只接收一种消息类型,那么序列化结果中不需要包含长度和类型,因为从 port 和 UDP packet 长度中可以得知消息的类型与长度。
  • 如果把消息以 TCP 短连接方式发给对方,而且对方一个 TCP port 只接收一种消息类型,那么序列化结果中不需要包含长度和类型,因为从 port 和 TCP 字节流长度中可以得知消息的类型与长度。
  • 如果把消息以 TCP 长连接方式发给对方,但是对方一个 TCP port 只接收一种消息类型,那么序列化结果中不需要包含类型,因为 port 代表了消息的类型。
  • 如果采用 RPC 方式通信,那么只需要告诉对方 method name,对方自然能推断出 Request 和 Response 的消息类型,这些可以由 protoc 生成的 RPC stubs 自动搞定。

对于最后一点,比方说 sudoku.proto 定义为:

service SudokuService {
  rpc Solve (SudokuRequest) returns (SudokuResponse);
}

那么 RPC method Sudoku.Solve 对应的请求和响应分别是 SudokuRequest 和 SudokuResponse。在发送 RPC 请求的时候,不需要包含 SudokuRequest 的类型,只需要发送 method name Sudoku.Solve,对方自知道应该按照 SudokuRequest 来解析(parse)请求。这个例子来自我的半成品项目 evproto,见 http://blog.csdn.net/Solstice/archive/2010/04/17/5497699.aspx 。

对于上述这些情况,如果 protobuf 无条件地把长度和类型放到序列化的字节串中,只会浪费网络带宽和存储。可见 protobuf 默认不发送长度和类型是正确的决定。Protobuf 为消息格式的设计树立了典范,哪些该自己搞定,哪些留给外部系统去解决,这些都考虑得很清楚。

只有在使用 TCP 长连接,且在一个连接上传递不止一种消息的情况下(比方同时发 Heartbeat 和 Request/Response),才需要我前文提到的那种打包方案。(为什么要在一个连接上同时发 Heartbeat 和业务消息?请见陈硕《分布式系统的工程化开发方法》 p.51 心跳协议的设计。)这时候我们需要一个分发器 dispatcher,把不同类型的消息分给各个消息处理函数,这正是本文的主题之一。

以下均只考虑 TCP 长连接这一应用场景。

先谈谈编解码器。

什么是编解码器 codec?

Codec 是 encoder 和 decoder 的缩写,这是一个到软硬件都在使用的术语,这里我借指“把网络数据和业务消息之间互相转换”的代码。

在最简单的网络编程中,没有消息 message 只有字节流数据,这时候是用不到 codec 的。比如我们前面讲过的 echo server,它只需要把收到的数据原封不动地发送回去,它不必关心消息的边界(也没有“消息”的概念),收多少就发多少,这种情况下它干脆直接使用 muduo::net::Buffer,取到数据再交给 TcpConnection 发送回去,见下图。

codec_echo

non-trivial 的网络服务程序通常会以消息为单位来通信,每条消息有明确的长度与界限。程序每次收到一个完整的消息的时候才开始处理,发送的时候也是把一个完整的消息交给网络库。比如我们前面讲过的 asio chat 服务,它的一条聊天记录就是一条消息,我们设计了一个简单的消息格式,即在聊天记录前面加上 4 字节的 length header,LengthHeaderCodec 代码及解说见《Muduo 网络编程示例之二:Boost.Asio 的聊天服务器》一文。

codec 的基本功能之一是做 TCP 分包:确定每条消息的长度,为消息划分界限。在 non-blocking 网络编程中,codec 几乎是必不可少的。如果只收到了半条消息,那么不会触发消息回调,数据会停留在 Buffer 里(数据已经读到 Buffer 中了),等待收到一个完整的消息再通知处理函数。既然这个任务太常见,我们干脆做一个 utility class,避免服务端和客户端程序都要自己处理分包,这就有了 LengthHeaderCodec。这个 codec 的使用有点奇怪,不需要继承,它也没有基类,只要把它当成普通 data member 来用,把 TcpConnection 的数据喂给它,然后向它注册 onXXXMessage() 回调,代码见 asio chat 示例。muduo 里的 codec 都是这样的风格,通过 boost::function 粘合到一起。

codec 是一层间接性,它位于 TcpConnection 和 ChatServer 之间,拦截处理收到的数据,在收到完整的消息之后再调用 CharServer 对应的处理函数,注意 CharServer::onStringMessage() 的参数是 std::string,不再是 muduo::net::Buffer,也就是说 LengthHeaderCodec 把 Buffer 解码成了 string。另外,在发送消息的时候,ChatServer 通过 LengthHeaderCodec::send() 来发送 string,LengthHeaderCodec 负责把它编码成 Buffer。这正是“编解码器”名字的由来。

codec_chat

Protobuf codec 与此非常类似,只不过消息类型从 std::string 变成了 protobuf::Message。对于只接收处理 Query 消息的 QueryServer 来说,用 ProtobufCodec 非常方便,收到 protobuf::Message 之后 down cast 成 Query 来用就行。如果要接收处理不止一种消息,ProtobufCodec 恐怕还不能单独完成工作,请继续阅读下文。

codec_protobuf

实现 ProtobufCodec

Protobuf 的打包方案我已经在《一种自动反射消息类型的 Google Protobuf 网络传输方案》中讲过,并以 string 为载体演示了 encode 和 decode 操作。在 muduo 里,我们有专门的 Buffer class,编码更轻松。

编码算法很直截了当,按照前文定义的消息格式一路打包下来,最后更新一下首部的长度即可。

解码算法有几个要点:

  • protobuf::Message 是 new 出来的对象,它的生命期如何管理?muduo 采用 shared_ptr<Message> 来自动管理对象生命期,这与其他地方的做法是一致的。
  • 出错如何处理?比方说长度超出范围、check sum 不正确、message type name 不能识别、message parse 出错等等。ProtobufCodec 定义了 ErrorCallback,用户代码可以注册这个回调。如果不注册,默认的处理是断开连接,让客户重连重试。codec 的单元测试里模拟了各种出错情况。
  • 如何处理一次收到半条消息、一条消息、一条半消息、两条消息等等情况?这是每个 non-blocking 网络程序中的 codec 都要面对的问题。

ProtobufCodec 在实际使用中有明显的不足:它只负责把 muduo::net::Buffer 转换为具体类型的 protobuf::Message,应用程序拿到 Message 之后还有再根据其具体类型做一次分发。我们可以考虑做一个简单通用的分发器 dispatcher,以简化客户代码。

此外,目前 ProtobufCodec 的实现非常初级,它没有充分利用 ZeroCopyInputStream 和 ZeroCopyOutputStream,而是把收到的数据作为 byte array 交给 protobuf Message 去解析,这给性能优化留下了空间。protobuf Message 不要求数据连续(像 vector 那样),只要求数据分段连续(像 deque 那样),这给 buffer 管理带来性能上的好处(避免重新分配内存,减少内存碎片),当然也使得代码变复杂。muduo::net::Buffer 非常简单,它内部是 vector<char>,我目前不想让 protobuf 影响 muduo 本身的设计,毕竟 muduo 是个通用的网络库,不是为实现 protobuf RPC 而特制的。

消息分发器 dispatcher 有什么用?

前面提到,在使用 TCP 长连接,且在一个连接上传递不止一种 protobuf 消息的情况下,客户代码需要对收到的消息按类型做分发。比方说,收到 Logon 消息就交给 QueryServer::onLogon() 去处理,收到 Query 消息就交给 QueryServer::onQuery() 去处理。这个消息分派机制可以做得稍微有点通用性,让所有 muduo+protobuf 程序收益,而且不增加复杂性。

换句话说,又是一层间接性,ProtobufCodec 拦截了 TcpConnection 的数据,把它转换为 Message,ProtobufDispatcher 拦截了 ProtobufCodec 的 callback,按消息具体类型把它分派给多个 callbacks。

codec_dispatcher

ProtobufCodec 与 ProtobufDispatcher 的综合运用

我写了两个示例代码,client 和 server,把 ProtobufCodec 和 ProtobufDispatcher 串联起来使用。server 响应 Query 消息,发生回 Answer 消息,如果收到未知消息类型,则断开连接。client 可以选择发送 Query 或 Empty 消息,由命令行控制。这样可以测试 unknown message callback。

为节省篇幅,这里就不列出代码了,请移步阅读

http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/client.cc 

http://code.google.com/p/muduo/source/browse/trunk/examples/protobuf/codec/server.cc

在构造函数中,通过注册回调函数把四方 (TcpConnection、codec、dispatcher、QueryServer) 结合起来。

ProtobufDispatcher 的两种实现

要完成消息分发,那么就是对消息做 type-switch,这似乎是一个 bad smell,但是 protobuf Message 的 Descriptor 没有留下定制点(比如暴露一个 boost::any 成员),我们只好硬来了。

先定义

typedef boost::function<void (Message*)> ProtobufMessageCallback;

注意,本节出现的不是 muduo dispatcher 真实的代码,仅为示意,突出重点,便于画图。

ProtobufDispatcherLite 的结构非常简单,它有一个 map<Descriptor*, ProtobufMessageCallback> 成员,客户代码可以以 Descriptor* 为 key 注册回调(recall: 每个具体消息类型都有一个全局的 Descriptor 对象,其地址是不变的,可以用来当 key)。在收到 protobuf Message 之后,在 map 中找到对应的 ProtobufMessageCallback,然后调用之。如果找不到,就调用 defaultCallback。

codec_dispatcher_lite

当然,它的设计也有小小的缺陷,那就是 ProtobufMessageCallback 限制了客户代码只能接受基类 Message,客户代码需要自己做向下转型,比如:

codec_query_server1

如果我希望 QueryServer 这么设计:不想每个消息处理函数自己做 down casting,而是交给 dispatcher 去处理,客户代码拿到的就已经是想要的具体类型。如下:

codec_query_server2

那么该该如何实现 ProtobufDispatcher 呢?它如何与多个未知的消息类型合作?做 down cast 需要知道目标类型,难道我们要用一长串模板类型参数吗?

有一个办法,把多态与模板结合,利用 templated derived class 来提供类型上的灵活性。设计如下。

codec_dispatcher_class

ProtobufDispatcher 有一个模板成员函数,可以接受注册任意消息类型 T 的回调,然后它创建一个模板化的派生类 CallbackT<T>,这样消息的类新信息就保存在了 CallbackT<T> 中,做 down casting 就简单了。

比方说,我们有两个具体消息类型 Query 和 Answer。

codec_query

然后我们这样注册回调:

dispatcher_.registerMessageCallback<muduo::Query>(
    boost::bind(&QueryServer::onQuery, this, _1, _2, _3));
dispatcher_.registerMessageCallback<muduo::Answer>(
    boost::bind(&QueryServer::onAnswer, this, _1, _2, _3));

这样会具现化 (instantiation) 出两个 CallbackT 实体,如下:

codec_query_callback

以上设计参考了 shared_ptr 的 deleter,Scott Meyers 也谈到过

ProtobufCodec 和 ProtobufDispatcher 有何意义?

ProtobufCodec 和 ProtobufDispatcher 把每个直接收发 protobuf Message 的网络程序都会用到的功能提炼出来做成了公用的 utility,这样以后新写 protobuf 网络程序就不必为打包分包和消息分发劳神了。它俩以库的形式存在,是两个可以拿来就当 data member 用的 class,它们没有基类,也没有用到虚函数或者别的什么面向对象特征,不侵入 muduo::net 或者你的代码。如果不这么做,那将来每个 protobuf 网络程序都要自己重新实现类似的功能,徒增负担。

下一篇文章讲《分布式程序的自动回归测试》会介绍利用 protobuf 的跨语言特性,采用 Java 为 C++ 服务程序编写 test harness。

posted on 2011-04-13 07:48 陈硕 阅读(3996) 评论(9编辑 收藏

评论

#1楼 2011-04-13 09:30 Paul Wong  

好文章,近断时间正在了解这方面的知识。
  

#2楼 2011-04-13 09:40 大石头  

非常好的东西,顶一个!

我们也做了一个消息模型,用自己的二进制序列化器。当时也考虑过Protobuf,那是个好东西,只是在控制方面还不够灵活,不能满足要求。

我们的消息开头就是编号,用7位压缩编码写的,接收者就是根据这个编号来识别是哪一种消息类型,然后根据该类型进行反序列化。

因为指定了类型,所以数据里面不需要指定长度。
如果遇到需要使用动态长度数据的消息,消息实现本身要加一个字段表明长度,然后扩展反序列化,告诉Reader,后面这个家伙的长度是多少。

为了减少传输数据的长度,我们的二进制序列化器,不写长度,不写类型,不写成员名称,整数一律采用7位编码存储。
说到底,就跟C++里面一个对象的内存模型一样,这边把对象的内存数据拿走,接收者把数据放入内存,成为一个对象
  

#3楼 2011-04-13 14:44 msnweb  

....ls 的。用7位压缩编码写的,接收者就是根据这个编号来识别是哪一种消息类型,然后根据该类型进行反序列化。拿到序列号就能解析出消息长度吗?不能,除非消息是定长的。否则你的解析器需要做到非常智能,比如解析http协议那样的解析器。否则如果一段的数据发生了错误,你的解析器能立刻发现问题吗?不行,因为你缺少一些边界校验。

至于lz 这篇文章。只是用map<Descriptor*, ProtobufMessageCallback> 成员 代替了 switch case 的做法。用 mether string 代替了手工的编号技术。lz的组包方式,已经拍过砖了 就不多说了。

关于lz 自己一直宣扬自己的 vector<char> 做buff, 我只能保留自己的意见。 一个buff 居然需要使用 vector 您不嫌太重了吗?lz 曾经写过一篇批判ACE的文章,老夫挺有同感,现在看来 lz 在ACE 的道路上继续前行!
  

#4楼 2012-09-16 22:24 hailong  

我想请问下关于关于google probuf的问题,如果我拥有的字段是可变的,比如TLV编码,中L(length)是可变的,可以是一个字节或者两个字节,用C++ ungigned表示,当一个字节时,长度范围是(0-127),所以连个字节就是>127,如果自己写代码就判断下第一个字节是否为1来判断;但是我想如果自己写解码比较麻烦,还不知道probuf中能否有自定义解码规则,您知道吗,谢谢啦!
  

#5楼[楼主2012-09-17 07:44 陈硕  

@ hailong
既然使用了protobuf,为什么你还要关心它如何编码呢?
字段直接定义成 int 类型就可以,protobuf 对整数采用变长编码。
  

#6楼 2014-05-05 18:25 ChironChan  

请教一下。复杂的message结构,许多个optional。。对性能影响大吗?optional项可能每次内容不一样。。
  

#7楼[楼主2014-05-06 01:06 陈硕  

@ ChironChan
你做一下 benchmark 不就知道了。
  

#8楼 2014-05-06 09:29 ChironChan  

好吧。确实有点偷懒了。谢谢博主回复。
  

#9楼 2014-07-13 20:36 卿言放纵  

@ ChironChan
引用请教一下。复杂的message结构,许多个optional。。对性能影响大吗?optional项可能每次内容不一样。。
基本上没影响,这个是按照id做的一个KV队,如果optional很多,就是KV大一点而已,只要别搞个几十万条,其他的无所谓

ProtoBuf使用,主要注意两点:
1, bytes和string的数据长度别超过 ( 508 * 1024 - 49 ) ,否则序列号时会有一个数量级的性能降低;因为内存管理方式上的不同,直接把数据打包入PB中,比使用带外数据要快一些;不过如果考虑通用性,大数据最好都走带外数据;
2, PB不具有边界性,一定要注意保持他的完整性



  
    本文转自 陈硕  博客园博客,原文链接:http://www.cnblogs.com/Solstice/archive/2011/04/13/2014362.html ,如需转载请自行联系原作者





相关文章
|
12月前
|
存储 编解码 安全
Opus从入门到精通(二):编解码器使用
opus_encoder_get_size()返回编码器状态要求的大小。注意,这段代码的未来版本可能改变大小,所以没有assuptions应该对它做出。编码器状态在内存中总是连续,复制它只要一个浅拷贝就足够了。使用opus_encoder_ctl()接口可以改变一些编码器的参数设置。所有这些参数都已有缺省值,所以只在必要的情况下改变它们。
1080 0
|
3月前
|
存储 XML JSON
原来可以这么使用 Protobuf
原来可以这么使用 Protobuf
105 0
|
4月前
|
设计模式 JSON 编解码
Netty使用篇:编解码器
Netty使用篇:编解码器
|
4月前
|
XML 存储 JSON
一文简单聊聊protobuf
一文简单聊聊protobuf
|
5月前
|
JavaScript Java PHP
Protobuf 3.3 使用总结
Protobuf 3.3 使用总结
48 0
|
8月前
|
XML 存储 Java
Protobuf了解一下?
Protobuf了解一下?
66 0
|
编解码 JSON 安全
IM通讯协议专题学习(四):从Base64到Protobuf,详解Protobuf的数据编码原理
本篇将从Base64再到Base128编码,带你一起从底层来理解Protobuf的数据编码原理。 本文结构总体与 Protobuf 官方文档相似,不少内容也来自官方文档,并在官方文档的基础上添加作者理解的内容(确保不那么枯燥),如有出入请以官方文档为准。
329 0
IM通讯协议专题学习(四):从Base64到Protobuf,详解Protobuf的数据编码原理
|
编解码 缓存 API