分布式锁学习笔记

分布式锁学习笔记

分布式锁学习笔记持续更新中,预计10月下旬更完~

该笔记中除了视频中核心的笔记、代码,还额外补充了视频中老师讲解过的但官方笔记中没有提到的重要内容。

有错误或者遗漏的地方请大家提出来呀,有不懂的地方留言我尽量回答(毕竟我也很菜~

2022.10.14:更换图床解决了图片加载问题

2022.10.16:基于Redis的分布式锁笔记已更完…(p51之前)

2022.10.17:zookeeper基础已更完…(p62之前)

2022.10.21:完结撒花~,笔记全部做完啦(Curator底层原理加锁部分待完善…

1. 传统锁 - 减库存案例

多线程同时操纵同一个服务的共享资源,必然会造成线程安全问题。

解决方案:采用JVM提供的 Reentrantlock 或者 Synchronized

1.1 初始化工程

这一部分跟着视频完成,以下内容省略了未连接数据库之前的部分(个人认为之前未连接数据库的部分仅演示了并发流程,与实际业务关联性不大,也较为容易,可自行实现)。

  1. 创建SpringBoot工程,选择 JDK8 版本,引入 Spring Web

  2. pom.xml引入如下依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.4.3.1</version>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>
    </dependencies>
  3. 创建数据库 distributed_lock,在数据库内创建表 db_stock,创建如下字段。

    新建一条数据记录

  4. 修改application.properties文件,规定Web启动端口号

    1
    2
    3
    4
    5
    server.port=10010
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/distributed_lock
    spring.datasource.username=root
    spring.datasource.password=your password
  5. 在目录下创建 pojo 包,用于存放实体类 Stock 对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Data
    @TableName("db_stock")
    public class Stock {

    private Long id;
    private String productCode;
    private String warehouse;
    private Integer count;
    }

    创建 mapper 包,创建 StockMapper

    1
    2
    3
    public interface StockMapper extends BaseMapper<Stock> {

    }

    更改 SpringBoot 启动类,添加如下注解

    1
    @MapperScan("com.ltyzzz.distributedlock.mapper")

    创建 service 包,创建 StockService

    try-finally块之后用于加锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Service
    public class StockService {

    @Autowired
    private StockMapper stockMapper;

    private ReentrantLock lock = new ReentrantLock();

    public void deduct() {
    try {
    Stock stock = this.stockMapper.selectOne(new QueryWrapper<Stock>().eq("product_code", "1001"));
    if (stock != null && stock.getCount() > 0) {
    stock.setCount(stock.getCount() - 1);
    this.stockMapper.updateById(stock);
    }
    } finally {

    }
    }

    创建 controller 包,创建 StockController

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @RestController
    public class StockController {

    @Autowired
    StockService stockService;

    @GetMapping("stock/deduct")
    public String deduct() {
    stockService.deduct();
    return "hello stock deduct!";
    }
    }

1.2 JVM本地锁

1.2.1 使用JMeter进行并发测试

首先启动 SpringBoot 库存项目

利用 JMeter 进行压力测试

  1. 创建 Thread Group

  2. 设置 Thread Properties

  3. 设置 request

  4. 添加 Aggregate Report

  5. 运行压力测试,正常执行情况最终库存应该为0

最终数据库结果如下

此时产生了并发操作

有两种极限情况

  • 所有并发请求都完美的交错执行,正确地完成了数据库更新,最终库存为0

  • 所有并发请求同一时间并发,同时执行了数据库更新操作,并发线程数为100,循环次数为50,最终库存为4950

1.2.2 加入JVM本地锁解决并发问题

修改 StockService。使用 ReentrantLockSynchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service
public class StockService {

@Autowired
private StockMapper stockMapper;

private ReentrantLock lock = new ReentrantLock();

public void deduct() {
lock.lock();
try {
Stock stock = this.stockMapper.selectOne(new QueryWrapper<Stock>().eq("product_code", "1001"));
if (stock != null && stock.getCount() > 0) {
stock.setCount(stock.getCount() - 1);
this.stockMapper.updateById(stock);
}
} finally {
lock.unlock();
}
}
}

启动 SpringBoot 工程,继续进行压力测试,最终库存为0。

1.2.3 JVM本地锁失效的情况

共有三种情况

多例模式

修改 StockService,设置多例模式

添加注解

1
@Scope(value = "prototype", proxyMode = ScopedProxyMode.TARGET_CLASS)

JDK动态代理使用 INTERFACES,CGLIB代理使用 TARGET_CLASS

默认情况下使用JDK动态代理,原生Spring默认是JDK动态代理

而SpringBoot在2.0版本之后默认使用CGLIB代理

启动 SpringBoot 工程,继续进行压力测试,最终库存不为0,说明本地锁失效

事务

service 方法上添加事务注解 @Transactional

启动 SpringBoot 工程,继续进行压力测试,最终库存不为0,说明本地锁失效

事务采用AOP思想实现:

  1. 在AOP的前置方法内开启事务

  2. 进入service方法,获取锁

  3. 进行查询库存、更新库存操作

  4. 释放锁

  5. 在AOP的后置方法内提交或回滚事务

在并发情况下,如图所示

b用户在a用户还未提交事务之前,就获取到了a用户释放的锁,读取到了库存,但该库存为旧的库存(因为a用户还未提交事务),所以b用户是在旧库存的基础上进行修改,因此产生了并发问题。

解决方案:设置事务隔离级别

将注解改为:@Transactional(isolation = Isolation.READ_UNCOMMITTED)

即此时b用户可以读取到a用户未提交的数据,即该数据为最新数据,可以解决此问题。

但实际业务中不能这样去使用,采用事务就是为了保证原子性,要么全部成功,要么全部失败。假设b用户在这种方式下读取到了最新数据,虽然可以解决超卖问题,但是如果a回滚了事务,那么b用户读取到的数据就是错误的,又会产生新的混乱。

集群部署

多个服务器部署

去掉 @Transactional 注解,保证在单机模式下锁生效,无并发问题。

这部分看视频做

  1. 设置两个SpringBoot启动类,设置不同的端口号,并启动

  2. 更改 nginx 配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    upstream distributedLock {
    server localhost:10086;
    server localhost:10010;
    }

    server {
    listen 80;
    server_name localhost;
    location / {
    proxy_pass http://distributedLock;
    }
    }
  3. 启动nginx

任务管理器中出现两个nginx即代表启动成功,一个工作进程,一个主进程

以默认端口80进行访问:localhost:80/stock/deduct

之后,使用JMeter进行压力测试,5000个请求成功发出,但最终库存不为0,锁失效。

1.3 单SQL语句解决并发问题

1.3.1 实现过程

Service 代码流程

  1. 查询库存

  2. 判断库存是否充足

  3. 更新库存到数据库

该流程可以使用一个SQL语句实现

StockMapper 添加方法 updateStock,

参数为商品编号 productCode 与减库存数量count

1
2
3
4
public interface StockMapper extends BaseMapper<Stock> {
@Update("update db_stock set count = count - #{count} where product_code = #{productCode} and count >= #{count}")
int updateStock(@Param("productCode") String productCode, @Param("count") Integer count);
}

SQL语句如下:

1
update db_stock set count = count - #{count} where product_code = #{productCode} and count >= #{count}

在MySQL中,更新、新增、删除写操作,会加悲观锁,具有原子性,可以解决并发问题。

StockService 中调用该Mapper方法

1
2
3
public void deduct() {
this.stockMapper.updateStock("1001", 1);
}

在集群部署情况下,使用JMeter进行压力测试,5000个请求成功发出,最终库存为0。

1.3.2 优缺点

优点:

  1. 可以解决多例模式、事务、集群部署下的并发问题

缺点:

  1. 锁范围问题

  2. 同一个商品有多条库存记录(多仓库)

    • 如果仍然使用该方法,则会对所有仓库库存数进行自减

    • 正常情况应灵活从各个仓库进行减库存发货

  3. 无法记录库存变化前后的状态

    • 原因:单条SQL语句
锁范围问题

表级锁 VS 行级锁

  • 未添加索引之前为 表级锁

    • 在用户a中,先开启事务,再去更新 productCode = 1001 记录,但不提交事务

    • 这导致:在用户b中,更新 productCode = 1002 记录,但进入了阻塞

    • 用户a提交了事务之后,用户b才能提交成功,证明为表级锁

  • 添加索引之后为 行级锁

    1. 锁的查询或更新条件必须为索引字段

      e.g.:根据 productCode 进行查询或更新,为其添加索引

    2. 再次进行之前的操作,用户b不会进入阻塞,即变为了行级锁

    3. 使用行级锁的条件:

      . 查询或者更新条件必须为索引字段

      . 查询或者更新条件必须为具体值

      . SQL语句中where条件有索引时,不能为 'like' 或者 '!=',否则为表级锁

1.4 悲观锁

select … for update

1.4.1 情景引入

用户a与b并发执行

  1. 用户a开启事务,并使用如下SQL语句查询库存

    1
    select * from db_stock where product_code='1001';
  2. 用户b使用如下SQL语句

    1
    update db_stock set count = count - 1 where id=1;

这时库存已经发生变化,而用户a不知情,仍会已它查出来的库存数作为标准进行下一步操作

解决方案:用户a采用如下SQL语句,进行加锁操作

1
select * from db_stock where product_code='1001' for update;

加锁范围:此时满足行级锁条件,因此该悲观锁为行级锁,只锁对应的记录

1.4.2 代码实现

  1. 在StockMapper中添加查询库存方法

    1
    2
    @Select("select * from db_stock where product_code=#{productCode} for update")
    List<Stock> queryStock(String productCode);
  2. 更新StockService方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Transactional
    public void deduct() {
    // 查询库存信息并锁定
    List<Stock> stocks = this.stockMapper.queryStock("1001");
    // 取第一个库存
    Stock stock = stocks.get(0);
    // 判断库存是否充足
    if (stock != null && stock.getCount() > 0) {
    // 扣减库存
    stock.setCount(stock.getCount() - 1);
    this.stockMapper.updateById(stock);
    }
    }
  3. 使用压力测试工具进行测试,库存数量减为0,解决了并发问题

1.4.3 优缺点

优点:更加灵活

  1. 解决同一个商品有多条库存记录的问题

    可以先根据商品编号查询库存记录,查询到之后再根据现有技术分析。

  2. 解决无法记录库存变化前后状态的问题

缺点:

  1. 性能问题:效率比JVM本地锁稍高一些,但比单SQL语句低

  2. 死锁问题:对多条数据加锁时,加锁顺序必须一致

    e.g.:用户a与b执行如下顺序操作

    1. 用户a对 id=1 商品加锁

    2. 用户b对 id=2 商品加锁

    3. 用户a想对 id=2 商品加锁,但陷入阻塞

    4. 用户b想对 id=1 商品加锁,报错 Deadlock

  3. 库存操作需要统一

1.5 乐观锁

借助时间戳、version版本号实现,利用 CAS 机制

CAS:Compare and Swap 比较并交换

关于CAS,可以看看这篇博客:漫画:什么是CAS机制

  • 如果变量 X 等于旧值 A,则更新将 X 更新为 B

  • 若不等于,则放弃更新

1.5.1 实现思路

  1. db_stock 表中添加 version 字段

    类型为int,长度11,Not Null

  2. 查询对应的商品库存以及 version

    1
    SELECT * FROM db_stock where product_code='1001';
  3. 执行更新库存操作时,where条件中需要确认当前 version 是否等于之前查出来的

    . 若等于,则更新库存操作成功

    . 否则,则说明存在并发操作,该条库存数据已被修改。继续查询重试

    1
    UPDATE db_stock SET count=4999, version=version+1 WHERE id=1 AND version=0;

1.5.2 代码实现

在 Stock 实体类中添加 version 属性,类型为Integer

修改 StcokService 中的deduct()方法

第一版代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Transactional
public void deduct() {
// 查询库存信息并锁定
List<Stock> stocks = this.stockMapper.selectList(new QueryWrapper<Stock>().eq("product_code", "1001"));
// 取第一个库存
Stock stock = stocks.get(0);
// 判断库存是否充足
if (stock != null && stock.getCount() > 0) {
// 扣减库存
stock.setCount(stock.getCount() - 1);
Integer version = stock.getVersion();
stock.setVersion(version + 1);
int update = this.stockMapper.update(stock, new UpdateWrapper<Stock>().eq("id", stock.getId()).eq("version", version));
if (update == 0) {
deduct();
}
}
}

采用JMeter压力测试工具测试时,报错

错误分析:

  1. stack over flow:栈溢出

    这是由于高并发的情况下,更新失败率很高,就会不断地递归调用deduct方法,导致栈溢出。

    解决方案:调用线程类中的sleep方法,暂时睡眠,避开并发高峰

  2. 连接超时错误

    这是由于开启了事务注解,为手动事务。

    当执行到更新操作时,会对数据进行加锁。而当前请求无法更新,就会一直调用deduct方法,并一直持有锁。其他请求进入代码,会进入阻塞状态,直至连接超时。

    而关闭了事务注解,为自动事务。若更新操作执行失败,会放弃锁。

最终代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void deduct() {
// 查询库存信息并锁定
List<Stock> stocks = this.stockMapper.selectList(new QueryWrapper<Stock>().eq("product_code", "1001"));
// 取第一个库存
Stock stock = stocks.get(0);
// 判断库存是否充足
if (stock != null && stock.getCount() > 0) {
// 扣减库存
stock.setCount(stock.getCount() - 1);
Integer version = stock.getVersion();
stock.setVersion(version + 1);
int update = this.stockMapper.update(stock, new UpdateWrapper<Stock>().eq("id", stock.getId()).eq("version", version));
if (update == 0) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
deduct();
}
}
}

通过JMeter测试,最终库存减为0

1.5.3 优缺点

推荐阅读:Java并发问题–乐观锁与悲观锁以及乐观锁的一种实现方式-CAS

优点:

  1. 乐观锁在竞争不激烈时,较悲观锁更具有优势,因为悲观锁会锁住代码块,并且加锁、释放锁、线程上下文均占用额外资源。

缺点:

  1. 在高并发情况下,乐观锁的性能不如悲观锁,因为重试次数过多。

  2. 乐观锁存在ABA问题

    . 在查询与更新的中间,X 值被多次更改,但最终又回到了查询时的值

  3. 读写分离情况下,乐观锁不可靠

    从数据库与主数据库进行网络传输,具有较大时延,可能主数据库内是新数据,而从数据库内是旧数据,乐观锁下会造成从数据库不断地重试,

1.6 MySQL锁总结

性能:单SQL语句 > 悲观锁 > JVM锁 > 乐观锁

  • 追求极致性能、业务场景简单、无需记录数据更新前后变化的情况下 -> 单SQL语句

  • 并发量低、读多于写、竞争不激烈的情况下 -> 乐观锁

  • 并发量高、冲突激烈 -> 悲观锁

  • 不推荐JVM锁

1.7 Redis锁

1.7.1 并发问题引入

  1. 在Maven工程pom.xml文件中加入依赖

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
  2. 在application.properties文件中配置redis

    1
    2
    spring.redis.host=localhost
    spring.redis.password=123456
  3. 更改 StockService类中的deduct方法

    采用StringRedisTemplate更加方便

    而RedisTemplate需要手动设置序列化器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Autowired
    private StringRedisTemplate redisTemplate;

    public void deduct() {
    // 查询库存信息
    String stock = this.redisTemplate.opsForValue().get("stock");
    // 判断库存是否充足
    if (stock != null && stock.length() != 0) {
    Integer st = Integer.valueOf(stock);
    if (st > 0) {
    // 扣减库存
    this.redisTemplate.opsForValue().set("stock", String.valueOf(--st));
    }
    }
    }
  4. 在redis-cli命令窗口中,设置库存:set stock 5000

启动SpringBoot与压力测试工具JMeter,最终库存不为0,出现并发问题。

1.7.2 解决方案

  1. JVM本地锁机制(较简单不做演示)

  2. Redis乐观锁:watch multi exec

    1. watch:监控一个或多个key的值,如果exec执行之前,如果key的值发生变化,则取消事务执行。

    2. multi:开启事务

    3. exec:执行事务

1.7.3 代码实现

修改StockService中的deduct方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void deduct() {
this.redisTemplate.execute(new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
operations.watch("stock");
// 查询库存信息
String stock = operations.opsForValue().get("stock").toString();
// 判断库存是否充足
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
// multi
operations.multi();
// 扣减库存
operations.opsForValue().set("stock", String.valueOf(--st));
// exec 执行事务
List exec = operations.exec();
// 如果事务执行结果为空,代表操作失败,重试
if (exec == null || exec.size() == 0) {
try {
Thread.sleep(40);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
deduct();
}
return exec;
}
}
return null;
}
});
}

