Spring Boot 项目 RedisTemplate 实现轻量级消息队列
背景公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用ActiveMQ等消息队列中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发
一、本文涉及知识点
excel文件读写--阿里easyexcel sdk
文件上传、下载--腾讯云对象存储
远程服务调用--restTemplate
生产者、消费者--redisTemplate leftPush和rightPop操作
异步处理数据--Executors线程池
读取网络文件流--HttpClient
自定义注解实现用户身份认证--JWT token认证, 拦截器拦截标注有@LoginRequired注解的请求入口
当然, Java实现咯 涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习
二、项目目录结构
说明: 数据库DAO层放到另一个模块了, 不是本文重点
三、主要maven依赖
1、easyexcel
<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion><dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>${easyexcel-latestVersion}</version></dependency>
2、JWT
<dependency><groupId>io.jsonwebtoken</groupId><artifactId>jjwt</artifactId><version>0.7.0</version></dependency>
3、redis
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-redis</artifactId><version>1.3.5.RELEASE</version></dependency>
4、腾讯cos
<dependency><groupId>com.qcloud</groupId><artifactId>cos_api</artifactId><version>5.4.5</version></dependency>
四、流程
用户上传文件
将文件存储到腾讯cos
将上传后的文件id及上传记录保存到数据库
redis生产一条导入消息, 即保存文件id到redis
请求结束, 返回"处理中"状态
redis消费消息
读取cos文件, 异步处理数据
将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为"处理完成"
客户端轮询查询处理状态, 并可以下载错误文件
结束
五、实现效果
上传文件
数据库导入记录
导入的数据
下载错误文件
错误数据提示
查询导入记录
六、代码实现
1、导入excel控制层
@LoginRequired@RequestMapping(value = "doImport", method = RequestMethod.POST)public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {PLUser user = getUser(request);return orderImportService.doImport(file, user.getId());}
2、service层
@Overridepublic JsonResponse doImport(MultipartFile file, Integer userId) {if (null == file || file.isEmpty()) {throw new ServiceException("文件不能为空");}String filename = file.getOriginalFilename();if (!checkFileSuffix(filename)) {throw new ServiceException("当前仅支持xlsx格式的excel");}// 存储文件String fileId = saveToOss(file);if (StringUtils.isBlank(fileId)) {throw new ServiceException("文件上传失败, 请稍后重试");}// 保存记录到数据库saveRecordToDB(userId, fileId, filename);// 生产一条订单导入消息redisProducer.produce(RedisKey.orderImportKey, fileId);return JsonResponse.ok("导入成功, 处理中...");}/*** 校验文件格式* @param fileName* @return*/private static boolean checkFileSuffix(String fileName) {if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {return false;}int pointIndex = fileName.lastIndexOf(".");String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();if (".xlsx".equals(suffix)) {return true;}return false;}/*** 将文件存储到腾讯OSS* @param file* @return*/private String saveToOss(MultipartFile file) {InputStream ins = null;try {ins = file.getInputStream();} catch (IOException e) {e.printStackTrace();}String fileId;try {String originalFilename = file.getOriginalFilename();File f = new File(originalFilename);inputStreamToFile(ins, f);FileSystemResource resource = new FileSystemResource(f);MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();param.add("file", resource);ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);fileId = (String) responseResult.getData();} catch (Exception e) {fileId = null;}return fileId;}
3、redis生产者
@Service
public class RedisProducerImpl implements RedisProducer {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic JsonResponse produce(String key, String msg) {Map<String, String> map = Maps.newHashMap();map.put("fileId", msg);redisTemplate.opsForList().leftPush(key, map);return JsonResponse.ok();}}
4、redis消费者
@Service
public class RedisConsumer {@Autowiredpublic RedisTemplate redisTemplate;@Value("${txOssFileUrl}")private String txOssFileUrl;@Value("${txOssUploadUrl}")private String txOssUploadUrl;@PostConstructpublic void init() {processOrderImport();}/*** 处理订单导入*/private void processOrderImport() {ExecutorService executorService = Executors.newCachedThreadPool();executorService.execute(() -> {while (true) {Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);if (null == object) {continue;}String msg = JSON.toJSONString(object);executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));}});}}
5、处理任务线程类
public class OrderImportTask implements Runnable {public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {this.msg = msg;this.txOssFileUrl = txOssFileUrl;this.txOssUploadUrl = txOssUploadUrl;}
}/*** 注入bean*/private void autowireBean() {this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);}@Overridepublic void run() {// 注入beanautowireBean();JSONObject jsonObject = JSON.parseObject(msg);String fileId = jsonObject.getString("fileId");MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();param.add("id", fileId);ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);String fileUrl = (String) responseResult.getData();if (StringUtils.isBlank(fileUrl)) {return;}InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);List<Object> list = ExcelUtil.read(inputStream);process(list, fileId);}/*** 将文件上传至oss* @param file* @return*/private String saveToOss(File file) {String fileId;try {FileSystemResource resource = new FileSystemResource(file);MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();param.add("file", resource);ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);fileId = (String) responseResult.getData();} catch (Exception e) {fileId = null;}return fileId;}
说明: 处理数据的业务逻辑代码就不用贴了
6、上传文件到cos
@RequestMapping("/txOssUpload")@ResponseBodypublic ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {if (null == file || file.isEmpty()) {return ResponseResult.fail("文件不能为空");}String originalFilename = file.getOriginalFilename();originalFilename = MimeUtility.decodeText(originalFilename);// 解决中文乱码问题String contentType = getContentType(originalFilename);String key;InputStream ins = null;File f = null;try {ins = file.getInputStream();f = new File(originalFilename);inputStreamToFile(ins, f);key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);} catch (Exception e) {return ResponseResult.fail(e.getMessage());} finally {if (null != ins) {try {ins.close();} catch (IOException e) {e.printStackTrace();}}if (f.exists()) {// 删除临时文件f.delete();}}return ResponseResult.ok(key);}public static void inputStreamToFile(InputStream ins,File file) {try {OutputStream os = new FileOutputStream(file);int bytesRead = 0;byte[] buffer = new byte[8192];while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {os.write(buffer, 0, bytesRead);}os.close();ins.close();} catch (Exception e) {e.printStackTrace();}}public String txOssUpload(FileInputStream inputStream, String key, String contentType) {key = Uuid.getUuid() + "-" + key;OSSUtil.txOssUpload(inputStream, key, contentType);try {if (null != inputStream) {inputStream.close();}} catch (IOException e) {e.printStackTrace();}return key;}public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {ObjectMetadata objectMetadata = new ObjectMetadata();try{int length = inputStream.available();objectMetadata.setContentLength(length);}catch (Exception e){logger.info(e.getMessage());}objectMetadata.setContentType(contentType);cosclient.putObject(txbucketName, key, inputStream, objectMetadata);}
7、下载文件
/*** 腾讯云文件下载* @param response* @param id* @return*/@RequestMapping("/txOssDownload")public Object txOssDownload(HttpServletResponse response, String id) {COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);String contentType = getContentType(id);FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);return null;}public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {FileOutputStream fos = null;response.reset();OutputStream os = null;try {response.setContentType(contentType + "; charset=utf-8");if(!contentType.equals(PlConstans.FileContentType.image)){try {response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));} catch (UnsupportedEncodingException e) {response.setHeader("Content-Disposition", "attachment; filename=" + fileName);logger.error("encoding file name failed", e);}}os = response.getOutputStream();byte[] b = new byte[1024 * 1024];int len;while ((len = fileStream.read(b)) > 0) {os.write(b, 0, len);os.flush();try {if(fos != null) {fos.write(b, 0, len);fos.flush();}} catch (Exception e) {logger.error(e.getMessage());}}} catch (IOException e) {IOUtils.closeQuietly(fos);fos = null;} finally {IOUtils.closeQuietly(os);IOUtils.closeQuietly(fileStream);if(fos != null) {IOUtils.closeQuietly(fos);}}}
8、读取网络文件流
/*** 读取网络文件流* @param url* @return*/public static InputStream readFileFromURL(String url) {if (StringUtils.isBlank(url)) {return null;}HttpClient httpClient = new DefaultHttpClient();HttpGet methodGet = new HttpGet(url);try {HttpResponse response = httpClient.execute(methodGet);if (response.getStatusLine().getStatusCode() == 200) {HttpEntity entity = response.getEntity();return entity.getContent();}} catch (Exception e) {e.printStackTrace();}return null;}
9、ExcelUtil
/*** 读excel* @param inputStream 文件输入流* @return list集合*/public static List<Object> read(InputStream inputStream) {return EasyExcelFactory.read(inputStream, new Sheet(1, 1));}/*** 写excel* @param data list数据* @param clazz* @param saveFilePath 文件保存路径* @throws IOException*/public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {File tempFile = new File(saveFilePath);OutputStream out = new FileOutputStream(tempFile);ExcelWriter writer = EasyExcelFactory.getWriter(out);Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);writer.write(data, sheet);writer.finish();out.close();}
说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考
七、其他
1、@LoginRequired注解
/*** 在需要登录验证的Controller的方法上使用此注解*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}
2、MyControllerAdvice
@ControllerAdvice
public class MyControllerAdvice {@ResponseBody@ExceptionHandler(TokenValidationException.class)public JsonResponse tokenValidationExceptionHandler() {return JsonResponse.loginInvalid();}@ResponseBody@ExceptionHandler(ServiceException.class)public JsonResponse serviceExceptionHandler(ServiceException se) {return JsonResponse.fail(se.getMsg());}@ResponseBody@ExceptionHandler(Exception.class)public JsonResponse exceptionHandler(Exception e) {e.printStackTrace();return JsonResponse.fail(e.getMessage());}}
3、AuthenticationInterceptor
public class AuthenticationInterceptor implements HandlerInterceptor {private static final String CURRENT_USER = "user";@Autowiredprivate UserService userService;@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {// 如果不是映射到方法直接通过if (!(handler instanceof HandlerMethod)) {return true;}HandlerMethod handlerMethod = (HandlerMethod) handler;Method method = handlerMethod.getMethod();// 判断接口是否有@LoginRequired注解, 有则需要登录LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);if (methodAnnotation != null) {// 验证tokenInteger userId = JwtUtil.verifyToken(request);PLUser plUser = userService.selectByPrimaryKey(userId);if (null == plUser) {throw new RuntimeException("用户不存在,请重新登录");}request.setAttribute(CURRENT_USER, plUser);return true;}return true;}@Overridepublic void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {}@Overridepublic void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {}
}
4、JwtUtil
public static final long EXPIRATION_TIME = 2592_000_000L; // 有效期30天public static final String SECRET = "pl_token_secret";public static final String HEADER = "token";public static final String USER_ID = "userId";/*** 根据userId生成token* @param userId* @return*/public static String generateToken(String userId) {HashMap<String, Object> map = new HashMap<>();map.put(USER_ID, userId);String jwt = Jwts.builder().setClaims(map).setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME)).signWith(SignatureAlgorithm.HS512, SECRET).compact();return jwt;}/*** 验证token* @param request* @return 验证通过返回userId*/public static Integer verifyToken(HttpServletRequest request) {String token = request.getHeader(HEADER);if (token != null) {try {Map<String, Object> body = Jwts.parser().setSigningKey(SECRET).parseClaimsJws(token).getBody();for (Map.Entry entry : body.entrySet()) {Object key = entry.getKey();Object value = entry.getValue();if (key.toString().equals(USER_ID)) {return Integer.valueOf(value.toString());// userId}}return null;} catch (Exception e) {logger.error(e.getMessage());throw new TokenValidationException("unauthorized");}} else {throw new TokenValidationException("missing token");}}
结语: OK, 搞定,睡了, 好困
Spring Boot 项目 RedisTemplate 实现轻量级消息队列相关推荐
- Spring Boot:使用Rabbit MQ消息队列
综合概述 消息队列 消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级.对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息,对消息队列有读权限的进程则可以 ...
- 后端开发实践:Spring Boot项目模板
点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达今日推荐:2020年7月程序员工资统计,平均14357元,又跌了,扎心个人原创100W+访问量博客:点击前往,查看更多 作者 ...
- 后端开发实践:Spring Boot项目模板,拿去用吧!
点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:干掉 Navicat:这个 IDEA 的兄弟真香!个人原创100W+访问量博客:点击前往,查看更多 作者:无知者 ...
- 《SpringCloud超级入门》Spring Boot项目搭建步骤(超详细)《六》
目录 编写第一个 REST 接口 读取配置文件 profiles 多环境配置 热部署 actuator 监控 自定义 actuator 端点 统一异常处理 异步执行 随机端口 编译打包 在 Sprin ...
- STS创建Spring Boot项目实战(Rest接口、数据库、用户认证、分布式Token JWT、Redis操作、日志和统一异常处理)
STS创建Spring Boot项目实战(Rest接口.数据库.用户认证.分布式Token JWT.Redis操作.日志和统一异常处理) 1.项目创建 1.新建工程 2.选择打包方式,这边可以选择为打 ...
- Spring boot 项目(十二)——实现电脑网页微信扫码自动授权
引言 电脑端微信网页扫码授权登录有2种方式: 第一种:基于微信公众号,单独获取登录二维码扫码,然后扫码登录,程序控制跳转逻辑,例如CSDN 第二种:基于微信开放平台,跳转到微信二维码页面进行扫码登录, ...
- Spring boot 项目Kafka Error connecting to node xxx:xxx Kafka项目启动异常 Failed to construct kafka consumer
Spring boot 项目Kafka Error connecting to node xxx:xxx Spring boot Kafka项目启动异常 新建了一个springBoot集成Kafka的 ...
- Vue + Spring Boot 项目实战(二十一):缓存的应用
重要链接: 「系列文章目录」 「项目源码(GitHub)」 本篇目录 前言 一.缓存:工程思想的产物 二.Web 中的缓存 1.缓存的工作模式 2.缓存的常见问题 三.缓存应用实战 1.Redis 与 ...
- 【MySQL】Spring Boot项目基于Sharding-JDBC和MySQL主从复制实现读写分离(8千字详细教程)
目录 前言 一. 介绍 二. 主从复制 1. 原理 2. 克隆从机 3. 克隆从机大坑 4. 远程登陆 5. 主机配置 6. 从机配置 7. 主机:建立账户并授权 8. 从机:配置需要复制的主机 9. ...
最新文章
- 机器学习中的优化方法小结
- mysql 唯一性约束报错_怪异的MySQL Online DDL报错Duplicate entry
- boost::mp11::mp_reverse_fold相关用法的测试程序
- 本机电脑与 Android 设备如何进行文件传输?
- 让我们回顾一下如何通过JDBC插入Clob或Blob
- tensorflow(centos 7.0 64)安装
- 在ubuntu上安装Avant Window Navigator (AWN)
- FastFDS--文件服务系统
- ubuntu显卡测试软件,Linux显卡性能测试程序Unigine Valley 和 Unigine Heaven
- 工业树莓派结合USB摄像头实现远程网络监控
- 嵌入式测试 模拟共享单车
- webpack 5高级配置优化
- SyntaxError: Non-UTF-8 code starting with ‘\xb5‘ in file问题如何解决???求助求助!!!
- SAP 小币种金额的转换函数和处理
- (选做)基于数组的工资处理系统
- php setcookie应该在哪里使用,php setcookie 用法
- 重庆兰格机械集团有限公司招聘-船讯网
- 如何和后台接触的_民熔小课堂|跌落式熔断器该如何检修?点进来告诉你答案!...
- 浅显易懂入门大数据系列:二、MapReduce、YARN(超详细)
- 1w存银行一年多少利息_100万存在银行,一年最高能有多少利息?
热门文章
- 用Python实现一个1加到n求和功能的函数
- 【OpenCV3】棋盘格角点检测与绘制——cv::findChessboardCorners()与cv::drawChessboardCorners()详解
- webpack 之 code spliting
- java日志处理汇总
- ProxySQL 监控和统计
- RAC实例 表空间 维护
- 简单的XML和JSON数据的处理
- asp.net %% %#%%=%%@%%$%用法与区别
- ubuntu -- 安装memcached
- Prototype1.5.1源代码解读分析-5