Redis的Pub/Sub模式

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介:

Redis同样支持消息的发布/订阅(Pub/Sub)模式,这和中间件activemq有些类似。订阅者(Subscriber)可以订阅自己感兴趣的频道(Channel),发布者(Publisher)可以将消息发往指定的频道(Channel),正式通过这种方式,可以将消息的发送者和接收者解耦。另外,由于可以动态的Subscribe和Unsubscribe,也可以提高系统的灵活性和可扩展性。

关于如何搭建Redis环境,请参考其他文章。这里假设有一个可用的Redis环境(单节点和集群均可)。

在redis-cli中使用Pub/Sub

普通channel的Pub/Sub

先用一个客户端来订阅频道:

190630_caCj_1434710.png

上图中先使用redis-cli作为客户端连接了Redis,之后使用了SUBSCRIBE命令,后面的参数表示订阅了china和hongkong两个channel。可以看到"SUBSCRIBE china hongkong"这条命令的输出是6行(可以分为2组,每一组是一个Message)。因为订阅、取消订阅的操作跟发布的消息都是通过消息(Message)的方式发送的,消息的第一个元素就是消息类型,它可以是以下几种类型:

subscribe: means that we successfully subscribed to the channel given as the second element in the reply. The third argument represents the number of channels we are currently subscribed to.

unsubscribe: means that we successfully unsubscribed from the channel given as second element in the reply. The third argument represents the number of channels we are currently subscribed to. When the last argument is zero, we are no longer subscribed to any channel, and the client can issue any kind of Redis command as we are outside the Pub/Sub state.

message: it is a message received as result of a PUBLISH command issued by another client. The second element is the name of the originating channel, and the third argument is the actual message payload.

--from http://redis.io/topics/pubsub

上图的订阅命令将使得发往这两个channel的消息会被这个客户端接收到。需要注意的是,redis-cli客户端在进入subscribe模式以后,将不能再响应其他的任何命令

A client subscribed to one or more channels should not issue commands, although it can subscribe and unsubscribe to and from other channels.

The commands that are allowed in the context of a subscribed client are SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT

--from http://redis.io/topics/pubsub

官网说客户端在subscribe下除了可以使用以上命令外,不能使用其他命令了。但是本人在Subscribe状态下使用上述几个命令,根本没反应。也就是说,使用redis-cli订阅channel后,该客户端将不能响应任何命令。除非按下(ctrl+c),但该操作不是取消订阅,而是退出redis-cli,此时将回到shell命令行下。

关于这个情况,我在官网上没有找到对这种情况的解释,也有不少的人在网上问,找来找去,本人觉得还算合理的解释是:

On this page: http://redis.io/commands/subscribe applies only to those clients.

The redis-cli is among those clients. So, the comment is not an instruction for users of redis-cli.

Instead, redis-cli blocks waiting for messages on the bus (only to be unsubcribed via a ctrl+c).

--from http://stackoverflow.com/questions/17621371/redis-unsubscribe

就是说,官网中说明的client,并不包含这里使用的redis-cli,于是它可以和其他的client有不同表现。(先不纠结这个问题,稍后再用jedis来测试一下。)

接下来再用一个客户端来发布消息:

191249_606c_1434710.png

可以看到,新的一个客户端使用PUBLISH命令往china频道发布了一条叫"China News"的消息,接下来再看看订阅端:

191610_UfBn_1434710.png

可以看见,这条消息已经被接收到了。可以看到,收到的消息中第一个参数是类型"message",第二个参数是channel名字"china",第三个参数是消息内容"China News",这和开始说的message类型的结构一致。

通配符的Pub/Sub

Redis还支持通配符的订阅和发布。客户端可以订阅满足一个或多个规则的channel消息,相应的命令是PSUBSCRIBE和PUNSUBSCRIBE。接下来我们再用另一个redis-cli客户端来订阅"chi*"的channel,如图:

090447_IiDo_1434710.png

和subscribe/unsubscribe的输出类似,可以看到第一部分是消息类型“psubscribe”,第二部分是订阅的规则“chi*”,第三部分则是该客户端目前订阅的所有规则个数。

接下来再发布一条消息到china这个channel中,此时,两个订阅者应该都能收到该消息:

092107_m6bt_1434710.png

实际测试结果跟预期相同。需要注意的是,订阅者2通过通配符订阅的,收到的消息类型是“pmessage”:

pmessage: it is a message received as result of a PUBLISH command issued by another client, matching a pattern-matching subscription. The second element is the original pattern matched, the third element is the name of the originating channel, and the last element the actual message payload.

--from http://redis.io/topics/pubsub

