Hystrix具有折叠(或批处理)请求的高级功能。 如果两个或多个命令同时运行相似的请求,Hystrix可以将它们组合在一起,运行一个批处理请求,并将拆分结果分派回所有命令。 首先让我们看看Hystrix如何工作而不会崩溃。 假设我们有一个StockPrice给定Ticker StockPrice的服务:

import lombok.Value;
import java.math.BigDecimal;
import java.time.Instant;@Value
class Ticker {String symbol;
}@Value
class StockPrice {BigDecimal price;Instant effectiveTime;
}interface StockPriceGateway {default StockPrice load(Ticker stock) {final Set<Ticker> oneTicker = Collections.singleton(stock);return loadAll(oneTicker).get(stock);}ImmutableMap<Ticker, StockPrice> loadAll(Set<Ticker> tickers);
}

为了方便起见, StockPriceGateway核心实现必须提供loadAll()批处理方法,而实现load()方法则是为了方便。 因此,我们的网关能够批量加载多个价格(例如,以减少延迟或网络协议开销),但是目前我们不使用此功能,始终一次加载一个股票的价格:

class StockPriceCommand extends HystrixCommand<StockPrice> {private final StockPriceGateway gateway;private final Ticker stock;StockPriceCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stock = stock;}@Overrideprotected StockPrice run() throws Exception {return gateway.load(stock);}
}

这样的命令将始终为每个Ticker调用StockPriceGateway.load() ,如以下测试所示:

class StockPriceCommandTest extends Specification {def gateway = Mock(StockPriceGateway)def 'should fetch price from external service'() {given:gateway.load(TickerExamples.any()) >> StockPriceExamples.any()def command = new StockPriceCommand(gateway, TickerExamples.any())when:def price = command.execute()then:price == StockPriceExamples.any()}def 'should call gateway exactly once when running Hystrix command'() {given:def command = new StockPriceCommand(gateway, TickerExamples.any())when:command.execute()then:1 * gateway.load(TickerExamples.any())}def 'should call gateway twice when command executed two times'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:commandOne.execute()commandTwo.execute()then:2 * gateway.load(TickerExamples.any())}def 'should call gateway twice even when executed in parallel'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:2 * gateway.load(TickerExamples.any())}}

如果您不了解Hystrix,则通过将外部调用包装在命令中可以获得许多功能,例如超时,断路器等。但这不是本文的重点。 看一下最后两个测试:当两次,顺序或并行( queue() )两次询问任意行情的价格时,我们的外部gateway也被两次调用。 上一次测试特别有趣–我们几乎同时要求相同的报价,但Hystrix不能弄清楚。 这两个命令是完全独立的,将在不同的线程中执行,彼此之间一无所知-即使它们几乎同时运行。

折叠就是找到类似的请求并将其组合。 批处理(我将这个术语与崩溃互换使用)不会自动发生,并且需要一些编码。 但是首先让我们看看它的行为:

def 'should collapse two commands executed concurrently for the same stock ticker'() {given:def anyTicker = TickerExamples.any()def tickers = [anyTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, anyTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:0 * gateway.load(_)1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any())
}def 'should collapse two commands executed concurrently for the different stock tickers'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other())
}def 'should correctly map collapsed response into individual requests'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setgateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other())and:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:def anyPrice = futureOne.get()def otherPrice = futureTwo.get()then:anyPrice == StockPriceExamples.any()otherPrice == StockPriceExamples.other()
}

第一个测试证明,不是两次调用load() ,而是几乎一次调用loadAll() 。 还要注意,由于我们要求相同的Ticker (来自两个不同的线程),因此loadAll()仅请求一个代码。 第二个测试显示两个并发请求,两个不同的行情收录器被折叠为一个批处理调用。 第三次测试可确保我们仍能对每个单独的请求得到正确的响应。 相反,延长HystrixCommand我们必须扩展更加复杂HystrixCollapser 。 现在是时候看到StockTickerPriceCollapsedCommand实现,它无缝替换了StockPriceCommand

class StockTickerPriceCollapsedCommand extends HystrixCollapser<ImmutableMap<Ticker, StockPrice>, StockPrice, Ticker> {private final StockPriceGateway gateway;private final Ticker stock;StockTickerPriceCollapsedCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCollapser.Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Stock")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));this.gateway = gateway;this.stock = stock;}@Overridepublic Ticker getRequestArgument() {return stock;}@Overrideprotected HystrixCommand<ImmutableMap<Ticker, StockPrice>> createCommand(Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {final Set<Ticker> stocks = collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(toSet());return new StockPricesBatchCommand(gateway, stocks);}@Overrideprotected void mapResponseToRequests(ImmutableMap<Ticker, StockPrice> batchResponse, Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {collapsedRequests.forEach(request -> {final Ticker ticker = request.getArgument();final StockPrice price = batchResponse.get(ticker);request.setResponse(price);});}}

这里有很多事情要做,所以让我们逐步回顾StockTickerPriceCollapsedCommand 。 前三种通用类型:

  • BatchReturnType (在我们的示例中为ImmutableMap<Ticker, StockPrice> )是批处理命令响应的类型。 如您将在后面看到的那样,崩溃程序会将多个小命令变成批处理命令。 这是该批处理命令的响应的类型。 请注意,它与StockPriceGateway.loadAll()类型相同。
  • ResponseTypeStockPrice )是要折叠的每个命令的类型。 在我们的例子中,我们正在折叠HystrixCommand<StockPrice> 。 稍后,我们将BatchReturnTypeBatchReturnType为多个StockPrice
  • RequestArgumentTypeTicker )是我们将要折叠(批处理)的每个命令的输入。 当多个命令一起批处理时,我们最终将所有这些替换为一个批处理命令。 此命令应接收所有单个请求,以便执行一个批处理请求。

withTimerDelayInMilliseconds(100)将在稍后说明。 createCommand()创建一个批处理命令。 此命令应替换所有单个命令并执行批处理逻辑。 在我们的情况下,我们不会进行多次单独的load()调用:

class StockPricesBatchCommand extends HystrixCommand<ImmutableMap<Ticker, StockPrice>> {private final StockPriceGateway gateway;private final Set<Ticker> stocks;StockPricesBatchCommand(StockPriceGateway gateway, Set<Ticker> stocks) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stocks = stocks;}@Overrideprotected ImmutableMap<Ticker, StockPrice> run() throws Exception {return gateway.loadAll(stocks);}
}

此类与StockPriceCommand之间的唯一区别是,它需要一堆Ticker并返回所有价格。 Hystrix将收集几个StockTickerPriceCollapsedCommand实例,一旦它具有足够StockTickerPriceCollapsedCommand (稍后再介绍),它将创建一个StockPriceCommand 。 希望这很清楚,因为mapResponseToRequests()涉及的更多。 折叠后的StockPricesBatchCommand完成后,我们必须以某种方式拆分批处理响应,并将回复传达回各个命令,而不会崩溃。 从这个角度来看, mapResponseToRequests()实现非常简单:我们收到批处理响应和包装CollapsedRequest<StockPrice, Ticker>的集合。 现在,我们必须遍历所有正在等待的单个请求并完成它们( setResponse() )。 如果我们不完成某些请求,它们将无限期挂起并最终超时。

怎么运行的

这是描述如何实现折叠的正确时机。 我之前说过崩溃是在同时发生两个请求时发生的。 没有一样的时间了 。 实际上,当第一个可折叠请求进入时,Hystrix会启动一个计时器。 在我们的示例中,我们将其设置为100毫秒。 在此期间,我们的命令被挂起,等待其他命令加入。 在此可配置时间段之后,Hystrix将调用createCommand() ,收集所有请求键(通过调用getRequestArgument() )并运行它。 批处理命令完成后,它将使我们将结果分发给所有等待中的单个命令。 如果我们担心创建庞大的批处理,也可以限制已折叠请求的数量–另一方面,在此短时间内可以容纳多少并发请求?

用例和缺点

请求折叠应在承受高负载(请求频率很高)的系统中使用。 如果每个折叠时间窗口(在示例中为100毫秒)仅收到一个请求,则折叠只会增加开销。 这是因为每次您调用可折叠命令时,它都必须等待,以防万一其他命令想要加入并形成批处理。 仅当至少折叠了几个命令时,这才有意义。 节省网络等待时间和/或更好地利用合作者中的资源可以平衡浪费的等待时间(与单个呼叫相比,批处理请求通常要快得多)。 但是请记住,折叠是一把双刃剑,在特定情况下很有用。

最后要记住的一件事–为了使用请求折叠,您需要在try-finally块中使用HystrixRequestContext.initializeContext()shutdown()

HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {//...
} finally {context.shutdown();
}

崩溃与缓存

您可能会认为可以通过适当的缓存来代替崩溃。 这不是真的。 在以下情况下使用缓存:

  1. 资源可能会被多次访问
  2. 我们可以安全地使用先前的值,它会在一段时间内保持有效, 或者我们确切地知道如何使它无效
  3. 我们可以提供对同一资源的并发请求以多次计算

另一方面,折叠不会强制数据的局部性(1),它总是命中实际服务,并且永远不会返回陈旧的数据(2)。 最后,如果我们从多个线程中请求相同的资源,我们将仅调用一次备份服务(3)。 在进行缓存的情况下,除非您的缓存真的很聪明,否则两个线程将独立地发现缓存中没有给定的资源,并两次请求支持服务。 但是,折叠可以与缓存一起使用-通过在运行可折叠命令之前咨询缓存。

摘要

请求折叠是一个有用的工具,但是用例非常有限。 它可以显着提高我们系统的吞吐量,并限制外部服务的负载。 崩溃可以神奇地平抑流量高峰,而不是将其散布到各处。 只要确保将其用于以极高频率运行的命令即可。

