目录

  • **一.初始Redis**
    • **1.1SQL 和 NoSql的区别**
      • **1.1.1结构化和非结构化**
      • **1.1.2关联和非关联**
      • **1.1.3查询方式**
      • **1.1.4 事务**
      • **1.1.5总结**
    • **1.2 认识Redis**
    • **1.3 Redis安装启动**
      • **默认启动:**
      • **后台启动:**
      • **开机自启:**
    • **1.4 Redis客户端**
      • **1.4.1.Redis命令行客户端**
      • **1.4.2.图形化桌面客户端**
  • **二.Redis命令**
    • **1.String**
    • **2.Hash**
    • **3.List**
    • **4.Set**
    • **5.SortedSet**
  • **三.Redis的java客户端**
    • **1.Jedis**
    • **2.SpringDataRedis**
  • **四.Redis实战**
    • **一.短信登录**
      • **1.1导入hmdp项目**
      • **1.2session实现短信登录**
      • **1.3集群的session共享问题**
      • **1.4基于Redis实现共享session登录**
    • **二.商户查询缓存**
      • **2.1什么是缓存**
      • **2.2添加商户缓存**
      • **2.3 添加商户类型缓存**
      • **2.4缓存更新策略**
      • **2.5实现商铺缓存和数据库的双写一致**
      • **2.6缓存穿透的解决思路**
      • **2.7解决商铺查询的缓存问题**
      • **2.8缓存雪崩**
      • **2.9缓存击穿**
      • **基于互斥锁解决缓存击穿问题**
      • **基于逻辑过期解决缓存击穿问题**
      • **2.10 封装Redis工具类**
    • **三.优惠卷秒杀**
      • **3.1全局唯一ID**
      • **3.2添加优惠卷**
      • **3.3实现秒杀下单**
      • **3.4超卖现象**
      • **3.5超卖问题分析**
      • **3.6乐观锁解决超卖问题**
      • **3.7实现一人一单功能**
      • **3.8集群下的线程并发安全问题**
      • **3.9分布式锁-原理**
      • **3.10分布式锁-实现思路**
      • **3.11实现Redis分布式锁版本1**
      • **3.12 Redis分布式锁误删问题**
      • 3.13解决Redis分布式锁误删问题
      • 3.14 分布式锁的原子性问题
      • 3.15 Lua脚本解决分布式锁的原子性问题
      • 3.16java调用lua脚本改造分布式锁
      • 3.17分布式锁-Redission简介
      • 3.18 Redisson快速入门
      • 3.19 Redisson的可重入锁原理
      • 3.20 Redisson的锁重试和WatchDog机制
      • 3.21 Redisson的multiiLock原理
      • image-20221117183929926
      • 3.22 Redis优化秒杀
      • 3.23 基于Redis完成秒杀资格判断
      • 3.24 基于阻塞队列实现秒杀异步下单
      • 3.25 认识消息队列
      • 3.26 基于list实现的消息队列
      • 3.27 基于PubSub实现的消息队列
      • 3.28 基于Stream实现的消息队列
      • 3.29 Stream的消费者组模式
      • 3.30 基于stream消息队列实现异步秒杀下单
    • 四. 达人探店
      • 4.1 发布探店笔记
      • 4.2 查看探店笔记
      • 4.3 点赞功能
      • 4.4 点赞排行榜
      • 4.5关注和取关
      • 4.6共同关注
      • 4.7 Feed流实现方案分析
      • 4.8 推送到粉丝收件箱
      • 4.9 滚动分页查询收件箱
      • 4.10 实现滚动分页查询
      • 4.11 附近商铺-GEO数据结构的基本使用
      • 4.12附近商铺-导入店铺数据到GEO
      • 4.13附近商铺-实现附近商铺功能
      • 4.15用户签到-BitMap功能演示
      • 4.16用户签到-实现签到功能
      • 4.17 用户签到-统计连续签到
      • 4.18 UV统计-HyperLogLog的用法
      • 4.19 UV统计-测试百万数据的统计
      • 4.15用户签到-BitMap功能演示
      • 4.16用户签到-实现签到功能
      • 4.17 用户签到-统计连续签到
      • 4.18 UV统计-HyperLogLog的用法
      • 4.19 UV统计-测试百万数据的统计

一.初始Redis

1.1SQL 和 NoSql的区别

1.1.1结构化和非结构化

(1) SQL关系性数据库

传统关系型数据库是结构化数据,每一张表都有严格的约束信息:字段名、字段数据类型、字段约束等等信息,插入的数据必须遵守这些约束

(2) NoSql数据库

NoSql对数据库格式没有严格约束,往往形式松散,自由。

可以是key-value,可以是文档,或者图格式

1.1.2关联和非关联

(1) 关系型数据库

(2) 非关系型数据库

{id: 1,name: "张三",orders: [{id: 1,item: {id: 10, title: "荣耀6", price: 4999}},{id: 2,item: {id: 20, title: "小米11", price: 3999}}]
}

1.1.3查询方式

1.1.4 事务

传统关系型数据库能满足事务ACID的原则 ,而非关系型数据库往往不支持事务,或者不能严格保证ACID的特性,只能实现基本的一致性。

1.1.5总结

1.2 认识Redis

特征:

  • 键值(key-value)型,value支持多种不同数据结构,功能丰富

  • 单线程,每个命令具备原子性

  • 低延迟,速度快(基于内存、IO多路复用、良好的编码)。

  • 支持数据持久化(定期将内存搬运到磁盘)

  • 支持主从集群、分片集群(数据拆分)

  • 支持多语言客户端

Redis的官方网站地址:RedisRedis Redis

1.3 Redis安装启动

Redis是基于C编写,所以需要先安装Redis所需的gcc依赖

yum install -y gcc

如果有了就跳过

安装包上传到usr/local/src

tar -zxvf redis-6.2.6.tar.gz

cd到redis的目录:

cd redis-6.2.6/

编译和运行

make && make install

默认启动:

需要一直挂着页面

redis-server

后台启动:

前提是必须修改配置文件(/usr/local/src/redis-6.2.6/redis.conf)

(1) 先备份一份

cp redis.conf redis.conf.bak

(2) 修改

vi redis.conf

eg:修改密码为1234

(3) 运行

cd /usr/local/src/redis-6.2.6
redis-server redis.conf

(4) 查看是否启动

ps -ef | grep redis

(5) 停止redis -9:强制但是不安全

kill -9 进程号

开机自启:

(1) 新建系统服务文件

vi /etc/systemd/system/redis.service

内容:

[Unit]
Description=redis-server
After=network.target[Service]
Type=forking
ExecStart=/usr/local/bin/redis-server /usr/local/src/redis-6.2.6/redis.conf
PrivateTmp=true[Install]
WantedBy=multi-user.target

(2) 重新加载系统服务

systemctl daemon-reload

(3)启动

systemctl start redis

(4)查看状态

systemctl status redis

(5) 设置开机自启

systemctl enable redis

1.4 Redis客户端

安装完成Redis,我们就可以操作Redis,实现数据的CRUD了。这需要用到Redis客户端,包括:

  • 命令行客户端
  • 图形化桌面客户端
  • 编程客户端

1.4.1.Redis命令行客户端

Redis安装完成后就自带了命令行客户端:redis-cli,使用方式如下:

redis-cli [options] [commonds]

redis-cli -h 192.168.200.131 -p 6379 -a 1234

其中常见的options有:

  • -h 127.0.0.1:指定要连接的redis节点的IP地址,默认是127.0.0.1
  • -p 6379:指定要连接的redis节点的端口,默认是6379
  • -a 123321:指定redis的访问密码

其中的commonds就是Redis的操作命令,例如:

  • ping:与redis服务端做心跳测试,服务端正常会返回pong

不指定commond时,会进入redis-cli的交互控制台:

也可以先不写密码,后面来补充!

1.4.2.图形化桌面客户端

GitHub上的大神编写了Redis的图形化桌面客户端,地址:https://github.com/uglide/RedisDesktopManager

不过该仓库提供的是RedisDesktopManager的源码,并未提供windows安装包。

在下面这个仓库可以找到安装包:https://github.com/lework/RedisDesktopManager-Windows/releases

resp.exe

二.Redis命令

expire设置存活周期,ttl查看剩余时间,不设置expire的话ttl为-1

1.String

setex key expireTime value

由于Redis为NoSql,我们不知道value对应的属性的数据类型是什么

如果Value是一个Java对象,例如一个User对象,则可以将对象序列化为JSON字符串后存储:

KEY VALUE
heima:user:1 {“id”:1, “name”: “Jack”, “age”: 21}
heima:product:1 {“id”:1, “name”: “小米11”, “price”: 4999}

一旦我们向redis采用这样的方式存储,那么在可视化界面中,redis会以层级结构来进行存储,形成类似于这样的结构,更加方便Redis获取数据**

2.Hash

本身Redis就是一个key-value的结构,而hash的value还是一个key-value的结构

支持对单个值进行修改

3.List

从左侧推

lpush users 1 2 3

从右侧推

从左侧右侧弹出

lpop users 1
rpop users 1

阻塞弹出 blpop/brpop key second

blpop user1 100

4.Set

sadd s1 a b c

删除元素

 srem s1 a

查看元素数量

scard s1

多个集合之间的操作

sinsert s1 s2 //s1和s2的交集
sdiff s1 s2//s1和s2的差集
sunion s1 s2 //s1和s2的并集

sadd zs lisi wnagwu zhaoliu
sadd ls wangwu mazi ergouscard zs
sinter zs ls
adiff zs ls
sunion zs ls
ismember zs lisi
ismember ls zhangsan
srem zs lisismembers zs
smembers ls

5.SortedSet

每个元素都带上分数,所以才能实现排序

zadd stus 85 jack 89 Lucy 82 Rose 95 Tom 78 Jerry 92 Amy 76 Miles
zrem stus Tom
zscore stus Amy
zrank stus Rose //升序 zrevrank stus Rose//降序
zcount stus 0 80
zincrby stus 2 Amy
zrange stus 0 2 //升序 zrevrange stus 0 2 //降序
zrangebyscore stus 0 80

三.Redis的java客户端

1.Jedis

在Redis官网中提供了各种语言的客户端,地址:https://redis.io/docs/clients/

基本用法

(1) 导入依赖

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.8.0</version>
</dependency>

(2) 建立连接

public class JedisTest {private Jedis jedis;@BeforeEachvoid setUp(){//1.建立连接jedis = new Jedis("192.168.200.130",6379);//2.设置密码jedis.auth("1234");//3.选择库jedis.select(0);}@Testvoid testString(){String result = jedis.set("name", "小明");System.out.println("result= " + result);String name = jedis.get("name");System.out.println("name= "+name);}@AfterEachvoid tearDown(){if(jedis!=null){jedis.close();}}}

连接不上/报错 得使用设置密码

config set requirepass 12349

@Test
void testHash(){jedis.hset("user:1","name","jack");jedis.hset("user:1","age","21");Map<String, String> map = jedis.hgetAll("user:1");System.out.println(map);
}

Jedis连接池

建立Factory

public class JedisConnectFactory {private static final JedisPool jedisPool;static{//配置连接池JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(8);poolConfig.setMaxIdle(8);poolConfig.setMinIdle(0);poolConfig.setMaxWait(Duration.ofMillis(1000));jedisPool = new JedisPool(poolConfig,"192.168.200.130",6379,1000,"1234");}public static Jedis getJedis(){return jedisPool.getResource();}}

代码说明:

  • 1) JedisConnectionFacotry:工厂设计模式是实际开发中非常常用的一种设计模式,我们可以使用工厂,去降低代的耦合,比如Spring中的Bean的创建,就用到了工厂设计模式

  • 2)静态代码块:随着类的加载而加载,确保只能执行一次,我们在加载当前工厂类的时候,就可以执行static的操作完成对 连接池的初始化

  • 3)最后提供返回连接池中连接的方法.

改造原始代码

代码说明:

1.我们在完成了使用工厂设计模式来完成代码的编写后,我们可以通过工厂来获得连接,不用去new对象,减低耦合,且使用的还是连接池对象

2.当我们使用连接池后,当我们关闭连接其实并不是关闭,而是将Jedis归还给连接池

@BeforeEach
void setUp(){//1.建立连接//jedis = new Jedis("192.168.200.130",6379);//2.设置密码//jedis.auth("1234");jedis = JedisConnectFactory.getJedis();//3.选择库jedis.select(0);
}····@AfterEachvoid tearDown() {if (jedis != null) {jedis.close();}}

2.SpringDataRedis

SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis,官网地址:https://spring.io/projects/spring-data-redis

  • 提供了对不同Redis客户端的整合(Lettuce和Jedis)
  • 提供了RedisTemplate统一API来操作Redis
  • 支持Redis的发布订阅模型
  • 支持Redis哨兵和Redis集群
  • 支持基于Lettuce的响应式编程(Lettuce之前实在es那里有)
  • 支持基于JDK.JSON.字符串.Spring对象的数据序列化及反序列化
  • 支持基于Redis的JDKCollection实现

SpringDataRedis中提供了RedisTemplate工具类,其中封装了各种对Redis的操作。并且将不同数据类型的操作API封装到了不同的类型中:

(1) 导入依赖

        <!--Redis依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!--连接池依赖--><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency>

(2) 配置文件

spring:redis:host: 192.168.200.130port: 6379password: 1234database: 0lettuce:pool:max-active: 8 #最大连接数max-idle: 8 #最大空闲连接min-idle: 0 #最小空闲连接max-wait: 100 #连接等待时间

(3) 自定义的RedisTemplate,注入到IOC中

先去(4) 写测试类测试一下就会发现问题

由于我们使用的是String-Object的话,你value的值如果传入一个字符串或者其他类型的话,并没有序列化,就会造成以下情况:get key

这是由于key和value会被当成对象,被redis底层的默认序列化方法:jdk序列化工具jdkSerializationRedisSerialliszer

而它采用的是objectOutputStream(把java对象转成字节)

先把这个key删除掉,用del

写一个RedisConfig,写bean: 拥有我们的序列化后的redisTemplate

@Configurationpublic class RedisConfig {@Beanpublic RedisTemplate<String,Object> redisTemplate( RedisConnectionFactory factory){RedisTemplate<String,Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 设置key的序列化方式template.setKeySerializer(RedisSerializer.string());// 设置value的序列化方式template.setValueSerializer(RedisSerializer.json());// 设置hash的key的序列化方式template.setHashKeySerializer(RedisSerializer.string());// 设置hash的value的序列化方式template.setHashValueSerializer(RedisSerializer.json());template.afterPropertiesSet();return template;}
}

(4) 测试类

@SpringBootTest
public class RedisTest {@Autowiredprivate RedisTemplate redisTemplate;@Testpublic void RedistestString(){// 写入一条String数据redisTemplate.opsForValue().set("name", "wxp");// 获取string数据Object name = redisTemplate.opsForValue().get("name");System.out.println("name = " + name);}
}

接下里试试实体类的序列化

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {private String name;private Integer age;
}
@Test
public void testSaveUser(){User user = new User();user.setName("阿廖莎");user.setAge(21);redisTemplate.opsForValue().set("user:100",user);System.out.println(redisTemplate.opsForValue().get("user:100"));
}

控制台打印:

.

.

发现这个json对象会将类的class写入,这个是为这个class进行反序列化的,但是会存在内存开销

现在只需要用成StringRedisTemplate即可

@Autowired
private StringRedisTemplate stringRedisTemplate;//JSON工具类ObjectMapper,或者可以用fastjson:JSON.toJSONString(), JSON.parseObject()
private static final ObjectMapper mapper = new ObjectMapper();@Testpublic void testSaveUser() throws JsonProcessingException {User user = new User();user.setName("阿廖莎");user.setAge(21);//手动序列化String json = mapper.writeValueAsString(user);stringRedisTemplate.opsForValue().set("user:100",json);//反序列化User user1 = mapper.readValue(stringRedisTemplate.opsForValue().get("user:100"), User.class);System.out.println("user1 = " + user1);}

.

.

接下来测试下哈希结构

string:

@Test
void testHash(){stringRedisTemplate.opsForHash().put("user:200","name","张三");stringRedisTemplate.opsForHash().put("user:200","age","21");Object name = stringRedisTemplate.opsForHash().get("user:200", "name");System.out.println("属性name: " + name);Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries("user:200");//获取全部的hashkey-hashvalueSystem.out.println("所有属性: " + entries);
}

.

.

实战篇====

四.Redis实战

一.短信登录

1.1导入hmdp项目

(1) 数据库

少数据的话,用navicat创建数据库hmdp,用idea连接mysql执行sql,用的还是本地的数据库

(2) 导入半成品hm-dianping

(3) 导入前端的话就导入nginx的文件夹,内部有hmdp的前端资源,我们导入后启动它即可

nginx目录下cmd输入start nginx.exe

访问:http://localhost:8080/

1.2session实现短信登录

(1) userService创建sendCode接口,实现它

@Override
public Result sendCode(String phone, HttpSession session) {//1.校验手机号:利用util下RegexUtils进行正则验证if(RegexUtils.isPhoneInvalid(phone)){return Result.fail("手机号格式不正确!");}//2.生成验证码:导入hutool依赖,内有RandomUtilString code = RandomUtil.randomNumbers(6);//3.保存验证码到sessionsession.setAttribute("code",code);//4.发送验证码log.info("验证码为: " + code);log.debug("发送短信验证码成功!");return Result.ok();}

(2) login功能

这个老师的写法也是存在问题:假如我先用自己的获取验证码,再换别人的手机号用我的验证码登录,也可以登录

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {//1.校验手机号String phone = loginForm.getPhone();if(RegexUtils.isPhoneInvalid(phone)){return Result.fail("手机号格式错误!");}//2.校验验证码Object cacheCode = session.getAttribute("code");String code = loginForm.getCode();if(code==null||!cacheCode.toString().equals(code)){//3.不一致,报错return Result.fail("验证码错误!");}//4.一致,根据手机号查询用户(需要写对应的单表查询方法:select * from tb_user where phone = #{phone})User user = query().eq("phone", phone).one();if(user==null){//5.注册用户user.setPhone(phone);user.setNickName("user_"+RandomUtil.randomString(10));//保存用户save(user);}//6.存入sessionsession.setAttribute("user",user);return Result.ok();
}

(3) 登录验证功能

其实就是携带登录配置,这里用的是cookie,然后用拦截器进行拦截然后验证,但是一般我们用jwt令牌放入localstroagecookie

上面我们不能直接把user存入,而是要把保留一些不隐私的信息(UserDto)然后传入Session

a.

//6.存入session
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));

b.拦截器

public class LoginInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//1.获取sessionHttpSession session = request.getSession();//2.获取session中的用户Object user = session.getAttribute("user");//3.判断用户是否存在. 不存在:拦截;存在:放入ThreadLocal,放行(写了ThreadLocal的封装工具类UserHolder)if(user==null){response.setStatus(401);response.getWriter().write("用户未登录!");return false;}UserHolder.saveUser((UserDTO) user);return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {//移除用户UserHolder.removeUser();}
}

c.在MvcConfig内添加上我们的拦截器

@Configuration
public class MvcConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/upload/**","/voucher/**");}
}

1.3集群的session共享问题

多台Tomcat不共享session存储空间,当请求切换到不同的tomcat服务时导致数据丢失的问题

所以我们把数据存入Redis,集群的Redis可以替代session

1.4基于Redis实现共享session登录

我们应该选择String类型存验证码即可,value:验证码,但是key要区分开来

选择Hash存储用户信息,因为每个字段独立,比较好去DRUD,内存占用少,key用token即可(随机字符串)

之前的session的话,tomcat会自动把session的Id存入Cookie,每次请求都会携带Cookie,所以我们需要手动把token返回给客户端,每次请求客户端都会携带着token

基于上面的来进行修改

RedisConstants工具类存储key常量

public class RedisConstants {public static final String LOGIN_CODE_KEY = "login:code:";public static final Long LOGIN_CODE_TTL = 2L;public static final String LOGIN_USER_KEY = "login:token:";public static final Long LOGIN_USER_TTL = 36000L;public static final Long CACHE_NULL_TTL = 2L;public static final Long CACHE_SHOP_TTL = 30L;public static final String CACHE_SHOP_KEY = "cache:shop:";public static final String LOCK_SHOP_KEY = "lock:shop:";public static final Long LOCK_SHOP_TTL = 10L;public static final String SECKILL_STOCK_KEY = "seckill:stock:";public static final String BLOG_LIKED_KEY = "blog:liked:";public static final String FEED_KEY = "feed:";public static final String SHOP_GEO_KEY = "shop:geo:";public static final String USER_SIGN_KEY = "sign:";
}

sendCode做以下修改

//3.保存验证码到Redis
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY +phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);//有效期2mins

login

UserServiceImpl的sendCode和Login

@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result sendCode(String phone, HttpSession session) {//1.校验手机号:利用util下RegexUtils进行正则验证if(RegexUtils.isPhoneInvalid(phone)){return Result.fail("手机号格式不正确!");}//2.生成验证码:导入hutool依赖,内有RandomUtilString code = RandomUtil.randomNumbers(6);//3.保存验证码到RedisstringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY +phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);//有效期2mins//4.发送验证码log.info("验证码为: " + code);log.debug("发送短信验证码成功!");return Result.ok();}@Overridepublic Result login(LoginFormDTO loginForm, HttpSession session) {//1.校验手机号String phone = loginForm.getPhone();if(RegexUtils.isPhoneInvalid(phone)){return Result.fail("手机号格式错误!");}//2.从Redis中获取验证码String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY+phone);String code = loginForm.getCode();if(cacheCode==null||!cacheCode.equals(code)){//3.不一致,报错return Result.fail("验证码错误!");}//4.一致,根据手机号查询用户(需要写对应的单表查询方法:select * from tb_user where phone = #{phone})User user = query().eq("phone", phone).one();if(user==null){//5.注册用户User newUser = new User();newUser.setPhone(phone);newUser.setNickName("user_"+RandomUtil.randomString(10));save(newUser);user = newUser;}//6.保存用户到Redis//(1)生成tokenString token = UUID.randomUUID().toString(true);//hutools//(2)User转为HashMap存储UserDTO userDTO = BeanUtil.copyProperties(user,UserDTO.class);HashMap<Object, Object> userMap = new HashMap<>();userMap.put("id", userDTO.getId().toString());userMap.put("nickName", userDTO.getNickName());userMap.put("icon", userDTO.getIcon());//(3)存储到RedisString tokenKey = LOGIN_USER_KEY + token;stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);//(4) 设置有效期stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);return Result.ok(token);}
}

MvcConfig注入stringRedisTemplate,然后传给LoginInterceptor,因为LoginInterceptor不是bean不能用spring注入其他bean

@Configuration
public class MvcConfig implements WebMvcConfigurer {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor(stringRedisTemplate)).excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/upload/**","/voucher/**");}
}

LoginInterceptor

public class LoginInterceptor implements HandlerInterceptor{private final StringRedisTemplate stringRedisTemplate;public LoginInterceptor(StringRedisTemplate stringRedisTemplate){this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//1.获取请求头中的tokenString token = request.getHeader("authorization");if (StrUtil.isBlank(token)){//不存在,拦截 设置响应状态吗为401(未授权)response.setStatus(401);return false;}//2.基于token获取redis中用户String key=RedisConstants.LOGIN_USER_KEY + token;Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);//3.判断用户是否存在if (userMap.isEmpty()){//4.不存在则拦截,设置响应状态吗为401(未授权)response.setStatus(401);return false;}//5.将查询到的Hash数据转化为UserDTO对象UserDTO userDTO=new UserDTO();BeanUtil.fillBeanWithMap(userMap,userDTO, false);//6.保存用户信息到ThreadLocalUserHolder.saveUser(userDTO);//7.更新token的有效时间,只要用户还在访问我们就需要更新token的存活时间stringRedisTemplate.expire(key, RedisConstants.LOGIN_USER_TTL, TimeUnit.SECONDS);//8.放行return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {//销毁,以免内存泄漏UserHolder.removeUser();}
}

用户请求进去拦截器,我们试着去获取请求头内的token,根据token去查询用户信息,判断是否拦截,保存在ThreadLocal,刷新token的有效期

但是,这个拦截器是拦截需要登录之后才需要进行请求的路径,那我如果一直在访问的是不需要拦截的页面的话,我还是会过期?这就不合理。所以我们需要在这个拦截器前面再加个拦截器,然后在新增拦截器上进行保存ThreadLocal和刷新有效期,不理解其他

其实就是对之前的拦截器进行功能拆分

MvcConfig

@Configuration
public class MvcConfig implements WebMvcConfigurer {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/upload/**","/voucher/**").order(1);//RefreshTokenInterceptor 先于 LoginInterceptor 执行registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);//默认拦截所有请求}}

RefreshTokenInterceptor

public class RefreshTokenInterceptor implements HandlerInterceptor{private final StringRedisTemplate stringRedisTemplate;public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//1.获取请求头中的tokenString token = request.getHeader("authorization");if (StrUtil.isBlank(token)){return true;}//2.基于token获取redis中用户String key=RedisConstants.LOGIN_USER_KEY + token;Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);//3.判断用户是否存在if (userMap.isEmpty()){return true;}//5.将查询到的Hash数据转化为UserDTO对象UserDTO userDTO=new UserDTO();BeanUtil.fillBeanWithMap(userMap,userDTO, false);//6.保存用户信息到ThreadLocalUserHolder.saveUser(userDTO);//7.更新token的有效时间,只要用户还在访问我们就需要更新token的存活时间stringRedisTemplate.expire(key, RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);//8.放行return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {//销毁,以免内存泄漏UserHolder.removeUser();}
}

LoginInterceptor

public class LoginInterceptor implements HandlerInterceptor{@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//判断是否需要拦截if(UserHolder.getUser()==null){response.setStatus(401);response.getWriter().write("用户未登录!");return false;}return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {//销毁,以免内存泄漏UserHolder.removeUser();}
}

刷新以下首页

二.商户查询缓存

2.1什么是缓存

数据库发生改变,Redis还没及时更新,那么从缓存内取到的数据就会出错,就是数据一致性问题

2.2添加商户缓存

我们通过这个接口查询到的数据有很多,我们希望在此做个Redis缓存数据,提供查询速度

(1)service定义方法,实现它

@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result querygetById(Long id) {//1.从Redis内查询商品缓存String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);if(StrUtil.isNotBlank(shopJson)){//手动反序列化Shop shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}//2.不存在就根据id查询数据库Shop shop = getById(id);if(shop==null){return Result.fail("商户不存在!");}//3.数据库数据写入Redis//手动序列化String shopStr = JSONUtil.toJsonStr(shop);stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,shopStr,CACHE_SHOP_TTL, TimeUnit.MINUTES);return Result.ok(shop);}
}

controller

@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {return shopService.querygetById(id);
}

可以通过调试查看是否是从Redis内拿出来

时间变快了很多

2.3 添加商户类型缓存

作业:

@Service
public class ShopTypeServiceImpl extends ServiceImpl<ShopTypeMapper, ShopType> implements IShopTypeService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result queryList() {//1.从Redis中查询String key = CACHE_SHOPTYPE_KEY;List<String> list = stringRedisTemplate.opsForList().range(key, 0, -1);if(!list.isEmpty()){//手动反序列化List<ShopType> typeList = new ArrayList<>();for (String s : list) {ShopType shopType = JSONUtil.toBean(s, ShopType.class);typeList.add(shopType);}return Result.ok(typeList);}//2.从数据库内查询List<ShopType> typeList = query().orderByAsc("sort").list();if(typeList.isEmpty()){return Result.fail("不存在该分类!");}//序列化for (ShopType shopType : typeList) {String s = JSONUtil.toJsonStr(shopType);list.add(s);}//3.存入缓存stringRedisTemplate.opsForList().rightPushAll(key,list);stringRedisTemplate.expire(key,CACHE_SHOPTYPE_TTL,TimeUnit.MINUTES);return Result.ok(list);}
}

2.4缓存更新策略

由于数据库的操作速度比操作缓存的速度慢,所以操作缓存的时候极低概率会被操作数据库的线程抢去cpu,反过来就会出现线程安全问题,所以采用先更新数据库再删除缓存

2.5实现商铺缓存和数据库的双写一致

ShopServiceImpl

@Override
public Result update(Shop shop) {if(shop.getId()==null){return Result.fail("店铺id不能为空!");}//1.更新数据库updateById(shop);//2.删除缓存String key = CACHE_SHOP_KEY + shop.getId();stringRedisTemplate.delete(key);return Result.ok();
}

ShopController

@PutMapping
public Result updateShop(@RequestBody Shop shop) {// 写入数据库return shopService.update(shop);
}

后面再访问的时候才会重新添加上缓存,这个之前就写过了

重新刷新

2.6缓存穿透的解决思路

避免数据库也查不到,还把null存入缓存,那么以后缓存就永远不生效

2.7解决商铺查询的缓存问题

2.8缓存雪崩

解决方案:

  • 给不同的Key的TTL添加随机值
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

2.9缓存击穿

缓存击穿问题,也叫 热点 Key 问题;就是一个被 高并发访问 并且 缓存中业务较复杂的 Key 突然失效,大量的请求在极短的时间内一起请求这个 Key 并且都未命中,无数的请求访问在瞬间打到数据库上,给数据库带来巨大的冲击。

解决方案:

  • 互斥锁:查询缓存未命中,获取互斥锁,获取到互斥锁的才能查询数据库重建缓存,将数据写入缓存中后,释放锁。
  • 逻辑过期:查询缓存,发现逻辑时间已经过期,获取互斥锁,开启新线程;在新线程中查询数据库重建缓存,将数据写入缓存中后,释放锁;在释放锁之前,查询该数据时,都会将过期的数据返回。

解决方案 优点 缺点
互斥锁 没有额外的内存消耗;保证一致性;实现简单 线程需要等待,性能受影响;可能有死锁风险
逻辑过期 线程无需等待,性能较好 有额外内存消耗;不保证一致性;实现复杂

基于互斥锁解决缓存击穿问题

核心:利用 Redis 的 setnx 方法来表示获取锁。该方法的含义是:如果 Redis 中没有这个 Key,则插入成功;如果有这个 Key,则插入失败。通过插入成功或失败来表示是否有线程插入 Key,插入成功的 Key 则认为是获取到锁的线程;释放锁就是将这个 Key 删除,因为删除 Key 以后其他线程才能再执行 setnx 方法。

stenx lock 1

/*** 获取互斥锁*/
private boolean tryLock(String key) {Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "1", TTL_TEN, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);
}/*** 释放互斥锁*/
private void unLock(String key) {redisTemplate.delete(key);
}

一次请求的过程

