metaq的简单封装dataChange解读
为什么80%的码农都做不了架构师?>>>
整体流程:
消息变化-->zookeeper监听-->metaq集群服务器--->通过filter过滤--->消息接收者-->执行操作.
一些配置:
发送者:
引入datachange-client
<dependency><groupId>com.pan.datachange</groupId><artifactId>pan-datachange-client</artifactId><version>1.0.0-SNAPSHOT</version> </dependency>
环境配置:
<!-- 加载DataChange的环境变量 --><beanclass="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"><!-- webserver环境 --><property name="location"value="file:c:/dataChange-receiver1.properties" /><property name="ignoreResourceNotFound" value="true" /><property name="ignoreUnresolvablePlaceholders" value="true" /><!-- dataChange配置 --><property name="properties" ref="localDataChangeProperties" /></bean><bean name="localDataChangeProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"><property name="location" value="classpath:exampleConfigDefault/dataChange-receiver.properties" /></bean>
dataChange-receiver.properties
dataChange.zkConnect=127.0.0.1:2181
dataChange.zkSessionTimeoutMs=3000
dataChange.zkConnectionTimeoutMs=10000
dataChange.zkSyncTimeMs=3000
dataChange.sender=testSenderdataChangeThreadExecutor.poolSize=10
然后发送方法:
@Resource
@Qualifier("defaultDataChangeManager")
private DataChangeManager dataChangeManager ;public void send(){DataChangeDO dataChangeDO = new DataChangeDO();dataChangeDO.setDataId(200);//过滤规则dataChangeDO.setDataType(DataType.member);dataChangeDO.setChangeType(ChangeTypeConstant.DATA_INSERT);dataChangeManager.sendMessage(dataChangeDO);}
接收者配置:
一样的需要引入dataChange-client
<dependency><groupId>com.pan.datachange</groupId><artifactId>pan-datachange-client</artifactId><version>1.0.0-SNAPSHOT</version>
</dependency>
实现dataChange的DataChangeListener接口
public class MessageConsumer implements DataChangeListener {public void receiveMessage(DataChangeDO dataChangeDO) {System.out.println("consumerMessage:" + dataChangeDO);}
}
过滤配置:
1.dataChangeHandleMapping配置:
首先实现dataChangeListener接口
public class ReceivedDataChangeListener implements DataChangeListener {protected Logger logger = LoggerFactory.getLogger(getClass());protected ExecutorService dataChangePool = Executors.newFixedThreadPool(100);public void receiveMessage(DataChangeDO dataChangeDO) {//接收消息进行的一些操作}@PreDestroypublic void destroy() {dataChangePool.shutdown();}
}
然后在配置bean:
<bean name="receivedDataChangeHandleMapping" class="com.pan.datachange.client.domain.ChangeHandleMapping"><property name="sender" value="发送者"></property><property name="receiver" value="接收者"></property><property name="dataChangeListener"><!--这里加载的是自己实现DataChangeListener接口的类><bean class="com.pan.jcob.msgcenterjob.datachange.ReceivedDataChangeListener" /></property><!--过滤规则,只有符合这种规则的数据才接收--><property name="filterConfigList"><list><bean class="com.pan.datachange.common.domain.FilterConfig"><property name="dataType" value="过滤格式"></property><property name="changeType" value="变化类型"></property></bean></list></property></bean>
2.ReceivedManager配置:
<bean name="receiveManager" class="com.pan.datachange.client.receive.ReceiveManager" destroy-method="destroy"><property name="zkDataChangeRoot" value="/datachange/metaq" /><property name="zkConnect" value="zkAddr" /><property name="changeHandleMappingList"><list><!-- 一个topic一个mapping,这里就是填写之前的bean --><ref bean="receivedDataChangeHandleMapping"></ref> </list></property><!-- 只处理发送时间在这个日期之后的消息,这样可以防止新应用接收到很久以前的消息 --><property name="acceptMsgUpdateTimeAfterThis"><bean factory-bean="dataChangeDateFormat" factory-method="parse"><constructor-arg value="2015-12-16" /></bean></property><!--配置不接收消息的bean--><property name="bConsumeFromMaxOffset" value="true" /></bean>
转载于:https://my.oschina.net/lmxy1990/blog/779980
metaq的简单封装dataChange解读相关推荐
- Android APP更新下载工具类——简单封装DownloadManager
几乎所有APP都包含了检查更新功能,更新下载功能的实现方式常用的有两种:1.使用App网络框架的文件下载请求:2.使用自带的DownloadManager类:本文介绍第二种,简单封装一下Downloa ...
- 简单封装浏览器 cookie 工具类
版权声明:本文首发 http://asing1elife.com ,转载请注明出处. https://blog.csdn.net/asing1elife/article/details/8265571 ...
- 小程序简单封装 request 请求
我在这里做了详细的介绍 : 小程序简单封装 request 请求 转载于:https://www.cnblogs.com/biangz/p/9984340.html
- 简单封装 HTTP 请求
2017-2-19 更新到第二版: 源码地址:http://git.oschina.net/sp42/ajaxjs/tree/master/ajaxjs-base/src/com/ajaxjs/net ...
- 【JDBC】实现对JDBC 连接的简单封装
package util;import java.sql.Connection; import java.sql.DriverManager;/**** 实现对JDBC 的封装* @author mq ...
- Android AsyncTask 深度理解、简单封装、任务队列分析、自定义线程池
前言:由于最近在做SDK的功能,需要设计线程池.看了很多资料不知道从何开始着手,突然发现了AsyncTask有对线程池的封装,so,就拿它开刀,本文将从AsyncTask的基本用法,到简单的封装,再到 ...
- ios开发之使用多文件上传的简单封装最原始的
ios开发之使用多文件上传的简单封装最原始的 // // ViewController.m // 18-上传多个文件 // // Created by 鲁军 on 2021/2/13. //#impo ...
- Spring jdbc 对象Mapper的简单封装
一般查询实体的时候,都需要这么使用/** * 根据id查询 * * @return */ public Emp queryEmpById(Intege ...
- Google图片加载库Glide的简单封装GlideUtils
Google图片加载库Glide的简单封装GlideUtils
最新文章
- idea远程调试修改代码_使用IDEA远程调试线上代码
- 石川es6课程---1-2、ES6简介
- rep movsd + rep movsb 内联实现 strcpy
- 正则表达式的常用操作符
- Hibernate陷阱
- 一元线性回归决定系数_回归分析|笔记整理(1)——引入,一元线性回归(上)...
- C++中依赖受限名称定义编译无法通过的问题
- 腾讯推出基于区块链存证的“点亮莫高窟”活动
- Linux---生产者与消费者模型
- leetcode 292 NimGame
- 肯德尔系数怎么分析_2020LPL春季赛3月15日比赛数据的数据分析(Python)
- 我所知道坦克大战(单机版)之 让子弹飞、让坦克控制子弹开炮
- 怎么下载高清无水印的视频号作品?视频号下载要用什么工具?1分钟解决微信视频号下载
- 物理内存是什么是计算机的显卡内存吗,物理内存可用数_可用内存和物理内存是什么意思?_可用物理内存...
- Ogre Giles
- flutter 登录和退出登录_Flutter从入门到奔溃(一):撸一个登录界面
- 2020最全微服务:SpringBoot+Cloud+Docker
- php易信短信接口,易信公众平台demo代码php(含验证接口)
- 毕业设计—心灵小屋健康管理系统(Vue+FastAPI+MySQL)一
- postgresql 数据库 报错 FATAL: the database system is shutting down 解决方法
热门文章
- RabbitMQ 七种队列模式应用场景案例分析(通俗易懂)
- 部门内 IDEA 分享,超实用技巧!
- 使用Netty如何做到单机秒级接收35万个对象
- 浅谈树形结构的特性和应用(上):多叉树,红黑树,堆,Trie树,B树,B+树......
- Java高并发之设计模式,设计思想
- 这样规范写代码,同事直呼“666”
- SpringBoot第十五篇:Springboot整合RabbitMQ
- 过来人的肺腑之言,攻读ML和CV硕士给我带来了什么?
- 半年 5 战 5 金:Kaggle 史上最快 GrandMaster 是如何炼成的?
- 悉尼科技大学入选 CVPR 2021 的 9 篇论文,都研究什么?