linux下安装redis

link解压文件夹命令: tar -zxvf 压缩包名

基本的环境安装

yum install gcc-c++#查看
gcc -v
#命令 make命令
make
#确认安装命令
make install

默认后台是不启动redis的,修改config文件 找到daemonize 改为yes

Redis maven配置:

<!--  springboot整合 redis  -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

SpringBoot 配置文件 配置redis 信息

redis:host: 127.0.0.1port: 6379password: 123456pool:max-active: 8max-wait: 1max-idle: 8min-idle: 0timeout: 0

在实用类中注入:

@Autowired
private RedisTemplate<Object, Object> redisTemplate ;@Autowired
private RedisTemplate<String, String> redisTemplate ;

实体序列化

public class User implements Serializable{}

字符串序列化器

RedisSerializer redisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(redisSerializer);

在高并发情况下,缓存穿透:

​ 方式一: synchronized 可以实现,效率低一点

@Override
public synchronized List<User> getUSerList(){}

​ 方式二:效率高点

List<User> userList = (List<User>)redisTemplate.opsForValue().get("allUSers");
//双重检测锁
if(userList == null){synchronized(this){userList = (List<User>)redisTemplate.opsForValue().get("allUSers");if(null == userList){userList = userMapper.selectAllUser();redisTemplate.opsForValue().set("allUSers",userList);}}
}

多线程测试缓存穿透:

​ Executors:是一个工具类用于并发编程

​ newFixedThreadPool:是一个线程值 ,根据CPU核心数决定

//线程,该线程电泳底层查询用户的方法
Runnable runnable = new Runnable(){@Overridepublic void run(){userSertvice.getAllUser();}
}ExecutorService  executorService = Executors.newFixedThreadPool(32);for(int i =0 i<10000; i++){executorService.submit(runnable);
}

Redis主从复制

环境配置 只配置从库不用配置主库

127.0.0.1:6379>info replication #查看当前库的信息

​ 复制3个配置文件,然后修改对应的信息

​ 1、端口

​ 2、pid名字

​ 3、log文件名字

​ 4、dump.rdb名字

启动redis命令:redis-server kconfig/redis80.conf

连接客户端命令:redis-cli -p 6379

查看redis进程命令: ps -ef|grep redis

断掉redis命令:SHUTDOWN

一主二从

认老大:命令:

127.0.0.1:6380>SLAVEOF 127.0.0.1 6379 #SLAVEOF host 6379 认老大
127.0.0.1:6380>SLAVEOF no one #自己当老大

真实的主从复制应该在文件中配置,现在用命令配置只是暂时的。

进入的config配置文件 找到replication下

#replicaof <masterip>  <masterport> #masterip主机地址 masterport
#masterauth <master-password>  #如果有密码  master-password

主机可以写 从机只能读

测试:主机断开连接,从机依旧可以连接的主机,只是没有写了,当主机回来了,从机依旧可以获取到主机写的信息

若果是命令行认得主机,当从机断开之后,在启动就获取不到主机写的信息,就会变回主机,只要变回从机,立马就可以获取主机的值!

哨兵模式:自动选取老大

配置文件 在config文件夹下修改一个文件, 命令:

vim sentinel.conf
             被监控的名称  host   port  1
sentinel monitor myredis 127.0.0.1 6379 1

后面的数字1,代表主机挂了,slave投票让谁接替成主机,票数最多的,就会成为主机。

启动哨兵:

​ 命令:redis-sentinel kconfig/sentinel.conf

Redis缓存穿透和雪崩

Redis数据类型

String 、List、Set、Hash、SortSet

String 命令操作:

127.0.0.1:6379>set string1 1
127.0.0.1:6379>incr string1  #string自增操作
127.0.0.1:6379>decrby string1 #string减2操作
subscribe ITCAST   (订阅某个(ITCAST)主题)publish ITCAST "hello word"  (向某个主题(ITCAST)发送消息)

  subscribe __keyevent@0__:expired

Redis

一、Nosql概述:

1、单机Mysql时代

90年代,一个网站的访问量一般不会太大,单个数据库完全够用。随着用户增多,网站出现以下问题:

  1. 数据量增加到一定程度,单机数据库就放不下了

  2. 数据的索引(B+ Tree),一个机器内存也存放不下

  3. 访问量变大后(读写混合),一台服务器承受不住

    2、Memcached(缓存)+ Mysql + 垂直拆分(读写分离)

网站80%的情况都是在读,每次都要去查询数据库的话就十分的麻烦!所以说我们希望减轻数据库的压力,我们可以使用缓存来保证效率!

优化过程经历了以下几个过程:

  1. 优化数据库的数据结构和索引(难度大)

  2. 文件缓存,通过IO流获取比每次都访问数据库效率略高,但是流量爆炸式增长时候,IO流也承受不了

  3. MemCache,当时最热门的技术,通过在数据库和数据库访问层之间加上一层缓存,第一次访问时查询数据库,将结果保存到缓存,后续的查询先检查缓存,若有直接拿去使用,效率显著提升。

3、分库分表 + 水平拆分 + Mysql集群

4、如今最近时代

如今信息量井喷式增长,各种各样的数据出现(用户定位数据,图片数据等),大数据的背景下关系型数据库(RDBMS)无法满足大量数据要求。Nosql数据库就能轻松解决这些问题。目前一个基本的互联网项目:

5、为什么要用Nosql?

用户的个人信息,社交网络,地理位置。用户自己产生的数据,用户日志等等爆发式增长!这时候我们就需要使用NoSQL数据库的,Nosql可以很好的处理以上的情况!

什么是Nosql

NoSQL = Not Only SQL(不仅仅是SQL)

Not Only Structured Query Language

关系型数据库:列+行,同一个表下数据的结构是一样的。

非关系型数据库:数据存储没有固定的格式,并且可以进行横向扩展。

NoSQL泛指非关系型数据库,随着web2.0互联网的诞生,传统的关系型数据库很难对付web2.0时代!尤其是超大规模的高并发的社区,暴露出来很多难以克服的问题,NoSQL在当今大数据环境下发展的十分迅速,Redis是发展最快的。

Nosql特点

1.方便扩展(数据之间没有关系,很好扩展!)

2.大数据量高性能(Redis一秒可以写8万次,读11万次,NoSQL的缓存记录级,是一种细粒度的缓存,性能会比较高!)

3.数据类型是多样型的!(不需要事先设计数据库,随取随用)

4.传统的 RDBMS 和 NoSQL

传统的 RDBMS(关系型数据库)

结构化组织
SQL
数据和关系都存在单独的表中 row col
操作,数据定义语言
严格的一致性
基础的事务

Nosql

不仅仅是数据
没有固定的查询语言
键值对存储,列存储,文档存储,图形数据库(社交关系)
最终一致性
CAP定理和BASE
高性能,高可用,高扩展

5.大数据时代的3V :主要是描述问题的

海量Velume

多样Variety

实时Velocity

6.大数据时代的3高 : 主要是对程序的要求

高并发

高可扩

高性能

真正在公司中的实践:NoSQL + RDBMS 一起使用才是最强的。

二、Redis入门

Redis是什么?

Redis(Remote Dictionary Server ),即远程字典服务。

是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。

与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步

Redis能干什么?

  1. 内存存储,持久化,内存是断电即失,所以需要持久化(RDB,AOF)
  2. 高效率、用于高数缓冲
  3. 发布订阅系统
  4. 地图信息分析
  5. 计时器、计数器 (浏览量)

特性

  1. 多样的数据类型
  2. 持久化
  3. 集群
  4. 事务

环境安装

官网:https://redis.io/

Redis中文网:http://www.redis.cn/

windows下安装

1、下载安装包:https://github.com/MicrosoftArchive/redis/releases

2、解压

3、双击运行redis-server.exe

Linux下安装

1、redis官网下载安装包:


2、上传Linux并解压

# tar -zxvf redis-6.0.9.tar.gz


3、基本的环境安装

yum install gcc-c++#查看
gcc -v
#命令 make命令
make
#确认安装命令
make install



如果安装有误可以提供参考:https://blog.csdn.net/qq_38224607/article/details/107108283

redis的默认安装路径在/usr/local/bin

4、将配置文件拷贝到当前目录下

我们之后就是用这个配置文件进行启动

5、默认后台是不启动redis的,修改config文件 找到daemonize 改为yes

6、启动Redis服务

# redis-server hconfig/redis.conf

7、客户端测试连接

# redis-cli -p 6379

8、关闭redis

shutdown

9、查看redis服务

redis修改了密码再次连接需要密码

127.0.0.1:6379> auth 121566
OK
127.0.0.1:6379>

redis可视化工具连接:


参考连接:https://blog.csdn.net/qq_40460909/article/details/84838245

修改redis-config文件

没改配置文件前

改配置文件后

允许连接的IP (开放全部IP可能造成一些安全问题,可结合iptables进行控制访问)

更改密码

将protected-mode修改为no

redis-benchmark性能测试

序号 选项 描述 默认值
1 -h 指定服务器主机名 127.0.0.1
2 -p 指定服务器端口 6379
3 -s 指定服务器 socket
4 -c 指定并发连接数 50
5 -n 指定请求数 10000
6 -d 以字节的形式指定 SET/GET 值的数据大小 2
7 -k 1=keep alive 0=reconnect 1
8 -r SET/GET/INCR 使用随机 key, SADD 使用随机值
9 -P 通过管道传输 请求 1
10 -q 强制退出 redis。仅显示 query/sec 值
11 –csv 以 CSV 格式输出
12 -l 生成循环,永久执行测试
13 -t 仅运行以逗号分隔的测试命令列表。
14 -I Idle 模式。仅打开 N 个 idle 连接并等待。
# 启动redis
# redis-server hconfig/redis-conf# 测试 100个并发连接 ,100000 个请求
# redis-benchmark -h localhost -p 6379 -c 100 -n 10000

基础知识

redis默认有16个数据库

默认使用的第0个;

16个数据库为:DB 0~DB 15 默认使用DB 0 ,可以使用select n切换到DB n,dbsize可以查看当前数据库的大小,与key数量相关。

127.0.0.1:6379> select 3
OK
127.0.0.1:6379[3]> dbsize
(integer) 0
127.0.0.1:6379[3]>
127.0.0.1:6379[3]> dbsize
(integer) 0
127.0.0.1:6379[3]> set name "hello"
OK
127.0.0.1:6379[3]> get name
"hello"
127.0.0.1:6379[3]> dbsize
(integer) 1

