最佳实践:如何基于MNS实现一对多拉取消息消费模型

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 如何实现一对多拉取消息消费模型 问题背景: 阿里云消息服务MNS 已经提供队列(queue)和主题(topic)两种模型。其中队列提供的是一对多的共享消息消费模型,采用客户端主动拉取(Pull)模式;主题模型提供一对多的广播消息消费模型,并且采用服务端主动推送(Push)模式。上面两种模型基

如何实现一对多拉取消息消费模型

问题背景

阿里云消息服务MNS 已经提供队列(queue)和主题(topic)两种模型。其中队列提供的是一对多的共享消息消费模型,采用客户端主动拉取(Pull模式;主题模型提供一对多的广播消息消费模型,并且采用服务端主动推送(Push模式。上面两种模型基本能满足我们大多数应用场景。

 

推送模式的好处是即时性能比较好,但是需要暴露客户端地址来接收服务端的消息推送。有些情况下,比如企业内网,我们无法暴露推送地址,希望改用拉取(Pull)的方式。虽然MNS不直接提供这种消费模型,但是我们可以结合主题和队列来实现一对多的拉取消息消费模型。具体方案如下:

 

解决方案:

让主题将消息先推送到队列,然后由消费者从队列拉取消息。这样既可以做到1对多的广播消息,又不需要暴露消费者的地址;如下图所示:

fd522053707462f71e44928315782ab2e1cde430

接口说明:

MNS最新的Java SDK (1.1.5)中的CloudPullTopic 默认支持上述解决方案。其中

MNSClient 提供下面两个接口来快速创建CloudPullTopic

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
 
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)

其中,TopicMeta 是创建topicmeta 设置, queueNameList里指定topic消息推送的队列名列表;needCreateQueue表明queueNameList是否需要创建;queueMetaTemplate是创建queue需要的queue meta 参数设置;


Demo 代码

        CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
        MNSClient client = account.getMNSClient();

        // build consumer name list.
        Vector<String> consumerNameList = new Vector<String>();
        String consumerName1 = "consumer001";
        String consumerName2 = "consumer002";
        String consumerName3 = "consumer003";
        consumerNameList.add(consumerName1);
        consumerNameList.add(consumerName2);
        consumerNameList.add(consumerName3);
        QueueMeta queueMetaTemplate = new QueueMeta();
        queueMetaTemplate.setPollingWaitSeconds(30);

        try{
            //producer code:
            // create pull topic which will send message to 3 queues for consumer.
            String topicName = "demo-topic-for-pull";
            TopicMeta topicMeta = new TopicMeta();
            topicMeta.setTopicName(topicName);
            CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);

            //publish message and consume message.
            String messageBody = "broadcast message to all the consumers:hello the world.";
            // if we sent raw message,then should use getMessageBodyAsRawString to parse the message body correctly.
            TopicMessage tMessage = new RawTopicMessage(); 
            tMessage.setBaseMessageBody(messageBody);
            pullTopic.publishMessage(tMessage);

            // consumer code:
            //3 consumers receive the message.
            CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
            CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
            CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);

            Message consumer1Msg = queueForConsumer1.popMessage(30);
            if(consumer1Msg != null) 
            {
                System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
            }else{
                System.out.println("the queue is empty");
            }

            Message consumer2Msg = queueForConsumer2.popMessage(30);
            if(consumer2Msg != null) 
            {
                System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
            }else{
                System.out.println("the queue is empty");
            }

            Message consumer3Msg = queueForConsumer3.popMessage(30);
            if(consumer3Msg != null) 
            {
                System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
            }else{
                System.out.println("the queue is empty");
            }

            // delete the fullTopic.
            pullTopic.delete();
        }catch(ClientException ce)
        {
            System.out.println("Something wrong with the network connection between client and MNS service."
                    + "Please check your network and DNS availablity.");
            ce.printStackTrace();
        }
        catch(ServiceException se)
        {
            /*you can get more MNS service error code in following link.
              https://help.aliyun.com/document_detail/mns/api_reference/error_code/error_code.html?spm=5176.docmns/api_reference/error_code/error_response
            */
            se.printStackTrace();
        }

        client.close();


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 弹性计算 Java
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
|
消息中间件 存储 缓存
分布式中间件核心原理与RocketMQ最佳实践
随着互联网业务的不断扩展和复杂化,分布式系统的需求也越来越迫切。为了满足这一需求,分布式中间件应运而生。在分布式系统中,中间件的角色是协调和管理各个节点之间的通信和数据交换,它起到了桥梁的作用。本文将介绍分布式中间件的核心原理和RocketMQ最佳实践,帮助读者更好地理解和应用分布式中间件。
338 1
|
消息中间件 存储 算法
RocketMQ 重试机制详解及最佳实践
本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践。
762 0
RocketMQ 重试机制详解及最佳实践
|
6月前
|
消息中间件 监控 安全
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(3)
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践
66 0
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(3)
|
6月前
|
消息中间件 Java Kafka
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(2)
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(2)
61 0
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(2)
|
6月前
|
消息中间件 Cloud Native Apache
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(1)
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践
46 0
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(1)
|
12月前
|
消息中间件 Cloud Native 中间件
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(上)
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(上)
363 0
|
12月前
|
消息中间件 存储 数据采集
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(下)
带你读《企业级云原生白皮书项目实战》——4.1.3 消息队列RocketMQ版最佳实践(下)
339 0
|
12月前
|
消息中间件 存储 分布式计算
带你读《企业级云原生白皮书项目实战》——4.1.6 消息队列Kafka版最佳实践(上)
带你读《企业级云原生白皮书项目实战》——4.1.6 消息队列Kafka版最佳实践(上)
348 0
|
12月前
|
消息中间件 存储 分布式计算
带你读《企业级云原生白皮书项目实战》——4.1.6 消息队列Kafka版最佳实践(下)
带你读《企业级云原生白皮书项目实战》——4.1.6 消息队列Kafka版最佳实践(下)
313 0