文章目录

  • 1. NoSQL数据库
    • 1.1 技术发展
    • 1.2 NoSQL数据库
  • 2. Redis概述与安装
    • 2.1 安装
    • 2.2 操作
    • 2.3 Redis相关知识
  • 3. 常用五大数据类型
    • 3.1 Redis键(kesdy)
    • 3.2 Redis字符串(String)
      • 3.2.1 简介
      • 3.2.2 常用命令
      • 3.2.3 原子性
      • 3.2.4 数据结构
    • 3.3 Redis列表(List)
      • 3.3.1 简介
      • 3.3.2 常用命令
      • 3.3.3 数据结构
    • 3.4 Redis集合(Set)
      • 3.4.1 简介
      • 3.4.2 常用命令
      • 3.4.3 数据结构
    • 3.5 Redis哈希(Hash)
      • 3.5.1 简介
      • 3.5.2 常用命令
      • 3.5.3 数据结构
    • 3.6 Redis有序集合(Zset)
      • 3.6.1 简介
      • 3.6.2 常用命令
      • 3.6.3 数据结构
      • 3.6.4 跳跃表
  • 4. Redis配置文件(部分)
    • 4.1 Units单位
    • 4.2 INCLUDS
    • 4.3 GENERAL
    • 4.4 SECURITY
    • 4.5 LIMIT
  • 5. redis的发布和订阅
    • 5.1 什么是发布和订阅?
    • 5.2 发布和订阅理解
    • 5.3 发布订阅命令行实现
  • 6. Redis6新数据类型
    • 6.1 Bitmaps
      • 6.1.1 简介
      • 6.1.2 命令
      • 6.1.3 BItmaps与set做比较
    • 6.2 HyperLogLog
      • 6.2.1 简介
      • 6.2.2 命令
    • 6.3 Geospatial
      • 6.3.1 简介
      • 6.3.2 命令
  • 7. Jedis操作Redis
    • 7.1 测试连接
    • 7.2 redis实例—手机验证码
  • 8. SpringBoot 整合Redis
  • 9. Redis6的实务操作
    • 9.1 Redis的事务定义
    • 9.2 Multi、Exec、discard
    • 9.3 事务的错误处理
    • 9.4 事务冲突的问题
      • 9.4.1 悲观锁
      • 9.4.2 乐观锁
      • 9.4.3 WATCH key[key...]
    • 9.5 Redis事务三特性
  • 10. Redis持久化操作
    • 10.1 RDB
      • 10.1.1 配置文件
      • 10.1.2 操作机制
      • 10.1.3 优缺点
    • 10.2 AOF
      • 10.2.1 配置文件
      • 10.2.2 操作
      • 10.3 AOF修复
      • 10.4 优缺点
    • 10.3 RDB和AOF扩展
  • 11. 主从复制
    • 11.1 搭建一主多从
    • 11.2 复制原理之一主两从
    • 11.3 复制原理之薪火相传
    • 11.4 复制原理之反客为主
    • 11.5 哨兵模式
      • 11.5.1 总结
    • 11.6 缺点:复制延时
  • 12. Redis 集群
    • 12.1 简介
    • 12.2 搭建Redis集群
    • 12.3 集群操作和故障修复
    • 12.4 集群的Jedis开发
    • 12.5 集群优缺点
  • 13. Redis6-应用问题解决
    • 13.1 缓存穿透
    • 13.2 缓存击穿
    • 13.3 缓存雪崩
    • 13.4 分布式锁
      • 13.4.1 解决方案:基于redis实现分布式锁
      • 13.4.2 代码演示
      • 13.4.3 代码优化之设置过期时间
      • 13.4.4 代码优化之UUID防止误删
      • 13.4.5 优化代码之LUA脚本保证删除原子性
  • 14. Redis6 新功能
    • 14.1 ACL
      • 14.1.1 简介
      • 14.1.2 命令
    • 14.2 IO多线程
    • 14.3 工具支持 Cluster

1. NoSQL数据库

基于尚硅谷Redis6教程:点击学习

1.1 技术发展

  • 技术的分类:

    • 解决功能性问题:Java、Jsp、Tomcat、HTML、Linux、JDBC、SVN
    • 解决扩展性问题:Struts、Spring、SpringMVC、Mybatis
    • 解决性能问题:NOSQL、Java线程、Nginx、MQ、ElasticSearch
  • session问题,session应该存在哪?

    • 存储到客户端cookie(安全性差,网络负担效率低)

    • session复制(数据冗余,节点越多浪费越大)

    • 存在文件服务器或数据库里(大量的io效率问题)

    • NoSQL数据库(完全在内存中,速度快,数据结构简单)

  • 解决IO压力

    • 缓存数据库:减少io的读操作
    • 列式数据库,文档数据库(专门的数据用特定的方式存储,提高效率)

1.2 NoSQL数据库

NoSQL (Not Only SQL),泛指非关系型的数据库。

NoSQL不依赖业务逻辑方式存储,而以简单的key-value模式存储,因此大大的增加了数据库的扩展能力。

  • 不遵循SQL标准
  • 不支持ACID(原子性,一致性,隔离性,持久性)
  • 远超与SQL的性能

适用场景:

  • 对数据高并发的读写
  • 海量数据的读写
  • 对数据高可扩展性

不适用场景:

  • 需要实物支持

  • 基于SQL的结构化查询存储,处理复杂的关系,需要即席查询。

  • 用不着SQL和用了SQL也不行的情况,考虑用NoSQL

2. Redis概述与安装

  • Redis是一个开源的key-value存储系统
  • 和Memcached类似,它支持存储的value类型相对更多,包括string、list、set、zset(sorted set–有序集合)和hash
  • 都支持push/pop、add/remove 及取交集、并集和差集和更丰富的操作,这些操作都是原子性的
  • Redis支持各种不同方式的排序
  • 数据都缓存在内存中
  • Redis会周期性的把更新的数据写入磁盘或者把修改的操作写入追加的记录文件
  • 实现了master-salve(主从)同步

2.1 安装

  • 官网下载:https://redis.io/

  • 放入Linux系统的opt目录下

  • 安装C语言的编译环境,gcc编译器

    # 查看gcc版本 没有的话安装
    gcc -version
    # 安装命令
    yum install centos-release-scl scl-utils-build
    yum install -y devtoolset-8-toolchain
    scl enable devtoolset-8 bash
    
  • 安装redis

    # 解压
    tar -zxvf redis-6.2.5.tar.gz
    # 进入目录
    cd redis-6.2.5
    # 执行make命令(只是编译好)
    make
    # 安装命令
    make install
    
  • 安装目录:/usr/local/bin

    查看默认安装目录:

    • redis-benchmark:性能测试工具,可以再自己本子运行,查看自己本子性能
    • redis-check-aof:修复有问题的AOF文件
    • redis-check-dump:修复有问题的dump.rdb文件
    • redis-sentinel:Redis集群使用
    • redis-server:Redis服务器启动命令
    • redis-cli:客户端,操作入口

2.2 操作

  • 前台启动(不推荐)

    • 缺点启动完成后,不能再进行其他操作
    redis-server # 启动
    Ctrl + C # 关闭
    
  • 后台启动(推荐)

    • 关闭窗口后还可以继续使用
    cd /opt
    cd redis-6.2.5 # 进入目录
    cp redis.conf /etc/redis.conf # conf文件复制到etc中
    cd /ect
    vi redis.conf # 修改文件
    

    文件中的daemonize no —改为—> daemonize yes

    cd /usr/local/bin
    redis-server /etc/redis.conf # 启动redis
    ps -ef|grep redis # 查看redis进程
    

redis-cli # 连接redis
shutdown # 关闭1
kill -9 xxxx(端口号) # 关闭2

2.3 Redis相关知识

  • 默认端口6379
  • 默认16割数据库,类似数组下标从0开始,初始默认使用0号库
  • 使用命令**select<dbid>**来切换数据库。如select 8
  • 统一密码管理,所有库同样密码
  • Redis是 单线程 + 多路IO复用技术

3. 常用五大数据类型

3.1 Redis键(kesdy)

  • keys * 查看当前库所有的key(匹配:key*1)
  • exists key 判断某个key是否存在
  • type key 查看你的key是什么类型
  • del key 删除指定的key数据
  • unlink key 根据value选择非阻塞删除
    • 仅将keys从keyspace元数据中删除,真正的删除会在后续异步操作
  • expire key 10 10秒钟:为给定的key设置过期时间
  • ttl key 查看还有多少秒过期,-1表示永不过期,-2表示已过期
  • select 命令切换数据库
  • dbsize 查看当前数据库key的数量
  • flushdb 清空当前库
  • flushall 通杀全部库
# 连上redis
cd /usr/local/bin
redis-server /etc/redis.conf
redis.cli# 设置三个值
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> set k1 zhangsan
OK
127.0.0.1:6379> set k2 lisi
OK
127.0.0.1:6379> set k3 wangwu
OK
# 测试命令
127.0.0.1:6379> keys * # 查看
1) "k3"
2) "k2"
3) "k1"127.0.0.1:6379> exists k1 # 查询是否存在 1存在 0不存在
(integer) 1
127.0.0.1:6379> exists k4
(integer) 0127.0.0.1:6379> type k2 # 查看类型
string127.0.0.1:6379> del k3 # del 删除
(integer) 1127.0.0.1:6379> unlink k2 # unlink 删除
(integer) 1
127.0.0.1:6379> keys *
1) "k1"127.0.0.1:6379> expire k1 10 # 设置过期时间
(integer) 1
127.0.0.1:6379> ttl k1 # 查询过期时间
(integer) 5
127.0.0.1:6379> ttl k1
(integer) 2
127.0.0.1:6379> ttl k1
(integer) -2 # 过期了
127.0.0.1:6379> ttl k2
(integer) -1 # 永不过期127.0.0.1:6379> select 0 # 选择数据库
OK
127.0.0.1:6379> dbsize # 查看数据库key数量
(integer) 1
127.0.0.1:6379> flushdb 清空
OK

3.2 Redis字符串(String)

3.2.1 简介

  • String是Redis最基本的类型,一个key对应一个value
  • String类型是二进制安全的。意味着Redis的String可以包含任何数据,比如jpg或者序列化的对象
  • 一个Redis字符串value最多可以使521M

3.2.2 常用命令

  • set<key><value> 添加键值对

  • get<key> 查询key的值

    127.0.0.1:6379> set key1 value1
    OK
    127.0.0.1:6379> set key2 value2
    OK
    127.0.0.1:6379> keys *
    1) "key2"
    2) "key1"
    127.0.0.1:6379> get key1
    "value1"
    127.0.0.1:6379> set key1 value111 # 会覆盖
    OK
    127.0.0.1:6379> get key1
    "value111"
    
  • append<key><value> 将给定的 value 追加到原始的末尾

    127.0.0.1:6379> append key1 abc
    (integer) 11 # 新value的总长度
    127.0.0.1:6379> get key1
    "value111abc"
    
  • strlen<key> 获得值的长度

    127.0.0.1:6379> strlen key1
    (integer) 11
    
  • setnx<key><value> 只有在key不存在时,设置key的值

    127.0.0.1:6379> setnx key1 value1 # key1 存在
    (integer) 0 # 设置失败
    127.0.0.1:6379> setnx key4 value4 # key4 不存在
    (integer) 1 # 设置成功
    
  • incr<key>

    • 将key中存储的数字值增1
    • 只能对数字值操作,如果为空,新增值为1
  • decr<key>

    • 将key中存储的数字值减1
    • 只能对数字值操作,如果为空,新增值为-1
    127.0.0.1:6379> set k6 600
    OK
    127.0.0.1:6379> incr k6
    (integer) 601
    127.0.0.1:6379> decr k6
    (integer) 600
    
  • incrby/decrby <key><步长> 将key中存储的数字值增减,自定义步长

    127.0.0.1:6379> incrby k6 10
    (integer) 610
    127.0.0.1:6379> decrby k6 10
    (integer) 600
    
  • mset<key1><value1><key2><value2>... 同时设置一个或多个key-value对

  • mget<key1><key2>... 同时获取一个或多个value

  • msetnx<key1><value1><key2><value2>... 同时设置一个或多个key-value对,当且仅当所有给定key都不存在

    • 原子性:有一个失败则都失败
    127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3
    OK
    127.0.0.1:6379> keys *
    1) "k3"
    2) "k2"
    3) "k1"
    127.0.0.1:6379> mget k1 k2 k3
    1) "v1"
    2) "v2"
    3) "v3"
    127.0.0.1:6379> msetnx k11 v11 k12 v12
    (integer) 1 # 成功
    127.0.0.1:6379> msetnx k11 v11 k12 v13
    (integer) 0 # 失败
    
  • getrange<key><起始位置><结束位置> 获得值的范围,类似java中的substring,前包,后包

    127.0.0.1:6379> set name zhangsan
    127.0.0.1:6379> getrange name 0 4
    "zhang"
    
  • strange<key><起始位置><value> 用value覆写key所存储的字符串值,从<起始位置>开始(索引从0开始)

    127.0.0.1:6379> setrange name 3 abc
    (integer) 8
    127.0.0.1:6379> get name
    "zhaabcan"
    
  • setex<key><过期时间><value> 设置键值的同时,设置过期时间,单位秒

    127.0.0.1:6379> setex time 10 value
    OK
    127.0.0.1:6379> ttl time
    (integer) 7
    127.0.0.1:6379> ttl time
    (integer) -2
    
  • getset<key><value> 以新换旧,设置了新值同时获取旧值

    127.0.0.1:6379> getset name lisi
    "zhaabcan"
    127.0.0.1:6379> get name
    "lisi"
    

