RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
文章目录
- 概述
- 集群信息
- 项目结构
- 生产者
- 自定义类
- 消费者
- 测试结果
概述
RocketMQ-初体验RocketMQ(10)-过滤消息_SQL92表达式筛选消息 通过SQL92的方式,消费者可以过滤到自己想要的消息,其实RocketMQ还提供了一个更为抢到的功能,支持自定义Java类…
直接来看下如何使用的吧
集群信息
RocketMQ : V4.3.2
集群模式: 互为主备
节点信息: 192.168.18.130 192.168.18.131 双机互为主备
broker-m.conf 和 broker-s.conf 均已配置了 filterServerNums=1 ,已重启。
项目结构
生产者
package com.artisan.rocketmq.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;/*** @author 小工匠* @version v1.0* @create 2019-11-11 23:30* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/
public class FilterProducer {/**** TAG-FILTER-1000 ---> 布隆过滤器* 过滤掉的那些消息。直接就跳过了么。下次就不会继续过滤这些了。是么。* @param args* @throws Exception*/public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();for (int i = 0; i < 3; i++) {Message msg = new Message("TopicFilter","TAG-FILTER",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Set some properties. 生产者设置属性,消费者端通过Tag+该属性定制消息msg.putUserProperty("a", String.valueOf(i));if (i % 2 == 0) {msg.putUserProperty("b", "artisan");} else {msg.putUserProperty("b", "smart artisan");}producer.send(msg);}producer.shutdown();}}
自定义类
/*** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.artisan.rocketmq.filter;import org.apache.rocketmq.common.filter.FilterContext;
import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {@Overridepublic boolean match(MessageExt msg, FilterContext context) {String property = msg.getProperty("SequenceId");if (property != null) {int id = Integer.parseInt(property);if (((id % 10) == 0) && (id > 100)) {return true;}}return false;}
}
消费者
package com.artisan.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;
import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-11 23:45* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/
public class FilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");/*** 注册中心*/consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");/*** 订阅主题* 一种资源去换取另外一种资源*/consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'"));/*** 注册监听器,监听主题消息*/consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){try {System.out.println("consumeThread=" + Thread.currentThread().getName()+ ", queueId=" + msg.getQueueId() + ", content:"+ new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Filter Consumer Started.%n");}
}
测试结果
目前 没测试成功,先记录下。
RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息相关推荐
- 微信公众号怎么推送消息_微信公众号发送消息
A.模板消息发送 模板消息仅用于公众号向用户发送重要的服务通知,只能用于符合其要求的服务场景中,如信用卡刷卡通知,商品购买成功通知等.不支持广告等营销类消息以及其它所有可能对用户造成骚扰的消息. 备注 ...
- activemq后台管理 看topic消息_「Java」 - SpringBoot amp; ActiveMQ
一.消息队列 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合.异步消息.流量削锋等问题,实现高性能.高可用.可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件. 目前在生产环境中使用较 ...
- 小程序统一服务消息_[miniblog]小程序订阅消息踩坑记
有阵子没有更新我的mini-blog了,这次把推送消息那块做了些改动,小程序的模板消息即将废弃,订阅消息终于来了. 关于订阅消息 订阅消息分为一次性订阅和长期订阅,长期订阅就不说啦,不是个人号可以染指 ...
- java消息通信_原生 Java 客户端进行消息通信
原生 Java 客户端进行消息通信 Direct 交换器 DirectProducer:direct类型交换器的生产者 NormalConsumer:普通的消费者 MulitBindConsumer: ...
- java群发图文消息_使用Java语言开发微信公众平台(四)——图文消息的发送与响应...
在上一篇文章中,我们实现了被关注回复与关键词回复功能.在用户关注的时候自动推送功能菜单,并根据用户输入的关键词,回复特定信息.但是,我们只能回复文本消息给用户,如何才回复一条图文消息呢?本周,我们一起 ...
- rabbitmq怎样确认是否已经消费了消息_阿里Java研发二面:了解RabbitMQ?说说RabbitMQ可靠性投递...
上期写到高并发下RabbitMq消息中间件你应该介么玩今天给小伙伴说说!有自己看法的也可以在评论区留言探讨,也可以转发关注下我以后会长期分享! 目录: 确保消息发送到RabbitMQ服务器 确保消息被 ...
- jgroups传输消息_使用JGroups进行ElasticMQ消息复制
jgroups传输消息 ElasticMQ是一个消息服务器,具有Scala,Java和与Amazon SQS兼容的接口. 它通过跨服务器群集复制消息来支持有保证的消息传递,并通过日志记录实现消息持久性 ...
- java修改配置文件参数_在java类中获取在.properties配置文件中设置的参数
如何获取.properties配置文件中的参数,我在网上查了半天没弄明白,后来在以前的项目中找到了,就写下来,避免遗忘. 1.配置文件:message_product.properties total ...
- rocketmq 初体验(二)AsyncProducer No name server address, please set it.
AsyncProducer No name server address, please set it. 报错log No name server address, please set it. 原因 ...
最新文章
- java 项目加载dll文件,在eclipse java项目中加载dll文件
- python条形图数据标签_python – Plotly中用于条形图的单独标记条形图
- mysql维护索引_高性能的MySQL(5)索引策略-索引和表的维护
- 聊聊Batch Normalization在网络结构中的位置
- java代码简单操作Redis数据Jedis jar
- MAC算法原理与常用实现
- MySQLdb._exceptions.ProgrammingError: (1064, <NULL>)
- mount挂载时 no such device_mount系统调用(vfs_kern_mount-gt;mount_fs-gt;fill_super)
- 两种思想实现基于jquery的延时导航菜单,可做延时触发器!
- Launch和Shut Off操作详解 - 每天5分钟玩转 OpenStack(30)
- ArcGIS API for Silverlight开发入门(2):一个基础地图实例
- win10 1909更新后无法上网三种解决方法
- ISO-9001质量管理体系认证经验分享
- 数字图像处理基础知-色度空间(RGB\CMY\CMYK\HSI的详细解释和一些关联性描述)
- 计算机专业定向选调,兄弟们,关于定向选调和找工作,JR们能不能给小弟一些建议...
- 推荐一个博客:香樟小院-大宝系列,博主多年来坚持记录了一只叫大宝的野猫的生活点滴,大宝后来还有了个小宝.......
- 台式计算机硬件办公配置清单,颜值满分的办公台式电脑,分享配置清单
- 解决xlrd不能打开xlsx表格以及打开失败问题
- Unity 3D基础入门编程_艾孜尔江撰稿
- 实现GB28181平台级联到海康平台的级联
热门文章
- java wav 波形_java读取wav文件(波形文件)并绘制波形图的方法
- encoder decoder 模型理解
- ubuntu 安装spark
- tcp转串口_浅谈串口转以太网技术
- 判断一颗二叉树是否为搜索二叉树和完全二叉树
- 机器学习笔记 (聚类) 层次聚类 Agglomerative Clutsering(Single-linkage、Complete-linkage,Group average)
- 文巾解题 704. 二分查找
- tableau必知必会之学做常用的倾斜图(slopegraph)
- 【机器学习算法-python实现】决策树-Decision tree(1) 信息熵划分数据集
- LeetCode题组:第9题-回文数