本文小编为大家详细介绍“Redisson分布式信号量RSemaphore如何使用”,内容详细,步骤清晰,细节处理妥当,希望这篇“Redisson分布式信号量RSemaphore如何使用”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。
一、RSemaphore的使用
@Test
public void testRSemaphore() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redissonClient = Redisson.create(config);
RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
// 设置5个许可,模拟五个停车位
rSemaphore.trySetPermits(5);
// 创建10个线程,模拟10辆车过来停车
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
try {
rSemaphore.acquire();
System.out.println(Thread.currentThread().getName() + "进入停车场...");
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
System.out.println(Thread.currentThread().getName() + "离开停车场...");
rSemaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "A" + i).start();
}
try {
TimeUnit.MINUTES.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
二、RSemaphore设置许可数量
初始化RSemaphore,需要调用trySetPermits()设置许可数量:
/**
* 尝试设置许可数量,设置成功,返回true,否则返回false
*/
boolean trySetPermits(int permits);
trySetPermits()内部调用了trySetPermitsAsync():
// 异步设置许可
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 判断分布式信号量的key是否存在,如果不存在,才设置
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
// set "semaphore" permits
// 使用String数据结构设置信号量的许可数
+ "redis.call('set', KEYS[1], ARGV[1]); "
// 发布一条消息到redisson_sc:{semaphore}通道
+ "redis.call('publish', KEYS[2], ARGV[1]); "
// 设置成功,返回1
+ "return 1;"
+ "end;"
// 否则返回0
+ "return 0;",
Arrays.asList(getRawName(), getChannelName()), permits);
if (log.isDebugEnabled()) {
future.thenAccept(r -> {
if (r) {
log.debug("permits set, permits: {}, name: {}", permits, getName());
} else {
log.debug("unable to set permits, permits: {}, name: {}", permits, getName());
}
});
}
return future;
}
可以看到,设置许可数量底层使用LUA脚本,实际上就是使用redis的String数据结构,保存了我们指定的许可数量。如下图:
参数说明:
KEYS[1]: 我们指定的分布式信号量key,例如redissonClient.getSemaphore("semaphore")中的"semaphore")
KEYS[2]: 释放锁的channel名称,redisson_sc:{分布式信号量key},在本例中,就是redisson_sc:{semaphore}
ARGV[1]: 设置的许可数量
总结设置许可执行流程为:
get semaphore,获取到semaphore信号量的当前的值
第一次数据为0, 然后使用set semaphore 3,将这个信号量同时能够允许获取锁的客户端的数量设置为3。(注意到,如果之前设置过了信号量,将无法再次设置,直接返回0。想要更改信号量总数可以使用addPermits方法)
然后redis发布一些消息,返回1
三、RSemaphore的加锁流程
许可数量设置好之后,我们就可以调用acquire()方法获取了,如果未传入许可数量,默认获取一个许可。
public void acquire() throws InterruptedException {
acquire(1);
}
public void acquire(int permits) throws InterruptedException {
// 尝试获取锁成功,直接返回
if (tryAcquire(permits)) {
return;
}
// 对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息
CompletableFuture<RedissonLockEntry> future = subscribe();
semaphorePubSub.timeout(future);
RedissonLockEntry entry = commandExecutor.getInterrupted(future);
try {
// 不断循环尝试获取许可
while (true) {
if (tryAcquire(permits)) {
return;
}
entry.getLatch().acquire();
}
} finally {
// 取消订阅
unsubscribe(entry);
}
// get(acquireAsync(permits));
}
可以看到,获取许可的核心逻辑在tryAcquire()方法中,如果tryAcquire()返回true说明获取许可成功,直接返回;如果返回false,说明当前没有许可可以使用,则对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息,并通过死循环不断尝试获取锁。
我们看一下tryAcquire()方法的逻辑,内部调用了tryAcquireAsync()方法:
// 异步获取许可
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return new CompletableFutureWrapper<>(true);
}
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 获取当前剩余的许可数量
"local value = redis.call('get', KEYS[1]); " +
// 许可不为空,并且许可数量 大于等于 当前线程申请的许可数量
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
// 通过decrby减少剩余可用许可
"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
// 返回1
"return 1; " +
"end; " +
// 其它情况,返回0
"return 0;",
Collections.<Object>singletonList(getRawName()), permits);
}
从源码可以看到,获取许可就是操作redis中的数据,首先获取到redis中剩余的许可数量,只有当剩余的许可数量大于线程申请的许可数量时,才获取成功,返回1;否则获取失败,返回0;
总结加锁执行流程为:
get semaphore,获取到一个当前的值,比如说是3,3 > 1
decrby semaphore 1,将信号量允许获取锁的客户端的数量递减1,变成2
decrby semaphore 1
decrby semaphore 1
执行3次加锁后,semaphore值为0
此时如果再来进行加锁则直接返回0,然后进入死循环去获取锁
四、RSemaphore的解锁流程
通过前面对RSemaphore获取锁的分析,我们很容易能猜到,释放锁,无非就是归还许可数量到redis中。我们查看具体的源码:
public RFuture<Void> releaseAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return new CompletableFutureWrapper<>((Void) null);
}
RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
// 通过incrby增加许可数量
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
// 发布一条消息到redisson_sc:{semaphore}中
"redis.call('publish', KEYS[2], value); ",
Arrays.asList(getRawName(), getChannelName()), permits);
if (log.isDebugEnabled()) {
future.thenAccept(o -> {
log.debug("released, permits: {}, name: {}", permits, getName());
});
}
return future;
}