3.2.3 原子性

  • 不是指事务中的原子性,是redis特有的名词
  • 所谓原子操作是指不会被线程调度机制打断的操作
  • 这种操作一旦开始,就一直运行到结束,中间不会有任何context switch(切换到另一个线程)
    • 在单线程中,能够在单条指令中完成的操作都可以认为是原子操作,因为中断只能发生于指令之间
    • 在多线程中,不能被其他进程(线程)打断的操作就叫原子操作。
  • Redis单命令的原子性主要得益于Redis的单线程

案例:

  1. java中的i++是否是原子操作?

    不是,java是多线程操作

  2. i=0;两个线程分别对i进行++100次,值是多少?

    值:2~200

3.2.4 数据结构

  • String的数据结构为简单动态字符串(Simple Dynamic String,SDS)。是可以修改的字符串,内部结构实现上类似于java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配。

  • 如图,内部为当前字符串实际分配的空间 capacity 一般要高于实际字符串长度len。当字符串长度小于1M时,扩容都是加倍现有的空间,如果超过1M,扩容时一次只会多扩1M的空间。需要注意的是字符串最大长度为521M。

3.3 Redis列表(List)

3.3.1 简介

  • 单键多值
  • Redis列表是简单的字符串列表,按照插入顺序排序,你可以添加一个元素到列表的头部(左边)或者尾部(右边)。
  • 它的底层实际是个双向链表,对两端的操作性能更高,通过索引下表的操作中间的节点性能会比较差

3.3.2 常用命令

  • lpush/rpush <key><value1><value2>... 从左边/右边插入一个或多个值

    127.0.0.1:6379> clear
    127.0.0.1:6379> lpush k1 v1 v2 v3
    (integer) 3
    127.0.0.1:6379> lrange k1 0 -1
    1) "v3"
    2) "v2"
    3) "v1"
    127.0.0.1:6379> rpush k2 v1 v2 v3
    (integer) 3
    127.0.0.1:6379> lrange k2 0 -1
    1) "v1"
    2) "v2"
    3) "v3"
    
  • lpop/rpop<key> 从左边/右边吐出一个值。值在键在,值光键亡

    127.0.0.1:6379> lpop k1  # 取一个少一个
    "v3"
    127.0.0.1:6379> rpop k2
    "v3"
    
  • rpoplpush<key1><key2> 从key1列表右边吐出一个值,插到key2列表左边

    127.0.0.1:6379> lpush k1 v1 v2 v3
    (integer) 3
    127.0.0.1:6379> rpush k2 v11 v22 v33
    (integer) 3
    127.0.0.1:6379> rpoplpush k1 k2
    "v1"
    127.0.0.1:6379> lrange k2 0 -1
    1) "v1"
    2) "v11"
    3) "v22"
    4) "v33"
    
  • lrange<key><start><stop> 按照索引下标获得元素(从左到右),0左边第一个,-1右边第一个,(0 -1表示获取所有)

  • lindex<key><value> 按照索引下标获得元素(从左到右)

    127.0.0.1:6379> lindex k2 0
    "v1"
    127.0.0.1:6379> lindex k2 1
    "v11"
    
  • llen<key> 获得列表长度

  • linsert<key> before/after <value><newValue> 在value后面插入newValue

    127.0.0.1:6379> linsert k2 before "v11" "newV11"
    (integer) 5
    127.0.0.1:6379> lrange k2 0 -1
    1) "v1"
    2) "newV11"
    3) "v11"
    4) "v22"
    5) "v33"
    
  • lrem<key><n><value> 从左边删除n个value(从左到右)

    127.0.0.1:6379> lrem k2 2 "newV11"
    (integer) 2
    127.0.0.1:6379> lrange k2 0 -1
    1) "v1"
    2) "v11"
    3) "v22"
    4) "v33
    
  • lset<key><index><value> 将列表key下标为index的值替换成value

    127.0.0.1:6379> lset k2 1 newV1
    OK
    127.0.0.1:6379> lrange k2 0 -1
    1) "v1"
    2) "newV1"
    3) "v22"
    4) "v33"
    

3.3.3 数据结构

  • List数据结构为快速链表quickList
  • 首先在列表元素较少的情况下会使用一块连续的内存存储,这个结构是ziplist,即压缩列表,它将所有的元素紧挨着一起存储,分配的是这一块连续的内存
  • 当数据量比较多的时候才会改成quickList
  • 因为普通的链表需要的附加指针空间太大,会比较浪费空间。
  • Redis将链表和ZipList结合起来组成了quickList。也就是将多个ZipList使用双向指针穿起来操作,这样既满足了快速的插入删除性能,又不会出现太大的空间冗余

3.4 Redis集合(Set)

3.4.1 简介

  • Redis Set对外提供的功能与list类似,是一个列表的功能,特殊之处在于set是可以自动排重
  • Set 是String类型的无序集合,底层是一个value为null的hash表,所以增删改查的复杂度是O(1)

3.4.2 常用命令

  • sadd<key><value1><value2>... 将一个或多个member元素加入到集合key中,已经存在的member忽略

  • smembers<key> 取出该集合所有值

  • sismember<key><value> 判断集合key是否含有该value值,有1,没有0

  • scard<key> 返回该集合的元素个数

  • srem<key><value1><value2>... 删除集合中的某个元素

  • spop<key> 随机从该集合中吐出一个值

  • srandmember<key><n> 随机从该集合中取出n个值,不会从集合中删除

    127.0.0.1:6379> sadd k1 v1 v2 v3
    (integer) 3
    127.0.0.1:6379> smembers k1
    1) "v3"
    2) "v2"
    3) "v1"
    127.0.0.1:6379> sismember k1 v1
    (integer) 1
    127.0.0.1:6379> scard k1
    (integer) 3
    127.0.0.1:6379> spop k1
    "v3"
    127.0.0.1:6379> srandmember k1 2
    1) "v2"
    2) "v1"
    
  • smove<source><destination>value 把集合中一个值从另一个集合移动到另一个集合

  • sinter<key1><key2> 返回两个集合的交集

  • sunion<key1><key2> 返回两个集合的并集

  • sdiff<key1><key2> 返回两个集合的差集元素(key1有,不包含key2)

    127.0.0.1:6379> sadd k1 v1 v2 v3
    (integer) 3
    127.0.0.1:6379> sadd k2 v1 v2 v4
    (integer) 3
    127.0.0.1:6379> smove k1 k2 v3
    (integer) 1
    127.0.0.1:6379> smembers k2
    1) "v3"
    2) "v2"
    3) "v4"
    4) "v1"
    127.0.0.1:6379> sinter k1 k2
    1) "v2"
    2) "v1"
    127.0.0.1:6379> sunion ke k2
    1) "v2"
    2) "v3"
    3) "v1"
    4) "v4"
    127.0.0.1:6379> sdiff k1 k2
    (empty array)
    

3.4.3 数据结构

  • Set的数据结构是dict字典,字典是用hash表实现的
  • Java中HashSet的内部实现使用的是HashMap,只不过所有的value都指向同一个对象
  • Redis的set结构也一样,它的内部也使用hash结构,所有的value都只想同一个内部值

3.5 Redis哈希(Hash)

3.5.1 简介

  • Redis hash 是一个键值对集合
  • Redis hash 是一个String类型的field和value的映射表,hash特别适用于存储对象
  • 类似Java中的Map<String,Object>

3.5.2 常用命令

  • hset<key><field><value> 给key集合中的field键赋值value

  • hget<key1><field> 从key1集合field取出value

    127.0.0.1:6379> hset user:1001 id 1
    (integer) 1
    127.0.0.1:6379> hset user:1001 name zhangsan
    (integer) 1
    127.0.0.1:6379> hget user:1001 id
    "1"
    127.0.0.1:6379> hget user:1001 name
    "zhangsan"
    
  • hmset<key1><field1><value1><field2><vaule2>... 批量设置hash的值

  • hexists<key1><field> 查看哈希表key中,给定域field是否存在

  • hkeys<key> 列出该hash集合的所有field

  • hvals<key> 列出所有values

  • hincrby<key><field><increment> 为hash表key中的域field的值加上增量1 -1

  • hsetnx<key><field><value> 将hash表key中的域field的值设置为value,当且仅当field不存在

    127.0.0.1:6379> hmset user:1002 id 2 name lisi age 18
    OK
    127.0.0.1:6379> hexists user:1002 id
    (integer) 1
    127.0.0.1:6379> hexists user:1002 psw
    (integer) 0
    127.0.0.1:6379> hkeys user:1002
    1) "id"
    2) "name"
    3) "age"
    127.0.0.1:6379> hvals user:1002
    1) "2"
    2) "lisi"
    3) "18"
    127.0.0.1:6379> hincrby user:1002 age 2
    (integer) 20
    127.0.0.1:6379> hsetnx user:1002 psw 111111
    (integer) 1
    

    3.5.3 数据结构

  • hash类型对应数据结构有两种:

    • ZipList(压缩列表)
    • HashTable(哈希表)
  • 当 field-value 长度较短且个数较少时,使用ZipList,否则使用HashTable

3.6 Redis有序集合(Zset)

3.6.1 简介

  • Redis有序集合zset和普通集合set非常相似,是一个没有重复元素的字符串集合
  • 不同之处是有序集合的每个成员都关联了一个评分(score),这个评分(score)被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但是评分可以使重复的。
  • 因为元素是有序的,所以你也可以很快的根据评分(score)或者次序(position)来获取一个范围的元素
  • 访问有序集合的中间元素也是非常快的,因此你能够使用有序集合作为一个没有重复成员的智能列表

3.6.2 常用命令

  • zadd<key><score1><value1><score2><value2>... 将一个或多个member元素及其score值加入到有序集合key中

  • zrange<key><start><stop>[WITHSCORES]

    • 返回有序集key中,下标在 start - stop 之间的元素
    • 带 WITHSCORES ,可以让分数一起和值返回到结果集
  • zrangebyscore key minmax [withscores][limit offset count]

    • 返回有序集key中,所有score值介于min和max(包括min,max)之间的成员
    • 有序集成员按score值递增排列
  • zrevrangebyscore key maxmin [withscores][limit offset count]

    • 同上,递减
      127.0.0.1:6379> zadd topn 200 java 300 c++ 400 mysql 500 php(integer) 4127.0.0.1:6379> zrange topn 0 -1
    1) "java"
    2) "c++"
    3) "mysql"
    4) "php"
    127.0.0.1:6379> zrange topn 0 -1 withscores
    1) "java"
    2) "200"
    3) "c++"
    4) "300"
    5) "mysql"
    6) "400"
    7) "php"
    8) "500"
    127.0.0.1:6379> zrangebyscore topn 300 500
    1) "c++"
    2) "mysql"
    3) "php"
    127.0.0.1:6379> zrangebyscore topn 300 500 withscores
    1) "c++"
    2) "300"
    3) "mysql"
    4) "400"
    5) "php"
    6) "500"
    127.0.0.1:6379> zrevrangebyscore topn 500 300 withscores
    1) "php"
    2) "500"
    3) "mysql"
    4) "400"
    5) "c++"
    6) "300"
    
  • zincrby<key><increment><value> 为元素的score加上增量

  • zrem<key><value> 删除该集合下,指定值的元素

  • zcount<key><min><max> 统计该集合,分数区间内的元素个数

  • zrank<key><value> 返回该值在集合中的排名,从0开始

    127.0.0.1:6379> zincrby topn 50 java
    "250"
    127.0.0.1:6379> zrem topn php
    (integer) 1
    127.0.0.1:6379> zcount topn 200 300
    (integer) 2
    127.0.0.1:6379> zrank topn java
    (integer) 0
    

3.6.3 数据结构

  • Zset是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map<String,Double>,可以给每一个元素value赋予一个权重score,另外一方面它又类似于TressSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的范围来获取元素的列表。
  • Zset底层使用了两个数据结构
    • hash,hash的作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到对应的score值
    • 跳跃表,跳跃表的目的在于给元素value排序,根据score的范围获取元素列表

3.6.4 跳跃表

1.简介

​ 有序集合在生活中比较常见,例如根据成绩对学生进行排名,根据得分对玩家排名等。对于有序集合的底层实现,可以用数组、平衡 树、链表等。数组不便于元素的插入、删除;平衡树或红黑树虽然效率高但是结构复杂;链表查询需要遍历所有,效率低。所以redis采用的是跳跃表,跳跃表效率堪比红黑树,实现远比红黑树简单。

