使用阿里云消息队列

简介:

使用阿里云消息队列

控制台地址:http://ons.console.aliyun.com/#/home/topic

 

(1)生成Producer ID

点击"申请发布"

 示例代码:

Java代码   收藏代码
  1. package com.alibaba.ons.demo;  
  2.   
  3. import java.util.Properties;  
  4.   
  5. import com.aliyun.openservices.ons.api.Message;  
  6. import com.aliyun.openservices.ons.api.ONSFactory;  
  7. import com.aliyun.openservices.ons.api.Producer;  
  8. import com.aliyun.openservices.ons.api.PropertyKeyConst;  
  9. import com.aliyun.openservices.ons.api.SendResult;  
  10.   
  11. public class ProducerClient {  
  12.   
  13.     public static void main(String[] args) {  
  14.        Properties properties = new Properties();  
  15.        properties.put(PropertyKeyConst.ProducerId, "PID_whuang");  
  16.        properties.put(PropertyKeyConst.AccessKey, "请输入AccessKey");  
  17.        properties.put(PropertyKeyConst.SecretKey, "请输入SecretKey");  
  18.        Producer producer = ONSFactory.createProducer(properties);  
  19.              
  20.        //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。  
  21.        producer.start();  
  22.        Message msg = new Message(  
  23.             //Message Topic  
  24.             "com_hbjltv",   
  25.             //Message Tag,  
  26.             //可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在ONS服务器过滤          
  27.             "TagA",  
  28.             //Message Body  
  29.             //任何二进制形式的数据,ONS不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式  
  30.             "Hello ONS".getBytes()  
  31.         );  
  32.           
  33.         // 设置代表消息的业务关键属性,请尽可能全局唯一。  
  34.         // 以方便您在无法正常收到消息情况下,可通过ONS Console查询消息并补发。  
  35.         // 注意:不设置也不会影响消息正常收发  
  36.         msg.setKey("ORDERID_100");  
  37.           
  38.         //发送消息,只要不抛异常就是成功  
  39.         SendResult sendResult = producer.send(msg);  
  40.         System.out.println(sendResult);  
  41.   
  42.         // 在应用退出前,销毁Producer对象  
  43.         // 注意:如果不销毁也没有问题  
  44.         producer.shutdown();  
  45.     }  
  46. }  

 

 

 

(2)生成Consumer ID

点击"申请订阅"

 示例代码:

Java代码   收藏代码
  1. public class ConsumerTest {  
  2.     public static void main(String[] args) {  
  3.         Properties properties = new Properties();  
  4.         properties.put(PropertyKeyConst.ConsumerId, "CID_tv_mobile");  
  5.         properties.put(PropertyKeyConst.AccessKey, "请输入AccessKey");  
  6.         properties.put(PropertyKeyConst.SecretKey, "请输入SecretKey");  
  7.         Consumer consumer = ONSFactory.createConsumer(properties);  
  8.         consumer.subscribe("com_hbjltv""*"new MessageListener() {  
  9.             public Action consume(Message message, ConsumeContext context) {  
  10.                 System.out.println("Receive: " + message);  
  11.                 return Action.CommitMessage;  
  12.             }  
  13.         });  
  14.         consumer.start();  
  15.         System.out.println("Consumer Started");  
  16.     }  
  17. }     

 

(3) clientId 的限制

阿里云消息队列对clientId的名称有严格限制:

(a)必须以申请的Consumer ID 开头,后面跟@@@,接着跟用于区分客户端的标志,

例如:

CID_tv_mobile@@@86458fd 是合法的

CID_tv_mobile@@86458fd 是非法的,因为只有两个@

(b)总长度不能超过23个字符

例如

CID_tv_mobile@@@86458_A是合法的

CID_tv_mobile@@@86458_Ab是非法的,因为超过了23个字符



 

 

(4)在手机端(客户端)增加订阅逻辑

 