查看当前数据库中所有的key

127.0.0.1:6379[3]> keys *
1) "name"
127.0.0.1:6379[3]>

清空当前库

flushdb

127.0.0.1:6379[3]> flushdb
OK
127.0.0.1:6379[3]> keys *
(empty array)
127.0.0.1:6379[3]>

清空所有库

flushall

Redis是单线程的,Redis是基于内存操作l.的。

所以Redis的性能瓶颈不是CPU,而是机器内存和网络带宽。

那么为什么Redis的速度如此快呢,性能这么高呢?QPS达到10W+

所以Redis的性能瓶颈不是CPU,而是机器内存和网络带宽。

那么为什么Redis的速度如此快呢,性能这么高呢?QPS达到10W+

  • 误区1:高性能的服务器一定是多线程的?
  • 误区2:多线程(CPU上下文会切换!)一定比单线程效率高!

核心:Redis是将所有的数据放在内存中的,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换:耗时的操作!),对于内存系统来说,如果没有上下文切换效率就是最高的,多次读写都是在一个CPU上的,在内存存储数据情况下,单线程就是最佳的方案。

五大数据类型

Redis-key

127.0.0.1:6379> keys *             # 查看当前数据库的所有key
1) "age"
2) "name"
127.0.0.1:6379> move name 1        # 移除key为name
(integer) 1
127.0.0.1:6379> keys *
1) "age"
127.0.0.1:6379> set name he
OK
127.0.0.1:6379> keys *
1) "name"
2) "age"
127.0.0.1:6379> expire name 10     # 设置key为name的时间为10秒钟
(integer) 1
127.0.0.1:6379> ttl name           # 查看key为name还剩余多少时间
(integer) 5
127.0.0.1:6379> ttl name
(integer) 4
127.0.0.1:6379> ttl name
(integer) -2
127.0.0.1:6379> keys *
1) "age"
127.0.0.1:6379> exists name        # 查看当前的key是否存在
(integer) 0
127.0.0.1:6379> exists age
(integer) 1
127.0.0.1:6379> type age           # 查看当前的key是什么类型
string
127.0.0.1:6379>

String

################################################################################
127.0.0.1:6379> set key1 v1        # 设置值
OK
127.0.0.1:6379> get key1           # 获取值
"v1"
127.0.0.1:6379> keys *             # 查看当前所有的key
1) "key1"
127.0.0.1:6379> exists key1        # 判断是否存在当前key
(integer) 1
127.0.0.1:6379> append key1 hello  # 追加字符串,如果当前字符串不存在相当于set key
(integer) 7
127.0.0.1:6379> get key1
"v1hello"
127.0.0.1:6379> strlen key1        # 获取key的长度
(integer) 7
127.0.0.1:6379>
################################################################################
# i++
# 步长 i+=127.0.0.1:6379> set views 1
OK
127.0.0.1:6379> get views
"1"
127.0.0.1:6379> incr views        # 可以使key的值+1        incr
(integer) 2
127.0.0.1:6379> get views
"2"
127.0.0.1:6379> incr views
(integer) 3
127.0.0.1:6379> get views
"3"
127.0.0.1:6379> type views
string
127.0.0.1:6379> decr views         # 可以使key的值-1         decr
(integer) 2
127.0.0.1:6379> get views
"2"
127.0.0.1:6379> incrby views 10    # 可以使key的值指定+10     incrby
(integer) 12
127.0.0.1:6379> get views
"12"
127.0.0.1:6379> decrby views 5     # 可以使key的值指定-5       decrby
(integer) 7
127.0.0.1:6379>
################################################################################
# 字符串的范围 range127.0.0.1:6379> set key1 "hello,word"    # 设置值
OK
127.0.0.1:6379> getrange key1 0 3        # 截取字符串[0,3]
"hell"
127.0.0.1:6379> getrange key1 0 -1       # 获取全部的值  和 get key 是一样的
"hello,word"
127.0.0.1:6379>
################################################################################
# 替换127.0.0.1:6379> set key2 "hello"
OK
127.0.0.1:6379> get key2
"hello"
127.0.0.1:6379> setrange key2 1 "XX"  # 替换指定位置的字符串
(integer) 5
127.0.0.1:6379> get key2
"hXXlo"
127.0.0.1:6379>
################################################################################
# setex (set with expire)  # 设置过期时间
# setnx (set if not exist) # 不存在在设置127.0.0.1:6379> setex key3 30 "hello"    # 设置key的值存储时间为30秒
OK
127.0.0.1:6379> ttl kry3
(integer) -2
127.0.0.1:6379> ttl key3
(integer) 18
127.0.0.1:6379> get key3
"hello"
127.0.0.1:6379> setnx mykey "redis"       # 如果mykey 不存在 则创建成功
(integer) 1
127.0.0.1:6379> keys *
1) "mykey"
2) "key2"
3) "key1"
127.0.0.1:6379> setnx mykey "hello,word"   # 如果mykey 存在 则创建失败
(integer) 0
127.0.0.1:6379>
################################################################################
# 同时设置多个值,获取多个值
# mset mget127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3     # 同时设置多个值
OK
127.0.0.1:6379> keys *
1) "k2"
2) "k1"
3) "k3"
127.0.0.1:6379> mget k1 k2 k3              # 同时获取多个值
1) "v1"
2) "v2"
3) "v3"
127.0.0.1:6379> msetnx k1 v1 k4 v4         # msetnx是一个原子性的操作,要么一起成功,要么一起失败!
(integer) 0
127.0.0.1:6379> get k4
(nil)
127.0.0.1:6379>
################################################################################
# getset 先get在set127.0.0.1:6379> getset db redis   # 如果不存在值,则返回null ,如果存在值,则返回值,并设置新的值
(nil)
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db hello
"redis"
127.0.0.1:6379> get db
"hello"
127.0.0.1:6379> 

String类似的使用场景:value除了是字符串还可以是数字,用途举例:

  • 计数器
  • 统计多单位的数量:uid:123666:follow 0
  • 粉丝数
  • 对象存储缓存

List

所有的命令都是以L开头

################################################################################
127.0.0.1:6379> lpush list one         # 将一个值或多个值,从列表的左边放进去
(integer) 1
127.0.0.1:6379> lpush list two three
(integer) 3
127.0.0.1:6379> lrange list 0 -1      # 获取列表的所有值
1) "three"
2) "two"
3) "one"
127.0.0.1:6379> keys *
1) "list"
127.0.0.1:6379> rpush list four       # 将一个值或多个值,从列表的右边放进去
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "two"
3) "one"
4) "four"
127.0.0.1:6379>
################################################################################
# lpop  从左边移除列表的第一个元素
# rpop  从右边移除列表的第一个元素127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "two"
3) "one"
4) "four"
127.0.0.1:6379> lpop list
"three"
127.0.0.1:6379> rpop list
"four"
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
127.0.0.1:6379>
################################################################################
# lindex 通过下标获取值127.0.0.1:6379> lindex list 0    # 通过下标获取第一个值
"two"
127.0.0.1:6379> lindex list 1
"one"
127.0.0.1:6379>
################################################################################
# llen127.0.0.1:6379> lpush list 1 2 3
(integer) 3
127.0.0.1:6379> llen list                    # 返回列表的长度
(integer) 3
127.0.0.1:6379>
################################################################################
# lrem 移除指定的值127.0.0.1:6379> lpush list 3
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "3"
3) "2"
4) "1"
127.0.0.1:6379> lrem list 1 1           # 移除列表中指定个数的key的值
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "3"
3) "2"
127.0.0.1:6379> lrem list 2 3
(integer) 2
127.0.0.1:6379> lrange list 0 -1
1) "2"
127.0.0.1:6379>
################################################################################
# ltrim 截取127.0.0.1:6379> rpush mylist "hello" "hello1" "hello2" "hello3"
(integer) 4
127.0.0.1:6379> ltrim mylist 1 2     # 通过下标截取指定的长度
OK
127.0.0.1:6379> lrange mylist 0 -1
1) "hello1"
2) "hello2"
127.0.0.1:6379>
################################################################################
# rpoplpush 移除列表的最后一个元素 ,到新的列表中27.0.0.1:6379> lrange mylist 0 -1
1) "hello1"
2) "hello2"
127.0.0.1:6379> rpoplpush mylist listTwo
"hello2"
127.0.0.1:6379> lrange mylist 0 -1
1) "hello1"
127.0.0.1:6379> lrange listTwo 0 -1
1) "hello2"
127.0.0.1:6379>
################################################################################
# lset 将列表中指定下标的值替换成另一个值,相当于更新操作127.0.0.1:6379> exists list              # 判断是否存在列表
(integer) 0
127.0.0.1:6379> lset list 0 "hello"      # 如果不存在则报错
(error) ERR no such key
127.0.0.1:6379> lpush list "one"         # 创建一个列表
(integer) 1
127.0.0.1:6379> lrange list 0 0
1) "one"
127.0.0.1:6379> lset list 0 "hello"      # 如果存在,更新当前下标的值
OK
127.0.0.1:6379> lrange list 0 0
1) "hello"
127.0.0.1:6379> lset list 1 "hello1"     # 如果不存在则报错
(error) ERR index out of range
127.0.0.1:6379>
################################################################################
# linsert 在某个指定元素的前面或者后面插入值
# before 在某个指定元素的前面插入值
# after  在某个指定元素的后面插入值
127.0.0.1:6379> lpush list "hello" "word"
(integer) 2
127.0.0.1:6379> lrange list 0 -1
1) "word"
2) "hello"
127.0.0.1:6379> linsert list before "hello" "xiaogege"
(integer) 3
127.0.0.1:6379> lrange list 0 -1
1) "word"
2) "xiaogege"
3) "hello"
127.0.0.1:6379> linsert list after "hello" "xiaojiejie"
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "word"
2) "xiaogege"
3) "hello"
4) "xiaojiejie"
127.0.0.1:6379>
################################################################################

小结

  • 它实际上是一个链表,before Node after, left ,r’ight 都可以插入值
  • 如果key不存在,创建新的链表
  • 如果key存在,添加值
  • 如果移除了所有值,空链表,也代表不存在
  • 在两边插入或者改变值,效率最高,中间元素,相对来说效率会低一点!

