一、场景

  项目A监听mq中的其他项目的部署消息(包括push_seq, status, environment,timestamp等),然后将部署消息同步到数据库中(项目X在对应环境[environment]上部署的push_seq[项目X的版本])。那么问题来了,mq中加入包含了两个部署消息 dm1 和 dm2,dm2的push_seq > dm1的push_seq,在分布式的情况下,dm1 和 dm2可能会分别被消费(也就是并行),那么在同步数据库的时候可能会发生 dm1 的数据保存 后于 dm2的数据保存,导致保存项目的部署信息发生异常。

二、解决思路

  将mq消息的并行消费变成串行消费,这里借助redis分布式锁来完成。同一个服务在分布式的状态下,监听到mq消息后,触发方法的执行,执行之前(通过spring aop around来做的)首先获得redis的一个分布式锁,获取锁成功之后才能执行相关的逻辑以及数据库的保存,最后释放锁。

三、主要代码

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/**
* @author: hujunzheng
* @create: 17/9/29 下午2:49
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {/*** redis的key* @return*/String value();/*** 持锁时间,单位毫秒,默认一分钟*/long keepMills() default 60000;/*** 当获取失败时候动作*/LockFailAction action() default LockFailAction.GIVEUP;public enum LockFailAction{/*** 放弃*/GIVEUP,/*** 继续*/CONTINUE;}/*** 睡眠时间,设置GIVEUP忽略此项* @return*/long sleepMills() default 500;
}

import java.lang.reflect.Method;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/**
* @author: hujunzheng
* @create: 17/9/29 下午2:49
*/
@Component
@Aspect
public class RedisLockAspect {private static final Log log = LogFactory.getLog(RedisLockAspect.class);@Autowiredprivate RedisCacheTemplate.RedisLockOperation redisLockOperation;@Pointcut("execution(* com.hjzgg..StargateDeployMessageConsumer.consumeStargateDeployMessage(..))" +"&& @annotation(me.ele.api.portal.service.redis.RedisLock)")private void lockPoint(){}@Around("lockPoint()")public Object arround(ProceedingJoinPoint pjp) throws Throwable{MethodSignature methodSignature = (MethodSignature) pjp.getSignature();Method method = methodSignature.getMethod();RedisLock lockInfo = method.getAnnotation(RedisLock.class);/*String lockKey = lockInfo.value();if (method.getParameters().length == 1 && pjp.getArgs()[0] instanceof DeployMessage) {DeployMessage deployMessage = (DeployMessage) pjp.getArgs()[0];lockKey += deployMessage.getEnv();System.out.println(lockKey);}*/boolean lock = false;Object obj = null;while(!lock){long timestamp = System.currentTimeMillis()+lockInfo.keepMills();lock = setNX(lockInfo.value(), timestamp);//得到锁,已过期并且成功设置后旧的时间戳依然是过期的,可以认为获取到了锁(成功设置防止锁竞争)long now = System.currentTimeMillis();if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){log.info("得到redis分布式锁...");obj = pjp.proceed();if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){releaseLock(lockInfo.value());}}else{if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){log.info("稍后重新请求redis分布式锁...");Thread.currentThread().sleep(lockInfo.sleepMills());}else{log.info("放弃redis分布式锁...");break;}}}return obj;}private boolean setNX(String key,Long value){return redisLockOperation.setNX(key, value);}private long getLock(String key){return redisLockOperation.get(key);}private Long getSet(String key,Long value){return redisLockOperation.getSet(key, value);}private void releaseLock(String key){redisLockOperation.delete(key);}@Pointcut(value = "execution(* me.ele..StargateBuildMessageConsumer.consumeStargateBuildMessage(me.ele.api.portal.service.mq.dto.BuildMessage)) && args(buildMessage)" +"&& @annotation(me.ele.api.portal.service.redis.RedisLock)", argNames = "buildMessage")private void buildMessageLockPoint(BuildMessage buildMessage){}@Around(value = "buildMessageLockPoint(buildMessage)", argNames = "pjp,buildMessage")public Object buildMessageAround(ProceedingJoinPoint pjp, BuildMessage buildMessage) throws Throwable {final String LOCK = buildMessage.getAppId() + buildMessage.getPushSequence();Lock lock = redisLockRegistry.obtain(LOCK);try {lock.lock();return pjp.proceed();} finally {try {lock.unlock();} catch (Exception e) {log.error("buildMessage={}, Lock {} unlock failed. {}", buildMessage, lock, e);}}}}

四、遇到的问题

  

  

