ZSET、Geo 、 Stream

  • redis zset数据结构
    • 常用命令
    • 排行榜
      • 步骤一、初始化1个月的历史数据
      • 步骤二:定时刷新数据
      • 步骤3:排行榜查询接口
  • GeoHash
    • 命令
    • 附近酒店搜索实现
    • 关注Pull推送
      • PULL 与PUSH的差别
    • pull 技术方案
      • 为什么个人列表和关注列表采用zset集合
      • 基于pull技术,实现微博个人列表
      • 基于pull技术,实现微博关注列表
    • Stream
      • 命令
    • IM聊天室
    • 布隆过滤器
      • 实现BloomFilter

redis zset数据结构

  • zset 是 set 的一个升级版本,它在 set 的基础上增加了一个顺序属性,它和 set 一样,zset也是 string 类型元素的集合,且不允许重复的成员,不同的是每个元素都会关联一个 double类型的 score。
  • 集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。 集合中最大的成员数为 2的(32 - 1)次方 (4294967295, 每个集合可存储40多亿个成员)。
  • zset 最经典的应用场景就是排行榜。

常用命令

  • ZADD
    ZADD key score member [[score member] [score member] …]
    将一个或多个member元素及其score值加入到有序集key当中。
  • ZRANGE
    ZRANGE key start stop [WITHSCORES]
    返回有序集key中,指定区间内的成员。
案例:创业公司招进了4个员工,分别为: alex 工资2000元  tom工资5000元 jack工资6000元 阿甘1000元,请按工资升序排序
39.100.196.99:6379> zadd salary 2000  alex  5000 tom 6000 jack 1000 agan
(integer) 4
39.100.196.99:6379> zrange salary 0 -1 withscores
1) "agan"
2) "1000"
3) "alex"
4) "2000"
5) "tom"
6) "5000"
7) "jack"
8) "6000"
(1.21s)
  • ZREM
    ZREM key member [member …]
    移除有序集key中的一个或多个成员,不存在的成员将被忽略。
案例:创业公司 tom离职了
39.100.196.99:6379> zrange salary 0 -1 withscores
1) "agan"
2) "1000"
3) "alex"
4) "2000"
5) "tom"
6) "5000"
7) "jack"
8) "6000"
(1.21s)
39.100.196.99:6379> zrem salary tom
(integer) 1
39.100.196.99:6379> zrange salary 0 -1 withscores
1) "agan"
2) "1000"
3) "alex"
4) "2000"
5) "jack"
6) "6000"
  • ZCARD
    ZCARD key
    返回有序集key的基数。
案例:创业公司 有多少人
39.100.196.99:6379> zrange salary 0 -1 withscores
1) "agan"
2) "1000"
3) "alex"
4) "2000"
5) "jack"
6) "6000"
39.100.196.99:6379> zcard salary
(integer) 3
  • ZCOUNT
    ZCOUNT key min max
    返回有序集key中,score值在min和max之间(默认包括score值等于min或max)的成员。
案例:创业公司老板问你 ,工资在2000 至 6000有多少人
39.100.196.99:6379> zrange salary 0 -1 withscores
1) "agan"
2) "1000"
3) "alex"
4) "2000"
5) "tom"
6) "5000"
7) "jack"
8) "6000"
39.100.196.99:6379> ZCOUNT salary 2000 6000
(integer) 3
  • ZSCORE
    ZSCORE key member
    返回有序集key中,成员member的score值。
案例:创业公司老板问你 ,阿甘的工资是多少 ?
39.100.196.99:6379> zrange salary 0 -1 withscores
1) "agan"
2) "1000"
3) "alex"
4) "2000"
5) "tom"
6) "5000"
7) "jack"
8) "6000"
39.100.196.99:6379> zscore salary agan
"1000"
  • ZINCRBY
    ZINCRBY key increment member
    为有序集key的成员member的score值加上增量increment。
案例:创业公司老板说阿甘表现很好,给他加500元吧
39.100.196.99:6379> ZINCRBY salary 500 agan
"1500"
39.100.196.99:6379> zrange salary 0 -1 withscores
1) "agan"
2) "1500"
3) "alex"
4) "2000"
5) "tom"
6) "5000"
7) "jack"
8) "6000"
  • ZREVRANGE
    ZREVRANGE key start stop [WITHSCORES]
    返回有序集key中,指定区间内的成员,降序。
案例:创业公司老板说经济不好,成本太大,看工资最多的是哪些人?
39.100.196.99:6379> zrange salary 0 -1 withscores  #升序
1) "agan"
2) "1500"
3) "alex"
4) "2000"
5) "tom"
6) "5000"
7) "jack"
8) "6000"
39.100.196.99:6379> ZREVRANGE salary 0 -1 withscores #降序
1) "jack"
2) "6000"
3) "tom"
4) "5000"
5) "alex"
6) "2000"
7) "agan"
8) "1500"
  • ZRANGEBYSCORE 取某个范围score的member,可以用于分页查询
    ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

返回有序集key中,所有score值介于min和max之间(包括等于min或max)的成员。有序集成员按score值递增(从小到大)次序排列。

