Spring Cloud Feign传输Header,并保证多线程情况下也适用
一、现象
微服务在生产中,常遇到需要把 header 传递到下一子服务的情况(如服务A访问服务B的接口,需要传递header),网上大多数的方案是实现 RequestInterceptor 接口,在重写方法中,把 header 填进 Feign 的请求中。我们先按这种方式,简单实现代码如下:

1、继承RequestInterceptor
服务A新建类,继承 RequestInterceptor,把 header 设置到请求中,注意 header 的key若是大写时,请求中一般会被转为小写,所以建议header的key一般设置为小写。

package com.he.feign.config;import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;/*** <b>@Desc</b>:   1、继承RequestInterceptor,把header设置到请求中,注意header的key若是大写时,请求中会被转为小写* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Configuration
public class FeignConfig implements RequestInterceptor {@Overridepublic void apply(RequestTemplate requestTemplate) {ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();//当主线程的请求执行完毕后,Servlet容器会被销毁当前的Servlet,因此在这里需要做判空if (attributes != null) {HttpServletRequest request = attributes.getRequest();Enumeration<String> headerNames = request.getHeaderNames();while (headerNames.hasMoreElements()) {String name = headerNames.nextElement();//不能把所有消息头都传递下去,否则会引起其他异常;header的name都是小写if (name.equals("feignheader")) {requestTemplate.header(name,request.getHeader(name));}}}}}

2、修改 hystrix 的隔离策略为 semaphore
RequestContextHolder.getRequestAttributes()方法,实际上是从ThreadLocal变量中取得相应信息的。hystrix断路器的默认隔离策略为THREAD,该策略是无法取得ThreadLocal值的,所以需要修改hystrix的隔离策略,一般是改为[semaphore],在服务A中的 yml 新增配置如下#2、hystrix 的隔离策略改为 SEMAPHORE

hystrix:command:default:execution:timeout:enable: trueisolation:strategy: SEMAPHOREthread:timeoutInMilleseconds: 60000

3、客户端A的测试代码
3.1、服务A的controller接口

package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>:   测试* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet容器不会销毁HttpServletRequest,//所以请求属性还保存在请求链路中,能被传递下去CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: ", resp);return resp;}
}

3.2、Feign类
feignclient的注解可以省略configuration配置,即configuration = FeignConfig.class可不声明

package com.he.feign.feign;import com.he.feign.feign.hystrix.HeaderFeignFallback;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;/*** <b>@Desc</b>:   TODO* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
//@FeignClient(value = "eureka-client",path = "/header",fallback = HeaderFeignFallback.class,configuration = FeignConfig.class)//可以省略configuration配置
@FeignClient(value = "eureka-client",path = "/header",fallback = HeaderFeignFallback.class)
public interface HeaderFeign {@GetMapping("/test")String test();
}
package com.he.feign.feign.hystrix;import com.he.feign.feign.HeaderFeign;
import org.springframework.stereotype.Component;/*** <b>@Desc</b>:   TODO* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/1* <b>@Modify</b>:*/
@Component
public class HeaderFeignFallback implements HeaderFeign {@Overridepublic String test() {return null;}
} 

4、服务端B的接口代码