2.实例

​ 对比有序链表和跳跃表,从链表中查询出51

(1)有序链表

​ 要查询值为51的元素,需要从第一个元素开始依次查找、比较才能找到,共需要6次比较

(2)跳跃表

​ 先从第2层招,51比1大,到21,51比21大,指向null,在第2层找不到

​ 跳到第一层,找21的下一个节点,到41,51比41大,到61,51比61小,第一层找不到

​ 跳到第0层,找41的下一个节点,找到51

​ 经过四次查找

4. Redis配置文件(部分)

4.1 Units单位

  • 配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit,大小写不敏感

4.2 INCLUDS

  • 类似jsp中的include,多实例的情况可以把公用的配置文件提取出来

3.NETWORK

  • bind

    • 默认情况 bind = 127.0.0.1 只能接收本机的访问请求
    • 不写的情况下,无限制接收任何ip地址的访问
    • 生产环境肯定要写自己应用服务器的地址;服务器是需要远程访问的,所以需要将其注释掉
  • protected-mode

    • 如果开启了 protected-mode ,那么在没有设定bind ip 且没有设密码的情况下,Redis只允许接收本机的响应
  • port

    • 端口号默认6379
  • tcp-backlog

    • 设置tcp的backlog,backlog其实是一个连接队列,backlog队列总和 = 未完成三次握手队列 + 已经完成三次握手队列
    • 在高并发环境下需要一个高backlog值来避免慢客户端连接问题
    • 注意Linux内核会将这个值减小到 /proc/sys/net/core/somaxconn 的值(128),所以需要确认增大 /proc/sys/net/core/somaxconn 和 /proc/sys/net/ipv4/tcp_max_syn_backlog(128)两个值来达到想要的效果

  • timeout

    • 默认0 永不超时
  • tcp-keepalive

    • 检查心跳时间 默认300秒,每隔300秒检测连接

4.3 GENERAL

  • daemonize

    • 是否为后台进程,设置为yes
    • 守护进程,后台启动
  • pidfile

    • 在里面保存进程号
  • loglevel 默认notice

    • 日志级别

      • debug 开发环境中,能看到更详细的信息
      • verbose 有用的信息,但不像调试级别那样混乱
      • notice 生产情况下使用
      • warning 只记录非常重要/关键的消息
  • logfile 默认为空

    • 日志输出路径

  • databases 16

    • Redis默认支持16个数据库(可以通过配置文件支持更多,无上限),可以通过配置databases来修改这一数字。

4.4 SECURITY

  • 设置密码

    • 访问密码的查看、设置和取消
    • 在命令中设置密码,只是临时的。重启redis,密码还原
    • 永久设置,需要在配置文件中进行配置

4.5 LIMIT

  • maxclients

    • 设置redis同时可以与多少个客户端连接
    • 默认 10000
    • 如果达到此限制,redis会拒绝新的请求,并且向这些连接请求放发出 max number of cilents reacher 作回应

  • maxmemory

    • 建议必须设置,否则,将内存占满,造成服务器宕机
    • 设置redis可以使用的内存量,一旦达到内存使用上限,redis将会试图移除内部数据,移除规则可以通过 maxmenory-policy来指定
  • maxmemory-samples

    • 设置样本数量,LRU算法和最小TTL算法都并非是最精确的算法,而是估算值,所以可以设置样本的大小,redis默认会检查这么多个key并选择其中LRU的那个
    • 一般设置3-7的数字,数值越小,样本越不精确,但是性能消耗越小

5. redis的发布和订阅

5.1 什么是发布和订阅?

  • redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息
  • Redis客户端可以订阅任意数量的频道

5.2 发布和订阅理解

1.客户端可以订阅频道

2.当给这个频道发布消息后,消息就会发送给订阅的客户端

5.3 发布订阅命令行实现

1.打开一个客户端订阅channel1

2.打开另一个客户端,给channel1发布消息hello

3.打开第一个客户端可以看到发送的消息

​ 第一步:客户端2

cd /usr/local/bin/redis-cli # 启动,两个客户端连接redis

​ 第二步:客户端1

SUBSCRIBE channel1 # 客户端1订阅channel1频道

​ 第三步:客户端2

publish channel1 hello # 发布消息

6. Redis6新数据类型

6.1 Bitmaps

6.1.1 简介

  • 现代计算机用二进制作为信息的基础单位,1个字节等于8位
  • Redis提供了Bitmaps这个“数据类型”,可以实现对位的操作
    • BItmaps本身不是一种数据类型,实际上它就是字符串(key-value),但是它可以对字符串的位进行操作
    • BItmaps单独提供了一套命令,所以在Redis中使用Bitmaps和使用字符串的方法不太相同。可以把Bitmaps想象成一个以位为单位的数组,数组的每个单元只能存储0和1,数组的下标在Bitmaps中叫做偏移量

6.1.2 命令

  • setbit

    • 格式:setbit<key><offset><value> 设置Bitmaps中某个偏移量的值(0或1)

      • *offset:偏移量从0开始
    • 实例:

      每个独立用户是否昂问过网站存放在Bitmaps中,将访问的用户记作1,没有访问的记作0,用偏移量作为用户的id

      设置的第offset个位的值(0开始),假设现在有20个用户,userid==1,6,11,15,19的用户对网站进行了访问,那么他们的值就变成了1

      unique:users:20201106 代表2020-11-06这天的独立访问用户的Bitmaps

    • 注意:

      • 很多应用的用户id以一个指定数字(例如1000)开头,直接将用户id和Bitmaps的偏移量对应,势必会造成一定的浪费,通常做法是每次setbit操作时将用户id减去这个指定数字
      • 在第一次初始化Bitmaps时,加入偏移量非常大,那么整个初始化过程执行会比较慢,可能造成Redis的阻塞
  • getbit

    • 格式:getbit<key><offset>获取Bitmaps中某个偏移量的值
  • bitcount

    • 统计字符串被设置成1的bit数
    • 格式:bitcount<key>[start end] 统计字符串从start 到 end字节比特值为1的数量
  • bitop

    • 格式:bitop and(or/not/xor) <destkey> [key...]

      • 复合操作,可以做多个BItmaps的交集and,并集or,非not,异或xor 操作并将结果保存在deskkey中
    • 实例:

      • 2020-11-04 访问网站的userid=1,2,5,9

      ​ 2020-11-03 访问网站的userid=0,1,4,9

      127.0.0.1:6379> setbit unique:users:20201104 1 1
      (integer) 0
      127.0.0.1:6379> setbit unique:users:20201104 2 1
      (integer) 0
      127.0.0.1:6379> setbit unique:users:20201104 5 1
      (integer) 0
      127.0.0.1:6379> setbit unique:users:20201104 9 1
      127.0.0.1:6379> setbit unique:users:20201103 0 1
      (integer) 0
      127.0.0.1:6379> setbit unique:users:20201103 1 1
      (integer) 0
      127.0.0.1:6379> setbit unique:users:20201103 4 1
      (integer) 0
      127.0.0.1:6379> setbit unique:users:20201103 9 1
      (integer) 0
      
      • 计算两天都访问过网站的用户数量

        127.0.0.1:6379> bitop and unique:users:and:20201104_03 unique:users:20201103 unique:users:20201104
        (integer) 2
        
      • 计算任意一天都访问过网站的用户数量(例如月活跃),可以使用or求并集

        xxxxxxxxxx 127.0.0.1:6379> bitop or unique:users:or:20201104_03 unique:users:20201103 unique:users:20201104
        (integer) 2
        

6.1.3 BItmaps与set做比较

​ 假设网站有1亿用户,每天独立访问的用户有5千万,如果每天用集合类型和BItmaps分别存储活跃用户可以得到表

对比
数据类型 每个用户id占用空间 需要存储的用户量 全部内存量
集合类型 64位 5000,0000 64位*5000,0000 = 400M
BItmaps 1位 1,0000,0000 1*1,0000,0000 = 12.5M

BItmaps节约空间

但是当网站每天的独立访问用户很少,例如只有10W,那么用BItmaps就不合适了。

6.2 HyperLogLog

6.2.1 简介

  • HyperLogLog只会根据输入元素来计算基数,而不会存储输入元素本身,所以HyperLogLog不能像集合那样,返回输入的各个元素
  • 基数:
    • 比如数据集{1,3,5,7,5,7,8},那么这个数据集的基数为{1,3,5,7,8},基数(不重复元素)为5。基数估计就是在误差可接受的范围内,快速计算基数
  • 解决基数的方案:
    • mysql中使用distinct count计算不重复个数
    • Redis提供的hash,set,bitmaps等数据结构来处理
    • 但是随着数据增加,导致占用空间越来越大,对于非常大的数据是不切实际的
    • 所以Redis推出HyperLogLog来降低一定的精度平衡存储空间
  • HyperLogLog是用来做基础统计的算法,优点是在输入元素的数量或者体积非常大时,计算基数所需的空间总是固定的、并且很小
  • 每个HyperLogLog键只需花费12KB内存,就可以计算解禁2^64个不同元素的基数
  • HyperLogLog只会根据输入元素来计算基数,而不会存储输入元素本身,所以HyperLogLog不会像集合那样,返回输入的各个元素

6.2.2 命令

  • pfadd

    • 格式:pfadd<key><element>[element...] 添加指定元素到HyperLogLog中,成功返回1,失败返回0

    • 实例:

      127.0.0.1:6379> pfadd program "java"
      (integer) 1
      127.0.0.1:6379> pfadd program "python"
      (integer) 1
      127.0.0.1:6379> pfadd program "java"
      (integer) 0
      127.0.0.1:6379> pfadd program "c++" "mysql"
      (integer) 1
      
  • pfcount

    • 格式:pfcount<key>[key...] 计算HHL的近似基数,可以计算多个HHL,比如用HHL存储每天的UV,计算一周的UV可以使用7天的UV合并计算即可

    • 实例:

      127.0.0.1:6379> pfcount program
      (integer) 4
      
  • pfmerge

    • 格式:pfmerge<destkey><sourcekey>[sourcekey...] 将一个或多个HLL合并后的结果存储在另一个HHL中

    • 实例:

      127.0.0.1:6379> pfadd k1 "a"
      (integer) 1
      127.0.0.1:6379> pfadd k1 "b"
      (integer) 1
      127.0.0.1:6379> pfcount k1
      (integer) 2
      127.0.0.1:6379> pfmerge k2 k1 program
      OK
      127.0.0.1:6379> pfcount k2
      (integer) 6
      

6.3 Geospatial

6.3.1 简介

  • Redis3.2 增加了对GEO类型的支持,Geospatial,地理信息的缩写。该类型,就是元素的二维坐标,在地图上就是经纬度。redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度hash等插件操作

6.3.2 命令

  • geoadd

    • 格式:geoadd<key><longitude><latitude><member>[longitude latiyude member...]

      • 添加地理位置(经度,纬度,名称)
    • 实例:

      127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
      (integer) 1
      127.0.0.1:6379> geoadd china:city 106.50 29.53 chongqing 114.05 22.52 shenzhen 116.38 39.90 beijing
      (integer) 3
      
    • 两级无法添加,有效经度-180度 - 180度 。有效纬度 -85.05112878 - 85.05112878

    • 坐标位置超过指定范围时,会返回一个错误值

    • 已经添加的数据,是无法再次添加的

  • geopos

    • 格式:geopops<key><member>[member...] 获得指定地区的坐标值

    • 实例:

      127.0.0.1:6379> geopos china:city shanghai shenzhen chongqing beijing
      1) 1) "121.47000163793563843"2) "31.22999903975783553"
      2) 1) "114.04999762773513794"2) "22.5200000879503861"
      3) 1) "106.49999767541885376"2) "29.52999957900659211"
      4) 1) "116.38000041246414185"2) "39.90000009167092543"
      
  • geodist

    • 格式:geodist<key><member1><member2>[m|km|ft|mi] 获取两个位置之间的直线距离

      • 单位 :m 米,km 千米,ft 英尺,mi 英里
    • 实例:

      127.0.0.1:6379> geodist china:city shanghai beijing km
      "1068.1535"
      127.0.0.1:6379> geodist china:city shanghai beijing m
      "1068153.5181"
      127.0.0.1:6379> geodist china:city shanghai beijing mi
      "663.7215"
      127.0.0.1:6379> geodist china:city shanghai beijing ft
      "3504440.6763"
      
  • georadius

    • 格式:georadius<key><longitude><latitude>radius m|km|ft|mi 给定的经纬度为中心,找出某一半径内的元素

      • 经度 纬度 距离 单位
    • 实例:

      127.0.0.1:6379> georadius china:city 110 30 1000 km # 东经110度,北纬30度,方圆1000km内的城市
      1) "chongqing"
      2) "shenzhen"
      

7. Jedis操作Redis