案例:创业公司老板要给工资低的人加薪水,老板要求先看低于5000元的有哪些人?人多的话分页查看
39.100.196.99:6379> ZREVRANGE salary 0 -1 withscores
1) "jack"
2) "6000"
3) "tom"
4) "5000"
5) "alex"
6) "2000"
7) "agan"
8) "1500"
39.100.196.99:6379> ZRANGEBYSCORE salary 1 5000
1) "agan"
2) "alex"
3) "tom"
39.100.196.99:6379> ZRANGEBYSCORE salary 1 5000 LIMIT 0 2
1) "agan"
2) "alex"
39.100.196.99:6379> ZRANGEBYSCORE salary 1 5000 LIMIT 2 2
1) "tom"
  • ZREVRANGEBYSCORE 和上面的功能意义,但是这次是降序的
    ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
    返回有序集key中,score值介于max和min之间(默认包括等于max或min)的所有的成员。有序集成员按score值递减(从大到小)的次序排列。

  • ZRANK 取某个member的排名,升序
    ZRANK key member

案例:创业公司老板要查,工资从低到高,查某个员工排第几名?
9.100.196.99:6379> ZREVRANGE salary 0 -1 withscores
1) "jack"
2) "6000"
3) "tom"
4) "5000"
5) "alex"
6) "2000"
7) "agan"
8) "1500"
(0.75s)
39.100.196.99:6379> ZRANK salary agan
(integer) 0
  • ZREVRANK 取某个member的排名,降序
    ZREVRANK key member

  • ZREMRANGEBYRANK 移除指定排名(rank)区间内的所有成员。
    ZREMRANGEBYRANK key start stop

案例:经济不好,老板要裁员了,把工资最低的2个人裁掉
39.100.196.99:6379> zrange salary 0 -1 withscores
1) "agan"
2) "1500"
3) "alex"
4) "2000"
5) "tom"
6) "5000"
7) "jack"
8) "6000"
39.100.196.99:6379> ZREMRANGEBYRANK salary 0 1
(integer) 2
39.100.196.99:6379> zrange salary 0 -1 withscores
1) "tom"
2) "5000"
3) "jack"
4) "6000"

ZREMRANGEBYRANK 移除指定排名(rank)区间内的所有成员。
ZREMRANGEBYRANK key start stop

案例:经济不好,老板要裁员了,把工资最低的2个人裁掉
39.100.196.99:6379> zrange salary 0 -1 withscores
1) "agan"
2) "1500"
3) "alex"
4) "2000"
5) "tom"
6) "5000"
7) "jack"
8) "6000"
39.100.196.99:6379> ZREMRANGEBYRANK salary 0 1
(integer) 2
39.100.196.99:6379> zrange salary 0 -1 withscores
1) "tom"
2) "5000"
3) "jack"
4) "6000"
  • ZINTERSTORE 求交集
    ZINTERSTORE destination numkeys key [key …] [WEIGHTS weight [weight …]] [AGGREGATE SUM|MIN|MAX]
    计算给定的一个或多个有序集的交集,其中给定key的数量必须以numkeys参数指定,并将该交集(结果集)储存到destination。
39.100.196.99:6379> zadd group1 10 a 20 b 30 c
(integer) 3
39.100.196.99:6379> zadd group2 10 x 20 y 30 z 20 c
(integer) 4
39.100.196.99:6379> ZINTERSTORE group3 group1 group2
(error) ERR value is not an integer or out of range
39.100.196.99:6379> ZINTERSTORE group3 2 group1 group2
(integer) 1
39.100.196.99:6379> zrange group3 0 -1 withscores
1) "c"
2) "50"
  • ZUNIONSTORE 求并集
    ZUNIONSTORE destination numkeys key [key …] [WEIGHTS weight [weight …]] [AGGREGATE SUM|MIN|MAX]
    计算给定的一个或多个有序集的并集,其中给定key的数量必须以numkeys参数指定,并将该并集(结果集)储存到destination。
39.100.196.99:6379> ZUNIONSTORE group4  2 group1 group2
(integer) 6
39.100.196.99:6379> zrange group4 0 -1 withscores1) "a"2) "10"3) "x"4) "10"5) "b"6) "20"7) "y"8) "20"9) "z"
10) "30"
11) "c"
12) "50"

排行榜

技术模拟思路:
采用26个英文字母来实现排行,随机为每个字母生成一个随机数作为score
为了更好的体验,先做几件事:

  1. 先初始化1个月的历史数据
  2. 定时5秒钟,模拟微博的热度刷新(例如模拟点赞 收藏 评论的热度值更新)
  3. 定时1小时合并统计 天、周、月的排行榜。

步骤一、初始化1个月的历史数据