采用JMeter进行并发测试后,库存减为0,解决了并发问题

但是吞吐量性能较低,甚至可能由于电脑性能问题,出现连接不够用的情况。

2. Redis分布式锁

2.1 实现思路

借助于redis中的命令 setnx(key, value)

设置 key 为lock

  • lock 存在,则说明有其他请求已经获取到锁,则当前请求重复重试
  • lock 不存在,则说明当前锁未被获取,当前请求获取锁成功,继续执行业务操作
  • 设置 lock 之后,只有一个请求可以获取到锁并执行成功,其他请求只能等待

2.2 基本代码实现

2.2.1 递归版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void deduct() {
// 加锁setnx
Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "123");
// 递归调用
if (!lock) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
deduct();
} else {
try {
// 查询库存信息
String stock = redisTemplate.opsForValue().get("stock").toString();
// 判断库存是否充足
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
// 扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
} finally {
// 解锁
this.redisTemplate.delete("lock");
}
}
}

2.2.2 循环版本

注意:即使不采用递归版本,在重试获取锁的过程中,也可以使用线程睡眠,这样可以减小锁的竞争压力,提升性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void deduct() {
// 加锁setnx
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "123")) {
// 循环重试
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
try {
// 查询库存信息
String stock = redisTemplate.opsForValue().get("stock").toString();
// 判断库存是否充足
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
// 扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
} finally {
// 解锁
this.redisTemplate.delete("lock");
}
}

