在 Redis 中实现分布式互斥锁 常见有两种方式:
SETNX + EXPIRE(早期实现) SET key value NX EX(推荐实现)
核心区别是: SETNX 不是原子操作(需要两步),SET NX EX 是原子操作(一步完成) 。
Star描述项目
1.S->situation(背景)
2.T->task(任务)
3.A->action(动作)
4.R->result(结果)
1.S 商店评价平台这个项目主要是一个评价和查看本地商店生活的项目;主要目的是为了,让用户通过这个项目来获取附近商户的信息以及交友等;
我主要负责这个项目的后端开发,使用的是springboot框架+redis缓存中间件+mysql数据库; 其中还使用了redis分布式锁,Redisson可重入的分布式锁, 新的redis数据类型:Geo,bitmap等;以及lua脚本保证多步redis操作原子性等;
主要实现了 商店浏览 , 探店笔记 , 关注推送, 以及优惠券秒杀等功能;
2.T 在这个项目中,我主要负责 高并发秒杀系统以及缓存优化相关功能的实现 ;
一,Redis缓存优化(缓存穿透、击穿、雪崩)
二,优惠券秒杀系统分布式锁保证并发安全
三,基于Redis Stream实现异步下单
四,附近商户查询,签到功能(Redis GEO,BitMap)
五,关注用户动态推送(Feed流)
3.A 一,Redis缓存优化(缓存穿透、击穿、雪崩) 1,缓存雪崩: 短时间大量的缓存同时过期失效
项目中我是通过在过期时间上加上一个时间随机值来实现的防止缓存同时过期的情况
2,缓存穿透: 短时间大量使用大量不存在的key来进行一个查询,导致大量查询打到数据库,对数据库造成极大的压力
有两种防治方法:1.缓存空对象 2.布隆过滤器,添加可能的key,来过滤那些不可能的请求
布隆过滤器使用比较复杂,且我了解工作中应该也使用较少;所以我项目中使用的是缓存空对象的形式来实现的;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public Shop queryWithPassThrough (Long id) { String key=RedisConstants.CACHE_SHOP_KEY+id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (!StrUtil.isBlank(shopJson)){ Shop shop=JSONUtil.toBean(shopJson,Shop.class); return shop; } if (shopJson != null ){ return null ; } Shop shop = getById(id); Random r=new Random (); int randomTime=r.nextInt(10 ); if (shop ==null ){ stringRedisTemplate.opsForValue().set(key,"" ,RedisConstants.CACHE_NULL_TTL+randomTime,TimeUnit.MINUTES); return null ; } stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),RedisConstants.CACHE_SHOP_TTL+randomTime,TimeUnit.MINUTES); return shop; }
3,缓存击穿: 热点key失效,短时间大量的请求查询这个key,导致大量的请求打到数据库,对数据库造成极大的压力
解决方法有: 1.对于热点key实现一个永不过期策略; 2.使用逻辑过期策略,添加一个字段来为户key的过期时间,实际上redis中对key是不加ttl的, 3.使用互斥锁来使得大多数请求只有一个请求能真正打到数据库上,查询数据库,同时建立一个缓存;
项目中主要是通过逻辑过期和使用互斥锁两种策略来防止的缓存击穿;
逻辑过期: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 @Data public class RedisData { private LocalDateTime expireTime; private Object data; } private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10 ); public Shop queryWithLogicalExpire (Long id) { String key=RedisConstants.CACHE_SHOP_KEY +id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isBlank(shopJson)){ return null ; } RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class); Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); if (expireTime.isAfter(LocalDateTime.now())){ return shop; } String lockKey=RedisConstants.LOCK_SHOP_KEY+id; boolean flag = tryLock(lockKey); if (flag){ CACHE_REBUILD_EXECUTOR.submit(()->{ try { this .saveShop2Redis(id,20L ); } catch (Exception e) { throw new RuntimeException (e); } finally { unLock(lockKey); } }); } return shop; } public void saveShop2Redis (long id,Long expireTime) { Shop shop = getById(id); RedisData redisData=new RedisData (); redisData.setData(shop); redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireTime)); stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData)); } private boolean tryLock (String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1" , RedisConstants.LOCK_SHOP_TTL, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unLock (String key) { stringRedisTemplate.delete(key); }
使用互斥锁: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public Shop queryWithMutex (Long id) { String key=RedisConstants.CACHE_SHOP_KEY +id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (!StrUtil.isBlank(shopJson)){ Shop shop = JSONUtil.toBean(shopJson, Shop.class); return shop; } if (shopJson != null ){ return null ; } String lockKey=RedisConstants.LOCK_SHOP_KEY+id; Shop shop= null ; try { boolean flag = tryLock(lockKey); if (! flag){ Thread.sleep(50 ); return queryWithMutex(id); } shop = getById(id); Thread.sleep(200 ); Random random=new Random (); int randomTime = random.nextInt(10 ); if (shop==null ){ stringRedisTemplate.opsForValue().set(key,"" ,RedisConstants.CACHE_NULL_TTL+randomTime,TimeUnit.MINUTES); return null ; } stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),RedisConstants.CACHE_SHOP_TTL,TimeUnit.MINUTES); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { unLock(lockKey); } return shop; }
二,优惠券秒杀系统分布式锁保证并发安全 准备: 为了实现一个全局唯一id , 防止不同进程中相同线程误删锁的问题; 定义了一个RedisWorker类来生成id
也可以使用雪花算法替代,可以不依赖于redis;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Component public class RedisIdWorker { private static final Long BEGIN_TIMESTAMP=1735689600L ; private static final Long COUNT_BITES=32L ; private StringRedisTemplate stringRedisTemplate; public RedisIdWorker (StringRedisTemplate stringRedisTemplate) { this .stringRedisTemplate = stringRedisTemplate; } public Long nextId (String keyPrefix) { LocalDateTime now=LocalDateTime.now(); long second = now.toEpochSecond(ZoneOffset.UTC); second-=BEGIN_TIMESTAMP; String format = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd" )); Long count= stringRedisTemplate.opsForValue().increment("icr" + keyPrefix + format); return second<<COUNT_BITES | count; } }
使用lua脚本,原子性删除锁,同时校验线程标识,防止误删
1 2 3 4 5 6 --比较线程和锁的标识是否相同 if (redis.call('get' ,keys[1 ])==argv[1 ]) then --释放锁 redis.call('del' ,keys[1 ]) end return 0
使用redission可重入的分布式锁来实现一个秒杀防超卖
有两个优点:1.可以重入,一个线程可以多次拿到同一个锁, 2.看门狗机制,每10秒进行一个续期,防止业务每执行完,锁就到时提前释放了
整体架构(先理解这个) 这段代码的秒杀流程其实是 5步架构 :
1 2 3 4 5 6 7 8 9 10 11 12 13 用户请求 ↓ Lua脚本判断库存 + 一人一单 ↓ 成功 ↓ 放入阻塞队列 ↓ 异步线程处理订单 ↓ Redis分布式锁 ↓ 数据库事务创建订单
用户请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString()); int r = result.intValue(); if (r !=0 ){ return Result.fail(r==1 ? "库存不足" :"用户不能重复下单" ); } VoucherOrder voucherOrder=new VoucherOrder (); voucherOrder.setId(redisIdWorker.nextId("order" )); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); orderTasks.add(voucherOrder); proxy=(IVoucherOrderService) AopContext.currentProxy(); return Result.ok(voucherOrder.getId()); }
Lua脚本判断库存 + 一人一单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 local voucherID=ARGV[1 ]local userId=ARGV[2 ]local stockKey="seckill:stock:" ..voucherIDlocal orderKey="seckill:order:" ..voucherIDif (tonumber (redis.call('get' ,stockKey))<=0 ) then return 1 end if (redis.call('sismember' ,orderKey,userId)==1 ) then return 2 end redis.call('incrby' ,stockKey,-1 ) redis.call('sadd' ,orderKey,userId) return 0
加载lua脚本并创建阻塞队列:
1 2 3 4 5 6 7 8 9 static { SECKILL_SCRIPT =new DefaultRedisScript <>(); SECKILL_SCRIPT.setLocation(new ClassPathResource ("seckill.lua" )); SECKILL_SCRIPT.setResultType(Long.class); } private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue <>(1024 * 1024 );
创建线程池用于异步下单操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static final ExecutorService SECKILL_ORDER_EXECUTOR=Executors.newSingleThreadExecutor();@PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler ()); } private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ){ try { VoucherOrder voucherOrder = orderTasks.take(); handleVoucherOrder(voucherOrder); } catch (Exception e) { log.error("出现异常:" ,e); } } } }
获取分布式锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private IVoucherOrderService proxy;private void handleVoucherOrder (VoucherOrder voucherOrder) { SimpleRedisLock lock = new SimpleRedisLock ("order:" + voucherOrder.getUserId(), stringRedisTemplate); boolean flag = lock.tryLock(120 ); if (!flag) { log.error("不能重复下单" ); return ; } try { proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } }
创建订单,更新数据库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Transactional public void createVoucherOrder (VoucherOrder voucherOrder) { Long id = UserHolder.getUser().getId(); Integer count = query().eq("voucher_id" ,voucherOrder.getVoucherId()).eq("user_id" , id).count(); if (count > 0 ) { log.error("用户已购买过一次" ); return ; } boolean success = seckillVoucherService.update() .setSql("stock=stock-1" ) .eq("voucher_id" , voucherOrder.getVoucherId()) .gt("stock" , 0 ) .update(); if (!success) { log.error("库存不足" ); return ; } save(voucherOrder); }
总体的调用链路大概是: 先是请求达到seckillVoucher 方法, 内部执行lua脚本:判断库存 + 一人一单,并添加订单到阻塞队列,同时创建当前类的代理对象, 这个代理是全局变量,在这个方法内获取; 然后正常返回订单id; 此时因为写了init()初始化方法调用VoucherOrderHandler()方法; VoucherOrderHandler()方法内部又调用了,handleVoucherOrder()方法来获取分布式锁来防止订单重复消费;handleVoucherOrder()方法内部又调用了代理对象来执行createVoucherOrder()方法,createVoucherOrder()来使用事务原子性修改数据库扣减库存,写入订单数据到数据库;
Q1:为什么init()方法没调用就执行了? Q2: 为什么要创建代理对象来调用createVoucherOrder方法?
R1: init() 方法之所以 没有在代码里手动调用却执行了 ,是因为它使用了 @PostConstruct 注解 。这个注解会让 Spring 在 Bean 初始化完成后自动调用该方法
Bean的生命周期:
1 2 3 4 5 6 7 8 9 1 Bean定义加载 2 Bean实例化 3 依赖注入 4 BeanNameAware / BeanFactoryAware 等回调 5 BeanPostProcessor 前置处理 6 初始化方法 7 BeanPostProcessor 后置处理 8 Bean可用 9 Bean销毁
想在 Bean 加载(初始化)前后或销毁前后执行一些逻辑 ,通常有 4 种常见方式 :
1.
2.
3.
4.
R2: 创建代理对象来调用 createVoucherOrder() 的根本原因是为了让 @Transactional 事务生效 。因为 Spring 的事务是通过 AOP 代理实现的,而不是直接作用在方法上的
事务失效的场景: 1 同类方法内部调用 同一个类中一个方法调用另一个带 @Transactional 的方法。
1 2 3 4 5 6 7 8 9 10 11 12 @Service public class OrderService { public void create () { this .saveOrder(); } @Transactional public void saveOrder () { } }
原因 Spring 事务依赖 代理对象调用 :
但 this.saveOrder() 调用路径是:
没有经过代理。
解决方案 1 ((OrderService)AopContext.currentProxy()).saveOrder();
或者:
1 2 3 4 @Autowired private OrderService orderService;orderService.saveOrder();
2 方法不是public Spring 默认只对 public 方法 进行事务增强。
1 2 3 4 5 6 @Transactional private void saveOrder () {} @Transactional protected void saveOrder () {}
都会导致事务失效。
原因 Spring AOP 默认基于 代理机制 :
而代理只会拦截 public 方法 。
3 方法被final修饰 1 2 3 @Transactional public final void saveOrder () {}
事务会失效。
原因 如果使用 CGLIB 代理 :
而:
所以无法增强。
4 Bean不是Spring管理 如果对象不是由 Spring 创建:
1 2 OrderService orderService = new OrderService ();orderService.saveOrder();
事务不会生效。
原因
只有被 Spring 扫描的 Bean 才能被代理
5 异常被捕获没有抛出 事务默认只在 抛出异常时回滚 。
如果异常被捕获:
1 2 3 4 5 6 7 8 @Transactional public void saveOrder () { try { int i = 1 /0 ; }catch (Exception e){ e.printStackTrace(); } }
结果:
1 事务不会回滚; 需要throw e才能使事务正常执行
原因是:事务管理器会认为事务正常执行了
6 抛出的是非RuntimeException Spring 默认只对 运行时异常 回滚。
不会回滚。
原因 默认规则:
1 RuntimeException 或 Error 才回滚
解决 1 @Transactional(rollbackFor = Exception.class)
四,附近商户查询,签到功能(Redis GEO,BitMap)
附近商户: 使用之前要在启动类中提前加载商户信息,将商户经纬度存入redis的GEO中; 使用geo的原因: 传统的使用数据库查询,距离查询计算很慢,并发性能差,GEO是基于内存存储的,性能好;
使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 @Override public Result queryShopByType (Integer typeId, Integer current, Double x, Double y) { if (current == null || current <1 ) current = 1 ; if (x == null || y == null ) { Page<Shop> page = query() .eq("type_id" , typeId) .page(new Page <>(current, SystemConstants.DEFAULT_PAGE_SIZE)); return Result.ok(page.getRecords()); } int from = (current - 1 ) * SystemConstants.DEFAULT_PAGE_SIZE; int end = current * SystemConstants.DEFAULT_PAGE_SIZE; String key = SHOP_GEO_KEY + typeId; GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() .search( key, GeoReference.fromCoordinate(x, y), new Distance (5000 ), RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end) ); if (results == null ) { return Result.ok(Collections.emptyList()); } List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent(); if (list.size() <= from) { return Result.ok(Collections.emptyList()); } List<GeoResult<RedisGeoCommands.GeoLocation<String>>> pageItems = list.stream() .skip(from) .limit(SystemConstants.DEFAULT_PAGE_SIZE) .collect(Collectors.toList()); List<Long> ids = new ArrayList <>(pageItems.size()); Map<String, Distance> distanceMap = new HashMap <>(pageItems.size()); for (GeoResult<RedisGeoCommands.GeoLocation<String>> item : pageItems) { String shopIdStr = item.getContent().getName(); ids.add(Long.valueOf(shopIdStr)); distanceMap.put(shopIdStr, item.getDistance()); }
2 GEO 的原理 Redis GEO 本质上是:
1 SortedSet + GeoHash //所以GEO查询时会根据距离自动排序
Redis 会把:
转换成:
并存入:
结构类似:
1 2 3 4 5 6 shop:geo:1 member score shop1 geohash shop2 geohash shop3 geohash
签到功能: 传统的 Set 或 List 存储一个整型用户 ID(如 Java 的 int)需要 32 bits (4 字节),而 Bitmap 仅需 1 bit 即可表示该 ID 对应的某种状态
BitMap 原理 BitMap 本质:
每一位表示一天:
例如:
表示:
1 2 3 4 5 6 第1天 未签到 第2天 签到 第3天 签到 第4天 签到 第5天 未签到 第6天 签到
签到实现 Key 设计:
例如:
表示:
用户签到: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public Result sign () { Long userId = UserHolder.getUser().getId(); LocalDateTime now = LocalDateTime.now(); String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM" )); String key = RedisConstants.USER_SIGN_KEY + userId + keySuffix; int dayOfMonth = now.getDayOfMonth(); stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1 , true ); return Result.ok(); }
用户签到数统计: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Override public Result signCount () { Long userId = UserHolder.getUser().getId(); LocalDateTime now = LocalDateTime.now(); String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM" )); String key = RedisConstants.USER_SIGN_KEY + userId + keySuffix; int dayOfMonth = now.getDayOfMonth(); List<Long> result = stringRedisTemplate.opsForValue().bitField(key, BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0 )); if (result == null || result.isEmpty()) { return Result.ok(0 ); } long num=result.get(0 ); int count=0 ; while (true ){ if ((num & 1 )==0 ){ break ; }else { count++; } num>>>=1 ; } return Result.ok(count); }
4.R