为什么80%的码农都做不了架构师?>>>   

本文主要研究一下WebClient的LoadBalance支持

代码实例

配置

@Configuration
public class WebClientConfig {@Autowiredprivate LoadBalancerExchangeFilterFunction lbFunction;@Beanpublic WebClient webClient(){return WebClient.builder().filter(lbFunction).build();}
}

请求department-service

@Component
public class DepartmentService {@AutowiredWebClient webClient;public Flux<Department> getDepartmentsByOrgId(Long orgId) {return webClient.get().uri("http://department-service/organization/{orgId}",orgId).retrieve().bodyToFlux(Department.class);}
}

controller

    @AutowiredDepartmentService departmentService;@GetMapping("/departments")public Flux<Department> getDepartmentsById(Long orgId){return departmentService.getDepartmentsByOrgId(orgId);}

/flux/departments?orgId=1

[{"id": 1,"name": "department 1","employees": []}
]

异常情况

Connection refused

2018-04-29 13:09:15 ERROR [organization-service,,,] Failed to handle request [GET http://localhost:8092/flux/departments?orgId=1]
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /172.16.205.106:8091at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_151]at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_151]at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
Caused by: java.net.ConnectException: Connection refused... 10 common frames omitted

instance can not be null

2018-04-29 13:12:08 ERROR [organization-service,,,] Failed to handle request [GET http://localhost:8092/flux/departments?orgId=1]
java.lang.IllegalArgumentException: instance can not be nullat org.springframework.util.Assert.notNull(Assert.java:193) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient.reconstructURI(RibbonLoadBalancerClient.java:53) ~[spring-cloud-netflix-ribbon-2.0.0.RC1.jar:2.0.0.RC1]at org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerExchangeFilterFunction.filter(LoadBalancerExchangeFilterFunction.java:34) ~[spring-cloud-commons-2.0.0.RC1.jar:2.0.0.RC1]at org.springframework.web.reactive.function.client.ExchangeFilterFunction.lambda$andThen$1(ExchangeFilterFunction.java:56) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.web.reactive.function.client.ExchangeFilterFunction.lambda$apply$2(ExchangeFilterFunction.java:67) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.exchange(DefaultWebClient.java:320) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.retrieve(DefaultWebClient.java:367) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]at com.example.service.DepartmentService.getDepartmentsByOrgId(DepartmentService.java:24) ~[classes/:na]at com.example.controller.FluxDemoController.getDepartmentsById(FluxDemoController.java:23) ~[classes/:na]at sun.reflect.GeneratedMethodAccessor82.invoke(Unknown Source) ~[na:na]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]at org.springframework.web.reactive.result.method.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:243) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:138) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:241) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:323) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoDefaultIfEmpty.subscribe(MonoDefaultIfEmpty.java:37) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:271) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:803) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1]at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:241) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:323) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:312) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:70) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE]at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:381) ~[reactor-netty-0.7.6.RELEASE.jar:0.7.6.RELEASE]at reactor.ipc.netty.http.server.HttpServerOperations.onHandlerStart(HttpServerOperations.java:397) ~[reactor-netty-0.7.6.RELEASE.jar:0.7.6.RELEASE]at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final]at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar:4.1.23.Final]at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]

源码解析

LoadBalancerExchangeFilterFunction

spring-cloud-commons-2.0.0.RC1-sources.jar!/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerExchangeFilterFunction.java