package com.he.eurekaclient.controller;import com.he.eurekaclient.feign.HelloFeign;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;/*** <b>@Desc</b>:   测试header传递* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@RequestMapping("/header")
@RestController
public class HeaderController {@Value("${spring.application.name}")private String appName;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignHeader-test@GetMapping("/test")public String test() {StringBuffer sb = new StringBuffer("hello from ").append(appName).append("\n");StringBuffer requestURL = servletRequest.getRequestURL();sb.append("requestURL: ").append(requestURL).append("\n");boolean isContain = false;sb.append("headers: \n");Enumeration<String> headerNames = servletRequest.getHeaderNames();//header的name都是小写while (headerNames.hasMoreElements()){String headername = headerNames.nextElement();String headerValue = servletRequest.getHeader(headername);sb.append(headername).append("-").append(headerValue).append("\n");if (headername.equals("feignheader")) isContain = true;}if (!isContain) {sb.append("--error--").append("not contain required header!");}return sb.toString();}
}

5、启动服务,在postman中测试如下
5.1、调用接口 http://localhost:8060/test_header/main_thread,结果如下


5.2、调用接口 http://localhost:8060/test_header/sub_thread ,结果如下


5.3、调用 http://localhost:8060/test_header/sub_thread/block,结果如下


从5.1 – 5.3的查询结果,可以得到结论

经过上述的配置后,用户线程(主线程)中调用非feign请求,可把header传递到服务B中;
若在用户线程(主线程)中启动子线程,并在子线程中调用feign请求,header传递不到服务B中;
即是子线程最终异步转同步阻塞等待结果,header仍传递不到服务B中。
二、网络上大多数的解决方案
出现上面的原因, 主要是 RequestAttributes 默认不是线程共享的;主线程调用子线程时,没把 RequestAttributes 共享给子线程。因此,只要在主线程调用其他线程前将RequestAttributes对象设置为子线程共享,就能把header等信息传递下去。

1、因此,网络上大多数的解决方案如下,在主线程调用子线程前,增加下面配置
RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承,线程共享
1
修改后的代码如下

package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestContextHolder;import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>:   测试* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: ", resp);return resp;}
}

2、重新启动服务A,再次调用两个带子线程的接口,现象如下
调用 http://localhost:8060/test_header/sub_thread/block,结果如下

调用接口 http://localhost:8060/test_header/sub_thread ,结果如下


测试结果,有以下两种现象

在主线程get()阻塞等待子线程执行完毕时,每次请求都成功;
主线程直接启动子线程,且执行完自己逻辑后便结束不需理会子线程结果的,请求偶尔成功, 偶尔失败;
这是为什么呢,作者认为主要是以下原因

Servlet容器中Servlet属性生命周期与接收请求的用户线程(父线程)同步, 随着父线程执行完destroy()而销毁;
子线程虽然可以从父线程共享信息中获得了请求属性,但这个属性由父线程维护
当父线程比子线程执行完慢时,请求属性还在,子线程请求成功;当快时,请求属性随着父线程结束而销毁,子线程的请求属性变为null,请求失败。
由此可见,简单的设置 RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);

在多线程情况下, 并非是一劳永逸的。

三、作者的解决方案
针对上面的问题,及问题根本原因,我们团队的解决方案仍是使用 ThreadLocal,进行线程间的变量共享通信。

1、新建 ThreadLocalUtil

package com.he.feign.thread;import java.util.HashMap;
import java.util.Map;/*** <b>@Desc</b>:   线程共享* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/22* <b>@Modify</b>:*/
public class ThreadLocalUtil {//使用InheritableThreadLocal,使得共享变量可被子线程继承private static final InheritableThreadLocal<Map<String,String>> headerMap = new InheritableThreadLocal<Map<String, String>>(){@Overrideprotected Map<String, String> initialValue() {return new HashMap<>();}};public static Map<String,String> get(){return headerMap.get();}public static String get(String key) {return headerMap.get().get(key);}public static void set(String key, String value){headerMap.get().put(key,value);}
} 

2、修改服务A 的接口 TestHeaderController

package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import com.he.feign.thread.ThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>:   测试* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {
//        RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承Enumeration<String> headerNames = servletRequest.getHeaderNames();while (headerNames.hasMoreElements()){String name = headerNames.nextElement();if (Objects.equals(name,"feignheader")){ThreadLocalUtil.set(name,servletRequest.getHeader(name));}}new Thread(() -> {new Thread(() -> {new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}).start();}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去
//        RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承Enumeration<String> headerNames = servletRequest.getHeaderNames();while (headerNames.hasMoreElements()){String name = headerNames.nextElement();if (Objects.equals(name,"feignheader")){ThreadLocalUtil.set(name,servletRequest.getHeader(name));}}CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: ", resp);return resp;}
} 

3、修改服务A的 FeignConfig

