我的同事正在开发一种交易系统,该系统可以处理大量的传入交易。 每笔交易都涵盖一种Instrument (例如债券或股票),并且具有某些(现在)不重要的属性。 他们坚持使用Java(<8),所以我们坚持下去:

class Instrument implements Serializable, Comparable<Instrument> {private final String name;public Instrument(String name) {this.name = name;}//...Java boilerplate}public class Transaction {private final Instrument instrument;public Transaction(Instrument instrument) {this.instrument = instrument;}//...Java boilerplate}

Instrument稍后将用作HashMap的键,因此将来我们会主动实现Comparable<Instrument> 。 这是我们的领域,现在的要求是:

  1. 交易进入系统,需要尽快处理(无论如何)
  2. 我们可以按任何顺序自由处理它们
  3. …但是,同一种工具的交易需要按照进来时的顺序完全相同地顺序进行。

最初的实现很简单–将所有传入的事务放入一个使用方的队列(例如ArrayBlockingQueue )中。 这满足了最后的要求,因为队列在所有事务中都保留了严格的FIFO顺序。 但是,这种架构阻止了针对不同工具的不相关交易的并发处理,从而浪费了令人信服的吞吐量提高。 毫无疑问,这种实现尽管很简单,却成为了瓶颈。

第一个想法是以某种方式分别按工具和流程工具拆分传入的交易。 我们提出了以下数据结构:

priavate final ConcurrentMap<Instrument, Queue<Transaction>> queues = new ConcurrentHashMap<Instrument, Queue<Transaction>>();public void accept(Transaction tx) {final Instrument instrument = tx.getInstrument();if (queues.get(instrument) == null) {queues.putIfAbsent(instrument, new LinkedBlockingQueue<Transaction>());}final Queue<Transaction> queue = queues.get(instrument);queue.add(tx);
}

! 但是最坏的时刻还没有到来。 您如何确保最多一个线程一次处理每个队列? 毕竟,否则,两个线程可以从一个队列(一种仪器)中提取项目并以相反的顺序处理它们,这是不允许的。 最简单的情况是每个队列都有一个Thread -这无法扩展,因为我们期望成千上万种不同的工具。 因此,我们可以说N线程,让每个线程处理队列的一个子集,例如instrument.hashCode() % N告诉我们哪个线程负责处理给定的队列。 但是由于以下三个原因,它仍然不够完美:

  1. 一个线程必须“观察”许多队列(很可能是忙等待),并始终对其进行遍历。 或者,队列可能以某种方式唤醒其父线程
  2. 在最坏的情况下,所有工具都将具有冲突的哈希码,仅针对一个线程-这实际上与我们最初的解决方案相同
  3. 这只是该死的复杂! 漂亮的代码并不复杂!

实现这种怪异是可能的,但是困难且容易出错。 此外,还有另一个非功能性的要求:仪器来来往往,随着时间的流逝,成千上万的仪器。 一段时间后,我们应删除代表最近未见过的仪器的地图条目。 否则我们会发生内存泄漏。

如果您能提出一些更简单的解决方案,请告诉我。 同时,让我告诉你我对同事的建议。 如您所料,它是Akka –结果非常简单。 我们需要两种角色: DispatcherProcessorDispatcher有一个实例,并接收所有传入的事务。 它的责任是为每个Instrument找到或生成工作Processor角色,并向其推送事务:

public class Dispatcher extends UntypedActor {private final Map<Instrument, ActorRef> instrumentProcessors = new HashMap<Instrument, ActorRef>();@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {dispatch(((Transaction) message));} else {unhandled(message);}}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}private ActorRef findOrCreateProcessorFor(Instrument instrument) {final ActorRef maybeActor = instrumentProcessors.get(instrument);if (maybeActor != null) {return maybeActor;} else {final ActorRef actorRef = context().actorOf(Props.create(Processor.class), instrument.getName());instrumentProcessors.put(instrument, actorRef);return actorRef;}}
}

这很简单。 由于我们的Dispatcher actor实际上是单线程的,因此不需要同步。 我们几乎没有收到Transaction ,查找或创建Processor并进一步传递Transaction 。 这是Processor实现的样子:

public class Processor extends UntypedActor {private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {process(((Transaction) message));} else {unhandled(message);}}private void process(Transaction tx) {log.info("Processing {}", tx);}
}

而已! 有趣的是,我们的Akka实现几乎与我们第一个使用队列映射的想法相同。 毕竟,参与者只是一个队列,还有一个(逻辑)线程在该队列中处理项目。 区别在于:Akka管理有限的线程池,并可能在成千上万的参与者之间共享它。 而且,由于每个工具都有其专用(和“单线程”)执行器,因此可以保证每个工具的事务顺序处理。

还有一件事。 如前所述,有大量的乐器,我们不想让演员出现一段时间了。 假设如果Processor在一个小时内未收到任何交易,则应停止并收集垃圾。 如果以后我们收到此类工具的新交易,则可以随时重新创建它。 这是一个非常棘手的问题–我们必须确保,如果处理器决定删除自身时,如果事务到达,我们将无法松开该事务。 Processor没有停止自身,而是向其父Processor发出空闲时间过长的信号。 然后, Dispatcher将发送PoisonPill到它。 因为ProcessorIdleTransaction消息都是顺序处理的,所以没有交易发送到不再存在的参与者的风险。

每个setReceiveTimeout通过使用setReceiveTimeout安排超时来独立地管理其生命周期:

public class Processor extends UntypedActor {@Overridepublic void preStart() throws Exception {context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));}@Overridepublic void onReceive(Object message) throws Exception {//...if (message instanceof ReceiveTimeout) {log.debug("Idle for two long, shutting down");context().parent().tell(ProcessorIdle.INSTANCE, self());} else {unhandled(message);}}}enum ProcessorIdle {INSTANCE
}

显然,当Processor在一个小时内未收到任何消息时,它会向其父级( Dispatcher )轻轻发出信号。 但是演员仍然活着,并且如果交易恰好在一小时后发生,便可以处理。 Dispatcher作用是杀死给定的Processor并将其从地图中删除:

public class Dispatcher extends UntypedActor {private final BiMap<Instrument, ActorRef> instrumentProcessors = HashBiMap.create();public void onReceive(Object message) throws Exception {//...if (message == ProcessorIdle.INSTANCE) {removeIdleProcessor(sender());sender().tell(PoisonPill.getInstance(), self());} else {unhandled(message);}}private void removeIdleProcessor(ActorRef idleProcessor) {instrumentProcessors.inverse().remove(idleProcessor);}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}//...}

不便之处。 instrumentProcessors过去是Map<Instrument, ActorRef> 。 事实证明这是不够的,因为我们突然不得不按值删除此映射中的条目。 换句话说,我们需要找到一个映射到给定ActorRefProcessor )的键( Instrument )。 有多种处理方法(例如,空闲的Processor可以发送它处理的Instrumnt ),但是我改用了BiMap<K, V> 。 之所以起作用,是因为指定的InstrumentActorRef都是唯一的(每个乐器的actor)。 使用BiMap我可以简单地对地图进行inverse() (从BiMap<Instrument, ActorRef>BiMap<ActorRef, Instrument>并将ActorRef视为键。

这个Akka例子只不过是“ hello,world ”。 但是与卷积解决方案相比,我们必须使用并发队列,锁和线程池进行编写,这是完美的。 我的队友非常兴奋,以至于最终他们决定将整个应用程序重写为Akka。

翻译自: https://www.javacodegeeks.com/2014/06/simplifying-trading-system-with-akka.html

使用Akka简化交易系统相关推荐

  1. akka使用_使用Akka简化交易系统

    akka使用 我的同事正在开发一种交易系统,该系统可以处理大量的传入交易. 每笔交易都涵盖一种Instrument (例如债券或股票),并且具有某些(现在)不重要的属性. 他们坚持使用Java(< ...

  2. akka 文章 博客

    http://blog.csdn.net/wsscy2004/article/category/2430395 Actor生命周期理解 Actor生命周期理解 镇图:Actor内功心法图 Actor的 ...

  3. akka typed mysql_现代化的 Java (三十四)—— Akka Typed 的 Clojure 封装和简化

    这几天在家读了一下 Akka Typed 的文档,发现还是挺有意思的. 这个体系已经是 Akka 官方推荐的默认风格,现在打开 Akka 官网的最新版 Akka 文档(2.6.0),入门教程就是 ty ...

  4. 简化供采交易路径,B2B电子商务交易系统实现钢铁行业全链路数字化

    钢铁行业是典型的具有规模经济效应的行业,是建设社会主义现代化强国的重要基石,钢铁产业作为国民经济的重要基础产业,钢铁行业正面临供应周期变长.呆滞物料增多.频繁的供应短缺.原材料涨价和产能受到限制等问题 ...

  5. Akka实战:HTTP大文件断点上传、下载,秒传

    2019独角兽企业重金招聘Python工程师标准>>> 访问:https://github.com/yangbajing/scala-applications/tree/master ...

  6. 重构:改善饿了么交易系统的设计思路

    文 | 盛赫 叮~,您有新的饿了么订单,正在阿里云上被接单. 这篇文章成型于交易系统重构一期之后,主要是反思其过程中做决策的思路,我没有使用「架构」这个词语,是因为它给人的感受充满权利和神秘感,谈论「 ...

  7. [转] AKKA简介

    [From] https://blog.csdn.net/linuxarmsummary/article/details/79399602 Akka in JAVA(一) AKKA简介 什么是AKKA ...

  8. Akka的Hello World(二)Akka的Actor生命周期

    (〇)介绍 每当一个Actor停止时,它的所有孩子也会被递归地停止.此行为极大地简化了资源清理,并有助于避免资源泄漏,例如由打开sockets 和文件引起的资源泄漏.实际上,处理低级多线程代码时常常被 ...

  9. Akka入门(一)Akka简介与为什么使用Akka

    AKKA是什么 AKKA是一套开源库,用于设计跨处理器和跨网络的可扩展弹性系统.Akka允许开发者专注于满足业务需求,而不是编写低级代码以提供可靠的行为,容错和高性能. 分布式系统环境中,组件崩溃而不 ...

最新文章

  1. 《面向模式的软件体系结构2-用于并发和网络化对象模式》读书笔记(3)--- 服务访问和配置模式...
  2. 2021-04-05 Python tqdm显示代码任务进度
  3. PMP知识点(一、全局概览)
  4. tensorflow从入门到放弃(三)
  5. 本文主要总结关于mysql的优化(将会持续更新)
  6. android 音量键 广播,【Android 7.0 Audio】: 按键调节音量的调用过程
  7. java sendto,Android:套接字-java.net.SocketException:sendto失败:EPIPE(管道断开)
  8. SpringBoot:Could not autowire there is more than one bean of xx type
  9. 库克为 iOS 操碎了心
  10. AC日记——丢瓶盖 洛谷 P1316
  11. Mybatis(2)——Mapper映射文件
  12. 四川省大学生计算机作品大赛,我校学子在2019“新华三杯”四川省大学生计算机作品大赛中获奖20项...
  13. 图解n=4的汉诺塔问题
  14. Win10企业版本激活方法
  15. 《社会心理学》第一章读书笔记
  16. 表单的js验证框架,只提供提示信息及正则表达式即可自动验证及提示
  17. 小米5(mi5)开启-全面屏手势-详细步骤
  18. C. Madoka and Childish Pranks #777 div2
  19. 轻松复制360个人图书馆的文档
  20. NAS 网络错误0x8007003B的解决方案

热门文章

  1. React中的方法调用
  2. sql server 2008安装_性能不够?基于时序数据库的Zabbix 5.2安装指南
  3. JSON与JS对象的区别
  4. ReviewForJob(3)表、栈和队列
  5. payara 创建 集群_Apache Payara:让我们加密
  6. jdk8 bug_JDK Bug系统浪费时间
  7. java jep_Java 10 – JEP 286:局部变量类型推断
  8. 朝着理想坚实迈进_坚实原则:接口隔离原则
  9. java终结器_Java的终结器仍然存在
  10. java应用性能指标_性能与可靠性:Java应用为何像F1汽车