public class LoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {private final LoadBalancerClient loadBalancerClient;public LoadBalancerExchangeFilterFunction(LoadBalancerClient loadBalancerClient) {this.loadBalancerClient = loadBalancerClient;}@Overridepublic Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {URI originalUrl = request.url();String serviceId = originalUrl.getHost();Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUrl);//TODO: reactive lb clientServiceInstance instance = this.loadBalancerClient.choose(serviceId);URI uri = this.loadBalancerClient.reconstructURI(instance, originalUrl);ClientRequest newRequest = ClientRequest.method(request.method(), uri).headers(headers -> headers.addAll(request.headers())).cookies(cookies -> cookies.addAll(request.cookies())).attributes(attributes -> attributes.putAll(request.attributes())).body(request.body()).build();return next.exchange(newRequest);}}

对于webclient来说,在这个filterChain中使用了LoadBalancerExchangeFilterFunction,可以看到使用了LoadBalancerExchangeFilterFunction的filter方法里头,对原来的request进行了包装,使用loadBalancerClient根据服务ID进行服务发现选取可用的服务地址,然后替换原来的uri,构造成新的request传递到下一个filter

DefaultWebClientBuilder

spring-webflux-5.0.5.RELEASE-sources.jar!/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.java

class DefaultWebClientBuilder implements WebClient.Builder {@Nullableprivate List<ExchangeFilterFunction> filters;@Overridepublic WebClient.Builder filter(ExchangeFilterFunction filter) {Assert.notNull(filter, "ExchangeFilterFunction must not be null");initFilters().add(filter);return this;}@Overridepublic WebClient build() {ExchangeFunction exchange = initExchangeFunction();ExchangeFunction filteredExchange = (this.filters != null ? this.filters.stream().reduce(ExchangeFilterFunction::andThen).map(filter -> filter.apply(exchange)).orElse(exchange) : exchange);return new DefaultWebClient(filteredExchange, initUriBuilderFactory(),unmodifiableCopy(this.defaultHeaders), unmodifiableCopy(this.defaultCookies),new DefaultWebClientBuilder(this));}

可以看到调用webClient的filter就会往filters添加,之后在build的时候,利用ExchangeFilterFunction::andThen构造一个ExchangeFunction,传递给DefaultWebClient的构造器

ExchangeFilterFunction

spring-webflux-5.0.5.RELEASE-sources.jar!/org/springframework/web/reactive/function/client/ExchangeFilterFunction.java

 /*** Apply this filter to the given request and exchange function.* <p>The given {@linkplain ExchangeFunction exchange function} represents the next entity* in the chain, and can be {@linkplain ExchangeFunction#exchange(ClientRequest) invoked}* in order to proceed to the exchange, or not invoked to block the chain.* @param request the request* @param next the next exchange function in the chain* @return the filtered response*/Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next);/*** Return a composed filter function that first applies this filter, and then applies the* {@code after} filter.* @param after the filter to apply after this filter is applied* @return a composed filter that first applies this function and then applies the* {@code after} function*/default ExchangeFilterFunction andThen(ExchangeFilterFunction after) {Assert.notNull(after, "'after' must not be null");return (request, next) -> {ExchangeFunction nextExchange = exchangeRequest -> after.filter(exchangeRequest, next);return filter(request, nextExchange);};}/*** Apply this filter to the given exchange function, resulting in a filtered exchange function.* @param exchange the exchange function to filter* @return the filtered exchange function*/default ExchangeFunction apply(ExchangeFunction exchange) {Assert.notNull(exchange, "'exchange' must not be null");return request -> this.filter(request, exchange);}

ExchangeFilterFunction通过andThen添加filter链(里头的lambda是实现filter这个函数式方法),最后通过apply转换为ExchangeFunction

小结

webClient的loadBalanced支持比restTemplate更为简洁和清晰,直接使用filter模式,通过loadBalancerClient获取服务地址,替换uri再传递给下一个filter。如果loadBalancerClient没能获得到服务地址的话,则RibbonLoadBalancerClient.reconstructURI方法会报错java.lang.IllegalArgumentException: instance can not be null。另外由于注册中心的信息可能有延迟,因为也可能存在Connection refused的异常。

doc

  • Spring WebFlux WebClient as a Load Balancer Client

转载于:https://my.oschina.net/go4it/blog/1803113

聊聊WebClient的LoadBalance支持相关推荐

  1. 聊聊呗|红包系统支持以下三种模式

    本文标题:聊聊呗|红包系统支持以下三种模式 常见的消费者扫码送红包,导购扫码送红包.代理商扫码送红包,都是通过扫码送红包营销活动,来实现企业的各种战略目标. 目标一:扫码送红包,激励导购卖货 导购是门 ...

  2. Shortcut setting in Webclient UI

    Created by Jerry Wang, last modified on Dec 08, 2014 webclient ui能够支持end user为特定的动作设定键盘shortcut: 如上例 ...

  3. python支持向量机回归_支持向量机——核函数与支持向量回归(附Python代码)

    上期跟大家介绍了支持向量机的一般原理,今天继续跟大家聊聊支持向量机--核函数与支持项链回归. 1 核函数 数据通过某种变换,使原本二维的问题通过某种函数转换到高维的特征空间,而这个函数就称为核函数.核 ...

