metaq发送和接收消息demo
为什么80%的码农都做不了架构师?>>>
一、maven依赖
<dependency><groupId>com.taobao.metamorphosis</groupId><artifactId>metamorphosis-client</artifactId><version>1.4.6.2</version>
</dependency><dependency><groupId>com.taobao.metamorphosis</groupId><artifactId>metamorphosis-client-extension</artifactId><version>1.4.6.2</version>
</dependency>
二、发送者
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;import java.io.BufferedReader;
import java.io.InputStreamReader;/*** Created by lc-t123 on 2016/4/14.*/
public class Producer {public static void main(String[] args) throws Exception {final MetaClientConfig metaClientConfig = new MetaClientConfig();final ZKConfig zkConfig = new ZKConfig();//设置zookeeper地址zkConfig.zkConnect = "192.168.1.70:2181";metaClientConfig.setZkConfig(zkConfig);// New session factory,强烈建议使用单例MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);/** create producer,强烈建议使用单例* 消息生产者的接口是MessageProducer,你可以通过它来发送消息*/MessageProducer producer = sessionFactory.createProducer();// publish topicfinal String topic = "test";/** 这一步在发送消息前是必须的,你必须发布你将要发送消息的topic* 这是为了让会话工厂帮你去查找接收这些topic的meta服务器地址并初始化连接* 这个步骤针对每个topic只需要做一次,多次调用无影响*/producer.publish(topic);BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));String line = null;while ((line = reader.readLine()) != null){/** send message* 在Meta里,每个消息对象都是Message类的实例,Message表示一个消息对象,它包含这么几个属性:* 1) id: Long型的消息id,消息的唯一id,系统自动产生,用户无法设置,在发送成功后由服务器返回,发送失败则为0。* 2) topic: 消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,生产者通过指定发布的topic查找到需要连接的服务器地址,必须。* 3) data: 消息的有效载荷,二进制数据,也就是消息内容,meta永远不会修改消息内容,你发送出去是什么样子,接收到就是什么样子。消息内容通常限制在1M以内,我的建议是最好不要发送超过上百K的消息,必须。数据是否压缩也完全取决于用户。* 4) attribute: 消息属性,一个字符串,可选。发送者可设置消息属性来让消费者过滤。*/SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));// check resultif (!sendResult.isSuccess()){System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());}else {System.out.println("Send message successfully,sent to " + sendResult.getPartition());}}}
}
三、接收者
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;import java.util.concurrent.Executor;public class AsyncConsumer {public static void main(String[] args) throws Exception {final MetaClientConfig metaClientConfig = new MetaClientConfig();final ZKConfig zkConfig = new ZKConfig();//设置zookeeper地址zkConfig.zkConnect = "192.168.1.70:2181";metaClientConfig.setZkConfig(zkConfig);// New session factory,强烈建议使用单例MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);// subscribed topicfinal String topic = "test";// consumer groupfinal String group = "meta-example";/** create consumer,强烈建议使用单例* 通过createConsumer方法来创建MessageConsumer,注意到我们传入一个ConsumerConfig参数,* 这是消费者的配置对象。每个消息者都必须有一个ConsumerConfig配置对象,* 我们这里只设置了group属性,这是消费者的分组名称。* Meta的Producer、Consumer和Broker都可以为集群。* 消费者可以组成一个集群共同消费同一个topic,发往这个topic的消息将按照一定的负载均衡规则发送给集群里的一台机器。* 同一个消费者集群必须拥有同一个分组名称,也就是同一个group。我们这里将分组名称设置为meta-example*/MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));/** subscribe topic* 订阅消息通过subscribe方法,这个方法接受三个参数* 1) topic,订阅的主题* 2) maxSize,因为meta是一个消费者主动拉取的模型,这个参数规定每次拉取的最大数据量,单位为字节,这里设置为1M,默认最大为1M。* 3) MessageListener,消息监听器,负责消息消息。*/consumer.subscribe(topic, 1024 * 1024, new MessageListener() {public void recieveMessages(Message message) {System.out.println("Receive message " + new String(message.getData()));}public Executor getExecutor() {// Thread pool to process messages,maybe null.return null;}});// complete subscribeconsumer.completeSubscribe();}
}
metaq-server安装参考官方文档
可以通过http://192.168.1.70:8120/ 访问web界面
转载于:https://my.oschina.net/chaun/blog/659479
metaq发送和接收消息demo相关推荐
- 【译】 WebSocket 协议第六章——发送与接收消息(Sending and Receiving Data)
概述 本文为 WebSocket 协议的第六章,本文翻译的主要内容为 WebSocket 消息发送与接收相关内容. 发送与接收消息(协议正文) 6.1 发送数据 为了通过 WebSocket 连接发送 ...
- springboot websocket发送和接收消息代码资源下载
客户端发送给服务端的消息,同时接收服务端响应给客户端的消息: 服务端收到客户端发送过来的消息,并响应给客户端消息:
- ActiveMQ 发送和接收消息
一.添加 jar 包 <dependency><groupId>org.apache.activemq</groupId><artifactId>act ...
- java mqtt客户端_java 实现mqtt发送和接收消息客户端具体用法及测试代码
注:客户端代码实现请看我的上一篇 1mqtt发送消息 发送时不用多讲,每次发送肯定需要运行一次发送消息方法 MyMqttClient mqttClient = new MyMqttClient(); ...
- SpringAMQP发送与接收消息
同步调用的优点:时效性较强,可以立即得到结果 同步调用的问题: 1.耦合度高 2.性能和吞吐能力下降 3.有额外的资源消耗 4.有级联失败问题 异步通信的优点: 1.耦合度低 2.吞吐量提升 3.故障 ...
- 通过kafka发送和接收消息
生产者配置类: @Configuration @EnableKafka public class KafkaProducerConfig {@Value("${kafkaConfig.add ...
- PC微信逆向:发送与接收消息的分析与代码实现
文章目录 定位微信的消息接收函数 定位消息接收函数的相关思路 定位消息内容的地址 分析接收消息函数 好友消息 群消息 总结 代码实现 定位微信的消息发送函数 定位消息发送函数的相关思路 过滤当前聊天窗 ...
- Netty:实现同步发送并接收消息的一种方式
Netty创建通信服务时使用Nio异步通信, 配置代码(bootstrap.channel(NioSocketChannel.class);),要怎样实现这样一个同步发送消息并接收消息功能,虽然这样做 ...
- 游戏对象之间发送和接收消息
1.创建一个游戏工程, 命名为SRMessageGo 2.在Project视图中创建3个文件夹, Scene文件夹.Resources文件夹和Script文件夹 3.将当前场景保存为GameScene ...
最新文章
- 算法竞赛入门与进阶 (三)贪心
- 带你自学Python系列(九):一文读懂Python中字典应用原理!
- Python编程一定要注意的那些“坑”(九):0与False
- python安装idle_Python从零单排之Python环境及IDLE安装
- 扫地机自动回充揭秘之小米/iRobot/云鲸/360
- Java逻辑思维训练题
- duilib介绍-1
- 基于python管理系统论文_基于Python的运动计费管理系统
- Android 蓝牙协议栈消息(bta_sys_sendmsg)发送机制 ---- 全网唯一解析
- C语言求解一元二次方程组的代码
- 支付宝证书模式支付接口
- Cruehead-CrackMe-3
- oracle20005,oracle 由于impdp 引起的表统计信息被锁 ORA-20005: object statistics are locked...
- 四川南溪仙源长江公路大桥通车 结束千年摆渡过江历史
- st-link v2怎么连接_深度学习之 YOLO v1,v2,v3详解 - 一杯清酒邀明月
- 巅峰战舰服务器维护中,维护公告~
- 贝叶斯概率问题(美团笔试题)
- nginx的高级配置(4)——防盗链
- 京东APP sign、cipher算法分析
- 打造地图拼接利器(二)软件框架
热门文章
- 图像的连通域检测的堆栈算法
- linux安装源码包出现apr错误,Linux源码安装Apchae时报错:error: APR not found.Please read the documentation....
- 一文读懂RPA与BPM的区别和联系
- mysql deadlock found when trying to get lock 问题排查
- 使用简介EntityFramework6.0
- .NET多线程总结和实例介绍
- ASP网站精品源码集合(免积分下载)
- Exception在语义上的处理。在系统中的意义。
- Scanner类的一个小例子
- YFIOServer 后台IO接口使用说明