消息中间件核心实体(1)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 接上一篇《消息中间件核心实体(0)》,这一篇继续介绍消息中间件中的一些实体。 上一篇主要是Message、Topic、TopicMeta和Queue这样最基础的实体,这几篇介绍一些发送和消费的过程中会涉及到的实体和组件。

接上一篇《消息中间件核心实体(0)》,这一篇继续介绍消息中间件中的一些实体。

上一篇主要是Message、Topic、TopicMeta和Queue这样最基础的实体,这几篇介绍一些发送和消费的过程中会涉及到的实体和组件。

1. 发送

1.1 增强Message属性

Message一般只包含topic、tag、content这些属性,这些属性也是使用方在发送时会涉及到的内容。但是光有这些属性往往是不够的,比如我们会需要记录产生这条消息的Producer的信息;记录消息的产生时间和产生的IP信息等等。这些信息都是在Client中给消息附加上去的,对发送方来说是透明的,所以不会在Message实体中暴露,而是我们会增加一个实体:EnhancedMessage。

EnhancedMessage继承自Message,并会增加一些如下的属性:

  • bornTime

  • bornAddress

  • producer

  • etc

引申一点,Producer发送消息的大致过程如下:

  1. 增强Message属性,得到EnhancedMessage的实例

  2. 获取可以写入的队列(也可以理解成获取分区)

  3. 向队列写入消息(可以是队列暴露写入接口或者由专门的写入工具写入到队列中)

伪代码:

EnhancedMessage msg = enhance(message);
// 根据消息选择一个可以写入的目标队列
WritableQueue queue = router.select(msg);
// 写入消息(queue实现write方法进行写入)
Result result = queue.write(msg);

// write过程
// 将消息序列化成自定义协议的网络包
Packet messagePacket = Serializer.encode(msg);
// 发送网络包
bootstrap.write(messagePacket);

上面的WritableQueue暴露了API去写入,具体实现可以是写入到网络,即远端的一个Partition。而在做单元测试或者本地测试的时候,可以覆盖write的实现,而不用真正写入到网络中,这会使代码更容易测试测试。

上面两幅图是Rocket开源版本中发送相关的一些代码,私以为这段代码非常的不优雅,读起来特别累,特别是requestHeader的各种属性设置。

这段是Rocket开源版本中真正将消息写入到网络的实现,看起来总是非常臃肿,另外不知道是如何mock这些实现以达到在本地做测试的目的的。

1.2 Queue的路由选择

发送过程中会涉及到队列的选择(分区的选择),一条消息最终会根据一定的策略落到一个分区中,这里需要一个组件来完成选择(把这个组件单独抽象出来,这样便于控制写入的目标来进行测试,抽象出来也可以由使用方来实现,这样可以按照使用方自己的场景做特定的路由)。

路由组件非常的简单,一般是Router会根据topic获取到topic的元数据(元数据包含了多有分区的信息),然后根据消息的属性或者用户的参数计算出落到哪个分区,比如可以根据用户的参数对分区总数取模来选择分区,这样可以做到将某一类消息发送到一个分区,比如同一个用户的消息或同一笔订单的不同消息。

这个组件会比较简单,但是在集成的时候需要注意一点,这个组件用户可以自己注入到Producer中来达到控制分区选择策略的目的。

RocketMQ在TopicPublishInfo中实现分区的选择,TopicPublishInfo包含了队列信息(List<MessageQueue> messageQueueList属性),笔者更倾向于抽象出独立的路由组件,以便在特定的场景用户可以自己实现路由,或者在测试时可以做到使用特定路由规则。

2. 消费

消费可以分为多种方式,从获取消息的方式上可以分为Pull和Push两种类型的Consumer;从消费消息的方式上可以分为集群消费和广播消费。这里不展开讨论各种模式的实现(以后单独会讨论Consumer该实现那些内容),会以Push模式&集群消费的Consumer为例,把消费流程中涉及到的一些组件进行介绍。

2.1 分配分区

集群消费中需要保证每个分区有且只有一个Consumer在进行消费。如果某个分区没有Consumer消费,那么使用方拿不到完整的数据;如果某个分区被两个Consumer消费,那么会产生大量的重复消息。所以这里需要实现一个分区分配策略,使在分布式环境中,每个Consumer拿到属于自己的分区,且相互交叉。下面是四个分区两个Consumer默认情况下的分配结果。

实现的策略一般是:

  1. 拿到一个Topic所有的分区,对这个列表进行排序

  2. 拿到当前所有的Consumer,对Consumer列表进行排序

  3. 根据自己所处的Consumer列表的位置和Consumer总数,从分区列表中获取对应的一部分

每个分区和Consumer都有唯一的ID,这样各自按照排序后的结果进行分配,可以达到相互不交叉且不遗漏的目的。(在Consumer总数或分区数发生变化的过程中可能分配结果不正确,这个过程是短暂的,且在消费时还会结合锁去保证分区只有一个Consumer消费,所以不会对实际消费产生影响)。

同样记住一点,这个分配策略是需要暴露出去的,系统可以默认实现集群消费和广播消费的基础策略,用户可以实现自己的分配策略注入到系统中。

2.2 消息缓存

