目前用到一些Metaq的东西,虽然对Metaq的使用很少,并且不是特别深,但还是觉得应该针对其消息的分发以及简单的机制进行一些记录。

Metaq 的简单介绍:

MetaQ(全称Metamorphosis)是一个高性能、高可用、可扩展的分布式消息中间件,,MetaQ具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,METAQ在阿里巴巴各个子公司被广泛应用,每天转发250亿+条消息。主要应用于异步解耦,Mysql数据复制,收集日志等场景。但是目前就我接触的来看,其广播机制是通过定义多个group来进行实现。其对单个消息的大小建议不超过1M,最好几百K。

Metaq的消息机制:

Metaq的消息会对应相应的消息生产者,消息消费者,一种对应的关系可以用下面一张图简单概括:

我们选择发送的每一个消息MessageA一定会选择发送到某个Topic下,那么对于该Topic,可以有若干个Group进行订阅,当某条消息发送到某个Topic下时,所有订阅该Topic的group都会收到MessageA的一个复制消息。而当消息从Group下发到某个机器时,会根据负载等其他机制选择该group下的某台机器进行处理。也就是对于GroupA 下的Machine 1,Machine 2, Machine 3等机器,只会有一台机器收到MessageA对应的复制消息。那么对于每一条消息的Tag是用来做什么呢? 当某台机器得到MessageA时,可以根据其Tag标来进行选择不同的处理器进行处理,这里其实主要是用来对消息进行处理过滤。例如:对于MessageA 加入有标签:newuser,那么我们可以选择newuser对应的processer进行处理,如果标签为olduser,我们可以选择olduser的processer进行处理。

对于Metaq的消息消费,是客户机拉取的方式进行。

简单的生产者,消费者的Java例子。

自己简单写的一个metaq的生产者例子:

<span style="font-size:14px;">public class MetaqProducer {
//private static final Logger log = LoggerFactory.getLogger(MetaqProducer.class);private static MetaqProducer sender = null;private static MetaqConsumer receiver = null;static {try {sender = new MetaqProducer();} catch (MQClientException e) {}}private MetaProducer producer = new MetaProducer("TestGroup");private MetaqProducer() throws MQClientException {producer.setInstanceName(UUID.randomUUID().toString());producer.start();}private SendResult send(String topic, String tag, String key, byte[] body) {Message msg = new Message(topic, tag, key, body);try {return producer.send(msg);}catch(Exception e) {return new SendResult();}}public static SendResult sendMessage(String topic, String tag, String key, byte[] body) {return sender.send(topic, tag, key, body);}/*** 测试用* @throws MQClientException * @throws InterruptedException */public static void main(String[] args) throws MQClientException, InterruptedException{System.out.println("Test Start!");int j = 100;int i =0 ;for( ; i < j ; i++){System.out.println("i="+i);SendResult r = sendMessage("testTopic", "testTag", "TestOnly"+ UUID.randomUUID().toString(), ("Hello world" + i).getBytes());System.out.println(r.getMsgId()+"    ");System.out.println(r.getSendStatus().toString());}}
}</span><span style="font-size: 18px;">
</span>

自己简单写的一个metaq的消费者例子:

<span style="font-size:14px;">public class MetaqConsumer {private static MetaPushConsumer consumer = null;public static MetaPushConsumer getMetaqClient(){consumer = new  MetaPushConsumer("TestGroup");consumer.setInstanceName(UUID.randomUUID().toString());consumer.setConsumeThreadMax(10);consumer.setConsumeThreadMin(5);try {consumer.subscribe("testTopic", "testTag");} catch (MQClientException e) {// TODO Auto-generated catch blocke.printStackTrace();}return consumer;}public static void receiveMsg() throws MQClientException{consumer = getMetaqClient();consumer.registerMessageListener(new MessageListenerConcurrently() {/*** 1、默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br>* 2、如果设置为批量消费方式,要么都成功,要么都失败。<br>* 3、此方法由MetaQ客户端多个线程回调,需要应用来处理并发安全问题<br>* 4、抛异常与返回ConsumeConcurrentlyStatus.RECONSUME_LATER等价<br>* 5、每条消息失败后,会尝试重试,重试5次都失败,则丢弃<br>*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {if (msgs == null || msgs.size() == 0) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}if (msgs.size() == 1) {// 一个消息String data = new String(msgs.get(0).getBody());System.out.println("Test");System.out.println(data);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}else{for(MessageExt s : msgs){String data = new String(s.getBody());System.out.println("Test");System.out.println(data);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}});// Consumer对象在使用之前必须要调用start初始化,初始化一次即可consumer.start();}public static void main(String[] args) throws MQClientException{receiveMsg();}}</span>

一个很好的消息中间件,目前可能对他的了解还很浅薄。希望能够在更多的应用场景中对其有所了解。

Metaq的一些简单机制相关推荐

  1. 神策军丨优秀 Leader 养成记:多做简单又有效的事

    一个优秀的 Leader,对上要理解公司战略和目标,找准定位指明方向:对下要拆解目标,合理设计工作内容,让团队形成一股合力,朝着目标努力. 实现目标的道路很多,具体选择背后是公司价值观.把通向理想中精 ...

  2. 使用NSURLProtocol实现UIWebView的离线缓存的简单实现

    文章介绍了使用NSURLProtocol实现UIWebView的离线缓存的简单实现,你可以在github上下载这个demo的代码. 无论是"MKNetworkKit"还是" ...

  3. 管理任务执行-如何制定有效的机制

    背景 之前做工程师的时候,自己负责数据开发,现在做管理了,数据开发这一块工作分给了团队成员,管理沟通占据了大量的时间,项目的质量没有很好的把控.问题出在哪? 想让团队成员分担我们手头的工作,要么靠梯队 ...

  4. 《游戏机制——高级游戏设计技术》一2.4 渐进型游戏

    本节书摘来异步社区<游戏机制--高级游戏设计技术>一书中的第2章,第2.4节,作者: [美]Ernest Adams 译者: 石曦 责编: 陈冀康,更多章节内容可以访问云栖社区" ...

  5. 《游戏机制——高级游戏设计技术》一1.1 规则定义游戏

    本节书摘来异步社区<游戏机制--高级游戏设计技术>一书中的第1章,第1.1节,作者: [美]Ernest Adams 译者: 石曦 责编: 陈冀康,更多章节内容可以访问云栖社区" ...

  6. 如何用Machinations示意图来模拟《吃豆人》的游戏机制?

    下面我们来展示一下如何用Machinations 示意图来模拟一个简单游戏的机制.我们使用的案例是经典街机游戏<吃豆人>(Pac-Man),我们将会把模拟这个游戏的过程分解成六步,并在Ma ...

  7. 智能合约及其web3共识机制

    目录 什么是共识? 什么是共识机制? 共识机制的目标 为什么需要共识机制? 如何评价一个共识机制的优劣: 共识机制分类 PoW( Proof of Work)工作量证明:多劳多得 PoS(Proof ...

  8. 数据库字段命名及设计规范

    1.设计原则 1) 标准化和规范化 数据的标准化有助于消除数据库中的数据冗余.标准化有好几种形式,但 Third Normal Form(3NF)通常被认为在性能.扩展性和数据完整性方面达到了最好平衡 ...

  9. 可伸缩系统的设计模式(译)

    Ricky Ho在他的博客中分享了该文章,该文章是一个简单的概括分享,详细的可以参见他博客的其它详细文章.下面主要是意译. 1.Load Balancer:负载均衡 – 由分发者来决定哪个工作者处理下 ...

最新文章

  1. 经验分享:如何在自己的创业中,用上GPT-3等AI大模型
  2. Socket通信原理探讨(C++为例)
  3. 用 JMX 检测应用程序
  4. 科大星云诗社动态20211201
  5. ElasticSearch配置扩展分词
  6. 理论修炼之ETCD,高一致性Key-Value服务提供者中的佼佼者
  7. 三国志战略版360区S4服务器合并信息,三国志战略版pk赛季怎么转区?s4转区规则[多图]...
  8. 二,表格table的使用细节
  9. 开源GIS(十)——openlayers中加载在线标准与自定义切片
  10. 【光学】基于Matlab模拟光流场
  11. JSOUP爬虫常见问题解决方法
  12. 加载项目的时候提示:需要缺少的web组件才能进行加载
  13. CISCO路由基本配置命令
  14. js 随机生成时间段
  15. 秋招一个半月流水账+招银网路科技offer
  16. 使用 yarn 安装时,报错node_modules\node sass:Command failed.
  17. Android开发支付集成——微信集成
  18. Davinci的异构多核间通信基础组件SysLink 2.0
  19. 折腾黑群晖之域名直接访问群晖
  20. 关于excel选定任意行截图的操作

热门文章

  1. [树状数组] Galahad
  2. vue配置favicon.ico图标
  3. input输入框type参数
  4. 百分百胜率只是个例,我们追求的目标是稳步获利!
  5. 工具之DBeaver安装及使用
  6. dnf加物理攻击的卡片有哪些_dnf物理攻击宝珠_dnf2019物理攻击宝珠大全_快吧游戏...
  7. V-for and slot-scoped报错问题
  8. 这6个编程语言排行榜,据说全都知道的人不足1% | 年终榜单大盘点
  9. Robomongo与MongoDB的故事
  10. jquery服务器文件保存到本地,jQuery本地存储