前言

上篇文章《Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失》我们对ThreadLocal数据丢失进行了详细的分析,并通过代码的方式复现了这个问题。

在上篇文章的末尾我也说了思路给大家提供了,如果需要能够在Hystrix 为线程隔离模式也能正确传递数据的话,需要我们自己去修改。

我这边以Zuul中自定义负载均衡策略来进行讲解,在Zuul中需要实现灰度发布的功能,需要在Filter中将请求的用户信息传递到自定的负载策略中,Zuul中整合了Hystrix,从Zuul Filter的请求到Ribbon的策略类中,线程已经发生了变化,变成了Hystrix提供的线程池来执行(配置隔离模式为线程)。这个时用ThreadLocal就会出问题了,数据传输会错乱。也就是我们前面分析的问题。

关于修改我说下自己分析问题的一些思路,ransmittable-thread-local可以解决这个问题,可以对线程或者线程池进行修饰,其实最终的原理就是对线程进行包装,在线程run之前和之后做一些处理来保证数据的正确传递。

改造思路

首先我想的就是改掉Hystrix中的线程池或者线程,只有这样才能让ransmittable-thread-local来接管线程中数据的传递。

通过调试的方式找到com.netflix.hystrix.HystrixThreadPool是Hystrix线程池的接口,里面定义了一个获取ExecutorService方法,代码如下:

  1. public interface HystrixThreadPool {

  2.    /**

  3.     * Implementation of {@link ThreadPoolExecutor}.

  4.     *

  5.     * @return ThreadPoolExecutor

  6.     */

  7.    public ExecutorService getExecutor();

  8. }

通过查找接口的实现类,发现只有一个默认的实现com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault,实现也在接口中,是一个静态类。实现的方法如下:

  1. @Override

  2. public ThreadPoolExecutor getExecutor() {

  3.     touchConfig();

  4.     return threadPool;

  5. }

threadPool是类中的一个变量,主要是通过touchConfig方法来设置线程的参数,touchConfig代码如下:

  1. private void touchConfig() {

  2.      final int dynamicCoreSize = properties.coreSize().get();

  3.      final int configuredMaximumSize = properties.maximumSize().get();

  4.      int dynamicMaximumSize = properties.actualMaximumSize();

  5.      final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();

  6.      boolean maxTooLow = false;

  7.      if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {

  8.          //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum

  9.          dynamicMaximumSize = dynamicCoreSize;

  10.          maxTooLow = true;

  11.      }

  12.      //......

  13. }

这是最外层获取线程池的地方,可以根据代码一步步进去看,最终获取线程池的代码在com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy.getThreadPool方法中。

上面是线程池的源码分析,我们可以改造源码,将线程池用ransmittable-thread-local进行修饰。

改造线程方式

另外一种是改造线程的方式,在Hystrix将命令丢入线程池的时候对线程进行修饰也可以解决此问题,因为ransmittable-thread-local对线程池进行修饰,其原理也是改造了线程,通过源码可以看出:

  1. public static ExecutorService getTtlExecutorService(ExecutorService executorService) {

  2.        if (executorService == null || executorService instanceof ExecutorServiceTtlWrapper) {

  3.            return executorService;

  4.        }

  5.        return new ExecutorServiceTtlWrapper(executorService);

  6. }

  7. class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService {

  8.    private final ExecutorService executorService;

  9.    ExecutorServiceTtlWrapper(ExecutorService executorService) {

  10.        super(executorService);

  11.        this.executorService = executorService;

  12.    }

  13.    @Override

  14.    public <T> Future<T> submit(Callable<T> task) {

  15.        return executorService.submit(TtlCallable.get(task));

  16.    }

  17.    @Override

  18.    public <T> Future<T> submit(Runnable task, T result) {

  19.        return executorService.submit(TtlRunnable.get(task), result);

  20.    }

  21.    @Override

  22.    public Future<?> submit(Runnable task) {

  23.        return executorService.submit(TtlRunnable.get(task));

  24.    }

  25.    // ...........

  26. }

