为什么80%的码农都做不了架构师?>>>   

整体流程:

消息变化-->zookeeper监听-->metaq集群服务器--->通过filter过滤--->消息接收者-->执行操作.

一些配置:

发送者:

引入datachange-client

<dependency><groupId>com.pan.datachange</groupId><artifactId>pan-datachange-client</artifactId><version>1.0.0-SNAPSHOT</version>
</dependency>

环境配置:

  <!-- 加载DataChange的环境变量 --><beanclass="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"><!-- webserver环境 --><property name="location"value="file:c:/dataChange-receiver1.properties" /><property name="ignoreResourceNotFound" value="true" /><property name="ignoreUnresolvablePlaceholders" value="true" /><!-- dataChange配置 --><property name="properties" ref="localDataChangeProperties" /></bean><bean name="localDataChangeProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"><property name="location" value="classpath:exampleConfigDefault/dataChange-receiver.properties" /></bean>

dataChange-receiver.properties

dataChange.zkConnect=127.0.0.1:2181
dataChange.zkSessionTimeoutMs=3000
dataChange.zkConnectionTimeoutMs=10000
dataChange.zkSyncTimeMs=3000
dataChange.sender=testSenderdataChangeThreadExecutor.poolSize=10

然后发送方法:


@Resource
@Qualifier("defaultDataChangeManager")
private DataChangeManager dataChangeManager ;public void send(){DataChangeDO dataChangeDO = new DataChangeDO();dataChangeDO.setDataId(200);//过滤规则dataChangeDO.setDataType(DataType.member);dataChangeDO.setChangeType(ChangeTypeConstant.DATA_INSERT);dataChangeManager.sendMessage(dataChangeDO);}

接收者配置:

一样的需要引入dataChange-client

<dependency><groupId>com.pan.datachange</groupId><artifactId>pan-datachange-client</artifactId><version>1.0.0-SNAPSHOT</version>
</dependency>

实现dataChange的DataChangeListener接口

public class MessageConsumer implements DataChangeListener {public void receiveMessage(DataChangeDO dataChangeDO) {System.out.println("consumerMessage:" + dataChangeDO);}
}

过滤配置:

1.dataChangeHandleMapping配置:

首先实现dataChangeListener接口

public class ReceivedDataChangeListener implements DataChangeListener {protected Logger logger = LoggerFactory.getLogger(getClass());protected ExecutorService dataChangePool = Executors.newFixedThreadPool(100);public void receiveMessage(DataChangeDO dataChangeDO) {//接收消息进行的一些操作}@PreDestroypublic void destroy() {dataChangePool.shutdown();}
}

然后在配置bean:

<bean name="receivedDataChangeHandleMapping" class="com.pan.datachange.client.domain.ChangeHandleMapping"><property name="sender" value="发送者"></property><property name="receiver" value="接收者"></property><property name="dataChangeListener"><!--这里加载的是自己实现DataChangeListener接口的类><bean class="com.pan.jcob.msgcenterjob.datachange.ReceivedDataChangeListener" /></property><!--过滤规则,只有符合这种规则的数据才接收--><property name="filterConfigList"><list><bean class="com.pan.datachange.common.domain.FilterConfig"><property name="dataType" value="过滤格式"></property><property name="changeType" value="变化类型"></property></bean></list></property></bean>

2.ReceivedManager配置:

 <bean name="receiveManager" class="com.pan.datachange.client.receive.ReceiveManager" destroy-method="destroy"><property name="zkDataChangeRoot" value="/datachange/metaq" /><property name="zkConnect" value="zkAddr" /><property name="changeHandleMappingList"><list><!-- 一个topic一个mapping,这里就是填写之前的bean --><ref bean="receivedDataChangeHandleMapping"></ref> </list></property><!-- 只处理发送时间在这个日期之后的消息,这样可以防止新应用接收到很久以前的消息 --><property name="acceptMsgUpdateTimeAfterThis"><bean factory-bean="dataChangeDateFormat" factory-method="parse"><constructor-arg value="2015-12-16" /></bean></property><!--配置不接收消息的bean--><property name="bConsumeFromMaxOffset" value="true" /></bean>

转载于:https://my.oschina.net/lmxy1990/blog/779980

metaq的简单封装dataChange解读相关推荐

  1. Android APP更新下载工具类——简单封装DownloadManager

    几乎所有APP都包含了检查更新功能,更新下载功能的实现方式常用的有两种:1.使用App网络框架的文件下载请求:2.使用自带的DownloadManager类:本文介绍第二种,简单封装一下Downloa ...

  2. 简单封装浏览器 cookie 工具类

    版权声明:本文首发 http://asing1elife.com ,转载请注明出处. https://blog.csdn.net/asing1elife/article/details/8265571 ...

  3. 小程序简单封装 request 请求

    我在这里做了详细的介绍 : 小程序简单封装 request 请求 转载于:https://www.cnblogs.com/biangz/p/9984340.html

  4. 简单封装 HTTP 请求

    2017-2-19 更新到第二版: 源码地址:http://git.oschina.net/sp42/ajaxjs/tree/master/ajaxjs-base/src/com/ajaxjs/net ...

  5. 【JDBC】实现对JDBC 连接的简单封装

    package util;import java.sql.Connection; import java.sql.DriverManager;/**** 实现对JDBC 的封装* @author mq ...

  6. Android AsyncTask 深度理解、简单封装、任务队列分析、自定义线程池

    前言:由于最近在做SDK的功能,需要设计线程池.看了很多资料不知道从何开始着手,突然发现了AsyncTask有对线程池的封装,so,就拿它开刀,本文将从AsyncTask的基本用法,到简单的封装,再到 ...

  7. ios开发之使用多文件上传的简单封装最原始的

    ios开发之使用多文件上传的简单封装最原始的 // // ViewController.m // 18-上传多个文件 // // Created by 鲁军 on 2021/2/13. //#impo ...

  8. Spring jdbc 对象Mapper的简单封装

    一般查询实体的时候,都需要这么使用/**      * 根据id查询      *       * @return      */     public Emp queryEmpById(Intege ...

  9. Google图片加载库Glide的简单封装GlideUtils

    Google图片加载库Glide的简单封装GlideUtils  

最新文章

  1. idea远程调试修改代码_使用IDEA远程调试线上代码
  2. 石川es6课程---1-2、ES6简介
  3. rep movsd + rep movsb 内联实现 strcpy
  4. 正则表达式的常用操作符
  5. Hibernate陷阱
  6. 一元线性回归决定系数_回归分析|笔记整理(1)——引入,一元线性回归(上)...
  7. C++中依赖受限名称定义编译无法通过的问题
  8. 腾讯推出基于区块链存证的“点亮莫高窟”活动
  9. Linux---生产者与消费者模型
  10. leetcode 292 NimGame
  11. 肯德尔系数怎么分析_2020LPL春季赛3月15日比赛数据的数据分析(Python)
  12. 我所知道坦克大战(单机版)之 让子弹飞、让坦克控制子弹开炮
  13. 怎么下载高清无水印的视频号作品?视频号下载要用什么工具?1分钟解决微信视频号下载
  14. 物理内存是什么是计算机的显卡内存吗,物理内存可用数_可用内存和物理内存是什么意思?_可用物理内存...
  15. Ogre Giles
  16. flutter 登录和退出登录_Flutter从入门到奔溃(一):撸一个登录界面
  17. 2020最全微服务:SpringBoot+Cloud+Docker
  18. php易信短信接口,易信公众平台demo代码php(含验证接口)
  19. 毕业设计—心灵小屋健康管理系统(Vue+FastAPI+MySQL)一
  20. postgresql 数据库 报错 FATAL: the database system is shutting down 解决方法

热门文章

  1. RabbitMQ 七种队列模式应用场景案例分析(通俗易懂)
  2. 部门内 IDEA 分享,超实用技巧!
  3. 使用Netty如何做到单机秒级接收35万个对象
  4. 浅谈树形结构的特性和应用(上):多叉树,红黑树,堆,Trie树,B树,B+树......
  5. Java高并发之设计模式,设计思想
  6. 这样规范写代码,同事直呼“666”
  7. SpringBoot第十五篇:Springboot整合RabbitMQ
  8. 过来人的肺腑之言,攻读ML和CV硕士给我带来了什么?
  9. 半年 5 战 5 金:Kaggle 史上最快 GrandMaster 是如何炼成的?
  10. 悉尼科技大学入选 CVPR 2021 的 9 篇论文,都研究什么?