Post

分布式锁笔记

分布式锁笔记

何为分布式锁?

当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。

用一个状态值表示锁,对锁的占用和释放通过状态值来标识。

分布式锁的条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 具有容错性。只要大部分的 Redis 节点正常运行,客户端就可以加锁和解锁。
  4. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

分布式锁的实现:

分布式锁的实现由很多种,文件锁、数据库、Redis等等,比较多;分布式锁常见的多种实现方式:

  1. 数据库悲观锁;
  2. 数据库乐观锁;
  3. 基于Redis的分布式锁;
  4. 基于ZooKeeper的分布式锁。

在实践中,还是Redis做分布式锁性能会高一些

数据库悲观锁

所谓悲观锁,悲观锁是对数据被的修改持悲观态度(认为数据在被修改的时候一定会存在并发问题),因此在整个数据处理过程中将数据锁定。

悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在应用层中实现了加锁机制,也无法保证外部系统不会修改数据)。

数据库的行锁、表锁、排他锁等都是悲观锁,这里以行锁为例进行介绍。

以我们常用的MySQL为例,我们通过使用 select…for update 语句, 执行该语句后,会在表上加持行锁,一直到事务提交,解除行锁。

使用场景举例:

在秒杀案例中,生成订单和扣减库存的操作,可以通过商品记录的行锁,进行保护。们通过使用select…for update语句,在查询商品表库存时将该条记录加锁,待下单减库存完成后,再释放锁。

示例的SQL如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 0.开始事务

begin;

# 1.查询出商品信息

select stockCount from seckill_good where id=1 for update;

# 2.根据商品信息生成订单

insert into seckill_order (id,good_id) values (null,1);

# 3.修改商品stockCount减一

update seckill_good set stockCount=stockCount-1 where id=1;

# 4.提交事务

commit;

以上,在对id = 1的记录修改前,先通过for update的方式进行加锁,然后再进行修改。这就是比较典型的悲观锁策略。

如果以上修改库存的代码发生并发,同一时间只有一个线程可以开启事务并获得id=1的锁,其它的事务必须等本次事务提交之后才能执行。这样我们可以保证当前的数据不会被其它事务修改。

我们使用select_for_update,另外一定要写在事务中注意:要使用悲观锁,我们必须关闭MySQL数据库中自动提交的属性,命令set autocommit=0;即可关闭,因为MySQL默认使用autocommit模式,也就是说,当你执行一个更新操作后,MySQL会立刻将结果进行提交。

悲观锁的实现,往往依靠数据库提供的锁机制。在数据库中,悲观锁的流程如下:

  1. 在对记录进行修改前,先尝试为该记录加上排他锁(exclusive locking)。
  2. 如果加锁失败,说明该记录正在被修改,那么当前查询可能要等待或者抛出异常。具体响应方式由开发者根据实际需要决定。
  3. 如果成功加锁,那么就可以对记录做修改,事务完成后就会解锁了。
  4. 其间如果有其他事务对该记录做加锁的操作,都要等待当前事务解锁或直接抛出异常。

数据库乐观锁

使用乐观锁就不需要借助数据库的锁机制了。

乐观锁的概念中其实已经阐述了他的具体实现细节:主要就是两个步骤:冲突检测和数据更新。

其实现方式有一种比较典型的就是Compare and Swap(CAS)技术。

CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。

CAS的实现中,在表中增加一个version字段,操作前先查询version信息,在数据提交时检查version字段是否被修改,如果没有被修改则进行提交,否则认为是过期数据。

比如前面的扣减库存问题,通过乐观锁可以实现如下:

1
2
3
4
5
6
# 1.查询出商品信息
select stockCount, version from seckill_good where id=1;
# 2.根据商品信息生成订单
insert into seckill_order (id,good_id) values (null,1);
# 3.修改商品库存
update seckill_good set stockCount=stockCount-1, version = version+1 where id=1, version=version;

以上,我们在更新之前,先查询一下库存表中当前版本(version),然后在做update的时候,以version 作为一个修改条件。

当我们提交更新的时候,判断数据库表对应记录的当前version与第一次取出来的version进行比对,如果数据库表当前version与第一次取出来的version相等,则予以更新,否则认为是过期数据。

CAS 乐观锁有两个问题:

(1) CAS 存在一个比较重要的问题,即ABA问题. 解决的办法是version字段顺序递增。

(2) 乐观锁的方式,在高并发时,只有一个线程能执行成功,会造成大量的失败,这给用户的体验显然是很不好的。

【资料】使用MySQL实现分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Resource
private JdbcTemplate jdbcTemplate;

public boolean lock(String lockName) {
    try {
        String sql = String.format("update optimistic_lock set lock_status=1, expire_at = NOW() + INTERVAL 1 MINUTE where lock_name ='%s' and lock_status = 0 ;", lockName);
        return jdbcTemplate.update(sql) == 1;
    } catch (Exception e) {
        return false;
    }
}

public void unLock(String lockName) {
    String sql = String.format("update optimistic_lock set lock_status=0 ,expire_at=now() where lock_name='%s' ;", lockName);
    jdbcTemplate.update(sql);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
mysql> desc sms_task_cron;
+-------------+------------------+------+-----+---------+----------------+
| Field       | Type             | Null | Key | Default | Extra          |
+-------------+------------------+------+-----+---------+----------------+
| id          | bigint unsigned  | NO   | PRI | NULL    | auto_increment |
| phone       | varchar(50)      | NO   |     | NULL    |                |
| task_name   | varchar(255)     | NO   |     | NULL    |                |
| content     | varchar(255)     | NO   |     | NULL    |                |
| status      | tinyint          | NO   |     | 1       |                |
| update_time | int unsigned     | NO   |     | NULL    |                |
| create_time | int unsigned     | NO   |     | NULL    |                |
| is_deleted  | tinyint unsigned | NO   |     | 0       |                |
+-------------+------------------+------+-----+---------+----------------+
8 rows in set (0.00 sec)

# ----------------
ALTER TABLE sms_task_cron
ADD COLUMN expire_at timestamp DEFAULT CURRENT_TIMESTAMP;

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Resource
private OptimisticLock optimisticLock;

@Test
void testOptimisticLock() {
    String lockName = "Hanson";
    IntStream.range(1, 10).parallel().forEach(x -> {
        try {
            if (optimisticLock.lock(lockName)) {
                log.info("get lock success");
            } else {
                log.warn("get lock error");
            }
        } finally {
            optimisticLock.unLock(lockName);
        }
    });
}

参考文章

  1. 【MySQL】优雅的使用MySQL实现分布式锁 https://blog.csdn.net/weixin_45683778/article/details/144564485

  2. MySql实现分布式锁的示例代码 https://www.freexyz.net/MySQL/2954.html

This post is licensed under CC BY 4.0 by the author.