分布式锁
分布式锁
一、分布式锁
1、介绍
在多线程环境下,为了保证数据的线程安全,锁保证同一时刻,只有一个可以访问和更新共享数据。在单机系统我们可以使用synchronized
锁或者Lock
锁保证线程安全。synchronized
锁是Java
提供的一种内置锁,在单个JVM
进程中提供线程之间的锁定机制,控制多线程并发,只适用于单机环境下的并发控制,但是如果想要锁定多个节点服务,synchronized
就不适用于了,想要在多个节点中提供锁定,在分布式系统并发控制共享资源,确保同一时刻只有一个访问可以调用,避免多个调用者竞争调用和数据不一致问题,保证数据的一致性。
分布式锁就是控制分布式系统不同进程访问共享资源的一种锁的机制。不同进程之间调用需要保持互斥性,任意时刻,只有一个客户端能持有锁。
从单体锁到分布式锁,只不过是将锁的对象从一个进程的多个线程,转成多个进程。
2、特性
- 互斥性
- 分布式锁最基本的特性,同一时刻只能一个节点服务拥有该锁,当有节点获取锁之后,其他节点无法获取锁,不同节点之间具有互斥性。
- 超时机制
不考虑异常,正常情况下,请求获取锁之后,处理任务,处理完成之后释放锁。但是如果在处理任务发生服务异常,或者网络异常时,导致锁无法释放。其他请求都无法获取锁,变成死锁。
为了防止锁变成死锁,需要设置锁的超时时间。过了超时时间后,锁自动释放,其他请求能正常获取锁。
- 自动续期
锁设置了超时机制后,如果持有锁的节点处理任务的时候过长超过了超时时间,就会发生线程未处理完任务锁就被释放了,其他线程就能获取到该锁,导致多个节点同时访问共享资源。对此,就需要延长超时时间。
开启一个监听线程,定时监听任务,监听任务线程还存活就延长超时时间。当任务完成、或者任务发生异常就不继续延长超时时间。
3、实现
实现分布式锁的方案有很多,常用的如下:
1、基于数据库实现分布锁
- 利用数据库主键唯一性的特点,或利用数据库唯一索引、行级锁的特点,比如:多个线程同时向数据库插入主键相同的同一条记录,谁插入成功谁就获取锁,多个线程同时去更新相同的记录,谁更新成功谁就抢到锁。
2、基于redis实现锁
redis提供了分布式锁的实现方案,比如:SETNX、set nx、redisson等。
拿SETNX举例说明,SETNX命令的工作过程是去set一个不存在的key,多个线程去设置同一个key只会有一个线程设置成功,设置成功的的线程拿到锁。
3、使用zookeeper实现
- zookeeper是一个分布式协调服务,主要解决分布式程序之间的同步的问题。zookeeper的结构类似的文件目录,多线程向zookeeper创建一个子目录(节点)只会有一个创建成功,利用此特点可以实现分布式锁,谁创建该结点成功谁就获得锁。
二、实现案例
1、数据库实现
下边基于数据库方式实现分布锁,开始执行任务将任务执行状态更新为4表示任务执行中。
下边的sql语句可以实现更新操作:
update media_process m set m.status='4' where m.id=?
如果是多个线程去执行该sql都将会执行成功,但需求是只能有一个线程抢到锁,所以此sql无法满足需求。
使用乐观锁方式实现更新操作:
update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=?
多个线程同时执行上边的sql只会有一个线程执行成功。
什么是乐观锁、悲观锁?
synchronized是一种悲观锁,在执行被synchronized包裹的代码时需要首先获取锁,没有拿到锁则无法执行,是总悲观的认为别的线程会去抢,所以要悲观锁。
乐观锁的思想是它不认为会有线程去争抢,尽管去执行,如果没有执行成功就再去重试。
数据库的乐观锁实现方式是在表中增加一个version字段,更新时判断是否等于某个版本,等于则更新否则更新失败,如下方式。
update t1 set t1.data1 = '',t1.version='2' where t1.version='1'
2、Mysql使用for update
实现
Mysql
数据库可以使用select xxx for update
来实现分布式锁。
for update
是一种行级锁,也叫排它锁。如果一条select
语句后面加上for update
,其他事务可以读取,但不能进进行更新操作。
select id,name,store from t_product where id = xxx for update
注意
使用for update
行级锁可以实现分布式锁,通过行级锁锁住库存,where
后条件一定要走索引,不然会触发表锁,会降低MySQL
的性能。
不过基于MySQL
实现的分布式锁,存在性能瓶颈,在Repeatable read
隔离级别下select for update
操作是基于间隙锁实现,这是一种悲观锁,会存在线程阻塞问题。
当有大量的线程请求的情况下,大部分请求会被阻塞等待,后续的请求只能等前面的请求结束后,才能排队进来处理。
3、Zookeeper 实现分布式锁
数据库实现分布式锁存在性能瓶颈,无法支撑高并发的请求。可以使用Zookeeper
实现分布式锁,Zookeeper
提供一种分布式服务协调
的中心化服务,而分布式锁的实现是基于Zookeeper
的两个特性。
提示
1、顺序临时节点:
Zookeeper
数据模型znode
是以多层节点命名的空间,每个节点都用斜杠/
分开的路径来表示,类似文件系统的目录。节点类型分成持久节点和临时节点,每个节点还可以标记有序性。一旦节点被标记为有序性,那整个节点就有自动递增的特点。利用以上的特性,创建一个持久节点作为父节点,在父节点下面创建一个临时节点,并标记该临时节点为有序性。
2、Watch 机制:
Zookeeper
还提供了另一个重要的特性:Watch
(事件监听器),在指定节点的上注册监听事件。当事件触发时,会将事件通知给对应的客户。
了解了Zookeeper
的两个特性之后,那如何使用这两种特性来实现分布式锁呢?
首先,创建一个持久类型的父节点,当用户请求时,就在父节点创建临时类型的子节点,并标记临时节点为有序性。
建立子节点之后,对父节点下面所有临时节点进行排序,判断刚创建的临时节点是否是最小的节点,如果是最小的节点,就获取锁。如果不最小的节点,则等待锁,并且获取该节点上一个顺序节点,并为其注册监听事件,等待触发事件并获得锁。
当请求完毕后,删除该节点,并触发监听事件,下一个顺序节点获得锁,流程如下所示