2.3 问题解决

2.3.1 死锁与原子性

在某一请求获取到lock锁之后,当前服务器突然宕机,造成该锁无法得到释放,此时其他请求仍然处于不断地递归或循环重试获取锁,造成死循环。

解决方案:获取到lock之后,为lock设置过期时间,expire 指令

但是:可能在 获取lock 与 为lock设置过期时间 之间,服务器发生宕机。

因此,需要确保两个操作的原子性:采用如下指令,将多个操作复合到一个指令中

1
set lock 123 ex 20 nx

修改加锁部分的代码:

1
2
3
4
5
6
7
8
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "123", 3, TimeUnit.SECONDS)) {
// 循环重试
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

2.3.2 防误删

假设处理请求的时长大于锁的有效时间,当某一请求还未执行完当前操作时,Redis锁便会自动释放掉(删除lock),等到该请求执行完当前操作后,还会手动地将Redis锁释放掉。此时,可能别的请求已经获取到了锁,那么该锁很有可能被该请求误删掉。

解决方案:

为了防止出现这一情况,需要为当前锁设置唯一标识UUID,作为锁的值。

在删除锁之前,先判断当前锁是否属于自己,然后再进行删除。

修改 StockService 的 deduct方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void deduct() {
String uuid = UUID.randomUUID().toString();
// 加锁setnx
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS)) {
// 循环重试
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
try {
// 查询库存信息
String stock = redisTemplate.opsForValue().get("stock").toString();
// 判断库存是否充足
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
// 扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
} finally {
// 先判断,再解锁
if (StringUtils.equals(this.redisTemplate.opsForValue().get("lock"), uuid)) {
this.redisTemplate.delete("lock");
}
}
}

判断与删除操作需要保证原子性,否则还会出现并发问题

  • 刚判断完该锁是自己的,还没进行删除,锁就到期自动释放且被其他请求获取,此时会造成误删操作

借助 lua 脚本实现原子性操作:Lua 教程 | 菜鸟教程

redis-cli支持lua脚本,可以一次性给redis发送多个指令。

  • redis为单线程,执行指令遵守 one-by-one 规则
1
EVAL script numkeys [key [key ...]] [arg [arg ...]]

删除LUA脚本:

1
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end

更改代码中finally部分

1
2
3
4
5
6
// 先判断,再解锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then return redis.call('del', KEYS[1]) " +
"else return 0 " +
"end";
this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"), uuid);

2.3.3 可重入性

之前的加锁指令采用的是Redis中的 setnx 指令,即锁的名字固定为lock(键值固定),一旦加锁成功(键设置成功),该线程便无法继续加锁。如下即为具体情景代码:

某线程执行a方法,获取到锁之后,其又需要继续执行b方法,需要继续获取锁。但此时锁已经被自己占有,相当于自己和自己产生了死锁,这是不可重入锁带来的问题。

1
2
3
4
5
6
7
public synchronized void a() {
b();
}

public synchronized void b() {
// pass
}

解决方案:可重入锁

可重入锁可:当某一线程获得锁之后,需要再次执行加锁方法时(锁对象为同一个),将加锁次数加1即可。执行完毕之后,释放锁时对加锁次数减1。当加锁次数等于0时,说明该锁已经被完全释放。

当可重入锁需要在分布式环境实现时,需要统计加锁次数。

实现方案:Redis Hash + lua脚本

加锁 -> 判断锁是否存在:exists

  • 若不存在,则直接创建锁并获取:hset key field value

  • 若存在,则判断当前锁是否属于自己:hexists

    • 若属于,则重入:hincrby key field increment

    • 若不属于,递归或循环自旋重试

加锁脚本

1
2
3
4
5
6
7
8
if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 
then
redis.call('hincrby', KEYS[1], ARGV[1], 1)
redis.call('expire', KEYS[1], ARGV[2])
return 1
else
return 0
end

解锁 -> 判断锁是否属于自己:hexists

  • 若不属于,说明正在恶意释放锁,返回 nil

  • 若属于,对加锁次数减1,并判断当前加锁次数是否为0

    • 若为0,则返回1,代表完全解锁成功

    • 若不为0,则返回0

解锁脚本

1
2
3
4
5
6
7
8
9
if redis.call('hexists', KEYS[1], ARGV[1]) == 0
then
return nil
elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0
then
return redis.call('del', KEYS[1])
else
return 0
end