  1. 请求打进来,先去 Redis 中查,未命中;

  2. 获取互斥锁:将一个 Key 为 LOCK_SHOP_KEY + id 的数据写入 Redis 中,此时其他线程就无法拿到这个 Key,也就无法继续后续操作;

  3. 获取失败就进行休眠,休眠结束后通过递归再次请求;

  4. 获取成功,查询数据库、将需要查询的那个数据写入 Redis;

  5. 最后,删除通过 setnx 创建的那个 Key。

需求:修改根据id查询店铺的业务(互斥锁方式解决)

/**互斥锁实现解决缓存击穿**/
public Shop queryWithMutex(Long id){//1.从Redis内查询商品缓存String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);if(StrUtil.isNotBlank(shopJson)){//手动反序列化return JSONUtil.toBean(shopJson, Shop.class);}//如果上面的判断不对,那么就是我们设置的""(有缓存"",证明数据库内肯定是没有的)或者null(没有缓存)//判断命中的是否时空值if(shopJson!=null){//return null;}//a.实现缓存重建//a.1 获取互斥锁String lockKey = LOCK_SHOP_KEY + id;Shop shop = null;try {boolean hasLock = tryLock(lockKey);//a.2 判断是否获取到,获取到:根据id查数据库 获取不到:休眠if(!hasLock){Thread.sleep(50);return queryWithMutex(id);}//2.不存在就根据id查询数据库shop = getById(id);//模拟重建的延时Thread.sleep(200);if(shop==null){stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,"",CACHE_NULL_TTL,TimeUnit.MINUTES);return null;}//3.数据库数据写入Redis//手动序列化String shopStr = JSONUtil.toJsonStr(shop);stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id,shopStr,CACHE_SHOP_TTL, TimeUnit.MINUTES);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {//释放互斥锁unlock(lockKey);}return shop;
}
@Override
public Result querygetById(Long id) {//缓存穿透//Shop shop = queryWithPassThrough(id);//互斥锁解决缓存击穿Shop shop = queryWithMutex(id);if(shop==null) return Result.fail("店铺不存在!");return Result.ok(shop);
}

利用postman测试多线程:先把这个key的缓存删除

访问1000次,数据库只查询一次,都可以200,说明互斥锁设置后效果成功

基于逻辑过期解决缓存击穿问题

private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);//开启10个线程/**逻辑过期实现解决缓存击穿**/
public Shop queryWithLogical(Long id){//1.从Redis内查询商品缓存String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);//2.判断是否存在if(StrUtil.isBlank(shopJson)){return null;}//3.命中,需要先把json反序列化为对象RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);JSONObject data = (JSONObject) redisData.getData();Shop shop = JSONUtil.toBean(data, Shop.class);//4.判断是否过期LocalDateTime expireTime = redisData.getExpireTime();if(expireTime.isAfter(LocalDateTime.now())){//未过期直接返回return shop;}//5.过期的话需要缓存重建//5.1 获取互斥锁String lockKey = LOCK_SHOP_KEY + id;boolean hasLock = tryLock(lockKey);//5.2判断是否获取到,获取到:根据id查数据库 获取不到:休眠if(hasLock){//成功就开启独立线程,实现缓存重建, 这里的话用线程池CACHE_REBUILD_EXECUTOR.submit(()->{try {//重建缓存this.saveShop2Redis(id,20L);} catch (Exception e) {throw new RuntimeException(e);}finally {//释放锁unlock(lockKey);}});}return shop;
}
/**缓存重建方法**/
public void saveShop2Redis(Long id,Long expireSeconds) throws InterruptedException {//1.查询店铺信息Shop shop = getById(id);Thread.sleep(200);//2.封装逻辑过期时间RedisData redisData = new RedisData();redisData.setData(shop);redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));//3.写入RedisstringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}

先用saveShop2Redis把热点key提前放入缓存,提升速度(设置了逻辑过期时间)

@Test
void testSaveShop() throws InterruptedException {shopService.saveShop2Redis(1L,10L);
}ja'v

把数据库修改一下

大约200ms后就会进行缓存重建

2.10 封装Redis工具类

方法3:

id2 -> getById(id2) 在java8是可以用 this::getById 代替的
/**解决缓存穿透**/
public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallBack,Long time,TimeUnit unit){String key = keyPrefix + id;//1.从Redis内查询商品缓存String json = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);if(StrUtil.isNotBlank(json)){//手动反序列化return JSONUtil.toBean(json, type);}//如果上面的判断不对,那么就是我们设置的""(有缓存"",证明数据库内肯定是没有的)或者null(没有缓存)//判断命中的是否时空值if(json!=null){//return null;}//2.不存在就根据id查询数据库R r = dbFallBack.apply(id);//由于不知道这段逻辑,所以我们需要用户传进来函数逻辑if(r==null){stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);return null;}//写入Redisthis.set(key,r,time,unit);return r;
}
@Override
public Result querygetById(Long id) {//解决缓存穿透Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY,id,Shop.class,this::getById,CACHE_SHOP_TTL,TimeUnit.MINUTES);if(shop==null) return Result.fail("店铺不存在!");return Result.ok(shop);
}