第二部分是匹配的模式“chi*”,第三部分是实际的channel名字“china”,第四部分是消息内容“China Daily”。

我们再发布一条消息到chinnna中,此时只有订阅者2能接收到消息了:

092940_bfIu_1434710.png

同样,在使用PSUBSCRIBE进入订阅模式以后,该redis-cli也不能再监听其他任何的命令,要退出该模式,只能使用ctrl+c。

使用Jedis实现Pub/Sub

Jedis是Redis客户端的一种Java实现,在http://redis.io/clients#java中也能找到。

这里使用maven来管理包的依赖,由于使用了Log4j来输出日志,因此会用到log4j的jar包:

?
1
2
3
4
5
6
7
8
9
10
< dependency >
     < groupId >redis.clients</ groupId >
     < artifactId >jedis</ artifactId >
     < version >2.8.0</ version >
</ dependency >
< dependency >
     < groupId >log4j</ groupId >
     < artifactId >log4j</ artifactId >
     < version >1.2.17</ version >
</ dependency >

Jedis中的JedisPubSub抽象类提供了订阅和取消的功能。想处理订阅和取消订阅某些channel的相关事件,我们得扩展JedisPubSub类并实现相关的方法:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package  com.demo.redis;
 
import  org.apache.log4j.Logger;
import  redis.clients.jedis.JedisPubSub;
 
public  class  Subscriber  extends  JedisPubSub { //注意这里继承了抽象类JedisPubSub
 
     private  static  final  Logger LOGGER = Logger.getLogger(Subscriber. class );
 
     @Override
     public  void  onMessage(String channel, String message) {
         LOGGER.info(String.format( "Message. Channel: %s, Msg: %s" , channel, message));
     }
 
     @Override
     public  void  onPMessage(String pattern, String channel, String message) {
         LOGGER.info(String.format( "PMessage. Pattern: %s, Channel: %s, Msg: %s"
             pattern, channel, message));
     }
 
     @Override
     public  void  onSubscribe(String channel,  int  subscribedChannels) {
         LOGGER.info( "onSubscribe" );
     }
 
     @Override
     public  void  onUnsubscribe(String channel,  int  subscribedChannels) {
         LOGGER.info( "onUnsubscribe" );
     }
 
     @Override
     public  void  onPUnsubscribe(String pattern,  int  subscribedChannels) {
         LOGGER.info( "onPUnsubscribe" );
     }
 
     @Override
     public  void  onPSubscribe(String pattern,  int  subscribedChannels) {
         LOGGER.info( "onPSubscribe" );
     }
}

有了订阅者,我们还需要一个发布者:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package  com.demo.redis;
 
import  java.io.BufferedReader;
import  java.io.IOException;
import  java.io.InputStreamReader;
import  org.apache.log4j.Logger;
import  redis.clients.jedis.Jedis;
 
public  class  Publisher {
 
     private  static  final  Logger LOGGER = Logger.getLogger(Publisher. class );
     private  final  Jedis publisherJedis;
     private  final  String channel;
 
     public  Publisher(Jedis publisherJedis, String channel) {
         this .publisherJedis = publisherJedis;
         this .channel = channel;
     }
 
