rocketmq demo

简介:

首先下载rocketmq,启动需要指定rocketmq home目录

cd github

git clone -b develop https://github.com/apache/incubator-rocketmq.git

代码结构代码 收藏代码

whatsmars-mq  
  |-src  
    |-main  
      |-java  
        |-com.itlong.whatsmars.mq.rocketmq.quickstart  
      BrokerStartup.java  
      Consumer.java  
      NamesrvStartup.java  
      Producer.java  
      |-resource  
        conf.properties  
  pom.xml  

依赖:

Xml代码 收藏代码

<dependencies>  
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-namesrv -->  
        <dependency>  
            <groupId>org.apache.rocketmq</groupId>  
            <artifactId>rocketmq-namesrv</artifactId>  
            <version>4.0.0-incubating</version>  
        </dependency>  
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-broker -->  
        <dependency>  
            <groupId>org.apache.rocketmq</groupId>  
            <artifactId>rocketmq-broker</artifactId>  
            <version>4.0.0-incubating</version>  
        </dependency>  
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->  
        <dependency>  
            <groupId>org.apache.rocketmq</groupId>  
            <artifactId>rocketmq-client</artifactId>  
            <version>4.0.0-incubating</version>  
        </dependency>  
  
    </dependencies>  

conf.properties

Xml代码 收藏代码

rocketmqHome=D:\\github\\incubator-rocketmq\\distribution  
namesrvAddr=127.0.0.1:9876  
mapedFileSizeCommitLog=52428800  
mapedFileSizeConsumeQueue=30000  

类似于zookeeper的服务:

Java代码 收藏代码

public class NamesrvStartup {  
  
    public static void main(String[] args) {  
        args = new String[] {"-c", "D:\\github\\whatsmars\\whatsmars-mq\\src\\main\\resources\\conf.properties"};  
        org.apache.rocketmq.namesrv.NamesrvStartup.main(args);  
    }  
}  

Broker:

Java代码 收藏代码

public class BrokerStartup {  
  
    public static void main(String[] args) {  
        args = new String[] {"-c", "D:\\github\\whatsmars\\whatsmars-mq\\src\\main\\resources\\conf.properties"};  
        org.apache.rocketmq.broker.BrokerStartup.main(args);  
        System.out.println("Broker started.");  
    }  
}  

Consumer:

Java代码 收藏代码

package com.itlong.whatsmars.mq.rocketmq.quickstart;  
  
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;  
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import org.apache.rocketmq.client.exception.MQClientException;  
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;  
import org.apache.rocketmq.common.message.MessageExt;  
  
import java.util.List;  
  
/** 
 * This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}. 
 */  
public class Consumer {  
  
    public static void main(String[] args) throws InterruptedException, MQClientException {  
  
        /* 
         * Instantiate with specified consumer group name. 
         */  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  
  
        /* 
         * Specify name server addresses. 
         * <p/> 
         * 
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR 
         * <pre> 
         * {@code 
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); 
         * } 
         * </pre> 
         */  
        consumer.setNamesrvAddr("127.0.0.1:9876");  
        /* 
         * Specify where to start in case the specified consumer group is a brand new one. 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        /* 
         * Subscribe one more more topics to consume. 
         */  
        consumer.subscribe("TopicTest", "*");  
  
        /* 
         *  Register callback to execute on arrival of messages fetched from brokers. 
         */  
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
  
            @Override  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  
                                                            ConsumeConcurrentlyContext context) {  
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  
  
        /* 
         *  Launch the consumer instance. 
         */  
        consumer.start();  
  
        System.out.printf("Consumer Started.%n");  
    }  
}  

Producer:

Java代码 收藏代码

package com.itlong.whatsmars.mq.rocketmq.quickstart;  
  
import org.apache.rocketmq.client.exception.MQClientException;  
import org.apache.rocketmq.client.producer.DefaultMQProducer;  
import org.apache.rocketmq.client.producer.SendResult;  
import org.apache.rocketmq.common.message.Message;  
import org.apache.rocketmq.remoting.common.RemotingHelper;  
  
/** 
 * This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}. 
 */  