消息队列(Lpush,Rpop) 栈(Lpush , Lpop)

Set

Set的值是不能重复的

################################################################################
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> sadd myset "hello"              # 往集合中添加一个元素
(integer) 1
127.0.0.1:6379> sadd myset "xiaohe"
(integer) 1
127.0.0.1:6379> sadd myset "lovexiaohe"
(integer) 1
127.0.0.1:6379> SMEMBERS myset                 # 查看集合中的元素
1) "hello"
2) "lovexiaohe"
3) "xiaohe"
127.0.0.1:6379> SISMEMBER myset word           # 判断集合中是存在元素
(integer) 0
127.0.0.1:6379> SISMEMBER myset hello
(integer) 1
127.0.0.1:6379>
################################################################################
127.0.0.1:6379> scard myset                   # 获取set集合中的值的个数
(integer) 3
127.0.0.1:6379>
################################################################################
127.0.0.1:6379> srem myset "hello"            # 移除set集合中的某个值
(integer) 1
127.0.0.1:6379> scard myset
(integer) 2
127.0.0.1:6379> SMEMBERS myset
1) "lovexiaohe"
2) "xiaohe"
127.0.0.1:6379>
################################################################################
127.0.0.1:6379> SRANDMEMBER myset             # 随机筛选出set集合中的一个元素
"xiaohe"
127.0.0.1:6379> SRANDMEMBER myset
"xiaohe"
127.0.0.1:6379> SRANDMEMBER myset
"lovexiaohe"
127.0.0.1:6379> SRANDMEMBER myset 2           # 随机筛选出set集合中的两个元素
1) "xiaohe"
2) "lovexiaohe"
127.0.0.1:6379>
################################################################################
127.0.0.1:6379> SMEMBERS myset
1) "xiaogege"
2) "lovexiaohe"
3) "xiaohe"
127.0.0.1:6379> SPOP myset                    # 随机删除一个set集合中一个的元素
"xiaohe"
127.0.0.1:6379> SMEMBERS myset
1) "xiaogege"
2) "lovexiaohe"
127.0.0.1:6379>
################################################################################
# 将set集合中的一个指定的值,移动到另一个set集合中
127.0.0.1:6379> sadd myset "hello "
(integer) 1
127.0.0.1:6379> sadd myset "hello" "xiaohe"
(integer) 2
127.0.0.1:6379> SMEMBERS myset
1) "hello"
2) "hello "
3) "xiaohe"
127.0.0.1:6379> sadd set2 "set2"
(integer) 1
127.0.0.1:6379> SMOVE myset set2 "hello"       # 将set集合中的一个指定的值,移动到另一个set集合中
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "hello "
2) "xiaohe"
127.0.0.1:6379> SMEMBERS set2
1) "hello"
2) "set2"
127.0.0.1:6379>
################################################################################
127.0.0.1:6379> sadd key1 a
(integer) 1
127.0.0.1:6379> sadd key1 b
(integer) 1
127.0.0.1:6379> sadd key1 c
(integer) 1
127.0.0.1:6379> sadd key2 c
(integer) 1
127.0.0.1:6379> sadd key2 d
(integer) 1
127.0.0.1:6379> sadd key2 e
(integer) 1
127.0.0.1:6379> SDIFF key1 key2              # 查看集合1和集合2的差集
1) "a"
2) "b"
127.0.0.1:6379> sinter key1 key2             # 查看集合1和集合2的交集   共同好友可以这样实现
1) "c"
127.0.0.1:6379> SUNION key1 key2             # 查看集合1和集合2的并集
1) "c"
2) "b"
3) "a"
4) "e"
5) "d"
127.0.0.1:6379>
################################################################################

Hash

Map集合 , key -> Map集合 , 本质和Sting类型没有太大区别

################################################################################
127.0.0.1:6379> hset myhash field hello                 # set一个集体的 key - value
(integer) 1
127.0.0.1:6379> hget myhash field                       # 获取一个字段的值
"hello"
127.0.0.1:6379> hmset myhash field word field1 xiaohe   # set多个key - value
OK
127.0.0.1:6379> HMGET myhash field field1               # 获取多个字段的值
1) "word"
2) "xiaohe"
127.0.0.1:6379> HGETALL myhash                          # 获取全部的数据
1) "field"
2) "word"
3) "field1"
4) "xiaohe"
127.0.0.1:6379> hdel myhash field                       # 删除一个指定的key, 对应的value也就消失了
(integer) 1
127.0.0.1:6379> HGETALL myhash
1) "field1"
2) "xiaohe"
127.0.0.1:6379>
################################################################################
127.0.0.1:6379> hlen myhash                            # 获取hash表的字段数量
(integer) 1
127.0.0.1:6379> hset myhash fiekd hello field1 word
(integer) 1
127.0.0.1:6379> hlen myhash
(integer) 2
127.0.0.1:6379>
################################################################################
127.0.0.1:6379> HEXISTS myhash field                   # 判断hash集合中某个key是否存在
(integer) 0
127.0.0.1:6379> HEXISTS myhash fiekd
(integer) 1
127.0.0.1:6379>
################################################################################
# 只获取hash集合中所有的key
# 只获取hash集合中所有的values
127.0.0.1:6379> hkeys myhash
1) "field1"
2) "fiekd"
127.0.0.1:6379> HVALS myhash
1) "word"
2) "hello"
127.0.0.1:6379>
################################################################################127.0.0.1:6379> hset myhash field4 2
(integer) 1
127.0.0.1:6379> hget myhash field4
"2"
127.0.0.1:6379> HINCRBY myhash field4 3            # 集合中的某个元素添加指定的值
(integer) 5
127.0.0.1:6379> hget myhash field4
"5"
127.0.0.1:6379> 127.0.0.1:6379> hsetnx myhash field5 hello         # 如果存在则添加成功,不存在则添加失败!
(integer) 1
127.0.0.1:6379> hsetnx myhash field5 word
(integer) 0
127.0.0.1:6379> ################################################################################

Zset

不同的是每个元素都会关联一个double类型的分数(score)。redis正是通过分数来为集合中的成员进行从小到大的排序。

score相同:按字典顺序排序

有序集合的成员是唯一的,但分数(score)下·却可以重复。

  • ZADD key score member1 [score2 member2] 向有序集合添加一个或多个成员,或者更新已存在成员的分数
  • ZCARD key 获取有序集合的成员数
  • ZCOUNT key min max 计算在有序集合中指定区间score的成员数
  • ZINCRBY key n member 有序集合中对指定成员的分数加上增量 n
  • ZSCORE key member 返回有序集中,成员的分数值
  • ZRANK key member 返回有序集合中指定成员的索引
  • ZRANGE key start end 通过索引区间返回有序集合成指定区间内的成员
  • ZRANGEBYLEX key min max 通过字典区间返回有序集合的成员
  • ZRANGEBYSCORE key min max 通过分数返回有序集合指定区间内的成员==-inf 和 +inf分别表示最小最大值,只支持开区间()==
  • ZLEXCOUNT key min max 在有序集合中计算指定字典区间内成员数量
  • ZREM key member1 [member2..] 移除有序集合中一个/多个成员
  • ZREMRANGEBYLEX key min max 移除有序集合中给定的字典区间的所有成员
  • ZREMRANGEBYRANK key start stop 移除有序集合中给定的排名区间的所有成员
  • ZREMRANGEBYSCORE key min max 移除有序集合中给定的分数区间的所有成员
  • ZREVRANGE key start end 返回有序集中指定区间内的成员,通过索引,分数从高到底
  • ZREVRANGEBYSCORRE key max min 返回有序集中指定分数区间内的成员,分数从高到低排序
  • ZREVRANGEBYLEX key max min 返回有序集中指定字典区间内的成员,按字典顺序倒序
  • ZREVRANK key member 返回有序集合中指定成员的排名,有序集成员按分数值递减(从大到小)排序
  • ZINTERSTORE destination numkeys key1 [key2 ..] 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中,numkeys:表示参与运算的集合数,将score相加作为结果的score
  • ZUNIONSTORE destination numkeys key1 [key2..] 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中
  • ZSCAN key cursor [MATCH pattern\] [COUNT count] 迭代有序集合中的元素(包括元素成员和元素分值)
################################################################################
127.0.0.1:6379> zadd myset 1 one
(integer) 1
127.0.0.1:6379> zadd myset 2 two 3 three    # 往集合中添加多个值
(integer) 2
127.0.0.1:6379> ZRANGE myset 0 -1           # 查看集合中所有元素
1) "one"
2) "two"
3) "three"
127.0.0.1:6379>
################################################################################
# 排序
127.0.0.1:6379> zadd salary 2500 zhangshan           # 往集合中添加3个元素
(integer) 1
127.0.0.1:6379> zadd salary 500 xiaohe
(integer) 1
127.0.0.1:6379> zadd salary 5000 xiaozhan
(integer) 1
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf       # 显示所有用户,按照从小到大的顺序
1) "xiaohe"
2) "zhangshan"
3) "xiaozhan"
127.0.0.1:6379> ZREVRANGE salary 0 -1                # 显示所有用户,按照从大到小的顺序
1) "xiaozhan"
2) "zhangshan"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf withscores # 显示所有用户,按照从小到大的顺序,并且附带成绩。
1) "xiaohe"
2) "500"
3) "zhangshan"
4) "2500"
5) "xiaozhan"
6) "5000"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf 2500 withscores  # 显示工资小于2500员工的升序排序
1) "xiaohe"
2) "500"
3) "zhangshan"
4) "2500"
127.0.0.1:6379>
################################################################################
# 移除某个元素 rem
127.0.0.1:6379> ZRANGE salary 0 -1
1) "xiaohe"
2) "zhangshan"
3) "xiaozhan"
127.0.0.1:6379> Zrem salary xiaohe       # 移除某个元素
(integer) 1
127.0.0.1:6379> ZRANGE salary 0 -1
1) "zhangshan"
2) "xiaozhan"
127.0.0.1:6379>
################################################################################
127.0.0.1:6379> ZCARD salary    # 获取有序集合的成员数
(integer) 2
127.0.0.1:6379>
################################################################################
# 判断指定区间的个数
127.0.0.1:6379> zadd myset 1 he 2 xiao 3 long
(integer) 3
127.0.0.1:6379> ZCOUNT myset 1 3           # 判断1~3之间存在有几个值
(integer) 3
127.0.0.1:6379> ZCOUNT myset 1 2
(integer) 2
127.0.0.1:6379>

