社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
分布式锁有两种经典的实现,一种是Zookeeper实现,另一种就是Redis实现了,下面简单介绍分布式锁原理以及Redis实现
分布式锁(Distributed Locks
)一词给人的第一种感觉就是一种深不可测的、高大上的感觉:
其实仔细了解后发现,分布式锁其实就是两块内容,一是分布式特性,这也是分布式锁相较于普通的锁来说需要解决的问题,二是锁特性,锁特性就如平常的同步锁、互斥锁一样 分布式锁首先要解决的问题,是多个系统中如何保证对同一资源的使用一致性,也就是对这个资源上锁,并且让其他系统也可立刻感知到这把锁的存在,从而阻塞或者自旋等待锁。以下两张图可以直观的感受这个过程:分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。
Redis能实现分布式锁的原因在于它具有一条原子性命令:SETNX KEY VALUE
,即KEY不存在时则插入VALUE,返回1;若KEY存在,则插入失败,返回0。这就是一个锁的请求过程:若不存在KEY这个锁,则获取这个锁(插入一条以KEY标识的字符串);若存在KEY这个锁,则原地自旋请求至成功或超时退出。
127.0.0.1:6379> setnx dis-lock test
(integer) 1 //获取锁成功
127.0.0.1:6379> setnx dis-lock test
(integer) 0 //获取锁失败
由于SETNX命令是原子性的,所以不会出现获取锁时的并发问题而导致判断失误,重复地获取锁,但是这种方法有一个致命的问题:如果系统A给一个资源上了锁,但是A系统发生崩溃导致无法释放锁,这样就导致了死锁问题,若没有外力作用则其他系统都将无法获取到该锁,而只能等待,这可能会导致整个系统的崩坏。
为了解决上面可能出现的死锁问题,我们可以在获取锁之后,给该锁一个过期时间,这样就可以避免系统出现问题而无法释放锁而导致的死锁问题:
127.0.0.1:6379> setnx dis-lock test
(integer) 1 //获取锁成功
127.0.0.1:6379> expire dis-lock 10
(integer) 1 //设置过期时间为10s
127.0.0.1:6379> setnx dis-lock test
(integer) 0 //上锁的系统崩溃,无法释放锁,其他系统获取该锁失败
127.0.0.1:6379> setnx dis-lock test
(integer) 1 //在10S后锁过期,其他系统正常获取该锁
那么SETNX命令和EXPIRE命令配合使用是否可以实现一个完善的分布式锁呢?答案是不能。我们可以设想以下,如果A系统获取锁成功,但是在设置过期时间时发送错误,那么该锁仍然不会过期,这样子就又重复了上面的那个情况。解决办法是让SETNX命令和EXPIRE命令具备原子性。
这个命令是Redis 2.8中,作者为了解决SETNX和EXPIRE命令的原子性问题而产生的,现在通过这个命令,就可以使SETNX和EXPIRE命令统一成一个原子命令:若锁不存在,则一定会添加锁并且设置过期时间;
127.0.0.1:6379> set dis-lock test ex 10 nx
OK //获取锁并且设置过期时间
127.0.0.1:6379> set dis-lock test ex 10 nx
(nil) //获取锁失败
127.0.0.1:6379> set dis-lock test ex 10 nx
OK //获取锁失败
package redis;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;
/**
* @Auther: ARong
* @Date: 19-9-28 下午1:01
* @Description:
**/
public class JedisUtil {
private static JedisPool pool = null;
static {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(500);
config.setMaxIdle(5);
config.setMaxWaitMillis(10 * 1000);
config.setTestOnBorrow(true);
pool = new JedisPool(config, "127.0.0.1", 6379, 10 * 1000);
}
/**
* @auther: Arong
* @description: 从连接池中获取Jedis
* @param: []
* @return: redis.clients.jedis.Jedis
* @date: 下午1:02 19-9-28
*/
public static Jedis getJedis() {
Jedis jedis = null;
try {
jedis = pool.getResource();
} catch (JedisException e) {
e.printStackTrace();
} finally {
pool.returnBrokenResource(jedis);
}
return jedis;
}
}
package redis;
import redis.clients.jedis.Jedis;
import java.util.logging.Logger;
/**
* @Auther: ARong
* @Date: 19-9-28 下午1:45
* @Description: Redis 分布式锁实现
**/
public class RedisDistributedLock {
private final static Logger logger = Logger.getLogger(String.valueOf(RedisDistributedLock.class));
//每次请求的时间为200ms
private Long PER_REQ_MILL = null;
//总体等待时间不超过10s
private Long WAIT_TIME_OUT = null;
//锁名称
private String LOCK_NAME = null;
//锁过期时间
private Long EXPIRE_TIME = null;
public RedisDistributedLock() {
LOCK_NAME = Thread.currentThread().getName();
PER_REQ_MILL = 200l; //默认每次自旋请求间隔200ms
WAIT_TIME_OUT = 10 * 1000l; //默认自旋时间10s
EXPIRE_TIME = 2 * 1000l;//默认过期时间2s
}
public RedisDistributedLock(String lockName, Long perReqMill, Long waitTimeOut, Long expireTime) {
LOCK_NAME = lockName;
PER_REQ_MILL = perReqMill;
WAIT_TIME_OUT = waitTimeOut;
EXPIRE_TIME = expireTime;
}
/**
* @auther: Arong
* @description: 获取锁/上锁,不成功则原地自旋
* @param: [lockName]
* @return: void
* @date: 下午1:49 19-9-28
*/
public void lock() {
//先用set key value nx ex expireAt 命令查询是否已经有了该锁
Jedis jedis = JedisUtil.getJedis();
String isSet = jedis.set(this.LOCK_NAME, this.LOCK_NAME, "NX", "EX", this.EXPIRE_TIME / 1000);
if ("OK".equals(isSet)) {
//没有该锁,则直接占用
logger.info("<<<<<线程" + Thread.currentThread().getName() + "占用" + this.LOCK_NAME + "锁成功");
return;
} else {
//该锁仍然存在,原地自旋,每隔一段时间请求一次,直至成功或超时抛出错误
spin();
}
}
/**
* @auther: Arong
* @description: 原地自旋,每隔 perReqMill 毫秒进行一次请求, 总体超时时间为 waitTimeOut 毫秒
* @param: [lockName, perReqMill, waitTimeOut]
* @return: void
* @date: 下午2:13 19-9-28
*/
private void spin() {
//进入时的时间
long beginTime = System.currentTimeMillis();
//获取锁
Jedis jedis = JedisUtil.getJedis();
String isSet = jedis.set(this.LOCK_NAME, this.LOCK_NAME, "NX", "EX", this.EXPIRE_TIME / 1000);
if ("OK".equals(isSet)) {
//没有该锁,则直接占用
logger.info("<<<<<线程" + Thread.currentThread().getName() + "占用" + this.LOCK_NAME + "锁成功");
} else {
while (true) {
try {
//睡眠短暂时间继续请求
Thread.sleep(this.PER_REQ_MILL);
logger.info("线程"+Thread.currentThread().getName()+"自旋睡眠"+this.PER_REQ_MILL+"毫秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
if (System.currentTimeMillis() - beginTime >= this.WAIT_TIME_OUT) {
//已经超时
logger.info("线程"+Thread.currentThread().getName()+"超过了" + this.WAIT_TIME_OUT + "毫秒都无法获取到名为"+this.LOCK_NAME+"的锁,超时退出");
throw new RuntimeException("获取锁失败");
} else {
//没有超时,继续请求
isSet = jedis.set(this.LOCK_NAME, this.LOCK_NAME, "NX", "EX", this.EXPIRE_TIME / 1000);
if ("OK".equals(isSet)) {
//上锁成功
logger.info("<<<<<线程" + Thread.currentThread().getName() + "占用" + this.LOCK_NAME + "锁成功");
break;
}
}
}
}
}
/**
* @auther: Arong
* @description: 解锁
* @param: [lockName]
* @return: void
* @date: 下午2:39 19-9-28
*/
public void unlock() {
Jedis jedis = JedisUtil.getJedis();
//先获取该锁
String lockValue = jedis.get(this.LOCK_NAME);
if (lockValue == null) {
logger.info(">>>>>" + Thread.currentThread().getName() + "解除了对" + this.LOCK_NAME + "的占用");
return;
} else if (this.LOCK_NAME.equals(lockValue)){
Long del = jedis.del(this.LOCK_NAME);
logger.info(">>>>>"+Thread.currentThread().getName() + "解除了对" + this.LOCK_NAME + "的占用");
return;
}
logger.info("*****"+Thread.currentThread().getName() + "无法解除对" + this.LOCK_NAME + "的占用");
}
}
@Test
public void fun() throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(() -> {
//设置每500ms请求自旋一次,超时时间为5s,锁过期时间为5s
RedisDistributedLock lock1 = new RedisDistributedLock("lock", 500l, 10 * 1000l, 5 * 1000l);
lock1.lock();
try {
//占用锁4s钟
Thread.sleep(4 * 1000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock1.unlock();
});
RedisDistributedLock lock2 = new RedisDistributedLock("lock", 500l, 10 * 1000l, 5 * 1000l);
lock2.lock();
Thread.sleep(2000l);
lock2.unlock();
}
接下来模拟自旋超时的情况:
@Test
public void fun3() throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(() -> {
//设置每500ms请求自旋一次,超时时间为5s,锁过期时间为5s
RedisDistributedLock lock1 = new RedisDistributedLock("lock", 500l, 5 * 1000l, 5 * 1000l);
lock1.lock();
try {
//占用锁3s钟
Thread.sleep(3 * 1000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock1.unlock();
});
//请求超时时间2s
RedisDistributedLock lock2 = new RedisDistributedLock("lock", 500l, 2 * 1000l, 5 * 1000l);
lock2.lock();
Thread.sleep(2000l);
lock2.unlock();
}
关于Redis分布式锁的问题其实还有很多可以细究的,例如但是由于个人需求以及技术限制,目前就看到这里吧!
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!