代码实现

  1. 创建分布式锁的工厂类方法。通过一个工厂类方法,可以获得不同的分布式锁(基于Redis、基于Zookeeper、基于MySQL)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Component
    public class DistributedLockClient {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private String uuid;

    public DistributedLockClient() {
    this.uuid = UUID.randomUUID().toString();
    }

    public DistributedRedisLock getRedisLock(String lockName) {
    return new DistributedRedisLock(redisTemplate, lockName, uuid);
    }
    }
  2. 创建分布式Redis锁的实现类,其实现了Lock接口。通过之前编写好的加锁、解锁脚本,实现了可重入锁。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    public class DistributedRedisLock implements Lock {

    @Autowired
    private StringRedisTemplate redisTemplate;

    public String lockName;

    private String uuid;

    private long expire = 30;

    public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {
    this.redisTemplate = redisTemplate;
    this.lockName = lockName;
    this.uuid = uuid;
    }

    @Override
    public void lock() {
    this.tryLock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
    try {
    return this.tryLock(-1L, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    this.expire = time != -1 ? unit.toSeconds(time) : expire;
    String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 \n" +
    "then\n" +
    " redis.call('hincrby', KEYS[1], ARGV[1], 1)\n" +
    " redis.call('expire', KEYS[1], ARGV[2])\n" +
    " return 1\n" +
    "else\n" +
    " return 0\n" +
    "end";
    while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getId(), String.valueOf(expire))) {
    Thread.sleep(50);
    }
    return true;
    }

    @Override
    public void unlock() {
    String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0\n" +
    "then \n" +
    " return nil\n" +
    "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0\n" +
    "then \n" +
    " return redis.call('del', KEYS[1])\n" +
    "else \n" +
    " return 0\n" +
    "end";
    Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), getId());
    if (flag == null) {
    throw new IllegalMonitorStateException("this lock does not belong to you");
    }
    }

    @Override
    public Condition newCondition() {
    return null;
    }

    String getId() {
    return uuid + ":" + Thread.currentThread().getId();
    }
    }
  3. 修改StockService中的deduct方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public void deduct() {
    DistributedRedisLock redisLock = this.distributedLockClient.getRedisLock("lock");
    redisLock.lock();
    try {
    // 查询库存信息
    String stock = redisTemplate.opsForValue().get("stock").toString();
    // 判断库存是否充足
    if (stock != null && stock.length() != 0) {
    Integer st = Integer.valueOf(stock);
    if (st > 0) {
    // 扣减库存
    redisTemplate.opsForValue().set("stock", String.valueOf(--st));
    }
    }
    this.test();
    } finally {
    redisLock.unlock();
    }
    }

需要注意的一点是:如何去标识每一个服务中的每一个线程的锁的uuid?

这里的解决方案是:通过服务生成的uuid与每个线程的线程id拼接得到唯一标识uuid

  1. 每一个服务是通过DistributedLockClient这一工厂类获得锁,由于该类交给Spring容器管理,是单例对象。即每一个服务只有一个唯一的DistributedLockClient对象。

    因此可以在该类构造方法上生成一个uuid,作为当前服务的唯一标识

  2. 每一个线程也会有一个唯一的id

  3. 将两者拼接起来(getId方法),即可唯一标识每一个服务的每一个线程。若需要进行重入操作,同一线程重复获取锁,可通过此唯一标识进行判断。

反之,如果每次通过DistributedLockClient获取Redis分布式锁,均重新创建一个新的uuid,那么便永远无法实现可重入锁。

  • 因为在同一线程进行可重入操作获取锁时,会出现内层锁与外层锁uuid不匹配的错误情况,与理论事实相矛盾。

2.3.4 自动续期

假设处理请求的时长大于锁的有效时间,会出现请求还未处理完,锁就被释放了,此时可能引起并发问题。

解决方案:

  • 采用Timer定时任务,每隔一段时间为锁续期

  • 配合lua脚本执行最后的续期操作

    先判断锁是否存在。若存在则重置过期时间

    1
    2
    3
    4
    5
    6
    7
    if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) 
    then
    redis.call('expire', KEYS[1], ARGV[2])
    return 1
    else
    return 0
    end

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void renewExpire() {
String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) \n" +
"then \n" +
" redis.call('expire', KEYS[1], ARGV[2])\n" +
" return 1\n" +
"else \n" +
" return 0\n" +
"end";
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {
renewExpire();
}
}
}, this.expire * 1000 / 3);
}

其中需要注意的点是:

  • 之前是通过 getId() 方法,将当前服务的uuid与当前线程的id拼接到一起,组成唯一标识。

    而现在的问题为:Timer定时任务是子线程,它的本意是监测主线程的锁的过期时间并为其续期,但是在其内部调用getId方法,得到的锁唯一标识是拼接的Timer定时任务子线程的id。

  • 因此,需要对代码进行进一步修改与优化,删去getId方法,修改DistributedRedisLock的构造方法,在创建该类时便生成唯一的uuid。

    1
    2
    3
    4
    5
    public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {
    this.redisTemplate = redisTemplate;
    this.lockName = lockName;
    this.uuid = uuid + ":" + Thread.currentThread().getId();
    }

2.4 RedLock红锁算法

我结合老师的讲解,单独写了一篇图文详解RedLock(结合官方文档)

其中一些内容参照了官方文档以及其他大佬的博客

这一部分均为理论内容,没有实际代码

2.5 Redisson

2.5.1 环境搭建

参考 Redisson文档

  1. 引入依赖

    1
    2
    3
    4
    5
    <dependency>  
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.17.1</version>
    </dependency>
  2. 由于Redisson不能使用yml进行配置,因此需要编写额外的配置类

    新建 config 包,创建 RedissonConfig

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Configuration
    public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456").setDatabase(0);
    return Redisson.create(config);
    }
    }
  3. 修改 StockService 中的 deduct 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Autowired  
    private RedissonClient redissonClient;

    public void deduct() {
    RLock lock = this.redissonClient.getLock("lock");
    lock.lock();
    // 查询库存信息
    String stock = redisTemplate.opsForValue().get("stock").toString();
    // 判断库存是否充足
    try {
    if (stock != null && stock.length() != 0) {
    Integer st = Integer.valueOf(stock);
    if (st > 0) {
    // 扣减库存
    redisTemplate.opsForValue().set("stock", String.valueOf(--st));
    }
    }
    } finally {
    lock.unlock();
    }
    }

    也可以设置lock的过期时间,到时间时自动释放锁

    1
    lock.lock(10, TimeUnit.SECONDS);

常用的Redisson配置

2.5.2 可重入锁

底层原理

RLock接口的lock方法是通过继承JUC包下的Lock接口获得,最终RedissonLock间接实现了RLock接口以及其中的中法。

继承关系如图所示:

RedissonLock中具体lock方法实现如下:

1
2
3
4
5
6
7
8
@Override  
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}

根据顺序,一步步进入方法查看:

lock -> tryAcquire -> tryAcquireAsync -> tryLockInnerAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {  
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

可以看到Redisson中的加锁方法也是采用lua脚本

Redisson锁的自动定时续期是通过:scheduleExpirationRenewal 方法实现

其方法内部通过 renewExpiration 重置过期时间,采用TimerTask完成定时任务

解锁方法unlock也是采用lua脚本

Redisson自动续期、可重入与防死锁

实现方案与之前手动实现的Redis分布式锁类似

  1. 如果当前代码运行时间过长,超出了锁的过期时间,但是程序仍在执行过程中,所以需要为锁续期

    -> Redisson内部提供了一个监控锁的看门狗(定时任务),它的作用是在Redisson实例被关闭前,不断地延长锁的有效期。默认情况下,看门狗每隔30秒检查一次。

  2. 如果负责储存分布式锁的Redisson节点宕机后,而这个锁恰好处于锁住状态,这时候会出现死锁。

    -> Redisson通过为锁设置超时时间(有效期),若有效期内没有定时任务为其续期,则其超过该时间就会自动解开。

测试代码

测试自动续期代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void deduct() {
RLock lock = this.redissonClient.getLock("lock");
lock.lock();
// 查询库存信息
String stock = redisTemplate.opsForValue().get("stock").toString();
// 判断库存是否充足
try {
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
// 扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
TimeUnit.SECONDS.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}

测试可重入代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void deduct() {
RLock lock = this.redissonClient.getLock("lock");
lock.lock();
// 查询库存信息
String stock = redisTemplate.opsForValue().get("stock").toString();
// 判断库存是否充足
try {
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
// 扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
} finally {
lock.unlock();
}
this.test();
}

public void test() {
RLock lock = this.redissonClient.getLock("lock");
lock.lock();
System.out.println("测试可重入锁...");
lock.unlock();
}

2.5.3 公平锁

公平的体现:公平锁保证当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。

所有的请求线程会在一个队列中排队。如果某线程宕机后,Redisson等待5秒后继续下一个线程。

举个🌰:若前面有5个线程宕机,那么延迟将会是25秒。

顺便提一点:个人认为此处Redisson官方的中文文档描述略有不妥,不是前面5个线程处于等待状态,而是处于宕机状态

所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。

对照英文版本:

All waiting threads are queued and if some thread has died then Redisson waits its return for 5 seconds. For example, if 5 threads are died for some reason then delay will be 25 seconds.

测试代码
  1. 在StockController添加新的测试方法

    1
    2
    3
    4
    5
    @GetMapping("test/fair/lock/{id}")
    public String testFairLock(@PathVariable("id") Long id) {
    this.stockService.testFairLock(id);
    return "hello test fair lock";
    }
  2. 在StockService添加新的测试方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public void testFairLock(Long id) {
    RLock fairLock = this.redissonClient.getFairLock("fairLock");
    fairLock.lock();
    System.out.println("测试公平锁==================" + id);
    try {
    TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    } finally {
    fairLock.unlock();
    }
    }

启动SpringBoot,依次在浏览器请求 test/fair/lock/{id} 5次

可以看到最终的输出顺序和请求顺序一致,说明实现了公平锁

image-20221014004141382

如果开启了两个SpringBoot并采用nginx进行负载均衡,会出现一个请求被发送多次的情况。

这是因为nginx有纠错机制,当发送一个请求长时间未得到响应时,nginx会再次发送。

解决方案:配置nginx.conf文件,将超时时间延长。重新配置之后记得重新启动nginx。

1
2
3
4
5
6
7
8
9
10
server {
listen 80;
server_name localhost;
proxy_connect_timeout 12000
proxy_send_timeout 12000
proxy_read_timeout 12000
location / {
proxy_pass http://distributedLock;
}
}

2.5.4 联锁

Redisson可以将多个RLock对象关联为一个联锁。即要么同时上锁,要么同时解锁

1
2
3
4
5
6
7
8
9
10
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();

但是缺点很明显:任何一个Redis宕机之后,其他所有的Redis都不可用,一般不用。

2.5.5 红锁

红锁与联锁的不同之处在于:红锁只要大部分(过半数)节点上加锁成功就算成功。红锁也不重要

1
2
3
4
5
6
7
8
9
10
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();

2.5.6 读写锁

读写锁特性为:读操作可以并发,但是写操作之间不可以并发,读操作与写操作不可以并发

举例论证读写锁存在的必要:

  • 假设读操作之间可以并发,写操作之间不可以并发。这种加锁方式保证了写操作的安全性
  • 但这种方式确保不了读与写操作的安全性,因为没有限制读和写的并发,因此需要引入读写锁
测试代码
  1. 在StockController中新增以下代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @GetMapping("test/read/lock")
    public String testReadLock() {
    this.stockService.testReadLock();
    return "hello test read lock";
    }

    @GetMapping("test/write/lock")
    public String testWriteLock() {
    this.stockService.testWriteLock();
    return "hello test write lock";
    }
  2. 在StockService中新增以下代码:选择超时自动解锁的方式加锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public void testReadLock() {
    RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");
    rwLock.readLock().lock(10, TimeUnit.SECONDS);
    System.out.println("执行写操作...");
    //rwLock.readLock().unlock();
    }

    public void testWriteLock() {
    RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");
    rwLock.writeLock().lock(10, TimeUnit.SECONDS);
    System.out.println("执行写操作...");
    //rwLock.writeLock().unlock();
    }

测试对应的方法,然后查看redis中读写锁是如何存储的。

  1. 对于读,可以存在多个读操作,mode为read

    image-20221015231801864

  2. 对于写,同一时间只能有一个写操作,其余均阻塞,mode为write

    image-20221015231952586

  3. 在读的过程中,写操作会被阻塞;在写的过程中,读操作也会被阻塞

2.5.7 信号量

与JUC中信号量的共同之处:都可以对请求进行限流,均适用于需求量大资源有限的情景

JUC信号量

回顾JUC中的信号量:模拟6个线程争抢有限的3个资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到了停车位");
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
System.out.println(Thread.currentThread().getName() + "停了一会儿开走了");
semaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, i + "号车").start();
}

输出如下:

image-20221015233103573

但是JUC信号量只能对单机情况下进行限流,无法做到对分布式环境进行限流

分布式信号量
  1. 在StockController中添加如下代码

    1
    2
    3
    4
    5
    @GetMapping("test/semaphore")
    public String testSemaphore() {
    this.stockService.testSemaphore();
    return "hello test semaphore";
    }
  2. 在StockService中添加如下代码:通过redis记录争抢资源的顺序日志,更直观地证明分布式下的正确性

    如果要更改semaphore可允许线程数的值,记得将redis中旧的值删去(或者起一个新的名字)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public void testSemaphore() {
    RSemaphore semaphore = this.redissonClient.getSemaphore("semaphore");
    semaphore.trySetPermits(3); // 设置限流的线程数
    try {
    semaphore.acquire(); // 获取资源成功之后才可继续处理业务操作,否则阻塞
    this.redisTemplate.opsForList().rightPush("log", "10010获取资源,开始处理..." + Thread.currentThread().getName());
    TimeUnit.SECONDS.sleep(new Random().nextInt(10));
    this.redisTemplate.opsForList().rightPush("log", "10010处理完成,释放资源..." + Thread.currentThread().getName());
    semaphore.release(); // 手动释放资源
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    }

启动两个服务,redis中的输出日志如下:

image-20221016001007227

2.5.8 倒计数器(闭锁)

一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行

JUC倒计数器

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
try {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + ":开始执行...");
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + ":结束执行...");
countDownLatch.countDown();
}, i + "号线程").start();
}
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + ":其他线程均已结束,执行最后任务...");

输出如下:

image-20221016002330475