     /**
      * 不停的读取输入,然后发布到channel上面,遇到quit则停止发布。
      */
     public  void  startPublish() {
         LOGGER.info( "Type your message (quit for terminate)" );
         try  {
             BufferedReader reader =  new  BufferedReader( new  InputStreamReader(System.in));
             while  ( true ) {
                 String line = reader.readLine();
                 if  (! "quit" .equals(line)) {
                     publisherJedis.publish(channel, line);
                 else  {
                     break ;
                 }
             }
         catch  (IOException e) {
             LOGGER.error( "IO failure while reading input" , e);
         }
     }
}

为简单起见,这个发布者接收控制台的输入,然后将输入的消息发布到指定的channel上面,如果输入quit,则停止发布消息。

接下来是主函数:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package  com.demo.redis;
 
import  org.apache.log4j.Logger;
import  redis.clients.jedis.Jedis;
import  redis.clients.jedis.JedisPool;
import  redis.clients.jedis.JedisPoolConfig;
 
public  class  Program {
     
     public  static  final  String CHANNEL_NAME =  "MyChannel" ;
     //我这里的Redis是一个集群,192.168.56.101和192.168.56.102都可以使用
     public  static  final  String REDIS_HOST =  "192.168.56.101" ;
     public  static  final  int  REDIS_PORT =  7000 ;
     
     private  final  static  Logger LOGGER = Logger.getLogger(Program. class );
     private  final  static  JedisPoolConfig POOL_CONFIG =  new  JedisPoolConfig();
     private  final  static  JedisPool JEDIS_POOL = 
             new  JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT,  0 );
     
     public  static  void  main(String[] args)  throws  Exception {
         final  Jedis subscriberJedis = JEDIS_POOL.getResource();
         final  Jedis publisherJedis = JEDIS_POOL.getResource();
         final  Subscriber subscriber =  new  Subscriber();
         //订阅线程:接收消息
         new  Thread( new  Runnable() {
             public  void  run() {
                 try  {
                     LOGGER.info( "Subscribing to \"MyChannel\". This thread will be blocked." );
                     //使用subscriber订阅CHANNEL_NAME上的消息,这一句之后,线程进入订阅模式,阻塞。
                     subscriberJedis.subscribe(subscriber, CHANNEL_NAME);
                     
                     //当unsubscribe()方法被调用时,才执行以下代码
                     LOGGER.info( "Subscription ended." );
                 catch  (Exception e) {
                     LOGGER.error( "Subscribing failed." , e);
                 }
             }
         }).start();
         
         //主线程:发布消息到CHANNEL_NAME频道上
         new  Publisher(publisherJedis, CHANNEL_NAME).startPublish();
         publisherJedis.close();
         
         //Unsubscribe
         subscriber.unsubscribe();
         subscriberJedis.close();
     }
}

主类Program中定义了channel名字、连接redis的地址和端口,并使用JedisPool来获取Jedis实例。由于订阅者(subscriber)在进入订阅状态后会阻塞线程,因此新起一个线程(new Thread())作为订阅线程,并是用主线程来发布消息。待发布者(类中的new Publisher)停止发布消息(控制台中输入quit即可)时,解除订阅者的订阅(subscriber.unsubscribe()方法)。此时订阅线程解除阻塞,打印结束的日志并退出。

运行程序之前,还需要一个简单的log4j配置以观察输出:

?
1
2
3
4
5
log4j.rootLogger=INFO,stdout
 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %m%n

运行Program,以下是执行结果:

121749_U4aS_1434710.png

从结果看,当订阅者订阅后,订阅线程阻塞,主线程中的Publisher接收输入后,发布消息到MyChannel中,此时订阅该channel的订阅者收到消息并打印。


Jedis源码简要分析

关于使用UNSUBSCRIBE

开始使用redis-cli时,在subscriber进入监听状态后,并不能使用UNSUBSCRIBE和PUNSUBSCRIBE命令,现在在Jedis中,在订阅线程阻塞时,通过在main线程中调用改subscriber的unsubscribe()方法来解除阻塞。查看Jedis源码,其实该方法也就是给redis发送了一个UNSUBSCRIBE命令而已:

122631_acTO_1434710.png

因此这里是支持在“客户端”使用UNSUBSCRIBE命令的。

关于订阅者接收消息

在接收消息前,需要订阅channel,订阅完成之后,会执行一个循环,这个循环会一直阻塞,直到该Client没有订阅数为止,如下图:

115524_AhEW_1434710.png

中间省略的其他行,主要是用于解析收到的Redis响应,这段代码也是根据响应的第一部分确定响应的消息类型,然后挨个解析响应的后续内容,最后根据解析到消息类型,并使用后续解析到的内容作为参数来回调相应的方法,省略的内容如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
final  byte [] resp = ( byte []) firstObj;
if  (Arrays.equals(SUBSCRIBE.raw, resp)) {
   subscribedChannels = ((Long) reply.get( 2 )).intValue();
   final  byte [] bchannel = ( byte []) reply.get( 1 );
   final  String strchannel = (bchannel ==  null ) ?  null  : SafeEncoder.encode(bchannel);
   //调用onSubscribe方法,该方法在我们的Subscriber类中实现
   onSubscribe(strchannel, subscribedChannels);
else  if  (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
   subscribedChannels = ((Long) reply.get( 2 )).intValue();
   final  byte [] bchannel = ( byte []) reply.get( 1 );
   final  String strchannel = (bchannel ==  null ) ?  null  : SafeEncoder.encode(bchannel);
   //调用onUnsubscribe方法,该方法在我们的Subscriber类中实现
   onUnsubscribe(strchannel, subscribedChannels);
else  if  (Arrays.equals(MESSAGE.raw, resp)) {
   final  byte [] bchannel = ( byte []) reply.get( 1 );
   final  byte [] bmesg = ( byte []) reply.get( 2 );
   final  String strchannel = (bchannel ==  null ) ?  null  : SafeEncoder.encode(bchannel);
   final  String strmesg = (bmesg ==  null ) ?  null  : SafeEncoder.encode(bmesg);
   //调用onMessage方法,该方法在我们的Subscriber类中实现
   onMessage(strchannel, strmesg);
else  if  (Arrays.equals(PMESSAGE.raw, resp)) {
   final  byte [] bpattern = ( byte []) reply.get( 1 );
   final  byte [] bchannel = ( byte []) reply.get( 2 );
   final  byte [] bmesg = ( byte []) reply.get( 3 );
   final  String strpattern = (bpattern ==  null ) ?  null  : SafeEncoder.encode(bpattern);
   final  String strchannel = (bchannel ==  null ) ?  null  : SafeEncoder.encode(bchannel);
   final  String strmesg = (bmesg ==  null ) ?  null  : SafeEncoder.encode(bmesg);
   //调用onPMessage方法,该方法在我们的Subscriber类中实现
   onPMessage(strpattern, strchannel, strmesg);
else  if  (Arrays.equals(PSUBSCRIBE.raw, resp)) {
   subscribedChannels = ((Long) reply.get( 2 )).intValue();
   final  byte [] bpattern = ( byte []) reply.get( 1 );
   final  String strpattern = (bpattern ==  null ) ?  null  : SafeEncoder.encode(bpattern);
   onPSubscribe(strpattern, subscribedChannels);
else  if  (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
   subscribedChannels = ((Long) reply.get( 2 )).intValue();
   final  byte [] bpattern = ( byte []) reply.get( 1 );
   final  String strpattern = (bpattern ==  null ) ?  null  : SafeEncoder.encode(bpattern);
   //调用onPUnsubscribe方法,该方法在我们的Subscriber类中实现
   onPUnsubscribe(strpattern, subscribedChannels);
else  {
   //对于其他Redis没有定义的返回消息类型,则直接报错
   throw  new  JedisException( "Unknown message type: "  + firstObj);
}

以上就是为什么我们需要在Subscriber中实现这几个方法的原因了(这些方法并不是抽象的,可以选择实现使用到的方法)。


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
缓存 NoSQL Java
flea-cache使用之Redis集群模式接入
【1月更文挑战第2天】本篇博文介绍笔者 flea-framework 下的 flea-cache 模块中的Redis集群模式接入
41 1
flea-cache使用之Redis集群模式接入
|
3月前
|
存储 监控 NoSQL
Redis 高可用之主从模式
上一节RDB和AOF持久化机制提到了 Redis 的持久性,也就是在服务器实例宕机或故障时,拥有再恢复的能力。但是在这个服务器实例宕机恢复期间,是无法接受新的数据请求。对于整体服务而言这是无法容忍的,因此我们可以使用多个服务器实例,在一个实例宕机中断时,另外的服务器实例可以继续对外提供服务,从而不中断业务。Redis 是如何做的呢?Redis 做法是**增加冗余副本**,**将一份数据同时保存在多个实例**上。那么如何保存各个实例之间的数据一致性呢?
45 0
Redis 高可用之主从模式
|
2月前
|
消息中间件 存储 NoSQL
深入Redis消息队列:Pub/Sub和Stream的对决【redis第六部分】
深入Redis消息队列:Pub/Sub和Stream的对决【redis第六部分】
63 0
|
5月前
|
存储 监控 NoSQL
redis主从模式,redis哨兵模式,redis集群模式
redis主从模式,redis哨兵模式,redis集群模式
redis主从模式,redis哨兵模式,redis集群模式
QGS
|
6月前
|
NoSQL 网络协议 Redis
Redis7配置哨兵模式(一主二从三哨兵)
Redis7配置哨兵模式(一主二从三哨兵)
QGS
283 1
|
3月前
|
消息中间件 NoSQL Java
硬核 | Redis Pub/Sub 发布订阅与宅男有什么关系?
硬核 | Redis Pub/Sub 发布订阅与宅男有什么关系?
50 0
|
3月前
|
存储 负载均衡 NoSQL
Redis的集群模式是什么?它的优点和缺点是什么?
Redis的集群模式是什么?它的优点和缺点是什么?
50 0
|
3月前
|
NoSQL Java Redis
Redis的发布-订阅模式是什么?它的应用场景是什么?
Redis的发布-订阅模式是什么?它的应用场景是什么?
55 0
|
3月前
|
存储 NoSQL 数据库连接
Redis主从模式以及数据同步原理:全量数据同步、增量数据同步
Redis主从模式以及数据同步原理:全量数据同步、增量数据同步
166 0
|
3月前
|
缓存 NoSQL Java
flea-cache使用之Redis分片模式接入
【1月更文挑战第1天】本篇介绍 Huazie 的Flea框架下的 flea-cache模块中 Redis分片模式接入和使用
83 1
flea-cache使用之Redis分片模式接入

热门文章

最新文章