消费端一个重要的组件是消息缓存。为了提升性能,在消费端消息的获取和消息的消费是异步的。Consumer内部有线程专门从服务端获取消息写入到消息缓存中,另外有线程从缓存中获取消息调用用户的回调接口来执行业务操作。

消息缓存除了提供基础的put和take来实现存入消息和取出消息,还需要自身容量,水位控制等配置。

本身Buffer不是很复杂的部分,但是需要考虑一些流控策略,比如Buffer使用率到多少时降低从服务端获取数据的频率。

RocketMQ中实现消息缓存由ProcessQueue实现,笔者倾向于独立出Buffer模块,另外Buffer需要提供锁,以实现顺序消费。

2.3 消费进度

还有一个重要的实体是消费进度,系统需要记录“每个”Consumer的消费进度,且这个数据需要被持久化。

消费进度需要记录某个Group对某个Topic的某个分区的消费位点。进度是按照Topic维度去组织的(持久化在服务端),结构如下:

topic
    group0
        cursor0、cursor1、cursor2...
    group1
        ...
        
实现的对象应该是:
class Cursors {
    String topic;
    Cursor cursor;
    
    class Cursor {
        String group;
        // 用数组来存储一个group消费的一个topic的所有分区的进度
        // 分区数一般情况下不会变更(变更场景很少),用数据就可以
        long[] cursors;
    }
}

Consumer可以在每一次获取消息时将消费进度提交到服务端,在服务端来更新Cursors内部的数据。

3. 结语

最近两篇内容将一些基础实体和组件简单的介绍了一下,下一篇讨论一下消息应该由Server Push给Consumer还是Consumer主动来Pull消息。

往期文章:

消息中间件核心实体(0)

消息的写入和读取流程

NameServer模块划分

Client模块划分

Broker模块划分

消息中间件架构讨论

业务方对消息中间件的需求

消息中间件中的一些概念

什么是分布式消息中间件?

欢迎关注公众号来交流MQ相关问题。

 

如果本文对您有帮助,点一下右下角的“推荐”
目录
相关文章
|
消息中间件 存储 Kafka
消息中间件核心实体(0)
消息中间件核心实体(0) 最近两周在做的一个新项目,一个主从复制的组件,这两天刚跑通测试。 从之前讨论的架构来说,消息中间件也是有主从复制这个模块的,像Rocket就支持主从模式。 在做这个项目之前已经写过两个版本的主从复制模块,基本思路是: Slave主动和Master建立链接 Sla...
809 0
|
7月前
|
NoSQL Java Redis
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因:
|
7月前
|
算法 NoSQL Java
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
7月前
|
算法 NoSQL Java
2021年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
8月前
|
消息中间件 数据采集 Java
开发神技!阿里消息中间件进阶手册限时开源,请接住我的下巴
相信大家在实际工作中都用过消息中间件进行系统间数据交换,解决应用解耦、异步消息、流量削峰等问题,由此消息中间件的强大功能想必也不用我多说了!目前业界上关于消息中间件的实现多达好几十种,可谓百花齐放,所用的实现语言同样也五花八门。不管使用哪一个消息中间件,我们的目的都是实现高性能、高可用、可伸缩和最终一致性架构。
|
10月前
|
缓存 NoSQL 容灾
《Java应用提速(速度与激情)》——六、阿里中间件提速
《Java应用提速(速度与激情)》——六、阿里中间件提速
|
10月前
|
消息中间件 NoSQL Dubbo
阿里Java高级岗中间件二面:GC+IO+JVM+多线程+Redis+数据库+源码
一转眼,都2023年了,你是否在满意的公司?拿着理想的薪水? 虽然“钱多、事少、离家近”的工作可能离技术人比较远,但是找到一份合适的工作,其实并不像想象中那么难。但是,有些技术人确实是认真努力工作,但在面试时表现出的能力水平却不足以通过面试,或拿到高薪,其实不外乎以下 2 个原因: 第一,“知其然不知其所以然”。做了多年技术,开发了很多业务应用,但似乎并未思考过种种技术选择背后的逻辑。所以,他无法向面试官展现出自己未来技术能力的成长潜力。面试官也不会放心把具有一定深度的任务交给他。 第二,知识碎片化,不成系统。在面试中,面试者似乎无法完整、清晰地描述自己所开发的系统,或者使用的相关技术。
|
10月前
|
SQL 算法 NoSQL
2023年阿里高频Java面试题:分布式+中间件+高并发+算法+数据库
又到了一年一度的金九银十,互联网行业竞争是一年比一年严峻,作为工程师的我们唯有不停地学习,不断的提升自己才能保证自己的核心竞争力从而拿到更好的薪水,进入心仪的企业(阿里、字节、美团、腾讯.....)
|
11月前
|
存储 缓存 人工智能
2022互联网寒冬,看看阿里中间件团队如何降本提效?(2)
2022互联网寒冬,看看阿里中间件团队如何降本提效?
193 0
|
11月前
|
人工智能 Kubernetes 算法
2022互联网寒冬,看看阿里中间件团队如何降本提效?(1)
2022互联网寒冬,看看阿里中间件团队如何降本提效?
178 1

相关产品

  • 云消息队列 MQ
  • 云消息队列 Kafka 版
  • 微服务引擎