测试:数据库访问一次,传入Redis为“”,多刷后不会查询到数据库

方法4:

private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);//10个线程的线程池/**逻辑过期实现解决缓存击穿**/
public <R,ID> R queryWithLogical(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallBack,Long time,TimeUnit unit) {String key = CACHE_SHOP_KEY + id;//1.从Redis内查询商品缓存String shopJson = stringRedisTemplate.opsForValue().get(key);//2.判断是否存在if (StrUtil.isBlank(shopJson)) {return null;}//3.命中,需要先把json反序列化为对象RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);JSONObject data = (JSONObject) redisData.getData();R r = JSONUtil.toBean(data, type);//4.判断是否过期LocalDateTime expireTime = redisData.getExpireTime();if (expireTime.isAfter(LocalDateTime.now())) {//未过期直接返回return r;}//5.过期的话需要缓存重建//5.1 获取互斥锁String lockKey = LOCK_SHOP_KEY + id;boolean hasLock = tryLock(lockKey);//5.2判断是否获取到,获取到:根据id查数据库 获取不到:休眠if (hasLock) {//成功就开启独立线程,实现缓存重建, 这里的话用线程池CACHE_REBUILD_EXECUTOR.submit(() -> {try {//重建缓存(查数据库+传入Redis)R r1 = dbFallBack.apply(id);this.setWithLogicalExpire(key,r1,time,unit);} catch (Exception e) {throw new RuntimeException(e);} finally {//释放锁unlock(lockKey);}});}return r;
}//设置锁
private boolean tryLock(String key){Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);//如果存在return BooleanUtil.isTrue(flag);}
//修改锁
private void unlock(String key){stringRedisTemplate.delete(key);
}
@Override
public Result querygetById(Long id) {//解决缓存穿透//Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY,id,Shop.class,this::getById,CACHE_SHOP_TTL,TimeUnit.MINUTES);//互斥锁解决缓存击穿//Shop shop = queryWithMutex(id);//逻辑过期解决缓存击穿Shop shop = cacheClient.queryWithLogical(CACHE_SHOP_KEY,id,Shop.class,this::getById,20L,TimeUnit.SECONDS);//方便测试if(shop==null) return Result.fail("店铺不存在!");return Result.ok(shop);
}

测试:

我们先在Redis内插入逻辑过期时间的key

public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){//逻辑过期时间 redisData有属性expireTime和Data,把value封装到里面就有了逻辑过期时间RedisData redisData = new RedisData();redisData.setData(value);redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));//写入RedisstringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redisData));
}
@Test
void testSaveShop() throws InterruptedException {Shop shop = shopService.getById(1L);cacheClient.setWithLogicalExpire(CACHE_SHOP_KEY,shop,1L, TimeUnit.SECONDS);
}

postman发送多次请求

三.优惠卷秒杀

3.1全局唯一ID

tb_voucher_order表

在分布式系统下生成全局唯一ID的工具,满足 唯一性,高可用,高性能,递增性,安全性

这里利用的是Redis自增id策略

为了增加ID的安全性,不要直接使用Redis自增的数值,而是拼接一些其它信息:

一秒接收2^32,完全够用

ID的组成部分:符号位:1bit,永远为0

时间戳:31bit,以秒为单位,可以使用69年

序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

@Component
public class RedisIdWorker {//到今年第一天的秒数private static final long BEGIN_TIMESTAMP = 1640995200L;//序列号的位数private static final long COUNT_TIMESTAMP = 32L;@Resourceprivate StringRedisTemplate stringRedisTemplate;public long nextId(String keyPrefix){//不同业务//生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timeStamp = nowSecond-BEGIN_TIMESTAMP;//生成序列号String today = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));long count = stringRedisTemplate.opsForValue().increment("irc:" + keyPrefix + ":" + today);//拼接return timeStamp << COUNT_TIMESTAMP  |  count ;}public static void main(String[] args) {LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);long second = time.toEpochSecond(ZoneOffset.UTC);System.out.println("second = " + second);}
}

测试:

@Resource
private RedisIdWorker redisIdWorker;private ExecutorService es = Executors.newFixedThreadPool(500);//线程池@Test
void testRedisId() throws InterruptedException {//CountDownLatch大致的原理是将任务切分为N个,让N个子线程执行,并且有一个计数器也设置为N,哪个子线程完成了就N-1CountDownLatch latch = new CountDownLatch(300);Runnable task =()->{for(int i=0;i<100;i++){Long id = redisIdWorker.nextId("order");System.out.println("id = " + id);}latch.countDown();};Long begin = System.currentTimeMillis();for(int i=0;i<300;i++){es.submit(task);}latch.await();Long end = System.currentTimeMillis();System.out.println("time = " + (end - begin));
}

3.2添加优惠卷

类似拓展

{"shopId":1,"title":"100元代金券","subTitle":"周一至周五均可使用","rules":"全场通用\n无需预约\n可无限叠加\n不兑现、不找零\n仅限堂食","payValue":8000,"actualValue":10000,"type":1,"stock":100,"beginTime":"2022-11-13T10:09:17","endTime":"2022-11-13T22:10:17"
}

点击抢购:

3.3实现秒杀下单

@Override
@Transactional
public Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始,是否结束if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已结束!");}//3.判断库存是否充足if(voucher.getStock()<=0){return Result.fail("优惠券库存不足!");}//4.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).update();//5.创建订单if(!success){return Result.fail("优惠券库存不足!");}//6.返回订单idVoucherOrder voucherOrder = new VoucherOrder();//6.1订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//6.3代金券idvoucherOrder.setVoucherId(voucherId);//7.订单写入数据库save(voucherOrder);//8.返回订单Idreturn Result.ok(orderId);
}

测试:

但是存在很多问题,多线程问题,单用户抢多张文图

3.4超卖现象

测试:

类似postman携带header

3.5超卖问题分析

悲观锁比较简单,直接加锁即可,乐观锁难在判断

第一种:携带另一个变量进行判断

第二种:用数据本身有没有变化进行判断

3.6乐观锁解决超卖问题

//4.扣减库存
boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).eq("stock",voucher.getStock())//where id = ? and stock =? 添加了乐观锁.update();

结果是不会出现线程安全问题,但是优惠券会出现过剩的情况,这就是乐观所的弊端:例如多个线程一开始标识stock为100,然后有个线程把stock减一了,其他那些线程就会返回错误

改进:

不去判断库存是否改变,判断库存>0即可

//4.扣减库存
boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).gt("stock",0)//where id = ? and stock >0 添加了乐观锁.update();

3.7实现一人一单功能

//查询订单看看是否存在
Long userId = UserHolder.getUser().getId();
if (query().eq("user_id",userId).eq("voucher_id",voucherId).count()>0) {return Result.fail("用户已经购买过一次!");
}

但是你要加锁,不让遇到多线程还是会下多个单,所以需要改进:没法判断这个数据是否修改过,因为一开始不存在,不能用乐观锁,所以只能用乐观锁

问题:能否用乐观锁执行?

不能,原因是乐观锁只能操作单个变量,而创建订单需要操作数据库

(1) 对实现类进行修改

@Override
public Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始,是否结束if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已结束!");}//3.判断库存是否充足if(voucher.getStock()<=0){return Result.fail("优惠券库存不足!");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {//userId一样的持有同一把锁,最好不要放在整个方法上,intern:去字符串常量池找相同字符串IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象return proxy.createVoucherOrder(voucherId);//默认是this,我们要实现事务需要proxy}//先获取锁,然后再进入方法,确保我的前一个订单会添加上,能先提交事务再释放锁
}@Transactional
public Result createVoucherOrder(Long voucherId){//查询订单看看是否存在Long userId = UserHolder.getUser().getId();if (query().eq("user_id",userId).eq("voucher_id",voucherId).count()>0) {return Result.fail("用户已经购买过一次!");}//4.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).gt("stock",0)//where id = ? and stock >0 添加了乐观锁.update();//5.创建订单if(!success){return Result.fail("优惠券库存不足!");}//6.返回订单idVoucherOrder voucherOrder = new VoucherOrder();//6.1订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2用户id//Long userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//6.3代金券idvoucherOrder.setVoucherId(voucherId);//7.订单写入数据库save(voucherOrder);//8.返回订单Idreturn Result.ok(orderId);
}

(2) 导入依赖

