利用Redis实现集群或开发环境下SnowFlake自动配置机器号

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介:

利用Redis实现集群或开发环境下SnowFlake自动配置机器号
前言:
SnowFlake 雪花ID 算法是推特公司推出的著名分布式ID生成算法。利用预先分配好的机器ID,工作区ID,机器时间可以生成全局唯一的随时间趋势递增的Long类型ID.长度在17-19位。随着时间的增长而递增,在MySQL数据库中,InnoDB存储引擎可以更快的插入递增的主键。而不像UUID那样因为写入是乱序的,InnoDB不得不频繁的做页分裂操作,耗时且容易产生碎片。

对于SnowFlake 的原理介绍,可以参考该文章:理解分布式id生成算法SnowFlake

理解了雪花的基本原理之后,我们试想:在分布式集群或者开发环境下,不同服务之间/相同服务的不同机器之间应该如何产生差异呢?有以下几种方案:

通过在 yml 文件中配置不同的参数,启动 spring 容器时通过读取该参数来实现不同服务与不同机器的workerId不同。但是这里不方便新增机器/新同事的自动化配置
向第三方应用如zookeeper、Redis中注册ID,以获得唯一的ID。
对于开发环境,可以取机器的IP后三位。因为大家在一个办公室的话IP后三位肯定是0-255之前不重复。但是这样机器ID需要8个Bit,留给数据中心的位数就只有4个了。
本方案结合了以上方案的优点,按照业务的实际情况对雪花中的数据中心和机器ID所占的位数进行调整:数据中心占4Bit,范围从0-15。机器ID占6Bit,范围从0-63
。对不同的服务在yml中配置服务名称,以服务编号作为数据中心ID。如果按照开发+测试+生产环境区分的话,可以部署5个不同的服务。application.yml 中配置如下的参数

分布式雪花ID不同机器ID自动化配置

snowFlake:
dataCenter: 1 # 数据中心的id
appName: test # 业务类型名称
而机器ID采用以下的策略实现:

获取当前机器的IP地址 localIp,模32,获得0-31的整数 machineId
向Redis中注册,使用 appName + dataCenter + machineId 作为key ,以本机IP localIp 作为 value。
注册成功后,设置键过期时间 24 h,并开启一个计时器,在 23h 后更新注册的 key
如果注册失败,可能有以下两个原因:
上次服务异常中断,没有来得及删除key。这里的解决方案是通过key获取value,如果value和localIp一致,则仍然视为注册成功
IP和别人的IP模32的结果一样,导致机器ID冲突。这是就遍历 0-31 获取其中为注册的数字作为本机的机器号
如果不幸Redis连接失败,系统将从32-63之间随机获取ID,并使用 log.error() 打印醒目的提示消息这里建议IDEA + Grep Console 实现不同级别的日志不同前景色显示,方便及时获取错误信息
当服务停止前,向Redis发送请求,删除该Key的占用。
具体的代码如下:
自动配置机器ID,并在容器启动时放入SnowFlake实例对象
package cn.keats.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

