为什么80%的码农都做不了架构师?>>>   

一、maven依赖

<dependency><groupId>com.taobao.metamorphosis</groupId><artifactId>metamorphosis-client</artifactId><version>1.4.6.2</version>
</dependency><dependency><groupId>com.taobao.metamorphosis</groupId><artifactId>metamorphosis-client-extension</artifactId><version>1.4.6.2</version>
</dependency>

二、发送者

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;import java.io.BufferedReader;
import java.io.InputStreamReader;/*** Created by lc-t123 on 2016/4/14.*/
public class Producer {public static void main(String[] args) throws Exception {final MetaClientConfig metaClientConfig = new MetaClientConfig();final ZKConfig zkConfig = new ZKConfig();//设置zookeeper地址zkConfig.zkConnect = "192.168.1.70:2181";metaClientConfig.setZkConfig(zkConfig);// New session factory,强烈建议使用单例MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);/**  create producer,强烈建议使用单例*  消息生产者的接口是MessageProducer,你可以通过它来发送消息*/MessageProducer producer = sessionFactory.createProducer();// publish topicfinal String topic = "test";/** 这一步在发送消息前是必须的,你必须发布你将要发送消息的topic* 这是为了让会话工厂帮你去查找接收这些topic的meta服务器地址并初始化连接* 这个步骤针对每个topic只需要做一次,多次调用无影响*/producer.publish(topic);BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));String line = null;while ((line = reader.readLine()) != null){/** send message* 在Meta里,每个消息对象都是Message类的实例,Message表示一个消息对象,它包含这么几个属性:* 1) id: Long型的消息id,消息的唯一id,系统自动产生,用户无法设置,在发送成功后由服务器返回,发送失败则为0。* 2) topic: 消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,生产者通过指定发布的topic查找到需要连接的服务器地址,必须。* 3) data: 消息的有效载荷,二进制数据,也就是消息内容,meta永远不会修改消息内容,你发送出去是什么样子,接收到就是什么样子。消息内容通常限制在1M以内,我的建议是最好不要发送超过上百K的消息,必须。数据是否压缩也完全取决于用户。* 4) attribute: 消息属性,一个字符串,可选。发送者可设置消息属性来让消费者过滤。*/SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));// check resultif (!sendResult.isSuccess()){System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());}else {System.out.println("Send message successfully,sent to " + sendResult.getPartition());}}}
}

三、接收者

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;import java.util.concurrent.Executor;public class AsyncConsumer {public static void main(String[] args) throws Exception {final MetaClientConfig metaClientConfig = new MetaClientConfig();final ZKConfig zkConfig = new ZKConfig();//设置zookeeper地址zkConfig.zkConnect = "192.168.1.70:2181";metaClientConfig.setZkConfig(zkConfig);// New session factory,强烈建议使用单例MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);// subscribed topicfinal String topic = "test";// consumer groupfinal String group = "meta-example";/** create consumer,强烈建议使用单例* 通过createConsumer方法来创建MessageConsumer,注意到我们传入一个ConsumerConfig参数,* 这是消费者的配置对象。每个消息者都必须有一个ConsumerConfig配置对象,* 我们这里只设置了group属性,这是消费者的分组名称。* Meta的Producer、Consumer和Broker都可以为集群。* 消费者可以组成一个集群共同消费同一个topic,发往这个topic的消息将按照一定的负载均衡规则发送给集群里的一台机器。* 同一个消费者集群必须拥有同一个分组名称,也就是同一个group。我们这里将分组名称设置为meta-example*/MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));/** subscribe topic* 订阅消息通过subscribe方法,这个方法接受三个参数* 1) topic,订阅的主题* 2) maxSize,因为meta是一个消费者主动拉取的模型,这个参数规定每次拉取的最大数据量,单位为字节,这里设置为1M,默认最大为1M。* 3) MessageListener,消息监听器,负责消息消息。*/consumer.subscribe(topic, 1024 * 1024, new MessageListener() {public void recieveMessages(Message message) {System.out.println("Receive message " + new String(message.getData()));}public Executor getExecutor() {// Thread pool to process messages,maybe null.return null;}});// complete subscribeconsumer.completeSubscribe();}
}

metaq-server安装参考官方文档

可以通过http://192.168.1.70:8120/ 访问web界面

转载于:https://my.oschina.net/chaun/blog/659479

metaq发送和接收消息demo相关推荐

  1. 【译】 WebSocket 协议第六章——发送与接收消息(Sending and Receiving Data)

    概述 本文为 WebSocket 协议的第六章,本文翻译的主要内容为 WebSocket 消息发送与接收相关内容. 发送与接收消息(协议正文) 6.1 发送数据 为了通过 WebSocket 连接发送 ...

  2. springboot websocket发送和接收消息代码资源下载

    客户端发送给服务端的消息,同时接收服务端响应给客户端的消息: 服务端收到客户端发送过来的消息,并响应给客户端消息:

  3. ActiveMQ 发送和接收消息

    一.添加 jar 包 <dependency><groupId>org.apache.activemq</groupId><artifactId>act ...

  4. java mqtt客户端_java 实现mqtt发送和接收消息客户端具体用法及测试代码

    注:客户端代码实现请看我的上一篇 1mqtt发送消息 发送时不用多讲,每次发送肯定需要运行一次发送消息方法 MyMqttClient mqttClient = new MyMqttClient(); ...

  5. SpringAMQP发送与接收消息

    同步调用的优点:时效性较强,可以立即得到结果 同步调用的问题: 1.耦合度高 2.性能和吞吐能力下降 3.有额外的资源消耗 4.有级联失败问题 异步通信的优点: 1.耦合度低 2.吞吐量提升 3.故障 ...

  6. 通过kafka发送和接收消息

    生产者配置类: @Configuration @EnableKafka public class KafkaProducerConfig {@Value("${kafkaConfig.add ...

  7. PC微信逆向:发送与接收消息的分析与代码实现

    文章目录 定位微信的消息接收函数 定位消息接收函数的相关思路 定位消息内容的地址 分析接收消息函数 好友消息 群消息 总结 代码实现 定位微信的消息发送函数 定位消息发送函数的相关思路 过滤当前聊天窗 ...

  8. Netty:实现同步发送并接收消息的一种方式

    Netty创建通信服务时使用Nio异步通信, 配置代码(bootstrap.channel(NioSocketChannel.class);),要怎样实现这样一个同步发送消息并接收消息功能,虽然这样做 ...

  9. 游戏对象之间发送和接收消息

    1.创建一个游戏工程, 命名为SRMessageGo 2.在Project视图中创建3个文件夹, Scene文件夹.Resources文件夹和Script文件夹 3.将当前场景保存为GameScene ...

最新文章

  1. 算法竞赛入门与进阶 (三)贪心
  2. 带你自学Python系列(九):一文读懂Python中字典应用原理!
  3. Python编程一定要注意的那些“坑”(九):0与False
  4. python安装idle_Python从零单排之Python环境及IDLE安装
  5. 扫地机自动回充揭秘之小米/iRobot/云鲸/360
  6. Java逻辑思维训练题
  7. duilib介绍-1
  8. 基于python管理系统论文_基于Python的运动计费管理系统
  9. Android 蓝牙协议栈消息(bta_sys_sendmsg)发送机制 ---- 全网唯一解析
  10. C语言求解一元二次方程组的代码
  11. 支付宝证书模式支付接口
  12. Cruehead-CrackMe-3
  13. oracle20005,oracle 由于impdp 引起的表统计信息被锁 ORA-20005: object statistics are locked...
  14. 四川南溪仙源长江公路大桥通车 结束千年摆渡过江历史
  15. st-link v2怎么连接_深度学习之 YOLO v1,v2,v3详解 - 一杯清酒邀明月
  16. 巅峰战舰服务器维护中,维护公告~
  17. 贝叶斯概率问题(美团笔试题)
  18. nginx的高级配置(4)——防盗链
  19. 京东APP sign、cipher算法分析
  20. 打造地图拼接利器(二)软件框架

热门文章

  1. 图像的连通域检测的堆栈算法
  2. linux安装源码包出现apr错误,Linux源码安装Apchae时报错:error: APR not found.Please read the documentation....
  3. 一文读懂RPA与BPM的区别和联系
  4. mysql deadlock found when trying to get lock 问题排查
  5. 使用简介EntityFramework6.0
  6. .NET多线程总结和实例介绍
  7. ASP网站精品源码集合(免积分下载)
  8. Exception在语义上的处理。在系统中的意义。
  9. Scanner类的一个小例子
  10. YFIOServer 后台IO接口使用说明