Redis - 探索更多可能

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: ## 概述 对 Redis 的印象可能很多人都还只停留在 2.8 的阶段,一个结构化的内存存储(嗯,好像也没什么问题)。虽然距离 4.0 发布(2017.7.14)已经一年过去了,但相信很多人已经不再去关心 Redis 的新特性了,因为从 2.8 后的 Redis 已经足够好用了。? Redis 3.0 添加了集群的能力,4.0 添加了模块化能力,5.0添加了流类型。如果说 3.0

概述

对 Redis 的印象可能很多人都还只停留在 2.8 的阶段,一个结构化的内存存储(嗯,好像也没什么问题)。虽然距离 4.0 发布(2017.7.14)已经一年过去了,但相信很多人已经不再去关心 Redis 的新特性了,因为从 2.8 后的 Redis 已经足够好用了。?

Redis 3.0 添加了集群的能力,4.0 添加了模块化能力,5.0添加了流类型。如果说 3.0 和 4.0 添加的新特性对于一般用户来说无足轻重,那5.0新的流类型就不可忽视啦!

在没有 Stream 类型之前,其实 Redis 也支持各种类似于流的处理模式,例如 Fire and forget 模式的 Pub/Sub,阻塞队列 BLPOP,时间序列 zsort 存储,等各种方式都能模拟类似的场景,但却都觉得有点欠缺,终于,流类型成功的解决了以上所有问题,并能支持其他的常见使用场景。

说到流消息就不得不说到 Kafka 啦,我相信大家应该都听说过消息中间件 Kafka,至于 RocketMQ 或者 MetaQ 就不再赘述他们与 Kafka 的关系啦,Redis 作者在实现流类型时大量参考了 Kafka 中的概念,例如消费模型,流消息的概念。当然所有的参考只局限于 Kafka 的文档,与 Kafka 的代码实现没有任何关系哦。

心动不如心动,那先一睹为快吧。

环境准备