@Service
@Slf4j
public class InitService {@Autowiredprivate RedisTemplate redisTemplate;/*** 先初始化1个月的历史数据*/public void init30day(){//计算当前的小时keylong hour=System.currentTimeMillis()/(1000*60*60);//初始化近30天,每天24个keyfor(int i=1;i<24*30;i++){//倒推过去30天String  key=Constants.HOUR_KEY+(hour-i);this.initMember(key);System.out.println(key);}}/***初始化某个小时的key*/public void initMember(String key) {Random rand = new Random();//采用26个英文字母来实现排行,随机为每个字母生成一个随机数作为scorefor(int i = 1;i<=26;i++){this.redisTemplate.opsForZSet().add(key,String.valueOf((char)(96+i)),rand.nextInt(10));}}}

步骤二:定时刷新数据


@Service
@Slf4j
public class TaskService {@Autowiredprivate RedisTemplate redisTemplate;/***2. 定时5秒钟,模拟微博的热度刷新(例如模拟点赞 收藏 评论的热度值更新)* 3. 定时1小时合并统计 天、周、月的排行榜。*/@PostConstructpublic void init(){log.info("启动初始化 ..........");
//        2. 定时5秒钟,模拟微博的热度刷新(例如模拟点赞 收藏 评论的热度值更新)new Thread(()->this.refreshDataHour()).start();
//        3. 定时1小时合并统计 天、周、月的排行榜。new Thread(()->this.refreshData()).start();}/***采用26个英文字母来实现排行,随机为每个字母生成一个随机数作为score*/public void refreshHour(){//计算当前的小时keylong hour=System.currentTimeMillis()/(1000*60*60);//为26个英文字母来实现排行,随机为每个字母生成一个随机数作为scoreRandom rand = new Random();for(int i = 1;i<=26;i++){//redis的ZINCRBY 新增这个积分值this.redisTemplate.opsForZSet().incrementScore(Constants.HOUR_KEY+hour,String.valueOf((char)(96+i)),rand.nextInt(10));}}/***刷新当天的统计数据*/public void refreshDay(){long hour=System.currentTimeMillis()/(1000*60*60);List<String> otherKeys=new ArrayList<>();//算出近24小时内的keyfor(int i=1;i<23;i++){String  key=Constants.HOUR_KEY+(hour-i);otherKeys.add(key);}//把当前的时间key,并且把后推23个小时,共计近24小时,求出并集存入Constants.DAY_KEY中//redis ZUNIONSTORE 求并集this.redisTemplate.opsForZSet().unionAndStore(Constants.HOUR_KEY+hour,otherKeys,Constants.DAY_KEY);//设置当天的key 40天过期,不然历史数据浪费内存for(int i=0;i<24;i++){String  key=Constants.HOUR_KEY+(hour-i);this.redisTemplate.expire(key,40, TimeUnit.DAYS);}log.info("天刷新完成..........");}/***刷新7天的统计数据*/public void refreshWeek(){long hour=System.currentTimeMillis()/(1000*60*60);List<String> otherKeys=new ArrayList<>();//算出近7天内的keyfor(int i=1;i<24*7-1;i++){String  key=Constants.HOUR_KEY+(hour-i);otherKeys.add(key);}//把当前的时间key,并且把后推24*7-1个小时,共计近24*7小时,求出并集存入Constants.WEEK_KEY中this.redisTemplate.opsForZSet().unionAndStore(Constants.HOUR_KEY+hour,otherKeys,Constants.WEEK_KEY);log.info("周刷新完成..........");}/***刷新30天的统计数据*/public void refreshMonth(){long hour=System.currentTimeMillis()/(1000*60*60);List<String> otherKeys=new ArrayList<>();//算出近30天内的keyfor(int i=1;i<24*30-1;i++){String  key=Constants.HOUR_KEY+(hour-i);otherKeys.add(key);}//把当前的时间key,并且把后推24*30个小时,共计近24*30小时,求出并集存入Constants.MONTH_KEY中this.redisTemplate.opsForZSet().unionAndStore(Constants.HOUR_KEY+hour,otherKeys,Constants.MONTH_KEY);log.info("月刷新完成..........");}/***定时1小时合并统计 天、周、月的排行榜。*/public void refreshData(){while (true){//刷新当天的统计数据this.refreshDay();
//            刷新7天的统计数据this.refreshWeek();
//            刷新30天的统计数据this.refreshMonth();//TODO 在分布式系统中,建议用xxljob来实现定时try {Thread.sleep(1000*60*60);} catch (InterruptedException e) {e.printStackTrace();}}}/***定时5秒钟,模拟微博的热度刷新(例如模拟点赞 收藏 评论的热度值更新)*/public void refreshDataHour(){while (true){this.refreshHour();//TODO 在分布式系统中,建议用xxljob来实现定时try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}}
}

步骤3:排行榜查询接口

@RestController
@Slf4j
public class Controller {@Autowiredprivate RedisTemplate redisTemplate;@GetMapping(value = "/getHour")public Set getHour() {long hour=System.currentTimeMillis()/(1000*60*60);//ZREVRANGE 返回有序集key中,指定区间内的成员,降序。Set<ZSetOperations.TypedTuple<Integer>> rang= this.redisTemplate.opsForZSet().reverseRangeWithScores(Constants.HOUR_KEY+hour,0,30);return rang;}@GetMapping(value = "/getDay")public Set getDay() {Set<ZSetOperations.TypedTuple<Integer>> rang= this.redisTemplate.opsForZSet().reverseRangeWithScores(Constants.DAY_KEY,0,30);return rang;}@GetMapping(value = "/getWeek")public Set getWeek() {Set<ZSetOperations.TypedTuple<Integer>> rang= this.redisTemplate.opsForZSet().reverseRangeWithScores(Constants.WEEK_KEY,0,30);return rang;}@GetMapping(value = "/getMonth")public Set getMonth() {Set<ZSetOperations.TypedTuple<Integer>> rang= this.redisTemplate.opsForZSet().reverseRangeWithScores(Constants.MONTH_KEY,0,30);return rang;}
}

一般情况下,我们浏览各大网站时,点击某篇文章,博客,帖子,其阅读量就会+1,或者点击 点赞按钮,又或是评论数量;
这些都会根据热度算法,计算其热度;

GeoHash

Redis 3.2开始,Redis基于geohash和zset提供了地理位置相关功能。
Geohash是一种地址编码,它能把二维的经纬度编码成一维的字符串

命令