<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId>
</dependency>

(3) 启动类上加 @EnableAspectJAutoProxy(exposeProxy = true) 暴露出代理对象

测试:

3.8集群下的线程并发安全问题

Postman发送两个请求

两个id相同的进入到锁里面,证明没有被锁住,那就会生成2个订单

因为相当于我们开了两个jvm,所以有两个锁监视器,这样就出现并行的2个线程执行,这样就出现了线程安全问题

3.9分布式锁-原理

不去使用jvm内部的锁监视器,我们要在外部开一个锁监视器,让它监视所有的线程

多进程可见,互斥,高可用,高性能,安全性,. . . . .

常见的分布式锁

  • MySQL:MySQL 本身带有锁机制,但是由于 MySQL 性能一般,所以采用分布式锁的情况下,使用 MySQL 作为分布式锁比较少见。
  • Redis:Redis 作为分布式锁比较常见,利用 setnx 方法,如果 Key 插入成功,则表示获取到锁,插入失败则表示无法获取到锁。
  • Zookeeper:Zookeeper 也是企业级开发中比较好的一个实现分布式锁的方案。
MySQL Redis Zookeeper
互斥 利用 MySQL 本身的互斥锁机制 利用 setnx 互斥命令 利用节点的唯一性和有序性
高可用
高性能 一般 一般
安全性 断开链接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开链接自动释放

3.10分布式锁-实现思路

问题: 如何做到添加锁操作和释放锁操作必须具备同成功同失败?

set操作和expire写在同个语句即可:set lock thread1 ex 10 nx (nx表示存在的时候才可以set)

3.11实现Redis分布式锁版本1

ILock接口

