深入Redis学习教程之缓存,事务和分布式锁

作者 : IT 大叔 本文共11025个字,预计阅读时间需要28分钟 发布时间: 2020-10-26

一、redis缓存的雪崩、穿透和击穿

以电商的缓存数据为例,目前电商首页以及热点数据都会去做缓存 ,一般缓存都是定时任务去刷新,或者是查不到之后去更新的,定时任务刷新就有一个问题。

1.缓存雪崩

举个简单的例子:如果所有首页的Key失效时间都是12小时,中午12点刷新的,我零点有个秒杀活动大量用户涌入,假设当时每秒 6000 个请求,本来缓存在可以扛住每秒 5000 个请求,但是缓存当时所有的Key都失效了。此时 1 秒 6000 个请求全部落数据库,数据库必然扛不住,它会报一下警,真实情况可能DBA都没反应过来就直接挂了。此时,如果没用什么特别的方案来处理这个故障,DBA 很着急,重启数据库,但是数据库立马又被新的流量给打死了。这就是我理解的缓存雪崩。

也就是说:缓存大面积失效,请求都到了数据库上,导致数据库承受不住压力挂掉,这就是缓存雪崩。

解决方式:
在批量往Redis存数据的时候,把每个Key的失效时间都加个随机值就好了,这样可以保证数据不会在同一时间大面积失效。
或者设置热点数据永远不过期,有更新操作就更新缓存就好了。

2.缓存穿透

缓存穿透是指缓存和数据库中都没有的数据。由于缓存中不存在这个数据,所以请求会落在数据库中,用户可以不断发起请求从而增加数据库的压力,消耗io资源。

缓存穿透的发生一般都是攻击者的行为。

例如我们数据库的 id 都是1开始自增上去的,如发起为id值为 -1 的数据或 id 为特别大不存在的数据。这时的用户很可能是攻击者,攻击会导致数据库压力过大,严重会击垮数据库。

简单地说就是绕开redis,直接打击数据库。

解决方式:
可以进行用户鉴权校验和对key进行简单的校验,例如判断一下id是否小于0或者大于最大的id
也可以用nginx的限流功能,限制单个ip的每秒访问次数。

或者如果这个key在数据库中也不存在的话,那么在缓存层中对这个key缓存为空值。这样攻击者用相同的id请求的时候就会打在缓存上。

还有一种方式就是使用redis布隆过滤器判断Key是否在数据库中存在,不存在直接return false就好了

3.缓存击穿
缓存击穿是指一个热点key被大量用户请求,所有的并发都集中在这个key上面,当这个key失效的瞬间(过了有效期),所有请求就会都打在了mysql上。

解决方式:
热点key设置永不过期
或者
使用互斥锁

下面使用python实现互斥锁解决缓存击穿的问题。

# coding=utf-8

import redis,pymysql,time

r = redis.Redis(host='127.0.0.1', port=6379)

def getValueByDb(id):
    pass    # 略

def getValueById(prefix, id):   # prefix是key前缀
    key = prefix + "_" + id
    value = r.get(key)

    # 缓存没有命中
    if not value:
        lock_name = "lock_" + key
        if not r.get(lock_name):    # 某用户第一次触发击穿缓存
            r.set(lock_name, 1)
    
            # 从mysql中读取数据
            value = getValueByDb()
            r.setex(key, 86400, value)
            r.delete(lock_name)        # 解锁
        else:   # 数据还在从mysql中查询中,阻塞住其他并发查询的用户
            time.sleep(0.05)    # 阻塞50毫秒再查
            value = getValueById(prefix, id)
            
    return value    
    

二、redis中的事务
redis的事务和mysql的事务有所不同。

multi    # 开启事务    
# 一系列操作
exec    # 执行操作
discard # 在multi后执行,可以撤销事务中的操作

对比一下mysql的事务,下面是MySQL的事务的命令

BEGIN:显式地开启一个事务;
COMMIT:提交事务,将对数据库进行的所有修改变成为永久性的;
ROLLBACK:结束用户的事务,并撤销正在进行的所有未提交的修改;

redis事务和mysql事务的区别:
redis事务的原理是在multi之后,所执行的命令都不会马上去执行(包括读命令也不会马上执行),而是将这些命令按顺序放入队列当中。exec是会将队列中所有的命令按顺序执行(原子性的执行),discard命令会将discard前的命令从队列中删去。