  • GEOADD
    将给定的位置对象(纬度、经度、名字)添加到指定的key;
39.100.196.99:6379> geoadd hotel 113.9807127428 22.5428248089 "世界之窗" 113.9832042690 22.5408496326 "南山威尼斯酒店" 114.0684865267 22.5412294122 "福田喜来登酒店" 114.3135524539 22.5999265998 "大梅沙海景酒店" 113.9349465491 22.5305488659 "南山新年酒店" 114.0926367279 22.5497917634 "深圳华强广场酒店"
6
39.100.196.99:6379> zrange hotel 0 -1
南山新年酒店
世界之窗
南山威尼斯酒店
福田喜来登酒店
深圳华强广场酒店
大梅沙海景酒店

注:

  1. 这里我们采用的是中文存储,如果出现了乱码,redis命令的登录命令加上 --raw
    例如: ./redis-cli --raw
  • GEOPOS
    从key里面返回所有给定位置对象的位置(经度和纬度);
39.100.196.99:6379> GEOPOS hotel "世界之窗"
113.98071080446243286
22.54282525199023013
  • GEOHASH:
    返回一个或多个位置对象的Geohash表示;
39.100.196.99:6379> GEOHASH hotel "世界之窗"
ws101xy1rp0
  • GEODIST key member1 member2 [unit]
    返回两个给定位置之间的距离;
    指定单位的参数 unit 必须是以下单位的其中一个:
    – m 表示单位为米。
    – km 表示单位为千米。
    – mi 表示单位为英里。
    – ft 表示单位为英尺。
39.100.196.99:6379> GEODIST hotel "世界之窗"  "南山威尼斯酒店" m
337.4887
  • GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]
    给定一个经纬度,然后以半径为中心,计算出半径内的数据。
39.100.196.99:6379> GEORADIUS hotel 113.9410499639 22.5461508801 10 km WITHDIST WITHCOORD count 10
南山新年酒店
1.8451
113.93494695425033569
22.53054959741555052
世界之窗
4.0910
113.98071080446243286
22.54282525199023013
南山威尼斯酒店
4.3704
113.98320525884628296
22.54085070420710224

– WITHDIST: 在返回位置元素的同时, 将位置元素与中心之间的距离也一并返回。 距离的单位和用户给定的范围单位保持一致。
– WITHCOORD: 将位置元素的经度和维度也一并返回。
– WITHHASH: 以 52 位有符号整数的形式, 返回位置元素经过原始 geohash 编码的有序集合分值。 这个选项主要用于底层应用或者调试, 实际中的作用并不大。
– ASC、DESC 排序方式,按照距离的 升序、降序排列
– STORE key1 把结果存入key1,zset格式,以坐标hash为score
– STOREDIST key2 把结果存入key2,zset格式,以距离为score