public interface ILock {/*** 尝试获取锁* @Param timeoutSec 锁的持有时间* @return true:获取成功 false:获取失败*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unlock();}

SimpleRedisLock

public class SimpleRedisLock implements ILock{private String name;private StringRedisTemplate stringRedisTemplate;private static final String KEY_PREFIX = "lock:";public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate){this.name=name;this.stringRedisTemplate=stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {String key = KEY_PREFIX + name;//value的话一般设置为哪个线程持有该锁即可//获取线程标识long threadId = Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId+"", timeoutSec, TimeUnit.SECONDS);//最好不要直接return success,因为我们返回的是boolean类型,现在得到的是Boolean的结果,就会进行自动装箱,如果success为null,就会出现空指针异常return Boolean.TRUE.equals(success);//null的话也是返回false}@Overridepublic void unlock() {String key = KEY_PREFIX + name;stringRedisTemplate.delete(key);}
}

在VoucherOrderServiceImpl内使用:

Long userId = UserHolder.getUser().getId();
//创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order" + userId,stringRedisTemplate);
//获取锁
boolean hasLock = lock.tryLock(1200);
if(!hasLock){//获取锁失败: return fail 或者 retry 这里业务要求是返回失败return Result.fail("请勿重复下单!");
}try {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象return proxy.createVoucherOrder(voucherId);//默认是this,我们要实现事务需要proxy
} catch (IllegalStateException e) {throw new RuntimeException(e);
} finally {lock.unlock();
}

分布式锁的测试:

证明现在只有一个线程获取锁成功了,8081线程持有了分布式锁,而8082没有,查看结果:

3.12 Redis分布式锁误删问题

解决:在释放锁的时候判断锁的标识是否一致,Redis锁的标识一般是指value的区分,这里一般我们标识的是线程id,比如Thread1,Thread2…

3.13解决Redis分布式锁误删问题

这里的线程标识,我们之前用的是线程id进行标识,但是如果放到集群线程下,多个jvm可能会出现同个线程id的线程,这样会引发线程安全问题,所以这里要用ThreadID + UUID

@Override
public boolean tryLock(long timeoutSec) {String key = KEY_PREFIX + name;//value的话一般设置为哪个线程持有该锁即可//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId, timeoutSec, TimeUnit.SECONDS);//最好不要直接return success,因为我们返回的是boolean类型,现在得到的是Boolean的结果,就会进行自动装箱,如果success为null,就会出现空指针异常return Boolean.TRUE.equals(success);//null的话也是返回false
}@Override
public void unlock() {String key = KEY_PREFIX + name;//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();//获取锁中的标识if (stringRedisTemplate.opsForValue().get(key).equals(threadId)) {stringRedisTemplate.delete(key);}
}

测试:线程1获取锁,然后我把锁的标识删除,让线程2获取锁

然后线程1就不会释放锁,线程2后面会释放锁

3.14 分布式锁的原子性问题

判断锁标识和释放锁是两个操作,这里有原子性问题

3.15 Lua脚本解决分布式锁的原子性问题

Lua语言调用Redis:

eg:

-- 锁的key
-- local key = KEYS[1]
-- 当前线程标识
-- local threadId = ARGV[1]
-- 获取锁中的线程标识
local id = redis.call('get',KEYS[1])
-- 比较
if(id == ARGV[1]) then return redis.call('del',KEYS[1])
end return 0

简化:

-- 比较
if(redis.call('get',KEYS[1]) == ARGV[1]) then return redis.call('del',KEYS[1])
end return 0

接下来就是java去调用lua,lua执行Redis

3.16java调用lua脚本改造分布式锁

unlock操作:

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static{//写成静态代码块,类加载就可以完成初始定义,就不用每次释放锁都去加载这个,性能提高咯UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));//设置脚本位置UNLOCK_SCRIPT.setResultType(Long.class);
}public void unlock(){//调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}

Lua脚本:

-- 比较
if(redis.call('get',KEYS[1]) == ARGV[1]) then return redis.call('del',KEYS[1])
end return 0

模拟超时释放锁: 8082拿到锁,把锁删除,8081也拿到锁,8082进行unlock的时候不会把8081的锁删除,8081unlock删除自己的锁

执行8082unlock后Redis中lock还在,8081执行unlock后就删除了

3.17分布式锁-Redission简介

3.18 Redisson快速入门

1.导入依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>

2.RedissonConfig,注入一个配置好的RedissonClient

@Configuration
public class RedissionConfig {@Beanpublic RedissonClient redissionClient(){//配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.200.131:6379").setPassword("1234");//创建RedissonClient对象return Redisson.create(config);}
}

3.在我们的业务实现类中使用RedissionClient

@Autowired
private RedissonClient redissonClient;业务方法{//···前面流程Long userId = UserHolder.getUser().getId();//创建锁对象//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁boolean hasLock = lock.tryLock( );if(!hasLock){//获取锁失败: return fail 或者 retry 这里业务要求是返回失败return Result.fail("请勿重复下单!");}try {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象return proxy.createVoucherOrder(voucherId);//默认是this,我们要实现事务需要proxy} catch (IllegalStateException e) {throw new RuntimeException(e);} finally {lock.unlock();}
}

测试单线程:

测试多线程:记得登录验证token是否失效,失效了重新加

3.19 Redisson的可重入锁原理

为什么要引入可重入锁这种机制?

我们知道“对象一把锁,多个对象多把锁”,可重入锁的概念就是:自己可以获取自己的内部锁。

假如有一个线程 T 获得了对象 A 的锁,那么该线程 T 如果在未释放前再次请求该对象的锁时,如果没有可重入锁的机制,是不会获取到锁的,这样的话就会出现死锁的情况

这里除了记录key和ThreaId外,还需有记录重入次数,所以我们需要用hash, 每次的thead一样时,次数+1;

直到次数为0的时候才可以删除锁

nx :判断锁是否存在

ex: 设置过期时间

但是Redis的hash结构没有nx这个命令,所以我们只能先判是否存在(exist),再设置过期时间

@Slf4j
@SpringBootTest
public class RedissonTest {@Resourceprivate RedissonClient redissonClient;private RLock lock;@BeforeEach// 创建 Lock 实例(可重入)void setUp() {lock = redissonClient.getLock("order");}@Testvoid methodOne() throws InterruptedException {boolean isLocked = lock.tryLock();log.info(lock.getName());if (!isLocked) {log.error("Fail To Get Lock~1");return;}try {log.info("Get Lock Successfully~1");methodTwo();} finally {log.info("Release Lock~1");lock.unlock();}}@Testvoid methodTwo() throws InterruptedException {boolean isLocked = lock.tryLock();if (!isLocked) {log.error("Fail To Get Lock!~2");return;}try {log.info("Get Lock Successfully!~2");} finally {log.info("Release Lock!~2");lock.unlock();}}
}

method1取到锁,redis内也存储lock:order:

3.20 Redisson的锁重试和WatchDog机制

锁重试:获取锁失败后重新获取

tryLock(waitTime,leaseTime,TimeUnit)

waitTime:获取锁的等待时长,获取锁失败后等待waitTime再去获取锁

leaseTime: 锁自动失效时间,这里测试锁重试不需要用到

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-89rNAz0V-1669222806273)(http://itsawaysu.oss-cn-shanghai.aliyuncs.com/note/Redisson%23tryLock%20%E9%94%81%E9%87%8D%E8%AF%95.png)]

上面不给leaseTime的话30s其实就是Lock WatchdogTimeOut,给到他,最后给到intrtnslLockleaseTime(用去watchdog续约)

tryLockInnerAsync其实就是执行lua脚本 state就是hashValue,就是次数

后面的ttl的结果只有nil或者30s,nil就是获取锁成功

WatchDog-----超时释放

对抢锁过程进行监听,抢锁完毕后,scheduleExpirationRenewal(threadId) 方法会被调用来对锁的过期时间进行续约,在后台开启一个线程,进行续约逻辑,也就是看门狗线程。

// 续约逻辑
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {... }, 锁失效时间 / 3, TimeUnit.MILLISECONDS);Method(new TimerTask(){}, 参数2, 参数3)

通过参数2、参数3 去描述,什么时候做参数1 的事情。

  • 锁的失效时间为 30s,10s 后这个 TimerTask 就会被触发,于是进行续约,将其续约为 30s;
  • 若操作成功,则递归调用自己,重新设置一个 TimerTask 并且在 10s 后触发;循环往复,不停的续约。

剩下的就是主从一致性问题

3.21 Redisson的multiiLock原理

三个结点:创建对应的三个client

//TODO

3.22 Redis优化秒杀

一人一单

key value1 value2 value3 … (value不重复) ----> set

3.23 基于Redis完成秒杀资格判断

完成需求1,2

1.新增优惠券的同时加入到Redis

测试:

2.编写lua,基于lua完成一人一单

-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2用户id
local userId = ARGV[2]-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:'..voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:'..voucherId-- 3.脚本业务
-- 3.1判断库存是否充足
if (tonumber(redis.call('get', stockKey)) <= 0)then return 1
end
-- 3.2判断用户是否下单 sismember orderKey userId
if(redis.call('sismember',orderKey,userId)==1)then return 2
end
-- 3.3扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.4下单 sadd orderKey userId
redis.call('sadd',orderKey,userId)
return 0

测试第一次领券

第二次领券:

3.24 基于阻塞队列实现秒杀异步下单

之前的代码逻辑:

完善后的完整代码:

     private static final DefaultRedisScript<Long> SECKI_SCRIPT;static{//写成静态代码块,类加载就可以完成初始定义,就不用每次释放锁都去加载这个,性能提高咯SECKI_SCRIPT = new DefaultRedisScript<>();SECKI_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));//设置脚本位置SECKI_SCRIPT.setResultType(Long.class);}private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);//创建阻塞队列private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//创建线程池// 判断库存和进行一人一单判断后将信息放入阻塞队列
public Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始,是否结束if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已结束!");}//3.判断库存是否充足if(voucher.getStock()<=0){return Result.fail("优惠券库存不足!");}//获取当前用户Long userId = UserHolder.getUser().getId();//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKI_SCRIPT,Collections.emptyList(),//空ListvoucherId.toString(), userId.toString());//2.判断结果是否0 是0就是成功,可下单,下单信息保存到阻塞队列if(result!=0){return Result.fail(result==1?"库存不足!":"不能重复下单!");}//生成订单idlong orderId = redisIdWorker.nextId("order");//创建订单数据VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setUserId(userId);voucherOrder.setId(orderId);voucherOrder.setVoucherId(voucherId);//放入阻塞队列orderTasks.add(voucherOrder);//获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象//3.返回订单idreturn Result.ok(orderId);}// 类加载后就持续从阻塞队列出取出订单信息@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {while(true){try {//1.获取订单中的队列消息VoucherOrder voucherOrder = orderTasks.take();handleVoucherOrder(voucherOrder);//2.创建订单} catch (Exception e) {log.error("处理订单异常:",e);}}}}//异步下单
private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();//由于多线程,所以不能直接去ThreadLocal取//创建锁对象//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁boolean hasLock = lock.tryLock( );if(!hasLock){//获取锁失败log.error("不允许重复下单!");return;}try {//代理对象改成全局变量proxy.createVoucherOrder(voucherOrder);//默认是this,我们要实现事务需要proxy} catch (IllegalStateException e) {throw new RuntimeException(e);} finally {lock.unlock();}}@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder){//查询订单看看是否存在Long userId = UserHolder.getUser().getId();if (query().eq("user_id",userId).eq("voucher_id", voucherOrder.getUserId()).count()>0) {log.error("用户已经购买过一次!");return;}//4.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0)//where id = ? and stock >0 添加了乐观锁.update();if(!success){log.error("优惠券库存不足!");return;}//7.订单写入数据库save(voucherOrder);}

3.25 认识消息队列

上一节我们基于jvm的阻塞队列进行秒杀存在2个问题:

  1. jvm的内存限制问题

  2. 数据安全问题:jvm的内存数据没有持久化,每当服务器重启或者宕机或者从阻塞队列取的时候遇到异常,数据都会丢失

    解决方法: 消息队列

Redis提供了三种不同方式来实现消息队列

1.list结构:模拟消息队列

2.Pubsub:基本的点对点模型

3.Stream :比较完善的消息队列模型

3.26 基于list实现的消息队列

xshell开两个一样的会话

3.27 基于PubSub实现的消息队列

xshell开三个一样的会话

两个消费者:

生产者发布消息:

没人接收你的消息,消息就没了

3.28 基于Stream实现的消息队列


//获取最新消息
xread count 1 streams s1 $
//阻塞获取最新消息
xread count 1 block 0 streams s1 $ //block后面的数是阻塞毫秒数,0的话是永久阻塞

读最新数据会出现漏读现象:一下子发了5条最新消息,只读一条,其他4条漏读

3.29 Stream的消费者组模式

解决数据漏读的问题

不用自己去创建消费者,监听消息的时候然后发现无该消费者,则会自动创建

创建的时候ID注意如果想要之前的数据就从0开始,不想要就从$开始

3.30 基于stream消息队列实现异步秒杀下单

1.控制台stream类型创建消息队列

xgroup create stream.orders g1 0 mkstream

2.1 修改Lua脚本

-- 1.参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2用户id
local userId = ARGV[2]
-- 1.3订单Id
local orderId = ARGV[3] -- 新增-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:'..voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:'..voucherId-- 3.脚本业务
-- 3.1判断库存是否充足
if (tonumber(redis.call('get', stockKey)) <= 0)then return 1
end
-- 3.2判断用户是否下单 sismember orderKey userId
if(redis.call('sismember',orderKey,userId)==1)then return 2
end
-- 3.3扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
-- 3.4下单 sadd orderKey userId
redis.call('sadd',orderKey,userId)--4.发送消息到消息队列, xadd stream.order * k1 v1 k2 v2
redis.call('xadd','stream.order','*','userId',userId,'voucherId',voucherId,'id',orderId) --新增
return 0

2.2 修改执行lua脚本

private IVoucherOrderService proxy;
public Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始,是否结束if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已结束!");}//3.判断库存是否充足if(voucher.getStock()<=0){return Result.fail("优惠券库存不足!");}//获取当前用户Long userId = UserHolder.getUser().getId();//生成订单idlong orderId = redisIdWorker.nextId("order");//1.执行Lua脚本Long result = stringRedisTemplate.execute(SECKI_SCRIPT,Collections.emptyList(),//空ListvoucherId.toString(), userId.toString(),String.valueOf(orderId));//2.判断结果是否0 是0就是成功,可下单,下单信息保存到阻塞队列if(result!=0){return Result.fail(result==1?"库存不足!":"不能重复下单!");}//获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//获得代理对象//3.返回订单idreturn Result.ok(orderId);}

3.获取消息队列中的消息

private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {while(true){try {//1.获取消息队列中的消息 xreadgroup group g1 c1 count 1 block 2000 streams stream.orders >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));//2.判断获取是否成功//3.失败就再循环if(list==null||list.isEmpty()){continue;}//4.成功就创建订单且ACK确认//解析消息MapRecord<String, Object, Object> record = list.get(0);//消息id,key,value//取出每个消息Map<Object, Object> values = record.getValue();//转为VoucherOrder实体类VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//创建订单handleVoucherOrder(voucherOrder);//ACK确认stringRedisTemplate.opsForStream().acknowledge("stream.orders","g1",record.getId());} catch (Exception e) {log.error("处理订单异常:",e);//5.出现异常,从pendingList中取出数据后重新操作handlePendingList();}}}
}private void handlePendingList() {while (true) {try {// 1. 获取 pending-list 中的订单信息// XREAD GROUP orderGroup consumerOne COUNT 1 STREAM stream.orders 0List<MapRecord<String, Object, Object>> readingList = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders", ReadOffset.from("0")));// 2. 判断消息是否获取成功if (readingList.isEmpty() || readingList == null) {// 获取失败 pending-list 中没有异常消息,结束循环break;}// 3. 解析消息中的订单信息并下单MapRecord<String, Object, Object> record = readingList.get(0);Map<Object, Object> recordValue = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(recordValue, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);// 4. XACKstringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {log.error("订单处理异常(pending-list)", e);try {// 稍微休眠一下再进行循环Thread.sleep(10);} catch (Exception ex) {ex.printStackTrace();}}}
}

4.加锁,创建订单

private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();//由于多线程,所以不能直接去ThreadLocal取//创建锁对象//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁boolean hasLock = lock.tryLock( );if(!hasLock){//获取锁失败log.error("不允许重复下单!");return;}try {//代理对象改成全局变量proxy.createVoucherOrder(voucherOrder);//默认是this,我们要实现事务需要proxy} catch (IllegalStateException e) {throw new RuntimeException(e);} finally {lock.unlock();}}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder){//查询订单看看是否存在Long userId = UserHolder.getUser().getId();if (query().eq("user_id",userId).eq("voucher_id", voucherOrder.getUserId()).count()>0) {log.error("用户已经购买过一次!");return;}//4.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0)//where id = ? and stock >0 添加了乐观锁.update();if(!success){log.error("优惠券库存不足!");return;}//7.订单写入数据库save(voucherOrder);
}

四. 达人探店

4.1 发布探店笔记

@Slf4j
@RestController
@RequestMapping("upload")
public class UploadController {@PostMapping("blog")public Result uploadImage(@RequestParam("file") MultipartFile image) {try {// 获取原始文件名称String originalFilename = image.getOriginalFilename();// 生成新文件名String fileName = createNewFileName(originalFilename);// 保存文件image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));// 返回结果log.debug("文件上传成功,{}", fileName);return Result.ok(fileName);} catch (IOException e) {throw new RuntimeException("文件上传失败", e);}}@GetMapping("/blog/delete")public Result deleteBlogImg(@RequestParam("name") String filename) {File file = new File(SystemConstants.IMAGE_UPLOAD_DIR, filename);if (file.isDirectory()) {return Result.fail("错误的文件名称");}FileUtil.del(file);return Result.ok();}private String createNewFileName(String originalFilename) {// 获取后缀String suffix = StrUtil.subAfter(originalFilename, ".", true);// 生成目录String name = UUID.randomUUID().toString();int hash = name.hashCode();int d1 = hash & 0xF;int d2 = (hash >> 4) & 0xF;// 判断目录是否存在File dir = new File(SystemConstants.IMAGE_UPLOAD_DIR, StrUtil.format("/blogs/{}/{}", d1, d2));if (!dir.exists()) {dir.mkdirs();}// 生成文件名return StrUtil.format("/blogs/{}/{}/{}.{}", d1, d2, name, suffix);}
}

4.2 查看探店笔记

@Override
public Result queryBlogById(Long id) {//1.查询blogBlog blog = getById(id);if(blog==null){return Result.fail("笔记不存在!");}//2.查询blog相关用户queryBlogUser(blog);return Result.ok(blog);
}private void queryBlogUser(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id){return blogService.queryBlogById(id);
}

4.3 点赞功能

1.Blog类

/*** 是否点赞过了*/
@TableField(exist = false)
private Boolean isLike;
@Override
public Result likeBlog(Long id) {//1.判断当前用户是否已点赞Long userId = UserHolder.getUser().getId();String key = BLOG_LIKED_KEY + id;Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());if(BooleanUtil.isFalse(isMember)){//2.未点赞:数据库赞+1boolean isSuccess = update().setSql("liked = liked +  1").eq("id", id).update();//3.用户信息保存到Redis的点赞setif(isSuccess){stringRedisTemplate.opsForSet().add(key,userId.toString());}}else{//4.已点赞:数据库-1boolean isSuccess = update().setSql("liked = liked -  1").eq("id", id).update();//5.把用户信息从Redis的点赞set移除if(isSuccess){stringRedisTemplate.opsForSet().remove(key,userId.toString());}}return Result.ok();
}
  1. 当我们点开一篇blog的时候就需要被看到是否点赞过,这就要求我们改一下queryBlogById(id)咯,当然isLikeBlog(blog)也是需要

    @Override
    public Result queryBlogById(Long id) {//1.查询blogBlog blog = getById(id);if(blog==null){return Result.fail("笔记不存在!");}//2.查询blog相关用户queryBlogUser(blog);//3.查询用户是否点过赞,其实就是给blog的isLike添加值isLikeBlog(blog);return Result.ok(blog);
    }private void isLikeBlog(Blog blog) {Long userId = UserHolder.getUser().getId();String key = BLOG_LIKED_KEY + id;Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());blog.setIsLike(BooleanUtil.isTrue(isMember));
    }@Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(blog->{this.queryBlogUser(blog);this.isLikeBlog(blog);});//就是用blog遍历的return Result.ok(records);}
    