重点在TtlRunnable.get()

改造Hystrix中线程的方式,可以通过HystrixContextScheduler进行入手,Hystrix通过HystrixContextScheduler的ThreadPoolScheduler把命令submit到ThreadPoolExecutor中去执行。

通过上面的分析,最终可以定位到提交命令的代码如下:

  1. private static class ThreadPoolWorker extends Worker {

  2.        private final HystrixThreadPool threadPool;

  3.        private final CompositeSubscription subscription = new CompositeSubscription();

  4.        private final Func0<Boolean> shouldInterruptThread;

  5.        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {

  6.            this.threadPool = threadPool;

  7.            this.shouldInterruptThread = shouldInterruptThread;

  8.        }

  9.        @Override

  10.        public void unsubscribe() {

  11.            subscription.unsubscribe();

  12.        }

  13.        @Override

  14.        public boolean isUnsubscribed() {

  15.            return subscription.isUnsubscribed();

  16.        }

  17.        @Override

  18.        public Subscription schedule(final Action0 action) {

  19.            if (subscription.isUnsubscribed()) {

  20.                // don't schedule, we are unsubscribed

  21.                return Subscriptions.unsubscribed();

  22.            }

  23.            // This is internal RxJava API but it is too useful.

  24.            ScheduledAction sa = new ScheduledAction(action);

  25.            subscription.add(sa);

  26.            sa.addParent(subscription);

  27.            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();

  28.            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);

  29.            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

  30.            return sa;

  31.        }

  32.        @Override

  33.        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {

  34.            throw new IllegalStateException("Hystrix does not support delayed scheduling");

  35.        }

  36. }

核心代码在schedule方法中,只需要将schedule中的sa进行修饰即可。

改造后的代码如下:

  1. public Subscription schedule(final Action0 action) {

  2.     if (subscription.isUnsubscribed()) {

  3.            // don't schedule, we are unsubscribed

  4.            return Subscriptions.unsubscribed();

  5.     }

  6.     // This is internal RxJava API but it is too useful.

  7.     ScheduledAction sa = new ScheduledAction(action);

  8.     subscription.add(sa);

  9.     sa.addParent(subscription);

  10.     ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();

  11.     FutureTask<?> f = (FutureTask<?>) executor.submit(TtlRunnable.get(sa));

  12.     sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

  13.     return sa;

  14. }

改源码还涉及到重新打包等问题,每个项目都得用修改后的jar包,比较麻烦,最简单的做法就是在项目中建一个同样的HystrixContextScheduler类,包名也要和之前一样,让jvm优先加载,这样就能用这个修改的类来代替Hystrix原始的类。

最后我们来验证下这样的改动是否正确,首先我们在Zuul的Filter中进行值的传递:

RibbonFilterContextHolder是基于InheritableThreadLocal做的值传递,代码如下:

  1. public class RibbonFilterContextHolder {

  2.    private static final ThreadLocal<RibbonFilterContext> contextHolder = new InheritableThreadLocal<RibbonFilterContext>() {

  3.        @Override

  4.        protected RibbonFilterContext initialValue() {

  5.            return new DefaultRibbonFilterContext();

  6.        }

  7.    };

  8.    public static RibbonFilterContext getCurrentContext() {

  9.        return contextHolder.get();

  10.    }

  11.    public static void clearCurrentContext() {

  12.        contextHolder.remove();

  13.    }

  14. }

完整源码请参考: https://github.com/yinjihuan/spring-cloud/blob/master/fangjia-common/src/main/java/com/fangjia/common/support/RibbonFilterContextHolder.java

  1. private static AtomicInteger ac = new AtomicInteger();

  2.    @Override

  3.    public Object run() {

  4.        RequestContext ctx = RequestContext.getCurrentContext();

  5.        RibbonFilterContextHolder.getCurrentContext().add("servers",ac.addAndGet(1)+"");

  6.        return null;

  7.    }