  • GEORADIUSBYMEMBER key member radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]
    GEORADIUSBYMEMBER 和 GEORADIUS 一样的功能,区别在于,GEORADIUS是以经纬度去查询,而GEORADIUSBYMEMBER是以当前集合中的某个member元素来查询
39.100.196.99:6379> GEORADIUSBYMEMBER hotel "世界之窗" 10 km WITHDIST WITHCOORD count 10
世界之窗
0.0000
113.98071080446243286
22.54282525199023013
南山威尼斯酒店
0.3375
113.98320525884628296
22.54085070420710224
南山新年酒店
4.8957
113.93494695425033569
22.53054959741555052
福田喜来登酒店
9.0190
114.06848877668380737
22.54122837765984144

附近酒店搜索实现

@RestController
@Slf4j
public class Controller {@Autowiredprivate RedisTemplate redisTemplate;@GetMapping(value = "/init")public void init() {Map<String, Point> map= Maps.newHashMap();map.put("世界之窗",new Point(113.9807127428,22.5428248089));map.put("南山威尼斯酒店",new Point(113.9832042690 ,22.5408496326));map.put("福田喜来登酒店" ,new Point(114.0684865267,22.5412294122));map.put("大梅沙海景酒店",new Point(114.3135524539 ,22.5999265998));map.put("南山新年酒店",new Point(113.9349465491,22.5305488659));map.put("深圳华强广场酒店",new Point(114.0926367279 ,22.5497917634));this.redisTemplate.opsForGeo().add(Constants.HOTEL_KEY,map);}@GetMapping(value = "/position")public Point position(String member) {//获取经纬度坐标List<Point> list= this.redisTemplate.opsForGeo().position(Constants.HOTEL_KEY,member);return list.get(0);}@GetMapping(value = "/hash")public String hash(String member) {//geohash算法生成的base32编码值List<String> list= this.redisTemplate.opsForGeo().hash(Constants.HOTEL_KEY,member);return list.get(0);}@GetMapping(value = "/distance")public Distance distance(String member1, String member2) {Distance distance= this.redisTemplate.opsForGeo().distance(Constants.HOTEL_KEY,member1,member2, RedisGeoCommands.DistanceUnit.KILOMETERS);return distance;}/*** 通过经度,纬度查找附近的*/@GetMapping(value = "/radiusByxy")public GeoResults radiusByxy() {//这个坐标是腾讯大厦位置Circle circle = new Circle(113.9410499639, 22.5461508801, Metrics.KILOMETERS.getMultiplier());//返回50条RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().includeCoordinates().sortAscending().limit(50);GeoResults<RedisGeoCommands.GeoLocation<String>> geoResults= this.redisTemplate.opsForGeo().radius(Constants.HOTEL_KEY,circle, args);return geoResults;}/*** 通过地方查找附近*/@GetMapping(value = "/radiusByMember")public GeoResults radiusByMember() {String member="世界之窗";//返回50条RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().includeCoordinates().sortAscending().limit(50);//半径10公里内Distance distance=new Distance(10, Metrics.KILOMETERS);GeoResults<RedisGeoCommands.GeoLocation<String>> geoResults= this.redisTemplate.opsForGeo().radius(Constants.HOTEL_KEY,member, distance,args);return geoResults;}}

关注Pull推送

场景:每个用户都有一个关注微博列表List,和个人微博列表List;
明星发表一条微博、如果使用 Redis List数据结构, 就需要先获取明星的粉丝集合,再将微博的ID发送到粉丝的List,如果粉丝的用户量不大,就几十万,还是勉强可以支撑的。适合中小型并发。但是像大明星,粉丝数量几千万,将明星微博的ID推送Push到粉丝的关注List,这个过程太耗时间,会直接把服务器给卡死了。
而且明星实时在线粉丝数量可能只有百分之一,也就是几十万。push操作短时间内看来相当于做了无用功;

替代方案:使用zset pull 推送,每个用户都有一个关注微博列表Zset , 和个人列表Zset;
当用户登录后,主动去查询关注用户的微博;并将他们的微博放到自己的关注微博列表Zset里面;

PULL 与PUSH的差别

push : 每次用户发微博都要异步推送给每个粉丝的关注列表;
pull :每个粉丝查看关注微博列表,都需要主动去关注人的个人微博列表下拉取,再存储到自己的关注微博列表里;

选择pull 方式, 需要自己去关注人的个人微博列表下拉取最新微博,这种方式可以通过客户端定时轮询服务端,查询最新的微博;

pull 技术方案

