akka使用

我的同事正在开发一种交易系统,该系统可以处理大量的传入交易。 每笔交易都涵盖一种Instrument (例如债券或股票),并且具有某些(现在)不重要的属性。 他们坚持使用Java(<8),所以我们坚持下去:

class Instrument implements Serializable, Comparable<Instrument> {private final String name;public Instrument(String name) {this.name = name;}//...Java boilerplate}public class Transaction {private final Instrument instrument;public Transaction(Instrument instrument) {this.instrument = instrument;}//...Java boilerplate}

Instrument稍后将用作HashMap的键,因此将来我们会主动实现Comparable<Instrument> 。 这是我们的领域,现在的要求是:

  1. 交易进入系统,需要尽快处理(无论如何)
  2. 我们可以自由地以任何顺序处理它们
  3. …但是,同一种工具的交易需要按照进来时完全相同的顺序进行处理。

最初的实现很简单–将所有传入的事务放入一个使用者的队列(例如ArrayBlockingQueue )中。 这满足了最后一个要求,因为队列在所有事务中保留了严格的FIFO顺序。 但是,这样的架构阻止了针对不同工具的不相关交易的并发处理,从而浪费了令人信服的吞吐量提高。 毫无疑问,这种实现尽管很简单,却成为了瓶颈。

第一个想法是以某种方式分别按工具和过程工具拆分传入的交易。 我们提出了以下数据结构:

priavate final ConcurrentMap<Instrument, Queue<Transaction>> queues = new ConcurrentHashMap<Instrument, Queue<Transaction>>();public void accept(Transaction tx) {final Instrument instrument = tx.getInstrument();if (queues.get(instrument) == null) {queues.putIfAbsent(instrument, new LinkedBlockingQueue<Transaction>());}final Queue<Transaction> queue = queues.get(instrument);queue.add(tx);
}

! 但是最坏的时刻还没有到来。 您如何确保最多一个线程一次处理每个队列? 毕竟,否则,两个线程可以从一个队列(一种工具)中提取项目,并以相反的顺序处理它们,这是不允许的。 最简单的情况是每个队列都有一个Thread -这无法扩展,因为我们期望成千上万种不同的工具。 因此,我们可以说N线程,让每个线程处理队列的一个子集,例如instrument.hashCode() % N告诉我们哪个线程负责处理给定的队列。 但是由于以下三个原因,它仍然不够完美:

  1. 一个线程必须“观察”许多队列(很可能是忙于等待),并始终对其进行遍历。 或者,队列可能以某种方式唤醒其父线程
  2. 在最坏的情况下,所有工具都将具有冲突的哈希码,仅针对一个线程-这实际上与我们最初的解决方案相同
  3. 这只是该死的复杂! 漂亮的代码并不复杂!

实现这种怪异是可能的,但是困难且容易出错。 此外,还有另一个非功能性的要求:仪器来来往往,随着时间的流逝,成千上万的仪器。 一段时间后,我们应删除代表最近未见过的仪器的地图条目。 否则我们会发生内存泄漏。

如果您能提出一些更简单的解决方案,请告诉我。 同时,让我告诉你我对同事的建议。 如您所料,它是Akka –结果非常简单。 我们需要两种角色: DispatcherProcessorDispatcher有一个实例,并接收所有传入的事务。 它的职责是为每个Instrument找到或生成工作Processor角色,并将交易推向该角色:

public class Dispatcher extends UntypedActor {private final Map<Instrument, ActorRef> instrumentProcessors = new HashMap<Instrument, ActorRef>();@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {dispatch(((Transaction) message));} else {unhandled(message);}}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}private ActorRef findOrCreateProcessorFor(Instrument instrument) {final ActorRef maybeActor = instrumentProcessors.get(instrument);if (maybeActor != null) {return maybeActor;} else {final ActorRef actorRef = context().actorOf(Props.create(Processor.class), instrument.getName());instrumentProcessors.put(instrument, actorRef);return actorRef;}}
}

这很简单。 由于我们的Dispatcher actor实际上是单线程的,因此不需要同步。 我们几乎没有收到Transaction ,查找或创建Processor并进一步传递Transaction 。 这是Processor实现的样子:

public class Processor extends UntypedActor {private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {process(((Transaction) message));} else {unhandled(message);}}private void process(Transaction tx) {log.info("Processing {}", tx);}
}

而已! 有趣的是,我们的Akka实现几乎与我们第一个使用队列映射的想法相同。 毕竟,参与者只是一个队列,还有一个(逻辑)线程处理该队列中的项目。 区别在于:Akka管理有限的线程池,并可能在成千上万的参与者之间共享它。 而且,由于每个工具都有其专用(和“单线程”)执行器,因此可以保证每个工具的事务的顺序处理。

还有一件事。 如前所述,有大量的乐器,我们不想让演员们出现一段时间了。 假设如果Processor在一个小时内未收到任何交易,则应停止并收集垃圾。 如果以后我们收到此类工具的新交易,则可以随时重新创建它。 这是一个非常棘手的问题–我们必须确保,如果在处理器决定删除自身时交易到达,我们就不能放弃该交易。 Processor没有停止自身,而是向其父Processor发出空闲时间过长的信号。 然后, Dispatcher将发送PoisonPill到它。 因为ProcessorIdleTransaction消息都按顺序处理,所以没有交易发送到不再存在的参与者的风险。

每个setReceiveTimeout通过使用setReceiveTimeout安排超时来独立地管理其生命周期:

public class Processor extends UntypedActor {@Overridepublic void preStart() throws Exception {context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));}@Overridepublic void onReceive(Object message) throws Exception {//...if (message instanceof ReceiveTimeout) {log.debug("Idle for two long, shutting down");context().parent().tell(ProcessorIdle.INSTANCE, self());} else {unhandled(message);}}}enum ProcessorIdle {INSTANCE
}

显然,当Processor在一个小时内未收到任何消息时,它会向其父级( Dispatcher )轻轻发出信号。 但是演员仍然活着,并且只要一个小时后发生交易就可以处理交易。 Dispatcher作用是杀死给定的Processor并将其从地图中删除:

