温州营销网站制作联系电话,烟台企业展厅设计公司,做外汇网站做什么类型网站好,有赞微商城是什么↑↑↑请在文章开头处下载测试项目源代码↑↑↑ 文章目录 前言4.5 分布式锁-Redisson4.5.4 Redission锁重试4.5.5 WatchDog机制4.5.5 MutiLock原理 4.6 秒杀优化4.6.1 优化方案4.6.2 完成秒杀优化 4.7 Redis消息队列4.7.1 基于List实现消息队列4.7.2 基于PubSub的消息队列4.7.…↑↑↑请在文章开头处下载测试项目源代码↑↑↑ 文章目录 前言4.5 分布式锁-Redisson4.5.4 Redission锁重试4.5.5 WatchDog机制4.5.5 MutiLock原理 4.6 秒杀优化4.6.1 优化方案4.6.2 完成秒杀优化 4.7 Redis消息队列4.7.1 基于List实现消息队列4.7.2 基于PubSub的消息队列4.7.3 基于Stream的消息队列4.7.4 基于Stream的消息队列-消费者组4.7.5 基于Stream的消息队列实现异步秒杀下单 前言
Redis实战系列文章
Redis从入门到精通(四)Redis实战(一)短信登录 Redis从入门到精通(五)Redis实战(二)商户查询缓存 Redis从入门到精通(六)Redis实战(三)优惠券秒杀 Redis从入门到精通(七)Redis实战(四)库存超卖、一人一单与Redis分布式锁 Redis从入门到精通(八)Redis实战(五)分布式锁误删与原子性问题、Redisson
4.5 分布式锁-Redisson
上一节对Redisson进行了快速入门并分析了可重入锁的基本原理下面继续研究一些Redisson的几个功能。
4.5.4 Redission锁重试
// org.redisson.RedissonLock#lock()long threadId Thread.currentThread().getId();
Long ttl tryAcquire(-1, leaseTime, unit, threadId);
// 返回null表示获取锁成功否则返回锁的剩余生存时间
if (ttl null) {return;
}// ......// 重试获取锁
while (true) {ttl tryAcquire(-1, leaseTime, unit, threadId);if (ttl null) {break;}// ......
}由以上源码可知在RedissonLock类的lock()方法中会调用tryAcquire()方法尝试获取锁。tryAcquire()方法的原理在上一节已经分析过返回null表示获取锁成功否则返回锁的剩余生存时间。
如果第一次获取锁失败程序会进入一个while循环重试获取锁。
4.5.5 WatchDog机制
// org.redisson.RedissonLockprivate T RFutureLong tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime ! -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 调用Lua脚本RFutureLong ttlRemainingFuture tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) - {if (e ! null) {return;}// 执行看门狗机制if (ttlRemaining null) {scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;
}private void scheduleExpirationRenewal(long threadId) {// ......} else {entry.addThreadId(threadId);// 执行看门狗renewExpiration();}
}private void renewExpiration() {// ......Timeout task commandExecutor.getConnectionManager().newTimeout(new TimerTask() {Overridepublic void run(Timeout timeout) throws Exception {// ......// 调用Lua脚本刷新锁的有效时间RFutureBoolean future renewExpirationAsync(threadId);future.onComplete((res, e) - {if (e ! null) {// loggerreturn;}if (res) {// 递归执行看门狗renewExpiration();}});}// 10s执行一次}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}由以上源码可知在RedissonLock类的tryAcquireAsync()方法中除了调用Lua脚本获取锁还会运行看门狗机制。该机制会调用Lua脚本刷新锁的有效时间同时每10s递归执行一次看门狗。
4.5.5 MutiLock原理
为了提高Redis的可用性一般会搭建集群或者主从。
以主从为例此时要去获取锁命令写在主机上主机会将数据同步给从机。假设在主机还没有来得及把数据写入到从机去的时候主机宕机了哨兵会发现主机宕机并且选举一个Slave变成Master但此时新的Master中实际上并没有锁信息相当于此时锁信息已经丢掉了。
为了解决这个问题Redission提出来了MutiLock锁使用这种锁后每个节点的地位都是一样的加锁的逻辑需要把数据写入到每一个主丛节点上只有所有的节点都写入成功此时才是真的加锁成功。
假设现在某个节点挂了那么去获得锁的时候有一个节点拿不到不能算是加锁成功就保证了加锁的可靠性。
4.6 秒杀优化
4.6.1 优化方案
现存问题 如上图所示秒杀下单包括六个步骤查询优惠券、判断秒杀库存、查询订单、校验一人一单、减库存、创建订单。
在这六步操作中有很多操作是要去操作数据库的而且还是一个线程串行执行 这样就会导致程序执行的很慢。 那么如何加速呢
优化方案
把简单的校验例如是否有库存、是否一人一单做完后就直接给用户返回成功或失败而不必等待订单创建完成。如果确定可以下单则将订单的相关信息写入队列然后再创建一个线程让新线程读取队列信息异步进行下单。 如下图所示 整体思路
当用户下单时首先通过Redis判断库存是否充足如果不充足则直接返回失败充足的话再通过Redis判断用户是否已经下过单如果已经下过单则直接返回失败如果没有下过单则说明可以下单进行库存扣减并将用户ID存入当前优惠券的集合中。由于以上过程需要保证原子性因此可以通过Lua脚本来完成。可以成功下单Lua脚本返回0。
接着判断Lua脚本的执行结果。如果Lua脚本返回0说明可以下单则将优惠券ID、用户ID和订单ID存入阻塞队列并返回订单ID给用户如果Lua脚本没有返回0则直接返回错误信息给用户。
最后进行异步下单即通过额外线程读取阻塞队列的信息并真正进行下单。完整的流程如下图所示。 4.6.2 完成秒杀优化
需求1新增秒杀优惠券的同时将优惠券信息保存到Redis中
// com.star.redis.dzdp.service.impl.VoucherServiceImplOverride
public BaseResult addSeckillVoucher(Voucher voucher) {log.info(add a seckill voucher, {}, voucher.toString());// 1.保存优惠券信息save(voucher);log.info(add voucher success. id {}, voucher.getId());// 2.保存秒杀信息SeckillVoucher seckillVoucher new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 3.将秒杀优惠券的库存保存到RedisString key seckill:stock: voucher.getId();stringRedisTemplate.opsForValue().set(key, voucher.getStock().toString());log.info(set to Redis : Key {}, Value {}, key, voucher.getStock().toString());return BaseResult.setOk(新增秒杀券成功);
}调用/voucher/seckill/order接口新增一个描述优惠券 在Redis中可以看到该秒杀优惠券的库存信息 需求2基于Lua脚本判断秒杀库存、一人一单决定用户是否抢购成功
在resources目录下新建一个order.lua文件其内容如下
-- 1.参数列表
-- 1.1.优惠券id
local voucherId ARGV[1]
-- 1.2.用户id
local userId ARGV[2]
-- 1.3.订单id
local orderId ARGV[3]-- 2.数据key
-- 2.1.库存key
local stockKey seckill:stock: .. voucherId
-- 2.2.订单key
local orderKey seckill:order: .. voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call(get, stockKey)) 0) then-- 3.2.库存不足返回1return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call(sismember, orderKey, userId) 1) then-- 3.3.存在说明是重复下单返回2return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call(incrby, stockKey, -1)
-- 3.5.下单保存用户sadd orderKey userId
redis.call(sadd, orderKey, userId)
return 0需求3如果抢购成功将优惠券ID、用户ID和订单ID封装后存入阻塞队列
修改VoucherOrderServiceImpl类的下单方法seckillVoucher()
// com.star.redis.dzdp.service.impl.VoucherOrderServiceImpl/** 保存订单信息的队列 */
private BlockingQueueVoucherOrder orderTasks new ArrayBlockingQueue(1024 * 1024);Override
public BaseResultLong seckillVoucher(Long voucherId, Long userId) {log.info(开始秒杀下单...voucherId {}, userId {}, voucherId, userId);Long orderId RedisIdWorker.nextId(stringRedisTemplate, voucher_order);log.info(get orderId {}, orderId);// 1.执行Lua脚本DefaultRedisScriptLong script new DefaultRedisScript();script.setLocation(new ClassPathResource(order.lua));script.setResultType(Long.class);Long result stringRedisTemplate.execute(script, Collections.emptyList(),voucherId.toString(), userId.toString(), orderId.toString());log.info(execute order.lua result {}, result);// 2.判断执行结果if(result null || result ! 0) {// 结果为空或者不为0String message (result null || result 1) ? 库存不足 : 不能重复下单;log.error(message);return BaseResult.setFail(message);}// 3.结果为0将优惠券ID、用户ID和订单ID封装后存入阻塞队列VoucherOrder voucherOrder new VoucherOrder();voucherOrder.setVoucherId(voucherId);voucherOrder.setUserId(userId);voucherOrder.setId(orderId);orderTasks.add(voucherOrder);log.info(add voucherId {}, userId {}, orderId {} to queue.. done.,voucherId, userId, orderId);// 4.返回订单IDlog.info(秒杀下单返回...orderId {}, orderId);return BaseResult.setOkWithData(orderId);
}需求4开启线程任务不断从阻塞队列中获取信息实现异步下单功能
// com.star.redis.dzdp.service.impl.VoucherOrderServiceImpl/** 异步执行下单动作的线程池 */
private static final ExecutorService SECKILL_ORDER_EXECUTOR Executors.newSingleThreadExecutor();/** 类初始化之后立即初始化线程池 */
PostConstruct
private void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}/*** 处理订单的内部类*/
private class VoucherOrderHandler implements Runnable {Overridepublic void run() {// while循环持续读取队列中的信息while (true) {try {log.info(begin);// 1.获取队列中的订单信息VoucherOrder voucherOrder orderTasks.take();log.info(get from queue : {}, voucherOrder.toString());// 2.创建订单handleVoucherOrder(voucherOrder);log.info(end);} catch (Exception e) {log.error(处理异常订单, e);}}}/*** 处理订单*/private void handleVoucherOrder(VoucherOrder voucherOrder) {// 1.创建锁对象RLock rLock redissonClient.getLock(lock:order: voucherOrder.getUserId());// 2.尝试获取锁boolean isLock rLock.tryLock();log.info(isLock {}, isLock);// 3.判断是否获取锁成功if(!isLock) {// 获取锁失败log.error(不允许重复下单);return;}try {// 4.持锁真正创建订单checkAndCreateVoucherOrder(voucherOrder.getVoucherId(), voucherOrder.getUserId());} finally {// 5.释放锁rLock.unlock();log.info(unlock done.);}}/*** 持锁真正创建订单*/private void createVoucherOrder(VoucherOrder voucherOrder) {log.info(begin createVoucherOrder... voucherId {}, userId {}, orderId {},voucherOrder.getVoucherId(), voucherOrder.getUserId(), voucherOrder.getId());// 1.增加一人一单规则int count query().eq(voucher_id, voucherOrder.getVoucherId()).eq(user_id, voucherOrder.getUserId()).count();log.info(old order count {}, count);if(count 0) {// 该用户已下过单log.error(每个帐号只能抢购一张优惠券);return;}// 2.扣减库存boolean update seckillVoucherService.update().setSql(stock stock - 1).eq(voucher_id, voucherOrder.getVoucherId()).gt(stock, 0).update();log.info(update result {}, update);if(!update) {// 扣减库存失败返回抢券失败log.error(库存不足抢券失败);return;}// 3.创建订单voucherOrder.setPayTime(new Date());voucherOrderService.save(voucherOrder);}
}下面借助工具对秒杀下单接口进行性能测试结果如下 由于使用的是同一用户因此971个请求中只有一个请求是成功的其余的请求都失败。查看此时Redis中的订单数据只有1条 4.7 Redis消息队列 如上图所示最简单的消息队列包含3个角色
消息队列存储和管理消息也被称为消息代理Message Broker生产者发送消息到消息队列消费者从消息队列获取消息并处理消息。
使用队列的好处在于解耦。 在秒杀下单中用户下单之后利用Redis去进行校验下单条件再通过队列把消息发送出去然后再启动一个线程去消费这个消息完成解耦同时也加快了响应速度。
4.7.1 基于List实现消息队列
Redis的List数据结构是一个双向链表很容易模拟出队列效果。我们可以利用LPUSH结合RPOP、或者RPUSH结合LPOP来实现。
不过要注意的是当队列中没有消息时RPOP或LPOP操作会返回null并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。如图 基于List的消息队列有哪些优缺点
优点
利用Redis存储不受限于JVM内存上限基于Redis的持久化机制数据安全性有保证可以满足消息有序性。
缺点
无法避免消息丢失只支持单消费者。
4.7.2 基于PubSub的消息队列
PubSub发布订阅是Redis2.0版本引入的消息传递模型。顾名思义消费者可以订阅一个或多个channel生产者向对应channel发送消息后所有订阅者都能收到相关消息。如图 主要命令有
# 订阅一个或多个频道
SUBSCRIBE channel [channel]
# 订阅与pattern格式匹配的所有频道
PSUBSCRIBE pattern[pattern]
# 向一个频道发送消息
PUBLISH channel msg基于PubSub的消息队列有哪些优缺点
优点
采用发布订阅模型支持多生产、多消费。
缺点
不支持数据持久化无法避免消息丢失消息堆积有上限超出时数据丢失。
4.7.3 基于Stream的消息队列
Stream是Redis 5.0引入的一种新数据类型可以实现一个功能非常完善的消息队列。
发送消息的命令是 例如
127.0.0.1:6379 XADD users * name Rose age 22
1712458704764-0
127.0.0.1:6379 XADD users * name Jack age 30
1712458778623-0读取消息的方式之一XREAD 例如使用XREAD读取第一个消息
127.0.0.1:6379 XREAD COUNT 1 STREAMS users 0
1) 1) users2) 1) 1) 1712458704764-02) 1) name2) Rose3) age4) 22XREAD阻塞方式读取最新消息
# 阻塞1秒
127.0.0.1:6379 XREAD COUNT 1 BLOCK 1000 STREAMS users $
(nil)
(1.02s)基于STREAM的消息队列的特点
消息可回溯一个消息可以被多个消费者读取可以阻塞读取有消息漏读的风险。
4.7.4 基于Stream的消息队列-消费者组
消费者组Consumer Group就是将多个消费者划分到一个组中监听同一个队列。它具备下列特点 创建消费者组
127.0.0.1:6379 XGROUP CREATE users a_group 0
OK给自定的消费者组添加消费者
127.0.0.1:6379 XGROUP CREATECONSUMER users a_group a_consumer1
(integer) 1从消费者组读取消息
127.0.0.1:6379 XREADGROUP GROUP a_group a_consumer1 COUNT 1 STREAMS users 0
1) 1) users2) (empty array)基于STREAM消费者组的消息队列的特点
消息可回溯可以多消费者争抢消息加快消费速度可以阻塞读取没有消息漏读的风险有消息确认机制保证消息至少被消费一次。
下面对比一下这4种消息队列的特点 经过比较本案例选择使用基于Stream的消息队列来实现异步秒杀下单。
4.7.5 基于Stream的消息队列实现异步秒杀下单
修改秒杀下单Lua脚本order.lua在认定有抢购资格后直接向stream.orders队列中添加消息内容包含voucherId、userId、orderId
-- ...-- 新增逻辑
-- 3.6.发送消息到队列中 XADD stream.orders * k1 v1 k2 v2 ...
redis.call(xadd, stream.orders, *, userId, userId, voucherId, voucherId, id, orderId)return 0修改消息读取策略改为读取Redis的Stream结构队列
// com.star.redis.dzdp.service.impl.VoucherOrderServiceImpl#seckillVoucher()// ......// 3.结果为0将优惠券ID、用户ID和订单ID封装后存入阻塞队列
// 新逻辑这里不再保存队列在lua脚本中保存
// VoucherOrder voucherOrder new VoucherOrder();
// voucherOrder.setVoucherId(voucherId);
// voucherOrder.setUserId(userId);
// voucherOrder.setId(orderId);
// orderTasks.add(voucherOrder);
// log.info(add voucherId {}, userId {}, orderId {} to queue.. done.,
// voucherId, userId, orderId);// 4.返回订单ID
log.info(秒杀下单返回...orderId {}, orderId);
return BaseResult.setOkWithData(orderId);// com.star.redis.dzdp.service.impl.VoucherOrderServiceImplprivate class VoucherOrderHandler implements Runnable {Overridepublic void run() {// 持续读取队列中的信息while (true) {try {log.info(begin);// 1.获取队列中的订单信息// VoucherOrder voucherOrder orderTasks.take();// log.info(get from queue : {}, voucherOrder.toString());// 1.新逻辑读取Redis的Stream消息队列// XREADGROUP GROUP a_group a_consumer1 COUNT 1 BLOCK 2000 STREAMS stream.orders ListMapRecordString, Object, Object list stringRedisTemplate.opsForStream().read(Consumer.from(a_group, a_consumer1),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(stream.orders, ReadOffset.lastConsumed()));// 2.判断订单信息是否为空if(list null || list.isEmpty()) {// 如果为空说明没有消息继续下一次循环continue;}// 3.解析数据MapRecordString, Object, Object record list.get(0);MapObject, Object value record.getValue();VoucherOrder voucherOrder BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);log.info(get from Redis Stream queue : id {}, {}, record.getId(), voucherOrder.toString());// 4.创建订单handleVoucherOrder(voucherOrder);// 5.确认消息stringRedisTemplate.opsForStream().acknowledge(stream.orders, a_group, record.getId());log.info(ack message done.);log.info(end);} catch (Exception e) {log.error(处理异常订单, e);}}}// ......
}测试
[http-nio-8081-exec-2] 开始秒杀下单...voucherId 15, userId 1012
[http-nio-8081-exec-2] get orderId 7354966481756487681
[http-nio-8081-exec-2] execute order.lua result 0
[http-nio-8081-exec-2] add voucherId 15, userId 1012, orderId 7354966481756487681 to queue.. done.
[http-nio-8081-exec-2] 秒杀下单返回...orderId 7354966481756487681
// 创建新线程异步处理下单逻辑
// 成功获取到Stream队列的消息
[pool-2-thread-1] get from Redis Stream queue : id 1712461578801-0, VoucherOrder(id7354966481756487681, userId1012, voucherId15, payTypenull, statusnull, createTimenull, payTimenull, useTimenull, refundTimenull, updateTimenull)
[pool-2-thread-1] isLock true
[pool-2-thread-1] begin createVoucherOrder... voucherId 15, userId 1012, orderId 7354966481756487681
[pool-2-thread-1] Preparing: SELECT COUNT( * ) FROM tb_voucher_order WHERE (voucher_id ? AND user_id ?)
[pool-2-thread-1] Parameters: 15(Long), 1012(Long)
[pool-2-thread-1] Total: 1
[pool-2-thread-1] old order count 0
[pool-2-thread-1] Preparing: UPDATE tb_seckill_voucher SET stock stock - 1 WHERE (voucher_id ? AND stock ?)
[pool-2-thread-1] Parameters: 15(Long), 0(Integer)
[pool-2-thread-1] Updates: 1
[pool-2-thread-1] update result true
[pool-2-thread-1] Preparing: INSERT INTO tb_voucher_order ( id, user_id, voucher_id, pay_time ) VALUES ( ?, ?, ?, ? )
[pool-2-thread-1] Parameters: 7354966481756487681(Long), 1012(Long), 15(Long), 2024-04-07 11:46:21.208(Timestamp)
[pool-2-thread-1] Updates: 1
[pool-2-thread-1] unlock done.
// 消息确认完成
[pool-2-thread-1] ack message done.可见基于Stream的消息队列正常工作。
…
本节完更多内容请查阅分类专栏Redis从入门到精通
感兴趣的读者还可以查阅我的另外几个专栏
SpringBoot源码解读与原理分析(已完结)MyBatis3源码深度解析(已完结)再探Java为面试赋能(持续更新中…)