7.1 测试连接

  • Jedis所需要的的jar包
        <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.2.0</version></dependency>
  • 配置文件 bind 注释掉,protected-mode 改为no

  • 关闭防火墙

    systemctl status firewalld # 查看防火墙
    systemctl stop firewalld # 关闭防火墙[root@ghc1 etc]# cd /usr/local/bin
    [root@ghc1 bin]# ./redis-cli
    127.0.0.1:6379> config set protected-mode no
    OK
    
  • 测试

    public class JedisDemo1 {public static void main(String[] args) {// 创建Jedis对象Jedis jedis = new Jedis("8.130.160.173",6379);// 测试String value = jedis.ping();System.out.println(value);jedis.close();}
    }// PONG
    
  • 演示几个命令

        //操作key@Testpublic void DemoString (){Jedis jedis = new Jedis("8.130.160.173",6379);//添加jedis.set("name","zhangsan");//获取String s = jedis.get("name");System.out.println(s);//设置多个keyjedis.mset("k1","v1","k2","v2");List<String> mget = jedis.mget("k1", "k2");mget.forEach(System.out::println);Set<String> keys = jedis.keys("*");//keys *keys.forEach(System.out::println);}@Testpublic void DemoList(){Jedis jedis = new Jedis("8.130.160.173",6379);jedis.lpush("key1","zs","ls","ww");List<String> key1 = jedis.lrange("key1", 0, -1);key1.forEach(System.out::println);}@Testpublic void DemoSet(){Jedis jedis = new Jedis("8.130.160.173",6379);jedis.sadd("k","lucy","jack");jedis.smembers("k").forEach(System.out::println);}@Testpublic void DemoHash(){Jedis jedis = new Jedis("8.130.160.173",6379);jedis.hset("users","id","1");jedis.hset("users","name","jack");System.out.println(jedis.hget("users","id"));System.out.println(jedis.hget("users","name"));}@Testpublic void DemoZset(){Jedis jedis = new Jedis("8.130.160.173",6379);jedis.zadd("topn",100,"java");jedis.zadd("topn",200,"C++");jedis.zadd("topn",300,"Python");jedis.zrange("topn",0,-1).forEach(System.out::println);}
    

7.2 redis实例—手机验证码

要求:

1.输入手机号,点击发送后,随机生成6个数字码,两分钟有效

2.输入验证码,点击验证,返回成功或失败

3.每个手机号每天只能输入三次

1.生成随机6位数字Random
2.验证码2分钟有效验证码放到redis中,设置过期时间120秒
3.判断验证码是否一致从redis中获取验证码和输入的进行比较
4.每天三次验证码incr 每次发送之后+1,大于2就提示不能发送

代码:

public class PhoneCode {public static void main(String[] args) {//模拟验证码发送verifyCode("13911111111");//测试
//        checkCode("13911111111","249125");}//1.生成6位数字验证码public static String getCode() {Random random = new Random();String code = "";for (int i = 0; i < 6; i++) {int rand = random.nextInt(10);code += rand;}return code;}//2.让每个手机每天只能发送三次,验证码放到redis中,设置过期时间public static void verifyCode(String phone) {//连接redisJedis jedis = new Jedis("8.130.160.173", 6379);//拼接key//手机发送次数的keyString countKey = "VerifyCode" + phone + ":count";//验证码keyString codeKey = "VerifyCode" + phone + ":code";//每天三次String count = jedis.get(countKey);if (count == null) {//没有发送次数,第一次发送//设置发送次数是1jedis.setex(countKey, 24 * 60 * 60, "1");} else if (Integer.parseInt(count) <= 2) {//发送次数+1jedis.incr(countKey);} else if (Integer.parseInt(count) > 2) {//发送满三次,不能再发送System.out.println("今天的发送次数已经超过三次");jedis.close();}//发送的验证码 放入 redisString vCode = getCode();jedis.setex(codeKey, 120, vCode);jedis.close();return;}//3. 校验public static void checkCode(String phone, String code) {//连接redisJedis jedis = new Jedis("8.130.160.173", 6379);//验证码keyString codeKey = "VerifyCode" + phone + ":code";//得到redis验证码String redisCode = jedis.get(codeKey);//判断if (redisCode.equals(code)) {System.out.println("成功");} else {System.out.println("失败");}jedis.close();}
}

redis中获取验证码

127.0.0.1:6379> keys *
1) "VerifyCode13911111111:code"
2) "VerifyCode13911111111:count"
127.0.0.1:6379> get VerifyCode13911111111:code
"281479"