@Configuration
@Slf4j
public class MachineIdConfig {

@Resource
private JedisPool jedisPool;

@Value("${snowFlake.dataCenter}")
private Integer dataCenterId;

@Value("${snowFlake.appName}")
private String APP_NAME;

/**
 * 机器id
 */
public static Integer machineId;
/**
 * 本地ip地址
 */
private static String localIp;

/**
 * 获取ip地址
 *
 * @return
 * @throws UnknownHostException
 */
private String getIPAddress() throws UnknownHostException {
    InetAddress address = InetAddress.getLocalHost();
    return address.getHostAddress();
}

/**
 * hash机器IP初始化一个机器ID
 */
@Bean
public SnowFlake initMachineId() throws Exception {
    localIp = getIPAddress(); // 192.168.0.233

    Long ip_ = Long.parseLong(localIp.replaceAll("\\.", ""));// 1921680233
    //
    machineId = ip_.hashCode() % 32;// 0-31
    // 创建一个机器ID
    createMachineId();

    log.info("初始化 machine_id :{}", machineId);
    return new SnowFlake(machineId, dataCenterId);
}

/**
 * 容器销毁前清除注册记录
 */
@PreDestroy
public void destroyMachineId() {
    try (Jedis jedis = jedisPool.getResource()) {
        jedis.del(APP_NAME + dataCenterId + machineId);
    }
}
/**
 * 主方法:首先获取机器 IP 并 % 32 得到 0-31
 * 使用 业务名 + 组名 + IP 作为 Redis 的 key,机器IP作为 value,存储到Redis中
 *
 * @return
 */
public Integer createMachineId() {
    try {
        // 向redis注册,并设置超时时间
        log.info("注册一个机器ID到Redis " + machineId + " IP:" + localIp);
        Boolean flag = registerMachine(machineId, localIp);
        // 注册成功
        if (flag) {
            // 启动一个线程更新超时时间
            updateExpTimeThread();
            // 返回机器Id
            log.info("Redis中端口没有冲突 " + machineId + " IP:" + localIp);
            return machineId;
        }
        // 注册失败,可能原因 Hash%32 的结果冲突
        if (!checkIfCanRegister()) {
            // 如果 0-31 已经用完,使用 32-64之间随机的ID
            getRandomMachineId();
            createMachineId();
        } else {
            // 如果存在剩余的ID
            log.warn("Redis中端口冲突了,使用 0-31 之间未占用的Id " + machineId + " IP:" + localIp);
            createMachineId();
        }
    } catch (Exception e) {
        // 获取 32 - 63 之间的随机Id
        // 返回机器Id
        log.error("Redis连接异常,不能正确注册雪花机器号 " + machineId + " IP:" + localIp, e);
        log.warn("使用临时方案,获取 32 - 63 之间的随机数作为机器号,请及时检查Redis连接");
        getRandomMachineId();
        return machineId;
    }
    return machineId;
}

/**
 * 检查是否被注册满了
 *
 * @return
 */
private Boolean checkIfCanRegister() {
    // 判断0~31这个区间段的机器IP是否被占满
    try (Jedis jedis = jedisPool.getResource()) {
        Boolean flag = true;
        for (int i = 0; i < 32; i++) {
            flag = jedis.exists(APP_NAME + dataCenterId + i);
            // 如果不存在。设置机器Id为这个不存在的数字
            if (!flag) {
                machineId = i;
                break;
            }
        }
        return !flag;
    }
}

/**
 * 1.更新超時時間
 * 注意,更新前检查是否存在机器ip占用情况
 */
private void updateExpTimeThread() {
    // 开启一个线程执行定时任务:
    // 每23小时更新一次超时时间
    new Timer(localIp).schedule(new TimerTask() {
        @Override
        public void run() {
            // 检查缓存中的ip与本机ip是否一致, 一致则更新时间,不一致则重新获取一个机器id
            Boolean b = checkIsLocalIp(String.valueOf(machineId));
            if (b) {
                log.info("IP一致,更新超时时间 ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                try (Jedis jedis = jedisPool.getResource()) {
                    jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24 );
                }
            } else {
                // IP冲突
                log.info("重新生成机器ID ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                // 重新生成机器ID,并且更改雪花中的机器ID
                getRandomMachineId();
                // 重新生成并注册机器id
                createMachineId();
                // 更改雪花中的机器ID
                SnowFlake.setWorkerId(machineId);
                // 结束当前任务
                log.info("Timer->thread->name:{}", Thread.currentThread().getName());
                this.cancel();
            }
        }
    }, 10 * 1000, 1000 * 60 * 60 * 23);
}

/**
 * 获取32-63随机数
 */
public void getRandomMachineId() {
    machineId = (int) (Math.random() * 31) + 31;
}
/**
 * 检查Redis中对应Key的Value是否是本机IP
 *
 * @param mechineId
 * @return
 */
private Boolean checkIsLocalIp(String mechineId) {
    try (Jedis jedis = jedisPool.getResource()) {
        String ip = jedis.get(APP_NAME + dataCenterId + mechineId);
        log.info("checkIsLocalIp->ip:{}", ip);
        return localIp.equals(ip);
    }
}

/**
 * 1.注册机器
 * 2.设置超时时间
 *
 * @param machineId 取值为0~31
 * @return
 */
private Boolean registerMachine(Integer machineId, String localIp) throws Exception {
    // try with resources 写法,出异常会释放括号内的资源 Java7特性
    try (Jedis jedis = jedisPool.getResource()) {
        // key 业务号 + 数据中心ID + 机器ID value 机器IP
        Long result = jedis.setnx(APP_NAME + dataCenterId + machineId, localIp);
        if(result == 1){
            // 过期时间 1 天
            jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
            return true;
        } else {
            // 如果Key存在,判断Value和当前IP是否一致,一致则返回True
            String value = jedis.get(APP_NAME + dataCenterId + machineId);
            if(localIp.equals(value)){
                // IP一致,注册机器ID成功
                jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
                return true;
            }
            return false;
        }
    }
}

}
雪花ID:
import org.springframework.context.annotation.Configuration;