如果你是 macOs 用户,并且安装了 brew(如果没有安装,那建议先安装 /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"),那么只需要

brew install redis

即可,如果你是非 macOs 用户,那要嘛考虑换 mac,要嘛使用 docker 启动

docker run --rm -it -p 6379:6379 -v $PWD:/data --name redis redis:alpine
# 题外话: 使用 alpine 更小更省心,强烈推荐,有任何使用问题都可以交流哦,至于有多好,在这里怕是说不完。

# 验证安装的版本
docker exec redis redis-cli info server
# 客户端链接
docker exec -it redis redis-cli

一切准备就绪,就开始实践吧。既然流是新的数据类型,那我们就先从支持的操作开始吧。

命令列表

Stream 类型一共支持 13 个命令,这里简单列举一下支持的命令。

命令 功能概述
xinfo 获取消费者,分组和流信息
xadd 添加消息到流
xtrim 将流重置为指定大小
xdel 通过 ID 删除
xrange 返回范围内的消息,特殊起始 + -
xrevrange 与 xrange 相同,但返回顺序相反
xlen 获取流长度
xread 从流中指定 id 开始读取指定量消息,可选择阻塞返回
xgroup 管理消费组
xreadgroup 以订阅组成员的身份读取流消息 - 即订阅/消费消息
xack 响应消息被正确处理
xpending 查询正在处理中的消息 - 尚未 ACK 的消息
xclaim 获取正在处理中的消息

在开始之前,简单概述一下使用过程中需要注意的点

  1. 流消息内容是字典 - 即 KV 结构
  2. 每个消息有一个 ID - 128bit - 由时间戳和序列号组成
  3. 插入消息时使用 * 作为 ID 则是由服务端生成 ID
  4. ID 必须递增
  5. -/+ 分别表示最小和最大消息 ID
  6. $ 表示最新的消息位置,在创建消费组时使用
  7. > 表示最新消费的消息位置,在消费消息时使用

命令交互

# 往流中添加消息 - 会返回消息 ID
# 手动指定 ID
xadd s 1-0 name wener age 18
xadd s 1-1 name wen age 17
# 由服务端生成消息 ID
xadd s * name xx age 16
# 返回所有消息
xrange s - +
# 返回第一条
xrange s - + count 1
# 返回最后一条
xrevrange s + - count 1
# 返回消息长度
xlen s

# 读第一条消息
xread count 1 streams s1 0-0
# 读取第二条 - 指定的消息 ID 是 1-0 ,会返回这个 ID 之后的消息
xread count 1 streams s1 1-0

# 模拟消息的 Roling 处理
# ------
del s
# 在插入消息时,可限制消息的最大长度,类似于 rolling 日志文件的逻辑
# 逻辑等同于先 add 再 trim
# 插入时限制最大长度 2
xadd s MAXLEN 2 * ts 1
xadd s MAXLEN 2 * ts 2
xadd s MAXLEN 2 * ts 3
xadd s MAXLEN 2 * ts 4
# 流中只会有 3 4 这两条消息
xrange s - + 

# 消费组
# ======
# 重置流内容
del s
# 创建消费组 g1 并将消费位置置为最新消息位置 $
# 因为 s 不存在,指定 MKSTREAM 会自动创建一个空的流 s
xgroup create s g1 $ MKSTREAM
# 添加新的消息
xadd s 1-1 name zz age 16
# 会返回最新插入的消息,当前消费者为 c1
xreadgroup group g1 c1 count 1 streams s >
# 当消息处理完成后对服务端进行响应
xack s g1 1-1

# 模拟消息处理失败场景
# ------
# 添加新的消息
xadd s 1-2 name aa age 16
# 由 c2 消费
xreadgroup group g1 c2 count 1 streams s >
# 但在处理过程中异常,未 ACK,此时通过 pending 查看 c2 堆积的消息
xpending s g1 - + 1 c2
# c1 有能力处理,因此可以将 c2 处理失败的消息拿过来处理
# retrycount 由应用自己维护,记录重试次数
# 500 为表示该消息的处理时间超过 500ms 才能“拿”过来
xclaim s g1 c1 500 1-2 retrycount 2
# c1 成功处理该消息
xack s g1 1-2

Stream 的操作相当简介,能实现什么样的功能主要取决于业务的设计。使用 cli 完成了基本的操作再来看看 Java 的操作吧。

Java 交互

lettuce 是一个基于 Netty 的异步 Redis 客户端,在最新版中支持了 Stream 的操作。

生产和消费

public void stream() throws InterruptedException {
    RedisClient client = RedisClient.create("redis://localhost");
    StatefulRedisConnection<String, String> connection = client.connect();
    // 流的名字
    String streamName = "s";
    // 消费组名
    String groupName = "g1";

    AtomicInteger counter = new AtomicInteger();
    // 总消息量
    long total = 1000000;
    // 并发生产
    int producerCount = 2;
    // 并发消费
    int consumerCount = 4;
    for (int i = 0; i < producerCount; i++) {
        int id = i;
        CompletableFuture.runAsync(() -> {
            String name = "producer." + id;
            StatefulRedisConnection<String, String> connect = client.connect();
            while (true) {
                int n = counter.incrementAndGet();
                if (n > total) {
                    return;
                }
                // 同步生产
                Timer.Context context = metrics.timer(name).time();
                connect
                        .sync()
                        .xadd(streamName, "ts", String.valueOf(System.currentTimeMillis()), "i", String.valueOf(n))
                ;
                context.close();
            }
        });
    }

    for (int i = 0; i < consumerCount; i++) {
        StatefulRedisConnection<String, String> connect = client.connect();

        // 消费的上下文
        ConsumerContext c = new ConsumerContext();
        c
                .setConnection(connect)
                .setConsumer(Consumer.from(groupName, "c" + i))
                .setStreamName(streamName)
                .setGroupName(groupName)
                .setName("consumer." + i)
                .setXReadArgs(XReadArgs.Builder.block(Duration.ofSeconds(5)))
                .setXreadLastOffset(XReadArgs.StreamOffset.lastConsumed(streamName))
        ;
        // 异步消费
        consume(c);
    }

    Thread.sleep(Duration.ofMinutes(10).toMillis());
}


private CompletionStage<?> consume(ConsumerContext c) {
    Timer.Context context = metrics.timer(c.name).time();
    return c.connection
            .async()
            .xreadgroup(c.consumer, c.xReadArgs, c.xreadLastOffset)
            // 消息处理
            .thenCompose(v -> {
                context.close();
                if (v.isEmpty()) {
                    metrics.meter(c.name + ".empty").mark();
                    return CompletableFuture.completedFuture(null);
                }
                StreamMessage<String, String> message = v.get(0);

                // 输出一定日志量
                if (ThreadLocalRandom.current().nextDouble() < 0.01) {
                    log.info("[{}] {}", c.name, message.getBody());
                }

                // 成功处理
                return c.connection.async().xack(c.streamName, c.groupName, message.getId());
            })
            // 异常处理
            .exceptionally(e -> {
                metrics.meter(c.name + ".error").mark();
                return null;
            })
            // 循环 - 没有推出逻辑
            .thenCompose((v) -> consume(c));
}


@Data
@Accessors(chain = true)
public static class ConsumerContext {
    String name;
    String streamName;
    String groupName;
    Consumer<String> consumer;

    StatefulRedisConnection<String, String> connection;

    XReadArgs.StreamOffset<String> xreadLastOffset;

    XReadArgs xReadArgs;
}

处理未成功的消息

同步操作,逻辑相对清晰

public void testClaimPendingSingleThreadSync() {
    RedisClient client = RedisClient.create("redis://localhost");
    StatefulRedisConnection<String, String> connection = client.connect();
    String streamName = "s";
    String groupName = "g1";

    RedisCommands<String, String> sync = connection.sync();
    Consumer<String> consumer = Consumer.from(groupName, "c1");
    Range<String> fullRange = Range.create("-", "+");

    while (true) {
        try (Timer.Context ignored = metrics.timer(consumer.getName() + ".pending").time()) {
            PendingResult result = PendingResult.of(sync.xpending(streamName, consumer, fullRange, Limit.from(1)));

            if (!result.hasPending()) {
                break;
            }

            List<StreamMessage<String, String>> list = sync.xclaim(
                    streamName,
                    consumer,
                    new XClaimArgs().minIdleTime(500).retryCount(result.getDeliverCount() + 1),
                    result.getMessageId()
            );
            if (list.isEmpty()) {
                continue;
            }
            StreamMessage<String, String> message = list.get(0);
            if (ThreadLocalRandom.current().nextDouble() < 0.001) {
                log.info("[{}] {}", consumer.getName(), message.getBody());
            }
            sync.xack(streamName, groupName, message.getId());
        }
    }
}

某次的性能统计

Java CPU 70%
c1.pending
             count = 769903
         mean rate = 3360.66 calls/second
     1-minute rate = 3267.08 calls/second
     5-minute rate = 2855.18 calls/second
    15-minute rate = 2538.15 calls/second
               min = 0.22 milliseconds
               max = 1.80 milliseconds
              mean = 0.30 milliseconds
            stddev = 0.13 milliseconds
            median = 0.26 milliseconds
              75% <= 0.32 milliseconds
              95% <= 0.49 milliseconds
              98% <= 0.65 milliseconds
              99% <= 0.88 milliseconds
            99.9% <= 1.66 milliseconds

处理未成功的消息

异步操作,逻辑相对不那么清晰~

public void testClaimPendingSingleThreadAsync() throws ExecutionException, InterruptedException {
    RedisClient client = RedisClient.create("redis://localhost");
    StatefulRedisConnection<String, String> connection = client.connect();
    String streamName = "s";
    String groupName = "g1";

    RedisAsyncCommands<String, String> async = connection.async();
    Consumer<String> consumer = Consumer.from(groupName, "c2");
    Range<String> fullRange = Range.create("-", "+");

    AtomicReference<Supplier<CompletionStage<?>>> process = new AtomicReference<>();
    AtomicReference<Timer.Context> context = new AtomicReference<>();
    // 一次处理
    process.set(() -> {
        context.set(metrics.timer(consumer.getName() + ".process").time());
        return async
                .xpending(streamName, consumer, fullRange, Limit.from(1))
                .thenCompose(v -> {
                    PendingResult result = PendingResult.of(v);
                    if (!result.hasPending()) {
                        throw new RuntimeException("DONE");
                    }

                    return async.xclaim(
                            streamName,
                            consumer,
                            new XClaimArgs().minIdleTime(500).retryCount(result.getDeliverCount() + 1),
                            result.getMessageId()
                    );
                })
                .thenCompose(list -> {
                    StreamMessage<String, String> message = list.get(0);
                    if (ThreadLocalRandom.current().nextDouble() < 0.001) {
                        log.info("[{}] {}", consumer.getName(), message.getBody());
                    }
                    return async.xack(streamName, groupName, message.getId());
                })
                .thenCompose(v -> {
                    context.get().close();
                    return process.get().get();
                });
    });
    // 循环
    process.get()
            .get()
            .whenComplete((v, e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
                log.info("Complete");
            })
            .toCompletableFuture()
            .get();
}

但性能会比同步操作的性能要好呢,Java 的 CPU 也比同步的更低

Java CPU 50%
redis-server CPU 50%
c2.process
            count = 879207
        mean rate = 5145.76 calls/second
    1-minute rate = 5128.23 calls/second
    5-minute rate = 3779.55 calls/second
15-minute rate = 3132.92 calls/second
            min = 0.14 milliseconds
            max = 0.81 milliseconds
            mean = 0.18 milliseconds
        stddev = 0.06 milliseconds
        median = 0.16 milliseconds
            75% <= 0.19 milliseconds
            95% <= 0.29 milliseconds
            98% <= 0.34 milliseconds
            99% <= 0.40 milliseconds
        99.9% <= 0.81 milliseconds

总结

当什么时候选择 Redis 的流呢?

  1. 内存存储满足需求
  2. 速度要求高
  3. 能接收 Redis 的持久化保障 - (保障是不一定持久 ?)

合理的应用也是需要合理的场景。

流总的来说还是很不错的,还有很多可能使用的场景在这里不做一一赘述,流的内部实现也是非常的有意思的,等有时间再做另外的一个分享。此外 Redis 4 的 Module 也是非常有魅力,例如甚至可以用 Golang 来实现模块添加新的命令功能,嗯嗯,机会多多。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
15天前
|
NoSQL Linux Redis
06- 你们使用Redis是单点还是集群 ? 哪种集群 ?
**Redis配置:** 使用哨兵集群,结构为1主2从,加上3个哨兵节点,总计分布在3台Linux服务器上,提供高可用性。
227 0
|
24天前
|
负载均衡 监控 NoSQL
Redis的集群方案有哪些?
Redis集群包括主从复制(基础,手动故障恢复)、哨兵模式(自动高可用)和Redis Cluster(官方分布式解决方案,自动分片和容错)。此外,还有如Codis、Redisson和Twemproxy等第三方工具用于代理和负载均衡。选择方案需考虑应用场景、数据规模和并发需求。
189 2
|
30天前
|
NoSQL Redis
Redis集群(六):集群常用命令及说明
Redis集群(六):集群常用命令及说明
186 0
|
2月前
|
运维 NoSQL 算法
Redis-Cluster 与 Redis 集群的技术大比拼
Redis-Cluster 与 Redis 集群的技术大比拼
82 0
|
1天前
|
存储 NoSQL 算法
Redis 搭建分片集群
Redis 搭建分片集群
|
24天前
|
NoSQL Java 测试技术
面试官:如何搭建Redis集群?
**Redis Cluster** 是从 Redis 3.0 开始引入的集群解决方案,它分散数据以减少对单个主节点的依赖,提升读写性能。16384 个槽位分配给节点,客户端通过槽位信息直接路由请求。集群是无代理、去中心化的,多数命令直接由节点处理,保持高性能。通过 `create-cluster` 工具快速搭建集群,但适用于测试环境。在生产环境,需手动配置文件,启动节点,然后使用 `redis-cli --cluster create` 分配槽位和从节点。集群动态添加删除节点、数据重新分片及故障转移涉及复杂操作,包括主从切换和槽位迁移。
32 0
面试官:如何搭建Redis集群?
|
28天前
|
存储 缓存 NoSQL
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(集群功能分析)(一)
【Redis深度专题】「核心技术提升」探究Redis服务启动的过程机制的技术原理和流程分析的指南(集群功能分析)
315 0
|
1月前
|
NoSQL Redis Docker
使用Docker搭建一个“一主两从”的 Redis 集群(超详细步骤)
使用Docker搭建一个“一主两从”的 Redis 集群(超详细步骤)
68 0
|
1月前
|
存储 监控 NoSQL
Redis 架构深入:主从复制、哨兵到集群
大家好,我是小康,今天我们来聊下 Redis 的几种架构模式,包括主从复制、哨兵和集群模式。
Redis 架构深入:主从复制、哨兵到集群
|
1月前
|
运维 负载均衡 NoSQL
【大厂面试官】知道Redis集群和Redis主从有什么区别吗
集群节点之间的故障检测和Redis主从中的哨兵检测很类似,都是通过PING消息来检测的。。。面试官抓抓脑袋,继续看你的简历…得想想考点你不懂的😰。
67 1

热门文章

最新文章