  1. 用户发微博、先写入DB、再写入Redis, 使用Hash数据结构存储微博 、 key = post::id
  2. 异步推送到个人微博列表Zset;

为什么个人列表和关注列表采用zset集合

拉取微博是根据刷新时间t进行过滤的,
使用List集合的话,只能用将微博的发表时间和id一起转化为JSON字符串(或者将将ID写入list),再写入List, 获取数据时,需要根据时间进行过滤,就意味着,我们并不知道微博的发表时间,只能根据List的微博ID拉取全部微博JSON字符串,再获取发表时间,再进行过滤,太麻烦了,效率也很低;
使用Zset集合,将发表时间作为score, id作为value,查询时,根据score进行排序,进行过滤,进行分页查询,比List方便一点;

基于pull技术,实现微博个人列表

PullContentController


@Api(description = "微博内容:pull功能")
@RestController
@RequestMapping("/pull-content")
public class PullContentController {@Autowiredprivate PullContentService contentService;@ApiOperation(value="用户发微博")@PostMapping(value = "/post")public void post(@RequestBody ContentVO contentVO) {Content content=new Content();BeanUtils.copyProperties(contentVO,content);contentService.post(content);}@ApiOperation(value="获取个人列表")@GetMapping(value = "/homeList")public PageResult<Content> getHomeList(Integer userId, int page, int size){return  this.contentService.homeList(userId,page,size);}}

PullContentService

@Slf4j
@Service
public class PullContentService extends ContentService{/*** 用户发微博*/public void post(Content obj){Content temp=this.addContent(obj);this.addMyPostBox(temp);}/*** 发布微博的时候,加入到我的个人列表*/public void addMyPostBox(Content obj){String key= Constants.CACHE_MY_POST_BOX_ZSET_KEY+obj.getUserId();//按秒为单位long score=obj.getCreateTime().getTime()/1000;this.redisTemplate.opsForZSet().add(key,obj.getId(),score);}/*** 获取个人列表*/public PageResult<Content> homeList(Integer userId, int page, int size){PageResult<Content> pageResult=new PageResult();List<Integer> list=new ArrayList<>();long start = (page - 1) * size;long end = start + size - 1;try {String key= Constants.CACHE_MY_POST_BOX_ZSET_KEY+userId;//1.设置总数long total=this.redisTemplate.opsForZSet().zCard(key);pageResult.setTotal(total);//2.分页查询//redis ZREVRANGESet<ZSetOperations.TypedTuple<Integer>> rang= this.redisTemplate.opsForZSet().reverseRangeWithScores(key,start,end);for (ZSetOperations.TypedTuple<Integer> obj:rang){list.add(obj.getValue());log.info("个人post集合value={},score={}",obj.getValue(),obj.getScore());}//3.去拿明细数据List<Content> contents=this.getContents(list);pageResult.setRows(contents);}catch (Exception e){log.error("异常",e);}return pageResult;}}

基于pull技术,实现微博关注列表

/*** 刷新拉取用户关注列表* 用户第一次刷新或定时刷新 触发*/private void refreshAttentionBox(int userId){//获取刷新的时间String refreshkey=Constants.CACHE_REFRESH_TIME_KEY+userId;Long ago=(Long) this.redisTemplate.opsForValue().get(refreshkey);//如果时间为空,取2天前的时间if (ago==null){//当前时间long now=System.currentTimeMillis()/1000;//当前时间减去2天ago=now-60*60*24*2;}//提取该用户的关注列表String followerkey=Constants.CACHE_KEY_FOLLOWEE+userId;Set<Integer> sets= redisTemplate.opsForSet().members(followerkey);log.debug("用户={}的关注列表={}",followerkey,sets);//当前时间long now=System.currentTimeMillis()/1000;String attentionkey= Constants.CACHE_MY_ATTENTION_BOX_ZSET_KEY+userId;for (Integer id:sets){//去关注人的个人主页,拿最新微博String key= Constants.CACHE_MY_POST_BOX_ZSET_KEY+id;Set<ZSetOperations.TypedTuple<Integer>> rang= this.redisTemplate.opsForZSet().rangeByScoreWithScores(key,ago,now);if(!CollectionUtils.isEmpty(rang)){//加入我的关注post集合 就是通过上次刷新时间计算出最新的微博,写入关注zset集合;再更新刷新时间this.redisTemplate.opsForZSet().add(attentionkey,rang);}}//关注post集合 只留1000个//计算post集合,总数long count=this.redisTemplate.opsForZSet().zCard(attentionkey);//如果大于1000,就剔除多余的postif(count>1000){long end=count-1000;//redis ZREMRANGEBYRANKthis.redisTemplate.opsForZSet().removeRange(attentionkey,0,end);}long days=this.redisTemplate.getExpire(attentionkey,TimeUnit.DAYS);if(days<10){//设置30天过期this.redisTemplate.expire(attentionkey,30,TimeUnit.DAYS);}this.redisTemplate.opsForValue().set(refreshkey,now);}

Stream

Redis 5.0推出了一个新的数据结构:Stream。Stream就是一个流处理 的数据结构.
基于流处理的数据结构,它的功能应用于类似IM的聊天工具和典型的消息队列。
Redis 的Stream几乎满足了消息队列具备的全部内容,包括但不限于:
1.消息ID的序列化生成
2.消息遍历
3.消息的阻塞和非阻塞读取
4.消息的分组消费
5.未完成消息的处理
6.消息队列监控

xadd:向Stream追加消息
xdel:从Stream中删除消息,删除仅仅是设置标志位,不影响消息总长度。
xrange:获取Stream中的消息列表,自动过滤已经删除的消息。-表示最小值,+表示最大值。
xlen:获取Stream的消息长度,所有在链表中存在的消息
del:删除整个Stream中的所有消息。

命令

  • stream生产消息
    XADD,命令用于在某个stream(流数据)中追加消息,语法如下:
xadd key ID field string [field string ...]

需要提供key,消息ID方案,消息内容,其中消息内容为key-value型数据。
ID,最常使用*,表示由Redis生成消息ID,这也是强烈建议的方案。
field string [field string], 就是当前消息内容,由1个或多个key-value构成。

39.100.196.99:6379> xadd message * hello agan
"1587196058726-0"  #消息ID
39.100.196.99:6379> xadd message * hello agan2
"1587196063320-0"
39.100.196.99:6379> xadd message * hello agan3
"1587196067111-0"
  • stream独立消费

从消息队列中获取消息,XREAD,消费消息

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

[COUNT count],用于限定获取的消息数量
[BLOCK milliseconds],用于设置XREAD为阻塞模式,默认为非阻塞模式
ID,用于设置由哪个消息ID开始读取。使用0表示从第一条消息开始。(本例中就是使用0),
消息队列javaID是单调递增的,所以通过设置起点,可以向后读取。在阻塞模式中,可以使用,表示最新的消息ID。

39.100.196.99:6379> xread STREAMS message 0
1) 1) "message"2) 1) 1) "1587196058726-0"2) 1) "hello"2) "agan"2) 1) "1587196063320-0"2) 1) "hello"2) "agan2"3) 1) "1587196067111-0"2) 1) "hello"2) "agan3"