应用案例:

  1. set排序 存储班级成绩表 工资表排序!
  2. 普通消息,1.重要消息 2.带权重进行判断
  3. 排行榜应用实现,取Top N测试

三种特殊数据类型

geospatial 地理位置

城市经纬度查询:http://www.jsons.cn/lngcode/

功能:可以推算出地理位置的信息,两地之间的距离,方圆几里的人!

GEOADD

将指定的地理空间位置(纬度、经度、名称)添加到指定的key

EPSG:900913 / EPSG:3785 / OSGEO:41001 规定如下:

  • 有效的经度从-180度到180度。
  • 有效的纬度从-85.05112878度到85.05112878度。
#127.0.0.1:6379> geoadd china:city 39.904989 116.405285 beijing
(error) ERR invalid longitude,latitude pair 39.904989,116.405285

geoadd

# 添加城市数据
# 规则 : 两级是无法直接添加,我们一般会下载城市数据,通过Java 程序导入!
127.0.0.1:6379> geoadd china:city 116.405285 39.904989 beijing
(integer) 1
127.0.0.1:6379> geoadd china:city 121.472644 31.231706 shanghai 120.153576 30.287459 hangzhou 106.504962 29.533155 chongqing 125.14904 42.927 jilin 91.132212 29.660361 xizang
(integer) 5
127.0.0.1:6379> geoadd china:city 108.948024 34.263161 xian
(integer) 1
127.0.0.1:6379>

GEODIST

返回两个给定位置之间的距离。

如果两个位置之间的其中一个不存在, 那么命令返回空值。

指定单位的参数 unit 必须是以下单位的其中一个:

  • m 表示单位为米。
  • km 表示单位为千米。
  • mi 表示单位为英里。
  • ft 表示单位为英尺。
127.0.0.1:6379> GEODIST china:city beijing shanghai      # 查看北京到上海的直线距离 单位 m
"1067597.9668"
127.0.0.1:6379> GEODIST china:city beijing shanghai km   # 查看北京到上海的直线距离 单位 km
"1067.5980"
127.0.0.1:6379>

GEOHSAS

该命令将返回11个字符的Geohash字符串,所以没有精度Geohash,损失相比,使用内部52位表示。返回的geohashes具有以下特性:

  1. 他们可以缩短从右边的字符。它将失去精度,但仍将指向同一地区。
  2. 它可以在 geohash.org 网站使用,网址 http://geohash.org/<geohash-string>。查询例子:http://geohash.org/sqdtr74hyu0.
  3. 与类似的前缀字符串是附近,但相反的是不正确的,这是可能的,用不同的前缀字符串附近。
# 将二维的纬度转换成一维的字符串,如果两个字符串越接近,则距离越近!
127.0.0.1:6379> geohash china:city beijing chongqing
1) "wx4g0b7xrt0"
2) "wm78p86e170"
127.0.0.1:6379>

GEOPOS

127.0.0.1:6379> geopos china:city beijing           # 获取指定城市的经度和纬度
1) 1) "116.40528291463851929"2) "39.9049884229125027"
127.0.0.1:6379> geopos china:city beijing chongqing
1) 1) "116.40528291463851929"2) "39.9049884229125027"
2) 1) "106.50495976209640503"2) "29.53315530684997015"
127.0.0.1:6379>

GEORADIUS

以给定的经纬度为中心, 返回键包含的位置元素当中, 与中心的距离不超过给定最大距离的所有位置元素。

范围可以使用以下其中一个单位:

  • m 表示单位为米。
  • km 表示单位为千米。
  • mi 表示单位为英里。
  • ft 表示单位为英尺。

所有的数据都录入到china:city中

127.0.0.1:6379> georadius china:city 110 30 1000 km     # 以经度110纬度30为中心,找半径1000km的城市
1) "chongqing"
2) "xian"
3) "hangzhou"
127.0.0.1:6379> georadius china:city 110 30 500 km withdist # 以经度110纬度30为中心,找半径1000km的城市,并返回距离
1) 1) "chongqing"2) "341.4052"
2) 1) "xian"2) "484.4073"
127.0.0.1:6379> georadius china:city 110 30 500 km withcoord #以经度110纬度30为中心,找半径1000km的城市,并返回经度纬度
1) 1) "chongqing"2) 1) "106.50495976209640503"2) "29.53315530684997015"
2) 1) "xian"2) 1) "108.94802302122116089"2) "34.2631604414749944"
127.0.0.1:6379> georadius china:city 110 30 500 km withcoord withdist count 1  #以经度110纬度30为中心,找半径1000km的城市,并返回经度,并设置返回数量
1) 1) "chongqing"2) "341.4052"3) 1) "106.50495976209640503"2) "29.53315530684997015"

GEORADIUSBYMEMBER

这个命令和 GEORADIUS 命令一样, 都可以找出位于指定范围内的元素, 但是 GEORADIUSBYMEMBER 的中心点是由给定的位置元素决定的, 而不是像 GEORADIUS 那样, 使用输入的经度和纬度来决定中心点

指定成员的位置被用作查询的中心。

127.0.0.1:6379> GEORADIUSBYMEMBER china:city beijing 1000 km  # 以beijing城市为中心,找半径1000km的城市
1) "beijing"
2) "jilin"
3) "xian"
127.0.0.1:6379> GEORADIUSBYMEMBER china:city shanghai 1000 km
1) "hangzhou"
2) "shanghai"
127.0.0.1:6379> 

GEO地层的实现原理其实就是Zset,我们可以用Zset命令来操作

127.0.0.1:6379> ZRANGE china:city 0 -1       # 查看所有的城市
1) "xizang"
2) "chongqing"
3) "xian"
4) "hangzhou"
5) "shanghai"
6) "beijing"
7) "jilin"
127.0.0.1:6379> ZREM china:city xian        # 删除指定的城市
(integer) 1
127.0.0.1:6379> ZRANGE china:city 0 -1
1) "xizang"
2) "chongqing"
3) "hangzhou"
4) "shanghai"
5) "beijing"
6) "jilin"
127.0.0.1:6379>

Spring boot使用redis进行经纬度距离计算 :

https://blog.csdn.net/LzmCode/article/details/100099327

Hyperloglog基数统计

什么是基数 ?

A {1,3,4,6,7,6}

B {2,3,4,1,6}

基数(在一个集合中不重复的数) = 5 ,可以接受误差。

简介

Redis 2.8.9版本就更新了Hyperloglog数据结构!

Redis Hyperloglog基数统计法!

优点:占用的内存空间是固定的,2^64不同的元素技术操作,只需要12KB内存!如果要从内存角度来比较的话Hyperloglog首选!

网页的uv(一个人访问一个网站多次,但还是算一个人!

传统的方式,set保存用户的id,然后就可以统计set中的元素数量作为标准判断!

这个方式如果保存大量的用户id,就会比较麻烦!我们的目的就是为了计数,而不是为了保存用户id;

0.81%错误率!统计uv任务,可以忽略不计!

127.0.0.1:6379> pfadd myset1 a b c d e f g     # 创建一组元素
(integer) 1
127.0.0.1:6379> pfcount myset1                 # 统计myset1这组元素个数
(integer) 7
127.0.0.1:6379> pfadd myset2 b s v b e q
(integer) 1
127.0.0.1:6379> pfcount myset2
(integer) 5
127.0.0.1:6379> PFMERGE myset3 myset1 myset2   # 合并两组 myset1 myset2 => myset3
OK
127.0.0.1:6379> pfcount myset3
(integer) 10
127.0.0.1:6379> 

BitMap位图场景详解

使用位存储,信息状态只有 0 和 1

Bitmap是一串连续的2进制数字(0或1),每一位所在的位置为偏移(offset),在bitmap上可执行AND,OR,XOR,NOT以及其它位操作。

应用场景: 签到统计、状态统计

  • setbit key offset value 为指定key的offset位设置值
  • getbit key offset 获取offset位的值
  • bitcount key [start end] 统计字符串被设置为1的bit数,也可以指定统计范围按字节
  • bitop operration destkey key[key..] 对一个或多个保存二进制位的字符串 key 进行位元操作,并将结果保存到 destkey 上。
  • BITPOS key bit [start] [end] 返回字符串里面第一个被设置为1或者0的bit位。start和end只能按字节,不能按位
------------setbit--getbit--------------
127.0.0.1:6379> setbit sign 0 1 # 设置sign的第0位为 1
(integer) 0
127.0.0.1:6379> setbit sign 2 1 # 设置sign的第2位为 1  不设置默认 是0
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 5 1
(integer) 0
127.0.0.1:6379> type sign
string
127.0.0.1:6379> getbit sign 2 # 获取第2位的数值(integer) 1127.0.0.1:6379> getbit sign 3(integer) 1127.0.0.1:6379> getbit sign 4 # 未设置默认是0(integer) 0-----------bitcount----------------------------127.0.0.1:6379> BITCOUNT sign # 统计sign中为1的位数(integer) 4

事务

Redis单条命令是保证原子性的,但是事务不保证原子性

Redis事务本质:一组命令的集合! 一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行!

一次性、顺序性、排他性!执行一序列的命令!

-----  队列  set set set 执行  -----

Redis事务没有隔离级别的概念!

所有的命令在事务中,并没有直接被执行!只有发起执行命令的时候才会被执行!

Redis事务:

  • 开启事务(multi)
  • 命令入队()
  • 执行事务(exec)
  • 取消事务(discard)

正常执行事务

127.0.0.1:6379> multi                   # 开启事务
OK
# 命令入队
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> exec                    # 执行事务
1) OK
2) OK
3) "v2"
4) OK
127.0.0.1:6379>

放弃事务

127.0.0.1:6379> multi                   # 开启事务
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> discard                # 取消事务
OK
127.0.0.1:6379> get k4                 # 事务队列中的命令都不会被执行!
(nil)
127.0.0.1:6379>