8. SpringBoot 整合Redis

  • 创建springboot工程

  • 导入依赖

            <!--redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!--spring2.x集成redis所需common-pool2--><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.6.2</version></dependency>
    
  • application.properties配置文件

    # Redis 服务器ip地址
    spring.redis.host=8.130.160.173
    # Redis 服务器连接端口
    spring.redis.port=6379
    # Redis 数据库索引(默认0)
    spring.redis.database=0
    # 连接超时时间(ms)
    spring.redis.timeout=1800000
    # 连接池最大连接数(使用负数表示没有限制)
    spring.redis.lettuce.pool.max-active=20
    # 最大阻塞等待时间(负数表示没有限制)
    spring.redis.lettuce.pool.max-wait=-1
    # 连接池中的最大空闲连接
    spring.redis.lettuce.pool.max-idle=5
    # 连接池中的最小空闲连接
    spring.redis.lettuce.pool.min-idle=0
    
  • 测试

       @Testvoid contextLoads() {//redisTemplate//opsForValue 操作字符串 类似string类型//opsForList  操作list//...redisTemplate.opsForValue().set("k1","v1");System.out.println(redisTemplate.opsForValue().get("k1"));//出了基本操作,常用的方法可以直接通过RedisTemplate操作,比如事务和基本的CRUD//获取redis连接对象
    /*        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();connection.flushDb();connection.flushAll();*/}
    
  • 测试2

    User

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    @Component
    public class User implements Serializable { //必须序列化private String name;private Integer age;
    }
    

    Test

        @Testpublic void Test1()  {//真实的开发一般都使用json来传递对象User user = new User("张三",18);//序列化字符串
    //        String jsonUser = new ObjectMapper().writeValueAsString(user);redisTemplate.opsForValue().set("user",user);System.out.println(redisTemplate.opsForValue().get("user"));}
    

    结果:

    User(name=张三, age=18)
    
  • 自定义RedisTemplate

    @Configuration
    public class RedisConfig {//编写自己的redisTemplate@Bean@SuppressWarnings("all")public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {//为了自己开发方便,一般直接使用<String,Object>RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();template.setConnectionFactory(factory);//Jackson序列化配置Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer= new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper om = new ObjectMapper();//转义om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);//string的序列化配置StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();//配置具体的序列化方式//key采用String的序列化方式template.setKeySerializer(stringRedisSerializer);//hash的key也采用Stringtemplate.setHashKeySerializer(stringRedisSerializer);//value序列化方式采用jacksontemplate.setValueSerializer(jackson2JsonRedisSerializer);//hash的value序列化采用jacksontemplate.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}
    }
    
  • 测试 不会乱码

        @Testpublic void Test1() {//真实的开发一般都使用json来传递对象User user = new User("张三", 18);redisTemplate.opsForValue().set("user", user);System.out.println(redisTemplate.opsForValue().get("user"));}
    
  • 自定义util

    package com.banyaun.utils;import org.springframework.data.redis.connection.DataType;
    import org.springframework.data.redis.core.Cursor;
    import org.springframework.data.redis.core.ScanOptions;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.core.ZSetOperations.TypedTuple;import java.util.Collection;
    import java.util.Date;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Set;
    import java.util.concurrent.TimeUnit;/*** Redis工具类*/
    public class RedisUtil {private StringRedisTemplate redisTemplate;public void setRedisTemplate(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}public StringRedisTemplate getRedisTemplate() {return this.redisTemplate;}/** -------------------key相关操作--------------------- *//*** 删除key** @param key*/public void delete(String key) {redisTemplate.delete(key);}/*** 批量删除key** @param keys*/public void delete(Collection<String> keys) {redisTemplate.delete(keys);}/*** 序列化key** @param key* @return*/public byte[] dump(String key) {return redisTemplate.dump(key);}/*** 是否存在key** @param key* @return*/public Boolean hasKey(String key) {return redisTemplate.hasKey(key);}/*** 设置过期时间** @param key* @param timeout* @param unit* @return*/public Boolean expire(String key, long timeout, TimeUnit unit) {return redisTemplate.expire(key, timeout, unit);}/*** 设置过期时间** @param key* @param date* @return*/public Boolean expireAt(String key, Date date) {return redisTemplate.expireAt(key, date);}/*** 查找匹配的key** @param pattern* @return*/public Set<String> keys(String pattern) {return redisTemplate.keys(pattern);}/*** 将当前数据库的 key 移动到给定的数据库 db 当中** @param key* @param dbIndex* @return*/public Boolean move(String key, int dbIndex) {return redisTemplate.move(key, dbIndex);}/*** 移除 key 的过期时间,key 将持久保持** @param key* @return*/public Boolean persist(String key) {return redisTemplate.persist(key);}/*** 返回 key 的剩余的过期时间** @param key* @param unit* @return*/public Long getExpire(String key, TimeUnit unit) {return redisTemplate.getExpire(key, unit);}/*** 返回 key 的剩余的过期时间** @param key* @return*/public Long getExpire(String key) {return redisTemplate.getExpire(key);}/*** 从当前数据库中随机返回一个 key** @return*/public String randomKey() {return redisTemplate.randomKey();}/*** 修改 key 的名称** @param oldKey* @param newKey*/public void rename(String oldKey, String newKey) {redisTemplate.rename(oldKey, newKey);}/*** 仅当 newkey 不存在时,将 oldKey 改名为 newkey** @param oldKey* @param newKey* @return*/public Boolean renameIfAbsent(String oldKey, String newKey) {return redisTemplate.renameIfAbsent(oldKey, newKey);}/*** 返回 key 所储存的值的类型** @param key* @return*/public DataType type(String key) {return redisTemplate.type(key);}/** -------------------string相关操作--------------------- *//*** 设置指定 key 的值* @param key* @param value*/public void set(String key, String value) {redisTemplate.opsForValue().set(key, value);}/*** 获取指定 key 的值* @param key* @return*/public String get(String key) {return redisTemplate.opsForValue().get(key);}/*** 返回 key 中字符串值的子字符* @param key* @param start* @param end* @return*/public String getRange(String key, long start, long end) {return redisTemplate.opsForValue().get(key, start, end);}/*** 将给定 key 的值设为 value ,并返回 key 的旧值(old value)** @param key* @param value* @return*/public String getAndSet(String key, String value) {return redisTemplate.opsForValue().getAndSet(key, value);}/*** 对 key 所储存的字符串值,获取指定偏移量上的位(bit)** @param key* @param offset* @return*/public Boolean getBit(String key, long offset) {return redisTemplate.opsForValue().getBit(key, offset);}/*** 批量获取** @param keys* @return*/public List<String> multiGet(Collection<String> keys) {return redisTemplate.opsForValue().multiGet(keys);}/*** 设置ASCII码, 字符串'a'的ASCII码是97, 转为二进制是'01100001', 此方法是将二进制第offset位值变为value** @param key 位置* @param value*            值,true为1, false为0* @return*/public boolean setBit(String key, long offset, boolean value) {return redisTemplate.opsForValue().setBit(key, offset, value);}/*** 将值 value 关联到 key ,并将 key 的过期时间设为 timeout** @param key* @param value* @param timeout*            过期时间* @param unit*            时间单位, 天:TimeUnit.DAYS 小时:TimeUnit.HOURS 分钟:TimeUnit.MINUTES*            秒:TimeUnit.SECONDS 毫秒:TimeUnit.MILLISECONDS*/public void setEx(String key, String value, long timeout, TimeUnit unit) {redisTemplate.opsForValue().set(key, value, timeout, unit);}/*** 只有在 key 不存在时设置 key 的值** @param key* @param value* @return 之前已经存在返回false,不存在返回true*/public boolean setIfAbsent(String key, String value) {return redisTemplate.opsForValue().setIfAbsent(key, value);}/*** 用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始** @param key* @param value* @param offset*            从指定位置开始覆写*/public void setRange(String key, String value, long offset) {redisTemplate.opsForValue().set(key, value, offset);}/*** 获取字符串的长度** @param key* @return*/public Long size(String key) {return redisTemplate.opsForValue().size(key);}/*** 批量添加** @param maps*/public void multiSet(Map<String, String> maps) {redisTemplate.opsForValue().multiSet(maps);}/*** 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在** @param maps* @return 之前已经存在返回false,不存在返回true*/public boolean multiSetIfAbsent(Map<String, String> maps) {return redisTemplate.opsForValue().multiSetIfAbsent(maps);}/*** 增加(自增长), 负数则为自减** @param key* @return*/public Long incrBy(String key, long increment) {return redisTemplate.opsForValue().increment(key, increment);}/**** @param key* @return*/public Double incrByFloat(String key, double increment) {return redisTemplate.opsForValue().increment(key, increment);}/*** 追加到末尾** @param key* @param value* @return*/public Integer append(String key, String value) {return redisTemplate.opsForValue().append(key, value);}/** -------------------hash相关操作------------------------- *//*** 获取存储在哈希表中指定字段的值** @param key* @param field* @return*/public Object hGet(String key, String field) {return redisTemplate.opsForHash().get(key, field);}/*** 获取所有给定字段的值** @param key* @return*/public Map<Object, Object> hGetAll(String key) {return redisTemplate.opsForHash().entries(key);}/*** 获取所有给定字段的值** @param key* @param fields* @return*/public List<Object> hMultiGet(String key, Collection<Object> fields) {return redisTemplate.opsForHash().multiGet(key, fields);}public void hPut(String key, String hashKey, String value) {redisTemplate.opsForHash().put(key, hashKey, value);}public void hPutAll(String key, Map<String, String> maps) {redisTemplate.opsForHash().putAll(key, maps);}/*** 仅当hashKey不存在时才设置** @param key* @param hashKey* @param value* @return*/public Boolean hPutIfAbsent(String key, String hashKey, String value) {return redisTemplate.opsForHash().putIfAbsent(key, hashKey, value);}/*** 删除一个或多个哈希表字段** @param key* @param fields* @return*/public Long hDelete(String key, Object... fields) {return redisTemplate.opsForHash().delete(key, fields);}/*** 查看哈希表 key 中,指定的字段是否存在** @param key* @param field* @return*/public boolean hExists(String key, String field) {return redisTemplate.opsForHash().hasKey(key, field);}/*** 为哈希表 key 中的指定字段的整数值加上增量 increment** @param key* @param field* @param increment* @return*/public Long hIncrBy(String key, Object field, long increment) {return redisTemplate.opsForHash().increment(key, field, increment);}/*** 为哈希表 key 中的指定字段的整数值加上增量 increment** @param key* @param field* @param delta* @return*/public Double hIncrByFloat(String key, Object field, double delta) {return redisTemplate.opsForHash().increment(key, field, delta);}/*** 获取所有哈希表中的字段** @param key* @return*/public Set<Object> hKeys(String key) {return redisTemplate.opsForHash().keys(key);}/*** 获取哈希表中字段的数量** @param key* @return*/public Long hSize(String key) {return redisTemplate.opsForHash().size(key);}/*** 获取哈希表中所有值** @param key* @return*/public List<Object> hValues(String key) {return redisTemplate.opsForHash().values(key);}/*** 迭代哈希表中的键值对** @param key* @param options* @return*/public Cursor<Entry<Object, Object>> hScan(String key, ScanOptions options) {return redisTemplate.opsForHash().scan(key, options);}/** ------------------------list相关操作---------------------------- *//*** 通过索引获取列表中的元素** @param key* @param index* @return*/public String lIndex(String key, long index) {return redisTemplate.opsForList().index(key, index);}/*** 获取列表指定范围内的元素** @param key* @param start*            开始位置, 0是开始位置* @param end*            结束位置, -1返回所有* @return*/public List<String> lRange(String key, long start, long end) {return redisTemplate.opsForList().range(key, start, end);}/*** 存储在list头部** @param key* @param value* @return*/public Long lLeftPush(String key, String value) {return redisTemplate.opsForList().leftPush(key, value);}/**** @param key* @param value* @return*/public Long lLeftPushAll(String key, String... value) {return redisTemplate.opsForList().leftPushAll(key, value);}/**** @param key* @param value* @return*/public Long lLeftPushAll(String key, Collection<String> value) {return redisTemplate.opsForList().leftPushAll(key, value);}/*** 当list存在的时候才加入** @param key* @param value* @return*/public Long lLeftPushIfPresent(String key, String value) {return redisTemplate.opsForList().leftPushIfPresent(key, value);}/*** 如果pivot存在,再pivot前面添加** @param key* @param pivot* @param value* @return*/public Long lLeftPush(String key, String pivot, String value) {return redisTemplate.opsForList().leftPush(key, pivot, value);}/**** @param key* @param value* @return*/public Long lRightPush(String key, String value) {return redisTemplate.opsForList().rightPush(key, value);}/**** @param key* @param value* @return*/public Long lRightPushAll(String key, String... value) {return redisTemplate.opsForList().rightPushAll(key, value);}/**** @param key* @param value* @return*/public Long lRightPushAll(String key, Collection<String> value) {return redisTemplate.opsForList().rightPushAll(key, value);}/*** 为已存在的列表添加值** @param key* @param value* @return*/public Long lRightPushIfPresent(String key, String value) {return redisTemplate.opsForList().rightPushIfPresent(key, value);}/*** 在pivot元素的右边添加值** @param key* @param pivot* @param value* @return*/public Long lRightPush(String key, String pivot, String value) {return redisTemplate.opsForList().rightPush(key, pivot, value);}/*** 通过索引设置列表元素的值** @param key* @param index*            位置* @param value*/public void lSet(String key, long index, String value) {redisTemplate.opsForList().set(key, index, value);}/*** 移出并获取列表的第一个元素** @param key* @return 删除的元素*/public String lLeftPop(String key) {return redisTemplate.opsForList().leftPop(key);}/*** 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止** @param key* @param timeout*            等待时间* @param unit*            时间单位* @return*/public String lBLeftPop(String key, long timeout, TimeUnit unit) {return redisTemplate.opsForList().leftPop(key, timeout, unit);}/*** 移除并获取列表最后一个元素** @param key* @return 删除的元素*/public String lRightPop(String key) {return redisTemplate.opsForList().rightPop(key);}/*** 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止** @param key* @param timeout*            等待时间* @param unit*            时间单位* @return*/public String lBRightPop(String key, long timeout, TimeUnit unit) {return redisTemplate.opsForList().rightPop(key, timeout, unit);}/*** 移除列表的最后一个元素,并将该元素添加到另一个列表并返回** @param sourceKey* @param destinationKey* @return*/public String lRightPopAndLeftPush(String sourceKey, String destinationKey) {return redisTemplate.opsForList().rightPopAndLeftPush(sourceKey,destinationKey);}/*** 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止** @param sourceKey* @param destinationKey* @param timeout* @param unit* @return*/public String lBRightPopAndLeftPush(String sourceKey, String destinationKey,long timeout, TimeUnit unit) {return redisTemplate.opsForList().rightPopAndLeftPush(sourceKey,destinationKey, timeout, unit);}/*** 删除集合中值等于value得元素** @param key* @param index*            index=0, 删除所有值等于value的元素; index>0, 从头部开始删除第一个值等于value的元素;*            index<0, 从尾部开始删除第一个值等于value的元素;* @param value* @return*/public Long lRemove(String key, long index, String value) {return redisTemplate.opsForList().remove(key, index, value);}/*** 裁剪list** @param key* @param start* @param end*/public void lTrim(String key, long start, long end) {redisTemplate.opsForList().trim(key, start, end);}/*** 获取列表长度** @param key* @return*/public Long lLen(String key) {return redisTemplate.opsForList().size(key);}/** --------------------set相关操作-------------------------- *//*** set添加元素** @param key* @param values* @return*/public Long sAdd(String key, String... values) {return redisTemplate.opsForSet().add(key, values);}/*** set移除元素** @param key* @param values* @return*/public Long sRemove(String key, Object... values) {return redisTemplate.opsForSet().remove(key, values);}/*** 移除并返回集合的一个随机元素** @param key* @return*/public String sPop(String key) {return redisTemplate.opsForSet().pop(key);}/*** 将元素value从一个集合移到另一个集合** @param key* @param value* @param destKey* @return*/public Boolean sMove(String key, String value, String destKey) {return redisTemplate.opsForSet().move(key, value, destKey);}/*** 获取集合的大小** @param key* @return*/public Long sSize(String key) {return redisTemplate.opsForSet().size(key);}/*** 判断集合是否包含value** @param key* @param value* @return*/public Boolean sIsMember(String key, Object value) {return redisTemplate.opsForSet().isMember(key, value);}/*** 获取两个集合的交集** @param key* @param otherKey* @return*/public Set<String> sIntersect(String key, String otherKey) {return redisTemplate.opsForSet().intersect(key, otherKey);}/*** 获取key集合与多个集合的交集** @param key* @param otherKeys* @return*/public Set<String> sIntersect(String key, Collection<String> otherKeys) {return redisTemplate.opsForSet().intersect(key, otherKeys);}/*** key集合与otherKey集合的交集存储到destKey集合中** @param key* @param otherKey* @param destKey* @return*/public Long sIntersectAndStore(String key, String otherKey, String destKey) {return redisTemplate.opsForSet().intersectAndStore(key, otherKey,destKey);}/*** key集合与多个集合的交集存储到destKey集合中** @param key* @param otherKeys* @param destKey* @return*/public Long sIntersectAndStore(String key, Collection<String> otherKeys,String destKey) {return redisTemplate.opsForSet().intersectAndStore(key, otherKeys,destKey);}/*** 获取两个集合的并集** @param key* @param otherKeys* @return*/public Set<String> sUnion(String key, String otherKeys) {return redisTemplate.opsForSet().union(key, otherKeys);}/*** 获取key集合与多个集合的并集** @param key* @param otherKeys* @return*/public Set<String> sUnion(String key, Collection<String> otherKeys) {return redisTemplate.opsForSet().union(key, otherKeys);}/*** key集合与otherKey集合的并集存储到destKey中** @param key* @param otherKey* @param destKey* @return*/public Long sUnionAndStore(String key, String otherKey, String destKey) {return redisTemplate.opsForSet().unionAndStore(key, otherKey, destKey);}/*** key集合与多个集合的并集存储到destKey中** @param key* @param otherKeys* @param destKey* @return*/public Long sUnionAndStore(String key, Collection<String> otherKeys,String destKey) {return redisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey);}/*** 获取两个集合的差集** @param key* @param otherKey* @return*/public Set<String> sDifference(String key, String otherKey) {return redisTemplate.opsForSet().difference(key, otherKey);}/*** 获取key集合与多个集合的差集** @param key* @param otherKeys* @return*/public Set<String> sDifference(String key, Collection<String> otherKeys) {return redisTemplate.opsForSet().difference(key, otherKeys);}/*** key集合与otherKey集合的差集存储到destKey中** @param key* @param otherKey* @param destKey* @return*/public Long sDifference(String key, String otherKey, String destKey) {return redisTemplate.opsForSet().differenceAndStore(key, otherKey,destKey);}/*** key集合与多个集合的差集存储到destKey中** @param key* @param otherKeys* @param destKey* @return*/public Long sDifference(String key, Collection<String> otherKeys,String destKey) {return redisTemplate.opsForSet().differenceAndStore(key, otherKeys,destKey);}/*** 获取集合所有元素** @param key* @return*/public Set<String> setMembers(String key) {return redisTemplate.opsForSet().members(key);}/*** 随机获取集合中的一个元素** @param key* @return*/public String sRandomMember(String key) {return redisTemplate.opsForSet().randomMember(key);}/*** 随机获取集合中count个元素** @param key* @param count* @return*/public List<String> sRandomMembers(String key, long count) {return redisTemplate.opsForSet().randomMembers(key, count);}/*** 随机获取集合中count个元素并且去除重复的** @param key* @param count* @return*/public Set<String> sDistinctRandomMembers(String key, long count) {return redisTemplate.opsForSet().distinctRandomMembers(key, count);}/**** @param key* @param options* @return*/public Cursor<String> sScan(String key, ScanOptions options) {return redisTemplate.opsForSet().scan(key, options);}/**------------------zSet相关操作--------------------------------*//*** 添加元素,有序集合是按照元素的score值由小到大排列** @param key* @param value* @param score* @return*/public Boolean zAdd(String key, String value, double score) {return redisTemplate.opsForZSet().add(key, value, score);}/**** @param key* @param values* @return*/public Long zAdd(String key, Set<TypedTuple<String>> values) {return redisTemplate.opsForZSet().add(key, values);}/**** @param key* @param values* @return*/public Long zRemove(String key, Object... values) {return redisTemplate.opsForZSet().remove(key, values);}/*** 增加元素的score值,并返回增加后的值** @param key* @param value* @param delta* @return*/public Double zIncrementScore(String key, String value, double delta) {return redisTemplate.opsForZSet().incrementScore(key, value, delta);}/*** 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列** @param key* @param value* @return 0表示第一位*/public Long zRank(String key, Object value) {return redisTemplate.opsForZSet().rank(key, value);}/*** 返回元素在集合的排名,按元素的score值由大到小排列** @param key* @param value* @return*/public Long zReverseRank(String key, Object value) {return redisTemplate.opsForZSet().reverseRank(key, value);}/*** 获取集合的元素, 从小到大排序** @param key* @param start*            开始位置* @param end*            结束位置, -1查询所有* @return*/public Set<String> zRange(String key, long start, long end) {return redisTemplate.opsForZSet().range(key, start, end);}/*** 获取集合元素, 并且把score值也获取** @param key* @param start* @param end* @return*/public Set<TypedTuple<String>> zRangeWithScores(String key, long start,long end) {return redisTemplate.opsForZSet().rangeWithScores(key, start, end);}/*** 根据Score值查询集合元素** @param key* @param min*            最小值* @param max*            最大值* @return*/public Set<String> zRangeByScore(String key, double min, double max) {return redisTemplate.opsForZSet().rangeByScore(key, min, max);}/*** 根据Score值查询集合元素, 从小到大排序** @param key* @param min*            最小值* @param max*            最大值* @return*/public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,double min, double max) {return redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max);}/**** @param key* @param min* @param max* @param start* @param end* @return*/public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,double min, double max, long start, long end) {return redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max,start, end);}/*** 获取集合的元素, 从大到小排序** @param key* @param start* @param end* @return*/public Set<String> zReverseRange(String key, long start, long end) {return redisTemplate.opsForZSet().reverseRange(key, start, end);}/*** 获取集合的元素, 从大到小排序, 并返回score值** @param key* @param start* @param end* @return*/public Set<TypedTuple<String>> zReverseRangeWithScores(String key,long start, long end) {return redisTemplate.opsForZSet().reverseRangeWithScores(key, start,end);}/*** 根据Score值查询集合元素, 从大到小排序** @param key* @param min* @param max* @return*/public Set<String> zReverseRangeByScore(String key, double min,double max) {return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max);}/*** 根据Score值查询集合元素, 从大到小排序** @param key* @param min* @param max* @return*/public Set<TypedTuple<String>> zReverseRangeByScoreWithScores(String key, double min, double max) {return redisTemplate.opsForZSet().reverseRangeByScoreWithScores(key,min, max);}/**** @param key* @param min* @param max* @param start* @param end* @return*/public Set<String> zReverseRangeByScore(String key, double min,double max, long start, long end) {return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max,start, end);}/*** 根据score值获取集合元素数量** @param key* @param min* @param max* @return*/public Long zCount(String key, double min, double max) {return redisTemplate.opsForZSet().count(key, min, max);}/*** 获取集合大小** @param key* @return*/public Long zSize(String key) {return redisTemplate.opsForZSet().size(key);}/*** 获取集合大小** @param key* @return*/public Long zZCard(String key) {return redisTemplate.opsForZSet().zCard(key);}/*** 获取集合中value元素的score值** @param key* @param value* @return*/public Double zScore(String key, Object value) {return redisTemplate.opsForZSet().score(key, value);}/*** 移除指定索引位置的成员** @param key* @param start* @param end* @return*/public Long zRemoveRange(String key, long start, long end) {return redisTemplate.opsForZSet().removeRange(key, start, end);}/*** 根据指定的score值的范围来移除成员** @param key* @param min* @param max* @return*/public Long zRemoveRangeByScore(String key, double min, double max) {return redisTemplate.opsForZSet().removeRangeByScore(key, min, max);}/*** 获取key和otherKey的并集并存储在destKey中** @param key* @param otherKey* @param destKey* @return*/public Long zUnionAndStore(String key, String otherKey, String destKey) {return redisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey);}/**** @param key* @param otherKeys* @param destKey* @return*/public Long zUnionAndStore(String key, Collection<String> otherKeys,String destKey) {return redisTemplate.opsForZSet().unionAndStore(key, otherKeys, destKey);}/*** 交集** @param key* @param otherKey* @param destKey* @return*/public Long zIntersectAndStore(String key, String otherKey,String destKey) {return redisTemplate.opsForZSet().intersectAndStore(key, otherKey,destKey);}/*** 交集** @param key* @param otherKeys* @param destKey* @return*/public Long zIntersectAndStore(String key, Collection<String> otherKeys,String destKey) {return redisTemplate.opsForZSet().intersectAndStore(key, otherKeys,destKey);}/**** @param key* @param options* @return*/public Cursor<TypedTuple<String>> zScan(String key, ScanOptions options) {return redisTemplate.opsForZSet().scan(key, options);}
    }
    
  • 测试

        @Autowired@Resourceprivate RedisUtil redisUtil;@Testpublic void test2(){redisUtil.set("k1","v1");System.out.println(redisUtil.get("k1"));}
    