消息ID的原理
上文这个消息id 1587196058726-0,是redis 生成的消息id。
它由2部分组成:时间戳-序号。时间戳是毫秒级,序号是为了防止相同时间内生成的id重复。

39.100.196.99:6379> multi
OK
39.100.196.99:6379> xadd message * hello agan1
QUEUED
39.100.196.99:6379> xadd message * hello agan2
QUEUED
39.100.196.99:6379> xadd message * hello agan3
QUEUED
39.100.196.99:6379> exec
1) "1587198991846-0"
2) "1587198991846-1"
3) "1587198991846-2"

IM聊天室

@Api
@RequestMapping("/im")
@RestController
@Slf4j
public class ImController {@AutowiredRedisTemplate redisTemplate;public final String room_key = "room::";@GetMapping("/init")public Object init(){Long increment = redisTemplate.opsForValue().increment("room::id");return increment;}@GetMapping("/add")public RecordId add(String roomId , String username , String content){String key = room_key + roomId;HashMap<Object, Object> map = new HashMap<>();map.put(username , content);RecordId recordId = redisTemplate.opsForStream().add(key, map);return  recordId;}public List read(String roomId , String username , String content){String key = room_key + roomId;StreamOffset<String> streamOffset = StreamOffset.create(key, ReadOffset.latest());StreamReadOptions options = StreamReadOptions.empty().block(Duration.ofMinutes(10));List list = redisTemplate.opsForStream().read(options, streamOffset);return list;}
}

布隆过滤器

  • 布隆过滤器底层的存储结构是位图 、
  • 判断一个key是否存在,就先通过几个hash函数得出在位图中的位置,判断位置上的值是否为1,全部为1,则存在,如果有一个为0 ,则说明该key不存在;
  • 布隆过滤器典型应用场景就是 防止缓存穿透;

实现BloomFilter

添加Jedis依赖

        <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>
package com.redis.zset.filter;import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;public class BloomFilterHelper<T> {private int numHashFunctions;private int bitSize;private Funnel<T> funnel;public BloomFilterHelper(Funnel<T> funnel, int expectedInsertions, double fpp) {Preconditions.checkArgument(funnel != null, "funnel不能为空");this.funnel = funnel;bitSize = optimalNumOfBits(expectedInsertions, fpp);numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, bitSize);}public int[] murmurHashOffset(T value) {int[] offset = new int[numHashFunctions];long hash64 = Hashing.murmur3_128().hashObject(value, funnel).asLong();int hash1 = (int) hash64;int hash2 = (int) (hash64 >>> 32);for (int i = 1; i <= numHashFunctions; i++) {int nextHash = hash1 + i * hash2;if (nextHash < 0) {nextHash = ~nextHash;}offset[i - 1] = nextHash % bitSize;}return offset;}/*** 计算bit数组长度*/private int optimalNumOfBits(long n, double p) {if (p == 0) {p = Double.MIN_VALUE;}return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));}/*** 计算hash方法执行次数*/private int optimalNumOfHashFunctions(long n, long m) {return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));}}

public class RedisBloomFilter <T>{private Jedis jedis;public RedisBloomFilter(Jedis jedis) {this.jedis = jedis;}public<T> void addByBloomFilter(BloomFilterHelper<T> filterHelper, String key, T value) {int[] offset = filterHelper.murmurHashOffset(value);for ( int i :offset) {jedis.setbit(key,i , true);}}public<T> Boolean includeByBloomFilter(BloomFilterHelper<T> filterHelper, String key, T value) {int[] offset = filterHelper.murmurHashOffset(value);for (int i = 0; i <offset.length ; i++) {if (!jedis.getbit(key,offset[i])){return false;}}return true;}
}

public class JedisBloomFilterTest {public static void main(String[] args) throws IOException {JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();jedisPoolConfig.setMaxTotal(10);jedisPoolConfig.setMaxIdle(5);jedisPoolConfig.setMinIdle(2);// timeout,这里既是连接超时又是读写超时,从Jedis 2.8开始有区分connectionTimeout和soTimeout的构造函数JedisPool jedisPool = new JedisPool(jedisPoolConfig, "192.168.0.60", 6380, 3000, null);Jedis jedis = null;try {//从redis连接池里拿出一个连接执行命令jedis = jedisPool.getResource();//******* Redis测试布隆方法 ********BloomFilterHelper<CharSequence> bloomFilterHelper = new BloomFilterHelper<>(Funnels.stringFunnel(Charset.defaultCharset()), 1000, 0.1);RedisBloomFilter<Object> redisBloomFilter = new RedisBloomFilter<>(jedis);int j = 0;for (int i = 0; i < 100; i++) {redisBloomFilter.addByBloomFilter(bloomFilterHelper, "bloom", i+"");}for (int i = 0; i < 1000; i++) {boolean result = redisBloomFilter.includeByBloomFilter(bloomFilterHelper, "bloom", i+"");if (!result) {j++;}}System.out.println("漏掉了" + j + "个");} catch (Exception e) {e.printStackTrace();} finally {//注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。if (jedis != null)jedis.close();}}
}

Redis核心数据结构ZSET、GeoHash 、 Stream--排行榜、消息Pull推送、附近搜索、布隆过滤器 、IM聊天室相关推荐