通过AtomicInteger 进行数字的累加操作,后面测试的时候用10个线程并发测试,如如果在Ribbon的自定义负载策略中接收的值是0-9的话表示正确,否则错误。

接下来定义一个负载策略类,输出接收的值:

  1. public class GrayPushRule extends AbstractLoadBalancerRule {

  2.    private AtomicInteger nextServerCyclicCounter;

  3.    private static final boolean AVAILABLE_ONLY_SERVERS = true;

  4.    private static final boolean ALL_SERVERS = false;

  5.    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

  6.    public GrayPushRule() {

  7.        this.nextServerCyclicCounter = new AtomicInteger(0);

  8.    }

  9.    public GrayPushRule(ILoadBalancer lb) {

  10.        this();

  11.        this.setLoadBalancer(lb);

  12.    }

  13.    public Server choose(ILoadBalancer lb, Object key) {

  14.        String servers = RibbonFilterContextHolder.getCurrentContext().get("servers");

  15.        System.out.println(Thread.currentThread().getName()+":"+servers);  

  16.        return null;

  17.    }

  18.    public Server choose(Object key) {

  19.        return this.choose(this.getLoadBalancer(), key);

  20.    }

  21.    public void initWithNiwsConfig(IClientConfig clientConfig) {

  22.    }

  23. }

然后增加配置,使用自定义的策略,还需要将Hystrix的线程池数量改小一点,这样才可以线程复用

  1. fsh-house.ribbon.NFLoadBalancerRuleClassName=com.fangjia.fsh.api.rule.GrayPushRule

  2. # 线程隔离模式

  3. zuul.ribbon-isolation-strategy=thread

  4. hystrix.threadpool.default.coreSize=3

启动服务,用ab进行测试:

  1. ab -n 10 -c 10 http://192.168.10.170:2103/fsh-house/house/1

输出结果如下:

  1. hystrix-RibbonCommand-3:10

  2. hystrix-RibbonCommand-2:3

  3. hystrix-RibbonCommand-1:8

  4. hystrix-RibbonCommand-3:10

  5. hystrix-RibbonCommand-2:3

  6. hystrix-RibbonCommand-1:8

  7. hystrix-RibbonCommand-3:10

  8. hystrix-RibbonCommand-2:3

  9. hystrix-RibbonCommand-1:8

  10. hystrix-RibbonCommand-3:10

很多数据都重复了,这就是线程复用导致的问题,接下来我们用上面讲的方式进行改造,需要将RibbonFilterContextHolder中的InheritableThreadLocal改成TransmittableThreadLocal

  1.    private static final TransmittableThreadLocal<RibbonFilterContext> contextHolder = new TransmittableThreadLocal<RibbonFilterContext>() {

  2.        @Override

  3.        protected RibbonFilterContext initialValue() {

  4.            return new DefaultRibbonFilterContext();

  5.        }

  6.    };

然后在项目中新建一个HystrixContextScheduler类,包名必须是com.netflix.hystrix.strategy.concurrency,代码就按上面贴的进行改,主要是对线程进行修饰:

  1. FutureTask<?> f = (FutureTask<?>) executor.submit(TtlRunnable.get(sa));

再次启动服务,进行测试,结果如下:

  1. hystrix-RibbonCommand-2:10

  2. hystrix-RibbonCommand-1:1

  3. hystrix-RibbonCommand-3:7

  4. hystrix-RibbonCommand-3:8

  5. hystrix-RibbonCommand-1:2

  6. hystrix-RibbonCommand-2:4

  7. hystrix-RibbonCommand-3:5

  8. hystrix-RibbonCommand-1:9

  9. hystrix-RibbonCommand-2:3

  10. hystrix-RibbonCommand-3:6

现在的结果已经是正确的

改造线程池方式

上面介绍了改造线程的方式,并且通过建一个同样的Java类来覆盖Jar包中的实现,感觉有点投机取巧,其实不用这么麻烦,Hystrix默认提供了HystrixPlugins类,可以让用户自定义线程池,下面来看看怎么使用:

在启动之前调用进行注册自定义实现的逻辑:

  1. HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());

ThreadLocalHystrixConcurrencyStrategy就是我们自定义的创建线程池的类,需要继承HystrixConcurrencyStrategy,前面也有讲到通过调试代码发现最终获取线程池的代码就在HystrixConcurrencyStrategy中。

我们只需要重写getThreadPool方法即可完成对线程池的改造,由于TtlExecutors只能修饰ExecutorService和Executor,而HystrixConcurrencyStrategy中返回的是ThreadPoolExecutor,我们需要对ThreadPoolExecutor进行包装一层,最终在execute方法中对线程修饰,也就相当于改造了线程池。

  1. public class ThreadLocalHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

  2.    private final static Logger logger = LoggerFactory.getLogger(ThreadLocalHystrixConcurrencyStrategy.class);

  3.    @Override

  4.    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,

  5.            HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,

  6.            BlockingQueue<Runnable> workQueue) {

  7.        return this.doGetThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

  8.    }

  9.    @Override

  10.    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,

  11.            HystrixThreadPoolProperties threadPoolProperties) {

  12.        return this.doGetThreadPool(threadPoolKey, threadPoolProperties);

  13.    }

  14. }

在doGetThreadPool方法中就返回包装的线程池,代码如下:

  1. return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue,

  2.                    threadFactory);

最后就是ThreadLocalThreadPoolExecutor的代码:

  1. public class ThreadLocalThreadPoolExecutor extends ThreadPoolExecutor {

  2.    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

  3.    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

  4.            BlockingQueue<Runnable> workQueue) {

  5.        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

  6.    }

  7.    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

  8.            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {

  9.        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);

  10.    }

  11.    @Override

  12.    public void execute(Runnable command) {

  13.        super.execute(TtlRunnable.get(command));

  14.    }

  15. }

完整源码参考:

https://github.com/yinjihuan/spring-cloud/tree/master/fangjia-fsh-api

阿里云1C2G虚拟机【99/年】羊毛党集合啦!

推荐阅读

  • Spring Boot使用@Async实现异步调用:自定义线程池

  • Spring Cloud Feign的文件上传实现

  • 如何在Spring Boot中玩转智能合约【修订版】

  • Spring Boot中增强对MongoDB的配置(连接池等)

  • Spring Boot 与 OAuth2

  • Spring Boot 2.0 新特性(二):新增事件ApplicationStartedEvent

  • Spring Boot 2.0 新特性(一):配置绑定 2.0 全解析

  • Spring Boot 2.0正式发布,升还是不升呢?

  • Spring Boot 2.0 新特性概览

  • Spring Boot/Cloud干货汇总

长按指纹

一键关注



深入交流、更多福利

扫码加入我的知识星球