编译型异常 (代码有问题! 命令有错误!) ,事务中所有的命令都不会被执行

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> getset k3 v3
QUEUED
127.0.0.1:6379> getset k4            # 错误的命令
(error) ERR wrong number of arguments for 'getset' command
127.0.0.1:6379> set k5 v5
QUEUED
127.0.0.1:6379> exec                # 执行事务的时候会报错的
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k1              # 所有的命令都不会被执行
(nil)
127.0.0.1:6379>

运行时异常 (1/0),如果事务队列中存在语法性,那麽执行命令的时候,其他命令是可以被执行的,错误命令抛出异常

127.0.0.1:6379> set k1 "v1"
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> incr k1       # 会执行的时候失败!
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> exec
1) OK
2) (error) ERR value is not an integer or out of range  # 虽然命令报错了,但是依旧正常执行成功了
3) OK
127.0.0.1:6379> get k2
"v2"
127.0.0.1:6379> get k3
"v3"
127.0.0.1:6379>

乐观锁

监控 ! Watch 加锁 Unwatch 解锁

一但事务执行成功后,监控就会取消掉。

悲观锁:

  • 很悲观,认为什么时候都会出现问题,无论做什么都会加锁。

乐观锁:

  • 很乐观,认为什么时候都不会出问题,所以不会上锁,更新数据的时候去判断一下,在此期间是否有人修改了这个数据,
  • 获取version
  • 更新的时候比较version

Redis监视测试

127.0.0.1:6379> set mony 100              # 设置钱的金额为100
OK
127.0.0.1:6379> set out 0                 # 设置花费为0
OK
127.0.0.1:6379> watch mony                # 监控mony对象
OK
127.0.0.1:6379> multi                     # 开启事务
OK
127.0.0.1:6379> DECRBY mony 20            # 金额减20
QUEUED
127.0.0.1:6379> INCRBY out 20             # 花费加20
QUEUED
127.0.0.1:6379> exec                      # 执行事务
1) (integer) 80
2) (integer) 20
127.0.0.1:6379> get mony
"80"
127.0.0.1:6379> get out
"20"
127.0.0.1:6379>

测试多线程修改值,使用watch可以当做redis的乐观锁去操作

127.0.0.1:6379> watch mony       # 监视 mony
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> DECRBY mony 20
QUEUED
127.0.0.1:6379> INCRBY out 20
QUEUED
127.0.0.1:6379> exec
(nil)
127.0.0.1:6379> exec
(error) ERR EXEC without MULTI  # 执行之前另一条线程修改了我们的值,这个时候,就会导致失败!
127.0.0.1:6379>
--------------------------------------------------------------------------------------
127.0.0.1:6379> get mony
"80"
127.0.0.1:6379> set mony 1000
OK
127.0.0.1:6379>

Jedis

java操作redis

什么是Jedis ?是redis官方推荐的Java连接开发工具!使用Java操作redis中间件

1、创建一个空项目

1、导入对应的依赖

<!--导入jedis的包-->
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.3.0</version>
</dependency>
<!--fastjson-->
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version>
</dependency>

2、编码测试

  • 连接redis
  • 操作命令
  • 断开连接!

首次测试连接

/*** @author 小鸵鸟* @date 2020/11/17 12:02*/
public class TestPing {public static void main(String[] args) {// 1、new一个jedis对象JedisShardInfo shardInfo = new JedisShardInfo("redis://39.97.241.135:6379");//这里是连接的本地地址和端口shardInfo.setPassword("121566");//这里是密码Jedis jedis = new Jedis(shardInfo);System.out.println(jedis.ping());}
}

3、常用的API

String

List

Set

Hash

Zset

所有的api命令,就是我们对应的上面学习的指令,一个都没有变化!

/*** @author 小鸵鸟* @date 2020/11/17 12:02*/
public class TestPing {public static void main(String[] args) {// 1、new一个jedis对象JedisShardInfo shardInfo = new JedisShardInfo("redis://39.97.241.135:6379");//这里是连接的本地地址和端口shardInfo.setPassword("121566");//这里是密码Jedis jedis = new Jedis(shardInfo);jedis.flushDB();JSONObject jsonObject = new JSONObject();jsonObject.put("hello","world");jsonObject.put("name","kuangshen");// 开启事务Transaction multi = jedis.multi();String result = jsonObject.toJSONString();// jedis.watch(result)try {multi.set("user1",result);multi.set("user2",result);//int i = 1/0 ; // 代码抛出异常事务,执行失败!multi.exec(); // 执行事务!} catch (Exception e) {multi.discard(); // 放弃事务e.printStackTrace();} finally {System.out.println(jedis.get("user1"));System.out.println(jedis.get("user2"));jedis.close(); // 关闭连接}}
}

SpringBoot整合Redis

继springboot2.X之后,我们不使用jedis ,使用lettuce!

jedis: 采用的直连,多个线程操作的话是不安全的,如果想要避免不安全,使用jedis Pool连接池!更像BIO模式

lettuce:采用netty,实例可以在多个线程中共享,不存在线程不安全的现象!可以减少线程数据,更象Nio模式

源码分析:

@Bean
@ConditionalOnMissingBean(name = {"redisTemplate"}
)
@ConditionalOnSingleCandidate(RedisConnectionFactory.class) // 我们可以自定义一个RedisTemplate来替换这个默认的!
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {// 默认的RedisTemplate没有过多的设置,redis对象需要序列化// 两个泛型都是<Object, Object> ,我们需要强制转换为<String,Object>RedisTemplate<Object, Object> template = new RedisTemplate();template.setConnectionFactory(redisConnectionFactory);return template;
}@Bean
@ConditionalOnMissingBean   // 由于String类型常用,我们单独提出来一个bean!
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(redisConnectionFactory);return template;
}

测试

1、导入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2、配置连接

# 配置redis
spring.redis.host=39.97.241.135
spring.redis.port=6379
spring.redis.password=121566

3、测试

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;import java.util.concurrent.TimeUnit;@SpringBootTest
class Redis02SpringbootApplicationTests {@Autowiredprivate RedisTemplate redisTemplate;@Testvoid contextLoads() {//redisTemplate//opsForValue  操作字符串,类似于String//opsForSet//opsForList   操作list,类似于list//opsForHashredisTemplate.opsForValue().set("hello","word!",3, TimeUnit.SECONDS);System.out.println(redisTemplate.opsForValue().get("hello"));//删除指定的keyredisTemplate.delete("hello");//获取redis的连接对象RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();connection.flushDb();connection.flushAll();}}

关于对象的保存

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;import java.io.Serializable;/*** @author 小鸵鸟* @date 2020/11/18 9:03*/
@Component
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable  { // 对象序列化 Serializableprivate String usernName;private int age;
}