9. Redis6的实务操作

9.1 Redis的事务定义

  • Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。
  • 事务在执行的过程中,不会被其他客户端发送来的请求命令所打断
  • Redis事务的主要作用就是串联多个命令,防止别的命令插队

9.2 Multi、Exec、discard

  • 从输入Multi命令开始,输入的命令都会以此进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令一次执行

  • 组队的过程中可以通过discard命令来放弃组队

  • 测试

    127.0.0.1:6379> multi # 开启事务
    OK
    127.0.0.1:6379(TX)> set k1 v1 # 组队,等待执行
    QUEUED
    127.0.0.1:6379(TX)> set k2 v2
    QUEUED
    127.0.0.1:6379(TX)> exec  # 执行
    1) OK
    2) OK
    
    127.0.0.1:6379(TX)> set a1 v1
    QUEUED
    127.0.0.1:6379(TX)> set a2 v2
    QUEUED
    127.0.0.1:6379(TX)> discard # 放弃组队
    OK
    

9.3 事务的错误处理

  • 组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消

    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379(TX)> set b1 v1
    QUEUED
    127.0.0.1:6379(TX)> set b2 v2
    QUEUED
    127.0.0.1:6379(TX)> set b3 # 错误
    (error) ERR wrong number of arguments for 'set' command
    127.0.0.1:6379(TX)> exec
    (error) EXECABORT Transaction discarded because of previous errors. # 失败
    
  • 如果执行阶段某个命令报错,则只有报错的命令不会被执行,其他的命令都会执行,不会回滚

127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set c1 v1
QUEUED
127.0.0.1:6379(TX)> incr c1
QUEUED
127.0.0.1:6379(TX)> set c2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) (error) ERR value is not an integer or out of range # 错误
3) OK

9.4 事务冲突的问题

  • 例子:

    • 一共10000元
    • 一个请求想给金额减8000
    • 一个请求想给金额减5000
    • 一个请求想给金额减1000

    用锁解决

9.4.1 悲观锁

  • 悲观锁,顾名思义就是很悲观,每次去拿数据的时候都认为别会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block,知道它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁,读、写锁等。都是在做操作之前先上锁。

9.4.2 乐观锁

  • 每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的场景,这样可以提高吞吐量。Redis就是利用这种 check-and-set 机制实现事务的。

9.4.3 WATCH key[key…]

  • 在执行multi之前,先执行watch key1[key2],可以监视一个或多个key,如果在事务执行之前,key被其他命令所改动,那么事务就被打断。

  • 演示:

    ​ 打开两个终端

    ​ 终端1:

    set balance 100
    watch balance # 监视key
    multi
    incrby balance 10
    exec
    110 # 输出
    

    ​ 终端2:

    watch balance # 监视key
    multi
    incrby balance 20
    exec
    (nil) # 空
    
  • unwatch 取消监视

9.5 Redis事务三特性

不是mysql中的ACID!!!

  • 单独的隔离操作

    • 事务中所有的命令都会序列化、按顺序执行,事务在执行的过程中,不会被其他客户端发送来的命令请求所打断
  • 没有隔离级别的概念
    • 队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
  • 不保证原子性
    • 事务中如果有一条命令执行失败,其他的命令仍然会被执行,没有回滚

10. Redis持久化操作

  • Redis是内存数据库,如果不将内存中的数据库状态保存到磁盘中,那么一旦服务器进程退出,服务器中的数据库状态也会消失,所以Redis提供了持久化功能

  • Redis提供了两个不同形式的持久化方式

    • RDB(Redis DataBase)
    • AOF(Append Of File)

10.1 RDB

  • RDB简介:

    • 在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话将的Snapshot快照,它恢复时是将快照文件直接读到内存里
    • RDB保存的文件是dump.rdb
  • 备份是如何执行的

    • Redis会单独创建(fork)一个子进程来进行持久化,会将数据入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能,如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF更高效。RDB的缺点是最后一次持久化后的数据可能丢失

    • Fork

      • 作用:复制一个与当前进程一样的进程,所谓原进程的子进程
      • 在Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后都会exec系统调用,处于效率考虑,Linux中引入了“写时复制技术
      • 一般情况,父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程

    10.1.1 配置文件

    • dump.rdb文件

      • 在redis.conf中配置文件名称,默认为dump.rdb

  • stop-writes-on-bgsave-error yes

    • 当Redis无法写入磁盘的话,直接关掉Redis的写操作。推荐yes

  • rdbcompression yes

    • 对于存储到磁盘中的快照,可以设置是否进行压缩存储,如果是的话,redis会采用Redis算法进行压缩

    • 如果不想消耗cpu来进程压缩,可以设置为no,推荐yes

  • rdbchecksum yes

    • 存储快照后,让redis使用CRC64算法来进行数据校验

    • 这样会增加约10%的性能消耗,推荐yes

  • Save

    • 格式:save 秒钟,写操作次数

    • RDB是整个内存的压缩过的snapshot,RDB的数据结构,可以配置复合的快照触发条件,

      默认是一分钟内改了一万次,或五分钟改了十次,或十五分钟改了一次

10.1.2 操作机制

操作案例:

  • bin 目录下会生成一个dump.rdb文件
  • 输入数据后,关闭redis
  • 开启redis,扔能get到数据

触发机制:

  • save的规则满足的情况下,会自动触发rdb规则

  • 执行flushall,也会出发rdb规则

  • 退出redis,也会产生rdb文件

  • 备份就会自动生成dumo.rdb文件

如何恢复RDB文件

  • 只需将rdb文件放在我们redis启动目录就可以,redis启动时会自动检查dump.rdb文件,恢复其中的数据

  • 查看需要存放的位置

    127.0.0.1:6379> config get dir
    1) "dir"
    2) "/usr/local/bin"  # 如果目录下存在dump.rdb文件,启动就会自动恢复
    

10.1.3 优缺点

  • 优点

    • 适合大规模的数据恢复!
    • 对数据完整性要求不高
  • 缺点
    • 需要一定的时间间隔进行操作
    • 如果Redis意外宕机,最后一次的修改的数据就没了
    • fork一条进程的时候,会占用一定的内存空间

10.2 AOF

  • 以日志的形式记录服务器所处理的每一个写、删除操作,查询操作不会记录,只许追加文件,不可以改写文件。以文本的方式记录,可以打开文件看到详细的操作记录。恢复的时候就把这个文件全部再执行一边

  • redis启动之初会读取改文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作

  • AOF保存的问件事 appendonly.aof文件

10.2.1 配置文件

  • 默认不开启,开启 appendonly yes

  • appendfsync everysec (关闭就no)

  • no-appendfsync-on-rewrite no 是否重写,默认no,保证数据安全性

  • 如果aof文件大于 64mb,这是会fork一个新的进程,来将文件进行重写

  • aof默认就是文件的无限追加,文件会越来越大

10.2.2 操作

  • 开启 appendonly yes
  • 关闭redis后再打开
  • bin目录下会产生appendonly.aof文件
  • 记录着写,删操作

10.3 AOF修复

  • 如果这时aof文件有错误,这时候redis是启动不起来的,我们需要修复aof文件
  • redis提供一个工具redis-check-aof --fix appendonly.aof

10.4 优缺点

  • 优点

    • 每一次修改都同步,文件完整性会更加好
    • 每秒同步一次,可能会丢失一秒的数据
    • 从不同步,效率最高
  • 缺点
    • 相对于数据文件,AOF远远大于RDB,修复速度也比RDB慢
    • AOF运行效率比RDB慢,所以Redis默认持久化操作时RDB

10.3 RDB和AOF扩展

  • RDB持久化方式能够在指定的时间间隔内对你的数据进行快照存储
  • AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大
  • 只做缓存,如果你希望你的数据在服务器运行的时候存在,你也可以不适用任何持久化
  • 同时开启两种持久化方式
    • 这种情况下,当redis重启会优先载入AOF文件来恢复原始的数据,因为通常AOF保存的数据完整性要更高
    • RDB的数据不实时,同时使用两者时,服务器重启也只会找AOF文件,但是不建议只是用AOF,因为RDB更适合备份数据库,AOF不断变化不好备份,快速重启,而且不会有AOF可能潜在的bug
  • 性能建议
    • 因为RDB文件只用作后备用途,建议只在Salve上持久化RDB文件,而且只要15分钟备份一次就够了,只保留save 900 1 这条规则
    • 如果Enable AOF,好处是在最恶劣的情况下也只会丢失不超过两秒的数据,启动脚本较简单只load自己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite 的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值是64M,太小了,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。
    • 如果不 Enable AOF,仅靠 Master-Slave Repllcation 实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。代价是如果Master/Slave同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个中的RDB文件,载入较新的那个,微博就是这个架构

RDB和AOF区别:

  • 转:https://www.cnblogs.com/zxs117/p/11242026.html

11. 主从复制

  • 简介

    • 主机数据更新后,根据配置和策略,自动同步到备机的 master/slaver 机制,Master以写为主,Slave以读为主
    • 一主多从
  • 作用

    • 读写分离,性能扩展(主,写操作、从,读操作)
    • 容灾快速恢复