点击 “阅读原文” 看看本号其他精彩内容

Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失(续)相关推荐

  1. Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失

    在Spring Cloud中我们用Hystrix来实现断路器,Zuul中默认是用信号量(Hystrix默认是线程)来进行隔离的,我们可以通过配置使用线程方式隔离. 在使用线程隔离的时候,有个问题是必须 ...

  2. Spring Cloud中Hystrix、Ribbon及Feign的熔断关系是什么?

    导读 今天和大家聊一聊在Spring Cloud微服务框架实践中,比较核心但是又很容易把人搞得稀里糊涂的一个问题,那就是在Spring Cloud中Hystrix.Ribbon以及Feign它们三者之 ...

  3. Spring Cloud中Hystrix的请求合并

    在微服务架构中,我们将一个项目拆分成很多个独立的模块,这些独立的模块通过远程调用来互相配合工作,但是,在高并发情况下,通信次数的增加会导致总的通信时间增加,同时,线程池的资源也是有限的,高并发环境会导 ...

  4. Spring Cloud中Hystrix仪表盘与Turbine集群监控

    Hystrix仪表盘,就像汽车的仪表盘实时显示汽车的各项数据一样,Hystrix仪表盘主要用来监控Hystrix的实时运行状态,通过它我们可以看到Hystrix的各项指标信息,从而快速发现系统中存在的 ...

  5. Spring Cloud中Hystrix仪表盘与Turbine集群监控 1

    Hystrix仪表盘,就像汽车的仪表盘实时显示汽车的各项数据一样,Hystrix仪表盘主要用来监控Hystrix的实时运行状态,通过它我们可以看到Hystrix的各项指标信息,从而快速发现系统中存在的 ...

  6. Spring Cloud中Hystrix实现断路器原理

    多个微服务之间调用的时候,假设微服务A调用微服务B和微服务C,微服务B和微服务C又在调用其他的微服务,这就是所谓的"扇出".如果扇出的链路上某个微服务的调用响应时间过长或者不可用, ...

  7. 10 在Spring Cloud中使用Hystrix

    Hystrix主要用于保护调用服务的一方,如果被调用的服务发生故障,符合一定条件,就会开启断路器对调用的程序进行隔离. 1.准备测试程序 在进行Spring Cloud整合Hystrix之前,我们先准 ...

  8. 【夯实Spring Cloud】Spring Cloud中使用Hystrix实现断路器原理详解(上)

    本文属于[夯实Spring Cloud]系列文章,该系列旨在用通俗易懂的语言,带大家了解和学习Spring Cloud技术,希望能给读者带来一些干货.系列目录如下: [夯实Spring Cloud]D ...

  9. spring Cloud中,解决Feign/Ribbon整合Hystrix第一次请求失败的问题?

    Spring Cloud中,Feign和Ribbon在整合了Hystrix后,可能会出现首次调用失败的问题,要如何解决该问题呢? 造成该问题的原因 Hystrix默认的超时时间是1秒,如果超过这个时间 ...

最新文章

  1. JavaScript中的静态成员
  2. JAVA_OA管理系统(二):SpringMVC笔记基础篇01注入方法
  3. 窗口的z-order是什么?PyQt5
  4. ORA-01925:maximum of 80 enabled roles exceeded
  5. USACO-Section1.5 Mother's Milk (深度优先搜索)
  6. Java JSR303 valid
  7. NYOJ39-水仙花数
  8. MSP430 F5529 单片机 OLED 音乐播放器 八音盒 蜂鸣器 音乐
  9. 获取JSON文本(复嵌对象)转换指定JSON数据并Ajax实现数据初始可视化【附上echarts地图官方数据形式json文件数据】
  10. 移动硬盘坏点测试软件,移动硬盘坏道检测修复
  11. Linux 查看日志命令tail的用法
  12. iomega ix2 Android,Lenovo Iomega ix2共享功能_联想 IOMEGA IX2_服务器评测与技术-中关村在线...
  13. 自己动手写ORB特征
  14. ios真机测试,Ineligible Devices,不可以选中真机
  15. 微信公众平台注册与认证图文教程分享
  16. BOM 物料清单 Bill Of Materials
  17. 开发技术指南 | 最全 Substrate 与 Polkadot 技术文档、教程、课程
  18. Windows下搭建ant+jenkins+jmeter自动化接口测试框架
  19. 双向链表、环形链表及约瑟夫问题
  20. openMVS深度图计算:DenseReconstruction Estimate之EVTEstimateDepthMap之patchmatch的传播优化

热门文章

  1. easyui panel 默认折叠 右上角按钮
  2. centos7最小安装没有 ifconfig netstat 命令
  3. linux下文件夹的创建、复制、剪切、重命名、清空和删除命令
  4. VC++创建个性的对话框之MFC篇
  5. Linux的mount命令简介
  6. Linux C编程--临时文件
  7. java执行器是什么_java使用Executor(执行器)管理线程
  8. 动态追踪技术思想及应用
  9. Python 在定义函数时 为什么默认参数不能放在必选参数前面
  10. 数据结构 - 二叉排序树