activeMQ 推送之mqtt客户端

简介:

使用activeMQ进行Android推送

activeMQ下载地址:http://activemq.apache.org/download.html

下载后是一个压缩包:apache-activemq-5.9.0-bin.zip

启动方式:

解压缩,进入apache-activemq-5.9.0-bin\apache-activemq-5.9.0\bin,双击activemq.bat,即可启动activeMQ服务

 启动之后:

 android客户端推送采用mqtt(paho-mqtt-client-1.0.1.jar),依赖包见附件

 

但是为了测试,我写了一个swing图形界面,充当手机客户端,依赖的jar包仍然是paho-mqtt-client-1.0.1.jar.界面如下:

使用方法:点击[启动]按钮,开始接收推送消息
 对应的主类是:
MqttSwing,用于接收推送消息.

 

我还写了一个发送推送消息的swing图形界面,充当推送后管系统,界面如下:

使用方法:点击[连接]按钮,才可以发送推送消息
 对应的主类:PusherApp,用于发送推送消息.

核心代码介绍如下.

客户端连接activeMQ,建立连接(只有建立连接,才能接收到推送消息)

方法名:connect,做了两件事:(1)建立连接;(2)订阅主题(topic)

 

Java代码   收藏代码
  1. /*** 
  2.      * 客户端和activeMQ服务器建立连接 
  3.      * @param BROKER_URL 
  4.      * @param clientId : 用于标识客户端,相当于ios中的device token 
  5.      * @param TOPIC 
  6.      * @param isCleanSession :false--可以接受离线消息; 
  7.      * @return 是否启动成功  
  8.      */  
  9.     private boolean connect(String BROKER_URL,String clientId,String TOPIC,boolean isCleanSession){  
  10.         try {  
  11.             ComponentUtil.appendResult(resultTextPane, "connect time:"+TimeHWUtil.getCurrentMiniuteSecond(), true);  
  12.             mqttClient = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());  
  13.             MqttConnectOptions options= new MqttConnectOptions();  
  14.             options.setCleanSession(isCleanSession);//mqtt receive offline message  
  15.             ComponentUtil.appendResult(resultTextPane, "isCleanSession:"+isCleanSession, true);  
  16.             options.setKeepAliveInterval(30);  
  17.             //推送回调类,在此类中处理消息,用于消息监听  
  18.             mqttClient.setCallback(new MyCallBack(MqttSwing.this));  
  19.             boolean isSuccess=false;  
  20.             try {  
  21.                 mqttClient.connect(options);//CLIENT ID CAN NOT BE SAME  
  22.                 isSuccess=true;  
  23.             } catch (Exception e) {  
  24.                 if(isPrintException){  
  25.                     e.printStackTrace();  
  26.                 }  
  27.             }  
  28.             if(!isSuccess){  
  29.                 String message="连接失败,请检查client id是否重复了 或者activeMQ是否启动";  
  30.                 ComponentUtil.appendResult(resultTextPane, message, true);  
  31.                 GUIUtil23.warningDialog(message);  
  32.                 return false;  
  33.             }else{  
  34.             //Subscribe to topics   
  35.                 mqttClient.subscribe(new String[]{TOPIC,clientId});  
  36.                  
  37.                 System.out.println("topic:"+TOPIC+",  "+(clientId));  
  38.                 ComponentUtil.appendResult(resultTextPane, "TOPIC:"+TOPIC+",  "+(clientId), true);  
  39.             }  
  40.   
  41.         } catch (MqttException e) {  
  42.             if(isPrintException){  
  43.             e.printStackTrace();}  
  44.             GUIUtil23.errorDialog(e.getMessage());  
  45.             return false;  
  46.         }  
  47.         return true;  
  48.     }  

 

 

推送消息到来时的回调类:MyCallBack

 

Java代码   收藏代码
  1. package com.mqtt.hw.callback;  
  2.   
  3. import org.apache.commons.lang.StringEscapeUtils;  
  4. import org.eclipse.paho.client.mqttv3.MqttCallback;  
  5. import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;  
  6. import org.eclipse.paho.client.mqttv3.MqttMessage;  
  7. import org.eclipse.paho.client.mqttv3.MqttTopic;  
  8.   
  9. import com.mqtt.hw.MqttSwing;  
  10. import com.time.util.TimeHWUtil;  
  11.   
  12. public class MyCallBack implements MqttCallback {  
  13.     private MqttSwing mqttSwing;  
  14.       
  15.       
  16.       
  17.     public MyCallBack(MqttSwing mqttSwing) {  
  18.         super();  
  19.         this.mqttSwing = mqttSwing;  
  20.     }  
  21.   
  22.     @Override  
  23.     public void connectionLost(Throwable cause) {  
  24.           
  25.     }  
  26.   
  27.     @Override  
  28.     public void messageArrived(MqttTopic topic, MqttMessage message)  
  29.             throws Exception {  
  30.         System.out.println("messageArrived...."+TimeHWUtil.getCurrentMiniuteSecond());  
  31.         String messageStr=StringEscapeUtils.unescapeHtml(new String(message.getPayload()));  
  32.         System.out.println("message:"+messageStr);  
  33.         this.mqttSwing.receiveMessage(messageStr);  
  34.         //使窗口处于激活状态  
  35.   
  36.     }  
  37.   
  38.     @Override  
  39.     public void deliveryComplete(MqttDeliveryToken token) {  
  40.           
  41.     }  
  42.   
  43. }  

 

 

