redisson究极爽文-手把手带你实现redisson的发布订阅,消息队列,延迟队列(死信队列),(模仿)分布式线程池
参考资料 :分布式中间件实战:java版 (书籍), 多线程视频教程(视频)…
项目启动环境
导入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.1.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.7.graal</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
添加配置类
从spring容器中获取bean
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;@Component
public class BeanContext implements ApplicationContextAware {@Autowiredprivate static ApplicationContext applicationContext;public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {BeanContext.applicationContext = applicationContext;}public static ApplicationContext getApplicationContext(){return applicationContext;}@SuppressWarnings("unchecked")public static <T> T getBean(String name) throws BeansException {return (T)applicationContext.getBean(name);}public static <T> T getBean(Class<T> clz) throws BeansException {return (T)applicationContext.getBean(clz);}
}
redisson配置类,当然你也可以写在yml里面 或者 是配置集群啥的
@Configuration
public class RedissonConfig {@Autowiredprivate Environment environment ;@Beanpublic RedissonClient config(){Config config = new Config();config.setCodec(new org.redisson.client.codec.StringCodec());config.useSingleServer().setConnectionPoolSize(50);//设置对于master节点的连接池中连接数最大为500config.useSingleServer().setIdleConnectionTimeout(100000);//如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。config.useSingleServer().setConnectTimeout(300000);//同任何节点建立连接时的等待超时。时间单位是毫秒。config.useSingleServer().setTimeout(30000);//等待节点回复命令的时间。该时间从命令发送成功时开始计时。 切记这里要序列化 ,不然 fastjson
// 不能反序列化 阻塞队列中的string元素Codec codec = new JsonJacksonCodec();config.setCodec(codec);// 你的IPconfig.useSingleServer().setAddress("redis://***.***.***.**:6379").setKeepAlive(true) ;return Redisson.create(config) ;}}
ok~~,项目环境搭建完了
发布订阅
生产者(消息发布者)
@Slf4j
@Component
public class UserLoginPublisher {public static final String TOPICKEYLOGINUSER = "UserLoginKey" ;@Autowiredprivate RedissonClient redissonClient ;public void sendMsg(UserLoginDto user){try {log.info("准备发送消息 ~~");// TOPICKEYLOGINUSER : 是一个string ,是订阅的主题RTopic clientTopic = redissonClient.getTopic(TOPICKEYLOGINUSER,new SerializationCodec());// 消息发布(这里是异步的形式)clientTopic.publishAsync(user);long l = clientTopic.countSubscribers();System.out.println(l);} catch (Exception e) {log.info("消息发送失败 ~~:{}",e);}}
}
消费者(订阅者)
订阅者的 redissonClient.getTopic(“UserLoginKey”,new SerializationCodec()); 要和 发布者的 topic 一样
import com.alibaba.fastjson.JSON;
import com.example.redissiontest.dto.UserLoginDto;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.codec.SerializationCodec;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;@Slf4j
@Component
//implements ApplicationRunner, Ordered 是实现 线程的一种方式 当然你也可以 继承 runnable
// CommandLineRunner也可以在spring启动的时候进行执行
public class UserLoginSubscriber implements ApplicationRunner, Ordered {private static final String TOPICKEYLOGINUSER = UserLoginPublisher.TOPICKEYLOGINUSER;@Autowiredprivate RedissonClient redissonClient ;@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("线程正在运行 ~~");try {RTopic topic = redissonClient.getTopic("UserLoginKey",new SerializationCodec());topic.addListener(UserLoginDto.class, new MessageListener<UserLoginDto>() {@Overridepublic void onMessage(CharSequence charSequence, UserLoginDto userLoginDto) {String s = JSON.toJSONString(userLoginDto);System.out.println("收到消息:"+s);}});} catch (Exception e) {e.printStackTrace();}}//开局运行@Overridepublic int getOrder() {return 0;}
}
测试
@Testvoid contextLoadsPublish() {UserLoginDto loginDto = new UserLoginDto("cunk", "109922", 1);userLoginPublisher.sendMsg(loginDto);}
消息队列
生产者
基于Redis的分布式队列Queue是Redisson提供的又一个功能组件,按照不同的特性,分布式队列Queue还可以分为双端队列Deque、阻塞队列Blocking Queue、有界阻塞队列(Bounded Blocking Queue)、阻塞双端队列(Blocking Deque)、阻塞公平队列(Blocking Fair Queue)、阻塞公平双端队列(Blocking Fair Deque)等功能组件,不同的功能组件其作用不尽相同,适用的业务场景也是不一样的。
在实际业务场景中,不管是采用哪一种功能组件作为“队列”,其底层核心的执行逻辑仍旧是借助“基于发布-订阅式的主题”来实现的
注意这里有个消息重试机制 , 消息的发送和接收需要 是同一个队列里面
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class QueuePublisher {@Autowiredprivate RedissonClient redissonClient ;String key = QueueConsumer.queueName ;public void sendMSG(String msg){try {RQueue<Object> queue = redissonClient.getQueue(key);queue.add(msg) ;log.info("消息队列发送消息成功~~");} catch (Exception e) {log.info("消息队列发送消息失败~~");// 消息重试RQueue<Object> queue = redissonClient.getQueue(key);queue.add(msg) ;}}}
消费者
@Slf4j
@Component
public class QueueConsumer implements ApplicationRunner, Ordered {public static final String queueName = "redssionName" ;@Autowiredprivate RedissonClient redissonClient ;@Overridepublic void run(ApplicationArguments args) throws Exception {RQueue<String> queue = redissonClient.getQueue(queueName);while (true){String msg = queue.poll();if (msg!=null){log.info("消息队列监听到消息:{}",msg);}}}//@Overridepublic int getOrder() {return -1;}
}
测试
@GetMapping("/test/{msg}")public String queue(@PathVariable("msg") String msg) {try {queuePublisher.sendMSG(msg);} catch (Exception e) {//发送失败 重试queuePublisher.sendMSG(msg);}return "ok" ;}
延迟队列
用户的需求是多样化的,永远不会按照程序员的思路走!在实际的生产环境中,仍旧存在着需要处理不同 TTL(即过期时间/存活时间)的业务数据的场景,为了解决此种业务场景,Redisson提供了“延迟队列”这个强大的功能组件,它可以解决RabbitMQ死信队列出现的缺陷,即不管在什么时候,消息将按照 TTL从小到大的顺序先后被真正的队列监听、消费,其在实际项目中的执行流程如图
生产者
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Component
@Slf4j
public class RedissonDelayQueuePublisher {@AutowiredRedissonClient redissonClient ;public static final String QUEUE_DELAY_KEY = "delayqueueKey";public void sendDelayMsg(String msg,Long ttl){//阻塞队列RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue(QUEUE_DELAY_KEY);//延迟队列RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(msg,ttl, TimeUnit.SECONDS);log.info("延迟队列 , 阻塞队列生成");}}
消费者
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;@EnableScheduling
@Component
@Slf4j
public class RedissonDelayQueueConsumer {@Autowiredprivate RedissonClient redissonClient ;private String delayqueueKey = RedissonDelayQueuePublisher.QUEUE_DELAY_KEY;@Scheduled(cron = "0/1 * * * * ?")public void consumerMsg() throws InterruptedException {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(delayqueueKey);Object msg = blockingDeque.take();if (msg!=null){log.info("从消息队列中取出消息:{}",(String)msg);}}}
测试
@Autowiredprivate RedissonDelayQueuePublisher redissonDelayQueuePublisher ;@AutowiredRedissonDelayQueueConsumer redissonDelayQueueConsumer ;@GetMapping("/delaytest")public void queue() throws InterruptedException {for (int i = 0; i < 10; i++) {Random random = new Random();int ttl = random.nextInt(10);Long ttlTime = Long.valueOf(ttl);String msg ="这是一条消息,他的延迟时间是:"+ttl ;redissonDelayQueuePublisher.sendDelayMsg(msg,ttlTime);}}
分布式线程池
我的思路 1. redisson 延迟队列实现线程池(将普通线程池中的阻塞队列换成redisson的阻塞队列就行)
2.把普通线程池 改造成立 阻塞队列基于 redisson的分布式阻塞队列 , 线程池变成了一个线程不停监听 redisson 。虽然中间有很 多插曲 ,大体设计就是这样
线程池代码
建议先看看我的这篇文章 手搓线程池
import com.alibaba.fastjson.JSON;
import com.example.redissiontest.config.BeanContext;
import com.sun.org.apache.xpath.internal.operations.Bool;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;import java.util.HashSet;
import java.util.concurrent.BlockingQueue;@Component
@Slf4j
public class ThreadPool implements Runnable {RedissonClient redissonClient = BeanContext.getBean(RedissonClient.class);private Boolean flage =false ;private int coreSize ;private long timeOut ;private BlockingQueue<String> blockQueue ;private BlockingQueue<String> blockQueuework ;private HashSet<Worker> workers = new HashSet<>() ;public ThreadPool() {RBlockingQueue<String> blockQueue = redissonClient.getBlockingQueue("ThreadPool");RBlockingQueue<String> blockQueuework = redissonClient.getBlockingQueue("workQueue");this.coreSize = 4;this.blockQueue =blockQueue ;this.blockQueuework = blockQueuework ;}//执行任务public void execute(Task task) throws InterruptedException {//任务数量没有超过线程数量 ,交给 work执行//else//任务数超过coreSize的时候 加入任务队列暂存synchronized (workers){if (workers.size() < coreSize){Worker worker = new Worker(task);log.debug("新增work",worker);worker.start();workers.add(worker) ;}else {log.debug("加入任务队列",task);String taskStr = JSON.toJSONString(task);blockQueuework.put(taskStr);}}}//work :线程 ,(用来执行任务的)class Worker extends Thread {private Task task ;public Worker(Task task) {this.task = task;}@SneakyThrows@Overridepublic void run() {//执行任务//当task不为空 ,执行完毕//当task执行完毕,接着从任务队列获取任务while (task!=null ||( task = JSON.parseObject(blockQueuework.take(), Task.class))!=null){try{log.debug("正在执行...{}",task);task.run();}catch (Exception e){e.printStackTrace();}finally {task = null ;if (blockQueuework.size() == 0){log.info("线程池没有任务 ,阻塞中.....");}}}synchronized (workers){log.debug("work被移除...{}",task);workers.remove(this) ;}}}@SneakyThrows@Overridepublic void run() {while (true){String take = blockQueue.take();Task task = JSON.parseObject(take, Task.class);this.execute(task);}}}
测试代码
@GetMapping("/threadPool")public void queuePool() throws InterruptedException {RBlockingQueue<String> blockQueue = redissonClient.getBlockingQueue("ThreadPool");for (int i = 0; i < 15; i++) {Task task = new Task();String taskS = JSON.toJSONString(task);blockQueue.put(taskS);}}
当然 你也可以使用策略模式增加他的 功能 ,你也实现一下把~~~
而且他的拓展性非常好, 你可以创建更多不同类型的线程池进行粘合进去,反正 所有线程池都是监听的一条阻塞队列 ,当然你还可以
拓展 将 不同类型的任务对象 放入不同类型的 线程池中。
redisson究极爽文-手把手带你实现redisson的发布订阅,消息队列,延迟队列(死信队列),(模仿)分布式线程池相关推荐
- 手把手教你学Dapr - 6. 发布订阅
介绍 发布/订阅模式允许微服务使用消息相互通信.生产者或发布者在不知道哪个应用程序将接收它们的情况下向主题发送消息.这涉及将它们写入输入通道.同样,消费者或订阅者订阅该主题并接收其消息,而不知道是什么 ...
- 【手把手带你学Java EE】多线程那些事,你了解了吗?
[手把手带你学Java EE]多线程那些事,你了解了吗? 线程 概念 意义 进程和线程的区别 面试题:谈谈进程和线程的区别和联系 Java中的多线程编程 创建线程的方法 方法一 方法二 方法三 &am ...
- 一文带你熟透Java线程池的使用及源码
在单个线程使用过程中遇到的问题(new Thread().start): 线程的频繁创建与销毁 线程执行数据多且高频,频繁CPU上下文切换,造成CPU的资源浪费 以上种种问题,让我们不禁想到,怎么复用 ...
- 一文搞懂线程池原理——Executor框架详解
文章目录 1 使用线程池的好处 2 Executor 框架 2.1 Executor 框架结构 2.2 Executor 框架使用示意图 2.3 Executor 框架成员 2.3.1 Executo ...
- 群晖nas介绍文档_手把手带你玩转NAS 篇八:NAS文档随身带——多终端文件同步介绍(群晖drive篇)...
手把手带你玩转NAS 篇八:NAS文档随身带--多终端文件同步介绍(群晖drive篇) 2020-01-08 15:23:44 24点赞 214收藏 31评论 你是AMD Yes党?还是intel和N ...
- 01 手把手带你构建大规模分布式服务--高并发、高可用架构系列,高质量原创好文!...
作者:丁浪,目前在创业公司担任高级技术架构师.曾就职于阿里巴巴大文娱和蚂蚁金服.具有丰富的稳定性保障,全链路性能优化的经验.架构师社区特邀嘉宾! 阅读本(系列)文章,你将会收获: 全面.体系化的了解大 ...
- Yhen手把手带你使用百度智能云②----文字识别
声明: 以下内容为本人原创,仅供用于参考学习 禁止用于商业及违法用途 ·作者:@Yhen ·原文网站:CSDN ·原文链接:https://blog.csdn.net/Yhen1/article/de ...
- android智能音响,大屏+安卓系统 智能音响的究极形态?
有人说,音乐是上天赐予人类的奇妙礼物.对于我而言,音乐几乎和空气一样重要--无音乐不成活!当你对一种事物痴迷时,自然就会变身"处女座",处处较真.作为一名音乐发烧友,这么多年烧在音 ...
- 00后电竞女学霸直博中科院,本科武大王者全国16强,网友:现实版爽文女主角...
点击上方"视学算法",选择加"星标"或"置顶" 重磅干货,第一时间送达 梦晨 丰色 发自 凹非寺 量子位 | 公众号 QbitAI 武汉大学 ...
最新文章
- 南大和中大“合体”拯救手残党:基于GAN的PI-REC重构网络,“老婆”画作有救了 | 技术头条...
- python 手动读取cifar10_Python读入CIFAR-10数据库
- 网络流三·二分图多重匹配 HihoCoder - 1393
- Weblogic10 集群配置
- Android:如何生成自己的keystore(zz)
- 【LeetCode笔记】399. 除法求值(Java、图)
- 资产分池中的量化实战
- Linux centosVMware xshell使用xftp传输文件、使用pure-ftpd搭建ftp服务
- 【最短路】Walls
- mysql保存表出错1075_navicat出现错误1075怎么办
- c++ websocket客户端_ESP32 Arduino教程:Websocket客户端
- Hermite(埃尔米特)插值法 | 插值多项式+ 插值余项
- 从百度指数到微信指数,我们正进入“数据世界”
- 压缩ppt文件大小的方法
- zabbix_sender安装和使用
- vscode ssh连接服务器报错:过程试图写入的管道不存在
- 微信小程序(微信支付回调函数)
- ace 官网地址以及相关的下载地址--防止自己忘记
- HTTPS证书认证过程(CA)
- 作为一个数学专业的学生,我是怎么看待编程的?
热门文章
- Python软件编程等级考试二级——20220618
- 一个绝对有用的网站!
- python海龟绘图小猪佩奇_海龟绘图(Turtle Graphics)
- 比心app源码,html 获取时间
- 【渲染管线】关于透明度混合blend
- 阿里实习生招聘笔试题目
- 【嵌入式Linux开发一路清障-连载03】Ubuntu22.04使用Mount加载硬盘或NAS等硬盘
- 华为鸿蒙OS摄像头,华为放大招:首发屏下摄像头+鸿蒙OS
- 华为双前置摄像头_华为nova7 Pro的前置是两个摄像头吗?自拍功能咋样
- Excel 快速合并多行数据为一行