package com.he.feign.config;import com.he.feign.thread.ThreadLocalUtil;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;import java.util.Map;/*** <b>@Desc</b>:   1、继承RequestInterceptor,把header设置到请求中,注意header的key若是大写时,请求中会被转为小写* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Slf4j
@Configuration
public class FeignConfig implements RequestInterceptor {@Overridepublic void apply(RequestTemplate requestTemplate) {
//        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
//        //当主线程的请求执行完毕后,Servlet会被销毁,因此在这里需要做判空
//        if (attributes != null) {
//            HttpServletRequest request = attributes.getRequest();
//
//            Enumeration<String> headerNames = request.getHeaderNames();
//
//            while (headerNames.hasMoreElements()) {
//                String name = headerNames.nextElement();
//                //不能把所有消息头都传递下去,否则会引起其他异常;header的name都是小写
//                if (name.equals("feignheader")) {
//                    requestTemplate.header(name,request.getHeader(name));
//                }
//            }
//        }//读取设置的header信息,传递到下一个服务Map<String, String> headerMap = ThreadLocalUtil.get();for (String key : headerMap.keySet()) {log.info("--从ThreadLocal获取消息头传递到下一个服务:key-[{}],value-[{}]",key,headerMap.get(key));requestTemplate.header(key,headerMap.get(key));}}
}

4、重启服务A,测试结果如下
4.1、连续调用 http://localhost:8060/test_header/sub_thread 接口,日志打印如下

2020-06-22 23:18:23.658  INFO 18236 --- [     Thread-131] com.he.feign.config.FeignConfig          : --从ThreadLocal获取消息头传递到下一个服务:key-[feignheader],value-[test]
2020-06-22 23:18:23.662  INFO 18236 --- [     Thread-131] c.h.f.controller.TestHeaderController    : resp: hello from eureka-client
requestURL: http://192.168.56.1:8200/header/test
headers:
feignheader-test
accept-*/*
user-agent-Java/1.8.0_162
host-192.168.56.1:8200
connection-keep-alive 


结合执行日志可知,header信息通过feign成功传递到下一个服务,而且不再出现偶尔失败的情况!

4.2、连续调用接口 http://localhost:8060/test_header/sub_thread/block


综上可见,真正解决从网关或者上层链路,把header经过feign传递到另一个服务,既要配置feign,也需要结合threadlocal。

下一步的优化,可设置拦截器或者切面,把header信息统一设置到threadlocal中。

package com.he.feign.config;import com.he.feign.thread.ThreadLocalUtil;
import org.springframework.web.servlet.HandlerInterceptor;import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Enumeration;
import java.util.Objects;/*** <b>@Desc</b>:   拦截器* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/22* <b>@Modify</b>:*/
public class MyInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//拦截请求,设置header到ThreadLocal中Enumeration<String> headerNames = request.getHeaderNames();while (headerNames.hasMoreElements()){String name = headerNames.nextElement();if (Objects.equals(name,"feignheader")){ThreadLocalUtil.set(name,request.getHeader(name));}}return true;}} 
package com.he.feign.config;import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;/*** <b>@Desc</b>:   web配置* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/25* <b>@Modify</b>:*/
@Configuration
public class WebConfig extends WebMvcConfigurationSupport {@Overrideprotected void addInterceptors(InterceptorRegistry registry) {//添加自定义的拦截器registry.addInterceptor(new MyInterceptor()).addPathPatterns("/**");}
}

TestHeaderController修改如下

package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>:   测试* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Value("${server.port}")private String port;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {
//        RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承//        Enumeration<String> headerNames = servletRequest.getHeaderNames();
//        while (headerNames.hasMoreElements()){
//            String name = headerNames.nextElement();
//            if (Objects.equals(name,"feignheader")){
//                ThreadLocalUtil.set(name,servletRequest.getHeader(name));
//            }
//        }new Thread(() -> {new Thread(() -> {new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}).start();}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去
//        RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承//        Enumeration<String> headerNames = servletRequest.getHeaderNames();
//        while (headerNames.hasMoreElements()){
//            String name = headerNames.nextElement();
//            if (Objects.equals(name,"feignheader")){
//                ThreadLocalUtil.set(name,servletRequest.getHeader(name));
//            }
//        }CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: {}", resp);return resp;}}

以上,便是作者针对spring cloud feign 传递 header 信息在多线程情况下失败问题的解决方式,若有错误请指正,欢迎交流指导。
————————————————
版权声明:本文为CSDN博主「HE-RUNNING」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

以上是刚开始使用的文章,但是随着使用线程池,就出现了问题,由于线程池是把线程回收,不是新建,就出现了在变量传递的时候,下次取到线程是从上一次父线程提供的共享变量导致了变量错乱问题。经过研究 阿里的解决方案出现在眼前

加入以下pom依赖:

<dependency><groupId>com.alibaba</groupId><artifactId>transmittable-thread-local</artifactId><version>2.2.0</version>
</dependency>

  

转载改造hystrix线程池方法:

改造线程池方式

上面介绍了改造线程的方式,并且通过建一个同样的Java类来覆盖Jar包中的实现,感觉有点投机取巧,其实不用这么麻烦,Hystrix默认提供了HystrixPlugins类,可以让用户自定义线程池,下面来看看怎么使用:

在启动之前调用进行注册自定义实现的逻辑:

HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());

ThreadLocalHystrixConcurrencyStrategy就是我们自定义的创建线程池的类,需要继承HystrixConcurrencyStrategy,前面也有讲到通过调试代码发现最终获取线程池的代码就在HystrixConcurrencyStrategy中。

我们只需要重写getThreadPool方法即可完成对线程池的改造,由于TtlExecutors只能修饰ExecutorService和Executor,而HystrixConcurrencyStrategy中返回的是ThreadPoolExecutor,我们需要对ThreadPoolExecutor进行包装一层,最终在execute方法中对线程修饰,也就相当于改造了线程池。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import com.netflix.hystrix.util.PlatformSpecific;public class ThreadLocalHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {private final static Logger logger = LoggerFactory.getLogger(ThreadLocalHystrixConcurrencyStrategy.class);@Overridepublic ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);final int dynamicCoreSize = corePoolSize.get();final int dynamicMaximumSize = maximumPoolSize.get();if (dynamicCoreSize > dynamicMaximumSize) {logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name()+ " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize+ ".  Maximum size will be set to " + dynamicCoreSize+ ", the coreSize value, since it must be equal to or greater than the coreSize value");return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit,workQueue, threadFactory);} else {return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit,workQueue, threadFactory);}}@Overridepublic ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,HystrixThreadPoolProperties threadPoolProperties) {final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();final int dynamicCoreSize = threadPoolProperties.coreSize().get();final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();final int maxQueueSize = threadPoolProperties.maxQueueSize().get();final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);if (allowMaximumSizeToDivergeFromCoreSize) {final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();if (dynamicCoreSize > dynamicMaximumSize) {logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name()+ " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize+ ".  Maximum size will be set to " + dynamicCoreSize+ ", the coreSize value, since it must be equal to or greater than the coreSize value");return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime,TimeUnit.MINUTES, workQueue, threadFactory);} else {return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime,TimeUnit.MINUTES, workQueue, threadFactory);}} else {return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES,workQueue, threadFactory);}}private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {if (!PlatformSpecific.isAppEngineStandardEnvironment()) {return new ThreadFactory() {private final AtomicInteger threadNumber = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r,"hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());thread.setDaemon(true);return thread;}};} else {return PlatformSpecific.getAppEngineThreadFactory();}}
}

ThreadLocalThreadPoolExecutor的代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;public class ThreadLocalThreadPoolExecutor extends ThreadPoolExecutor {private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();public static TransmittableThreadLocal<Long> THREAD_LOCAL = new TransmittableThreadLocal<Long>();public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);}public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(maximumPoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}@Overridepublic void execute(Runnable command) {super.execute(TtlRunnable.get(command));}
}

