Spring中实现单点Redis分布式锁

发布于 2020-04-09  327 次阅读


首先,先引入三种锁的概念,分别是线程锁,进程锁,分布式锁

线程锁

这个应该是最熟悉的了,java中Synchronized、ReentrantLock,AQS和Lock的那些都是,保证被锁的方法或代码块在多线程下只有一个线程在访问,当然还分资源独占和资源共享的,像Semaphore、CountdownLatch、CyclicBarrier这种资源共享的一次可以分配多个资源。

进程锁

同一个操作系统内,控制多个进程访问共享资源,因为不同进程是无法干扰对方对资源的访问,所以可以用操作系统的信号量控制。

分布式锁

就是在上面两个的基础上再进行延伸,不同系统,不同进程,不同线程之间的锁,需要通过第三方存储介质来存储锁的信息,常用的Redis,Zookeeper,etcd,MySQL,这几种方式各有优缺,分布式锁是不可能像线程锁那么高效高可用,不那么容易出问题的,所以取决于应用场景,选择适合自己的。

其实他们就是锁的作用范围不同,可以看到 分布式锁>进程锁>线程锁

分布式锁要满足的条件

  • 互斥性,任意时刻只有一个客户端可以拿到锁
  • 避免死锁,防止一个客户端拿到锁后,长时间不释放,比如客户端崩溃,持锁期间其他代码执行出问题,需要被动的释放,在Redis里使用expire设置过期时间,可以很方便的避免这点
  • 一致性,加锁客户端必须和解锁客户端一致,不同客户端不能相互干扰,所以每个客户端申请锁时都要用一个唯一标示的ID作为value,释放锁时对比value相同才允许释放
  • 高可用、容错性等等,这篇文章只是单点Redis的,慢慢来。。。。。。
redis分布式锁简易示意图

Redis分布式锁核心指令

SET key value [EX seconds] [PX milliseconds] [NX|XX]

从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改:

  • EX seconds : 将键的过期时间设置为 seconds 秒。 执行 SET key value EX seconds 的效果等同于执行 SETEX key seconds value 。
  • PX milliseconds : 将键的过期时间设置为 milliseconds 毫秒。 执行 SET key value PX milliseconds 的效果等同于执行 PSETEX key milliseconds value 。
  • NX : 只在键不存在时, 才对键进行设置操作。 执行 SET key value NX 的效果等同于执行 SETNX key value 。
  • XX : 只在键已经存在时, 才对键进行设置操作。

因为 SET 命令可以通过参数来实现 SETNX 、 SETEX 以及 PSETEX 命令的效果, 所以 Redis 将来的版本可能会移除并废弃 SETNX 、 SETEX 和 PSETEX 这三个命令。

代码示例

127.0.0.1:6379> set testKey 123456 EX 120 NX     //设置key为testKey的值为123456,过期时间120秒,附加NX参数
 OK
127.0.0.1:6379> set testKey 123456 EX 120 NX    //这一步返回了nil,因为附加了NX参数,这个Key已存在,所以设置失败
(nil)

看到这,应该就大概知道了如何利用Redis实现锁,如果set nx成功,说明申请到锁,如果set nx失败,说明key已经存在,被其他进程/线程占用。

如果释放锁,直接删除该key就行,但是为了保证不同线程所删除的是它自己当时申请的锁,防止误删除,需要对比这个value,所以每个线程申请锁时,需要设置一个唯一标识的ID为value,而删除key时,需要对比这个value是否一致,一致才删除,避免特殊情况误删除。

如果正常来想,在程序中首先get key,比较下value,相同则删除,但是这是两步操作:

  • get key 并比较value
  • 删除key

那么就会出现原子性问题,如果刚get key后且比较value相等,这时候正好其他线程又获取到了锁,那么你删除的就是其他线程的锁,所以需要利用lua脚本,将这两个操作合并为一段代码,在redis中用eval执行lua脚本是具有原子性的,lua脚本如下

if redis.call('get', KEYS[1]) == ARGV[1] 
    then return redis.call('del', KEYS[1])
else return 0 end

如果在redis控制台中执行,那么如下

eval "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" 1 testKey 123456

evel语句的格式: eval script numkeys key [key...] arg [arg...],

