参考资料

windows下配置rocketMQ

解压缩

系统环境变量配置

变量名:ROCKETMQ_HOME

变量值:MQ解压路径MQ文件夹名

启动NAMESERVER

Cmd命令框执行进入至‘MQ文件夹bin’下,然后执行‘start mqnamesrv.cmd’,启动NAMESERVER。成功后会弹出提示框,此框勿关闭。

D:

cd D:softwarerocketmq-all-4.2bin

start mqnamesrv.cmd

结果:

暂时忽略警告。

启动BROKER

Cmd命令框执行进入至‘MQ文件夹bin’下,然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,启动BROKER。成功后会弹出提示框,此框勿关闭。

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

建立项目测试生产者消费者(测试用代码)

该代码只是单纯用于测试,要用于实际环境还需要有相当的规则和编写规范,下一章将补充这个

项目结构如下:

各个文件内容:

pom.xml

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

net.funfunle

TestMQ

1.0-SNAPSHOT

org.apache.maven.plugins

maven-compiler-plugin

3.3

1.8

1.8

org.apache.rocketmq

rocketmq-common

4.2.0

org.apache.rocketmq

rocketmq-client

4.2.0

mq.properties

# 消费者的组名

apache.rocketmq.consumer.PushConsumer=PushConsumer

# 生产者的组名

apache.rocketmq.producer.producerGroup=Producer

# NameServer地址

apache.rocketmq.namesrvAddr=localhost:9876

MQConfig.java

package net.funfunle.TestMQ.config;

import org.apache.commons.lang3.StringUtils;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.util.Properties;

/**

* Created by DGDL-08 on 2017/3/24.

*/

public class MQConfig {

private static Properties config;

private static final String producerGroup;

private static final String namesrvAddr;

private static final String consumerGroup;

static {

//InputStream inputStream = MyClass.class.getClassLoader().getResourceAsStream("com/john/basis/conf.properties");

config=new Properties();

InputStream inputStream=null;

try{

InputStream in = ClassLoader.getSystemResourceAsStream("mq.properties");

InputStreamReader is=new InputStreamReader(in,"utf-8");

config.load(is);

is.close();

in.close();

// config.load(inputStream);

}

catch (Exception ed){

ed.printStackTrace();

}

consumerGroup=config.getProperty("apache.rocketmq.consumer.PushConsumer");

producerGroup=config.getProperty("apache.rocketmq.producer.producerGroup");

namesrvAddr=config.getProperty("apache.rocketmq.namesrvAddr");

}

public static String getProducerGroup() {

return producerGroup;

}

public static String getNamesrvAddr() {

return namesrvAddr;

}

public static String getConsumerGroup() {

return consumerGroup;

}

}

RocketMQClient.java

package net.funfunle.TestMQ;

import net.funfunle.TestMQ.config.MQConfig;

import org.apache.commons.lang3.time.StopWatch;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQClient {

/**

* 生产者的组名

*/

private String producerGroup= MQConfig.getProducerGroup();

/**

* NameServer 地址

*/

private String namesrvAddr=MQConfig.getNamesrvAddr();

public void defaultMQProducer() {

//生产者的组名

DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

//指定NameServer地址,多个地址以 ; 隔开

producer.setNamesrvAddr(namesrvAddr);

try {

/**

* Producer对象在使用之前必须要调用start初始化,初始化一次即可

* 注意:切记不可以在每次发送消息时,都调用start方法

*/

producer.start();

//创建一个消息实例,包含 topic、tag 和 消息体

//如下:topic 为 "TopicTest",tag 为 "push"

Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));

StopWatch stop = new StopWatch();

stop.start();

for (int i = 0; i < 10000; i++) {

SendResult result = producer.send(message);

System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());

}

stop.stop();

System.out.println("----------------发送一万条消息耗时:" + stop.getTime());

} catch (Exception e) {

e.printStackTrace();

} finally {

producer.shutdown();

}

}

public static void main(String[] args){

new RocketMQClient().defaultMQProducer();

}

}

RocketMQServer.java

package net.funfunle.TestMQ;

import net.funfunle.TestMQ.config.MQConfig;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.message.MessageExt;

import org.apache.rocketmq.remoting.common.RemotingHelper;

/**

* Created by zhisheng_tian on 2018/2/6

*/

public class RocketMQServer {

/**

* 消费者的组名

*/

private String consumerGroup= MQConfig.getConsumerGroup();

/**

* NameServer 地址

*/

private String namesrvAddr=MQConfig.getNamesrvAddr();

public void defaultMQPushConsumer() {

//消费者的组名

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

//指定NameServer地址,多个地址以 ; 隔开

consumer.setNamesrvAddr(namesrvAddr);

try {

//订阅PushTopic下Tag为push的消息

consumer.subscribe("TopicTest", "push");

//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

//如果非第一次启动,那么按照上次消费的位置继续消费

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {

try {

for (MessageExt messageExt : list) {

System.out.println("messageExt: " + messageExt);//输出消息内容

String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容

}

} catch (Exception e) {

e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功

});

consumer.start();

} catch (Exception e) {

e.printStackTrace();

}

}

public static void main(String[] args){

new RocketMQServer().defaultMQPushConsumer();

}

}

