读取XML


import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.dom4j.Element;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;public class AgentConfig1 {private final static Logger logger = Logger.getLogger(AgentConfig.class);private final static AgentConfig _instance = new AgentConfig();private Element root;private AgentConfig1() {InputStream inputStream = null;try {inputStream = loadConfigAsStream("agent_conf.xml");String xmlString = IOUtils.toString(inputStream, "UTF-8");root = Dom4JUtils.getRootElement(xmlString);} catch (Exception e) {logger.error("load config error", e);} finally {IOUtils.closeQuietly(inputStream);}}public static Element getRoot() {return _instance.root;}private static InputStream loadConfigAsStream(String resource) {InputStream in = null;if (!resource.startsWith("/")) {resource = "/" + resource;}try {//首先从conf目录读取File file = new File(System.getProperty("user.dir") + "/conf" + resource);if (!file.exists()) {file = new File(System.getProperty("user.dir") + "/omp-agent-main/src/main/conf" + resource);}if (file.exists()) {in = new FileInputStream(file);System.err.println("load [" + resource + "] from file [" + file.getAbsolutePath() + "]");}//conf目录找不到,从classpath读取if (in == null) {in = ClasspathResourceUtils.readResourceFromClasspath(resource);if (in != null) {System.err.println("load [" + resource + "] from classpath");}}} catch (Exception e) {logger.error("load config file[" + resource + "] error", e);}return in;}
}

生产者


import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.proxy.RedisProxy;import java.util.Date;public class TaskProvider1 {private final static Logger logger = LoggerFactory.getLogger(TaskProvider.class);private TaskProvider1() {}private static TaskProvider1 single = new TaskProvider1();public static TaskProvider1 getInstance() {return single;}private DefaultMQProducer producer;private String mqAddress;private String tag;private String topic;public static final String SEND_STATUS = "SEND_OK";public void start() throws InterruptedException, MQClientException {//获取MQ配置mqAddress = AgentConfig1.getRoot().element("agent").element("rocketmq").element("address").getTextTrim();topic = AgentConfig1.getRoot().element("agent").element("rocketmq").element("topic").getTextTrim();tag = AgentConfig1.getRoot().element("agent").element("rocketmq").element("tag").getTextTrim();if (StringUtils.isBlank(mqAddress) || StringUtils.isBlank(topic) || StringUtils.isBlank(tag)) {throw new IllegalArgumentException("config agent/rocketmq must be set");}//声明并初始化一个producer//需要一个producer group名字作为构造方法的参数,这里为producer1producer = new DefaultMQProducer("middleware_monitor_producer");//设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔//NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里producer.setNamesrvAddr(mqAddress);//消息发送失败重试次数producer.setRetryTimesWhenSendFailed(3);//调用start()方法启动一个producer实例producer.start();}public boolean send(String msg) {logger.info("  send task msg :" + msg);boolean flag = true;try {Message message = new Message(topic, tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));//调用producer的send()方法发送消息//这里调用的是同步的方式,所以会有返回结果SendResult sendResult = producer.send(message);SendStatus status = sendResult.getSendStatus();if (!SEND_STATUS.equals(status.toString())) {//打印返回结果,可以看到消息发送的状态以及一些相关信息logger.info(" error send task msg result:" + sendResult + " , msg:" + msg);flag = false;}} catch (Exception e) {logger.error("taskPrivoder send msg error :{}", e);}return flag;}public void stop() {if (producer != null) {producer.shutdown();}}}

消费者

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.proxy.RedisProxy;
import java.util.List;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;public class TaskConsumer1 {/*** 监控任务工作线程池*/private ThreadPoolExecutor workThreadPool = null;private final static ObjectMapper objectMapper = new ObjectMapper();private final static Logger logger = LoggerFactory.getLogger(TaskConsumer.class);private TaskConsumer1() {}private static TaskConsumer1 single = new TaskConsumer1();public static TaskConsumer1 getInstance() {return single;}private DefaultMQPushConsumer consumer;private String mqAddress;private String tag;private String topic;public void start() throws InterruptedException, MQClientException {//获取MQ配置mqAddress = AgentConfig1.getRoot().element("agent").element("rocketmq").element("address").getTextTrim();topic = AgentConfig1.getRoot().element("agent").element("rocketmq").element("topic").getTextTrim();tag = AgentConfig1.getRoot().element("agent").element("rocketmq").element("tag").getTextTrim();//工作线程数量Element workThreadCountEle = AgentConfig.getRoot().element("agent").element("workThreadCount");int workThreadCount = 2;if (workThreadCountEle != null) {workThreadCount = Integer.parseInt(workThreadCountEle.getTextTrim());}//最多并行6000个监控任务workThreadPool = ThreadPoolFactory.newFixedThreadPool("MonTaskWorker", workThreadCount, 6000);workThreadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {logger.warn("======fixed1 rate job rejected=========" + r.toString() + "=======thread pool=========" + executor.toString());}});//声明并初始化一个consumer//需要一个consumer group名字作为构造方法的参数,这里为consumer1consumer = new DefaultMQPushConsumer("middleware_monitor_consumer");//同样也要设置NameServer地址consumer.setNamesrvAddr(mqAddress);//CONSUME_FROM_LAST_OFFSET 默认消费策略,从该队列最尾开始消费,即跳过历史消息consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//设置consumer所订阅的Topic和Tag,*代表全部的Tagconsumer.subscribe(topic, "*");//设置一个Listener,主要进行消息的逻辑处理consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);try {String json = new String(messageExt.getBody());MessageBO message = JacksonUtil.json2pojo(json, MessageBO.class);if (message == null || StringUtils.isBlank(message.getMsg())) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}List<MonTarget> monTargetList = JacksonUtil.json2list(message.getMsg(), MonTarget.class);RedisProxy jedisCluster = RedisClientUtil.getInstance().getJedisCluster();for (MonTarget monTarget : monTargetList) {logger.info("receiver task msg monTarget:" + monTarget.getTarget() + ",param" + monTarget.getParam());//添加链路2--消费消息RedisService.setLink(jedisCluster, monTarget.getTarget(), SortEnum.TWO.getIndex(), JacksonUtil.obj2json(monTarget));MonTask monTask1 = buildMonTask(monTarget, message.getCreateTime());                     workThreadPool.execute(monTask1);}} catch (Exception e) {logger.error("mon task1 timer error", e);//如果消费重试3次if (messageExt.getReconsumeTimes() == 3) {//该消息存储在DB或者LOG中,采取其他方式处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.err.println("Consumer Started.");}private MonTask buildMonTask(MonTarget monTarget, long createTime) {MonTask monTask = PluginManager.getInstance().newMonTaskInstance(monTarget.getType());     monTask.setParams(monTarget.getParam());return monTask;}public void stop() {if (consumer != null) {consumer.shutdown();}}}

rocket使用实例相关推荐

  1. WordPress加速插件秘笈:五款加速插件让您的网站飞起来

    WordPress加速重要吗? Wordpress加速:重要.重要.很重要. 世界著名的电商平台亚马逊,曾经做过测试:Amazon.com 网站速度每慢一秒钟,就会损失超过16亿美金的销售额. 在这篇 ...

  2. 前端开发基础知识汇总

    一.HTML 1.前言与常用标签 浏览器 内核 备注 IE Trident IE.猎豹安全.360极速浏览器.百度浏览器 firefox Gecko 可惜这几年已经没落了,打开速度慢.升级频繁.猪一样 ...

  3. Spring Cloud微服务系统架构的一些简单介绍和使用

    Spring Cloud 目录 特征 云原生应用程序 Spring Cloud上下文:应用程序上下文服务 引导应用程序上下文 应用程序上下文层次结构 改变Bootstrap的位置Properties ...

  4. 64位开源处理器Rocket该人士介绍

    最近大概读一点UCB发布时间Rocket处理器的源代码,的每个文件的源代码的功能有一定的一般理解,Mark一点点. Rocket是一家64bit标量处理器,5第一阶段管道,用途risc-v指令集.综合 ...

  5. Rocket之消息发送

    涉及角色 生产者组:一个逻辑概念,在使用生产者实例的时候需要指定一个组名.一个生产者组可以生产多个Topic的消息. 生产者实例:一个生产者组部署了多个进程,每个进程都可以称为一个生产者实例. Top ...

  6. Rocket.chat 安装

    Rocket.chat 群组聊天 直接通信 私聊群 桌面通知 媒体嵌入 链接预览 文件上传 语音/视频聊天 截图 多平台支持:Android IOS Windows桌面 网页 在Ubuntu20.04 ...

  7. Rocket mq的一些介绍

    概念 NameServer 相当于服务的注册中心,为整个MQ集群提供服务协调和治理; 可以部署多个,但是NameServer节点之间不会有通信,依靠Broker同时向所有的NameServer注册,上 ...

  8. Rocket MQ 详解

    开这个专栏主要之前实习的时候经常用到Rocket MQ,大厂使用的也比较多,主要讲解一些关键原理和概念. 介绍 定义:分布式消息中间件.MQ是消息队列的意思. 特点:低延迟.高并发.高可用.高可靠. ...

  9. RocketMq最强总结 带你rocket从入门到入土为安

    docker下安装单例RocketMq和集群 暂时之后再进行更新 rocket 各个角色分工 Producer:消息的发送者:举例:发信者 Consumer:消息接收者:举例:收信者 Broker:暂 ...

最新文章

  1. 使用Python,OpenCV线程化方式提高视频FPS(每秒帧数)
  2. 北大校友马里千:计算机视觉商用的下一个十年,AI 生成应占有一席之地
  3. ipcs ipcrm命令
  4. 学会这些你就是Android 开发高手了!
  5. C++标准转换运算符static_cast
  6. boost::lexical_cast
  7. CSS 自定义属性 -- 使用 JS 和不使用 JS
  8. 重装系统(U盘篇+U盘复原)——保姆级教学
  9. python实现C4.5
  10. 服务器安装系统路径,裸金属服务器安装多路径软件
  11. 打补丁是什么意思?如何快速对云主机批量打补丁?用什么软件?
  12. BATCH/批处理命令
  13. Windows命令行用法
  14. 2020中兴硬件类笔试
  15. 负债累累怎样白手起家?
  16. Object类型转为Map 强制转换
  17. Win32学习(七) 鼠标消息
  18. 获取excel文件路径的两种方法
  19. 钛备份提示 android id,钛备份(com.keramidas.TitaniumBackup) - 8.3.3 - 应用 - 酷安
  20. IBM flex system P260

热门文章

  1. $.each()的理解
  2. 蓝墨云班课与中职计算机课,蓝墨云环境下中职《计算机应用基础》的对分课堂教学研究...
  3. 使用springboot的banner给小伙伴输出一波月饼
  4. 魔兽世界活跃人数持续下降
  5. 暑假来袭!带孩子配镜前,请先了解“散瞳验光”!
  6. [SDOI2009][BZOJ 1226]学校食堂
  7. Antd pro中ProFormSelect使用initialValues
  8. 关于Docker Toolbox安装的一点经验(算是吧)
  9. 许纪霖《中华儒家文化发展脉络》思维导图
  10. 24339 Problem B 采药