MySQL实现事务,是基于UNDO/REDO日志 。
UNDO日志 记录修改前 状态,ROLLBACK 基于UNDO日志实现;
REDO日志 记录修改后 的状态 ,COMMIT 基于REDO日志实现;
在MySQL中无论是否开启事务,SQL都会被立即执行并返回执行结果。只是事务开启后执行后的状态 只是记录在REDO日志 ,执行COMMIT 之后,数据才会被写入磁盘。

举个例子:
在redis客户端中:

例子1:

multi
set a 1
set b 2
aaa            # 报错导致队列中所有命令都discard
set c 3
exec

# 结果: a b c 都为null

例子2:

multi
set a 1
set b 2
discard        # 删掉队列中discard前的命令
set c 3
exec

# 结果: a b 为null,c为3

例子3:

multi
set a 1
set b name
incr b        # 会报错,redis会忽略这一条命令,不影响其他命令
set c 3
exec

# 结果: a 1, b name,c 3

例子4:

set num 10
multi
incr num
# 此时有人在另一个客户端中先执行了incr num
exec

# incr的结果是12

在mysql客户端中:
# num字段初始为10

例子1:

begin;
update transaction set num=num+1 where id=1;
select * from transaction;        # 结果为11,说明他是直接执行,而不像redis中放在队列等之后再执行

#此时再开另一个mysql客户端,查询num字段,发现结果为10,说明这个修改还没有真正写到磁盘中。
commit;        # 写入到磁盘中。

例子2:

begin;
update transaction set num=num+1 where id=1;
select * from transaction;        # 结果为11,说明他是直接执行,而不像redis中放在队列等之后再执行

#此时再开另一个mysql客户端,update num字段为num-1,发现光标在等待,命令的执行被阻塞住了
commit;        # 写入到磁盘中。此时另一个客户端的命令也执行了。

#最终的结果为num = 10

对比redis的例子4和mysql的例子2,在redis中,当一个客户端开启事务对num进行操作的时候,另一个客户端依旧可以对num进行修改,因为redis的事务并没有上锁。而在mysql的事务中,系统会对事务中操作的行上锁,所以当mysql其他线程对相同的行进行修改的时候就会被阻塞住,直到其他线程的事务执行完为止。

Redis 提供的不是严格的事务,Redis 只保证串行执行命令,并且能保证全部执行,但是执行命令失败时并不会回滚,而是会继续执行下去。而mysql会在命令执行发生错误时进行回滚。

redis事务没有用到锁,是否意味着redis的事务没有保护数据一致的能力呢?不是的,redis事务还可以配合watch命令使用,形成乐观锁。
乐观锁也允许客户端1在事务中修改某个(某些)key时让客户端2修改相同的key,但是当客户端2修改成功时,会打断客户端的事务,阻止事务中所有命令的执行(事务中所有命令都不会执行)。

像redis watch这样允许客户端1在修改某个数据时也让其他客户端对这个数据进行修改的情况就是乐观锁
像mysql这样在某个线程中对某条记录进行修改时,阻塞其他线程对相同的记录进行修改(不许其他线程对相同的记录进行修改)的情况就是悲观锁。

Redis Watch

Redis Watch 命令用于监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断(事务中的命令从队列中全部删除,执行exec无效)

例如在客户端1:

# money和goods的初始值为1000 和 0

watch money goods
multi
set money 500        # 此时不会马上执行,而是先将这个命令放入队列中
set goods 5            # 此时不会马上执行,而是先将这个命令放入队列中
(在此时客户端2执行set goods 10)
exec

# 此时money还是1000,goods为10,事务没有生效
# 当事务结束的时候,watch的监控就会失效,下一次再执行multi的时候,其他客户端修改事务中的key就不会打断事务,所以下一次开启事务之前还要再watch一次才行。

再举一个例子:

# money和goods的初始值为1000 和 0

# 客户端1                      | # 客户端2
watch money goods    # 1      |  watch money goods  # 2
multi                 # 3     |  multi               # 4
set money 100         # 5     |  set money 1000       # 6
set goods 10         # 7      |  set goods 100    # 8
exec                 # 10     |  exec            # 9    

结果客户端2执行事务成功,客户端1回滚。因为客户端2先执行exec,先执行了对money和goods的修改,所以把客户端1的事务打断。

查询操作不会影响事务的执行(无论是在MySQL还是在redis中)。

