Redis 实战:用 SETNX 和 Lua 脚本实现分布式锁,彻底解决缓存穿透

配图:标题:Redis 实战:用 SETNX 和 Lua 脚本实现分布式锁,彻

环境准备

首先,我们需要一个可用的 Redis 实例。本教程使用 Docker 快速启动 Redis 6.2.6 版本,这是一个稳定且广泛使用的版本。请确保你的机器已安装 Docker。启动命令如下。

docker run -d --name redis-lock -p 6379:6379 redis:6.2.6

启动 Redis 容器

执行后,使用 `redis-cli` 连接并验证。预期输出应显示 Redis 服务器信息,表示连接成功。

redis-cli ping

测试 Redis 连接

预期输出为 `PONG`。接下来,我们还需要一个用于测试的客户端环境。这里使用 Python 3.8+ 和 `redis-py` 库。安装命令如下。

pip install redis==4.5.4

安装 redis-py 库

安装完成后,创建一个测试脚本文件 `test_lock.py`,用于后续验证。目录结构如下:

mkdir redis-lock-demo && cd redis-lock-demo
touch test_lock.py
ls

创建项目目录和文件

步骤拆解:实现分布式锁

分布式锁的核心是确保同一时间只有一个客户端能获取到锁。我们使用 Redis 的 SETNX 命令来实现。SETNX 仅在键不存在时设置值,这天然适合锁的获取操作 [来源#2]。但简单的 SETNX 存在锁无法自动释放的问题,因此我们需要结合 Lua 脚本实现原子性的加锁和解锁。

  • 加锁:使用 SETNX 设置一个唯一的锁键(如 `lock:resource_id`),并设置一个过期时间(TTL)以防止死锁。
  • 解锁:使用 Lua 脚本原子性地检查锁键的值是否与客户端持有的令牌一致,然后删除锁键。
  • 锁续期:在业务执行期间,通过后台线程定期延长锁的过期时间,防止业务未完成锁已过期。

下面是一个完整的 Python 实现示例,包含加锁、解锁和锁续期功能。请将以下代码保存到 `test_lock.py` 中。

import redis
import time
import uuid
import threading

class RedisDistributedLock:
    def __init__(self, redis_client, lock_key, timeout=10):
        self.redis = redis_client
        self.lock_key = lock_key
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
        self.renewal_thread = None
        self._stop_renewal = threading.Event()

    def acquire_lock(self):
        """尝试获取锁,成功返回 True,失败返回 False"""
        # SETNX 命令,仅当 key 不存在时设置值
        # 使用 set 方法并设置 nx=True 来实现 SETNX
        acquired = self.redis.set(self.lock_key, self.identifier, nx=True, ex=self.timeout)
        if acquired:
            # 启动锁续期线程
            self._start_renewal()
            return True
        return False

    def _start_renewal(self):
        """启动后台线程续期锁"""
        def renew():
            while not self._stop_renewal.is_set():
                time.sleep(self.timeout // 2)  # 每 half TTL 续期一次
                if self._stop_renewal.is_set():
                    break
                # 使用 Lua 脚本原子性地检查并续期
                lua_script = """
                if redis.call('get', KEYS[1]) == ARGV[1] then
                    return redis.call('expire', KEYS[1], ARGV[2])
                else
                    return 0
                end
                """
                try:
                    self.redis.eval(lua_script, 1, self.lock_key, self.identifier, self.timeout)
                except Exception as e:
                    # 续期失败,记录日志并停止续期
                    print(f"锁续期失败: {e}")
                    break
        
        self.renewal_thread = threading.Thread(target=renew, daemon=True)
        self.renewal_thread.start()

    def release_lock(self):
        """释放锁,使用 Lua 脚本确保原子性"""
        # Lua 脚本:只有持有者才能释放锁
        lua_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        try:
            result = self.redis.eval(lua_script, 1, self.lock_key, self.identifier)
            # 停止续期线程
            self._stop_renewal.set()
            if self.renewal_thread:
                self.renewal_thread.join(timeout=1)
            return result == 1
        except Exception as e:
            print(f"释放锁失败: {e}")
            return False

# 使用示例
if __name__ == "__main__":
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    lock = RedisDistributedLock(r, "lock:test_resource", timeout=10)
    
    if lock.acquire_lock():
        print("锁获取成功,执行业务逻辑...")
        time.sleep(5)  # 模拟业务执行
        print("业务执行完毕")
        lock.release_lock()
        print("锁已释放")
    else:
        print("锁获取失败")

分布式锁核心实现代码

代码说明:`acquire_lock` 使用 SETNX 命令尝试获取锁,成功后启动一个后台线程定期续期。`release_lock` 使用 Lua 脚本确保只有锁的持有者才能释放锁,避免了误删其他客户端锁的问题 [来源#1]。

结果验证

验证分为两部分:单客户端锁行为和高并发场景下的锁竞争。首先,运行上面的测试脚本,观察输出。

python test_lock.py

运行单客户端锁测试

预期输出如下:

锁获取成功,执行业务逻辑...
业务执行完毕
锁已释放

单客户端锁测试预期输出

同时,使用 `redis-cli` 监控锁键,确保其被正确设置和删除。

redis-cli monitor

监控 Redis 命令

在另一个终端运行测试脚本,你会看到类似 `SET lock:test_resource ...` 和 `DEL lock:test_resource` 的命令。接下来,模拟高并发场景。创建一个新脚本 `concurrent_test.py`,使用多线程模拟多个客户端竞争锁。

import redis
import time
import threading
from test_lock import RedisDistributedLock

def worker(lock, thread_id):
    if lock.acquire_lock():
        print(f"线程 {thread_id} 获取锁成功")
        time.sleep(2)  # 模拟业务执行
        lock.release_lock()
        print(f"线程 {thread_id} 释放锁")
    else:
        print(f"线程 {thread_id} 获取锁失败")

if __name__ == "__main__":
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    threads = []
    for i in range(5):
        lock = RedisDistributedLock(r, "lock:concurrent_test", timeout=10)
        t = threading.Thread(target=worker, args=(lock, i))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    print("所有线程执行完毕")

高并发锁竞争测试代码

运行此脚本:

python concurrent_test.py

运行高并发锁测试

预期输出应显示只有一个线程成功获取锁,其他线程获取失败。例如:

线程 0 获取锁成功
线程 1 获取锁失败
线程 2 获取锁失败
线程 3 获取锁失败
线程 4 获取锁失败
线程 0 释放锁
所有线程执行完毕

高并发锁测试预期输出

这证明了锁的互斥性。如果看到多个线程同时获取锁成功,则说明锁实现有误。

常见错误与排查

在实现分布式锁时,常见错误包括锁无法释放、锁被误删和锁续期失败。以下是三个典型问题及其排查方法。

  • 锁无法自动释放:如果客户端崩溃,锁将永久存在,导致死锁。排查方法:确保锁设置了合理的过期时间(TTL),并在业务代码中捕获异常后强制释放锁。验证:使用 `redis-cli ttl lock:key` 检查过期时间。
  • 锁被误删:客户端 A 释放了客户端 B 的锁。排查方法:使用 Lua 脚本确保原子性检查锁值再删除 [来源#1]。验证:在释放锁时打印锁值,确保与客户端标识一致。
  • 锁续期失败:后台线程因异常退出,导致锁过期。排查方法:将续期线程设为守护线程(daemon),并添加异常捕获。验证:监控锁的 TTL 变化,确保在业务执行期间 TTL 不归零。

此外,网络分区可能导致锁失效。Redis 官方文档指出,SETNX 命令在分布式环境下需谨慎使用,建议结合 Redlock 算法或使用 Redisson 等成熟库 [来源#1]。本教程的实现是基础方案,适用于单 Redis 实例场景。


通过以上步骤,你已成功实现了一个基于 SETNX 和 Lua 脚本的分布式锁方案。该方案能有效解决高并发下的缓存穿透问题,确保数据一致性。请根据实际业务调整锁的超时时间和续期策略。

参考链接

阅读剩余
THE END