Redis分布式锁实战:解决缓存穿透

环境准备
在开始实现分布式锁之前,我们需要搭建一个可靠的开发与测试环境。本教程将基于Redis 7.0及以上版本进行演示,因为它提供了更完善的命令和性能优化。我们将使用Docker来快速启动Redis服务,确保环境的一致性。同时,我们将使用Python 3.8+作为客户端语言,因为它在后端开发中广泛使用,且redis-py库提供了对Redis命令的良好支持。请确保你的开发机器已安装Docker和Python环境。
- 安装 Redis 7.0 或更高版本。使用 Docker 快速启动:docker run -d -p 6379:6379 redis:7.0-alpine。
- 准备开发环境,例如 Python 3.8+ 并安装 redis-py 库:pip install redis。
- 确保 Redis 服务端口(默认 6379)可访问。可以使用 redis-cli ping 测试连接。
- 了解基本 Redis 命令:SET, GET, DEL, EVAL。这些是实现分布式锁的基础。
分布式锁原理与SETNX命令
分布式锁是协调多个进程或节点对共享资源进行互斥访问的机制。在微服务架构中,多个服务实例可能同时尝试操作同一份数据(例如,扣减库存),如果没有锁,会导致数据不一致。Redis的SETNX(SET if Not eXists)命令是实现分布式锁的基础。它仅在键不存在时设置键的值,返回1表示成功获取锁,0表示锁已被占用 [来源#1]。然而,单纯使用SETNX存在风险,例如客户端在执行完业务逻辑后崩溃,未能释放锁,会导致死锁。因此,必须为锁设置一个过期时间(TTL),确保即使客户端异常,锁也能自动释放。Redis的SET命令结合NX和PX选项可以原子性地完成这一操作,例如:SET lock_key my_lock_value NX PX 30000,其中NX表示仅当键不存在时设置,PX 30000表示30秒后自动过期 [来源#1]。锁的值应使用唯一标识(如UUID),以确保只有持有锁的客户端才能正确释放锁,避免误删其他客户端的锁。
使用Lua脚本实现原子操作
释放锁时,需要确保“检查锁值是否匹配”和“删除锁键”这两个操作是原子的。如果使用多个Redis命令(如先GET再DEL),在命令执行间隙,锁可能被其他客户端获取或释放,导致误删。Redis支持执行Lua脚本,并且在执行脚本期间,Redis会阻塞其他命令,从而保证原子性 [来源#2]。一个典型的释放锁Lua脚本如下:
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
该脚本接受两个参数:KEYS[1]是锁的键名,ARGV[1]是客户端持有的唯一值。只有值匹配时才执行删除,否则返回0表示释放失败。在Python中,可以使用redis.eval()执行Lua脚本,确保原子性。
步骤拆解:实现分布式锁
![正文配图:代码:if redis.call('get', KEYS[1]) ==](https://wnluo.com/wp-content/uploads/2026/02/20260214072220093883.jpg)
实现一个健壮的分布式锁需要遵循清晰的步骤。我们将使用Python代码来演示整个过程,包括连接Redis、生成唯一锁值、尝试获取锁、执行业务逻辑以及安全释放锁。每个步骤都包含详细的注释和预期输出,以帮助你理解其工作原理。
- 步骤 1:连接 Redis。使用 Python 客户端建立连接。
- 步骤 2:生成唯一锁值。使用 UUID 生成唯一标识符。
- 步骤 3:尝试获取锁。使用 SET 命令(SETNX 的替代,支持更丰富的选项)设置锁键。
- 步骤 4:执行业务逻辑。获取锁成功后,执行关键代码。
- 步骤 5:释放锁。使用 Lua 脚本原子性地检查并删除锁。
import redis
import uuid
import time
# 连接 Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 释放锁的 Lua 脚本
RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
def acquire_lock(lock_key, timeout=30):
"""尝试获取分布式锁"""
lock_value = str(uuid.uuid4())
# 使用 SET 命令,NX 表示仅当键不存在时设置,PX 设置毫秒级过期时间
result = r.set(lock_key, lock_value, nx=True, px=timeout * 1000)
if result:
return lock_value
return None
def release_lock(lock_key, lock_value):
"""释放分布式锁"""
# 执行 Lua 脚本,原子性地检查并删除锁
return r.eval(RELEASE_SCRIPT, 1, lock_key, lock_value)
# 示例使用
lock_key = "my_resource_lock"
lock_value = acquire_lock(lock_key, timeout=10)
if lock_value:
try:
print("获取锁成功,执行业务逻辑...")
time.sleep(2) # 模拟业务处理
print("业务逻辑完成")
finally:
# 释放锁
result = release_lock(lock_key, lock_value)
if result == 1:
print("释放锁成功")
else:
print("释放锁失败,可能锁已过期或被其他客户端释放")
else:
print("获取锁失败,资源被占用")
- 预期输出:如果锁获取成功,将打印“获取锁成功,执行业务逻辑...”和“业务逻辑完成”,然后“释放锁成功”。如果锁被占用,则打印“获取锁失败,资源被占用”。
- 验证方法:在多个终端或进程中同时运行此代码,观察只有一个进程能获取锁并执行业务逻辑,其他进程会失败。可以使用 redis-cli GET my_resource_lock 查看锁键的值和过期时间。
应用分布式锁解决缓存穿透
缓存穿透是指大量请求查询数据库中不存在的数据,导致请求直接穿透缓存打到数据库,造成数据库压力剧增甚至崩溃。使用分布式锁可以防止多个线程同时查询数据库。当缓存未命中时,使用分布式锁确保只有一个线程去数据库查询,并将结果写入缓存。其他线程等待锁释放后直接从缓存读取 [来源#2]。锁的粒度应基于查询的key,避免锁住整个缓存系统。
- 步骤:1. 查询缓存;2. 缓存未命中则尝试获取锁;3. 获取锁后查询数据库,写入缓存;4. 释放锁;5. 其他线程从缓存读取。
- 注意:对于不存在的数据,也应缓存一个空值(如"NOT_FOUND"),并设置较短的过期时间,以防止同一key的持续穿透。
import redis
import uuid
import time
# 连接 Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 释放锁的 Lua 脚本(同上)
RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
def acquire_lock(lock_key, timeout=30):
lock_value = str(uuid.uuid4())
result = r.set(lock_key, lock_value, nx=True, px=timeout * 1000)
if result:
return lock_value
return None
def release_lock(lock_key, lock_value):
return r.eval(RELEASE_SCRIPT, 1, lock_key, lock_value)
def query_data_with_cache(key):
"""查询数据,使用分布式锁防止缓存穿透"""
# 1. 查询缓存
cached_value = r.get(key)
if cached_value:
print(f"从缓存读取数据: {cached_value}")
return cached_value
# 2. 缓存未命中,尝试获取锁
lock_key = f"lock:{key}"
lock_value = acquire_lock(lock_key, timeout=10)
if lock_value:
try:
# 3. 获取锁后,再次检查缓存(避免锁竞争后重复查询)
cached_value = r.get(key)
if cached_value:
print(f"锁内从缓存读取数据: {cached_value}")
return cached_value
# 4. 模拟查询数据库(这里返回一个不存在的值)
print("查询数据库...")
time.sleep(1) # 模拟数据库查询耗时
db_value = None # 假设数据库返回空
# 5. 将结果写入缓存,设置较短的过期时间避免脏数据
if db_value:
r.setex(key, 60, db_value) # 缓存 60 秒
print(f"写入缓存: {db_value}")
return db_value
else:
# 对于不存在的数据,也缓存空值,防止穿透
r.setex(key, 10, "NOT_FOUND") # 缓存 10 秒
print("写入空值缓存")
return None
finally:
# 6. 释放锁
result = release_lock(lock_key, lock_value)
if result == 1:
print("释放锁成功")
else:
print("释放锁失败")
else:
# 7. 未获取锁,等待后重试或直接返回空
print("未获取锁,等待或返回空")
time.sleep(0.1) # 简单等待
return query_data_with_cache(key) # 递归重试,实际中应有限制
# 示例使用
key = "non_existent_key"
print("第一次查询:")
result1 = query_data_with_cache(key)
print(f"结果: {result1}")
print("\n第二次查询(应从缓存读取):")
result2 = query_data_with_cache(key)
print(f"结果: {result2}")
- 预期输出:第一次查询会打印“查询数据库...”和“写入空值缓存”,第二次查询会直接从缓存读取“NOT_FOUND”。
- 验证方法:使用 redis-cli 检查缓存键:GET non_existent_key,应返回“NOT_FOUND”。同时,观察日志,确保只有一次数据库查询。可以使用 redis-cli KEYS 查看所有键,确认锁键在释放后消失。
结果验证
验证是确保代码正确性的关键步骤。我们需要从多个角度验证分布式锁和缓存穿透防护的效果。这包括验证锁的互斥性、缓存数据的正确性以及系统在高并发下的表现。

- 验证分布式锁:在多个终端运行锁获取代码,使用 redis-cli 监控锁键:WATCH lock_key 或 GET lock_key,确认只有一个客户端持有锁。业务完成后,锁键应被删除。
- 验证缓存穿透防护:使用 redis-cli 监控数据库查询次数(在代码中添加计数器),或观察日志。对于不存在的 key,数据库查询应仅发生一次。使用 redis-cli GET non_existent_key 返回“NOT_FOUND”。
- 使用 Redis 的 INFO 命令检查内存和键数量,确保缓存键正确设置过期时间。例如,执行 redis-cli INFO keyspace 查看数据库键数量。
- 压力测试:使用工具如 ab (Apache Bench) 模拟并发请求,观察数据库负载是否降低。例如:ab -n 100 -c 10 http://your-api/query?key=test。

- 预期输出:锁验证中,GET lock_key 应返回一个 UUID 值,业务完成后键消失。缓存验证中,GET non_existent_key 返回“NOT_FOUND”,且数据库查询日志仅出现一次。
- 关键事实:Redis 官方文档指出,SETNX 命令是实现分布式锁的基础,但需结合过期时间使用 [来源#1]。Redis 解决方案文档强调 Lua 脚本在缓存穿透防护中的原子性优势 [来源#2]。
- 验证方法:在 Python 代码中添加日志记录数据库查询次数,或使用 Redis 的 MONITOR 命令观察命令执行情况。例如,执行 redis-cli MONITOR,然后运行代码,观察是否有重复的数据库查询命令。
常见错误与排查
在实现分布式锁和解决缓存穿透的过程中,可能会遇到一些常见错误。了解这些错误及其排查方法,可以帮助你快速定位和解决问题,提高系统的稳定性。
- 错误 1:锁过期时间设置过短,导致业务未完成锁已释放。解决方案:根据业务耗时调整 PX 值,例如业务需 5 秒,则设置 10 秒以上。可以使用 redis-cli PTTL lock_key 查看剩余过期时间。
- 错误 2:释放锁时误删其他客户端的锁。解决方案:使用 Lua 脚本检查锁值,确保原子性 [来源#1]。避免使用 DEL 命令直接删除。
- 错误 3:网络分区导致锁无法释放。解决方案:使用 Redis 集群或哨兵模式提高可用性,并设置合理的重试机制。例如,使用 redis-cli --cluster 创建集群。
- 错误 4:缓存穿透防护中,空值缓存时间过长。解决方案:根据业务调整空值 TTL,例如 10-60 秒 [来源#2]。避免长期占用内存。
- 错误 5:Redis 连接失败。解决方案:检查 Redis 服务状态,使用连接池管理连接。在 Python 中,可以使用 redis.ConnectionPool。
总结与最佳实践
通过本教程,我们学习了如何使用Redis的SETNX命令和Lua脚本实现分布式锁,并将其应用于解决缓存穿透问题。从环境准备到步骤拆解,再到结果验证和错误排查,我们覆盖了整个实现流程。分布式锁是解决缓存穿透的有效手段,但需结合业务场景调整参数。始终使用Lua脚本确保原子操作,避免竞态条件。监控Redis性能,定期清理过期键,防止内存泄漏。对于高并发场景,考虑使用Redlock算法或Redis集群增强锁的可靠性 [来源#1]。测试是关键:在生产环境前,进行充分的并发和故障测试。
- 分布式锁是解决缓存穿透的有效手段,但需结合业务场景调整参数。
- 始终使用 Lua 脚本确保原子操作,避免竞态条件。
- 监控 Redis 性能,定期清理过期键,防止内存泄漏。
- 对于高并发场景,考虑使用 Redlock 算法或 Redis 集群增强锁的可靠性 [来源#1]。
- 测试是关键:在生产环境前,进行充分的并发和故障测试。
参考资源

- Redis 官方分布式锁模式文档:https://redis.io/docs/manual/patterns/distributed-locks/ [来源#1]
- Redis 解决方案:避免缓存穿透:https://developer.redis.com/howtos/solutions/cache/cache-avoid-penetration/ [来源#2]