文章目录

  • 概述
  • 集群信息
  • 项目结构
  • 生产者
  • 自定义类
  • 消费者
  • 测试结果


概述

RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息 通过SQL92的方式,消费者可以过滤到自己想要的消息,其实RocketMQ还提供了一个更为抢到的功能,支持自定义Java类…

直接来看下如何使用的吧


集群信息

RocketMQ : V4.3.2

集群模式: 互为主备

节点信息: 192.168.18.130 192.168.18.131 双机互为主备

broker-m.conf 和 broker-s.conf 均已配置了 filterServerNums=1 ,已重启。


项目结构

生产者

package com.artisan.rocketmq.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;/*** @author 小工匠* @version v1.0* @create 2019-11-11 23:30* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/
public class FilterProducer {/**** TAG-FILTER-1000 ---> 布隆过滤器* 过滤掉的那些消息。直接就跳过了么。下次就不会继续过滤这些了。是么。* @param args* @throws Exception*/public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();for (int i = 0; i < 3; i++) {Message msg = new Message("TopicFilter","TAG-FILTER",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Set some properties. 生产者设置属性,消费者端通过Tag+该属性定制消息msg.putUserProperty("a", String.valueOf(i));if (i % 2 == 0) {msg.putUserProperty("b", "artisan");} else {msg.putUserProperty("b", "smart artisan");}producer.send(msg);}producer.shutdown();}}

自定义类

/*** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements.  See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License.  You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.artisan.rocketmq.filter;import org.apache.rocketmq.common.filter.FilterContext;
import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {@Overridepublic boolean match(MessageExt msg, FilterContext context) {String property = msg.getProperty("SequenceId");if (property != null) {int id = Integer.parseInt(property);if (((id % 10) == 0) && (id > 100)) {return true;}}return false;}
}

消费者

package com.artisan.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;
import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-11 23:45* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/
public class FilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");/*** 注册中心*/consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");/*** 订阅主题* 一种资源去换取另外一种资源*/consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'"));/*** 注册监听器,监听主题消息*/consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){try {System.out.println("consumeThread=" + Thread.currentThread().getName()+ ", queueId=" + msg.getQueueId() + ", content:"+ new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Filter Consumer Started.%n");}
}

测试结果

目前 没测试成功,先记录下。

RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息相关推荐

  1. 微信公众号怎么推送消息_微信公众号发送消息

    A.模板消息发送 模板消息仅用于公众号向用户发送重要的服务通知,只能用于符合其要求的服务场景中,如信用卡刷卡通知,商品购买成功通知等.不支持广告等营销类消息以及其它所有可能对用户造成骚扰的消息. 备注 ...

  2. activemq后台管理 看topic消息_「Java」 - SpringBoot amp; ActiveMQ

    一.消息队列 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合.异步消息.流量削锋等问题,实现高性能.高可用.可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件. 目前在生产环境中使用较 ...

  3. 小程序统一服务消息_[miniblog]小程序订阅消息踩坑记

    有阵子没有更新我的mini-blog了,这次把推送消息那块做了些改动,小程序的模板消息即将废弃,订阅消息终于来了. 关于订阅消息 订阅消息分为一次性订阅和长期订阅,长期订阅就不说啦,不是个人号可以染指 ...

  4. java消息通信_原生 Java 客户端进行消息通信

    原生 Java 客户端进行消息通信 Direct 交换器 DirectProducer:direct类型交换器的生产者 NormalConsumer:普通的消费者 MulitBindConsumer: ...

  5. java群发图文消息_使用Java语言开发微信公众平台(四)——图文消息的发送与响应...

    在上一篇文章中,我们实现了被关注回复与关键词回复功能.在用户关注的时候自动推送功能菜单,并根据用户输入的关键词,回复特定信息.但是,我们只能回复文本消息给用户,如何才回复一条图文消息呢?本周,我们一起 ...

  6. rabbitmq怎样确认是否已经消费了消息_阿里Java研发二面:了解RabbitMQ?说说RabbitMQ可靠性投递...

    上期写到高并发下RabbitMq消息中间件你应该介么玩今天给小伙伴说说!有自己看法的也可以在评论区留言探讨,也可以转发关注下我以后会长期分享! 目录: 确保消息发送到RabbitMQ服务器 确保消息被 ...

  7. jgroups传输消息_使用JGroups进行ElasticMQ消息复制

    jgroups传输消息 ElasticMQ是一个消息服务器,具有Scala,Java和与Amazon SQS兼容的接口. 它通过跨服务器群集复制消息来支持有保证的消息传递,并通过日志记录实现消息持久性 ...

  8. java修改配置文件参数_在java类中获取在.properties配置文件中设置的参数

    如何获取.properties配置文件中的参数,我在网上查了半天没弄明白,后来在以前的项目中找到了,就写下来,避免遗忘. 1.配置文件:message_product.properties total ...

  9. rocketmq 初体验(二)AsyncProducer No name server address, please set it.

    AsyncProducer No name server address, please set it. 报错log No name server address, please set it. 原因 ...

最新文章

  1. java 项目加载dll文件,在eclipse java项目中加载dll文件
  2. python条形图数据标签_python – Plotly中用于条形图的单独标记条形图
  3. mysql维护索引_高性能的MySQL(5)索引策略-索引和表的维护
  4. 聊聊Batch Normalization在网络结构中的位置
  5. java代码简单操作Redis数据Jedis jar
  6. MAC算法原理与常用实现
  7. MySQLdb._exceptions.ProgrammingError: (1064, <NULL>)
  8. mount挂载时 no such device_mount系统调用(vfs_kern_mount-gt;mount_fs-gt;fill_super)
  9. 两种思想实现基于jquery的延时导航菜单,可做延时触发器!
  10. Launch和Shut Off操作详解 - 每天5分钟玩转 OpenStack(30)
  11. ArcGIS API for Silverlight开发入门(2):一个基础地图实例
  12. win10 1909更新后无法上网三种解决方法
  13. ISO-9001质量管理体系认证经验分享
  14. 数字图像处理基础知-色度空间(RGB\CMY\CMYK\HSI的详细解释和一些关联性描述)
  15. 计算机专业定向选调,兄弟们,关于定向选调和找工作,JR们能不能给小弟一些建议...
  16. 推荐一个博客:香樟小院-大宝系列,博主多年来坚持记录了一只叫大宝的野猫的生活点滴,大宝后来还有了个小宝.......
  17. 台式计算机硬件办公配置清单,颜值满分的办公台式电脑,分享配置清单
  18. 解决xlrd不能打开xlsx表格以及打开失败问题
  19. Unity 3D基础入门编程_艾孜尔江撰稿
  20. 实现GB28181平台级联到海康平台的级联

热门文章

  1. java wav 波形_java读取wav文件(波形文件)并绘制波形图的方法
  2. encoder decoder 模型理解
  3. ubuntu 安装spark
  4. tcp转串口_浅谈串口转以太网技术
  5. 判断一颗二叉树是否为搜索二叉树和完全二叉树
  6. 机器学习笔记 (聚类) 层次聚类 Agglomerative Clutsering(Single-linkage、Complete-linkage,Group average)
  7. 文巾解题 704. 二分查找
  8. tableau必知必会之学做常用的倾斜图(slopegraph)
  9. 【机器学习算法-python实现】决策树-Decision tree(1) 信息熵划分数据集
  10. LeetCode题组:第9题-回文数