测试:

4.4 点赞排行榜

Redis的set存储的like无序,所以需要用到sortedset

id查和分页查blog的话内部也有查看是否点赞,这里的通用方法需要修改

进行top5的点赞用户查询:

@Override
public Result queryBlogLikes(Long id) {//1.查询top5的点赞用户   zrange key 0 4String key = BLOG_LIKED_KEY + id;Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);if(top5==null||top5.isEmpty()){return Result.ok(Collections.emptyList());}//2.解析出useId,然后根据UserId查询到user,再转化为UserDtoList<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());List<User> users = userService.listByIds(ids);List<UserDTO> userDTOS  =new ArrayList<>();for (User user : users) {UserDTO userDTO = new UserDTO();BeanUtils.copyProperties(user,userDTO);userDTOS.add(userDTO);}return Result.ok(userDTOS);
}
@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable("id") Long id){return blogService.queryBlogLikes(id);
}

4.5关注和取关

curd:

@Override
public Result follow(Long followUserId, boolean isFollow) {//1.获取当前登录用户Long userId = UserHolder.getUser().getId();if(isFollow){//关注Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);follow.setCreateTime(LocalDateTime.now());save(follow);}else{//取关QueryWrapper<Follow> queryWrapper = new QueryWrapper();queryWrapper.eq("user_id",userId).eq("follow_user_id",followUserId);remove(queryWrapper);}return Result.ok();
}@Override
public Result isfollow(Long followUserId) {//1.获取当前登录用户Long userId = UserHolder.getUser().getId();//2.查询是否已关注 select count(*) from tb_follow where user_id = #{userId} and follow_user_id = #{followUserId};Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();return Result.ok(count > 0);
}
@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id") Long followUserId, @PathVariable("isFollow") boolean isFollow){return followService.follow(followUserId,isFollow);
}@GetMapping("/or/not/{id}")
public Result isfollow(@PathVariable("id") Long followUserId){return followService.isfollow(followUserId);
}

4.6共同关注

首先是显示用户信息:

UserController

@GetMapping("/{id}")
public Result queryUserById(@PathVariable("id") Long userId){// 查询详情User user = userService.getById(userId);if (user == null) {return Result.ok();}UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);// 返回return Result.ok(userDTO);
}

BlogController

@GetMapping("/of/user")
public Result queryBlogByUserId(@RequestParam(value = "current", defaultValue = "1") Integer current,@RequestParam("id") Long id) {// 根据用户查询Page<Blog> page = blogService.query().eq("user_id", id).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();return Result.ok(records);
}

但是现在点击共同关注还是没有数据,这里是我们需补充的:求交集可以用Redis的set, 所以数据存放"备份"到Redis

修改之前的关注取关代码:

@Resource
private StringRedisTemplate stringRedisTemplate;@Override
public Result follow(Long followUserId, boolean isFollow) {//1.获取当前登录用户Long userId = UserHolder.getUser().getId();String followKey = "follows:"+userId;if(isFollow){//关注Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);follow.setCreateTime(LocalDateTime.now());boolean isSave = save(follow);if(isSave){//把被关注用户id放入Redis sadd follows:userId(key) followerId(value)stringRedisTemplate.opsForSet().add(followKey,followUserId.toString());}}else{//取关QueryWrapper<Follow> queryWrapper = new QueryWrapper();queryWrapper.eq("user_id",userId).eq("follow_user_id",followUserId);boolean isRemove = remove(queryWrapper);if(isRemove) {//把被关注用户id从Redis移除stringRedisTemplate.opsForSet().remove(followKey, followUserId.toString());}}return Result.ok();
}

接下来实现共同关注查询

@Override
public Result followCommons(Long followUserId) {//1.先获取当前用户Long userId = UserHolder.getUser().getId();String followKey1 = "follows:" + userId;String followKey2 = "follows:" + followUserId;//2.求交集Set<String> intersect = stringRedisTemplate.opsForSet().intersect(followKey1, followKey2);if(intersect==null||intersect.isEmpty()){return Result.ok(Collections.emptyList());}//3.解析出id数组List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());//4.根据ids查询用户数组 List<User> ---> List<UserDTO>List<UserDTO> userDTOS = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOS);
}
@GetMapping("/common/{id}")
public Result followCommons(@PathVariable("id") Long followUserId){return followService.followCommons(followUserId);
}

测试:

4.7 Feed流实现方案分析

用户关注的人一旦发布了新的笔记,就会第一时间推送给用户,所以我们需要选择TimeLine,我觉得还是消息队列好用

用户不多,所以选择推模式即可

4.8 推送到粉丝收件箱

不能使用传统的分页,因为每当有新数据进入的时候,就会出现角标变动,所以需要利用到滚动分页,记录每一次的lastId

发布后:

查看1,1010用户的收件箱:

4.9 滚动分页查询收件箱

所以不能用角标 只能用score

limit后面的数据就是偏移量,决定取不取得到端点,第一次就要给0,之后的都要给1 (但是不行)

有相同值的话,用score的话会重复查

把limit后面的数字改为 上一次查询的最小值的重复数字的个数

4.10 实现滚动分页查询

public Result queryBloyOfFollow(Long max, Integer offset) {//1.获取当前用户Long userId = UserHolder.getUser().getId();//2.查询当前用户收件箱 zrevrangebyscore key max min limit offset countString feedKey = FEED_KEY + userId;Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(feedKey, 0, max, offset, 2);if(typedTuples==null||typedTuples.isEmpty()){return Result.ok();}//3.解析出收件箱中的blogId,score(时间戳),offsetList<Long> ids = new ArrayList<>(typedTuples.size());long minTime = 0;int count = 1;//最小时间的相同个数for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {//3.1 获取idids.add(Long.valueOf(typedTuple.getValue()));//blog的id//3.2 获取分数(时间戳)long time = typedTuple.getScore().longValue();if(time == minTime){count++;}else{minTime = time;count=1;}}//4.根据blogId查找blogString idStr = StrUtil.join(",",ids);List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id, " + idStr + ")").list();for (Blog blog : blogs) {//4.1 查询blog有关的用户queryBlogUser(blog);//4.2 查询blog是否被点过赞isLikeBlog(blog);}//5.封装并返回ScrollResult r = new ScrollResult();r.setList(blogs);r.setOffset(count);r.setMinTime(minTime);return Result.ok(r);
}
@GetMapping("/of/follow")
public Result queryBlogOfFollow(@RequestParam("lastId") Long max,@RequestParam(value = "offset",defaultValue = "0") Integer offset){return blogService.queryBloyOfFollow(max,offset);
}

测试:

再发送一条新笔记:

4.11 附近商铺-GEO数据结构的基本使用

附近商铺一般Es使用,这里的话还是用Redis

geoadd g1 116.37 39.86 bjn 116.42 39.90 bj 116.32 39.89 bjx

geodist g1 bjx bj km(默认m)

geosearch g1  fromlonlat 116.39 39.90 byradius 10 km (asc|desc) (withdist)

4.12附近商铺-导入店铺数据到GEO

geoadd的时候member的话存店铺id即可

数据导入Redis:

@Test
public void localshopData(){//1.查询店铺信息List<Shop> shops = shopService.list();//2.店铺按照typeId进行分组 map<typeId,店铺集合>//Map<Long,List<Shop>> map = shops.stream().collect(Collectors.groupingBy(shop -> shop.getTypeId()));Map<Long,List<Shop>> map = shops.stream().collect(Collectors.groupingBy(Shop::getTypeId));//3.分批写入Redisfor (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {//3.1获取类型idLong typeId = entry.getKey();String typeKey = SHOP_GEO_KEY + typeId;//3.2获取这个类型的所有店铺,组成集合List<Shop> value = entry.getValue();//3.3 写入Redis geoadd key 经度 维度 memberList<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());for (Shop shop : value) {//stringRedisTemplate.opsForZSet().add(typeKey, new Point(shop.getX(),shop.getY()), shop.getId().toString());locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(),shop.getY())));}stringRedisTemplate.opsForGeo().add(typeKey,locations);}
}

4.13附近商铺-实现附近商铺功能

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><exclusion><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId></exclusion><exclusion><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId><version>2.6.2</version></dependency><dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>6.1.6.RELEASE</version></dependency>

ShopServiceImpl

    public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {//1.判断是否需要根据坐标进行查询if(x==null||y==null){Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));return Result.ok(page.getRecords());}//2.计算分页参数int from = (current-1)*SystemConstants.DEFAULT_PAGE_SIZE;int end = current * SystemConstants.DEFAULT_PAGE_SIZE;//3.查询redis,按照距离排序,分页  geosearch bylonlat x y byredius 10 (km/m) withdistanceString typeKey = SHOP_GEO_KEY + typeId;GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo().search(typeKey,GeoReference.fromCoordinate(x, y),new Distance(5000),RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)//只能从0到end,后面需要自己截取);//4.解析出idif(results==null){return Result.ok(Collections.emptyList());}//4.1我们要的地方的list集合(店铺Id+distance)List<GeoResult<RedisGeoCommands.GeoLocation<String>>> content = results.getContent();//有可能等下skip把数据都跳过了,所以需要判空if(content.size()<=from){//没有下一页return Result.ok(Collections.emptyList());}//4.2.截取first-endList<Long> ids = new ArrayList<>(content.size());Map<String,Distance> distMap = new HashMap<>(content.size());content.stream().skip(from).forEach(result ->{//4.2.1获取店铺IdString shopIdStr = result.getContent().getName();ids.add(Long.valueOf(shopIdStr));//4.2.2获取距离Distance distance = result.getDistance();distMap.put(shopIdStr,distance);});//这里其实就是通过stream流将shopId提取出来,并且要根据距离进行排序//5.根据id查询shopString idStr = StrUtil.join(",",ids);//1,2,3,4...// .... where id in #{ids} order by field(id,1,2,3,4...) 根据id排序List<Shop> shops = query().in("id", ids).last("order by field(id," + idStr + ")").list();for (Shop shop : shops) {shop.setDistance(distMap.get(shop.getId().toString()).getValue());}return Result.ok(shops);}

ShopController

@GetMapping("/of/type")
public Result queryShopByType(@RequestParam("typeId") Integer typeId,@RequestParam(value = "current", defaultValue = "1") Integer current,@RequestParam(value = "x",required = false) Double x,@RequestParam(value = "y",required = false) Double y) {return shopService.queryShopByType(typeId,current,x,y);
}

4.15用户签到-BitMap功能演示

不能直接用数据库表来存储数据

一看就是要010101,所以就是bit数组

4.16用户签到-实现签到功能

UserServiceImpl:

@Override
public Result sign() {//1.获取当前用户Long userId = UserHolder.getUser().getId();//2.获取当前日期LocalDateTime now = LocalDateTime.now();//3.拼接key sign:1010:202211String key = USER_SIGN_KEY + userId + now.format(DateTimeFormatter.ofPattern(":yyyyMM"));//4.获取今天是本月的第几天int index = now.getDayOfMonth();//month是1~31,而offset是0~30//5.写入Redis setbit key offset 1stringRedisTemplate.opsForValue().setBit(key,index-1,true);//true即签到1return Result.ok();
}

UserController:

@GetMapping("/sign")//签到
public Result sign(){return userService.sign();
}

4.17 用户签到-统计连续签到

@Override
public Result signCount() {//1.获取当前用户Long userId = UserHolder.getUser().getId();//2.获取当前日期LocalDateTime now = LocalDateTime.now();//3.拼接key sign:1010:202211String key = USER_SIGN_KEY + userId + now.format(DateTimeFormatter.ofPattern(":yyyyMM"));//4.获取今天是本月的第几天int index = now.getDayOfMonth();//month是1~31,而offset是0~30//1.获取本月截至今天为止的签到记录 bitfield sign:1010:202211 get u(index) 0 可以做多次操作,所以结果是listList<Long> result = stringRedisTemplate.opsForValue().bitField(key,//子命令BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(index)).valueAt(0));if(result==null||result.isEmpty()){//没有任何签到结果return Result.ok(0);}Long number = result.get(0);if(number==null||number==0){return Result.ok(0);}//2.number位运算求签到次数int count = 0;while(number>0){if((number&1)!=0){count++;number>>=1;}else break;}return Result.ok(count);
}
@GetMapping("/sign/count")
public Result signCount(){return userService.signCount();
}

4.18 UV统计-HyperLogLog的用法

4.19 UV统计-测试百万数据的统计

@Test
void testHyperLogLog(){String[] values = new String[1000];int j=0;for(int i=0;i<1000000;i++){j=i%1000;values[j]="user_"+i;if(j==999){stringRedisTemplate.opsForHyperLogLog().add("hl2",values);}}//统计数量System.out.println(stringRedisTemplate.opsForHyperLogLog().size("hl2"));
}

测试一百万个数据,最后存入997593个数据,内存一开始是1679336,现在变成1693720,相差14384,也就是14.046875kb,没超过16kb

ap.put(shopIdStr,distance);
});//这里其实就是通过stream流将shopId提取出来,并且要根据距离进行排序

    //5.根据id查询shopString idStr = StrUtil.join(",",ids);//1,2,3,4...// .... where id in #{ids} order by field(id,1,2,3,4...) 根据id排序List<Shop> shops = query().in("id", ids).last("order by field(id," + idStr + ")").list();for (Shop shop : shops) {shop.setDistance(distMap.get(shop.getId().toString()).getValue());}return Result.ok(shops);
}

