几周前,我写了一篇有关Hazelcast入门的博客,描述了创建分布式地图,列表和队列是多么简单。 当时我提到Hazelcast还能做很多其他事情。 该博客快速浏览了Hazelcast的另一个功能:基于Publish / Subscribe模式的广播消息系统。 这采用通常的格式,即邮件发件人应用通过该格式发布有关特定主题的邮件。 这些消息并不针对任何特定的客户端,而是可以由对主题感兴趣的任何客户端读取。

发布和订阅的明显场景来自高金融和做市商的世界。 做市商买卖股票之类的金融工具,并通过在通常是电子的市场上宣传买卖价格来竞争业务。 为了使用Hazelcast实现非常简单的做市商方案,我们需要三个类: StockPrice bean, MarketMakerClient

以下代码已添加到我在Github上可用的现有Hazelcast项目中。 无需担心其他POM依赖项。

public class StockPrice implements Serializable { private static final long serialVersionUID = 1L; private final BigDecimal bid; private final BigDecimal ask; private final String code; private final String description; private final long timestamp; /** * Create a StockPrice for the given stock at a given moment */ public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description, long timestamp) { super(); this.bid = bid; this.ask = ask; this.code = code; this.description = description; this.timestamp = timestamp; } public BigDecimal getBid() { return bid; } public BigDecimal getAsk() { return ask; } public String getCode() { return code; } public String getDescription() { return description; } public long getTimestamp() { return timestamp; } @Override public String toString() { StringBuilder sb = new StringBuilder("Stock - "); sb.append(code); sb.append(" - "); sb.append(description); sb.append(" - "); sb.append(description); sb.append(" - Bid: "); sb.append(bid); sb.append(" - Ask: "); sb.append(ask); sb.append(" - "); SimpleDateFormat df = new SimpleDateFormat("HH:MM:SS"); sb.append(df.format(new Date(timestamp))); return sb.toString(); }
}

StockPrice bean具有所有常用的获取器和设置器,可以在任何给定时间模拟股票的买价和买价(以正常语言进行买卖),并且MarketMaker类使用Hazelcast发布这些bean。

通常,做市商会在一种以上的金融工具中发布价格; 但是,为简单起见, MarketMaker在此演示中仅发布单个价格。

public class MarketMaker implements Runnable { private static Random random = new Random(); private final String stockCode; private final String description; private final ITopic<StockPrice> topic; private volatile boolean running; public MarketMaker(String topicName, String stockCode, String description) { this.stockCode = stockCode; this.description = description; this.topic = createTopic(topicName); running = true; } @VisibleForTesting ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); } public void publishPrices() { Thread thread = new Thread(this); thread.start(); } @Override public void run() { do { publish(); sleep(); } while (running); } private void publish() { StockPrice price = createStockPrice(); System.out.println(price.toString()); topic.publish(price); } @VisibleForTesting StockPrice createStockPrice() { double price = createPrice(); DecimalFormat df = new DecimalFormat("#.##"); BigDecimal bid = new BigDecimal(df.format(price - variance(price))); BigDecimal ask = new BigDecimal(df.format(price + variance(price))); StockPrice stockPrice = new StockPrice(bid, ask, stockCode, description, System.currentTimeMillis()); return stockPrice; } private double createPrice() { int val = random.nextInt(2010 - 1520) + 1520; double retVal = (double) val / 100; return retVal; } private double variance(double price) { return (price * 0.01); } private void sleep() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } public void stop() { running = false; } public static void main(String[] args) throws InterruptedException { MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom"); MarketMaker cbry = new MarketMaker("STOCKS", "CBRY.L", "Cadburys"); MarketMaker bp = new MarketMaker("STOCKS", "BP.L", "British Petrolium"); bt.publishPrices(); cbry.publishPrices(); bp.publishPrices(); } }

像往常一样,设置Hazelcast相当简单,上面MarketMaker类中的大多数代码与Hazelcast无关。 该课程分为两部分:建筑价格和出版价格。 构造函数采用三个参数,将其存储起来以备后用。 它还创建一个Hazelcast实例,并通过私有createTopic()方法注册一个名为"STOCKS"的简单主题。 如您所料,创建Hazelcast实例并注册主题需要两行代码,如下所示:

ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); }