启动时加入插件

HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());

使用方法:调用feign client服务之前,设置线程变量

ThreadLocalThreadPoolExecutor.THREAD_LOCAL.set(10086L);

在FeignAuthConfiguration里,调用appTokenHolder.get();之前加入设置租户id

Long tenantId = ThreadLocalThreadPoolExecutor.THREAD_LOCAL.get();
DefaultAppTokenHolder.TENANT_FOR_NO_SESSION.set(tenantId);

  

  

使用线程变量三种方式测试:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;public class Test {public static void main(String[] args) throws InterruptedException, ExecutionException {
//      testThreadLocal1();// testThreadLocal2();testThreadLocal3();}private static void testThreadLocal1() throws InterruptedException, ExecutionException {final ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();ExecutorService executorService = Executors.newFixedThreadPool(1);for (int i = 0; i < 20; i++) {local.set(i + "");System.out.println(local.get());Future<?> future = executorService.submit(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ":" + local.get());local.set(null);}});future.get();System.out.println(local.get());local.set(null);}}private static void testThreadLocal2() throws InterruptedException, ExecutionException {ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();ExecutorService executorService = Executors.newFixedThreadPool(1);for (int i = 0; i < 20; i++) {local.set(i + "");System.out.println(local.get());Future<?> future = executorService.submit(new ParamRunnable(i + ""));future.get();System.out.println(local.get());local.set(null);}}private static void testThreadLocal3() throws InterruptedException, ExecutionException {final TransmittableThreadLocal<String> context = new TransmittableThreadLocal<String>();ExecutorService executorService = Executors.newFixedThreadPool(1);for (int i = 0; i < 20; i++) {context.set(i + "");System.out.println(context.get());Future<?> future = executorService.submit(TtlRunnable.get(new Runnable() {public void run() {System.out.println(Thread.currentThread().getName() + ":" + context.get());context.set(null);}}));future.get();System.out.println(context.get());context.set(null);}}private static class ParamRunnable implements Runnable {private String param;public ParamRunnable(String param) {this.param = param;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ":" + param);}}}

原文链接:https://blog.csdn.net/weishaoqi2/article/details/106964787

Feign接口 多线程问题相关推荐

  1. Java接口多线程并发测试 (一)

    本文为作者原创,禁止转载,违者必究法律责任!!! 本文为作者原创,禁止转载,违者必究法律责任!!! Java接口多线程并发测试 一,首先写一个接口post 请求代码: import org.apach ...

  2. InfluxData【付诸实践 01】SpringBoot 集成时序数据库 InfluxDB 应用分享(InfluxDB实例+Feign接口调用InfluxDB API)源码分享

    1.InfluxDB实例 1.1 依赖及配置 <dependency><groupId>org.influxdb</groupId><artifactId&g ...

  3. 解决Feign接口调用有时候不好用的分析思路

    很多架构师为了鉴权,会把controller带过来的header信息一股脑的利用feign的拦截器带入RequestTemplate,然后方便feign接口鉴权.这时候可能会带入其他的header信息 ...

  4. feign 接口请求405

    在使用feign接口的时候 ,请求出现了405的情况,分析原因如下 看异常信息,应该是请求方式不对, 请求方式就是get: 如果请求方式也正确,检查下feign请求参数,看是否缺少@RequestPa ...

  5. 单独某个设置feign接口的超时时间

    1.在配置文件里配置:hystrix:enabled: true feign:httpclient.enable: falseokhttp.enable: true# 开启熔断hystrix:enab ...

  6. 通过依赖注入feign接口启动项目时报Error creating bean with name ‘xxxcontroller‘错误

    在我做springcloud微服务项目时,服务通过依赖注入feign接口,调用其它的服务,但是启动的时候报了以下错误. 原因就是feign接口方法的形参列表中,需要在参数前边加上@RequestPar ...

  7. Feign接口获取文件流问题

    文件下载 @GetMapping(value = "/v1/files/**/{file_name:.+}")public void downFile(@PathVariable( ...

  8. java使用feign接口下载文件

    1.feign接口定义(注意:Response 导包) import feign.Response; /*** 导出定义*/@PostMapping(value="/xx/export&qu ...

  9. 华为云调用feign接口时出现java.io.IOException: too many bytes written

    最近在开发项目时遇到使用springcloud调用feign接口时没问题,但是在使用华为云时调用feign接口出现java.io.IOException: too many bytes written ...

最新文章

  1. vector容器 begin()与end()函数、front()与back()的用法
  2. 二十八、深入浅出Python中的 logging模块
  3. codeigniter中创建自己的类库
  4. C++之函数指针实现函数回调
  5. C++输入一个整数后接着输入字符串
  6. Arm-Linux 移植 ssh
  7. 无心剑中译叶芝诗17首
  8. eclipse报错:An error has occurred. See error log for more details. java.lang.NullPointerException
  9. 3D Bounding Box Estimation Using Deep Learning and Geometry
  10. win10如何打来计算机的工具,电脑系统教程:Win10自带解压缩文件工具如何使用
  11. 匹配问题——匈牙利算法
  12. 一文告诉你 K8s PR (Pull Request) 怎样才能被 merge?
  13. win10豆沙绿设置
  14. GDSOI2019退役祭
  15. TreeUtil(树形结构转换)
  16. stm32L0系统----开发环境搭建
  17. 【运用flex制作携程界面练习笔记】
  18. 科学家用iPS细胞研究阿尔兹海默氏病最新进展
  19. poj2942 圆桌骑士
  20. 中关村工业互联网产业联盟成立大会成功召开

热门文章

  1. 操作系统上机作业--根据莱布尼兹级数计算PI(1)(多线程)
  2. 《MySQL——外部检测与内部统计 判断 主库是否出现问题》
  3. css链接样式_CSS中的样式链接
  4. dp 扔鸡蛋_使用动态编程(DP)的鸡蛋掉落问题
  5. python中注释语句和运算_python 运算及注释
  6. dapperpoco mysql_.NET(C#)有哪些主流的ORM框架,SqlSugar,Dapper,EF还是...
  7. Unknown column '' in 'field list'
  8. 浅拷贝+引用计数--写时拷贝---模拟实现string容器
  9. 使用wireshark+ssh+tcpdump远程抓包
  10. UVA - 12096:The SetStack Computer