curator
将上面实现分布式锁的思路封装好了,直接调用即可。
引入curator
依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
使用InterProcessMutex
分布式可重入排它锁,一般流程如下:
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
// 加锁
interProcessMutex.acquire();
// 执行代码xxxxxxx
// 解锁
interProcessMutex.release();
使用interProcessMutex
获取锁和释放锁:
- 获取锁
interProcessMutex.acquire()
- 释放锁
interProcessMutex.release()
为了避免每次请求都要创建InterProcessMutex
实例,创建InterProcessMutex
的bean
:
private String address = "xxxxx";
@Bean
public InterProcessMutex interProcessMutex() {
CuratorFramework zkClient = getZkClient();
String lockPath = "/lock";
InterProcessMutex lock = new InterProcessMutex(zkClient,lockPath);
return lock;
}
private CuratorFramework getZkClient() {
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000,3,5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(address)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retry).build();
zkClient.start();
return zkClient;
}
如果获取锁之后,系统发生异常,系统就一直持有锁,后续请求也无法获取锁,导致死锁。需要设置锁超时机制,interProcessMutex.acquire
添加超时时间:
interProcessMutex.acquire(watiTime,TimeUnit);
超时时间设置要根据业务执行时间来设定,不能太长,也不能太短。
Zookeeper一些特点
Zookeeper
实现的分布式锁,相对数据库,性能有很大的提高。Zookeeper
配置集群,发生单点故障时、或者系统挂掉时,临时节点会因为 session 连接断开而自动删除。- 频繁的创建和删除节点,并且每个节点都有
watch
事件,对Zookeeper
服务来说压力大。相对Redis
的性能,还存在差距。
4、Redis NX实现分布式锁
redis实现分布式锁的方案可以在redis.cn网站查阅,地址
使用命令: SET resource-name anystring NX EX max-lock-time
即可实现。
NX:表示key不存在才设置成功。
EX:设置过期时间
如何在代码中使用Set nx去实现分布锁呢?
使用spring-boot-starter-data-redis 提供的api即可实现set nx。
添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.2</version>
</dependency>
添加依赖后,在bean中注入restTemplate。
我们先分析一段伪代码如下:
if(缓存中有){
返回缓存中的数据
}else{
获取分布式锁
if(获取锁成功){
try{
查询数据库
}finally{
释放锁
}
}
}
1、获取分布式锁
使用redisTemplate.opsForValue().setIfAbsent(key,vaue)
获取锁
这里考虑一个问题,当set nx
一个key/value
成功1后,这个key(就是锁)需要设置过期时间吗?如果不设置过期时间当获取到了锁却没有执行finally这个锁将会一直存在,其它线程无法获取这个锁。所以执行set nx时要指定过期时间,即使用如下的命令
SET resource-name anystring NX EX max-lock-time
具体调用的方法是:redisTemplate.opsForValue().setIfAbsent(K var1, V var2, long var3, TimeUnit var5)
2、如何释放锁
释放锁分为两种情况:key到期自动释放,手动删除。
- 1)key到期自动释放的方法
因为锁设置了过期时间,key到期会自动释放,但是会存在一个问题就是 查询数据库等操作还没有执行完时key到期了,此时其它线程就抢到锁了,最终重复查询数据库执行了重复的业务操作。
怎么解决这个问题?
可以将key的到期时间设置的长一些,足以执行完成查询数据库并设置缓存等相关操作。如果这样效率会低一些,另外这个时间值也不好把控。
- 2)手动删除锁
如果是采用手动删除锁可能和key到期自动删除有所冲突,造成删除了别人的锁。
比如:当查询数据库等业务还没有执行完时key过期了,此时其它线程占用了锁,当上一个线程执行查询数据库等业务操作完成后手动删除锁就把其它线程的锁给删除了。
要解决这个问题可以采用删除锁之前判断是不是自己设置的锁,伪代码如下:
if(缓存中有){
返回缓存中的数据
}else{
获取分布式锁: set lock 01 NX
if(获取锁成功){
try{
查询数据库
}finally{
if(redis.call("get","lock")=="01"){
释放锁: redis.call("del","lock")
}
}
}
}
以上代码第11行到13行非原子性,也会导致删除其它线程的锁。
查看文档上的说明:http://www.redis.cn/commands/set.html