分别执行生产者和消费者,可以得到结果:

java+创建metaq生产者_微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置-Go语言中文社区...相关推荐

  1. python微服务架构设计模式_微服务架构设计模式 PDF 电子书 百度云 网盘下载

    你还没有注册,无法下载本站所有资源,请立即注册! 您需要 登录 才可以下载或查看,没有帐号?立即注册 x java自学网(http://www.137zw.com)-java论坛,java电子书推荐: ...

  2. Java生鲜电商平台-微服务架构概述

    Java生鲜电商平台-微服务架构概述 单体架构存在的问题 在传统的软件技术架构系统中,基本上将业务功能集中在单一应用内,或者是单一进程中.尽管现代化的软件架构理论以及设计原则已推广多年,但实际技术衍化 ...

  3. python微服务框架_微服务架构框架有哪些?常用微服务架构介绍

    小伙伴们知道常用的微服务架构框架有哪些吗?上回我们介绍了一些常用的微服务架构设计模式,这次我们就来了解一下一些常用的微服务架构框架吧. 一.Dubbo Dubbo框架是由阿里巴巴开发的开源式的分布式服 ...

  4. .Net Core微服务架构技术栈的那些事

    一.前言 大家一直都在谈论微服务架构,园子里面也有很多关于微服务的文章,前几天也有一些园子的朋友问我微服务架构的一些技术,我这里就整理了微服务架构的技术栈路线图,这里就分享出来和大家一起探讨学习,同时 ...

  5. java 类隔离_微服务架构中zuul的两种隔离机制实验

    ZuulException REJECTED_SEMAPHORE_EXECUTION 是一个最近在性能测试中经常遇到的异常.查询资料发现是因为zuul默认每个路由直接用信号量做隔离,并且默认值是100 ...

  6. java微服务是什么_微服务架构:什么是微服务

    博主 本文为微服务连载第一篇,如果有幸看到,还请找个时间仔细阅读,欢迎收藏或转载,如有不足之处烦请留言指正,共同进步,希望对你有帮助,谢谢 引言 和朋友聊天,招聘,看个行业要闻都是微服务... 最近几 ...

  7. go与Java微服务对比_微服务架构对比-Go语言中文社区

    最近使用Docker+SpringCloud来代替Zookper+Dobbo来做微服务,总结如下 现如今微服务架构十分流行,而采用微服务构建系统也会带来更清晰的业务划分和可扩展性.同时,支持微服务的技 ...

  8. java 模块解耦_微服务架构:如何用十步解耦你的系统?

    导言: 耦合性,是对模块间关联程度的度量.耦合的强弱取决于模块间接口的复杂性.调用模块的方式以及通过界面传送数据的多少.模块间的耦合度是指模块之间的依赖关系,包括控制关系.调用关系.数据传递关系.模块 ...

  9. java 熔断器模式_微服务架构熔断器机制的概念以及常用组件类型

    熔断器机制是我们在学习微服务编程开发的时候需要重点掌握的一个编程技术知识点,而今天我们就通过案例分析来了解一下,熔断器机制的概念以及常用组件类型都有哪些. 所谓熔断器机制,即类似电流的保险器,当然电压 ...

最新文章

  1. bd3.2 Python高级
  2. nova 命令汇总三 ——网络相关命令
  3. Android项目开发实战—倒计时[Handler,Timer,TimerTask,Message]
  4. gulp html 压缩,gulp-gzip压缩
  5. python tcp协议_python 网络编程 -- Tcp协议
  6. usb协议规范_USB连接标准接口简述发布
  7. Linux sudoers文件的写法
  8. 【跃迁之路】【593天】程序员高效学习方法论探索系列(实验阶段350-2018.09.21)...
  9. 【SICP练习】123 练习3.54
  10. 谷歌浏览器安装Postman插件 亲测有效!!!
  11. Linux串口驱动(8250)的编写与调试
  12. BScroll 使用(Vue)
  13. Windows Mobile 6 Professional SDK
  14. 后端和前端有什么区别,哪个工资高?
  15. 永磁无刷直流电机的分类与区别
  16. 多元逻辑回归公式推导
  17. Win10 免快捷键进BIOS
  18. 彩色照片转化为黑白照片
  19. 随笔——物质与思想(或称意识)
  20. golang panic和recover

热门文章

  1. Nginx学习笔记:基础
  2. Tplink客户端设置
  3. php小算法总结一(数组重排,进制转换,二分查找)
  4. Access数据库列名的命名规则
  5. (推荐)jQuery性能优化指南
  6. 无法装载这个对象_面试官:别的我不管,这个JVM虚拟机内存模型你必须知道
  7. windows8怎么关机_按下电源键后发生了什么?电脑是如何关机的?
  8. php 怎么查看文件类型信息,php获取文件类型和文件信息的方法
  9. 会走索引吗 oracle_茅台酒会走兰花的老路吗?
  10. MyBatis-Plus速览【学习笔记】