该类的其余部分使用线程来调用MarketMakerrun()方法来运行价格发布机制。 此方法生成随机出价,为关联的股票代码要价,并使用Hazelcast发布。 使用以下单行代码即可完成发布:

topic.publish(price);

MarketMaker类的最后一部分是main()方法,其作用是创建多个MarketMaker实例并使它们运行。

现在,Hazelcast知道了我们不断变化的股价,接下来要做的就是整理客户代码。

public class Client implements MessageListener<StockPrice> { public Client(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this); } /** * @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message) */ @Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); } public static void main(String[] args) { new Client("STOCKS"); } }

与任何消息传递系统一样,消息发送者代码必须知道呼叫谁和呼叫什么。 客户端通过创建Hazelcast实例并在"STOCKS"主题中注册兴趣来实现“调用什么"STOCKS" ,方法与发布者相同,如下所示:

HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this);

客户端实现Hazelcast的MessageListener接口及其单一方法onMessage()实现“呼叫”

@Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); }

客户端代码的最后一部分是其main()方法,该方法创建一个客户端实例。

最后要做的是运行代码。 为此,我仅将所有必需的JAR文件放在一个目录中,只需考虑两个:hazel cast-3.1.jar和guava-13.0.1.jar。

完成后,我转到项目的classes目录:

cd /Users/Roger/git/captaindebug/hazelcast/target/classes

…并解雇了发布者

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker

……然后是客户。

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client

当然,如果您正在使用此粗略且已准备好的技术在计算机上运行此程序,则请记住将其替换
/Users/Roger/tmp/mm以及放置这些JAR文件副本的路径。

如果您在一个终端中运行MarketMaker发布者,并在其他两个终端中运行几个客户,那么您将得到类似的信息,在这里您可以看到正在发布的价格以及客户正在接收更新。

关于Hazelcast的一件事要注意的是,“ 集群 ”是指Hazelcast实例的集群,而不是JVM的集群。 在您为每个应用程序请求多个Hazelcast实例之前,这是不明显的。 当其他客户端加入集群时,您将看到类似以下内容:

Members [5] {
Member [192.168.0.7]:5701
Member [192.168.0.7]:5702
Member [192.168.0.7]:5703
Member [192.168.0.7]:5704 this
Member [192.168.0.7]:5705
}

在上面的日志中,有两个侦听器条目,每个侦听器条目一个,每个发布者条目三个,在MarketMakermain()方法中启动的每个MarketMaker实例一个。


这里要考虑的事情是,是否每个对象实例创建一个Hazelcast实例是一种好习惯(就像我在示例代码中所做的那样),还是在代码中有一个static Hazelcast实例更好。 我不确定该答案是什么,因此,如果有任何Hazelcast专家正在阅读此书,请告诉我。

就是这样:Hazelcast可以在发布和订阅模式下愉快地运行,但是我还没有介绍Hazelcast的所有功能。 也许以后再说……

  • 可以在Github上找到此源代码: https : //github.com/roghughe/captaindebug/tree/master/hazelcast
参考: Captain Debug的Blog博客上的JCG合作伙伴 Roger Hughes的Hazelcast发布和订阅 。

翻译自: https://www.javacodegeeks.com/2014/01/publish-and-subscribe-with-hazelcast.html