分布式倒计数器
  1. 在StockController中添加如下代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @GetMapping("test/await")
    public String testLAwait() {
    this.stockService.testAwait();
    return "wait for other threads...";
    }

    @GetMapping("test/countdown")
    public String testCountDown() {
    this.stockService.testCountDown();
    return "execute...";
    }
  2. 在StockService中添加如下代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public void testAwait() {
    RCountDownLatch cdl = this.redissonClient.getCountDownLatch("cdl");
    cdl.trySetCount(6);
    try {
    cdl.await(); // 阻塞,直到其余所有线程执行完毕
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    }

    public void testCountDown() {
    RCountDownLatch cdl = this.redissonClient.getCountDownLatch("cdl");
    cdl.countDown();
    }

3. Zookeeper分布式锁

3.1 Zookeeper基本指令

  1. 查看某个目录下的子节点

    1
    2
    ls /
    ls /node
  2. 创建节点 create命令

    在根目录创建 node 节点,其内容为 “abc”

    1
    create /node "abc"
  3. 更新节点内容 set命令

    1
    set /node "aaa"
  4. 获取节点数据 get命令

    1
    2
    get /node
    get -s /node 获取详细内容
  5. 删除节点 delete命令

    1
    delete /node

3.2 ZNode节点类型

永久节点:一旦节点被创建就一直存在,及时Zookeeper宕机也不会被删除,只能手动将其删除

1
create /node

临时节点:临时节点的生命周期与客户端会话相绑定,会话消失则节点也会随之消失。并且临时节点只能做叶子节点,不能创建子节点

1
create -e /node

永久序列化节点:具有永久节点的特性,而且在创建该类节点时,Zookeeper会在节点名称后加入顺序编号

1
create -s /node

临时序列化节点:具有临时节点的特性,而且在创建该类节点时,Zookeeper会在节点名称后加入顺序编号

1
create -e -s /node

3.3 事件监听

Watcher事件监听器:监听是一次性的,一个监听指令只能监听到一次变化

同时开启两个zk客户端,一个客户端用于监听,另一个客户端用于操作节点

  1. 节点创建:NodeCreated

    1
    stat -w /xx
  2. 节点删除:NodeDeleted

    1
    stat -w /xx
  3. 节点数据变化:NodeDataChanged

    1
    get -w /xx
  4. 子节点变化:NodeChildrenChanged

    1
    2
    # 监听xx下的子节点变化
    ls -w /xx

3.4 Java客户端

3.4.1 案例搭建

  1. 引入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.0</version>
    <exclusions>
    <exclusion>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
    </exclusions>
    </dependency>
  2. 创建ZkTest类,在main方法编写如下代码

    • Zookeeper构造方法中需要填的参数
      • ip地址与端口号port
      • 超时时间
      • Watcher事件监听器匿名类
    • 用CountDownLatch,使Zookeeper在获取到链接之后才进行之后的操作
    • Watcher事件监听中:监听获取链接与关闭链接两个事件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    ZooKeeper zooKeeper = null;
    CountDownLatch countDownLatch = new CountDownLatch(1);
    try {
    zooKeeper = new ZooKeeper("127.0.0.1:2181", 30000, new Watcher() {
    @Override
    public void process(WatchedEvent watchedEvent) {
    Event.KeeperState state = watchedEvent.getState();
    if (Event.KeeperState.SyncConnected.equals(state)) {
    System.out.println("获取链接:" + watchedEvent);
    countDownLatch.countDown();
    } else if (Event.KeeperState.Closed.equals(state)) {
    System.out.println("关闭链接:" + watchedEvent);
    }
    }
    });
    countDownLatch.await();
    System.out.println("执行操作...");
    } catch (IOException e) {
    throw new RuntimeException(e);
    } finally {
    if (zooKeeper != null) {
    zooKeeper.close();
    }
    }
    1
    2
    3
    获取链接:WatchedEvent state:SyncConnected type:None path:null
    执行操作...
    关闭链接:WatchedEvent state:Closed type:None path:null

3.4.2 操作节点

  1. 创建节点:第一个参数为节点名路径,第二参数为节点内容,第三个参数为节点权限,第四个参数为节点类型

    1
    2
    3
    4
    5
    6
    7
    8
    // 永久节点
    zooKeeper.create("ltyzzz/test1", "hello zookeeper".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    // 临时节点
    zooKeeper.create("ltyzzz/test2", "hello zookeeper".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    // 永久序列化节点
    zooKeeper.create("ltyzzz/test3", "hello zookeeper".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    // 临时序列化节点
    zooKeeper.create("ltyzzz/test3", "hello zookeeper".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  2. 查询节点

    1. 判断节点是否存在

      1
      2
      3
      4
      5
      6
      Stat exists = zooKeeper.exists("/ltyzzz", false);
      if (exists != null) {
      System.out.println("当前节点存在");
      } else {
      System.out.println("当前节点不存在");
      }
    2. 获取当前节点中的数据内容

      1
      2
      byte[] data = zooKeeper.getData("/ltyzzz", false, exists);
      System.out.println("当前节点内容:" + new String(data));
    3. 获取当前节点的子节点

      1
      2
      List<String> children = zooKeeper.getChildren("/ltyzzz", false);
      System.out.println("当前节点的子节点:" + children);
  3. 更新节点

    • 其中第三个参数为版本号。该版本号必须和当前节点版本号一致,否则更新失败(乐观锁)
    • 可以将其指定为-1,表示不关心版本号
    1
    2
    zooKeeper.setData("/ltyzzz", "zzz...".getBytes(), exists.getVersion());
    zooKeeper.setData("/ltyzzz", "zzz...".getBytes(), -1); // 不关心版本号
  4. 删除操作

    1
    zooKeeper.delete("/ltyzzz/test1", -1); // 不关心版本号

3.4.3 节点监听事件

每一个操作节点的方法中均有一个watch参数,为boolean类型。

watch为true时,代表启动节点监听事件

3.5 实现Zookeeper分布式锁

独占排他:ZNode节点不可重复

3.5.1 自旋锁版本

  1. 创建ZkClient,用于初始化时获取Zk链接与结束时释放Zk链接

    • 添加@Component注解,交给Spring容器管理,SpringBoot启动时会执行初始化操作
    • @PostConstruct:执行完ZkClient构造方法之后,获取Zookeeper链接
    • @PreDestroy:在ZkClient销毁之前,释放Zookeeper链接
    • getLock:用于创建锁对象
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    @Component
    public class ZkClient {

    ZooKeeper zooKeeper = null;

    @PostConstruct
    public void init() {
    // 获取Zk链接
    CountDownLatch countDownLatch = new CountDownLatch(1);
    try {
    zooKeeper = new ZooKeeper("127.0.0.1:2181", 30000, new Watcher() {
    @Override
    public void process(WatchedEvent watchedEvent) {
    Event.KeeperState state = watchedEvent.getState();
    if (Event.KeeperState.SyncConnected.equals(state)) {
    System.out.println("获取链接:" + watchedEvent);
    countDownLatch.countDown();
    } else if (Event.KeeperState.Closed.equals(state)) {
    System.out.println("关闭链接:" + watchedEvent);
    }
    }
    });
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    @PreDestroy
    public void destroy() {
    // 释放Zk链接
    if (zooKeeper != null) {
    try {
    zooKeeper.close();
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    }
    }

    public ZkDistributedLock getLock(String lockName) {
    return new ZkDistributedLock(zooKeeper, lockName);
    }
    }
  2. 创建ZkDistributedLock类,用于实现锁

    • 所有的锁都将创建在 /locks 目录下,所以在构造方法中,需要先判断有没有该目录,没有的话则需要提前创建
    • 注意:/locks 为永久节点,而创建出来的锁为临时节点,这是为了防止服务器宕机造成的死锁问题,而设置为临时节点的话,服务器宕机之后,临时节点也会随之被删除。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    public class ZkDistributedLock implements Lock {

    private String lockName;
    private ZooKeeper zooKeeper;

    private static final String ROOT_PATH = "/locks";

    public ZkDistributedLock(ZooKeeper zooKeeper, String lockName) {
    this.zooKeeper = zooKeeper;
    this.lockName = lockName;
    try {
    if (zooKeeper.exists(ROOT_PATH, false) == null) {
    zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    } catch (KeeperException e) {
    throw new RuntimeException(e);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    }

    @Override
    public void lock() {
    // 创建ZNode节点
    tryLock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
    try {
    zooKeeper.create(ROOT_PATH + "/" + lockName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    return true;
    } catch (Exception e) {
    e.printStackTrace();
    try {
    Thread.sleep(80);
    tryLock();
    } catch (InterruptedException ex) {
    ex.printStackTrace();
    }
    }
    return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    return false;
    }

    @Override
    public void unlock() {
    // 删除ZNode节点
    try {
    this.zooKeeper.delete(ROOT_PATH + "/" + lockName, -1);
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    } catch (KeeperException e) {
    throw new RuntimeException(e);
    }
    }

    @Override
    public Condition newCondition() {
    return null;
    }
    }
  3. 在StockService中注入ZkClient对象,并修改deduct方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Autowired
    private ZkClient zkClient;

    public void deduct() {
    ZkDistributedLock lock = zkClient.getLock("lock");
    lock.lock();
    try {
    String stock = redisTemplate.opsForValue().get("stock").toString();
    if (stock != null && stock.length() != 0) {
    Integer st = Integer.valueOf(stock);
    if (st > 0) {
    // 扣减库存
    redisTemplate.opsForValue().set("stock", String.valueOf(--st));
    }
    }
    } finally {
    lock.unlock();
    }
    }

3.5.2 阻塞公平锁版本

利用临时序列化节点与监听机制实现

  1. 临时序列化节点在创建节点时,节点的名称后会追加序列号。

    给每一个获取锁的请求都创建一个临时序列化节点,越先尝试获取锁的节点,其序列号越小,优先级越高,满足公平锁的定义

    1. 获取当前节点的前驱节点
      • 若前驱节点为空,则获取锁成功,否则监听前驱节点
    2. 获取锁成功后执行业务操作,最后释放当前节点的锁
  2. 后创建的节点监听它之前的一个节点,如果监听到它的前驱节点被删除,则相当于获取到锁;否则阻塞

    (类似于AQS中同步队列的定义)

改造之前的代码:更改tryLock与unlock代码

注意:

  1. Zookeeper调用create方法后,会返回其全路径,这里将其作为了成员变量,方便解锁时删除
  2. 路径中加上了“-”,方便之后获取每个节点的序列号
  3. 通过编写getPreNode方法,获取当前节点的前驱节点
  4. 通过CountDownLatch实现阻塞功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Override
public boolean tryLock() {
try {
currentNodePath = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取前驱节点:若前驱节点为空,则获取锁成功,否则监听该节点
String preNode = this.getPreNode();
if (preNode != null) {
// 利用闭锁实现阻塞
CountDownLatch countDownLatch = new CountDownLatch(1);
// 因为获取前驱节点操作不具备原子性,需要再次判断zk中的前驱节点是否存在
if (zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
countDownLatch.countDown();
}
}) == null) {
return true;
}
countDownLatch.await();
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}

private String getPreNode() {
try {
// 获取根节点下的所有节点
List<String> children = this.zooKeeper.getChildren(ROOT_PATH, false);
if (CollectionUtils.isEmpty(children)) {
throw new IllegalMonitorStateException("非法操作");
}
// 获取根节点中对应于当前锁的所有节点
List<String> nodes = children.stream().filter(node -> StringUtils.startsWith(node, lockName + "-")).collect(Collectors.toList());
if (CollectionUtils.isEmpty(nodes)) {
throw new IllegalMonitorStateException("非法操作");
}
// 按照序列号排序
Collections.sort(nodes);
String currentNode = StringUtils.substringAfterLast(currentNodePath, "/");
// 获取当前节点对应的下标
int index = Collections.binarySearch(nodes, currentNode);
if (index < 0) {
throw new IllegalMonitorStateException("非法操作");
}
if (index > 0) {
// 返回当前节点的前驱节点
return nodes.get(index - 1);
}
return null;
} catch (Exception e) {
e.printStackTrace();
throw new IllegalMonitorStateException("非法操作");
}
}

@Override
public void unlock() {
// 删除ZNode节点
try {
this.zooKeeper.delete(currentNodePath, -1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}

3.5.3 实现可重入功能

通过ThreadLocal这一线程局部变量,记录重入次数

改造代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Override
public boolean tryLock() {
try {
// 判断thread local中是否已经有锁,有锁则直接重入+1
Integer flag = THREAD_LOCAL.get();
if (flag != null && flag > 0) {
THREAD_LOCAL.set(flag + 1);
return true;
}
currentNodePath = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取前驱节点:若前驱节点为空,则获取锁成功,否则监听该节点
String preNode = this.getPreNode();
if (preNode != null) {
// 利用闭锁实现阻塞
CountDownLatch countDownLatch = new CountDownLatch(1);
// 因为获取前驱节点操作不具备原子性,需要再次判断zk中的前驱节点是否存在
if (zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
countDownLatch.countDown();
}
}) == null) {
THREAD_LOCAL.set(1);
return true;
}
countDownLatch.await();
}
THREAD_LOCAL.set(1);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}

@Override
public void unlock() {
// 删除ZNode节点
try {
THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);
// 减为0则删除
if (THREAD_LOCAL.get() == 0) {
this.zooKeeper.delete(currentNodePath, -1);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}

3.6 Curator

Curator由一系列的模块构成,对于一般开发者而言,常用的是curator-framework和curator-recipes:

  • curator-framework:提供了常见的zk相关的底层操作
  • curator-recipes:提供了一些zk的典型使用场景的参考
  1. 引入依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
    <exclusions>
    <exclusion>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    </exclusion>
    </exclusions>
    </dependency>
    <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
    <exclusions>
    <exclusion>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    </exclusion>
    </exclusions>
    </dependency>
    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
    </dependency>
  2. 配置Curator:新建CuratorConfig配置类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Configuration
    public class CuratorConfig {
    @Bean
    public CuratorFramework curatorFramework() {
    // 初始化重试策略,使用的指数补偿策略
    RetryPolicy retry = new ExponentialBackoffRetry(10000, 3); // 设置初始间隔时间与重试次数
    // 初始化curatork客户端
    CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retry);
    client.start(); // 手动启动,否则很多方法功能不能够工作
    return client;
    }
    }

3.6.1 InterProcessMutex

与ReentrantLock类似,拥有可重入的特性。

使用案例

注入CuratorFramework,修改StockService中的deduct方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Autowired
private CuratorFramework curatorFramework;

public void deduct() {
InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "/curator/locks");
try {
mutex.acquire();
String stock = redisTemplate.opsForValue().get("stock").toString();
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
// 扣减库存
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
this.testSub(mutex); // 测试可重入
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
mutex.release();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

public void testSub(InterProcessMutex mutex) {
try {
mutex.acquire();
System.out.println("测试可重入锁。。。。");
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
底层原理

InterProcessMutex初始化原理

  1. 首先调用如下构造方法

    1
    2
    3
    4
    5
    // 将传入的参数再传入到下一个构造方法中
    public InterProcessMutex(CuratorFramework client, String path)
    {
    this(client, path, new StandardLockInternalsDriver());
    }
  2. 进一步调用构造方法

    1
    2
    3
    4
    5
    6
    7
    8
    // this中的第一个参数为传进来的client,第二个参数为传进来的path
    // 第三个参数为常量LOCK_NAME:"lock-"
    // 第四个参数为驱动Driver:new StandardLockInternalsDriver()
    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
    {
    // 1为租约:maxLeases
    this(client, path, LOCK_NAME, 1, driver);
    }
  3. 再进一步调用构造方法

    1
    2
    3
    4
    5
    6
    7
    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
    {
    // 这是对传进来的path参数做校验
    basePath = PathUtils.validatePath(path);
    // LockInternals实例化很关键,之后的加锁与解锁方法都需要用到这个类
    internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }
  4. 创建LockInternals对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases)
    {
    this.driver = driver;
    this.lockName = lockName;
    this.maxLeases = maxLeases;

    this.client = client.newWatcherRemoveCuratorFramework();
    // 再次校验path参数
    this.basePath = PathUtils.validatePath(path);
    // 将路径与锁名称进行拼接
    this.path = ZKPaths.makePath(path, lockName);
    }

加锁原理

  1. 执行 mutex.acquire() 方法时,内部调用acquire方法

    1
    2
    3
    4
    5
    6
    7
    public void acquire() throws Exception
    {
    if ( !internalLock(-1, null) )
    {
    throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
    }
  2. 内部又调用了internalLock方法:直接看注释

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
    // 获取当前线程
    Thread currentThread = Thread.currentThread();
    // 通过当前线程获取localData对象
    LockData lockData = threadData.get(currentThread);
    // 如果lockData不为null,说明发生了重入
    if ( lockData != null )
    {
    // 将重入次数加1,然后直接返回即可
    lockData.lockCount.incrementAndGet();
    return true;
    }
    // lockData为null,说明是获取新的锁
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
    // 创建一个新的lockData,并将当前线程的lockData记录到哈希表中
    LockData newLockData = new LockData(currentThread, lockPath);
    threadData.put(currentThread, newLockData);
    return true;
    }

    return false;
    }

    其中threadData是一个ConcurrentMap,key为Thread,值为LockData。记录了重入信息。

    LockData是一个内部类

    threadData与LockData属性如下所示

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
    private static class LockData
    {
    final Thread owningThread; // 所属线程
    final String lockPath; // 锁路径
    final AtomicInteger lockCount = new AtomicInteger(1); // 重入次数

    private LockData(Thread owningThread, String lockPath)
    {
    this.owningThread = owningThread;
    this.lockPath = lockPath;
    }
    }
  3. 获取新的锁时,会调用 internals.attemptLock() 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
    final long startMillis = System.currentTimeMillis();
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;
    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    // 死循环重复尝试获取锁
    while ( !isDone )
    {
    isDone = true;
    try
    {
    // 创建一个当前锁的节点(临时序列化节点),并获取当前锁的全路径
    ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
    hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
    }
    catch ( KeeperException.NoNodeException e )
    {
    if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
    {
    isDone = false;
    }
    else
    {
    throw e;
    }
    }
    }
    if ( hasTheLock )
    {
    return ourPath;
    }
    return null;
    }

3.6.2 其它锁

  1. InterceProcessSemaphoreMutex:不可重入锁,其它与InterProcessMutex一致

  2. InterProcessReadWriteMutex:读写锁 -> 读读可并发,读写不可并发,写写不可并发

    有一个不同于Redisson分布式读写锁的特点:写锁在释放之前仍会阻塞请求线程,而读锁不会

    1. 在StockController中加入如下两个方法

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      @GetMapping("test/zk/read/lock")
      public String testZkReadLock() {
      this.stockService.testZkReadLock();
      return "hello test Zk read lock";
      }

      @GetMapping("test/zk/write/lock")
      public String testZkWriteLock() {
      this.stockService.testZkWriteLock();
      return "hello test Zk write lock";
      }
    2. 在StockService中加入如下两个方法

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      public void testZkReadLock() {
      try {
      InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curatorFramework, "/curator/rwLock");
      lock.readLock().acquire(10, TimeUnit.SECONDS);
      //lock.readLock().release();
      } catch (Exception e) {
      throw new RuntimeException(e);
      }
      }

      public void testZkWriteLock() {
      try {
      InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curatorFramework, "/curator/rwLock");
      lock.writeLock().acquire(10, TimeUnit.SECONDS);
      //lock.readLock().release();
      } catch (Exception e) {
      throw new RuntimeException(e);
      }
      }
  3. InterProcessMultiLock:联锁。所有的锁都加锁成功才算成功,否则加锁失败

  4. InterProcessSemaphoreV2:信号量。限流作用

    修改StockService中的testSemaphore方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public void testSemaphore() {
    InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(curatorFramework, "/curator/locks", 5);
    try {
    Lease lease = semaphore.acquire();
    this.redisTemplate.opsForList().rightPush("log", "10086获取资源,开始处理..." + Thread.currentThread().getName());
    TimeUnit.SECONDS.sleep(new Random().nextInt(10));
    this.redisTemplate.opsForList().rightPush("log", "10086处理完成,释放资源..." + Thread.currentThread().getName());
    semaphore.returnLease(lease);
    } catch (Exception e) {
    throw new RuntimeException(e);
    }
    }
  5. 共享计数器:类似于CountDownLatch

    Curator提供了两个实现类:SharedCount与DistributedAtomicNumber

    SharedCount实现如下:

    1. StockController中新增如下方法

      1
      2
      3
      4
      5
      @GetMapping("test/sharedCount")
      public String testSharedCount() {
      this.stockService.testSharedCount();
      return "hello test SharedCount";
      }
    2. StockService中新增如下方法

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      public void testSharedCount() {
      try {
      SharedCount sharedCount = new SharedCount(curatorFramework, "/curator/sharedcount", 100);
      sharedCount.start();
      int count = sharedCount.getCount();
      int random = new Random().nextInt(1000);
      sharedCount.setCount(random);
      System.out.println("共享计数器初始值为:" + count + ", 现在值为:" + random);
      } catch (Exception e) {
      throw new RuntimeException(e);
      }
      }

4. MySQL分布式锁

4.1 实现方案

数据库准备工作

  1. 新建一张表tb_lock,两个字段分别为:id、lock_name

    image-20221020232747128

  2. 为lock_name设置索引,索引类型为unique唯一键索引

    image-20221020232946531

思路为:

  1. 加锁时执行如下SQL语句,若执行成功则代表加锁成功

    1
    INSERT INTO tb_lock(lock_name) values ('lock')
  2. 释放锁时,通过delete删除对应的锁记录

代码实现:

  1. 在pojo包下新建Lock类

    1
    2
    3
    4
    5
    6
    7
    @Data
    @TableName("tb_lock")
    public class Lock {

    private Long id;
    private String lockName;
    }
  2. 在mapper包下创建LockMapper接口

    1
    2
    public interface LockMapper extends BaseMapper<Lock> {
    }
  3. 修改StockService中的deduct方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public void deduct() {
    try {
    Lock lock = new Lock();
    lock.setLockName("lock");
    this.lockMapper.insert(lock);
    String stock = redisTemplate.opsForValue().get("stock").toString();
    if (stock != null && stock.length() != 0) {
    Integer st = Integer.valueOf(stock);
    if (st > 0) {
    // 扣减库存
    redisTemplate.opsForValue().set("stock", String.valueOf(--st));
    }
    }
    this.lockMapper.deleteById(lock.getId());
    } catch (Exception e) {
    try {
    Thread.sleep(50);
    this.deduct();
    } catch (InterruptedException ex) {
    throw new RuntimeException(ex);
    }
    }
    }

经过JMeter测试之后发现,基于MySQL实现的分布式锁虽然实现了并发下将库存减为0的功能,但是性能极差。

4.2 小结

  1. 独占排他互斥使用:采用唯一键索引实现

  2. 防死锁:客户端程序获取到锁之后,客户端服务器宕机造成死锁。

    • 解决方案:为tb_lock表添加获取锁时间字段lock_time,根据系统当前时间进行判断是否超时。

      需要采用额外的定时器去检查获取锁的系统时间与当前系统时间的差值是否超过某一阈值

  3. 可重入:

    • 解决方案:为tb_lock表添加server_id字段,代表对应的服务器;添加thread_id字段,代表对应服务器的线程;添加count字段,代表重入次数。
  4. 防误删:借助于id的唯一性防误删

  5. 原子性:单个写操作,还可借助MySQL悲观锁

  6. 自动续期:使用服务内的定时器重置获取锁的系统时间

  7. 单机故障:搭建MySQL主备服务器

  8. 集群下锁机制失效:难解决

  9. 阻塞锁:难实现

5. 总结

  1. 简易程度:MySQL最简单,其次Redis,Zookeeper最难
  2. 性能:Redis > Zookeeper > MySQL
  3. 可靠性:Zookeeper > Redis = MySQL
  • 如果要求不高、只实现独占排他、不要求可靠性与性能,选择MySQL
  • 如果追求性能,选择Redis
  • 如果追求可靠性,选择Zookeeper

分布式锁学习笔记
https://ltyzzzxxx.github.io/2022/10/09/分布式锁学习笔记/
作者
ltyzzz
发布于
2022年10月9日
许可协议