/**

  • 功能:分布式ID生成工具类
    *

*/
@Configuration
public class SnowFlake {

/**
 * 开始时间截 (2019-09-08) 服务一旦运行过之后不能修改。会导致ID生成重复
 */
private final long twepoch = 1567872000000L;

/**
 * 机器Id所占的位数 0 - 64
 */
private final long workerIdBits = 6L;

/**
 * 工作组Id所占的位数 0 - 16
 */
private final long dataCenterIdBits = 4L;

/**
 * 支持的最大机器id,结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
 */
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

/**
 * 支持的最大数据标识id,结果是15
 */
private final long maxDatacenterId = -1L ^ (-1L << dataCenterIdBits);

/**
 * 序列在id中占的位数
 */
private final long sequenceBits = 12L;

/**
 * 机器ID向左移12位
 */
private final long workerIdShift = sequenceBits;

/**
 * 数据标识id向左移17位(12+5)
 */
private final long datacenterIdShift = sequenceBits + workerIdBits;

/**
 * 时间截向左移22位(5+5+12)
 */
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;

/**
 * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
 */
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

/**
 * 工作机器ID(0~63)
 */
private static long workerId;

/**
 * 数据中心ID(0~16)
 */
private long datacenterId;

/**
 * 毫秒内序列(0~4095)
 */
private long sequence = 0L;

/**
 * 上次生成ID的时间截
 */
private long lastTimestamp = -1L;

//==============================Constructors=====================================

/**
 * 构造函数
 *
 * @param workerId     工作ID (0~63)
 * @param datacenterId 数据中心ID (0~15)
 */
public SnowFlake(long workerId, long datacenterId) {
    if (workerId > maxWorkerId || workerId < 0) {
        throw new IllegalArgumentException(String.format("机器ID必须小于 %d 且大于 0", maxWorkerId));
    }
    if (datacenterId > maxDatacenterId || datacenterId < 0) {
        throw new IllegalArgumentException(String.format("工作组ID必须小于 %d 且大于 0", maxDatacenterId));
    }
    this.workerId = workerId;
    this.datacenterId = datacenterId;
}

/**
 * 构造函数
 *
 */
public SnowFlake() {
    this.workerId = 0;
    this.datacenterId = 0;
}

/**
 * 获得下一个ID (该方法是线程安全的)
 *
 * @return SnowFlakeId
 */
public synchronized long nextId() {
    long timestamp = timeGen();

    //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
    if (timestamp < lastTimestamp) {
        throw new RuntimeException(
                String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
    }

    // 如果是同一时间生成的,则进行毫秒内序列
    if (lastTimestamp == timestamp) {
        sequence = (sequence + 1) & sequenceMask;
        // 毫秒内序列溢出
        if (sequence == 0) {
            // 阻塞到下一个毫秒,获得新的时间戳
            timestamp = tilNextMillis(lastTimestamp);
        }
    }
    //时间戳改变,毫秒内序列重置
    else {
        sequence = 0L;
    }

    // 上次生成ID的时间截
    lastTimestamp = timestamp;

    // 移位并通过或运算拼到一起组成64位的ID
    return ((timestamp - twepoch) << timestampLeftShift) //
            | (datacenterId << datacenterIdShift) //
            | (workerId << workerIdShift) //
            | sequence;
}

/**
 * 阻塞到下一个毫秒,直到获得新的时间戳
 *
 * @param lastTimestamp 上次生成ID的时间截
 * @return 当前时间戳
 */
protected long tilNextMillis(long lastTimestamp) {
    long timestamp = timeGen();
    while (timestamp <= lastTimestamp) {
        timestamp = timeGen();
    }
    return timestamp;
}

/**
 * 返回以毫秒为单位的当前时间
 *
 * @return 当前时间(毫秒)
 */
protected long timeGen() {
    return System.currentTimeMillis();
}

public long getWorkerId() {
    return workerId;
}

public static void setWorkerId(long workerId) {
    SnowFlake.workerId = workerId;
}

public long getDatacenterId() {
    return datacenterId;
}

public void setDatacenterId(long datacenterId) {
    this.datacenterId = datacenterId;
}

}
Redis 配置
public class RedisConfig {

@Value("${spring.redis.host}")
private String host;

@Value("${spring.redis.port:6379}")
private Integer port;

@Value("${spring.redis.password:-1}")
private String password;

@Bean
public JedisPool jedisPool() {
    // 1.设置连接池的配置对象
    JedisPoolConfig config = new JedisPoolConfig();
    // 设置池中最大连接数
    config.setMaxTotal(50);
    // 设置空闲时池中保有的最大连接数
    config.setMaxIdle(10);
    config.setMaxWaitMillis(3000L);
    config.setTestOnBorrow(true);
    log.info(password);
    // 2.设置连接池对象
    if("-1".equals(password)){
        log.info("Redis不通过密码连接");
        return new JedisPool(config, host, port,0);
    } else {
        log.info("Redis通过密码连接" + password);
        return new JedisPool(config, host, port,0, password);
    }
}

}
使用方法
项目中引入 Redis 、 Jedis 依赖
复制上面两个类到项目until包下
application.yml 配置服务名称,机器序号,Redis账号,密码
配置Jedis,使得项目启动时池中有Redis连接对象
启动项目
在需要生成ID的类中注入

@Autowired
private SnowFlake snowFlake;
// 生产ID
snowFlake.nextId(); 方法生产ID

原文地址https://www.cnblogs.com/keatsCoder/p/12129279.html

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
13天前
|
NoSQL Linux Redis
06- 你们使用Redis是单点还是集群 ? 哪种集群 ?
**Redis配置:** 使用哨兵集群,结构为1主2从,加上3个哨兵节点,总计分布在3台Linux服务器上,提供高可用性。
33 0
|
22天前
|
负载均衡 监控 NoSQL
Redis的集群方案有哪些?
Redis集群包括主从复制(基础,手动故障恢复)、哨兵模式(自动高可用)和Redis Cluster(官方分布式解决方案,自动分片和容错)。此外,还有如Codis、Redisson和Twemproxy等第三方工具用于代理和负载均衡。选择方案需考虑应用场景、数据规模和并发需求。
31 2
|
1月前
|
NoSQL Java Redis
Springboot从2.x升级到3.x以后redis默认配置调整
Springboot从2.x升级到3.x以后redis默认配置调整
46 0
|
2月前
|
NoSQL Redis
若依管理系统去掉Redis相关配置
若依管理系统去掉Redis相关配置
|
2月前
|
NoSQL Redis 数据安全/隐私保护
Docker中Redis的安装与配置
本文主要讲解如何在Docker环境中搭建Redis环境,并进行相关配置
245 5
Docker中Redis的安装与配置
|
27天前
|
NoSQL Redis
Redis集群(六):集群常用命令及说明
Redis集群(六):集群常用命令及说明
27 0
|
2月前
|
运维 NoSQL 算法
Redis-Cluster 与 Redis 集群的技术大比拼
Redis-Cluster 与 Redis 集群的技术大比拼
46 0
|
1月前
|
NoSQL Linux Redis
Linux系统中安装redis+redis后台启动+常见相关配置
Linux系统中安装redis+redis后台启动+常见相关配置
|
2月前
|
NoSQL Redis Docker
在docker中安装redis,并且阿里云服务器配置
在docker中安装redis,并且阿里云服务器配置
183 1
|
21天前
|
NoSQL Java 测试技术
面试官:如何搭建Redis集群?
**Redis Cluster** 是从 Redis 3.0 开始引入的集群解决方案,它分散数据以减少对单个主节点的依赖,提升读写性能。16384 个槽位分配给节点,客户端通过槽位信息直接路由请求。集群是无代理、去中心化的,多数命令直接由节点处理,保持高性能。通过 `create-cluster` 工具快速搭建集群,但适用于测试环境。在生产环境,需手动配置文件,启动节点,然后使用 `redis-cli --cluster create` 分配槽位和从节点。集群动态添加删除节点、数据重新分片及故障转移涉及复杂操作,包括主从切换和槽位迁移。
31 0
面试官:如何搭建Redis集群?