分布式锁(redis & zookeeper)

多个客户端同时对同一数据进行修改,由于读取和存储操作不是原子性的,所以就会遇到并发问题,这种场景需要用分布式锁解决。

redis 分布式锁

锁操作

redis 2.6.12 版本中加入了针对分布式锁的命令: SET key value [expiration EX seconds|PX milliseconds] [NX|XX]

  • EX seconds – 设置过期时间,单位秒
  • PX milliseconds – 设置过期时间,单位毫秒
  • NX – key 不存在才设置 key 的值
  • XX – key 已经存在才可以设置 key 的值

eg.

1
2
3
redis> set lock_name true ex 5 nx
...... 处理业务逻辑 ......
redis> del lock_name
超时问题

redis 的分布式锁机制存在超时问题,如果处理业务逻辑所用的时间超过了锁的过期时间,就会出现加锁和释放锁混乱。比如:

  • 线程1 加锁 set lock_name true ex 5 nx
  • 线程1 处理业务逻辑时间大于 5s 后,redis 自动释放锁
  • 线程2 获得锁 set lock_name true ex 5 nx
  • 线程2 处理业务逻辑中……线程1 业务处理完成,执行 del lock_name
  • 线程2 的锁没有达到超时时间就被释放,从此,锁就混乱了

redis 并没有为锁的超时问题提供可靠的解决方案,所以 redis 更适合为业务逻辑简单的场景加锁,如果业务逻辑处理时间比较长,可以相应延长超时时间。对于要求非常严格的场景,可以考虑使用更加严谨的分布式锁机制,比如 zookeeper,etcd。

锁冲突处理

高并发的场景下,客户端在请求加锁时很可能不成功,处理锁冲突有一下几种方式

直接抛出特定类型的异常

把异常反馈给客户端,由客户端用户决定是否重新连接

sleep

sleep 的方式会阻塞当前连接,对于高并发的应用,这种并不合适

延时队列

把冲突的请求的消息加入另一个队列,另做处理

zookeeper 分布式锁(kazoo)

zookeeper 实现锁的原理
  • 设置一个 znode, 假设为 /lock
  • 客户端连接 zookeeper,并在/lock下创建临时有序(ephemeral_sequential))的子节点, 例如:第一个客户端对应的子节点为/lock/lock-0000000000,第二个为/lock/lock-0000000001,依次类推。
  • 客户端调用getChild() 获取 /lock下的子节点列表,判断自己创建的子节点是否为所有子节点中序号最小的,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁。
  • 执行业务代码。
  • 完成业务流程后,删除对应的子节点释放锁。
独占锁(排它锁)
1
2
3
4
5
zk = KazooClient()
zk.start()
lock = zk.Lock("/lockpath", "my-identifier")
with lock: # blocks waiting for lock acquisition
# do something with the lock
共享锁

共享锁的读取端:

1
2
3
4
5
zk = KazooClient()
zk.start()
lock = zk.ReadLock("/lockpath", "my-identifier")
with lock: # blocks waiting for outstanding writers
# do something with the lock

共享锁的写入端:

1
2
3
4
5
zk = KazooClient()
zk.start()
lock = zk.WriteLock("/lockpath", "my-identifier")
with lock: # blocks waiting for lock acquisition
# do something with the lock
测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import sys
import time
from kazoo.client import KazooClient


def work(zk, path, sleep, is_lock, times):
lock = zk.Lock("/lock", "incr lock")

if is_lock == "yes":
with lock:
_work(zk, path, sleep, times)
else:
_work(zk, path, sleep, times)


def _work(zk, path, sleep, times):
value = int(zk.get(path)[0])
get_time = time.time()
time.sleep(sleep)
zk.set(path, str(value + 1).encode())
set_time = time.time()
print("time:{:<20} get:{:<5} time:{:<20} set:{:<5} loop_times:{:<5}"
.format(get_time, value, set_time, value + 1, times))


if __name__ == '__main__':
# 为了方便观察而设置的阻塞时间
sleep = 1 if len(sys.argv) < 2 else int(sys.argv[1])
# 是否使用锁 "yes"/"no"
is_lock = "yes" if len(sys.argv) > 2 and sys.argv[2] == "yes" else "no"

# zookeeper 地址
hosts = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
# 在这个节点上做自增操作
path = "/data"

# 启动 kazoo
zk = KazooClient(hosts=hosts)
zk.start()

# 初始化数据节点
exists = zk.exists(path)
if exists is None:
zk.create(path, b"0")

# 执行次数
times = 0
while True:
times += 1
work(zk, path, sleep, is_lock, times)

if times > 10 // sleep:
break

print("该节点当前的值:", zk.get(path)[0])
zk.stop()
观察不加锁的情况

步骤:

  • 使用 zookeeper 客户端手动设置 /data0: set /data 0
  • 开启两个终端,传入不同的 sleep 值运行程序
  • 运行结束后使用 zookeeper 客户端查看 /data 的值: get /data

观察两个终端打印的信息可以发现,一共应该自增了 11 + 6 = 17 次, 最后/data节点的值应该是 17 才对,这说明数据出现了不一致。

观察加锁的情况

步骤:

  • 使用 zookeeper 客户端手动设置 /data0: set /data 0
  • 开启两个终端,传入不同的 sleep 值运行程序
  • 运行结束后使用 zookeeper 客户端查看 /data 的值: get /data

观察两个终端打印的信息可以发现,一共应该自增了 11 + 6 = 17 次, 观察/data节点的值是正确的,再仔细观察两边 getset 的时间可以发现,每当一个客户端获取到锁的时候,另一个客户端就无法获取只能等待。

文章标题:分布式锁(redis & zookeeper)

文章字数:1.4k

本文作者:Waterandair

发布时间:2018-09-10, 23:38:23

最后更新:2019-12-28, 14:03:59

原始链接:https://waterandair.github.io/2018-09-10-distributed-lock.html

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏

github