ShopController```java
@GetMapping("/of/type")
public Result queryShopByType(@RequestParam("typeId") Integer typeId,@RequestParam(value = "current", defaultValue = "1") Integer current,@RequestParam(value = "x",required = false) Double x,@RequestParam(value = "y",required = false) Double y) {return shopService.queryShopByType(typeId,current,x,y);
}

4.15用户签到-BitMap功能演示

不能直接用数据库表来存储数据

[外链图片转存中…(img-BVeaLslY-1669222806355)]

一看就是要010101,所以就是bit数组

[外链图片转存中…(img-VjL0sMpK-1669222806356)]

[外链图片转存中…(img-R1BWnuJQ-1669222806356)]

[外链图片转存中…(img-4soW8Ad1-1669222806356)]

[外链图片转存中…(img-AGP1n9wi-1669222806357)]

4.16用户签到-实现签到功能

[外链图片转存中…(img-23Wh4l3W-1669222806357)]

UserServiceImpl:

@Override
public Result sign() {//1.获取当前用户Long userId = UserHolder.getUser().getId();//2.获取当前日期LocalDateTime now = LocalDateTime.now();//3.拼接key sign:1010:202211String key = USER_SIGN_KEY + userId + now.format(DateTimeFormatter.ofPattern(":yyyyMM"));//4.获取今天是本月的第几天int index = now.getDayOfMonth();//month是1~31,而offset是0~30//5.写入Redis setbit key offset 1stringRedisTemplate.opsForValue().setBit(key,index-1,true);//true即签到1return Result.ok();
}

UserController:

@GetMapping("/sign")//签到
public Result sign(){return userService.sign();
}

[外链图片转存中…(img-Ici0j30K-1669222806357)]

[外链图片转存中…(img-ESXMvZUH-1669222806357)]

4.17 用户签到-统计连续签到

[外链图片转存中…(img-mBMoWKPN-1669222806358)]

@Override
public Result signCount() {//1.获取当前用户Long userId = UserHolder.getUser().getId();//2.获取当前日期LocalDateTime now = LocalDateTime.now();//3.拼接key sign:1010:202211String key = USER_SIGN_KEY + userId + now.format(DateTimeFormatter.ofPattern(":yyyyMM"));//4.获取今天是本月的第几天int index = now.getDayOfMonth();//month是1~31,而offset是0~30//1.获取本月截至今天为止的签到记录 bitfield sign:1010:202211 get u(index) 0 可以做多次操作,所以结果是listList<Long> result = stringRedisTemplate.opsForValue().bitField(key,//子命令BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(index)).valueAt(0));if(result==null||result.isEmpty()){//没有任何签到结果return Result.ok(0);}Long number = result.get(0);if(number==null||number==0){return Result.ok(0);}//2.number位运算求签到次数int count = 0;while(number>0){if((number&1)!=0){count++;number>>=1;}else break;}return Result.ok(count);
}
@GetMapping("/sign/count")
public Result signCount(){return userService.signCount();
}

[外链图片转存中…(img-E5cKsdSE-1669222806358)]

[外链图片转存中…(img-p1r1Q2YP-1669222806358)]

4.18 UV统计-HyperLogLog的用法

[外链图片转存中…(img-78CvCdkL-1669222806358)]

[外链图片转存中…(img-eCqrGWtm-1669222806359)]

[外链图片转存中…(img-DstE1Vp6-1669222806359)]

4.19 UV统计-测试百万数据的统计

@Test
void testHyperLogLog(){String[] values = new String[1000];int j=0;for(int i=0;i<1000000;i++){j=i%1000;values[j]="user_"+i;if(j==999){stringRedisTemplate.opsForHyperLogLog().add("hl2",values);}}//统计数量System.out.println(stringRedisTemplate.opsForHyperLogLog().size("hl2"));
}

测试一百万个数据,最后存入997593个数据,内存一开始是1679336,现在变成1693720,相差14384,也就是14.046875kb,没超过16kb

[外链图片转存中…(img-vP23NmqT-1669222806359)]

黑马Redis学习笔记 (基础篇+实战篇)相关推荐

  1. Redis学习笔记①基础篇_Redis快速入门

    若文章内容或图片失效,请留言反馈.部分素材来自网络,若不小心影响到您的利益,请联系博主删除. 资料链接:https://pan.baidu.com/s/1189u6u4icQYHg_9_7ovWmA( ...

  2. OpenGL学习笔记一之实战篇二 文本渲染

    转载自 https://learnopengl-cn.github.io/06%20In%20Practice/02%20Text%20Rendering/ 当你在图形计算领域冒险到了一定阶段以后你可 ...

  3. OpenGL学习笔记一之实战篇五 2D游戏(Breakout)之渲染精灵

    转载自 https://learnopengl-cn.github.io/06%20In%20Practice/2D-Game/03%20Rendering%20Sprites/ 本节暂未进行完全的重 ...

  4. Redis学习笔记(面试+实战)

    文章目录 概念(面试) 1.什么是Redis 2.Redis的优缺点 3.Redis为什么这么快 4.Redis的持久化 4.1 什么是Redis持久化 4.2 Redis持久化机制 4.2.1RDB ...

  5. Redis学习笔记②实战篇_黑马点评项目

    若文章内容或图片失效,请留言反馈.部分素材来自网络,若不小心影响到您的利益,请联系博主删除. 资料链接:https://pan.baidu.com/s/1189u6u4icQYHg_9_7ovWmA( ...

  6. Redis学习笔记(实战篇)(自用)

    Redis学习笔记(实战篇)(自用) 本文根据黑马程序员的课程资料与百度搜索的资料共同整理所得,仅用于学习使用,如有侵权,请联系删除 文章目录 Redis学习笔记(实战篇)(自用) 1.基于Sessi ...

  7. Redis学习笔记1-理论篇

    目录 1,Redis 数据类型的底层结构 1.1,Redis 中的数据类型 1.2,全局哈希表 1.3,数据类型的底层结构 1.4,哈希冲突 1.5,rehash 操作 2,Redis 的 IO 模型 ...

  8. MySQL学习笔记-基础篇1

    MySQL 学习笔记–基础篇1 目录 MySQL 学习笔记--基础篇1 1. 数据库概述与MySQL安装 1.1 数据库概述 1.1.1 为什么要使用数据库 1.2 数据库与数据库管理系统 1.2.1 ...

  9. 菜鸟学习笔记:Java提升篇9(网络1——网络基础、Java网络编程)

    菜鸟学习笔记:Java提升篇9(网络1--网络基础.Java网络编程) 网络基础 什么是计算机网络 OS七层模型 Java网络编程 InetAddress InetSocketAddress URL类 ...

最新文章

  1. JAVASCRIPT复制到剪贴板
  2. Gif(1)-加载视图-交替圆效果
  3. Intellij-Cannot download Sources解决方法
  4. 坑你没商量!盘点Java中最常见的事故现场,你都中过哪些招?
  5. 网络推广下叮咚买菜已完成D轮融资,生鲜电商下一次融资又在何方?
  6. Palindromic Numbers LightOJ - 1205 数位dp 求回文数
  7. 6月份Asp.net源码推荐
  8. GitHub Desktop离线安装包
  9. numpy-ufunc函数
  10. 黑鲨helo支持html吗,黑鲨游戏手机Helo综合评测 到底值不值得买
  11. Unity 制作一个网格地图生成组件
  12. SIFT特征匹配算法介绍——寻找图像特征点的原理
  13. 微信开发者工具 the permission value is offline verifying 异常
  14. I03 403-(Python+mysql) 飞机票销售系统
  15. 人才引进--我搭上了顺风车
  16. 南京 徐小刚 计算机,基于混合粒子PHD滤波的多目标视频跟踪
  17. android 取imei p10,纯干货 | 一般人不知道的几个华为P10小技巧
  18. 易语言服务器端口总被占用,易语言检测端口是否被占用的代码
  19. 测试点设计及编写思路
  20. 用Multisim14.0仿真电感L、电容C与电阻R的电压、电流相位关系

热门文章

  1. PoE交换机的多种连接方式 PoE交换机的4种连接方式
  2. ls -l 字段意思
  3. JavaScript检查数组中是否有重复值
  4. 程序员口中常说的API是什么意思?什么是API?
  5. fread函数和fwrite函数详解
  6. Linux下文件丢失问题
  7. 宝宝发烧手脚冰凉怎么办 如何正确给宝宝退烧
  8. 栈(stack)简单实现,系统栈是如何保存函数调用信息的?
  9. Python批量制作抖音的卡点视频原来这么简单!
  10. Java的流程控制语句