11.1 搭建一主多从

# 1.根目录创建myredis文件,进入
[root@ghc1 ~]# mkdir myredis
[root@ghc1 ~]# cd myredis/# 2.复制redis.conf,到myredis中
# redis.conf 中 daemonize yes/Appendonly no
[root@ghc1 myredis]# cp /etc/redis.conf /myredis/redis.conf

3.创建3个配置文件,配置一主两从

​ redis6379.conf

​ redis6380.conf

​ redis6381.conf

4.在三个配置文件中写入内容

[root@ghc1 myredis]# vi redis6379.conf
# 配置文件中写入
include /root/myredis/redis.conf  # 引入公共部分
pidfile /var/run/redis_6379.pid  # pid文件位置
port 6379  # 端口号
dbfilename dump6379.rdb  # rdb文件名称# 同样方式创建6380 6381

5.启动三台redis

[root@ghc1 myredis]# redis-server redis6379.conf
[root@ghc1 myredis]# redis-server redis6380.conf
[root@ghc1 myredis]# redis-server redis6381.conf # 查看三台主机的运行情况
127.0.0.1:6379> info replication
# Replication
role:master # 主机
connected_slaves:0 # 从服务器0
master_failover_state:no-failover
master_replid:37b8b7fc9b8f40b3c16a2d900a9a22e9e12631ab
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

6.配从库 不配主库

​ 命令slaveof ip

​ 在6380和6381上执行:slaveof 127.0.0.1 6379

[root@ghc1 myredis]# redis-cli -p 6380
127.0.0.1:6380> slaveof 127.0.0.1 6379
OK
[root@ghc1 myredis]# redis-cli -p 6381
127.0.0.1:6381> slaveof 127.0.0.1 6379
OK

7.演示

# 6379
127.0.0.1:6379> set k1 v1
OK
127.0.0.1:6379> keys *
1) "k1"# 6380
127.0.0.1:6380> keys * # 从服务器可以读
1) "k1"
127.0.0.1:6380> set v2 k2
(error) READONLY You can't write against a read only replica. # 只能读不能写
# 6381 同6382

11.2 复制原理之一主两从

演示:

127.0.0.1:6381> shutdown
# 6381 挂掉后 在6379主服务器中添加数据
127.0.0.1:6379> set k2 v2
OK
127.0.0.1:6379> set ke v3
OK# 6380仍能读到
127.0.0.1:6380> keys *
1) "k2"
2) "k1"
3) "ke"# 重启6381后,6381只能读到k1
127.0.0.1:6381> keys *
1) "k1"
  • 从服务器挂掉后,重启后,6381 变成了master主服务器
# 再次将6381变成6379从服务器 再次查看
127.0.0.1:6381> slaveof 127.0.0.1 6379
OK
127.0.0.1:6391> keys *
1) "k2"
2) "k1"
3) "ke"
  • 从服务器重新设置之后,会从头开始复制

  • 当主服务器挂掉之后,6380,6381仍然是从服务器

原理:

  • 从服务器连接数主服务器之后,会向主服务器发送进行数据同步的消息
  • 主服务器接到同步的消息之后,会把主服务器数据进行持久化(rdb文件),把rdb文件发送给从服务器,从服务器拿到rdb文件,进行读取
  • 每次主服务器进行写操作之后,就会和从服务器进行数据的同步
  • 全量复制
    • slave在接收到数据库文件数据后,将其存盘并加载到内存中
  • 增量复制
    • Master继续将新的所有收集到的修改命令一次传给slave,完成同步
  • 只要是重新连接master,一次完全同步(全量复制)将被自动执行

11.3 复制原理之薪火相传

缺点:在薪火相传的模式中,假如从服务器挂掉,后面的从服务器就无法同步了

演示:

6380 --> 6379

6381 -->6380

此时6379的从机就是6380,一个从服务器下可以挂多个从服务器

主服务器挂掉,从服务器仍然是从服务器

主服务器启动,依然是主服务

11.4 复制原理之反客为主

当主服务挂掉后,下一台从服务器变成主服务器,后面的从服务器不做任何修改

命令:slaveof no one 从服务器变为主服务器

缺点:需要手动完成

11.5 哨兵模式

  • 反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库换位主库

测试:

  • 调整为一主二仆模式,6379带着6380/6381

  • 自定义的 /myredis 目录下新建 sentine.conf 文件

  • 配置哨兵,填写内容

    • sentinel monitor mymaster 127.0.0.1 6379 1
    • sentinel 监控 monitor 哨兵 mymaster 为监控对象起的服务器名称 + 主机ip 端口号 + 数字
    • 数字:表示至少有多少个哨兵统一迁移的数量
  • 启动哨兵

    • redis-sentinel /myredis/sentinel.conf
    [root@ghc1 myredis]# redis-sentinel sentinel.conf
    

  • shutdown掉6379

    127.0.0.1:6379> shutdown
    

    观察哨兵日志

  • 重启 6379 后,6379变成了从服务器

    [root@ghc1 myredis]# redis-server redis6379.conf
    [root@ghc1 myredis]# redis-cli -p 6379
    127.0.0.1:6379> info replication
    # Replication
    role:slave   # 重启后变为了6381的从服务器
    master_host:127.0.0.1
    master_port:6381
    ......
    

11.5.1 总结

  • 从下线的主服务器的所有从服务器里面挑选一个从服务器,将其转换为主服务器

    • 选择优先级靠前的
    • 选择偏移量最大的
    • 选择runid最小的从服务器
  • 挑选出新的主服务器之后,sentinel 向原主服务器的从服务器发送 slaveof 新主服务器的命令,复制新master

  • 当已下线的服务重新上线时,sentinel 会向其发送 salveof 命令,让其称为新主的从

  • 优先级在 redis.conf 中默认: replica-priority 100,值越小优先级越高

  • 偏移量是指获得原主机数据最全的

  • 每个redis实例启动后都会随机生成一个40位的runid

11.6 缺点:复制延时

  • 由于所有的写操作都是现在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重

12. Redis 集群

  • 问题

    • 容量够,redis如何进行扩容?

    • 并发写操作,redis如何分摊?

    • 主从模式,薪火相传模式,主机宕机,导致ip地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息

    • 之前通过代理主机来解决,但是 redis3.0 中提供了解决方案,就是无中心化集群配置

      • 无中心化集群:

12.1 简介

  • Redis集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N
  • Redis集群通过分区(partition)来提供一定程度的可用性(availability):即是集群中有一部分节点失效或者无法进行通讯,集群也可以继续处理命令请求

12.2 搭建Redis集群

  • 删除rdb文件

    [root@ghc1 myredis]# ls
    backup.db     dump6380.rdb  redis6379.conf  redis6381.conf  sentinel.conf
    dump6379.rdb  dump6381.rdb  redis6380.conf  redis.conf
    [root@ghc1 myredis]# rm -rf dump63*
    [root@ghc1 myredis]# ls
    backup.db  redis6379.conf  redis6380.conf  redis6381.conf  redis.conf  sentinel.conf
    
  • 制作6个实例,6379(主),6380(主),6381(主),6389(从),6390(从),6391(从)

    cluster-enabled yes # 打开集群模式
    cluster-config-file nodes-6379.conf # 设置节点配置文件名
    cluster-node-timeout 15000# 设定节点失联事件,超过改时间(毫秒),集群自动进行主从切换

    # 6379 配置信息
    [root@ghc1 myredis]# vi redis6379.conf include /root/myredis/redis.conf
    pidfile "/var/run/redis_6379.pid"
    port 6379
    dbfilename "dump6379.rdb"
    cluster-enabled yes   # 打开集群模式
    cluster-config-file nodes-6379.conf # 设置节点配置文件名
    cluster-node-timeout 15000 # 设定节点失联事件,超过改时间(毫秒),集群自动进行主从切换#复制剩余文件
    [root@ghc1 myredis]# cp redis6379.conf redis6380.conf
    [root@ghc1 myredis]# cp redis6379.conf redis6381.conf
    [root@ghc1 myredis]# cp redis6379.conf redis6389.conf
    [root@ghc1 myredis]# cp redis6379.conf redis6390.conf
    [root@ghc1 myredis]# cp redis6379.conf redis6391.conf
    [root@ghc1 myredis]# vi redis6380.conf
    :%s/6379/6380      # 替换
    # 其余同样操作
    
  • 启动6个redis

    [root@ghc1 myredis]# redis-server redis6379.conf
    ....
    
  • 将六个节点合并成一个集群

    # 先进入redis的src文件下
    [root@ghc1 myredis]# cd /opt
    [root@ghc1 opt]# ls
    containerd  redis-6.2.5  redis-6.2.5.tar.gz  rh
    [root@ghc1 opt]# cd redis-6.2.5
    [root@ghc1 redis-6.2.5]# cd src/# 合并
    redis-cli --cluster create --cluster-replicas 1 8.130.160.173:6379 8.130.160.173:6380 8.130.160.173:6381 8.130.160.173:6389 8.130.160.173:6390 8.130.160.173:6391yesredis-cli -c -p 6379 # 启动
    cluster nodes # 查看集群信息
    • replicas:一台从机

12.3 集群操作和故障修复

  • redis cluster 如何分配这六个节点?

    • 一个集群至少有三个主节点
    • 选项 --cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点
    • 分配原则尽量保证每个主数据库运行在不同的ip地址,每个从库和主库不在一个ip地址上
  • slots 是什么?

    • 集群信息中:[OK] All 16384 slots covered

    • 一个 Redis 集群包含 16384 个插槽(hash slot),数据库中的每个键都属于这 16384 个插槽中的其中一个

    • 集群使用公式 CRC16(key)%16384 来计算键key属于哪个槽,其中 CRC16(key) 语句用于计算键key的CRC16校验和

    • 集群中的每个节点负责处理一部分插槽。举个例子,如果一个集群可以有主节点,其中:

      • 节点A 负责处理 0 - 5460 号插槽
      • 节点B 负责处理 5461 - 10922 号插槽
      • 节点C 负责处理 10923 - 16383 号插槽
    • 录入值时:会计算插槽的位置,放到合适的节点

  • 在集群中录入值

    • set<key><value
    • mset <key>{xxx}<value>
    redis-cli -c -p 6379 # 进入6379
    set k1 v1
    -> Redirected to slot [12706] located at 8.130.160.173:6381 # 进入了6381# 插入多个值
    mset name lucy age 20 address china
    (error) ...  # 报错,有个多无法计算插槽# 插入多个值要用组的形式  组名:{xxx}
    mset name{user} lucy age{user} 20 address{user} china
    
  • 查询集群中的值

    • cluster keyslot<key> 查询key的槽值
    • clusteraccountkeyslot<slot> 查看自己插槽中的值(interge)
    • cluster getkeysinslot<slot><count>返回count个slot槽中的键
  • 故障恢复

    • 如果主节点下线?从节点能否自动升为主节点?注意:15秒超时

      • 比如 down掉6379,8379会变成主节点
    • 主节点恢复后,主从关系会如何?
      • 主节点回来变成从机
    • 如果所有某一段插槽的主从节点全部宕掉,redis服务是否能继续?
      • redis.conf中的参数 cluster-required-full-coverage
      • 如果某一段插槽的主从都挂掉,而cluster-required-full-coverage为yes,那么整个集群都挂掉
      • 如果某一段插槽的主从都挂掉,而cluster-required-full-coverage为no,那么该插槽数据全都不能使用,也不能存储

12.4 集群的Jedis开发

  • 即使连接的不是主机,集群会自动切换主机存储,主机写,从机读

  • 无中心化主从集群,无论从哪台主机写的数据,其他主机上都能读到数据

    // 演示redis集群操作
    public class RedisClusterDemo {public static void main(String[] args) {// 创建对象 脸上6379集群HostAndPort hostAndPort = new HostAndPort("8.130.160.173", 6379);JedisCluster jedisCluster = new JedisCluster(hostAndPort);//进行操作jedisCluster.set("k1","v1");System.out.println(jedisCluster.get("k1"));jedisCluster.close();}
    }
    

12.5 集群优缺点

优点

  • 实现扩容
  • 分摊压力
  • 无中心化配置相对简单

缺点

  • 多键操作时不被支持的
  • 多键的Redis事务是不被支持的,lua脚本不被支持

13. Redis6-应用问题解决

13.1 缓存穿透

问题描述:

​ key对应的数据在数据源并不存在,每次正对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库

解决方案:

​ 一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且处于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了魂村的意义。

方案:

  • 对空值缓存

    • 如果一个查询返回的数据为空(不管数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过5分钟
  • 设置可访问的名单(白名单)
    • 使用Bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmaps里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问
  • 采用布隆过滤器
    • 布隆过滤器(Bloom Filter)是1970年布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)
  • 进行实时监控
    • 当发现Redis的命中率开始极速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

13.2 缓存击穿

问题描述:

​ key对应的数据存在,但在redis中过期,此时若有大量的并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设 到缓存,这个时候并发的请求可能会瞬间把后端DB压垮。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cvLW9ewN-1631883422833)(C:\Users\ghc\AppData\Roaming\Typora\typora-user-images\image-20210914180737989.png)]解决方案:

​ key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据,这个时候,需要考虑缓存被击穿的问题

  • 预先设置热门数据

    • 在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长
  • 实时调整

    • 现场监控哪些热门数据,实时调整key的过期时长
  • 使用锁

    • 在缓存失效的时候(判断拿出来的值为空),不是立即去 load db

    • 先试用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个 mutex key

    • 当操作返回成功时,在进行 load db 操作,并回设缓存,最后删除 mutex key

    • 到操作返回失败,整懵有线程在 load db,当前线程睡眠一段时间再重试整个get缓存的方法

13.3 缓存雪崩

问题描述:

​ key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般会从后端DB家在数据并回设到缓存,这个时候大并发的请求可能压垮DB。

​ 缓存雪崩与缓存击穿的区别在于这里正对很多key缓存,前者则是某一个key正常访问

解决方案:

​ 缓存失效时的雪崩效应对底层系统的冲击非常可怕!

方案:

  • 构建多级缓存架构

    • nginx架构+redis缓存+其他缓存(ehcache等)
  • 使用锁或队列
    • 用加锁或者队列的方式来保证不会有大量的线程对数据库一次性进程读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发的情况
  • 设置过期标志更新缓存
    • 记录缓存数据是否过期(设置提前量),如果过期会出发通知另外的线程在后台去更新实际key的缓存
  • 将缓存失效时间分散开
    • 比如可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件

13.4 分布式锁

问题描述:

​ 随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。

分布式锁主流的实现方案:

1.基于数据库实现分布式锁

2.基于缓存(Redis等)

3.基于ZooKeeper

各自优缺点:

  • 性能:redis最高
  • 可靠性:Zookeeper最高

13.4.1 解决方案:基于redis实现分布式锁

  • 命令# set sku:1:info "OK" NX PX 10000

  • EX second :设置键的过期时间为second秒。SET key value EX second 效果等同于 SETEX key second value

  • 演示:

    设置锁:setnx 释放锁:del 设置过期时间:expire

    # 6379
    setnx users 10
    > 1
    setnx users 20
    > 0  # 失败,有锁不能设置
    del users # 删除锁
    setnx users 20
    > 1  # 成功expire users 10 # 设置过期时间10秒
    # 有问题,如果还没设置过期时间就出现异常,就会锁住
    
    # 同时设置 nx 上锁  ex 过期时间
    set users 100 nx ex 10
    > OK
    

13.4.2 代码演示

@RestController
@RequestMapping("/test")
public class RedisController {@Autowiredprivate RedisTemplate redisTemplate;@GetMapping("testLock")public void testLock(){// 1.获取锁,setne,3秒过期Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111");// 2.获取锁成功,查询num的值if(lock){Object value = redisTemplate.opsForValue().get("num");// 2.1 判断num为空  returnif (StringUtils.isEmpty(value)){return;}// 2.2 有值就转换成intint num = Integer.parseInt(value+"");// 2.3 把redis的num+1redisTemplate.opsForValue().set("num",++num);// 2.4 释放锁redisTemplate.delete("lock");}else {// 3.获取锁失败,没隔0.1秒再获取try{Thread.sleep(100);8.} catch (InterruptedException e) {e.printStackTrace();}}}
}

执行java代码

# 6379
get num
> "0"
  1. 重启,服务集群,通过网关压力测试:
ab-n 1000 -c http://8.130.160.173:8080/test/testLock # ab测试  执行了1000个请求
# 6379
get num
> "1000"

13.4.3 代码优化之设置过期时间

Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111",3, TimeUnit.SECONDS);
// 3, TimeUnit.SECONDS 3秒过期

13.4.4 代码优化之UUID防止误删

问题:当服务器a在执行的时候,卡死,达到过期时间后自动释放锁。此时b开始执行,b执行中,a反应过来继续执行,执行完后却释放了b的锁,于是b又释放c的锁…

  • 通过UUID表示不同的操作 set lock uuid nx ex second

  • 释放锁的时候,先判断当前UUID和要释放锁UUID是否一样

    String uuid = UUID.randomUUID().toString();
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
    ....
    // 判断UUID值是否一样
    String lockUuid = (String) redisTemplate.opsForValue().get("lock");
    if (lockUuid.equals(uuid)) {redisTemplate.delete("lock");
    }
    

13.4.5 优化代码之LUA脚本保证删除原子性

问题:

缺乏原子性!

服务器a在uuid比较之后,一样,于是执行删除操作,正要删除,还没有删除,锁已经到了过期时间,自动释放。b获取锁,执行操作,此时a还是没有删除,于是a会释放b的锁,因为没有原子性,会造成互相干扰。

LUA脚本在Redis中的优势:

  • 将复杂或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数,提升性能
  • LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队

优化代码:

    @GetMapping("testLockLua")public void testLockLua() {// 1.声明一个uuid,作为value放入我们的key所对应的值中String uuid = UUID.randomUUID().toString();// 2.定义一个锁:lua脚本可以使用同一把锁,来实现删除String skuId = "25";//访问skuId为25号的商品String locKey = "lock:" + skuId;//锁住的是每个商品的数据// 3.获取锁Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);// 第一种:lock与过期时间中间不写任何的代码// redisTemplate.expire("lock",10,TimeUnit.SECONDS);// 如果trueif (lock) {// 执行的业务逻辑开始// 获取缓存中num的值Object value = redisTemplate.opsForValue().get("num");// 判断num为空  returnif (StringUtils.isEmpty(value)) {return;}// 不是空 如果说在这出现了异常,那么delete就删除失败,也就是说锁永远存在int num = Integer.parseInt(value + "");// 把redis的num+1 放入缓存redisTemplate.opsForValue().set("num", String.valueOf(++num));// 使用lua脚本来锁// 定义lua脚本String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0";// 使用redis执行luaDefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();redisScript.setScriptText(script);// 设置一下返回值类型为Long// 因为删除判断的时候,返回的0,给其封装为数据类型,如果不封装那么久默认返回String类型,// 那么返回字符串与0会发生错误redisScript.setResultType(Long.class);//第一个是要script脚本,第二个需要判断key,第三个就是key所对应的值redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);} else {// 其他线程等待try {// 睡眠Thread.sleep(100);// 睡醒之后调用方法testLockLua();} catch (InterruptedException e) {e.printStackTrace();}}}

14. Redis6 新功能

14.1 ACL

14.1.1 简介

  • Redis ACL 是Access Control List(访问控制列表),该功能允许根据可以指向的命令和可以访问的键来限制某些连接

  • 在Redis 5 之前,redis安全规则只有密码控制还有通过rename来调整高危命令,比如flushdb,keys *,shutdown等。

    Redis 6 则提供了ACL的功能对用户进行更细粒度的权限控制:

    • 接入权限:用户名和密码
    • 可以指向的命令
    • 可以指向的KEY

    参考官网:https://redis.io/topics/acl

14.1.2 命令

  • 使用acl list 来展现用户权限列表

    • 数据说明

  • acl cat 查看添加权限指令类别

  • acl whoami 查看当前用户名

  • acl setuser user1 通过命令创建新用户默认权限

    127.0.0.1:6379> acl setuser zhangsan
    OK
    127.0.0.1:6379> acl list
    1) "user default on nopass ~* &* +@all"
    2) "user zhangsan off &* -@all"
    
  • acl setuser user2 on >password ~cached:* +get

    127.0.0.1:6379> acl setuser lisi on >111111 ~cached:* +get
    OK
    127.0.0.1:6379> acl list
    1) "user default on nopass ~* &* +@all"
    2) "user lisi on #bcb15f821479b4d5772bd0ca866c00ad5f926e3580720659cc80d39c9d09802a ~cached:* &* -@all +get"
    3) "user zhangsan off &* -@all"
    
  • auth username password 切换用户

    127.0.0.1:6379> auth lisi 111111
    OK
    

14.2 IO多线程

  • IO多线程其实指客户端交互部分的网络IO交互处理模块多线程,并非执行命令多线程,Redis6执行命令依然是单线程

  • 用来处理网络数据的读写和协议解析

  • 多线程IO默认不开启,需要在配置文件中配置

    io-threads-do-reads yes # 开启 默认no
    io-threads 4 # 多线程数量
    

14.3 工具支持 Cluster

  • 老版redis想搭集群需要单独安装ruby环境,Redis 5 将 redis-trib.rb 的功能集成到 redis-cli。另外官方 redis-benchmark 工具开始支持 cluster模式了,通过多线程的方式对多个分片进行压测

Redis 6 学习记录相关推荐

  1. Redis的学习记录

    Redis的学习记录 1.先导了解 1.1 NOSQL概述 1.1.1 为什么要用NoSql? 1.1.2 NoSql了解 1.1.3 NoSql特点 1.1.4 NoSQL的四大分类 2. Redi ...

  2. Redis基础学习记录(1)

    最近因为一些原因用到Redis不得不学一点,就想着记录一下学习的印记.若有不对的地方还望指出为谢. Redis (REmote DIctionary Server) 是用 C 语言开发的一个开源的高性 ...

  3. springboot @cacheable不起作用_Springboot学习记录13 使用缓存:整合redis

    本学习记录的代码,部分参考自gitee码云的如下工程.这个工程有详尽的Spingboot1.x教程.鸣谢! https://gitee.com/didispace/SpringBoot-Learnin ...

  4. 【学习记录】macOS的Redis安装及基本使用

    [学习记录]macOS的Redis安装及基本使用 一. Redis的安装与启动 二. 简单使用 ① 尝试插入第一个key-value ② Redis的数据类型与基本使用 字符串 列表 字典(哈希表) ...

  5. C++学习记录:C++连接Redis数据库

    C++学习记录:C++连接Redis数据库   之前学习了Redis数据库相关的内容,但是并没有在编写C++代码中用到Redis相关内容.   本篇笔记记录了个人在 Linux 环境下使用 C++ 连 ...

  6. gRPC学习记录(六)--客户端连接池

    对于客户端来说建立一个channel是昂贵的,因为创建channel需要连接,但是建立一个stub是很简单的,就像创建一个普通对象,因此Channel就需要复用,也就是说需要实现一个连接池应用.本文使 ...

  7. memcache/redis 缓存学习笔记

    0.redis和memcache的区别 a.redis可以存储除了string之外的对象,如list,hash等 b.服务器宕机以后,redis会把内存的数据持久化到磁盘上,而memcache则不会 ...

  8. SpringCache-redis缓存学习记录

    SpringCache-redis缓存学习记录 什么是SpringCache Spring 从 3.1 开始定义了 org.springframework.cache.Cache和 org.sprin ...

  9. 笔记-redis深入学习-1

    笔记-redis深入学习-1 redis的基本使用已经会了,但存储和读取只是数据库系统最基础的功能: 数据库系统还得为可靠实现这两者提供一系列保证: 数据.操作备份和恢复,主要是持久化: 高可用:主要 ...

最新文章

  1. linux socket 缓冲区默认大小
  2. python【蓝桥杯vip练习题库】ALGO-145 4-1打印下述图形
  3. padding-bottom属性的作用
  4. 使用拦截器分析Java EE应用程序的性能下降/提高
  5. MapReduce 中的两表 join 几种方案简介
  6. 信号与线性系统分析(第四版,吴大正主编)——信号与系统
  7. 【有限元】最简单fluent流体分析实例-2D模型中带障碍物计算流体流速与压力分布
  8. windows管理信息服务器不可用,Windows提示 错误: RPC 服务器不可用 解决方法。
  9. 基于深度学习的商品检索技术
  10. combo是什么意思啊(combo卡是什么意思)
  11. 数据加密——列置换加密
  12. 很棒的WPF控件库 Newbeecoder.UI
  13. Linux学习-49-列出进程调用或打开的文件信息(lsof命令)
  14. 半小时实现Java手撸Http协议,爽!!(附完整源码,建议收藏)
  15. 33、网络地址转换(NAT)
  16. linux混合命令_十多个命令行混合
  17. iOS开发使用nib进行界面设计并跳转
  18. 安装Java 出现 Java Platform SE binary(Process Id:6800)
  19. 【java】opencv + Tesseract(tess4j) 实现图片处理验证码识别
  20. python用字典存储学生成绩_掌握Python字典的12个例子

热门文章

  1. 四元数与旋转——学习笔记(一)
  2. STC12C5A60S2_CC2420驱动
  3. [Games 101] Lecture 06 Rasterization 2 (Antialiasing and Z-Buffering)
  4. Google Cloud 发起“Data Cloud Alliance”新联盟
  5. ERR wrong number of arguments for ‘srem‘ command
  6. 从键盘输入一个英文字母,进行大小写字母转换,并输出。
  7. 如何操作 Office Open XML 格式文档
  8. 正则表达式判断是否为纯数字
  9. 如何做好一个中小型企业计算机网络管理员
  10. 吊打面试官系列之:掌握了这166个Linux常用命令,面试官果然被我征服了。。