public class Producer {  
    public static void main(String[] args) throws MQClientException, InterruptedException {  
  
        /* 
         * Instantiate with a producer group name. 
         */  
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");  
  
        /* 
         * Specify name server addresses. 
         * <p/> 
         * 
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR 
         * <pre> 
         * {@code 
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); 
         * } 
         * </pre> 
         */  
        producer.setNamesrvAddr("127.0.0.1:9876");  
  
        /* 
         * Launch the instance. 
         */  
        producer.start();  
  
        for (int i = 0; i < 1000; i++) {  
            try {  
  
                /* 
                 * Create a message instance, specifying topic, tag and message body. 
                 */  
                Message msg = new Message("TopicTest" /* Topic */,  
                    "TagA" /* Tag */,  
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */  
                );  
  
                /* 
                 * Call send message to deliver message to one of brokers. 
                 */  
                SendResult sendResult = producer.send(msg);  
  
                System.out.printf("%s%n", sendResult);  
            } catch (Exception e) {  
                e.printStackTrace();  
                Thread.sleep(1000);  
            }  
        }  
  
        /* 
         * Shut down once the producer instance is not longer in use. 
         */  
        producer.shutdown();  
    }  
}  

修改NamesrvStartup,BrokerStartup中的配置文件路径,依次启动NamesrvStartup,BrokerStartup,Consumer,Producer.

消息管理系统 https://github.com/javahongxi/whatsmars/tree/master/rocketmq-console

原文链接:[http://wely.iteye.com/blog/2382296]

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 人工智能 移动开发
阿里云Rocket MQ PHP Http SDK发送消息示例Demo
消息队列 RocketMQ 版支持 RESTful 风格的 HTTP 协议通信,并提供了以下 7 种语言的 SDK,下面以最新的PHP Http SDK为范例介绍RocketMQ消息的发送。
2661 0
阿里云Rocket MQ PHP Http SDK发送消息示例Demo
|
5月前
|
消息中间件 数据可视化 Windows
RabbitMQ的简单使用Demo
RabbitMQ的简单使用Demo
28 1
|
消息中间件 Java
RabbitMQ生产者消者费代码案例 (Demo超详细)
hello你好,我是辰兮,很高兴你能来阅读,本篇继续分享消息队列的实践案例,分享给初学者,大家一起进步!
509 0
RabbitMQ生产者消者费代码案例 (Demo超详细)
|
存储 消息中间件 安全
MQTT物联网通讯协议入门及Demo实现
MQTT物联网通讯协议入门及Demo实现
|
消息中间件 网络架构
9、RabbitMQ教程-Topic Exchange类型的基本使用demo
9、RabbitMQ教程-Topic Exchange类型的基本使用demo
108 0
9、RabbitMQ教程-Topic Exchange类型的基本使用demo
|
消息中间件
8、RabbitMQ教程-Direct Exchange类型的基本使用demo
8、RabbitMQ教程-Direct Exchange类型的基本使用demo
106 0
8、RabbitMQ教程-Direct Exchange类型的基本使用demo
|
消息中间件
7、RabbitMQ教程-Fanout Exchange类型的基本使用demo
7、RabbitMQ教程-Fanout Exchange类型的基本使用demo
88 0
7、RabbitMQ教程-Fanout Exchange类型的基本使用demo
|
消息中间件 网络协议 Java
阿里云微服务消息队列(MQTT For IoT)使用Demo
微消息队列 MQTT 版是阿里云推出的一款面向移动互联网以及物联网领域的轻量级消息中间件。如果说传统的消息队列中间件一般应用于微服务之间,那么适用于物联网的微消息队列 MQTT 版则实现了端与云之间的消息传递和真正意义上的万物互联。本文结合最新推出的V3版本实例介绍产品的具体使用流程。
3744 0
阿里云微服务消息队列(MQTT For IoT)使用Demo
|
消息中间件 Java 开发工具
阿里云Rocket MQ Java Http SDK发送消费消息示例Demo
消息队列 RocketMQ 版支持 RESTful 风格的 HTTP 协议通信,并提供了以下 7 种语言的 SDK,下面以最新的Java Http SDK为范例介绍RocketMQ消息的发送和接收。
3893 0
阿里云Rocket MQ Java Http SDK发送消费消息示例Demo
|
消息中间件 Go API
阿里云Rocket MQ 管理API Go SDK使用示例Demo
消息队列 RocketMQ 版产品接口支持 HTTP 调用、SDK 调用和 OpenAPI Explorer 调用,本文主要演示Go SDK的安装和使用。
989 0

热门文章

最新文章