  开始是将锁加到deploy的方法上的,但是一直aop一直没有作用,换到consumeStargateDeployMessage方法上就可以了。考虑了一下是因为 @Transactional的原因。这里注意下。

  在一篇文章中找到了原因:SpringBoot CGLIB AOP解决Spring事务,对象调用自己方法事务失效.

  只要脱离了Spring容器管理的所有对象,对于SpringAOP的注解都会失效,因为他们不是Spring容器的代理类,SpringAOP,就切入不了。也就是说是 @Transactional注解方法的代理对象并不是spring代理对象。

  参考: 关于proxy模式下,@Transactional标签在创建代理对象时的应用

五、使用spring-redis中的RedisLockRegistry

import java.util.concurrent.locks.Lock;
import org.springframework.integration.redis.util.RedisLockRegistry;@Bean
public RedisLockRegistry redisLockRegistry(@Value("${xxx.xxxx.registry}") String redisRegistryKey,RedisTemplate redisTemplate) {return new RedisLockRegistry(redisTemplate.getConnectionFactory(), redisRegistryKey, 200000);
}Lock lock = redisLockRegistry.obtain(appId);lock.tryLock(180, TimeUnit.SECONDS);
....
lock.unlock();  

六、参考

  其他工具类,请参考这里。

七、springboot LockRegistry

  

分布式锁-RedisLockRegistry源码分析[转]

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;
import redis.clients.jedis.JedisShardInfo;@Ignore
public class RedisLockTest {private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockTest.class);private static final String LOCK = "xxx.xxx";private RedisLockRegistry redisLockRegistry;@Beforepublic void setUp() {JedisShardInfo shardInfo = new JedisShardInfo("127.0.0.1");JedisConnectionFactory factory = new JedisConnectionFactory(shardInfo);redisLockRegistry = new RedisLockRegistry(factory, "test", 50L);}private class TaskA implements Runnable {@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}Lock lock = redisLockRegistry.obtain(LOCK);try {lock.lock();LOGGER.info("Lock {} is obtained", lock);Thread.sleep(10);lock.unlock();LOGGER.info("Lock {} is unlocked", lock);} catch (Exception ex) {LOGGER.error("Lock {} unlock failed", lock, ex);}}}private class TimeoutTask implements Runnable {@Overridepublic void run() {Lock lock = redisLockRegistry.obtain(LOCK);try {lock.lock();LOGGER.info("Lock {} is obtained", lock);Thread.sleep(5000);lock.unlock();LOGGER.info("Lock {} is unlocked", lock);} catch (Exception ex) {LOGGER.error("Lock {} unlock failed", lock, ex);}}}@Testpublic void test() throws InterruptedException, TimeoutException {ExecutorService service = Executors.newFixedThreadPool(2);service.execute(new TimeoutTask());service.execute(new TaskA());service.shutdown();if (!service.awaitTermination(1, TimeUnit.MINUTES)) {throw new TimeoutException();}}
}

转载于:https://www.cnblogs.com/hujunzheng/p/7612264.html