推送者与activeMQ建立连接:

 

Java代码   收藏代码
  1. /** 
  2.      * 初始化connection和session 
  3.      *  
  4.      * @throws Exception 
  5.      */  
  6.     private void init(/* String mqIp,boolean transacted */throws Exception {  
  7.         if (!DialogUtil.verifyTFEmpty(serverIpTextField, "服务器ip")) {  
  8.             return;  
  9.         }  
  10.         String transactedStr = transactedTextField.getText();  
  11.         boolean transacted = false;  
  12.         if (ValueWidget.isNullOrEmpty(transactedStr)) {  
  13.             transacted = false;  
  14.         } else {  
  15.             transacted = Boolean.parseBoolean(transactedStr);  
  16.         }  
  17.         String message = "transacted:" + transacted;  
  18.         ComponentUtil.appendResult(resultTextArea, message, true);  
  19.         System.out.println(message);  
  20.         String brokerUrl = String.format(BROKER_URL,  
  21.                 serverIpTextField.getText());  
  22.         // 创建链接工厂  
  23.         TopicConnectionFactory factory = new ActiveMQConnectionFactory(  
  24.                 ActiveMQConnection.DEFAULT_USER,  
  25.                 ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl);  
  26.         ComponentUtil.appendResult(resultTextArea, "url:" + brokerUrl, true);  
  27.         // 通过工厂创建一个连接  
  28.           
  29.         connection = factory.createTopicConnection();  
  30.         // 启动连接  
  31.         connection.start();  
  32.         ComponentUtil.appendResult(resultTextArea, "启动connection 成功"true);  
  33.         // 创建一个session会话 transacted  
  34.         session = connection.createTopicSession(  
  35.                 transacted /* Boolean.FALSE */, Session.AUTO_ACKNOWLEDGE);  
  36.   
  37.     }  

 

 

项目源代码见附件mqtt_swing.zip

手机android客户端(测试推送)见附件android-mqtt-push-master.zip

也可以从 https://github.com/tokudu/AndroidPushNotificationsDemo

 

  下载

详细配置参阅附件mqtt推送详解.zip

 依赖的jar包

io0007-find_progess-0.0.8.4-SNAPSHOT.jar

io0007-find_progess-0.0.8.4-SNAPSHOT-sources.jar

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 Java Spring
RocketMQ-JAVA客户端不同版本接入方式
RocketMQ4.0 RocketMQ5.0 JAVA接入 spring springboot
RocketMQ-JAVA客户端不同版本接入方式
|
9月前
UE DTMqtt 虚幻引擎 Mqtt 客户端插件说明
UE DTMqtt 虚幻引擎 Mqtt 客户端插件说明
329 0
|
8月前
|
安全 网络协议 物联网
不看后悔系列之一篇搞懂LinuxCentOS搭建MQTT服务器及客户端操作使用
linux CentOS上搭建MQTT服务器并不难,主要就是用到了mosquitto这款消息代理服务软件。其采用发布/订阅模式传输机制,轻量、简单、开放并易于实现,被广泛应用于物联网之中。
1376 0
|
1月前
|
Java Maven
【开源视频联动物联网平台】vertx写一个mqtt客户端
【开源视频联动物联网平台】vertx写一个mqtt客户端
32 1
|
4月前
|
消息中间件 运维 负载均衡
负载均衡中后端连了三个rabbitmq,如果挂了一个,客户端连接mq会变慢吗
在负载均衡中使用三个 RabbitMQ 实例,如果其中一个实例发生故障,可能会影响客户端连接到 RabbitMQ 的性能。具体影响取决于负载均衡的配置和客户端的实现方式。 如果负载均衡器能够及时检测到故障的 RabbitMQ 实例并将流量路由到正常的实例,那么客户端连接的性能影响可能较小。但如果负载均衡器不能迅速切换流量或者客户端实现不支持及时的连接故障转移,那么可能会导致客户端连接的延迟或失败。 在设计这样的架构时,有一些考虑因素: 1. **健康检查和故障切换:** 确保负载均衡器能够定期检查 RabbitMQ 实例的健康状态,并在出现故障时快速将流量切换到其他正常的实例。 2.
|
5月前
|
消息中间件
RabbitMQ客户端清空所有消息
RabbitMQ客户端清空所有消息
164 0
|
5月前
|
传感器 JavaScript 物联网
如何在Node.js中使用MQTT客户端库?
如何在Node.js中使用MQTT客户端库?
67 0
|
5月前
|
物联网 Python
如何通过示例在Python中使用Paho MQTT客户端?
如何通过示例在Python中使用Paho MQTT客户端?
76 2
如何通过示例在Python中使用Paho MQTT客户端?
|
5月前
|
存储 传感器 物联网
MQTT 客户端和代理连接如何工作?
MQTT 客户端和代理连接如何工作?
107 2
MQTT 客户端和代理连接如何工作?
|
7月前
|
消息中间件 SQL 弹性计算
RocketMQ中使用Java客户端发送消息和消费的应用
本教程将总结使用java客户端消息发送和消费各种场景, 并Demo演示
500 1