  4. 精讲响应式WebClient第2篇-GET请求阻塞与非阻塞调用方法详解

    本文是精讲响应式WebClient第2篇,前篇的blog访问地址如下: 精讲响应式webclient第1篇-响应式非阻塞IO与基础用法 在上一篇文章为大家介绍了响应式IO模型和WebClient的基本 ...

  5. webclient是什么意思_WebClient 用法小结

    进来的项目中要实现能够在windows service中调用指定项目的链接页面.由于访问页面时候使用的是ie浏览器或其他浏览器,所以想起用webclient类. 如果只想从特定的URI请求文件,则使用 ...

  6. 爬虫二式 —— WebClient

    优点:可设置一个运行JavaScript的时间,解决页面数据延时加载问题 缺点:慢,而且参数不好设置 // 爬取网站网址+关键字(关键字需转换为gbk的url,如"%2B%3E") ...

  7. C#模拟POST提交表单(二)--HttpWebRequest以及HttpWebResponse

    上次介绍了用WebClient的方式提交POST请求,这次,我继续来介绍用其它一种方式 HttpWebRequest以及HttpWebResponse 自认为与上次介绍的WebClient最大的不同之 ...

  8. Sharepoint学习笔记 –架构系列—09 Sharepoint的服务器端对象模型(Server Object Model) 1.物理对象层次结构

    关于Sharepoint的服务器端对象模型的内容很庞大很繁杂,而事实上,我们在这里只把最关键的对象梳理一下,我们会从三个体系来大致描述它们. 这三个体系分别是: 1.物理对象层次结构(Physical ...

  9. TPL 和传统 .NET Framework 异步编程

    .NET Framework 提供了以下两种标准模式,用于执行 I/O 密集型和计算密集型异步操作: 异步编程模型 (APM),其中异步操作由一对 Begin/End 方法(如 FileStream. ...

最新文章

  1. 【jQuery】如何用jQuery实现鼠标移入后改变背景颜色
  2. [再读书]私有构造函数
  3. 像教光学一样在高中教深度学习?怼过LeCun的Google大牛认为这事有出路
  4. 报告:最大化人工智能(AI)机遇
  5. 5G的3大应用场景落地开花,中国或将引领全球5G产业发展
  6. 网络经济与企业管理(第 2 章:企业战略管理)
  7. 快速对比UART、SPI、I2C通信的区别与应用
  8. project euler Problem 52
  9. 4月28日见!努比亚红魔3电竞手机曝光:90Hz屏幕刷新率加持
  10. Unity3D引擎工作笔记0002---Unity3D 5下载安装
  11. OpenGL基础25:多光源(附简单GLSL配置)
  12. 机器学习 之 LBP特征
  13. 重启oracle数据库
  14. idea工程在maven projects中显示灰色的解决办法
  15. 【直播礼物特效】vap原理图理解
  16. 苹果电脑键盘没反应_MAC PRO type-c接口无反应,充电无反应或一直在充电解决方案...
  17. OpenGL(五)立体图形
  18. Android 8.0 Activity启动流程分析
  19. 浅谈对象的深拷贝和浅拷贝
  20. 清理Elasticsearch中过期的日志索引

热门文章

  1. 大数据在银行业的应用场景
  2. it专员职责_网络IT专员岗位职责
  3. spss方差分析_SPSS单因素重复测量方差分析
  4. POJ1149 PIGS(最大流)
  5. 【Linux 内核 内存管理】RCU 机制 ② ( RCU 机制适用场景 | RCU 机制特点 | 使用 RCU 机制保护链表 )
  6. 【错误记录】Ubuntu 安装 Linux 内核后需要重新安装 VMware Tools 软件进行兼容操作
  7. 【Android 插件化】VirtualApp 源码分析 ( 安装应用源码分析 | HomePresenterImpl 添加应用 | AppRepository.addVirtualApp )
  8. 【Flutter】Flutter Gallery 官方示例简介 ( 学习示例 | 邮件应用 | 零售应用 | 理财应用 | 旅行应用 | 新闻应用 | 自适应布局应用 )
  9. 【鸿蒙 HarmonyOS】UI 组件 ( Text 组件 )
  10. 【软件工程】CMMI 能力成熟度模型集成 ( CMMI 工程过程域 | CMMI 支持过程域 ) ★