redis分布式锁小试相关推荐

  1. redis分布式锁 在集群模式下如何实现_收藏慢慢看系列:简洁实用的Redis分布式锁用法...

    在微服务中很多情况下需要使用到分布式锁功能,而目前比较常见的方案是通过Redis来实现分布式锁,网上关于分布式锁的实现方式有很多,早期主要是基于Redisson等客户端,但在Spring Boot2. ...

  2. 快来学习Redis 分布式锁的背后原理

    以前在学校做小项目的时候,用到Redis,基本也只是用来当作缓存.可阿粉在工作中发现,Redis在生产中并不只是当作缓存这么简单.在阿粉接触到的项目中,Redis起到了一个分布式锁的作用,具体情况是这 ...

  3. Redis分布式锁使用不当,酿成一个重大事故,超卖了100瓶飞天茅台!!!

    点击关注公众号,Java干货及时送达 来源:juejin.cn/post/6854573212831842311 基于Redis使用分布式锁在当今已经不是什么新鲜事了. 本篇文章主要是基于我们实际项目 ...

  4. Redis 分布式锁使用不当,酿成一个重大事故,超卖了100瓶飞天茅台!!!

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 基于Redis使用分布式锁在当今已经不是什么新鲜事了. 本 ...

  5. 秒杀商品超卖事故:Redis分布式锁请慎用!

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:浪漫先生 来源:juejin.im/post/6854573 ...

  6. 记一次由Redis分布式锁造成的重大事故,避免以后踩坑!

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:浪漫先生 juejin.im/post/5f159cd8f2 ...

  7. 简单介绍redis分布式锁解决表单重复提交的问题

    在系统中,有些接口如果重复提交,可能会造成脏数据或者其他的严重的问题,所以我们一般会对与数据库有交互的接口进行重复处理.本文就详细的介绍一下redis分布式锁解决表单重复提交,感兴趣的可以了解一下 假 ...

  8. Redis 分布式锁没这么简单,网上大多数都有 bug

    Redis 分布式锁这个话题似乎烂大街了,不管你是面试还是工作,随处可见,「码哥」为啥还写? 因为看过很多文章没有将分布式锁的各种问题讲明白,所以准备写一篇,也当做自己的学习总结. 在进入正文之前,我 ...

  9. 深度剖析:Redis分布式锁到底安全吗?看完这篇文章彻底懂了!

    ‍‍‍‍‍‍‍‍‍‍‍‍阅读本文大约需要 20 分钟. 大家好,我是 Kaito. 这篇文章我想和你聊一聊,关于 Redis 分布式锁的「安全性」问题. Redis 分布式锁的话题,很多文章已经写烂了 ...

最新文章

  1. Java线程状态及 wait、sleep、join、interrupt、yield等的区别
  2. 拜托,面试别再问我表达式求值了!!!
  3. asp.net form submit 在Chrome里面看Form提交
  4. Flutter 初学者的简单例子充分解释
  5. python学习-Pillow图像处理
  6. 从Spring起,Java EE 6必须具备哪些附加功能?
  7. (RMAN)使用恢复目录数据库执行RMAN步骤
  8. php 语义解析,[扩展推荐] PHP 语义化版本(SemVer)辅助库
  9. java使用数组排序方法_java数组中的排序问题(冒泡排序方法的实现,及内置排序算法的应用)...
  10. Linux下find命令用法详解
  11. OpenCL编程基本流程及完整示例
  12. ffplay音视频同步
  13. 基于java中国跳棋游戏
  14. 手机虚拟键盘的设置显示隐藏
  15. 从0开始学大数据(一)
  16. 增量学习BiC: Large Scale Incremental Learning
  17. 一月17日新生冬季练习赛解题报告 A.小Q的生日
  18. [解决方案] Mendelay无法打开pdf文档:显示 unable to open this file
  19. 云数据库 Redis 版
  20. vcs+verdi Debug记录

热门文章

  1. php+静态变量的初始值,php 静态变量的初始化
  2. mysql 事务日志备份_事务日志备份与恢复 5
  3. 数据加载约定表模型变更_08
  4. Flowable 数据库表结构 ACT_HI_DETAIL
  5. quarz 定时任务 cron表达式
  6. git.exe init#timeout = 10错误:克隆远程repo'origin'时出错hudson.plugins.git
  7. idea全局搜索快捷鍵ctrl+shift+F失效
  8. Java-自增自减运算符 初始Math类
  9. npoi 所有列调整为一页_别再浪费纸了,一张纸就能打印Word、Excel、PPT所有内容,真厉害...
  10. java servlet spring_spring与tomcat 对应关系,servlet各版本写法