RocketMQ Filtersrv详解

RocketMQ入门手册

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException, IOException {String group_name = "filter_consumer";DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);consumer.setNamesrvAddr("localhost:9876");String filterCode = MixAll.file2String("C:\\JavaEE_Workspace\\RocketMQ\\src\\main\\java\\com\\aztech\\filter\\MessageFilterImpl.java");
//        System.out.println("filterCode: " + filterCode);/*** 使用Java代码,在服务器做消息过滤*/consumer.subscribe("TopicFilter7", MessageFilterImpl.class.getCanonicalName());
//        consumer.subscribe("TopicFilter7", "com.aztech.filter.MessageFilterImpl",filterCode);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {//System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);MessageExt me = msgs.get(0);try {System.out.println("收到信息:" + new String(me.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});/*** Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>*/consumer.start();System.out.println("Consumer Started.");}
}
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {String group_name = "filter_producer";DefaultMQProducer producer = new DefaultMQProducer(group_name);producer.setNamesrvAddr("localhost:9876");producer.start();try {for (int i = 0; i < 10; i++) {Message msg = new Message("TopicFilter7",// topic"TagA",// tag"OrderID001",// key("Hello RocketMQ" + i).getBytes());// bodymsg.putUserProperty("SequenceId", String.valueOf(i));SendResult sendResult = producer.send(msg);System.out.println(sendResult);}}catch (Exception e) {e.printStackTrace();}producer.shutdown();}
}
import org.apache.rocketmq.common.filter.FilterContext;
import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {@Overridepublic boolean match(MessageExt msg, FilterContext arg1) {// 尽量遵循规范,使用getUserPropertyString property = msg.getUserProperty("SequenceId");if (property != null) {int id = Integer.parseInt(property);//if ((id % 3) == 0 && (id > 10)) {if ((id % 2) == 0) {return true;}}return false;}}

RocketMQ Filtersrv相关推荐

  1. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  2. 搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务

    搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务 初步认识RocketMQ的核心模块 rocketmq模块 rocketmq-broker:接受生产者发来的消息并存储(通过调用rocke ...

  3. RocketMQ学习笔记(7)----RocketMQ的整体架构

    1. RocketMQ主要的9个模块,如图: 2. 模块介绍 1. rocketmq-common:通用的常量枚举,基类方法或者数据结构,按描述的目标来分包,通俗易懂.报名有admin,consume ...

  4. 《浅入浅出》-RocketMQ

    捞一下 消息队列系列前面两章分别讲了消息队列的基础知识,还有比较常见的问题和常见分布式事务解决方案,那么在实际开发过程中,我们使用频率比较高的消息队列中间件有哪些呢? 帅丙我工作以来接触的消息队列中间 ...

  5. RocketMQ 介绍

    前言 消息队列在互联网技术存储方面使用如此广泛,几乎所有的后端技术面试官都要在消息队列的使用和原理方面对小伙伴们进行360°的刁难. 作为一个在互联网公司面一次拿一次Offer的面霸,打败了无数竞争对 ...

  6. rocketmq框架详细介绍

    文章目录 消息队列 应用场景 rocketmq 为什么选择RocketMQ消息队列 RocketMQ所拥有的功能 rocketmq应用场景 应用解耦 流量削峰 数据分发 异步处理 日志处理 顺序消息 ...

  7. mq消费者组_「架构师MQ进阶」RocketMQ源码分析(四)- 源代码包结构分析

    在前面第一篇中已经将源代码下载到本地了,本篇主要是介绍代码中相关模块到作用.036.Rocket-MQ-Source-code-cover.png 一.源码结构 RocketMQ源码组织方式基于Mav ...

  8. 看完保送阿里的RocketMQ知识点(超详细)

    你知道的越多,你不知道的越多 点赞再看,养成习惯 本文GitHub https://github.com/JavaFamily 已收录,有一线大厂面试点脑图.个人联系方式,欢迎Star和指教 前言 消 ...

  9. RocketMQ集群部署结构

    RocketMQ四大核心组成部分:NameServer.Broker.Producer以及Consumer四部分: 各组件通讯 Broker与Name Server集群中的所有节点建立长连接: Pro ...

最新文章

  1. sql2008 删除日志
  2. 关于各种回归评价指标MSE、RMSE、MAE、R-Squared、Standard Deviation(标准差)
  3. 037——VUE中表单控件处理之表单修饰符:lazy/number/trim
  4. sparksql优化_Spark SQL amp; Streaming
  5. 做了极度危险的事情各种奔忙
  6. linux系统电脑接硬盘盒,在linux系统下添加新硬盘
  7. 【转】VNode节点
  8. gwas snp 和_Science | 群体研究新思路:De novo + GWAS
  9. 什么是设计模式?为什么要使用设计模式?有什么好处?
  10. 空间几何变换知识点——摘自《机器视觉研究与发展》赵彭
  11. 图像恢复系列之(6)超分(7)反光去除(8)光斑去除 (9)阴影去除(10)水下图像失真去除 | ICCV2021生成对抗GAN...
  12. opencv——批量修改图片像素大小
  13. 利用arcscene将shape文件拉伸后三维展示
  14. 连接远程电脑虚拟机时,怎样重启远程电脑?
  15. 嵌入式系统概念以及嵌入式基础知识
  16. Spring模块简介
  17. 微信开放平台 错误码61007: api is unauthorized to component
  18. python遇到can not import xxx错误
  19. ios APP项目架构心得
  20. 刚刚开始写博客,怎样让自己的博客能够被百度搜索到呢?

热门文章

  1. IE中a标签绝对定位时才生的bug
  2. iOS隐藏键盘的几种方式
  3. NoSQL学习笔记(二)之CAP理论
  4. 基于MaxCompute打造轻盈的人人车移动端数据平台
  5. javaScript变量、作用域链
  6. Windows Server 2003的功能级别
  7. js判断是否是ie浏览器
  8. Hazelcast集群服务(2)——Hazelcast基本配置
  9. 告别2013拥抱2014
  10. 腾讯微博Android客户端开发——OAuth认证介绍