数据库与REDIS缓存数据一致性解决方案
数据库与缓存读写模式策略
写完数据库后是否需要马上更新缓存还是直接删除缓存?
(1)、如果写数据库的值与更新到缓存值是一样的,不需要经过任何的计算,可以马上更新缓存,但是如果对于那种写数据频繁而读数据少的场景并不合适这种解决方案,因为也许还没有查询就被删除或修改了,这样会浪费时间和资源
(2)、如果写数据库的值与更新缓存的值不一致,写入缓存中的数据需要经过几个表的关联计算后得到的结果插入缓存中,那就没有必要马上更新缓存,只有删除缓存即可,等到查询的时候在去把计算后得到的结果插入到缓存中即可。
所以一般的策略是当更新数据时,先删除缓存数据,然后更新数据库,而不是更新缓存,等要查询的时候才把最新的数据更新到缓存
数据库与缓存双写情况下导致数据不一致问题
场景一
当更新数据时,如更新某商品的库存,当前商品的库存是100,现在要更新为99,先更新数据库更改成99,然后删除缓存,发现删除缓存失败了,这意味着数据库存的是99,而缓存是100,这导致数据库和缓存不一致。
场景一解决方案
这种情况应该是先删除缓存,然后在更新数据库,如果删除缓存失败,那就不要更新数据库,如果说删除缓存成功,而更新数据库失败,那查询的时候只是从数据库里查了旧的数据而已,这样就能保持数据库与缓存的一致性。
场景二
在高并发的情况下,如果当删除完缓存的时候,这时去更新数据库,但还没有更新完,另外一个请求来查询数据,发现缓存里没有,就去数据库里查,还是以上面商品库存为例,如果数据库中产品的库存是100,那么查询到的库存是100,然后插入缓存,插入完缓存后,原来那个更新数据库的线程把数据库更新为了99,导致数据库与缓存不一致的情况
场景二解决方案
遇到这种情况,可以用队列的去解决这个问,创建几个队列,如20个,根据商品的ID去做hash值,然后对队列个数取摸,当有数据更新请求时,先把它丢到队列里去,当更新完后在从队列里去除,如果在更新的过程中,遇到以上场景,先去缓存里看下有没有数据,如果没有,可以先去队列里看是否有相同商品ID在做更新,如果有也把查询的请求发送到队列里去,然后同步等待缓存更新完成。
这里有一个优化点,如果发现队列里有一个查询请求了,那么就不要放新的查询操作进去了,用一个while(true)循环去查询缓存,循环个200MS左右,如果缓存里还没有则直接取数据库的旧数据,一般情况下是可以取到的。
在高并发下解决场景二要注意的问题
(1)读请求时长阻塞
由于读请求进行了非常轻度的异步化,所以一定要注意读超时的问题,每个读请求必须在超时间内返回,该解决方案最大的风险在于可能数据更新很频繁,导致队列中挤压了大量的更新操作在里面,然后读请求会发生大量的超时,最后导致大量的请求直接走数据库,像遇到这种情况,一般要做好足够的压力测试,如果压力过大,需要根据实际情况添加机器。
(2)请求并发量过高
这里还是要做好压力测试,多模拟真实场景,并发量在最高的时候QPS多少,扛不住就要多加机器,还有就是做好读写比例是多少
(3)多服务实例部署的请求路由
可能这个服务部署了多个实例,那么必须保证说,执行数据更新操作,以及执行缓存更新操作的请求,都通过nginx服务器路由到相同的服务实例上
(4)热点商品的路由问题,导致请求的倾斜
某些商品的读请求特别高,全部打到了相同的机器的相同丢列里了,可能造成某台服务器压力过大,因为只有在商品数据更新的时候才会清空缓存,然后才会导致读写并发,所以更新频率不是太高的话,这个问题的影响并不是很大,但是确实有可能某些服务器的负载会高一些。
数据库与缓存数据一致性解决方案流程图
数据库与缓存数据一致性解决方案对应代码
商品库存实体
package com.shux.inventory.entity; public class InventoryProduct { private Integer productId; private Long InventoryCnt; public Integer getProductId() { return productId; } public void setProductId(Integer productId) { this.productId = productId; } public Long getInventoryCnt() { return InventoryCnt; } public void setInventoryCnt(Long inventoryCnt) { InventoryCnt = inventoryCnt; } }
请求接口
public interface Request { public void process(); public Integer getProductId(); public boolean isForceFefresh(); }
数据更新请求
package com.shux.inventory.request; import org.springframework.transaction.annotation.Transactional; import com.shux.inventory.biz.InventoryProductBiz; import com.shux.inventory.entity.InventoryProduct; /** ********************************************** * 描述:更新库存信息 * 1、先删除缓存中的数据 * 2、更新数据库中的数据 ********************************************** **/ public class InventoryUpdateDBRequest implements Request{ private InventoryProductBiz inventoryProductBiz; private InventoryProduct inventoryProduct; public InventoryUpdateDBRequest(InventoryProduct inventoryProduct,InventoryProductBiz inventoryProductBiz){ this.inventoryProduct = inventoryProduct; this.inventoryProductBiz = inventoryProductBiz; } @Override @Transactional public void process() { inventoryProductBiz.removeInventoryProductCache(inventoryProduct.getProductId()); inventoryProductBiz.updateInventoryProduct(inventoryProduct); } @Override public Integer getProductId() { // TODO Auto-generated method stub return inventoryProduct.getProductId(); } @Override public boolean isForceFefresh() { // TODO Auto-generated method stub return false; } }
查询请求
package com.shux.inventory.request; import com.shux.inventory.biz.InventoryProductBiz; import com.shux.inventory.entity.InventoryProduct; /** ********************************************** * 描述:查询缓存数据 * 1、从数据库中查询 * 2、从数据库中查询后插入到缓存中 ********************************************** **/ public class InventoryQueryCacheRequest implements Request { private InventoryProductBiz inventoryProductBiz; private Integer productId; private boolean isForceFefresh; public InventoryQueryCacheRequest(Integer productId,InventoryProductBiz inventoryProductBiz,boolean isForceFefresh) { this.productId = productId; this.inventoryProductBiz = inventoryProductBiz; this.isForceFefresh = isForceFefresh; } @Override public void process() { InventoryProduct inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId); inventoryProductBiz.setInventoryProductCache(inventoryProduct); } @Override public Integer getProductId() { // TODO Auto-generated method stub return productId; } public boolean isForceFefresh() { return isForceFefresh; } public void setForceFefresh(boolean isForceFefresh) { this.isForceFefresh = isForceFefresh; } }
spring启动时初始化队列线程池
package com.shux.inventory.thread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.shux.inventory.request.Request; import com.shux.inventory.request.RequestQueue; import com.shux.utils.other.SysConfigUtil; /** ********************************************** * 描述:请求处理线程池,初始化队列数及每个队列最多能处理的数量 ********************************************** **/ public class RequestProcessorThreadPool { private static final int blockingQueueNum = SysConfigUtil.get("request.blockingqueue.number")==null?10:Integer.valueOf(SysConfigUtil.get("request.blockingqueue.number").toString()); private static final int queueDataNum = SysConfigUtil.get("request.everyqueue.data.length")==null?100:Integer.valueOf(SysConfigUtil.get("request.everyqueue.data.length").toString()); private ExecutorService threadPool = Executors.newFixedThreadPool(blockingQueueNum); private RequestProcessorThreadPool(){ for(int i=0;i<blockingQueueNum;i++){//初始化队列 ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(queueDataNum);//每个队列中放100条数据 RequestQueue.getInstance().addQueue(queue); threadPool.submit(new RequestProcessorThread(queue));//把每个queue交个线程去处理,线程会处理每个queue中的数据 } } public static class Singleton{ private static RequestProcessorThreadPool instance; static{ instance = new RequestProcessorThreadPool(); } public static RequestProcessorThreadPool getInstance(){ return instance; } } public static RequestProcessorThreadPool getInstance(){ return Singleton.getInstance(); } /** * 初始化线程池 */ public static void init(){ getInstance(); } }
请求处理线程
package com.shux.inventory.thread; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import com.shux.inventory.request.InventoryUpdateDBRequest; import com.shux.inventory.request.Request; import com.shux.inventory.request.RequestQueue; /** ********************************************** * 描述:请求处理线程 ********************************************** **/ public class RequestProcessorThread implements Callable<Boolean>{ private ArrayBlockingQueue<Request> queue; public RequestProcessorThread(ArrayBlockingQueue<Request> queue){ this.queue = queue; } @Override public Boolean call() throws Exception { Request request = queue.take(); Map<Integer,Boolean> flagMap = RequestQueue.getInstance().getFlagMap(); //不需要强制刷新的时候,查询请求去重处理 if (!request.isForceFefresh()){ if (request instanceof InventoryUpdateDBRequest) {//如果是更新请求,那就置为false flagMap.put(request.getProductId(), true); } else { Boolean flag = flagMap.get(request.getProductId()); /** * 标志位为空,有三种情况 * 1、没有过更新请求 * 2、没有查询请求 * 3、数据库中根本没有数据 * 在最初情况,一旦库存了插入了数据,那就好会在缓存中也会放一份数据, * 但这种情况下有可能由于redis中内存满了,redis通过LRU算法把这个商品给清除了,导致缓存中没有数据 * 所以当标志位为空的时候,需要从数据库重查询一次,并且把标志位置为false,以便后面的请求能够从缓存中取 */ if ( flag == null) { flagMap.put(request.getProductId(), false); } /** * 如果不为空,并且flag为true,说明之前有一次更新请求,说明缓存中没有数据了(更新缓存会先删除缓存), * 这个时候就要去刷新缓存,即从数据库中查询一次,并把标志位设置为false */ if ( flag != null && flag) { flagMap.put(request.getProductId(), false); } /** * 这种情况说明之前有一个查询请求,并且把数据刷新到了缓存中,所以这时候就不用去刷新缓存了,直接返回就可以了 */ if (flag != null && !flag) { flagMap.put(request.getProductId(), false); return true; } } } request.process(); return true; } }
请求队列
package com.shux.inventory.request; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; /** ********************************************** * 描述:请求队列 ********************************************** **/ public class RequestQueue { private List<ArrayBlockingQueue<Request>> queues = new ArrayList<>(); private Map<Integer,Boolean> flagMap = new ConcurrentHashMap<>(); private RequestQueue(){ } private static class Singleton{ private static RequestQueue queue; static{ queue = new RequestQueue(); } public static RequestQueue getInstance() { return queue; } } public static RequestQueue getInstance(){ return Singleton.getInstance(); } public void addQueue(ArrayBlockingQueue<Request> queue) { queues.add(queue); } public int getQueueSize(){ return queues.size(); } public ArrayBlockingQueue<Request> getQueueByIndex(int index) { return queues.get(index); } public Map<Integer,Boolean> getFlagMap() { return this.flagMap; } }
spring 启动初始化线程池类
package com.shux.inventory.listener; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import com.shux.inventory.thread.RequestProcessorThreadPool; /** ********************************************** * 描述:spring 启动初始化线程池类 ********************************************** **/ public class InitListener implements ApplicationListener<ContextRefreshedEvent>{ @Override public void onApplicationEvent(ContextRefreshedEvent event) { // TODO Auto-generated method stub if(event.getApplicationContext().getParent() != null){ return; } RequestProcessorThreadPool.init(); } }
异步处理请求接口
package com.shux.inventory.biz; import com.shux.inventory.request.Request; /** ********************************************** * 描述:请求异步处理接口,用于路由队列并把请求加入到队列中 ********************************************** **/ public interface IRequestAsyncProcessBiz { void process(Request request); }
异步处理请求接口实现
package com.shux.inventory.biz.impl; import java.util.concurrent.ArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import com.shux.inventory.biz.IRequestAsyncProcessBiz; import com.shux.inventory.request.Request; import com.shux.inventory.request.RequestQueue; /** ********************************************** * 描述:异步处理请求,用于路由队列并把请求加入到队列中 ********************************************** **/ @Service("requestAsyncProcessService") public class RequestAsyncProcessBizImpl implements IRequestAsyncProcessBiz { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void process(Request request) { // 做请求的路由,根据productId路由到对应的队列 ArrayBlockingQueue<Request> queue = getQueueByProductId(request.getProductId()); try { queue.put(request); } catch (InterruptedException e) { logger.error("产品ID{}加入队列失败",request.getProductId(),e); } } private ArrayBlockingQueue<Request> getQueueByProductId(Integer productId) { RequestQueue requestQueue = RequestQueue.getInstance(); String key = String.valueOf(productId); int hashcode; int hash = (key == null) ? 0 : (hashcode = key.hashCode())^(hashcode >>> 16); //对hashcode取摸 int index = (requestQueue.getQueueSize()-1) & hash; return requestQueue.getQueueByIndex(index); } }
数据更新请求controller
package com.shux.inventory.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import com.shux.inventory.biz.IRequestAsyncProcessBiz; import com.shux.inventory.biz.InventoryProductBiz; import com.shux.inventory.entity.InventoryProduct; import com.shux.inventory.request.InventoryUpdateDBRequest; import com.shux.inventory.request.Request; import com.shux.utils.other.Response; /** ********************************************** * 描述:提交更新请求 ********************************************** **/ @Controller("/inventory") public class InventoryUpdateDBController { private @Autowired InventoryProductBiz inventoryProductBiz; private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz; @RequestMapping("/updateDBInventoryProduct") @ResponseBody public Response updateDBInventoryProduct(InventoryProduct inventoryProduct){ Request request = new InventoryUpdateDBRequest(inventoryProduct,inventoryProductBiz); requestAsyncProcessBiz.process(request); return new Response(Response.SUCCESS,"更新成功"); } }
数据查询请求controller
package com.shux.inventory.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import com.shux.inventory.biz.IRequestAsyncProcessBiz; import com.shux.inventory.biz.InventoryProductBiz; import com.shux.inventory.entity.InventoryProduct; import com.shux.inventory.request.InventoryQueryCacheRequest; import com.shux.inventory.request.Request; /** ********************************************** * 描述:提交查询请求 * 1、先从缓存中取数据 * 2、如果能从缓存中取到数据,则返回 * 3、如果不能从缓存取到数据,则等待20毫秒,然后再次去数据,直到200毫秒,如果超过200毫秒还不能取到数据,则从数据库中取,并强制刷新缓存数据 ********************************************** **/ @Controller("/inventory") public class InventoryQueryCacheController { private @Autowired InventoryProductBiz inventoryProductBiz; private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz; @RequestMapping("/queryInventoryProduct") public InventoryProduct queryInventoryProduct(Integer productId) { Request request = new InventoryQueryCacheRequest(productId,inventoryProductBiz,false); requestAsyncProcessBiz.process(request);//加入到队列中 long startTime = System.currentTimeMillis(); long allTime = 0L; long endTime = 0L; InventoryProduct inventoryProduct = null; while (true) { if (allTime > 200){//如果超过了200ms,那就直接退出,然后从数据库中查询 break; } try { inventoryProduct = inventoryProductBiz.loadInventoryProductCache(productId); if (inventoryProduct != null) { return inventoryProduct; } else { Thread.sleep(20);//如果查询不到就等20毫秒 } endTime = System.currentTimeMillis(); allTime = endTime - startTime; } catch (Exception e) { } } /** * 代码执行到这来,只有以下三种情况 * 1、缓存中本来有数据,由于redis内存满了,redis通过LRU算法清除了缓存,导致数据没有了 * 2、由于之前数据库查询比较慢或者内存太小处理不过来队列中的数据,导致队列里挤压了很多的数据,所以一直没有从数据库中获取数据然后插入到缓存中 * 3、数据库中根本没有这样的数据,这种情况叫数据穿透,一旦别人知道这个商品没有,如果一直执行查询,就会一直查询数据库,如果过多,那么有可能会导致数据库瘫痪 */ inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId); if (inventoryProduct != null) { Request forcRrequest = new InventoryQueryCacheRequest(productId,inventoryProductBiz,true); requestAsyncProcessBiz.process(forcRrequest);//这个时候需要强制刷新数据库,使缓存中有数据 return inventoryProduct; } return null; } }
数据库与REDIS缓存数据一致性解决方案相关推荐
- redis专题:数据库和redis缓存一致性解决方案
文章目录 1.双写模式 2.失效模式 3.缓存一致性解决方案 redis缓存和数据库都保存了数据信息,当我们更新了数据库的数据时,应该如何保证redis和数据库的数据同步呢?当前比较常用的是双写模式和 ...
- redis系列之数据库与缓存数据一致性解决方案
redis系列之数据库与缓存数据一致性解决方案 参考文章: (1)redis系列之数据库与缓存数据一致性解决方案 (2)https://www.cnblogs.com/jiawen010/p/1215 ...
- redis系列之数据库与缓存数据一致性解决方案(简单易懂)
数据库与缓存读写模式策略 写完数据库后是否需要马上更新缓存还是直接删除缓存? (1).如果写数据库的值与更新到缓存值是一样的,不需要经过任何的计算,可以马上更新缓存,但是如果对于那种写数据频繁而读数据 ...
- Redis缓存击穿解决方案之互斥锁
一.缓存击穿 缓存击穿问题也叫热点key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库造成巨大的冲击. --引用哔哩哔哩UP主"黑马程序员 ...
- redis做mysql缓存的优点_面试官:如何保障数据库和redis缓存的一致性
随着互联网的高速发展,使用互联网产品的人也越来越多,团队不可避免得也会面对越来越复杂的高并发业务场景(如下图),比如热点视频/文章的观看(读场景),热点视频/文章的评论,点赞等(写场景). 众所周知, ...
- 分布式系统概念 | 分布式锁:数据库、Redis、Zookeeper解决方案
文章目录 分布式锁 数据库 唯一索引 Redis SETNX.EXPIRE RedLock算法 Zookeeper 实现原理 羊群效应 改进方法 总结 分布式锁 随着互联网技术的不断发展.数据量的大幅 ...
- redis缓存穿透-解决方案
上面的解决方案个人觉得时有误的,因为就算缓存了value的null值,后面的接口请求还是会判断走数据库,所以看解决方案二 解决方案二: https://blog.csdn.net/muyi_amen/ ...
- Redis缓存穿透解决方案
先来说下什么是缓存穿透 缓存穿透是指查询一个根本不存在的数据,缓存层中和存储层中都不会命中,通常出于容错的考虑,如果从存储层查不到数据则不会写入数据层.这样将会导致数据的每次请求都要到存储层去查询,给 ...
- REDIS11_缓存和数据库一致性如何保证、解决方案、提供Canel解决数据一致性问题
文章目录 ①. 缓存和数据库双写一致保证 ②. 缓存数据一致性-解决方案 ③. 缓存数据一致性-解决-Canal ①. 缓存和数据库双写一致保证 ①. 只要用缓存,就可能会涉及到缓存与数据库双存储双写 ...
- Redis 缓存穿透、雪崩、缓存数据库不一致、持久化方式、分布式锁、过期策略
1. Redis 缓存穿透 1.1 Redis 缓存穿透概念 访问了不存在的 key,缓存未命中,请求会穿透到 DB,量大时可能会对 DB 造成压力导致服务异常. 由于不恰当的业务功能实现,或者外部恶 ...
最新文章
- 锁定文件失败 打不开磁盘“D:\vms\S1\CentOS 64 位.vmdk”或它所依赖的某个快照磁盘(强制关机后引起的问题)...
- ASP.NET MVC中常用的ActionResult类型
- 属性文法和语法制导翻译
- 静态代码检查工具简介
- VSCode - Beautify 插件配置
- SpringBoot启动源码探究---getRunListener()
- python数据分析-Python数据分析从小白到高手的几个步骤
- django+xadmin在线教育平台慕学网(一)
- IPTV机顶盒刷机过程--山东电信【天邑TY608】
- 施一公 谈英文论文写作
- top--查看服务器CPU及内存使用情况
- 惨!美团程序员的年终奖金可能没了
- 使用开源的协同办公OA项目,实现规范高效的公文管理
- elementui表格隔行换色
- 怎么用python画出Excel表格数据的残差图
- armbian不拔掉TF卡使用usb移动硬盘启动
- MySQL基础知识——ALTER TABLE
- 缓存服务器 MemcachedRedis
- Alexa 又收拾中国网站
- android+插入一条短信,android添加一条短信记录