使用Hazelcast发布和订阅相关推荐

  1. hazelcast 使用_使用Hazelcast发布和订阅

    hazelcast 使用 几周前,我写了一篇有关Hazelcast入门的博客,描述了创建分布式地图,列表和队列是多么简单. 当时我提到,Hazelcast还做很多其他事情. 该博客快速介绍了Hazel ...

  2. hazelcast_使用Hazelcast发布和订阅

    hazelcast 几周前,我写了一篇有关Hazelcast入门的博客,描述了创建分布式地图,列表和队列是多么简单. 当时我提到,Hazelcast还做很多其他事情. 该博客快速介绍了Hazelcas ...

  3. 面向.NET开发人员的Dapr——发布和订阅

    目录: 面向.NET开发人员的Dapr--前言 面向.NET开发人员的Dapr--分布式世界 面向.NET开发人员的Dapr--俯瞰Dapr 面向.NET开发人员的Dapr--入门 面向.NET开发人 ...

  4. 面试被问到Redis实现发布与订阅,手摸手教

    简介 Redis发布与发布功能(Pub/Sub)是基于事件座位基本的通信机制,是目前应用比较普遍的通信模型,它的目的主要是解除消息的发布者与订阅者之间的耦合关系. Redis作为消息发布和订阅之间的服 ...

  5. Meteor:发布与订阅

    我们可以使用安全的方法让用户端不直接操作数据库,但是还是可以直接读取数据库内容,如果我们还需要保护私有的数据存储,在客户端直接使用Collection.find(),这样的操作方式在实际的项目中并不会 ...

  6. 知方可补不足~SQL2008中的发布与订阅模式~续

    上一回介绍了如何在sql2008中建立一个数据库的发布者,今天来说一下如何建立一个订阅者,其实订阅者也是一个数据库,而这个数据库是和发布者的数据结构相同的库,它们之间通过SQL代理进行数据上的同步. ...

  7. etcd分布式之消息发布与订阅

    分布式之消息发布与订阅:        应用中用到的一些配置信息放到etcd上进行集中管理        索引的元信息和服务器集群机器的节点状态存放在etcd中        分布式日志收集系统   ...

  8. 023_Jedis的发布和订阅

    1. Redis为我们提供了publish/subscribe(发布/订阅)功能.我们可以对某个channel(频道)进行subscribe(订阅),当有人在这个channel上publish(发布) ...

  9. Redis发布与订阅(pub/sub)

    Redis发布与订阅(pub/sub) 本文档翻译自: http://redis.io/topics/pubsub . SUBSCRIBE . UNSUBSCRIBE 和 PUBLISH 三个命令实现 ...

最新文章

  1. 奔图打印机显示未连接_手机连接奔图打印机,无法打印的解决方法
  2. rails的一些问题
  3. thinkcmf 去掉index.php,​ThinkCMF5.0如何修改入口文件 解决方法
  4. 10、MySQL存储引擎有哪些?
  5. 集算器访问HTTP数据的代码示例
  6. 线程协作-CountDownLatch
  7. 一句话解释什么是回归
  8. 基于用户滚动应用CSS
  9. SQL Server中的数据库文件组和零碎还原
  10. Android实训案例(四)——关于Game,2048方块的设计,逻辑,实现,编写,加上色彩,分数等深度剖析开发过程!...
  11. 剑指 Offer II 106. 二分图
  12. JavaScript和HTML及CSS的通俗解释
  13. SharePoint 2013 同步FBA认证用户
  14. centos标准分区调整大小_磁盘怎么调整分区大小 磁盘调整分区大小教程【详细步骤】...
  15. win10操作系统上编译assimp库
  16. 求余函数mod和fmod
  17. 关于Unable to read additional data from server sessionid 0x0问题的解决
  18. 基于ARM开发板的嵌入式项目设计(C完整代码)
  19. 郑职院官计算机网络,2020年陕西省青年职业技能大赛计算机网络管理员决赛开幕式在汉中职院举行...
  20. UML系列——包图Package

热门文章

  1. 同步和异步有何异同,什么场景使用
  2. phpst安装memcache扩展_在 Ubuntu/Debian 下安装 PHP7.3 教程
  3. java_advanced_review(3)补充:利用网络套接字实现类似qq 的控制台通讯
  4. optional空值判断_Java 8 Optional不仅用于替换空值
  5. 响应式多级菜单 侧边菜单栏_使用纯HTML和OmniFaces构建动态响应的多级菜单
  6. c#中overlord实例_具有Overlord的WildFly 8.1中的API管理
  7. 维持硒测试自动化的完美方法
  8. 管理多个Java安装
  9. 模拟用户输入并检查输出的简单方法
  10. mongodb web_MongoDB和Web应用程序