用乐观锁可以解决高并发秒杀中的超卖问题(超卖问题就是商品库存出现负数的问题)。

# coding=utf-8

from threading import Thread,Semaphore
import redis,time,random

# 连接redis
r = redis.Redis(host='127.0.0.1', port=6379)

# 定义线程类
class MiaoShaThread(Thread):
    def __init__(self, redis, sem, thread_no):
        super(MiaoShaThread, self).__init__()
        self.r = redis
        self.sem = sem
        self.thread_no = thread_no      # 记录线程的序号

    def run(self):
        with self.r.pipeline() as pipe:     # 在python中watch必须配合pipeline使用
            try:
                # 监视商品库存 inventory
                pipe.watch('inventory')

                # 获取库存必须放在multi之前,否则取到的是一个pipeline对象而不是value值
                # 获取库存必须放在watch之后,否则可能出现一获取到库存之后,库存马上被别的线程改为0,然后本线程才执行watch的情况
                inventory = int(pipe.get('inventory'))

                # 判断商品库存 inventory 是否大于0
                if inventory > 0:
                    pipe.multi()  # 开启事务,开启事务不能放在if之外
                    pipe.decr('inventory')      # 商品数-1
                    pipe.execute()              # 执行事务
                    print("第%s号用户抢购成功" % str(self.thread_no))
                    # time.sleep(random.uniform(0,1))
                else:
                    print("第%s号用户抢购失败" % str(self.thread_no))
            except:     # 如果出现异常说明在执行事务的过程中,其他线程先对库存进行了修改导致本线程执行事务失败
                print("第%s号用户抢购失败,您的商品被别人抢走了" % str(self.thread_no))

        self.sem.release()

# 用15个线程模拟50000个人抢购商品,一直维持着有15个线程在工作
thread_list = []
thread_num = 15
sem = Semaphore(thread_num)
for i in range(5000):
    sem.acquire()
    thread = MiaoShaThread(r, sem, i+1)
    thread_list.append(thread)
    thread.start()

for thread in thread_list:
    thread.join()

print(r.get('inventory'))

这个乐观锁实现高并发秒杀是在简书的一个作者的博客上看到的,不过作者使用的是java实现,我这里看了作者的代码后重新用python实现了一下

运行结果如下:
第4号用户抢购失败,您的商品被别人抢走了
第6号用户抢购成功
第1号用户抢购失败,您的商品被别人抢走了
第15号用户抢购成功
第9号用户抢购失败,您的商品被别人抢走了
第12号用户抢购成功
第11号用户抢购失败,您的商品被别人抢走了
第10号用户抢购成功
第8号用户抢购成功
.....
第1435号用户抢购成功
第1434号用户抢购失败,您的商品被别人抢走了
第1436号用户抢购成功
第1437号用户抢购失败,您的商品被别人抢走了
第1438号用户抢购失败
第1439号用户抢购失败
第1440号用户抢购失败
第1441号用户抢购失败
第1442号用户抢购失败
第1443号用户抢购失败
第1444号用户抢购失败
第1445号用户抢购失败
.....

在这个作者的博客中,还有其他的解决高并发秒杀的方式
1、虽然能用数据库的锁避免,超过限量的问题。但是在大并发的情况下,大大影响数据库性能
2、为了避免并发操作数据库,我们可以使用队列来限制,但是并发量会让队列内存瞬间升高
3、我们又可以用悲观锁来实现,但是这样会造成用户等待,响应慢体验不好

乐观锁参考博客链接:https://www.jianshu.com/p/06f1bce98451

三、redis分布式锁

redis除了乐观锁,还可以实现分布式锁,它一样可以解决高并发下数据不一致的问题。

使用场景:
在多服务修改redis数据的情况下,需要对存在并发竞争的数据进行加锁。
这里大家可能会疑惑,redis不是单线程吗,为什么还需要加锁?

redis是单线程的没错,但是你的服务器可不只一个,可能有多台nginx服务器进行负载均衡哦,如果多台服务器同时向redis服务器发起请求还是可能导致数据不一致的问题。

例如:
有3个服务器A,B,C同时请求抢购某一个商品。redis中的库存数为1。
如果不使用乐观锁的话,A,B,C都会读取到库存数为1,都会去执行incr,导致redis库存为-2

下面就是用redis分布式锁来解决这个问题。

