本文最后更新于 321 天前,其中的信息可能已经有所发展或是发生改变。
首先,分析一下主从+哨兵模式带来的问题:
- 在主从 + 哨兵的模式下,仍然只有一个 Master 节点,当并发请求较大时,哨兵模式不能缓解写压力
- 在 Sentinel 模式下,每个节点需要保存全量数据,无法进行海量数据存储
因此,在 Redis 3.0 之后,提供了 Cluster 的解决方案,核心原理是对数据做分片:
- 采用无中心结构
- 每个 master 可以有多个 slave 节点
- 整个集群分片共有 16384 个哈希槽
- 每个 key 通过 CRC16 校验后对 16384 取模来决定放置哪个槽,集群的每个节点负责一部分hash 槽
- 当主节点不可用时,从节点会升级为主节点,原有主节点恢复后会降级为从节点
和 Sentinel 类似,Cluster 也存在服务监控和选举规则:
- 主观下线:和 Sentinel 一样采用心跳包检测,当一个节点不能从另一个节点接收到心跳信息,该节点会将它标记为“主观下线”状态
- 客观下线:当半数以上的主节点都将某个节点标记为“主观下线”,那么这个节点会被标记为“客观下线”
当某个节点被标记为客观下线后,会从该主节点的从节点中选举一个从节点作为新的主节点:
- 选举过程主要看从节点的复制偏移量(replica offset)和 runid
- 优先选择复制偏移量最大的节点,如果复制偏移量相同,则选择 runid 最小的节点
注意:
- 若某一主节点及其从节点都不可用,则会导致整个 Redis Cluster 集群不可用
顺序分布:
- 根据数据的某些属性进行排序,将数据均匀地分配到不同的存储节点
- 例如,将用户 ID 排序,分区间存入不同的节点
一致性哈希:
- 将整个哈希值空间组成一个虚拟的圆环,然后根据某种哈希算法将数据项映射到该圆环上
- 例如对于 Redis,对节点 id 进行 hash,将其值分布在圆环上
- 发生读写的 key 经过 hash 后,顺时针查圆环上的节点,若未找到,则默认为 0 位置后的第一个节点
如果采用一致性哈希算法,若某个节点挂了,受影响的数据仅仅是此节点到环空间前一个节点(沿着逆时针方向行走遇到的第一个节点)之间的数据,其它不受影响。增加一个节点也同理。
但是当删除节点时,数据再分配会把当前节点所有数据加到它的下一个节点上(缓存抖动)。这样会导致下一个节点使用率暴增,可能会导致挂掉,如果下一个节点挂掉,下下个节点将会承受更大的压力,最终导致集群雪崩。
Redis 并没有使用一致性哈希,而是采用哈希槽的方式进行分片
Redis 集群有16384个哈希槽,每个key通过CRC16校验后对16384取模来决定放置哪个槽:
理论上 CRC16 算法可以得到 216 个数值,其数值范围在 0-65535 之间,取模运算 key 的时候,应该是CRC16(key)%65535,但是却设计为CRC16(key)%16384,原因是作者在设计的时候做了空间上的权衡,觉得节点最多不可能超过1000个,同时为了保证节点之间通信效率,所以采用了 214。
具体分片方式如下:
- 把16384槽按照节点数量进行平均分配,由节点进行管理
- 对每个key按照 CRC16规则进行 hash 运算
- 把hash结果对 16383 进行取余
- 把余数发送给 Redis 节点
- 节点接收到数据,验证是否在自己管理的槽编号的范围
- 如果在自己的编号范围内,会把数据存储到数据槽中,返回执行结果
- 否则,会把数据发送给正确的节点,由正确的节点来处理
使用哈希槽的优势:
- 由于一致性哈希会造成缓存抖动和集群雪崩,因此要在原有基础上进行扩容和删减节点变得极为困难
- 使用哈希槽在新增节点时,只需要将其他节点的哈希槽分出一部分给新节点
- 删除节点时,则将该节点的哈希槽再分配给别的节点,之后再删除节点即可
注意:
- Redis Cluster 节点之间共享消息,每个节点会知道哪个节点负责哪个数据槽
- 添加节点后,需要手动给新节点分配哈希槽,从其他节点的哈希槽分来一部分,并且支持哈希槽均衡
本章节的 demo 代码示例,免搭建即开即用:learn-redis-demo
基于 Redis 实现分布式锁主要依赖于 SETNX
命令:
SETNX key value
:若不存在 key 则设置 key 值为 value,返回 1
- 若 key 已存在,则不做任何操作,返回 0
为了防止某个线程获取锁之后异常结束没有释放锁,导致其他线程调用 SETNX
命令返回 0 而进入死锁,因此加锁后需要设置超时时间
以下是一个简单的 SpringBoot demo:
| @RestController |
| @RequestMapping("/sell") |
| public class AppController { |
| @Resource |
| StringRedisTemplate stringRedisTemplate; |
| |
| String LOCK = "TICKETSELLER"; |
| String KEY = "TICKET"; |
| |
| @GetMapping("/ticket") |
| public void sellTicket() { |
| Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(LOCK, "1"); |
| if (Boolean.TRUE.equals(isLocked)) { |
| |
| stringRedisTemplate.expire(LOCK, 5, TimeUnit.SECONDS); |
| try { |
| |
| int ticketCount = Integer.parseInt((String) stringRedisTemplate.opsForValue().get(KEY)); |
| if (ticketCount > 0) { |
| |
| stringRedisTemplate.opsForValue().set(KEY, String.valueOf(ticketCount - 1)); |
| System.out.println("I get a ticket!"); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } finally { |
| |
| stringRedisTemplate.delete(LOCK); |
| } |
| } else { |
| System.out.println("Field"); |
| } |
| } |
| |
| } |
- 我们先是用
SETNX
创建了锁,假如这个服务在创建锁之后由于事故导致直接停机,那么这个锁就是一个永不过期的锁
- 这将导致其他服务无法获取到锁,影响业务的正常进行
解决方案:
- 使用 LUA 脚本来进行加锁和设置过期时间的操作
- 这样可以使得加锁和设置过期时间是一个原子操作
| @RestController |
| @RequestMapping("/sell") |
| public class AppController { |
| @Resource |
| StringRedisTemplate stringRedisTemplate; |
| |
| String LOCK = "TICKETSELLER"; |
| String KEY = "TICKET"; |
| |
| @GetMapping("/ticket") |
| public void sellTicket() { |
| |
| String LUA Script = |
| "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " + |
| "then redis.call('expire',KEYS[1],ARGV[2]) ;" + |
| "return true " + |
| "else return false " + |
| "end"; |
| |
| |
| Boolean isLocked = stringRedisTemplate.execute(new RedisCallback<Boolean>() { |
| @Override |
| public Boolean doInRedis(RedisConnection connection) throws DataAccessException { |
| return connection.eval(LUA Script.getBytes(), |
| ReturnType.BOOLEAN, |
| 1, |
| LOCK.getBytes(), |
| "1".getBytes(), |
| "5".getBytes()); |
| } |
| }); |
| if (Boolean.TRUE.equals(isLocked)) { |
| try { |
| int ticketCount = Integer.parseInt((String) stringRedisTemplate.opsForValue().get(KEY)); |
| if (ticketCount > 0) { |
| stringRedisTemplate.opsForValue().set(KEY, String.valueOf(ticketCount - 1)); |
| System.out.println("I get a ticket!"); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } finally { |
| stringRedisTemplate.delete(LOCK); |
| } |
| } else { |
| System.out.println("Field"); |
| } |
| } |
| |
| } |
假设现有服务 A 和服务 B,A 先拿到锁执行业务,但是由于业务过长导致 A 的锁到期后超时释放:
- 如果 B 的业务还没结束,A 的业务结束进行释放锁的操作,A 就会错误的删除掉 B 加的锁,那 B 的业务执行完就无锁可释了
- 如果 B 服务可以获取到锁了,B 加锁并执行他的业务,由于此时 A 也在执行业务,两个服务共享内存就容易造成超卖问题
针对第一种问题的出现,解决方案很简单,只需要对锁的值做出限制即可:
- 设置加锁 key 的值为唯一,如利用 uid + threadid
- 在释放锁时判断是否是自己的锁,如果是则释放
- 这个释放锁的操作也要保证原子性,因此也需要用 LUA 脚本来实现
| @RestController |
| @RequestMapping("/sell") |
| public class AppController { |
| @Resource |
| StringRedisTemplate stringRedisTemplate; |
| |
| String LOCK = "TICKETSELLER"; |
| String KEY = "TICKET"; |
| |
| @GetMapping("/ticket") |
| public void sellTicket() { |
| String lockLuaScript = |
| "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " + |
| "then redis.call('expire',KEYS[1],ARGV[2]) ;" + |
| "return true " + |
| "else return false " + |
| "end"; |
| |
| |
| String VALUE = String.valueOf(Thread.currentThread().getId()); |
| Boolean isLocked = stringRedisTemplate.execute(new RedisCallback<Boolean>() { |
| @Override |
| public Boolean doInRedis(RedisConnection connection) throws DataAccessException { |
| return connection.eval(lockLuaScript.getBytes(), |
| ReturnType.BOOLEAN, |
| 1, |
| LOCK.getBytes(), |
| VALUE.getBytes(), |
| "5".getBytes() |
| ); |
| } |
| }); |
| if (Boolean.TRUE.equals(isLocked)) { |
| try { |
| int ticketCount = Integer.parseInt((String) stringRedisTemplate.opsForValue().get(KEY)); |
| if (ticketCount > 0) { |
| stringRedisTemplate.opsForValue().set(KEY, String.valueOf(ticketCount - 1)); |
| System.out.println("I get a ticket!"); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } finally { |
| |
| |
| |
| |
| |
| String unlockLuaScript = |
| "if redis.call('get',KEYS[1]) == ARGV[1] " + |
| "then redis.call('del',KEYS[1]); " + |
| "return true " + |
| "else return false " + |
| "end"; |
| stringRedisTemplate.execute(new RedisCallback<Object>() { |
| @Override |
| public Object doInRedis(RedisConnection connection) throws DataAccessException { |
| return connection.eval(unlockLuaScript.getBytes(), |
| ReturnType.BOOLEAN, |
| 1, |
| LOCK.getBytes(), |
| VALUE.getBytes() |
| ); |
| } |
| }); |
| } |
| } else { |
| System.out.println("Field"); |
| } |
| } |
| |
| } |
针对第二种问题,我们可以利用看门狗机制实现:
- 开一个守护线程,每隔一段时间就获取一次锁的状态
- 如果仍然持有锁,则对其续期,此过程仍然利用 LUA 脚本实现
- 当业务结束后终止该线程
| @RestController |
| @RequestMapping("/sell") |
| public class AppController { |
| @Resource |
| StringRedisTemplate stringRedisTemplate; |
| |
| String LOCK = "TICKETSELLER"; |
| String KEY = "TICKET"; |
| |
| @GetMapping("/ticket") |
| public void sellTicket() { |
| String lockLuaScript = |
| "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " + |
| "then redis.call('expire',KEYS[1],ARGV[2]) ;" + |
| "return true " + |
| "else return false " + |
| "end"; |
| |
| String VALUE = String.valueOf(Thread.currentThread().getId()); |
| Boolean isLocked = stringRedisTemplate.execute(new RedisCallback<Boolean>() { |
| @Override |
| public Boolean doInRedis(RedisConnection connection) throws DataAccessException { |
| return connection.eval(lockLuaScript.getBytes(), |
| ReturnType.BOOLEAN, |
| 1, |
| LOCK.getBytes(), |
| VALUE.getBytes(), |
| "5".getBytes() |
| ); |
| } |
| }); |
| if (Boolean.TRUE.equals(isLocked)) { |
| |
| String addlockLuaScript = |
| "if redis.call('get',KEYS[1]) == ARGV[1] " + |
| "then redis.call('expire',KEYS[1], ARGV[2]) ; " + |
| "return true " + |
| "else return false " + |
| "end"; |
| Thread watchDoge = new Thread(() -> { |
| while (Boolean.TRUE.equals(stringRedisTemplate.execute(new RedisCallback<Boolean>() { |
| @Override |
| public Boolean doInRedis(RedisConnection connection) throws DataAccessException { |
| return connection.eval(addlockLuaScript.getBytes(), |
| ReturnType.BOOLEAN, |
| 1, |
| LOCK.getBytes(), |
| VALUE.getBytes(), |
| "5".getBytes()); |
| } |
| })) && !Thread.currentThread().isInterrupted()) { |
| try { |
| System.out.println(Thread.currentThread().isInterrupted()); |
| Thread.sleep(4000); |
| } catch (Exception e) { |
| break; |
| } |
| } |
| }); |
| watchDoge.setDaemon(true); |
| watchDoge.start(); |
| try { |
| int ticketCount = Integer.parseInt((String) stringRedisTemplate.opsForValue().get(KEY)); |
| if (ticketCount > 0) { |
| stringRedisTemplate.opsForValue().set(KEY, String.valueOf(ticketCount - 1)); |
| |
| watchDoge.interrupt(); |
| System.out.println("I get a ticket!"); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } finally { |
| String unlockLuaScript = |
| "if redis.call('get',KEYS[1]) == ARGV[1] " + |
| "then redis.call('del',KEYS[1]); " + |
| "return true " + |
| "else return false " + |
| "end"; |
| stringRedisTemplate.execute(new RedisCallback<Object>() { |
| @Override |
| public Object doInRedis(RedisConnection connection) throws DataAccessException { |
| return connection.eval(unlockLuaScript.getBytes(), |
| ReturnType.BOOLEAN, |
| 1, |
| LOCK.getBytes(), |
| VALUE.getBytes() |
| ); |
| } |
| }); |
| } |
| } else { |
| System.out.println("Field"); |
| } |
| } |
| |
| } |
对于上面的看门狗机制,其实是一个极其朴素的实现,实际上需要考虑到的东西还有很多。
另外上述的实现仍缺少一些高级应用场景的功能:
- 如何实现锁的可重入:增加重入次数的参数,实现锁的成对加锁和释放。
- 如何实现阻塞的锁:客户端轮询(性能开销大)或者发布订阅
而这些功能想要自己去实现是非常麻烦的,因此一般利用 Redisson 实现分布式锁。
Redisson 内置了一系列的分布式对象,分式集合,分布式锁,分布式服务等诸多功能特性,是一款基于 Redis 实现,拥有一系列分布式
系统功能特性的工具包,是实现分布式系统架构中缓存中间件的最佳选择。
引入依赖:
| <dependency> |
| <groupId>org.redisson</groupId> |
| <artifactId>redisson</artifactId> |
| <version>3.27.2</version> |
| </dependency> |
编写 Redisson 配置类:
| @Configuration |
| public class RedissonConfig { |
| |
| @Bean |
| public RedissonClient redissonClient() { |
| Config config = new Config(); |
| config.useSingleServer().setAddress("redis://127.0.0.1:6379"); |
| return Redisson.create(config); |
| } |
| } |
注入到 Controller:
| @RestController |
| @RequestMapping("/redisson") |
| public class RedissonAppController { |
| |
| String LOCK = "REIDSSON:TICKETSELLER"; |
| String KEY = "TICKET"; |
| |
| |
| @Resource |
| RedissonClient redissonClient; |
| |
| @Resource |
| StringRedisTemplate stringRedisTemplate; |
| |
| @GetMapping("/sell/ticket") |
| public void redissonSellTicket() { |
| |
| RLock rLock = redissonClient.getLock(LOCK); |
| rLock.lock(); |
| try { |
| int count = Integer.parseInt((String) stringRedisTemplate.opsForValue().get(KEY)); |
| if (count > 0) { |
| stringRedisTemplate.opsForValue().set(KEY, String.valueOf(count - 1)); |
| System.out.println("Reidsson get ticket"); |
| } else { |
| System.out.println("Field"); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } finally { |
| |
| rLock.unlock(); |
| } |
| } |
| |
| } |
首先来看 package org.redisson
包下的 lock
方法的具体实现:
| private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { |
| long threadId = Thread.currentThread().getId(); |
| |
| |
| |
| |
| |
| |
| Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId); |
| if (ttl != null) { |
| |
| CompletableFuture<RedissonLockEntry> future = this.subscribe(threadId); |
| this.pubSub.timeout(future); |
| RedissonLockEntry entry; |
| if (interruptibly) { |
| entry = (RedissonLockEntry)this.commandExecutor.getInterrupted(future); |
| } else { |
| entry = (RedissonLockEntry)this.commandExecutor.get(future); |
| } |
| |
| try { |
| while(true) { |
| |
| ttl = this.tryAcquire(-1L, leaseTime, unit, threadId); |
| if (ttl == null) { |
| return; |
| } |
| |
| if (ttl >= 0L) { |
| try { |
| entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException var14) { |
| if (interruptibly) { |
| throw var14; |
| } |
| |
| entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); |
| } |
| } else if (interruptibly) { |
| entry.getLatch().acquire(); |
| } else { |
| entry.getLatch().acquireUninterruptibly(); |
| } |
| } |
| } finally { |
| |
| this.unsubscribe(entry, threadId); |
| } |
| } |
| } |
接下来我们看 tryAcquire
尝试获取锁的方法 tryAcquireOnceAsync
:
| private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { |
| RFuture acquiredFuture; |
| if (leaseTime > 0L) { |
| acquiredFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); |
| } else { |
| |
| acquiredFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); |
| } |
| |
| CompletionStage<Boolean> acquiredFuture = this.handleNoSync(threadId, acquiredFuture); |
| CompletionStage<Boolean> f = acquiredFuture.thenApply((acquired) -> { |
| if (acquired) { |
| if (leaseTime > 0L) { |
| this.internalLockLeaseTime = unit.toMillis(leaseTime); |
| } else { |
| this.scheduleExpirationRenewal(threadId); |
| } |
| } |
| |
| return acquired; |
| }); |
| return new CompletableFutureWrapper(f); |
| } |
可以看到 tryLockInnerAsync
传参中多了一个参数 this.internalLockLeaseTime
,这个东西的初始化在:
| public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { |
| super(commandExecutor, name); |
| this.commandExecutor = commandExecutor; |
| |
| this.internalLockLeaseTime = this.getServiceManager().getCfg().getLockWatchdogTimeout(); |
| this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); |
| } |
这个续期时间的默认值可以在 Config 里面找到:
| public Config() { |
| this.transportMode = TransportMode.NIO; |
| |
| this.lockWatchdogTimeout = 30000L; |
| this.checkLockSyncedSlaves = true; |
| this.slavesSyncTimeout = 1000L; |
| this.reliableTopicWatchdogTimeout = TimeUnit.MINUTES.toMillis(10L); |
| this.keepPubSubOrder = true; |
| this.useScriptCache = false; |
| this.minCleanUpDelay = 5; |
| this.maxCleanUpDelay = 1800; |
| this.cleanUpKeysAmount = 100; |
| this.nettyHook = new DefaultNettyHook(); |
| this.useThreadClassLoader = true; |
| this.addressResolverGroupFactory = new SequentialDnsAddressResolverFactory(); |
| this.protocol = Protocol.RESP2; |
| } |
最后我们回到最底层的 tryLockInnerAsync
方法:
| <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { |
| return this.evalWriteSyncedAsync(this.getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)}); |
| } |
这里实际执行的是一个 LUA 脚本:
| |
| if (redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then |
| redis.call('hincrby', KEYS[1], ARGV[2], 1); |
| redis.call('pexpire', KEYS[1], ARGV[1]); |
| return nil; |
| else |
| return redis.call('pttl', KEYS[1]); |
| end; |
这里新版的 Redisson 加锁的逻辑简化了,以前是区分了加锁和可重入,现在进行了合并。
继续看 tryAcquireAsync
方法,在获取到锁后,走 scheduleExpirationRenewal
的逻辑:
| private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { |
| RFuture ttlRemainingFuture; |
| if (leaseTime > 0L) { |
| ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); |
| } else { |
| ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); |
| } |
| |
| CompletionStage<Long> s = this.handleNoSync(threadId, ttlRemainingFuture); |
| RFuture<Long> ttlRemainingFuture = new CompletableFutureWrapper(s); |
| CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> { |
| if (ttlRemaining == null) { |
| if (leaseTime > 0L) { |
| this.internalLockLeaseTime = unit.toMillis(leaseTime); |
| } else { |
| this.scheduleExpirationRenewal(threadId); |
| } |
| } |
| |
| return ttlRemaining; |
| }); |
| return new CompletableFutureWrapper(f); |
| } |
点进去看 scheduleExpirationRenewal
方法:
| protected void scheduleExpirationRenewal(long threadId) { |
| ExpirationEntry entry = new ExpirationEntry(); |
| ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry); |
| if (oldEntry != null) { |
| oldEntry.addThreadId(threadId); |
| } else { |
| entry.addThreadId(threadId); |
| |
| try { |
| this.renewExpiration(); |
| } finally { |
| if (Thread.currentThread().isInterrupted()) { |
| this.cancelExpirationRenewal(threadId, (Boolean)null); |
| } |
| |
| } |
| } |
| |
| } |
创建定时任务的逻辑:
| private void renewExpiration() { |
| ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); |
| if (ee != null) { |
| Timeout task = this.getServiceManager().newTimeout(new TimerTask() { |
| public void run(Timeout timeout) throws Exception { |
| ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName()); |
| if (ent != null) { |
| Long threadId = ent.getFirstThreadId(); |
| if (threadId != null) { |
| CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId); |
| future.whenComplete((res, e) -> { |
| if (e != null) { |
| RedissonBaseLock.log.error("Can't update lock {} expiration", RedissonBaseLock.this.getRawName(), e); |
| RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName()); |
| } else { |
| if (res) { |
| RedissonBaseLock.this.renewExpiration(); |
| } else { |
| RedissonBaseLock.this.cancelExpirationRenewal((Long)null, (Boolean)null); |
| } |
| |
| } |
| }); |
| } |
| } |
| } |
| }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); |
| ee.setTimeout(task); |
| } |
| } |
每次定时任务触发,会执行 renewExpirationAsync
方法:
| protected CompletionStage<Boolean> renewExpirationAsync(long threadId) { |
| return this.evalWriteSyncedAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId)); |
| } |
可以看到,本质仍然是一个 LUA 脚本:
| |
| if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then |
| redis.call('pexpire', KEYS[1], ARGV[1]); |
| return 1; |
| end; |
| return 0; |
首先来看 package org.redisson
包下的 unlock
方法的具体实现:
| public void unlock() { |
| try { |
| this.get(this.unlockAsync(Thread.currentThread().getId())); |
| } catch (RedisException var2) { |
| if (var2.getCause() instanceof IllegalMonitorStateException) { |
| throw (IllegalMonitorStateException)var2.getCause(); |
| } else { |
| throw var2; |
| } |
| } |
| } |
调用了方法 unlockAsync
:
| public RFuture<Void> unlockAsync(long threadId) { |
| return this.getServiceManager().execute(() -> { |
| return this.unlockAsync0(threadId); |
| }); |
| } |
| |
| |
| private RFuture<Void> unlockAsync0(long threadId) { |
| CompletionStage<Boolean> future = this.unlockInnerAsync(threadId); |
| CompletionStage<Void> f = future.handle((res, e) -> { |
| this.cancelExpirationRenewal(threadId, res); |
| if (e != null) { |
| if (e instanceof CompletionException) { |
| throw (CompletionException)e; |
| } else { |
| throw new CompletionException(e); |
| } |
| } else if (res == null) { |
| IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId); |
| throw new CompletionException(cause); |
| } else { |
| return null; |
| } |
| }); |
| return new CompletableFutureWrapper(f); |
| } |
接下来走 unlockInnerAsync
的逻辑:
| protected final RFuture<Boolean> unlockInnerAsync(long threadId) { |
| |
| String id = this.getServiceManager().generateId(); |
| |
| |
| MasterSlaveServersConfig config = this.getServiceManager().getConfig(); |
| |
| |
| int timeout = (config.getTimeout() + config.getRetryInterval()) * config.getRetryAttempts(); |
| timeout = Math.max(timeout, 1); |
| |
| |
| RFuture<Boolean> r = this.unlockInnerAsync(threadId, id, timeout); |
| |
| |
| CompletionStage<Boolean> ff = r.thenApply((v) -> { |
| CommandAsyncExecutor ce = this.commandExecutor; |
| |
| |
| if (ce instanceof CommandBatchService) { |
| |
| ce = new CommandBatchService(this.commandExecutor); |
| } |
| |
| |
| ((CommandAsyncExecutor)ce).writeAsync(this.getRawName(), |
| LongCodec.INSTANCE, |
| RedisCommands.DEL, |
| new Object[]{this.getUnlockLatchName(id)}); |
| |
| |
| if (ce instanceof CommandBatchService) { |
| ((CommandBatchService)ce).executeAsync(); |
| } |
| |
| |
| return v; |
| }); |
| |
| |
| return new CompletableFutureWrapper(ff); |
| } |
重点在于执行异步任务释放锁的过程:
| protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) { |
| return this.evalWriteSyncedAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local val = redis.call('get', KEYS[3]); if val ~= false then return tonumber(val);end; if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); redis.call('set', KEYS[3], 0, 'px', ARGV[5]); return 0; else redis.call('del', KEYS[1]); redis.call(ARGV[4], KEYS[2], ARGV[1]); redis.call('set', KEYS[3], 1, 'px', ARGV[5]); return 1; end; ", Arrays.asList(this.getRawName(), this.getChannelName(), this.getUnlockLatchName(requestId)), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId), this.getSubscribeService().getPublishCommand(), timeout}); |
| } |
可以看到,本质还是一个 LUA 脚本:
| |
| local val = redis.call('get', KEYS[3]) |
| |
| if val ~= false then |
| return tonumber(val) |
| end |
| |
| if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then |
| |
| return nil |
| end |
| |
| local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1) |
| |
| if (counter > 0) then |
| |
| redis.call('pexpire', KEYS[1], ARGV[2]) |
| |
| redis.call('set', KEYS[3], 0, 'px', ARGV[5]) |
| return 0 |
| else |
| |
| redis.call('del', KEYS[1]) |
| |
| redis.call(ARGV[4], KEYS[2], ARGV[1]) |
| |
| redis.call('set', KEYS[3], 1, 'px', ARGV[5]) |
| return 1 |
| end |
- 查询到的 key 不存在导致查询结果没有写入缓存
- 后续大量这样的请求直接打到数据库压力很大
这里主要是很多这种的请求打过来,查到的 key 不存在的次数较多,导致数据库压力倍增
较为简单的解决方案是将这种查询不到的 key 设置为空值缓存并返回,缺点是占内存,实际上可以采用更加优雅的解决方案——添加布隆过滤器:
- 一个 bitmap,多个 hash 函数
- 对于已经缓存的 key,经过多次 hash 在 bitmap 中做映射
- 如果请求的 key 多次映射后全部命中,则说明在 Redis 中可能存在,放行请求
- 否则拒绝请求
这里全部命中后的“可能”存在是因为存在哈希冲突的可能性:
- 假设已经存在缓存中的 x, y, z 这三个 key 做 hash 后命中的 bitmap 如上图所示
- 现在有 w 这个 key 请求打过来,多次 hash 后发现全部命中,这就造成了误判
技术上实现:
- 本地缓存:Caffine 或者 Guava
- 基于 Redisson 实现
- 对于设置了过期时间的 key,缓存在某个时间点过期的时出现大量高并发请求
- 请求发现缓存过期会从后端 DB 加载数据并写回到缓存,这个时候大并发的请求可能会瞬间把 DB 压垮
这里主要是针对某个 key 的,某个 key 恰好过期恰好大量请求打到这个 key 上
- 使用互斥锁:
- 缓存失效时不立即查询 DB
- 先用 redis 的 setnx 设置互斥锁
- 成功返回再查询 DB 并写回缓存,否则重试 get 缓存方法
- 能保证强一致性,但线程需要等待有死锁风险
- 设置 key 逻辑过期:
- 设置 key 时添加一个字段表示过期时间,然后设置永不过期
- 查询到该 key 比对过期时间判断是否过期
- 如果过期新开线程进行数据同步,当前线程正常返回数据,但数据不是最新的
- 不能保证一致性,但线程无需等待
- 大量设置缓存时间相同的 key 在同一时间大量失效
- 导致很多失效的 key 全部查询 DB
这里算是缓存击穿的升级版,大量的同一过期时间的 key 失效,结果大量请求过来,虽然查询的不是同一个 key,但未命中的流量占大部分
- 将缓存失效时间分散,在原有时间上设置随机数错开失效时间
- 采用加锁计数,或者使用合理的队列数量来避免缓存失效时对数据库造成太大的压力,但是会降低系统的吞吐量
- 分析用户行为,尽量让失效时间点均匀分布,尽量避免缓存雪崩的出现