Redis学习笔记~分布式的Pub/Sub模式

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介:

redis的客户端有很多,这次用它的pub/sub发布与订阅我选择了StackExchange.Redis,发布与订阅大家应该很清楚了,首先一个订阅者,订阅一个服务,服务执行一些处理程序(可能是写个日志,插入个数据,发个email)然后当另一个项目的某个业务发布这个服务后,被订阅的程序将会被执行,这个听起来很有意思,redis有好的实现了这个pub/sub功能。

看一下结构图

 

Sub订阅(消息消费者)

对于订阅方,这里类似于一个服务,它会长期运行着,被启动后动态订阅一些服务进来,以便以后再被其它服务调用

        //sub a function in A-project
            PubSubManager.Instance.Subscribe("UserLog", (msg) =>
            {
                //订阅者处理自己的业务逻辑,这相关于队列服务要干的事
                Console.WriteLine(msg);
            });

Pub发布(消息生产者)

而对于其它项目,如A网站,它可能在被在POST动作后发布这个UserLog的服务,这时上面的订阅方将会消费它(消费者模式),或者服务代码被执行!

     [HttpPost]
        public ActionResult Index(string user)
        {
            PubSubManager.Instance.Publish("UserLog", user + "这个用户提交表单了");
            return View();
        }

对于一直运行的服务,将会收到来自不同项目的消息,而它只负责消费它!

Lind.DDD.PublishSubscribe

封装了一些标准的pub/sub方法,它可以有多种实现方法,本例使用redis这个中间件

   /// <summary>
    /// 发布订阅的接口规则
    /// </summary>
    public interface IPubSub
    {
        /// <summary>
        /// 发布,有顺序,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void Publish(string channel, string value);
        /// <summary>
        /// 订阅,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void Subscribe(string channel, Action<string> action);
        /// <summary>
        /// 异步发布,无顺序,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishAsync(string channel, string value);
        /// <summary>
        /// 异步订阅,无顺序,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeAsync(string channel, Action<string> action);

        /// <summary>
        /// 发布,有顺序,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishByte(string channel, byte[] value);
        /// <summary>
        /// 订阅,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeByte(string channel, Action<byte[]> action);
        /// <summary>
        /// 异步发布,有顺序,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishByteAsync(string channel, byte[] value);
        /// <summary>
        /// 异步订阅,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeByteAsync(string channel, Action<byte[]> action);

        /// <summary>
        /// 发布,有顺序,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void Publish<T>(string channel, T value);
        /// <summary>
        /// 订阅,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void Subscribe<T>(string channel, Action<T> action);
        /// <summary>
        /// 异步发布,有顺序,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishAsync<T>(string channel, T value);
        /// <summary>
        /// 异步订阅,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeAsync<T>(string channel, Action<T> action);
        /// <summary>
        /// 取消指定订阅
        /// </summary>
        /// <param name="channel"></param>
        void UnSubscribe(string channel);
        /// <summary>
        /// 取消所有订阅
        /// </summary>
        /// <param name="channel"></param>
        void UnSubscribeAll();
    }

 本文转自博客园张占岭(仓储大叔)的博客,原文链接:Redis学习笔记~分布式的Pub/Sub模式,如需转载请自行联系原博主。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
9天前
|
NoSQL Java 关系型数据库
【Redis系列笔记】分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。 分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
31 2
|
4天前
|
监控 NoSQL 算法
探秘Redis分布式锁:实战与注意事项
本文介绍了Redis分区容错中的分布式锁概念,包括利用Watch实现乐观锁和使用setnx防止库存超卖。乐观锁通过Watch命令监控键值变化,在事务中执行修改,若键值被改变则事务失败。Java代码示例展示了具体实现。setnx命令用于库存操作,确保无超卖,通过设置锁并检查库存来更新。文章还讨论了分布式锁存在的问题,如客户端阻塞、时钟漂移和单点故障,并提出了RedLock算法来提高可靠性。Redisson作为生产环境的分布式锁实现,提供了可重入锁、读写锁等高级功能。最后,文章对比了Redis、Zookeeper和etcd的分布式锁特性。
36 16
探秘Redis分布式锁:实战与注意事项
|
5天前
|
NoSQL Java 大数据
介绍redis分布式锁
分布式锁是解决多进程在分布式环境中争夺资源的问题,与本地锁相似但适用于不同进程。以Redis为例,通过`setIfAbsent`实现占锁,加锁同时设置过期时间避免死锁。然而,获取锁与设置过期时间非原子性可能导致并发问题,解决方案是使用`setIfAbsent`的超时参数。此外,释放锁前需验证归属,防止误删他人锁,可借助Lua脚本确保原子性。实际应用中还有锁续期、重试机制等复杂问题,现成解决方案如RedisLockRegistry和Redisson。
|
6天前
|
缓存 NoSQL Java
【亮剑】如何使用注解来实现 Redis 分布式锁的功能?
【4月更文挑战第30天】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护。基于 Redis 的分布式锁利用 SETNX 或 SET 命令实现,并考虑自动过期、可重入及原子性以确保可靠性。在 Java Spring Boot 中,可通过 `@EnableCaching`、`@Cacheable` 和 `@CacheEvict` 注解轻松实现 Redis 分布式锁功能。
|
7天前
|
NoSQL Redis 微服务
分布式锁_redis实现
分布式锁_redis实现
|
9天前
|
存储 NoSQL Java
基于Redis实现分布式锁
基于Redis实现分布式锁
33 0
|
10天前
|
监控 NoSQL Redis
Redis的哨兵模式详解
Redis的哨兵模式详解
27 0
|
10天前
|
NoSQL Java Redis
Redis入门到通关之分布式锁Rediision
Redis入门到通关之分布式锁Rediision
12 0
|
27天前
|
NoSQL Java Redis
redis分布式锁
redis分布式锁
|
2月前
|
缓存 NoSQL Java
分布式项目中锁的应用(本地锁-_redis【setnx】-_redisson-_springcache)-fen-bu-shi-xiang-mu-zhong-suo-de-ying-yong--ben-de-suo--redissetnx-springcache-redisson(一)
分布式项目中锁的应用(本地锁-_redis【setnx】-_redisson-_springcache)-fen-bu-shi-xiang-mu-zhong-suo-de-ying-yong--ben-de-suo--redissetnx-springcache-redisson
59 0