 @Testvoid test() throws JsonProcessingException {User user = new User("法外狂赌张三", 3);String newUser = new ObjectMapper().writeValueAsString(user);redisTemplate.opsForValue().set("user", newUser);System.out.println(redisTemplate.opsForValue().get("user"));}

自定义Redis Template

默认的序列化方式:

默认的序列化方式是jdk序列化,我们可能需要的序列化方式是json序列化

自定义RedisTemplate

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** @author 小鸵鸟* @date 2020/11/17 16:40*/
@Configuration
public class RedisConfig {/** 固定模板* */// 自定义RedisTemplate@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {// 为了开发方便,一般直接使用<String, Object>RedisTemplate<String, Object> template = new RedisTemplate();// 连接工厂template.setConnectionFactory(redisConnectionFactory);// JSON序列化配置 ,使用json去解析任意传过来的对象Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);// Json序列化需要ObjectMapper转义ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(objectMapper);// String的序列化StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();// key采用String序列化方式template.setKeySerializer(stringRedisSerializer);// hash的key也采用String的序列化方式template.setHashKeySerializer(stringRedisSerializer);// value序列化方式采用jacksontemplate.setValueSerializer(jackson2JsonRedisSerializer);// hash的value也采用jackson的序列化方式template.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}
}

自定义RedisTemplate和RedisUtil结合使用

RedisUtil工具类

package com.he.utils;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;/**14* Redis工具类15* @author YUX
16* @date   2018年6月7日
17*/@Component
public final class RedisUtil {@Autowiredprivate RedisTemplate<String,Object> redisTemplate;// =============================common============================/*** 26* 指定缓存失效时间* 27** @param key  键*             28* @param time 时间(秒)*             29* @return 30*/public boolean expire(String key, long time) {try {if (time > 0) {redisTemplate.expire(key, time, TimeUnit.SECONDS);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 44* 根据key 获取过期时间* 45** @param key 键 不能为null*            46* @return 时间(秒) 返回0代表为永久有效* 47*/public long getExpire(String key) {return redisTemplate.getExpire(key, TimeUnit.SECONDS);}/*** 53* 判断key是否存在* 54** @param key 键*            55* @return true 存在 false不存在* 56*/public boolean hasKey(String key) {try {return redisTemplate.hasKey(key);} catch (Exception e) {e.printStackTrace();return false;}}/*** 67* 删除缓存* 68** @param key 可以传一个值 或多个*            69*/@SuppressWarnings("unchecked")public void del(String... key) {if (key != null && key.length > 0) {if (key.length == 1) {redisTemplate.delete(key[0]);} else {redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));}}}// ============================String=============================/*** 83* 普通缓存获取* 84** @param key 键*            85* @return 值* 86*/public Object get(String key) {return key == null ? null : redisTemplate.opsForValue().get(key);}/*** 92* 普通缓存放入* 93** @param key   键*              94* @param value 值*              95* @return true成功 false失败* 96*/public boolean set(String key, Object value) {try {redisTemplate.opsForValue().set(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 109* 普通缓存放入并设置时间* 110** @param key   键*              111* @param value 值*              112* @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期*              113* @return true成功 false 失败* 114*/public boolean set(String key, Object value, long time) {try {if (time > 0) {redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);} else {set(key, value);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 130* 递增* 131** @param key   键*              132* @param delta 要增加几(大于0)*              133* @return 134*/public long incr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递增因子必须大于0");}return redisTemplate.opsForValue().increment(key, delta);}/*** 143* 递减* 144** @param key   键*              145* @param delta 要减少几(小于0)*              146* @return 147*/public long decr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递减因子必须大于0");}return redisTemplate.opsForValue().increment(key, -delta);}// ================================Map=================================/*** 157* HashGet* 158** @param key  键 不能为null*             159* @param item 项 不能为null*             160* @return 值* 161*/public Object hget(String key, String item) {return redisTemplate.opsForHash().get(key, item);}/*** 167* 获取hashKey对应的所有键值* 168** @param key 键*            169* @return 对应的多个键值* 170*/public Map<Object, Object> hmget(String key) {return redisTemplate.opsForHash().entries(key);}/*** 176* HashSet* 177** @param key 键*            178* @param map 对应多个键值*            179* @return true 成功 false 失败* 180*/public boolean hmset(String key, Map<String, Object> map) {try {redisTemplate.opsForHash().putAll(key, map);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 192* HashSet 并设置时间* 193** @param key  键*             194* @param map  对应多个键值*             195* @param time 时间(秒)*             196* @return true成功 false失败* 197*/public boolean hmset(String key, Map<String, Object> map, long time) {try {redisTemplate.opsForHash().putAll(key, map);if (time > 0) {expire(key, time);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 212* 向一张hash表中放入数据,如果不存在将创建* 213** @param key   键*              214* @param item  项*              215* @param value 值*              216* @return true 成功 false失败* 217*/public boolean hset(String key, String item, Object value) {try {redisTemplate.opsForHash().put(key, item, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 229* 向一张hash表中放入数据,如果不存在将创建* 230** @param key   键*              231* @param item  项*              232* @param value 值*              233* @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间*              234* @return true 成功 false失败* 235*/public boolean hset(String key, String item, Object value, long time) {try {redisTemplate.opsForHash().put(key, item, value);if (time > 0) {expire(key, time);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 250* 删除hash表中的值* 251** @param key  键 不能为null*             252* @param item 项 可以使多个 不能为null*             253*/public void hdel(String key, Object... item) {redisTemplate.opsForHash().delete(key, item);}/*** 259* 判断hash表中是否有该项的值* 260** @param key  键 不能为null*             261* @param item 项 不能为null*             262* @return true 存在 false不存在* 263*/public boolean hHasKey(String key, String item) {return redisTemplate.opsForHash().hasKey(key, item);}/*** 269* hash递增 如果不存在,就会创建一个 并把新增后的值返回* 270** @param key  键*             271* @param item 项*             272* @param by   要增加几(大于0)*             273* @return 274*/public double hincr(String key, String item, double by) {return redisTemplate.opsForHash().increment(key, item, by);}/*** 280* hash递减* 281** @param key  键*             282* @param item 项*             283* @param by   要减少记(小于0)*             284* @return 285*/public double hdecr(String key, String item, double by) {return redisTemplate.opsForHash().increment(key, item, -by);}// ============================set=============================/*** 292* 根据key获取Set中的所有值* 293** @param key 键*            294* @return 295*/public Set<Object> sGet(String key) {try {return redisTemplate.opsForSet().members(key);} catch (Exception e) {e.printStackTrace();return null;}}/*** 306* 根据value从一个set中查询,是否存在* 307** @param key   键*              308* @param value 值*              309* @return true 存在 false不存在* 310*/public boolean sHasKey(String key, Object value) {try {return redisTemplate.opsForSet().isMember(key, value);} catch (Exception e) {e.printStackTrace();return false;}}/*** 321* 将数据放入set缓存* 322** @param key    键*               323* @param values 值 可以是多个*               324* @return 成功个数* 325*/public long sSet(String key, Object... values) {try {return redisTemplate.opsForSet().add(key, values);} catch (Exception e) {e.printStackTrace();return 0;}}/*** 336* 将set数据放入缓存* 337** @param key    键*               338* @param time   时间(秒)*               339* @param values 值 可以是多个*               340* @return 成功个数* 341*/public long sSetAndTime(String key, long time, Object... values) {try {Long count = redisTemplate.opsForSet().add(key, values);if (time > 0)expire(key, time);return count;} catch (Exception e) {e.printStackTrace();return 0;}}/*** 355* 获取set缓存的长度* 356** @param key 键*            357* @return 358*/public long sGetSetSize(String key) {try {return redisTemplate.opsForSet().size(key);} catch (Exception e) {e.printStackTrace();return 0;}}/*** 369* 移除值为value的* 370** @param key    键*               371* @param values 值 可以是多个*               372* @return 移除的个数* 373*/public long setRemove(String key, Object... values) {try {Long count = redisTemplate.opsForSet().remove(key, values);return count;} catch (Exception e) {e.printStackTrace();return 0;}}// ===============================list=================================/*** 386* 获取list缓存的内容* 387** @param key   键*              388* @param start 开始*              389* @param end   结束 0 到 -1代表所有值*              390* @return 391*/public List<Object> lGet(String key, long start, long end) {try {return redisTemplate.opsForList().range(key, start, end);} catch (Exception e) {e.printStackTrace();return null;}}/*** 402* 获取list缓存的长度* 403** @param key 键*            404* @return 405*/public long lGetListSize(String key) {try {return redisTemplate.opsForList().size(key);} catch (Exception e) {e.printStackTrace();return 0;}}/*** 416* 通过索引 获取list中的值* 417** @param key   键*              418* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推*              419* @return 420*/public Object lGetIndex(String key, long index) {try {return redisTemplate.opsForList().index(key, index);} catch (Exception e) {e.printStackTrace();return null;}}/*** 431* 将list放入缓存* 432** @param key   键*              433* @param value 值*              434* @param //time  时间(秒)*              435* @return 436*/public boolean lSet(String key, Object value) {try {redisTemplate.opsForList().rightPush(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 将list放入缓存** @param key   键* @param value 值* @param time  时间(秒)* @return*/public boolean lSet(String key, Object value, long time) {try {redisTemplate.opsForList().rightPush(key, value);if (time > 0)expire(key, time);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 467* 将list放入缓存* 468** @param key   键*              469* @param value 值*              470* @param //time  时间(秒)*              471* @return 472*/public boolean lSet(String key, List<Object> value) {try {redisTemplate.opsForList().rightPushAll(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 484* 将list放入缓存* 485* <p>* 486** @param key   键*              487* @param value 值*              488* @param time  时间(秒)*              489* @return 490*/public boolean lSet(String key, List<Object> value, long time) {try {redisTemplate.opsForList().rightPushAll(key, value);if (time > 0)expire(key, time);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 504* 根据索引修改list中的某条数据* 505** @param key   键*              506* @param index 索引*              507* @param value 值*              508* @return 509*/public boolean lUpdateIndex(String key, long index, Object value) {try {redisTemplate.opsForList().set(key, index, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 521* 移除N个值为value* 522** @param key   键*              523* @param count 移除多少个*              524* @param value 值*              525* @return 移除的个数* 526*/public long lRemove(String key, long count, Object value) {try {Long remove = redisTemplate.opsForList().remove(key, count, value);return remove;} catch (Exception e) {e.printStackTrace();return 0;}}
}

Redis配置文件详解(Redis.conf)

单位

1、配置文件unit单位对大小写不敏感!

包含 (可以包含多个配置文件)

网络

bind 127.0.0.1        # 绑定IP
protected-mode no     # 是否开启保护模式
port 6379             # 端口

通用配置 GENERAL

daemonize yes         # 以守护进程的方式运行,默认是 no,我们需要自己开启为yes!pidfile /var/run/redis_6379.pid   # 如果我们以守护进程的方式运行,需要我们指定一个pid文件#日志
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)
loglevel noticelogfile ""       # 日志生产的文件名,如果是空,就代表输出了databases 16     # 数据库的数量,默认是16个数据库always-show-logo yes    # 是否总是显示logo

快照 SNAPSHOTTING

持久化,在规定时间内,执行了多少次操作,则会持久话到文件 .rdb文件 .aof文件

redis是内存数据库,如果没有持久化,就会断电即使失

# 如果900秒内,至少有1个key进行了修改,我们就会持久化操作!
save 900 1
# 如果300秒内,至少有10个key进行了修改,我们就会持久化操作!
save 300 10
# 如果60秒内,至少有10000个key进行了修改,我们就会持久化操作!
save 60 10000
# 我们之后学习了持久化,会自己设置# 持久化操作出错了,是否还会进行工作!
stop-writes-on-bgsave-error yes
# 是否压缩rdb文件 ,需要消耗cpu资源
rdbcompression yes
# 保存rdb文件的时候是否会经行错误的校验
rdbchecksum yesdir ./    # rdb文件生成的目录

REPLICATION 复制 (主从复制)

安全 SECURITY

可以在这里设置redis的密码,默认是没有密码的。

127.0.0.1:6379> ping                          # 设置的密码是ping不同的
(error) NOAUTH Authentication required.
127.0.0.1:6379> config get requirepass
(error) NOAUTH Authentication required.
127.0.0.1:6379> auth 121566                   # 验证密码
OK
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> config get requirepass        # 获取密码
1) "requirepass"
2) "121566"
127.0.0.1:6379> config get requirepass 123456 # 设置密码
OK

还可以在配置文件中设置

限制 CLIENTS

maxclients 10000     # 设置最大客户端连接数量为10000    maxmemory <bytes>    # 配置最大内存容量maxmemory-policy noeviction  # 内存达到上限怎么处理1、volatile-lru:只对设置了过期时间的key进行LRU(默认值) 2、allkeys-lru : 删除lru算法的key   3、volatile-random:随机删除即将过期key   4、allkeys-random:随机删除   5、volatile-ttl : 删除即将过期的   6、noeviction : 永不过期,返回错误

APPEND ONLY MODE 模式 (AOF配置)

appendonly no      # 默认是不开启AOP模式的 ,默认是使用rdb持久化的,在大部分情况下,rdb够用了!appendfilename "appendonly.aof"     # 持久化文件的名字# appendfsync always                # 每次修改都会同步,消耗性能
appendfsync everysec                # 每秒同步一次,可能会丢失这1秒的值
# appendfsync no                    # 不执行 sync , 这个时候系统自己同步数据,速度最快!

Redis持久化

RDB(Redis DataBase)

什么是RDB?

在指定时间间隔后,将内存中的数据集快照写入数据库 ;在恢复时候,直接读取快照文件,进行数据的恢复 ;
默认情况下, Redis 将数据库快照保存在名字为 dump.rdb的二进制文件中。文件名可以在配置文件中进行自定义。

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。
这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。我们默认的就是RDB,一般情况下不需要修改这个配置! 有时候在生产环境我们会将这个文件进行备份!
rdb保存的文件是dump.rdb 都是在我们的配置文件中快照中进行配置的!