Java代码   收藏代码
  1. package com.service;  
  2.   
  3. import java.security.InvalidKeyException;  
  4. import java.security.NoSuchAlgorithmException;  
  5.   
  6. import org.eclipse.paho.client.mqttv3.MqttClient;  
  7. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  
  8. import org.eclipse.paho.client.mqttv3.MqttException;  
  9. import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;  
  10.   
  11. import android.app.Service;  
  12. import android.content.Context;  
  13. import android.content.Intent;  
  14. import android.content.SharedPreferences;  
  15. import android.os.IBinder;  
  16.   
  17. import com.common.util.SystemHWUtil;  
  18. import com.dict.Constants3;  
  19. import com.jianli.R;  
  20. import com.push.PushCallback;  
  21. import com.string.widget.util.ValueWidget;  
  22. import com.util.MacSignature;  
  23. import com.util.ShopUtil;  
  24.   
  25. /** 
  26.  * @author Dominik Obermaier 
  27.  */  
  28. public class MQTTService extends Service {  
  29.   
  30.     // public static final String BROKER_URL =  
  31.     // "tcp://broker.mqttdashboard.com:1883";  
  32.     // public static String BROKER_URL = "tcp://172.16.15.50:1883";  
  33.     public static String BROKER_URL_FORMAT = "tcp://%s:%s";  
  34.     // public static final String BROKER_URL = "tcp://test.mosquitto.org:1883";  
  35.   
  36.     /* 
  37.      * In a real application, you should get an Unique Client ID of the device 
  38.      * and use this, see 
  39.      * http://android-developers.blogspot.de/2011/03/identifying 
  40.      * -app-installations.html 
  41.      */  
  42.     public static String clientId = null;  
  43.   
  44.     /** 
  45.      * 不能含有英文句点,可以包含下划线 
  46.      */  
  47.     public static final String TOPIC = "com_hbjltv";  
  48.     private MqttClient mqttClient;  
  49. //  private String ip="182.92.80.122";  
  50.     /*** 
  51.      * 是否连接上activeMQ 
  52.      */  
  53.     private boolean online = false;  
  54.     boolean isAliyun=false;  
  55.   
  56.     public IBinder onBind(Intent intent) {  
  57.         return null;  
  58.     }  
  59.   
  60.     @Override  
  61.     public void onCreate() {  
  62.         super.onCreate();  
  63.     }  
  64.   
  65.     private MqttClient createMqttClient(String serverURL, String clientId) throws MqttException{  
  66.         return new MqttClient(serverURL, clientId,  
  67.                 new MemoryPersistence());  
  68.     }  
  69.     /*** 
  70.      *  
  71.      * @param serverURL 
  72.      * @param clientId 
  73.      *            : 最大长度:23 
  74.      * @param isAllowOffline 
  75.      * @param username 
  76.      * @param password 
  77.      * @throws MqttException 
  78.      */  
  79.     private void connectAndSubscribe(String serverURL, String clientId,  
  80.     /* String topicFilter, */boolean isAllowOffline, String username,  
  81.             String password) throws MqttException {  
  82.           
  83.         if(isAliyun){  
  84.             if(!ShopUtil.validateClientId(getApplicationContext(), clientId)){  
  85.                 return;  
  86.             }  
  87.         }  
  88.         mqttClient = createMqttClient(serverURL, clientId);  
  89.         MqttConnectOptions options = new MqttConnectOptions();  
  90.         options.setCleanSession(!isAllowOffline);// mqtt receive offline message  
  91.         if (ValueWidget.isNullOrEmpty(username)) {  
  92.             username = null;  
  93.         }  
  94.         String sign=null;  
  95.         if(isAliyun){  
  96.             try {  
  97.                 sign = MacSignature.macSignature(Constants3.CONSUMER_ID_TV, password);  
  98.                 password=sign;  
  99.             } catch (InvalidKeyException e) {  
  100.                 e.printStackTrace();  
  101.             } catch (NoSuchAlgorithmException e) {  
  102.                 e.printStackTrace();  
  103.             }  
  104.               
  105.         }  
  106.         if (ValueWidget.isNullOrEmpty(password)) {  
  107.             password = null;  
  108.         } else {  
  109.             options.setPassword(password.toCharArray());  
  110.         }  
  111.         options.setUserName(username);  
  112.         options.setConnectionTimeout(10);  
  113.         options.setKeepAliveInterval(10);  
  114.         if(null==mqttClient){//点击HOME键,过很长时间,再点击应用时,mqttClient为null  
  115.             mqttClient = createMqttClient(serverURL, clientId);  
  116.         }  
  117.         mqttClient.setCallback(new PushCallback(this));  
  118.         boolean isSuccess=false;  
  119.         mqttClient.connect(options);  
  120.         isSuccess=true;  
  121.         // Subscribe to all subtopics of homeautomation  
  122.         // mqttClient.subscribe(topicFilter);  
  123.         if(null==mqttClient){//点击HOME键,过很长时间,再点击应用时,mqttClient为null  
  124.             mqttClient = createMqttClient(serverURL, clientId);  
  125.         }  
  126.         if(isAliyun){  
  127.             final String p2ptopic = TOPIC+"/p2p/";  
  128.             //同时订阅两个topic,一个是基于标准mqtt协议的发布订阅模式,一个是扩展的点对点推送模式  
  129.             final String[] topicFilters=new String[]{TOPIC,p2ptopic};  
  130.             mqttClient.subscribe(topicFilters);  
  131.         }else{  
  132.             mqttClient.subscribe(new String[] { TOPIC, clientId });  
  133.         }  
  134.     }  
  135.   
  136.     @Override  
  137.     public void onStart(Intent intent, int startId) {  
  138.         final boolean isRestart=intent.getBooleanExtra("isRestart"false);  
  139.         ShopUtil.logger2("restart MQTT service:"+isRestart);  
  140.         // super.onStart(intent, startId);  
  141. //      if (intent == null) {//重启服务时intent 确实为空  
  142.           
  143. //          Log.d(Constants.LOG_TAG, "intent is null");  
  144. //          return;  
  145. //      }  
  146.         Context context = getApplicationContext();  
  147.         clientId = ShopUtil.getIMEI(context);  
  148.         // Bundle bundle=intent.getExtras();  
  149.         // String ip=bundle.getString(Constants.ACTIVEMQ_IP);  
  150. //      final String ip = context.getString(R.string.pushserver_ip);  
  151.         SharedPreferences preferences = getApplicationContext()  
  152.                 .getSharedPreferences(Constants3.SHAREDPREFERENCES_NAME,  
  153.                         Context.MODE_PRIVATE);  
  154.         final String ip ="mqtt.ons.aliyun.com";// preferences.getString("pushserver_ip", context.getString(R.string.pushserver_ip));  
  155.         final String port = preferences.getString("pushserver_port""1883");  
  156.         isAliyun=SystemHWUtil.parse2Boolean(preferences.getString("is_aliyun_mq_ONS""false"));  
  157.         // String topic=bundle.getString(Constants.ACTIVEMQ_TOPIC);  
  158.          System.out.println("push ip:"+ip);  
  159.         new Thread(new Runnable() {  
  160.             /**** 
  161.              * 尝试连接的次数,为什么要尝试连接几次那? 
  162.              * (1)无wifi时启动,则肯定连接失败,所以尝试连接三次,只要在这个期间启动wifi就可以连接上activeMQ;<br /> 
  163.              * (2)之前连接上,然后断开wifi,然后又启动wifi,<br /> 
  164.              * 这时容易报 "Broker unavailable"异常,暂时不清楚原因,所以也需要继续尝试连接;<br /> 
  165.              *  
  166.              */  
  167.             private int tryTime = 5;  
  168.   
  169.             @Override  
  170.             public void run() {  
  171.                 System.out.println(tryTime+","+mqttClient+","+isOnline() );  
  172.                 while (tryTime > 0  
  173.                         && (!isOnline() || mqttClient == null || (!mqttClient  
  174.                                 .isConnected())||isRestart)) {  
  175.                     try {  
  176.                         ShopUtil.logger2("start push service");  
  177.                         ShopUtil.logger2("push server:"+ip);  
  178.                         String prefix=Constants3.CONSUMER_ID_TV+"@@@";  
  179.                         int remainingLength=23-prefix.length();  
  180.                         String suffix=null;  
  181.                         if(clientId.length()>remainingLength){  
  182.                             suffix=clientId.substring(0,remainingLength);  
  183.                         }else{  
  184.                             suffix=clientId;  
  185.                         }  
  186.                         String clientId2=prefix+suffix;  
  187.                         connectAndSubscribe(String.format(  
  188.                                 MQTTService.BROKER_URL_FORMAT, ip, port),  
  189.                                 clientId2, /* topic, */true""/*自己申请的access key*/""/*secret*/);  
  190.                          ShopUtil.logger2("clientId:" + clientId2);  
  191.                         ShopUtil.logger2("succeed to connect to activeMQ");  
  192.                         setOnline(true);  
  193.                     } catch (MqttException e) {  
  194.                         setOnline(false);  
  195.                         mqttClient=null;  
  196.                          e.printStackTrace();  
  197.                         ShopUtil.logger2("抛异常:"+e.getMessage());  
  198.                         ShopUtil.logger2("ip:" + ip + " ,port:" + port);  
  199.                         try {  
  200.                             Thread.sleep(10000);  
  201.                         } catch (InterruptedException e1) {  
  202.                             e1.printStackTrace();  
  203.                         }  
  204.                     }  
  205.                     tryTime--;  
  206.                 }  
  207.   
  208.             }  
  209.         }).start();  
  210. //      new Thread(new Runnable() {  
  211. //          @Override  
  212. //          public void run() {  
  213. //              System.out.println("start:"+System.currentTimeMillis());  
  214. //              try {  
  215. //                  Thread.sleep(10000);  
  216. //              } catch (InterruptedException e) {  
  217. //                  e.printStackTrace();  
  218. //              }  
  219. //              while(true){  
  220. //                  try {  
  221. //                      Thread.sleep(10000);  
  222. //                      if(mqttClient!=null&& !mqttClient.isConnected()){  
  223. //                          System.out.println("disConnected:"+System.currentTimeMillis());  
  224. //                      }  
  225. //                  } catch (InterruptedException e) {  
  226. //                      e.printStackTrace();  
  227. //                  }  
  228. //              }  
  229. //          }  
  230. //      }).start();  
  231.     }  
  232.   
  233.     @Override  
  234.     public void onDestroy() {  
  235.         setOnline(false);  
  236.            
  237.         try {  
  238.             ShopUtil.logger2("MQTTService destory");  
  239.             mqttClient.disconnect(0);  
  240.         } catch (MqttException e) {  
  241. //          Toast.makeText(getApplicationContext(),  
  242. //                  "Something went wrong!" + e.getMessage(), Toast.LENGTH_LONG)  
  243. //                  .show();  
  244.             e.printStackTrace();  
  245.               
  246.         }  
  247.         mqttClient = null;  
  248.         stopForeground(true);  
  249.         Intent intent = new Intent("com.dbjtech.waiqin.destroy");    
  250.         sendBroadcast(intent);   
  251.     }  
  252.   
  253.       
  254.   
  255.     public boolean isOnline() {  
  256.         return online;  
  257.     }  
  258.   
  259.     public void setOnline(boolean online) {  
  260.         this.online = online;  
  261.     }  
  262.     @Override  
  263.     public int onStartCommand(Intent intent, int flags, int startId) {  
  264.         flags = START_STICKY;  
  265.         return super.onStartCommand(intent, flags, startId);  
  266.     }  
  267. }  

 

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 人工智能 监控
|
4月前
|
消息中间件 BI Serverless
消息队列推出serverless版、Quick BI升级至5.0……阿里云近期产品动态汇总
消息队列推出serverless版、Quick BI升级至5.0……阿里云近期产品动态汇总
481 1
|
消息中间件 存储 运维
厚积薄发--一文带您了解阿里云 RocketMQ 轻量版消息队列(MNS)
MNS 重点聚焦在基准消息队列的核心能力建设,MNS 经过多年迭代与打磨,尽管内部极为复杂,但一直努力保持其在客户端的简单易用,围绕轻量和集成两个命题,着力建设更易用的消息队列产品。
1593 0
厚积薄发--一文带您了解阿里云  RocketMQ 轻量版消息队列(MNS)
|
8月前
|
消息中间件 缓存 大数据
消息队列和应用工具产品体系-阿里云消息队列产品简介
消息队列和应用工具产品体系-阿里云消息队列产品简介
1106 1
消息队列和应用工具产品体系-阿里云消息队列产品简介
|
12月前
|
消息中间件
《阿里云产品手册2022-2023 版》——消息队列
《阿里云产品手册2022-2023 版》——消息队列
398 0
|
12月前
|
消息中间件 Kafka
《阿里云产品手册2022-2023 版》——消息队列Kafka 版
《阿里云产品手册2022-2023 版》——消息队列Kafka 版
470 0
|
消息中间件 分布式计算 运维
阿里云消息队列 Kafka 生态集成的实践与探索
阿里云消息队列 Kafka 生态集成的实践与探索
1035 0
阿里云消息队列 Kafka 生态集成的实践与探索
|
消息中间件 SQL 存储
阿里云消息队列 Kafka-消息检索实践
本文章主要介绍消息队列使用过程中所遇到的消息丢失、重复消费等痛点问题的排查办法,以及消息队列 Kafka「检索组件」的场景实践,并对其关键技术进行解读。旨在帮助大家对消息队列 Kafka「检索组件」的特点和使用方式更加熟悉,以更有效地解决消息排查过程中所遇到的问题。
565 0
阿里云消息队列 Kafka-消息检索实践
|
消息中间件 Cloud Native 物联网
阿里云消息队列 RocketMQ、Kafka 荣获金融级产品稳定性测评 “先进级” 认证
在混沌工程技术沙龙--金融行业精品专场的分布式系统稳定性评估体系获奖名单中,阿里云分布式消息队列服务成为通过首批消息队列服务稳定性认证,荣获最高级别 “先进级” 认证的消息队列服务。
460 0
阿里云消息队列 RocketMQ、Kafka 荣获金融级产品稳定性测评 “先进级” 认证
|
消息中间件 存储 运维
阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台
RocketMQ5.0 的发布标志着阿里云消息从消息领域正式迈向了“消息、事件、流”场景大融合的新局面。未来阿里云消息产品的演进也将继续围绕消息、事件、流核心场景而开展。
1085 1
阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台