在调用setnx命令设置key/value时,每个线程设置不一样的value值,这样当线程去删除锁时可以先根据key查询出来判断是不是自己当时设置的vlaue,如果是则删除。这整个操作是原子的,实现方法就是去执行上边的lua脚本。
Lua 是一个小巧的脚本语言,redis在2.6版本就支持通过执行Lua脚本保证多个命令的原子性。
5、Redisson实现分布式锁
我们选用Java的实现方案 https://github.com/redisson/redisson
Redisson的文档地址:https://github.com/redisson/redisson/wiki/Table-of-Content
Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) 。

使用Redisson可以非常方便将Java本地内存中的常用数据结构的对象搬到分布式缓存redis中。
也可以将常用的并发编程工具如:AtomicLong、CountDownLatch、Semaphore等支持分布式。
使用RScheduledExecutorService 实现分布式调度服务。
支持数据分片,将数据分片存储到不同的redis实例中。
支持分布式锁,基于Java的Lock接口实现分布式锁,方便开发。
下边使用Redisson将Queue队列的数据存入Redis,实现一个排队及出队的接口。

添加redisson的依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.2</version>
</dependency>
从课程资料目录拷贝singleServerConfig.yaml到config工程下
在redis配置文件中添加:
spring:
redis:
redisson:
#配置文件目录
config: classpath:singleServerConfig.yaml
#config: classpath:clusterServersConfig.yaml
redis集群配置clusterServersConfig.yaml.
Redisson相比set nx实现分布式锁要简单的多,工作原理如下:

- 加锁机制
线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。
线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis
- WatchDog自动延期看门狗机制
第一种情况:在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(当不设置默认30秒时),这样的目的主要是防止死锁的发生
第二种情况:线程A业务还没有执行完,时间就过了,线程A 还想持有锁的话,就会启动一个watch dog后台线程,不断的延长锁key的生存时间。
- lua脚本-保证原子性操作
主要是如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性
具体使用RLock操作分布锁,RLock继承JDK的Lock接口,所以他有Lock接口的所有特性,比如lock、unlock、trylock等特性,同时它还有很多新特性:强制锁释放,带有效期的锁,。
public interface RRLock {
//----------------------Lock接口方法-----------------------
/**
* 加锁 锁的有效期默认30秒
*/
void lock();
/**
* 加锁 可以手动设置锁的有效时间
*
* @param leaseTime 锁有效时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
void lock(long leaseTime, TimeUnit unit);
/**
* tryLock()方法是有返回值的,用来尝试获取锁,
* 如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .
*/
boolean tryLock();
/**
* tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,
* 只不过区别在于这个方法在拿不到锁时会等待一定的时间,
* 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
*
* @param time 等待时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 比上面多一个参数,多添加一个锁的有效时间
*
* @param waitTime 等待时间
* @param leaseTime 锁有效时间
* @param unit 时间单位 小时、分、秒、毫秒等
* waitTime 大于 leaseTime
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* 解锁
*/
void unlock();
}
lock():
- 此方法为加锁,但是锁的有效期采用默认30秒
- 如果主线程未释放,且当前锁未调用unlock方法,则进入到watchDog机制
- 如果主线程未释放,且当前锁调用unlock方法,则直接释放锁