工作原理:

在进行 RDB 的时候,redis 的主线程是不会做 io 操作的,主线程会 fork 一个子线程来完成该操作;

  1. Redis 调用forks。同时拥有父进程和子进程。
  2. 子进程将数据集写入到一个临时 RDB 文件中。
  3. 当子进程完成对新 RDB 文件的写入时,Redis 用新 RDB 文件替换原来的 RDB 文件,并删除旧的 RDB 文件。

这种工作方式使得 Redis 可以从写时复制(copy-on-write)机制中获益(因为是使用子进程进行写操作,而父进程依然可以接收来自客户端的请求。)

触发机制:

  1. save的规则满足的情况下,会自动触发rdb原则
  2. 执行flushall命令,也会触发我们的rdb原则
  3. 退出redis,也会自动产生rdb文件

save:

使用 save 命令,会立刻对当前内存中的数据进行持久化 ,但是会阻塞,也就是不接受其他操作了;

由于 save 命令是同步命令,会占用Redis的主进程。若Redis数据非常多时,save命令执行速度会非常慢,阻塞所有客户端的请求。

示意图:

flushall命令:

flushall 命令也会触发持久化 ;

触发持久化规则
满足配置条件中的触发条件 ;

可以通过配置文件对 Redis 进行设置, 让它在“ N 秒内数据集至少有 M 个改动”这一条件被满足时, 自动进行数据集保存操作。


bgsave:

bgsave 是异步进行,进行持久化的时候,redis 还可以将继续响应客户端请求 ;


bgsave和save对比

命令 save bgsave
IO类型 同步 异步
阻塞? 是(阻塞发生在fock(),通常非常快)
复杂度 O(n) O(n)
优点 不会消耗额外的内存 不阻塞客户端命令
缺点 阻塞客户端命令 需要fock子进程,消耗内存

如何恢复rdb文件!

1、只需要将rdb文件放在我们redis启动目录就可以,redis启动的时候会自动检查dump.rdb 恢复其中的数据!
2、查看需要存在的位置

127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin" # 如果在这个目录下存在 dump.rdb 文件,启动就会自动恢复其中的数据

优缺点

优点:

  1. 适合大规模的数据恢复
  2. 对数据的完整性要求不高

缺点:

  1. 需要一定的时间间隔进行操作,如果redis意外宕机了,这个最后一次修改的数据就没有了。
  2. fork进程的时候,会占用一定的内存空间。

AOF(Append Only File)

快照功能(RDB)并不是非常耐久(durable): 如果 Redis 因为某些原因而造成故障停机, 那么服务器将丢失最近写入、以及未保存到快照中的那些数据。 从 1.1 版本开始, Redis 增加了一种完全耐久的持久化方式: AOF 持久化。

以日志的形式来记录每个写的操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作。

appendonly no yes则表示启用AOF

默认是不开启的,我们需要手动配置,然后重启redis,就可以生效了!

如果这个aof文件有错位,这时候redis是启动不起来的,我需要修改这个aof文件

redis给我们提供了一个工具redis-check-aof --fix

appendonly yes  # 默认是不开启aof模式的,默认是使用rdb方式持久化的,在大部分的情况下,rdb完全够用
appendfilename "appendonly.aof"
appendfsync always # 每次修改都会sync 消耗性能appendfsync everysec # 每秒执行一次 sync 可能会丢失这一秒的数据appendfsync no # 不执行 sync ,这时候操作系统自己同步数据,速度最快

优缺点

优点

  1. 每一次修改都会同步,文件的完整性会更加好
  2. 没秒同步一次,可能会丢失一秒的数据
  3. 从不同步,效率最高

缺点

  1. 相对于数据文件来说,aof远远大于rdb,修复速度比rdb慢!
  2. Aof运行效率也要比rdb慢,所以我们redis默认的配置就是rdb持久化

扩展

1、RDB 持久化方式能够在指定的时间间隔内对你的数据进行快照存储
2、AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis 协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。
3、只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化
4、同时开启两种持久化方式

  • 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。
  • RDB 的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF呢?作者建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在的Bug,留着作为一个万一的手段。

5、性能建议

  • 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留 save 900 1 这条规则。
  • 如果Enable AOF ,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite 的最后将 rewrite 过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。
  • 如果不Enable AOF ,仅靠 Master-Slave Repllcation 实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。代价是如果Master/Slave 同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个 Master/Slave 中的 RDB文件,载入较新的那个,微博就是这种架构。

如何选择使用哪种持久化方式?

一般来说, 如果想达到足以媲美 PostgreSQL 的数据安全性, 你应该同时使用两种持久化功能。

如果你非常关心你的数据, 但仍然可以承受数分钟以内的数据丢失, 那么你可以只使用 RDB 持久化。

有很多用户都只使用 AOF 持久化, 但并不推荐这种方式: 因为定时生成 RDB 快照(snapshot)非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比 AOF 恢复的速度要快。

Redis发布订阅

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。微信、 微博、关注系统! Redis 客户端可以订阅任意数量的频道。

订阅/发布消息图: 第一个:消息发送者, 第二个:频道 第三个:消息订阅者!


下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

命令

  • PSUBSCRIBE pattern [pattern..] 订阅一个或多个符合给定模式的频道。
  • PUNSUBSCRIBE pattern [pattern..] 退订一个或多个符合给定模式的频道。
  • PUBSUB subcommand [argument[argument]] 查看订阅与发布系统状态。
  • PUBLISH channel message 向指定频道发布消息
  • SUBSCRIBE channel [channel..] 订阅给定的一个或多个频道。
  • UNSUBSCRIBE channel [channel..] 退订一个或多个频道

测试

订阅端:

127.0.0.1:6379> SUBSCRIBE xiaohe                # 订阅xiaohe频道
Reading messages... (press Ctrl-C to quit)
1) "subscribe"                                  # 订阅成功的消息
2) "xiaohe"
3) (integer) 1
# 接收到来自xiaohe频道的消息 "hello,word"
1) "message"
2) "xiaohe"
3) "hello,word"
# 接收到来自xiaohe频道的消息 "hello,redis"
1) "message"
2) "xiaohe"
3) "hello,redis"

发送端:

127.0.0.1:6379> PUBLISH xiaohe "hello,word"   # 发布消息到xiaohe频道
(integer) 1
127.0.0.1:6379> PUBLISH xiaohe "hello,redis"  # 发布消息到xiaohe频道
(integer) 1
127.0.0.1:6379>

SpringBoot基于Redis实现简单的发布订阅功能:https://blog.csdn.net/hou_ge/article/details/107102673

原理

Redis是使用C实现的,通过分析 Redis 源码里的 pubsub.c 文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解。

Redis 通过 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能。

每个 Redis 服务器进程都维持着一个表示服务器状态的 redis.h/redisServer 结构, 结构的 pubsub_channels 属性是一个字典, 这个字典就用于保存订阅频道的信息,其中,字典的键为正在被订阅的频道, 而字典的值则是一个链表, 链表中保存了所有订阅这个频道的客户端。

客户端订阅,就被链接到对应频道的链表的尾部,退订则就是将客户端节点从链表中移除。

缺点

  1. 如果一个客户端订阅了频道,但自己读取消息的速度却不够快的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。
  2. 这和数据传输可靠性有关,如果在订阅方断线,那么他将会丢失所有在短线期间发布者发布的消息。

应用

  1. 消息订阅:公众号订阅,微博关注等等(起始更多是使用消息队列来进行实现)
  2. 多人在线聊天室。

稍微复杂的场景,我们就会使用消息中间件MQ处理。

Redis集群搭建

只配置从库,不用配置主库。

127.0.0.1:6379> info replication                     # 查看当前库的信息
# Replication
role:master           # 角色 master
connected_slaves:0    # 没有从机
master_replid:732c831233baecd0b570df6a7006a99cda46df1a
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
127.0.0.1:6379>

1、复制3个配置文件,然后修改对应的配置信息。

修改以下:

​ 1、端口

​ 2、pid名字

​ 3、log文件名字

​ 4、dump.rdb名字

启动3个redis服务

ps -ef|grep redis

主从复制

概念:

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(Master/Leader),后者称为从节点(Slave/Follower), 数据的复制是单向的!只能由主节点复制到从节点(主节点以写为主、从节点以读为主)。

默认情况下,每台Redis服务器都是主节点,一个主节点可以有0个或者多个从节点,但每个从节点只能由一个主节点。

作用:

  1. 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余的方式。
  2. 故障恢复:当主节点故障时,从节点可以暂时替代主节点提供服务,是一种服务冗余的方式
  3. 负载均衡:在主从复制的基础上,配合读写分离,由主节点进行写操作,从节点进行读操作,分担服务器的负载;尤其是在多读少写的场景下,通过多个从节点分担负载,提高并发量。
  4. 高可用基石:主从复制还是哨兵和集群能够实施的基础。

为什么使用集群?

一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的(宕机),原因如下:

1、从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大;

2、从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G。

电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是"多读少写"。

主从复制,读写分离! 80% 的情况下都是在进行读操作!减缓服务器的压力!架构中经常使用! 一主二从!

只要在公司中,主从复制就是必须要使用的,因为在真实的项目中不可能单机使用Redis!

总结:

  1. 单台服务器难以负载大量的请求
  2. 单台服务器故障率高,系统崩坏概率大
  3. 单台服务器内存容量有限。

一主二从配置

默认情况下,每台Redis服务器都是主节点;我们一般情况下只用配置从机就好了!

认老大!一主(79)二从(80,81)

使用SLAVEOF host port就可以为从机配置主机了。

如果redis的主机设置了密码,需要在从机的config文件中进行配置
对从机的config文件中的masterauth位置进行配置

masterauth  youpassword


使用命令配置一主二从,是暂时的