  1. php消息实时推送技术,基于HTTP协议之WEB消息实时推送技术原理及实现

    很早就想写一些关于网页消息实时推送技术方面的文章,但是由于最近实在忙,没有时间去写文章.本文主要讲解基于 HTTP1.1 协议的 WEB 推送的技术原理及实现.本人曾经在工作的时候也有做过一些用到网页 ...

  2. redis核心数据结构以及他的应用场景

    文章目录 redis核心数据结构 1.string字符串 1.1应用场景 2.hash哈希 2.1应用场景 2.2优缺点 3.list数组列表 3.1应用场景 4.set集合 4.1应用场景 5.zs ...

  3. Redis核心数据结构List应用场景-商品列表、缓存击穿、PV阅读量、抢红包、推送帖子、普通分布式锁、Redis可重入锁与红锁

    List应用场景 Redis之List 一. Redis list命令实战 二.商品列表 高并发的淘宝聚划算实现技术方案 SpringBoot+Redis实现商品列表功能 二.缓存击穿 什么是缓存击穿 ...

  4. netty服务器定时发送消息,netty+websocket+quartz实现消息定时推送

    netty+websocket+quartz实现消息定时推送&&IM聊天室 在讲功能实现之前,我们先来捋一下底层的原理,后面附上工程结构及代码 1.NIO NIO主要包含三大核心部分: ...

  5. 快递企业如何完成运单订阅消息的推送

    经常网购的朋友,会实时收到运单状态的提醒信息,这些提醒信息包括微信推送,短信推送,邮件推送,支付宝生活窗推送,QQ推送等,信息内容主要包括快件到哪里,签收等信息的提醒,这些友好的提醒信息会极大的增强购 ...

  6. RabbitMQ(九):RabbitMQ 延迟队列,消息延迟推送(Spring boot 版)

    应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持 ...

  7. RabbitMQ 延迟队列,消息延迟推送

    应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持 ...

  8. 模版消息智能推送!我们教你发得更快更多更省

    「模板消息」能力,几乎是小程序触达用户的唯一渠道.有了它,运营人员才有足够的发挥空间,来提高用户的留存,转化. 微信为了防止有人用模板消息频繁骚扰用户,推出了一套严格的监督机制.一旦发现有人滥用,立刻 ...

  9. 仿淘宝开放平台之消息服务——消息中心推送消息至消费者

    消息中心收到生产者推送过来的业务消息外,一方面,需要给生产者推送一条消息确认的响应消息:另一方面,则需要根据消息主题,查找所有订阅该主题的消费者(实际就是消息客户端),将消息复制及转发出去. ​ 这种 ...

最新文章

  1. 颜色传感器TCS230的使用
  2. mapreduce编程实例python-使用Python语言写Hadoop MapReduce程序
  3. [luoguP1005] 矩阵取数游戏(DP + 高精度)
  4. 让secureCRT正确显示中文
  5. WIN7 运行“计算机管理”出现c:\windows\system32\compmgmt.msc没有被指定在...”错误 解决办法...
  6. Android源码分析(三)-----系统框架设计思想
  7. virtualbox win7虚拟机启动exe提示“DX11 could not switch resolution”解决方案
  8. B2C模式电商案例分享
  9. 01-初探MQ-MQ的三大使用场景:应用解耦、异步提速、削峰填谷
  10. 【论文笔记】视频物体检测(VID)系列 FGFA:Flow-Guided Feature Aggregation for Video Object Detection
  11. 数字图像处理:空间相关与卷积操作
  12. [python爬虫] bilibili视频评论翻页功能
  13. 38个优秀博客站点推荐
  14. 用python做一个表白神器_30秒教会你用Python制作520表白神器
  15. Cadence的PSPICE中CCCS,VCCS,CCVS和VCVS受控源元件库
  16. SQL Server数据库性能优化(三)之 硬件瓶颈分析
  17. C# asp.net .netcore 单层和双层PDF转为图片
  18. 【思维 构造】CodeForces - 148C Terse princess
  19. FPGA学习——基于Verilog实现的多功能时钟
  20. 单片机按键防抖程序_单片机独立按键使用程序

热门文章

  1. 鹅厂招人啦!限量内推码和面试直通卡!助你直拿Offer(内附岗位介绍)
  2. Qt编写视频监控系统69-录像计划(支持64通道7*24录像设置)
  3. [陈鹏导师精益项目实战]华东区电机企业精益生产项目第五期启动
  4. Windows远程代码执行漏洞(CVE-2020-16898) 高危漏洞加固指南
  5. 大数据学习,Scala快速学习的方法
  6. 如何选择PXI与PXIe开关模块
  7. JAVA高考加油,2017年高考加油祝福语
  8. 提取PDF内容保存到Excel--Python实现
  9. NOJ 1581.最佳加法式
  10. 【小睿精选·第八期】为NBA球员提供预警新冠肺炎的智能戒指Oura Ring