public class Dispatcher extends UntypedActor {private final BiMap<Instrument, ActorRef> instrumentProcessors = HashBiMap.create();public void onReceive(Object message) throws Exception {//...if (message == ProcessorIdle.INSTANCE) {removeIdleProcessor(sender());sender().tell(PoisonPill.getInstance(), self());} else {unhandled(message);}}private void removeIdleProcessor(ActorRef idleProcessor) {instrumentProcessors.inverse().remove(idleProcessor);}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}//...}

不便之处。 instrumentProcessors过去是Map<Instrument, ActorRef> 。 事实证明这是不够的,因为我们突然不得不按值删除此映射中的条目。 换句话说,我们需要找到一个映射到给定ActorRefProcessor )的键( Instrument )。 有多种处理方法(例如,空闲Processor可以发送它处理的Instrumnt ),但是我改用了BiMap<K, V> 。 之所以可以使用它,是因为指定的InstrumentActorRef都是唯一的(每个演员actor)。 使用BiMap我可以简单地对地图进行inverse() (从BiMap<Instrument, ActorRef>BiMap<ActorRef, Instrument>并将ActorRef视为键。

这个Akka例子只不过是“ hello,world ”。 但是与复杂的解决方案相比,我们必须使用并发队列,锁和线程池进行编写,这是完美的。 我的队友非常兴奋,以至于最终他们决定将整个应用程序重写为Akka。

翻译自: https://www.javacodegeeks.com/2014/06/simplifying-trading-system-with-akka.html

akka使用

akka使用_使用Akka简化交易系统相关推荐

  1. akka应用_处理Akka应用程序中的每个事件

    akka应用 这里的事件,那里的事件,到处都是事件. 发布有关检查每一项Akka事件最终都能找到归宿的信息. Akka和基于事件的React式应用程序是创建软件的新方法. 在当前基于Scala的项目中 ...

  2. akka 异常处理_使用Akka处理1000万条消息

    akka 异常处理 Akka演员承诺并发. 有什么更好的模拟方法,看看使用商品硬件和软件处理1000万条消息需要花费多少时间,而无需进行任何低级调整.我用Java编写了整个1000万条消息的处理过程, ...

  3. java akka 实战_《Akka实战:快速构建高可用分布式应用》(杜云飞)【摘要 书评 试读】- 京东图书...

    Akka 是一款优秀的分布式并发框架,虽然它是基于 Scala 语言实现的,但我们却可轻松地将其运行在JVM上,在不改变现有架构的基础上支持更高的并发量.另一方面,Akka 是一款轻量级开源技术,它既 ...

  4. 莫德友_去哪儿酒店交易系统架构实践

    2019独角兽企业重金招聘Python工程师标准>>> 莫德友_去哪儿酒店交易系统架构实践 去哪儿网在刚开始做酒店交易业务的时候,为了实现各个业务线的快速搭建和运营,采取了比较简单粗 ...

  5. Akka 指南 之「Akka 简介」

    温馨提示:Akka 中文指南的 GitHub 地址为「akka-guide」,欢迎大家Star.Fork,纠错. Akka 简介 欢迎来到 Akka,它是一组用于设计跨越处理器和网络的可扩展.弹性系统 ...

  6. akka学习教程(十三) akka分布式

    akka系列文章目录 akka学习教程(十四) akka分布式实战 akka学习教程(十三) akka分布式 akka学习教程(十二) Spring与Akka的集成 akka学习教程(十一) akka ...

  7. 使用Akka简化交易系统

    我的同事正在开发一种交易系统,该系统可以处理大量的传入交易. 每笔交易都涵盖一种Instrument (例如债券或股票),并且具有某些(现在)不重要的属性. 他们坚持使用Java(<8),所以我 ...

  8. java akka 教程_快速入门 Akka Java 指南

    快速入门 Akka Java 指南 Akka 是一个用于在 JVM 上构建高并发.分布式和容错的事件驱动应用程序的运行时工具包.Akka 既可以用于 Java,也可以用于 Scala.本指南通过描述 ...

  9. akka+java实现_使用Akka实现并发

    介绍 我开始分配读取包含100列和10万行的CSV文件并将其写入数据库. 方法1:简单的Java程序 所以我从一个简单的Java程序开始,运行一个while循环直到EOF,然后进行JDBC调用来存储值 ...

最新文章

  1. 特斯拉Tesla Model 3整体架构解析(上)
  2. VS2010 VS2012 的快捷键
  3. 基于SpringBoot和Vue的分布式爬虫系统(JavaWeb)
  4. php查询数据存到下一界面_PHP从另一个页面获取数据
  5. python随机生成20个数字_python – 生成大量唯一的随机float32数字
  6. Android逆向文档阅读笔记-Android Application Fundamentals
  7. Android 系统(196)---Android 属性动画
  8. Linux指令:lspci显示PCI总线设备信息
  9. ICE通信之IceGrid服务(一)
  10. 百度微软云服务器地址,win10的ie浏览器默认地址被百度劫持
  11. java调用其他程序吗_java本地方法如何调用其他程序函数,方法详解
  12. java.lang.NoClassDefFoundError: weblogic/rmi/extensions/DisconnectListener
  13. ArcView Image Analyst v1.0.rar
  14. Java到底能干什么?有哪些实际用途?
  15. Csdn富文本编辑器中使用Emoji表情包
  16. php amp 转义,HTML转义和反转义
  17. FPGA-DDRx的VTT电源设计要点
  18. 企业办公入门之选 用ThinkCentre E95更划算!
  19. 移动互联网产品设计的原则
  20. 操作系统实验--进程的创建撤销和控制

热门文章

  1. 【dfs】家族(jzoj 1985)
  2. 深入理解TCP/IP协议-TCP建立与终止连接
  3. 如何使ArrayList 线程安全
  4. Oracle入门(十二C)之表修改
  5. Oracle入门(七A)之表空间配额(quota)
  6. 架构师必须搞懂DNS,一篇文章就够了。
  7. SpringCloud注册中心高可用搭建
  8. 【Python】Conda的安装
  9. Spring AOP知识点简介
  10. javaWeb服务详解(含源代码,测试通过,注释)