redis分布式锁是怎么解决这个问题的呢?
在任意一个时刻,只让一个客户端持有锁,只让一个客户端修改数据,其他并发请求的客户端等待锁释放。是不是很像互斥锁,不过之前说的互斥锁只对多线程的数据安全有效,这里的锁是redis生成的,所以对多个服务多个进程也有效。

除了上面的互斥性,还要注意:
1. 锁超时。一个客户端持有锁的期间崩溃而没有主动释放锁,也需要保证后续其他客户端能够加锁成功(否则会造成死锁,其他客户端都会处于一直阻塞的状态),这个可以通过给锁设置超时时间做到。
2.加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给释放了。

redis分布式锁本质上就是一个有过期时间的String类型的key而已。

实现版本1

# coding=utf-8

from threading import Thread, Semaphore
import redis,uuid
from time import sleep

class DistributedLock:
    def __init__(self, redis, name, expire=2, timeout=3):
        self.r = redis
        self.expire = expire      # 锁的过期时间
        self.timeout = timeout      # 等待获取锁的时间    
        self.name = "lock_" + name          # 锁的key名
        self.uuid = str(uuid.uuid4())    # 锁的唯一标识,不同客户端生成的锁的唯一标识不同

    # 上锁
    def lock(self):
        while True:
            if not self.r.get(self.name):   # 如果其他线程或进程没有拿到锁,则本线程就先持有锁
                self.r.set(self.name, self.uuid)
                return True
            else:   # 如果没有拿到锁,则等待其他线程释放锁
                print("锁被占用")
                sleep(0.01)

    # 解锁
    def unlock(self):
        self.r.delete(self.name)

class BuyGoods(Thread):
    def __init__(self, redis, sem, thread_no):
        super(BuyGoods, self).__init__()
        self.r = redis
        self.thread_no = thread_no
        self.sem = sem
        self.lock = DistributedLock(self.r, "goods_inventory")

    def run(self):
        self.lock.lock()
        if int(self.r.get("inventory")) > 0:
            self.r.decr("inventory")
            print("抢购成功:" + str(self.thread_no + 1))
        else:
            print("库存不足")
        self.lock.unlock()

        self.sem.release()


# 连接redis
r = redis.Redis(host='127.0.0.1', port=6379)

# 用15个线程模拟50000个人抢购商品,一直维持着有15个线程在工作
thread_list = []
thread_num = 15
sem = Semaphore(thread_num)
for i in range(5000):
    sem.acquire()
    thread = BuyGoods(r, sem, i)
    thread_list.append(thread)
    thread.start()

for thread in thread_list:
    thread.join()

print(r.get('inventory'))

改善点如下:
1.上锁的时候分成了两句:
self.r.get(self.name)  判断其他线程是否持有锁
self.r.set(self.name, self.uuid)    上锁

这里可以合并为1句,变成一个原子操作,redis提供了这样的接口:self.r.setnx()
该方法作用是:如果有某个key则不作为,返回0;如果不存在某个key则添加这个key,返回1

2.我这里使用一个线程模拟一个用户,如果是在真实情况下,某个服务器在BuyGoods的执行self.lock.lock()之后挂掉了,没能执行的了self.lock.unlock解锁,那么其他服务器就会一直sleep,发生死锁。
为了解决这个问题,需要在上锁的时候给锁加一个过期时间。

if self.r.setnx(self.name, self.uuid):
    self.r.expire(self.name, self.expire)
else:
    sleep(0.01)

但是这样还是有一点不合理,如果服务器在self.r.setnx()执行完之后挂掉了,锁就还是没有过期时间。

此时可以使用python提供的 self.r.set(self.name, self.uuid, ex=self.expire, nx=True) ,他将setex,setnx方法结合了起来,保证了他们是一个原子操作。

3.解锁的时候不能直接delete删除锁,要考虑一种情况:线程1在执行减库存时由于特殊原因导致这个时间特别的长,长到锁的过期时间已经过去了,锁已经自动解开了,这个时候其他的线程就拿到了锁。线程1又进行了delete操作,就会把其他线程持有的锁给解开了。没有了锁的保护,就会导致数据不一致。

为了解决这个问题,我们可以设置两层保护机制:
一个是检验锁的value是否为本线程的uuid,是就说明这是本线程自己加的锁,可以删除
一个是使用watch和事务,监控锁这个key,如果这个key被其他线程修改时事务操作就会抛出WatchError异常而终止。