其实就是:eval "lua脚本" key数量 key arg arg arg,那个numkeys代表后面的那些N个参数,前几个是key,剩下的则都做为arg

RedisLock实现类

在spring中,直接通过redisTemplate执行redis命令即可,但是要注意redis的序列化器设定的是什么,必须如下。

我刚开始用的还是fastjson作为value的序列化器,转json嘛,无论什么会带上双引号的,用redisTemplate执行lua脚本时,就是不成功,比较value总是不相等,哪怕用string.gsub替换掉双引号再比较也不行,但是手动执行eval命令又可以,所以只能分开,这个redis锁单独用默认序列化器的。

 @Bean("redisStringTemplate")
 public RedisTemplate<String, Serializable> redisStringTemplate(LettuceConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Serializable> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        template.setEnableTransactionSupport(true);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new JdkSerializationRedisSerializer()); //重点 用默认的就行
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new JdkSerializationRedisSerializer());
        template.setDefaultSerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        return template;
 }
/**
 * @author : LoneKing
 */
@Slf4j
@Component
public class RedisLockUtil implements Lock {
    /**
     * 解锁成功标志
     */
    private static final String RELEASE_SUCCESS = "1";
    private static final String REDIS_LOCK_KEY = "REDIS_LOCK_KEY";
    private final RedisTemplate redisTemplate;
    private ThreadLocal<String> uuidMap = new ThreadLocal<>();

    public RedisLockUtil(RedisTemplate redisStringTemplate) {
        this.redisTemplate = redisStringTemplate;
    }
    /**
     * 如果键不存在则新增,存在则不改变已经有的值
     *
     * @param key
     * @param value
     * @param expireTime 过期时间
     * @param unit       时间单位
     * @return
     */
    public boolean setIfAbsent(String key, String value, long expireTime, TimeUnit unit) {
        Boolean result = redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, unit);
        return result != null && result;
    }
    @Override
    public void lock() {
        if (tryLock()) {
            return;
        }
        try {
            //稍微等待一下再继续尝试获取
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //递归获取锁
        lock();
    }
    @Override
    public void lockInterruptibly() throws InterruptedException { }

    @Override
    public boolean tryLock() {
        //每个客户端申请锁前获取唯一ID
        String uuid = UUID.randomUUID().toString();
        //保存到ThreadLocal中
        uuidMap.set(uuid);
        return setIfAbsent(REDIS_LOCK_KEY, uuid, 2000, TimeUnit.MILLISECONDS);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        //每个客户端申请锁前获取唯一ID
        String uuid = UUID.randomUUID().toString();
        //保存到ThreadLocal中
        uuidMap.set(uuid);
        return setIfAbsent(REDIS_LOCK_KEY, uuid, time, unit);
    }

    @Override
    public void unlock() {
        boolean isUnlock = false;
        try {
            //获取锁对应的value值,如果相等则删除锁(解锁),lua脚本是原子性的
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return " +
                    "redis.call('del', KEYS[1]) else return 0 end";
            DefaultRedisScript redisScript = new DefaultRedisScript<>(script, Long.class);
            //当前客户端线程申请锁时用的uuid
            String currentUUID = uuidMap.get();
            Object result = redisTemplate.execute(redisScript,
                    Collections.singletonList(REDIS_LOCK_KEY), currentUUID);
            if (result != null && RELEASE_SUCCESS.equals(result.toString())) {
                isUnlock = true;
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("redis解锁异常:" + e.getMessage());
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

使用方法

因为实现自Lock接口,所以和其他锁使用方法是一样的,测试用例如下

@RunWith(SpringRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class TestLock {
    @Resource(name = "redisLockUtil")
    private Lock lock;

    public TestLock() {
    }

    public static int count = 0;

    @Test
    public void testLock() {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                200, 200, 3L,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(300),
                new ThreadPoolExecutor.CallerRunsPolicy());
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 200; i++) {
            poolExecutor.execute(new Runnable() {
                @Override
                public void run() { 
                    //加锁
                    lock.lock();
                    count++;
                    System.out.println(count);
                    //释放锁
                    lock.unlock();

                }
            });
        }
        poolExecutor.shutdown();
        while (!poolExecutor.isTerminated()) {
        }
        System.out.println("执行结束:" + count);
        System.out.println("耗时:" + (System.currentTimeMillis() - startTime) / 1000);
    }
}