RocketMQ学习(一):简介和QuickStart

简介:

RocketMQ是什么?

引用官方描述:
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

支持严格的消息顺序
支持Topic与Queue两种模式
亿级消息堆积能力
比较友好的分布式特性
同时支持Push与Pull方式消费消息
历经多次天猫双十一海量消息考验

RocketMQ是纯java编写,基于通信框架Netty。

代码地址:https://github.com/alibaba/RocketMQ,目前分支是3.2.2 develop。

下载完代码后,将各个模块导入eclipse,本地尝试启动看看。

1.启动nameServer,运行rocketmq-namesrv的NamesrvStartup,运行之前需设置环境变量ROCKETMQ_HOME为RocketMQ项目的根目录,这样有一个作用是,指向logback的配置文件路径,保证在nameServer启动时,logback的正常初始化。我本机设置的是:ROCKETMQ_HOME=C:\Users\Administrator\git\RocketMQ。
The Name Server boot success. 表示启动成功。

2.启动brokerServer,运行rocketmq-broker的BrokerStartup,同样,运行之前需设置环境变量ROCKETMQ_HOME,然后启动参数需要带上【-n “192.168.0.109:9876″】,我本机的ip是192.168.0.109。如果不带-n的参数,那么broker会去访问http://jmenv.tbsite.net:8080/rocketmq/nsaddr获取nameServer的地址,这个地址不是我们自己的nameServer。
The broker[LENOVO-PC, 192.168.0.109:10911] boot success. and name server is 192.168.0.109:9876表示成功。

3.这个非必选项,不运行也可以。还可以启动rocketmq-srvutil的FiltersrvStartup,这是Consumer使用Java代码,在服务器做消息过滤。启动方式和broker一样,具体的过滤原理以后再详细的说。

到此就可以运行demo了。

pom.xml依赖:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

<dependencies>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-classic</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-core</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>com.alibaba.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>3.2.2</version>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

<scope>test</scope>

</dependency>

</dependencies>

如果依赖包下载不下来,再给个仓库地址,开源中国的:

1

2

3

4

5

6

7

8

9

10

11

12

13

<repositories>

<repository>

<id>nexus</id>

<name>Nexus</name>

<url>http://maven.oschina.net/content/groups/public/</url>

<releases>

<enabled>true</enabled>

</releases>

<snapshots>

<enabled>true</enabled>

</snapshots>

</repository>

</repositories>

贴代码:
Producer

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

package com.zoo.quickstart;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

/**

* Producer,发送消息

*

*/

public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("192.168.0.109:9876");

producer.start();

for (int i = 0; i < 1000; i++) {

try {

Message msg = new Message("TopicTest",// topic

"TagA",// tag

("Hello RocketMQ " + i).getBytes()// body

);

SendResult sendResult = producer.send(msg);

System.out.println(sendResult);

Thread.sleep(3000);

}

catch (Exception e) {

e.printStackTrace();

Thread.sleep(3000);

}

}

producer.shutdown();

}

}

Consumer

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

package com.zoo.quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* Consumer,订阅消息

*/

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

consumer.setNamesrvAddr("192.168.0.109:9876");

/**

* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>

* 如果非第一次启动,那么按照上次消费的位置继续消费

*/

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

ConsumeConcurrentlyContext context) {

System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

System.out.println(" Receive Message Size: " + msgs.size());

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

System.out.println("Consumer Started.");

}

}

因为demo代码来自于rocketmq-example,所以没有上传Github。

ps:以前rocketmq在Github开源的时候没有学习,后来突然有一天发现Github上404了,心里后悔莫急,这次rocketmq重新开源出来,一定不能错过了。


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
3月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
|
3月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
39 0
|
12天前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
15 0
|
2月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
53 0
|
3月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(下)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
30 0
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(上)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
66 0
|
3月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
324 1
|
3月前
|
消息中间件 存储
深入学习RabbitMQ五种模式(二)
深入学习RabbitMQ五种模式(二)
25 0
|
3月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(基于SpringBoot)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(基于SpringBoot)