首先,先引入三种锁的概念,分别是线程锁,进程锁,分布式锁
线程锁
这个应该是最熟悉的了,java中Synchronized、ReentrantLock,AQS和Lock的那些都是,保证被锁的方法或代码块在多线程下只有一个线程在访问,当然还分资源独占和资源共享的,像Semaphore、CountdownLatch、CyclicBarrier这种资源共享的一次可以分配多个资源。
进程锁
同一个操作系统内,控制多个进程访问共享资源,因为不同进程是无法干扰对方对资源的访问,所以可以用操作系统的信号量控制。
分布式锁
就是在上面两个的基础上再进行延伸,不同系统,不同进程,不同线程之间的锁,需要通过第三方存储介质来存储锁的信息,常用的Redis,Zookeeper,etcd,MySQL,这几种方式各有优缺,分布式锁是不可能像线程锁那么高效高可用,不那么容易出问题的,所以取决于应用场景,选择适合自己的。
其实他们就是锁的作用范围不同,可以看到 分布式锁>进程锁>线程锁
分布式锁要满足的条件
- 互斥性,任意时刻只有一个客户端可以拿到锁
- 避免死锁,防止一个客户端拿到锁后,长时间不释放,比如客户端崩溃,持锁期间其他代码执行出问题,需要被动的释放,在Redis里使用expire设置过期时间,可以很方便的避免这点
- 一致性,加锁客户端必须和解锁客户端一致,不同客户端不能相互干扰,所以每个客户端申请锁时都要用一个唯一标示的ID作为value,释放锁时对比value相同才允许释放
- 高可用、容错性等等,这篇文章只是单点Redis的,慢慢来。。。。。。
Redis分布式锁核心指令
SET key value [EX seconds] [PX milliseconds] [NX|XX]
从 Redis 2.6.12 版本开始, SET
命令的行为可以通过一系列参数来修改:
EX seconds
: 将键的过期时间设置为seconds
秒。 执行SET key value EX seconds
的效果等同于执行SETEX key seconds value
。PX milliseconds
: 将键的过期时间设置为milliseconds
毫秒。 执行SET key value PX milliseconds
的效果等同于执行PSETEX key milliseconds value
。NX
: 只在键不存在时, 才对键进行设置操作。 执行SET key value NX
的效果等同于执行SETNX key value
。XX
: 只在键已经存在时, 才对键进行设置操作。
因为 SET
命令可以通过参数来实现 SETNX
、 SETEX
以及 PSETEX
命令的效果, 所以 Redis 将来的版本可能会移除并废弃 SETNX
、 SETEX
和 PSETEX
这三个命令。
代码示例
127.0.0.1:6379> set testKey 123456 EX 120 NX //设置key为testKey的值为123456,过期时间120秒,附加NX参数
OK
127.0.0.1:6379> set testKey 123456 EX 120 NX //这一步返回了nil,因为附加了NX参数,这个Key已存在,所以设置失败
(nil)
看到这,应该就大概知道了如何利用Redis实现锁,如果set nx成功,说明申请到锁,如果set nx失败,说明key已经存在,被其他进程/线程占用。
如果释放锁,直接删除该key就行,但是为了保证不同线程所删除的是它自己当时申请的锁,防止误删除,需要对比这个value,所以每个线程申请锁时,需要设置一个唯一标识的ID为value,而删除key时,需要对比这个value是否一致,一致才删除,避免特殊情况误删除。
如果正常来想,在程序中首先get key,比较下value,相同则删除,但是这是两步操作:
- get key 并比较value
- 删除key
那么就会出现原子性问题,如果刚get key后且比较value相等,这时候正好其他线程又获取到了锁,那么你删除的就是其他线程的锁,所以需要利用lua脚本,将这两个操作合并为一段代码,在redis中用eval执行lua脚本是具有原子性的,lua脚本如下
if redis.call('get', KEYS[1]) == ARGV[1]
then return redis.call('del', KEYS[1])
else return 0 end
如果在redis控制台中执行,那么如下
eval "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" 1 testKey 123456
evel语句的格式: eval script numkeys key [key…] arg [arg…],
其实就是:eval “lua脚本” key数量 key arg arg arg,那个numkeys代表后面的那些N个参数,前几个是key,剩下的则都做为arg
RedisLock实现类
在spring中,直接通过redisTemplate执行redis命令即可,但是要注意redis的序列化器设定的是什么,必须如下。
我刚开始用的还是fastjson作为value的序列化器,转json嘛,无论什么会带上双引号的,用redisTemplate执行lua脚本时,就是不成功,比较value总是不相等,哪怕用string.gsub替换掉双引号再比较也不行,但是手动执行eval命令又可以,所以只能分开,这个redis锁单独用默认序列化器的。
@Bean("redisStringTemplate")
public RedisTemplate<String, Serializable> redisStringTemplate(LettuceConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Serializable> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
template.setEnableTransactionSupport(true);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new JdkSerializationRedisSerializer()); //重点 用默认的就行
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new JdkSerializationRedisSerializer());
template.setDefaultSerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
/**
* @author : LoneKing
*/
@Slf4j
@Component
public class RedisLockUtil implements Lock {
/**
* 解锁成功标志
*/
private static final String RELEASE_SUCCESS = "1";
private static final String REDIS_LOCK_KEY = "REDIS_LOCK_KEY";
private final RedisTemplate redisTemplate;
private ThreadLocal<String> uuidMap = new ThreadLocal<>();
public RedisLockUtil(RedisTemplate redisStringTemplate) {
this.redisTemplate = redisStringTemplate;
}
/**
* 如果键不存在则新增,存在则不改变已经有的值
*
* @param key
* @param value
* @param expireTime 过期时间
* @param unit 时间单位
* @return
*/
public boolean setIfAbsent(String key, String value, long expireTime, TimeUnit unit) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, unit);
return result != null && result;
}
@Override
public void lock() {
if (tryLock()) {
return;
}
try {
//稍微等待一下再继续尝试获取
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//递归获取锁
lock();
}
@Override
public void lockInterruptibly() throws InterruptedException { }
@Override
public boolean tryLock() {
//每个客户端申请锁前获取唯一ID
String uuid = UUID.randomUUID().toString();
//保存到ThreadLocal中
uuidMap.set(uuid);
return setIfAbsent(REDIS_LOCK_KEY, uuid, 2000, TimeUnit.MILLISECONDS);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//每个客户端申请锁前获取唯一ID
String uuid = UUID.randomUUID().toString();
//保存到ThreadLocal中
uuidMap.set(uuid);
return setIfAbsent(REDIS_LOCK_KEY, uuid, time, unit);
}
@Override
public void unlock() {
boolean isUnlock = false;
try {
//获取锁对应的value值,如果相等则删除锁(解锁),lua脚本是原子性的
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return " +
"redis.call('del', KEYS[1]) else return 0 end";
DefaultRedisScript redisScript = new DefaultRedisScript<>(script, Long.class);
//当前客户端线程申请锁时用的uuid
String currentUUID = uuidMap.get();
Object result = redisTemplate.execute(redisScript,
Collections.singletonList(REDIS_LOCK_KEY), currentUUID);
if (result != null && RELEASE_SUCCESS.equals(result.toString())) {
isUnlock = true;
}
} catch (Exception e) {
e.printStackTrace();
log.error("redis解锁异常:" + e.getMessage());
}
}
@Override
public Condition newCondition() {
return null;
}
}
使用方法
因为实现自Lock接口,所以和其他锁使用方法是一样的,测试用例如下
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class TestLock {
@Resource(name = "redisLockUtil")
private Lock lock;
public TestLock() {
}
public static int count = 0;
@Test
public void testLock() {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
200, 200, 3L,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(300),
new ThreadPoolExecutor.CallerRunsPolicy());
long startTime = System.currentTimeMillis();
for (int i = 0; i < 200; i++) {
poolExecutor.execute(new Runnable() {
@Override
public void run() {
//加锁
lock.lock();
count++;
System.out.println(count);
//释放锁
lock.unlock();
}
});
}
poolExecutor.shutdown();
while (!poolExecutor.isTerminated()) {
}
System.out.println("执行结束:" + count);
System.out.println("耗时:" + (System.currentTimeMillis() - startTime) / 1000);
}
}