翻译自: https://www.javacodegeeks.com/2014/11/batching-collapsing-requests-in-hystrix.html

Hystrix中的批量(折叠)请求相关推荐

  1. Hystrix面试 - 基于 request cache 请求缓存技术优化批量商品数据查询接口

    Hystrix面试 - 基于 request cache 请求缓存技术优化批量商品数据查询接口 Hystrix command 执行时 8 大步骤第三步,就是检查 Request cache 是否有缓 ...

  2. 软件测试中的批量交易测试

    初次接触批量测试的小伙伴一定一头雾水,不知从何下手.批量交易是什么?它与联机交易有何不同?批量测试都要关注哪些内容?本文结合实际测试经验,详细介绍软件测试中的批量交易测试. 一.什么是批量交易? 本文 ...

  3. 记一次解决 Feign 提交批量添加请求收到 400 报错的经历

    上周在实现用 OpenFeign 提交批量添加请求时,遇到一个奇怪的问题,本来 Post 请求的 body 体,因为是批量请求,内容就比较多,准备起来就比较麻烦,仔细检查了很多次,提交时还是各种失败, ...

  4. JSP中的重定向和请求转发以及它们的区别

    我们先硬着头皮看一下重定向的定义: 重定向(Redirect): 客户端浏览器向Web应用服务器端发送一个请求,Web服务器端使用HttpServletResponse的sendRedirect()方 ...

  5. 爬虫之requests模块在headers参数中携带cookie发送请求

    爬虫之requests模块在headers参数中携带cookie发送请求 网站经常利用请求头中的Cookie字段来做用户访问状态的保持,那么我们可以在headers参数中添加Cookie,模拟普通用户 ...

  6. Python中的http网络请求,用它就对了

    软硬件环境 windows 10 64bits anaconda with python 3.7 requests 2.25.0 简介 requests是用来在Python中进行标准HTTP请求的第三 ...

  7. php循环输出多个网络地址图片,php中curl循环往请求多个URL和多线程去请求多个URL的方法...

    php 中curl 循环去请求多个URL和多线程去请求多个URL的方法 第一种:循环请求$sr=array(url_1,url_2,url_3); foreach ($sr as $k=>$v) ...

  8. Php如何发出请求,PHP中如何发送HTTP请求?

    PHP中如何发送HTTP请求? 在 HTML上 提交参数 A 和 B 到 send.php中,在send.php中接收到传过来的参数 A 和 B 并将这2个参数以 http的形式发送给目标地址:一下是 ...

  9. 请求体的方式传参_Angularjs中$http以post请求通过消息体传递参数的实现方法

    本文实例讲述了Angularjs中$http以post请求通过消息体传递参数的方法.分享给大家供大家参考,具体如下: Angularjs中,$http以post在消息体中传递参数,需要做以下修改,以确 ...

最新文章

  1. react绑定this_React绑定模式:处理“ this”的5种方法
  2. android 6.0 获取手机号,头条小程序获取手机号码,回调未执行,导致无法获取手机号码...
  3. Latex安装中知道的基础常识
  4. 初等数论--同余--WILSON定理
  5. Linux系统安装中文环境,中文帮助,中文输入法的实现
  6. 少儿编程150讲轻松学Scratch(十二)-用Scratch制作石头剪子布游戏
  7. 路由交换基础——DHCP工作原理及DHCP Relay
  8. NFC技术:Android中的NFC技术
  9. office2003word解除安全模式启动
  10. Leedcode错误 error:control reaches end of non-void function[-Werror=return-type]
  11. Android Title标题栏的修改(隐藏,菜单)
  12. 最小公倍数用c语言,如何用C语言求最小公倍数。。。
  13. ICPC焦作站(E、F)+思维+树上dp
  14. oracle查看所有表信息和字段信息以及注释信息等
  15. C51 - DS18B20
  16. 矢量切片工具:tippecanoe
  17. 嵌入式操作系统风云录:历史演进与物联网未来Chapter1 第1章
  18. Vue select的使用以及select设置默认选中,element select联动不能选择问题
  19. flutter 中使用 WebView加载H5页面异常net:ERR_CLEARTEXT_NOT_PERMITTED
  20. 美团新一代渠道包打包神器walle

热门文章

  1. Mybatisplus用updateById默认没有传的值不会进行改变
  2. 计算机网络协议和通信规则,计算机网络协议基本知识
  3. C打印函数printf的一种实现原理简要分析
  4. 连接堡垒机出现java环境_Java 8:长期支持的堡垒
  5. javafx动画_JavaFX动画工具
  6. java的网络编程有用吗_十大有用但又偏执的Java编程技术
  7. maven 可执行 jar_Maven提示:有关可执行jar的所有信息
  8. 序列化和反序列化的概念_序列化的概念
  9. 关于Jakarta EE软件包名称更改的思考
  10. 自然数 素数 质数_在Java中获取素数的无限列表