# 6380 从机
27.0.0.1:6380> SLAVEOF 127.0.0.1 6379
OK
127.0.0.1:6380> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
master_last_io_seconds_ago:5
master_sync_in_progress:0
slave_repl_offset:0
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:3cf29f7644a86c4773e074632712f29a8601d174
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:0
127.0.0.1:6380> # 6381从机
127.0.0.1:6381> SLAVEOF 127.0.0.1 6379
OK
127.0.0.1:6381>
127.0.0.1:6381> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
master_last_io_seconds_ago:2
master_sync_in_progress:0
slave_repl_offset:182
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:cbd44d989acabc01661bb5a1ce8788193b4474a8
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:182
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:169
repl_backlog_histlen:14
127.0.0.1:6381> # 主机
127.0.0.1:6379>  info replication
# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6380,state=online,offset=182,lag=0
slave1:ip=127.0.0.1,port=6381,state=online,offset=182,lag=0
master_replid:cbd44d989acabc01661bb5a1ce8788193b4474a8
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:182
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:182
127.0.0.1:6379>

在配置文件中配,是永久的

说明

  • connected_slaves:2 # 多了从机的配置
  • slave0:ip=127.0.0.1,port=6380,state=online,offset=42,lag=1 # 多了从机的配置

真实的从主配置应该在配置文件中配置,这样的话是永久的,我们这里使用的是命令,暂时的!

使用规则:

1.从机只能读,不能写,主机可读可写但是多用于写。

127.0.0.1:6381> set name sakura # 从机6381写入失败
(error) READONLY You can't write against a read only replica.
127.0.0.1:6380> set name sakura # 从机6380写入失败(error) READONLY You can't write against a read only replica.127.0.0.1:6379> set name sakuraOK127.0.0.1:6379> get name"sakura"

2.当主机断电宕机后,默认情况下从机的角色不会发生变化 ,集群中只是失去了写操作,当主机恢复以后,又会连接上从机恢复原状。

3.当从机断电宕机后,若不是使用配置文件配置的从机,再次启动后作为主机是无法获取之前主机的数据的,若此时重新配置称为从机,又可以获取到主机的所有数据。这里就要提到一个同步原理。

4.第二条中提到,默认情况下,主机故障后,不会出现新的主机,有两种方式可以产生新的主机:

  • 从机手动执行命令slaveof no one,这样执行以后从机会独立出来成为一个主机
  • 使用哨兵模式(自动选举)

如果没有老大了,这个时候能不能选择出来一个老大呢?手动!

如果主机断开了连接,我们可以使用SLAVEOF no one让自己变成主机!其他的节点就可以手动连接到最新的主节点(手动)!

如果这个时候老大修复了,那么就重新连接!

复制原理

Slave 启动成功连接到 master 后会发送一个sync同步命令

Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。

全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。

增量复制:Master 继续将新的所有收集到的修改命令依次传给slave,完成同步

但是只要是重新连接master,一次完全同步(全量复制)将被自动执行! 我们的数据一定可以在从机中看到!

哨兵模式(自动选取老大)

概述:

主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel(哨兵) 架构来解决这个问题。能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。

哨兵的作用:

  • 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
  • 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。

然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式

假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线

开始测试

1、新建一个config文件

2、配置哨兵配置文件 sentinel.conf

# sentinel monitor 被监控的名称 host port 1
sentinel monitor myredis 127.0.0.1 6379 1
sentinel auth-pass myredis 121566 #(如果设置了密码,主机密码和从机密码必须一致!)

后面的这个数字1,代表主机挂了,slave投票看让谁接替成为主机,票数最多的,就会成为主机!

3、启动哨兵! redis-sentinel hconfig/sentinel.con

4、此时哨兵监视着我们的主机6379,当我们断开主机后:

选取新的主机,原先的主机也被加入到了从机中,重新启动成为新主机的从机

哨兵模式的优缺点:

优点:

  1. 哨兵集群,基于主从复制模式,所有主从复制的优点,它都有
  2. 主从可以切换,故障可以转移,系统的可用性更好
  3. 哨兵模式是主从模式的升级,手动到自动,更加健壮

缺点:

  1. Redis不好在线扩容,集群容量一旦达到上限,在线扩容就十分麻烦
  2. 实现哨兵模式的配置其实是很麻烦的,里面有很多配置项

哨兵模式的全部配置

完整的哨兵模式配置文件 sentinel.conf

# Example sentinel.conf
# 哨兵sentinel实例运行的端口 默认26379port 26379# 哨兵sentinel的工作目录dir /tmp# 哨兵sentinel监控的redis主节点的 ip port# master-name  可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。# quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了# sentinel monitor <master-name> <ip> <redis-port> <quorum>sentinel monitor mymaster 127.0.0.1 6379 1# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码# sentinel auth-pass <master-name> <password>sentinel auth-pass mymaster MySUPER--secret-0123passw0rd# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒# sentinel down-after-milliseconds <master-name> <milliseconds>sentinel down-after-milliseconds mymaster 30000# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,# 这个数字越小,完成failover所需的时间就越长,# 但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。# 可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。# sentinel parallel-syncs <master-name> <numslaves>sentinel parallel-syncs mymaster 1# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:# 1. 同一个sentinel对同一个master两次failover之间的间隔时间。# 2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。# 3.当想要取消一个正在进行的failover所需要的时间。# 4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了# 默认三分钟# sentinel failover-timeout <master-name> <milliseconds>sentinel failover-timeout mymaster 180000# SCRIPTS EXECUTION配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。对于脚本的运行结果有以下规则:若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。通知脚本sentinel notification-script <master-name> <script-path>sentinel notification-script mymaster /var/redis/notify.sh客户端重新配置主节点参数脚本当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。以下参数将会在调用脚本时传给脚本:<master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>目前<state>总是“failover”,<role>是“leader”或者“observer”中的一个。参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的这个脚本应该是通用的,能被多次调用,不是针对性的。sentinel client-reconfig-script <master-name> <script-path>sentinel client-reconfig-script mymaster /var/redis/reconfig.sh

Redis知识点整理相关推荐

  1. Redis知识点整理(详讲)

    Redis整理 方便工作中复习,学习巩固. 缓存Cache 缓存的概念 缓存是存储在计算机上的一个原始数据复制集,以便于访问. Web项目常见的缓存场景 缓存击穿 概念: 对于一些设置了过期时间的ke ...

  2. mysql 全面知识点_Mysql知识点整理

    1.存储引擎区别 MyISAM:不支持事物.仅支持表级锁.支持B+树索引 MEMORY:不支持事物.仅支持表级锁.支持B+树和HASH索引 InnoDB:支持事物.支持行级锁.支持B+树索引 2.锁机 ...

  3. 高级 Java 面试通关知识点整理

    转载自 高级 Java 面试通关知识点整理 1.常用设计模式 单例模式:懒汉式.饿汉式.双重校验锁.静态加载,内部类加载.枚举类加载.保证一个类仅有一个实例,并提供一个访问它的全局访问点. 代理模式: ...

  4. RocketMQ知识点整理

    RocketMQ知识点整理 一.消息队列 二.RocketMQ简介 RocketMQ-组件 RocketMQ架构: 三. RocketMQ理解性问题整理 1.使用消息中间件之前需要先了解"同 ...

  5. 软考高级系统分析师知识点整理

    系统分析师知识点整理 信息化战略体系 企业战略规划:企业如何达到目标 信息系统战略规划:信息系统如何支撑这些目标 信息技术战略规划(IT战略规划):需要哪些信息技术支撑信息系统 信息资源规划:信息化建 ...

  6. Java进阶3 - 易错知识点整理(待更新)

    Java进阶3 - 易错知识点整理(待更新) 该章节是Java进阶2- 易错知识点整理的续篇: 在前一章节中介绍了 ORM框架,中间件相关的面试题,而在该章节中主要记录关于项目部署中间件,监控与性能优 ...

  7. springboot知识点整理(2021)

    springboot知识点整理 参考网址 https://mp.weixin.qq.com/s/GjjxJt8OauumW_J2ps7tow Demo 脚手架项目地址: https://github. ...

  8. 面试题引出的知识点整理

    1.自旋锁&可重复锁&公平锁&共享锁&分段锁你都知道吗? 2.无锁&偏向锁&轻量级锁&重量级锁如何膨胀升级? 3.Lock底层AQS实现与Syn ...

  9. 知识点整理,Java基础面试题(一)

    写在前面 整理了一些互联网大厂的面试题,这些面试题经常会被问到,也是作为Java工程师需要掌握的一些知识点,毕竟理论和实践的结合,才是王道,分片整理,每天嗑些知识点,快乐每一天,如果对你有帮助,记得点 ...

最新文章

  1. oracle union 最多_用户来稿:我就是那个在优买计划赚钱最多的男人
  2. ecshop goods.php,重命名ecshop的商品页goods.php为shangpin.php
  3. html页面之间传参乱码,急求教,在两个htm页面传参数时中文出现了乱码,试了网上的方法不管用。_html/css_WEB-ITnose...
  4. AJAX异步--ajax请求
  5. C#——后台管理端多级菜单的生成方式
  6. 软考 | 2015年上半年 软件设计师 下午试卷
  7. 修改CPAN配置文件
  8. 固态装linux,Linux下安装SSD固态卡
  9. 人工智能有哪些方向?什么方向有前景?
  10. 商淘软件WSTMart怎么样
  11. 小学生python趣味编程-【少儿编程】python趣味编程第二课:写文字
  12. 终于来了!新版本M4压不住枪了?刺激战场雪地地图最强灵敏度
  13. 面对疫情,我们普通人能做什么?
  14. 图解通信原理与案例分析-31:量子通信,信息的传输载体由确定性的宏观世界走向不确定性的微观世界
  15. 第二章 预习导图 部署DNS服务
  16. 免费的PCB打样平台汇总,每月免费打8块板!【建议收藏】
  17. MSP430单片机在3V与5V混合系统中的逻辑接口技术
  18. 关于 电脑分配IP地址可以连接局域网但无法上互联网 的解决方法
  19. 计算机音乐我还是曾经那个少年,我还是曾经那个少年
  20. 潭州学院html学习(day07)

热门文章

  1. 一个简单的Vue过滤器
  2. 为什么要用malloc申请空间
  3. 使用设计模式出任CEO迎娶白富美(2)--老板就喜欢聊设计模式?
  4. 机器学习 --- 朴素贝叶斯分类器 python
  5. 23 PPT图表链接和动画
  6. 16.数据统计之数据分组方法
  7. 2022中国(青岛)国际储能技术及应用展览会,山东风力发电展
  8. HC-SR04超声波模块程序原理和Proteus ISIS仿真
  9. 计算机c语言二级题库及答案txt,计算机二级c语言题库及答案
  10. javaScript系列笔记 - JS实现MP4播放