实现版本2

# coding=utf-8

from threading import Thread, Semaphore
import redis,uuid
import time

class DistributedLock:
    def __init__(self, redis, name, expire=1, timeout=2):
        self.r = redis
        self.expire = expire      # 锁的过期时间
        self.timeout = timeout    # 获取锁的超时时间
        self.name = "lock_" + name          # 锁的key名
        self.uuid = str(uuid.uuid4())    # 锁的唯一标识,不同客户端生成的锁的唯一标识不同

    # 上锁
    def lock(self):
        end_time = time.time() + self.timeout
        while time.time() < end_time:
        # while True:
            if self.r.set(self.name, self.uuid, ex=self.expire, nx=True):   # 如果其他线程或进程没有拿到锁,则本线程就先持有锁
                return True
            else:   # 如果没有拿到锁,则等待其他线程释放锁
                print("锁被占用,请等待一下")
                time.sleep(0.01)

        return False    # 过了获取锁的超时时间

    # 解锁
    def unlock(self):
        with self.r.pipeline() as pipe:
            try:
                pipe.watch(self.name)
                now_uuid = pipe.get(self.name).decode('utf-8')      # 从redis取出来的字符串是编码类型,要解码一下才行
                # print(now_uuid + "|" + self.uuid)
                if now_uuid and self.uuid == now_uuid:  # 如果当前redis分布式锁的key值还是本线程的uuid,说明这个锁是本线程自己生成的,可以删除
                    pipe.multi()    # 使用事务删除锁,如果其他线程获取到了锁(即修改了self.name这个key)就可以破坏这个事务
                    pipe.delete(self.name)
                    pipe.execute()
            except: # 说明事务被破坏
                print("事务被破坏")
                pass

class BuyGoods(Thread):
    def __init__(self, redis, sem, thread_no):
        super(BuyGoods, self).__init__()
        self.r = redis
        self.thread_no = thread_no
        self.sem = sem
        self.lock = DistributedLock(self.r, "goods_inventory")

    def run(self):
        lock_result = self.lock.lock()
        if lock_result:
            if int(self.r.get("inventory")) > 0:
                self.r.decr("inventory")
                print("抢购成功:" + str(self.thread_no + 1))
            else:
                print("库存不足")
            self.lock.unlock()
        else:   # 走到这里说明在整个锁的过期时间内这个线程都抢不到锁,但这个几率非常非常的小
            print("系统正忙请重试")    # 没有抢到锁自然也不用解锁啦

        self.sem.release()


# 连接redis
r = redis.Redis(host='127.0.0.1', port=6379)

# 用15个线程模拟50000个人抢购商品,一直维持着有15个线程在工作
thread_list = []
thread_num = 15
sem = Semaphore(thread_num)
for i in range(2000):
    sem.acquire()
    thread = BuyGoods(r, sem, i)
    thread_list.append(thread)
    thread.start()

for thread in thread_list:
    thread.join()

print(r.get('inventory'))

其实分布式锁和之前学的互斥锁,条件变量的本质没有什么不同,都是为了保护数据一致性。只不过之前的互斥锁,条件变量都只能用于多线程编程,而如果到了服务器集群这种场景下就显得无用武之地了,此时就需要靠分布式锁来保护数据一致性。

其他的分布式锁的实现:
1.用mysql的事务也可以做到相同的效果,不过mysql的分布式锁的使用情景是:库存是存在mysql中,而请求也是直接请求到mysql中进行修改库存。但是请求打到mysql上性能会明显降低,毕竟磁盘读写再快也快不过内存。
2.使用Zookeeper,不过这个具体还不了解。

免责声明:
1. 本站资源转自互联网,源码资源分享仅供交流学习,下载后切勿用于商业用途,否则开发者追究责任与本站无关!
2. 本站使用「署名 4.0 国际」创作协议,可自由转载、引用,但需署名原版权作者且注明文章出处
3. 未登录无法下载,登录使用金币下载所有资源。
IT小站 » 深入Redis学习教程之缓存,事务和分布式锁

常见问题FAQ

没有金币/金币不足 怎么办?
本站已开通每日签到送金币,每日签到赠送五枚金币,金币可累积。
所有资源普通会员都能下载吗?
本站所有资源普通会员都可以下载,需要消耗金币下载的白金会员资源,通过每日签到,即可获取免费金币,金币可累积使用。

发表评论