项目开发阶段遇到一个需求,描述大致就是同一个用户在A系统数据库保存的数据信息与在B系统数据库保存的数据信息要保持同步。当A系统用户修改了个人信息,A系统后台在将用户修改后的信息入库的同时也会向B系统发送消息,让B系统后台进行自动数据信息同步。

这个可以根据各企业各自的系统间通讯方式来灵活处理。这里我介绍我运用的处理方式,作为经验总结记录和分享。

深谙spring的实现原理:使用dom4j技术对xml文件进行解析读取扫描注解通过反射机制来进行对象的创建,于是解决上述需求的方案由此得到启发。对于我们实际系统来说,这就是一个小框架,扩展性非常好,后来者只需要专注业务逻辑的实现即可完成数据同步的需求。

下面先贴目录结构

这里运用先缓存业务逻辑处理方法的策略,即在服务器启动的时候就将写好的业务逻辑处理方法缓存到内存中,通过监听器监听到其他系统有发送同步消息时自动调用相应的处理方法来进行数据同步。

要缓存服务,需要用到注解和反射

下面贴上两个自定义注解:分别是类注解和方法注解

package com.zy.isc.common;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** <p>Title: IscClassMapping</p>* <p>Description: 消息业务处理类注解* 用于标识类为消息处理类,和IscMethodMapping方法注解配合使用* spring容器加载完成后会将具休的业务方法缓存起来,用于处理消息。* </p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface IscClassMapping {}
package com.zy.isc.common;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** <p>Title: IscMethodMapping</p>* <p>Description: 消息业务处理方法注解</p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface IscMethodMapping {public String name();public String desc() default "";
}

然后定义一个通用的业务处理类,通过这个类来保存注解类对象,然后运用反射机制来调用具体的业务逻辑处理方法

package com.zy.isc.common;import java.io.Serializable;
import java.lang.reflect.Method;/*** <p>Title: ServiceBean</p>* <p>Description: 保存到map中的业务bean</p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
public class ServiceBean implements Serializable{private static final long serialVersionUID = 7453372917648514518L;private Method method;private Object object;private String desc;public Method getMethod() {return method;}public void setMethod(Method method) {this.method = method;}public Object getObject() {return object;}public void setObject(Object object) {this.object = object;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}}

spring容器初始化时还需要做的另一件事——将带有注解的类和方法缓存在map中,key值就是方法上面的注解value值,key对应的value就是带注解的对应的业务处理类对象实例

package com.zy.isc.core_receive;import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;import com.zy.isc.common.IscClassMapping;
import com.zy.isc.common.IscMethodMapping;
import com.zy.isc.common.ServiceBean;/*** <p>Title: InitServiceMethodMapping</p>* <p>Description: spring容器启动时调用这里的初始化方法,将带有自定义注解的类和方法缓存在map中</p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
public class InitServiceMethodMapping {private static Logger logger = LoggerFactory.getLogger(InitServiceMethodMapping.class);private static Map<String,ServiceBean> map = null;private InitServiceMethodMapping(){}public static Map<String,ServiceBean> getMethodMap(){return map;}public static synchronized void init() throws Exception{if(map == null){logger.info("initialize biz interface object and save into map start");map = new HashMap<String, ServiceBean>();ApplicationContext context = SpringContextReceiveUtil.getApplicationContext();for(String s : context.getBeanDefinitionNames()){Class<?> c = context.getBean(s).getClass();if(c.getAnnotation(IscClassMapping.class)!=null){Method[]method = c.getDeclaredMethods();ServiceBean serviceBean = null;for(Method m : method){IscMethodMapping mksIscMethodMapping = m.getAnnotation(IscMethodMapping.class);if(mksIscMethodMapping!=null){if(!map.containsKey(mksIscMethodMapping.name())){serviceBean = new ServiceBean();serviceBean.setObject(context.getBean(s));serviceBean.setMethod(m);serviceBean.setDesc(mksIscMethodMapping.desc());map.put(mksIscMethodMapping.name(),serviceBean);logger.info("@biz interface name:["+mksIscMethodMapping.name()+"],already saved in cached map@");}else{throw new Exception("initialize biz interface failed, name:["+mksIscMethodMapping.name()+"]repeated,please modify then try,classpath:"+c.getName());}}}}}logger.info("initialize biz interface object and save into map start,total biz interface count:"+map.size());}}
}

然后在spring容器启动的时候调用上述类中的初始化方法和启动消息监听器

package com.zy.isc.core_receive;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;import com.pingan.isc.ISCMessageBroker;
import com.zy.isc.handler.ServerHandler;public class SpringContextReceiveUtil implements ApplicationContextAware{private Logger logger = LoggerFactory.getLogger(SpringContextReceiveUtil.class);private static ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext)throws BeansException {logger.info("initialize spring context start:"+applicationContext);SpringContextReceiveUtil.applicationContext = applicationContext;logger.info("initialize spring context end, bean count:"+applicationContext.getBeanDefinitionCount());try {//这里缓存业务接口时容器还没有完全启动完成,所以使用纯线程来启动消息中心监听程序,以免影响启动超时的情况InitServiceMethodMapping.init();//启动消息监听initMessageHandler();} catch (Exception e) {logger.error("initialize spring context failed",e);}}public static ApplicationContext getApplicationContext(){return applicationContext;}public void initMessageHandler() {try {logger.info("initialize MSG listener start");//启动消息监听int corePoolSize = 10;int maximumPoolSize = 20;int keepAliveTime = 300;int queueSize = 100;ServerHandler handler = new ServerHandler();ISCMessageBroker.MessageExecutor(corePoolSize, maximumPoolSize, keepAliveTime, queueSize, handler);logger.info("initialize MSG listener end");} catch (Exception e) {logger.error("initialize MSG listener exception",e);}}
}

消息监听器启动时需要指定消息处理器,这个处理器实现了MessageHandler接口,一旦有消息从其他系统发过来,监听器监听到消息到来就会调用messageReceived(Object arg0)这个方法,参数即为接收到的消息

package com.zy.isc.handler;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.pingan.isc.core.MessageHandler;
import com.zy.isc.core_receive.ServiceReceiveExecutor;public class ServerHandler implements MessageHandler {private static final Logger logger = LoggerFactory.getLogger(ServerHandler.class);@Overridepublic void messageReceived(Object arg0) throws Exception{try {logger.info("=======invoke biz method start=======");long start = System.currentTimeMillis();ServiceReceiveExecutor.execute(arg0);long end = System.currentTimeMillis();logger.info("=======invoke biz method end=======");logger.info("time cost:"+(end-start)/1000);}catch (Exception e) {logger.error("Message Received Exception"+arg0,e);}}
}

然后在这个类的messageReceived(Object arg0)方法中再调用接收消息执行器将接收到的消息进行处理,解析消息内容得到里面用来标记具体业务逻辑处理方法的值,然后将该值与缓存在map中的key值比对,找到对应的方法用反射来调用。

package com.zy.isc.core_receive;import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.ResourceBundle;import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.dc.eai.data.CompositeData;
import com.dc.eai.data.Field;
import com.dcfs.esb.client.converter.PackUtil;
import com.zy.isc.common.ServiceBean;/*** <p>Title: ServiceReceiveExcutor</p>* <p>Description:接收ISC消息执行具体的业务方法 </p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
public class ServiceReceiveExecutor {private static final Logger logger = LoggerFactory.getLogger(ServiceReceiveExecutor.class);//读取配置文件private static ResourceBundle resource;static{resource = ResourceBundle.getBundle("IscConfigPlus");}public static String getValue(String key){return resource.getString(key);}/*** <p>Description:根据消息子主题和业务码执行具体的业务方法 </p>* @param message* @return*/public static Object execute(Object message) throws Exception{logger.info("===== unpack message start =====");Map<String, Object> map = unpackMSG(message);String uniqueId = null;CompositeData compositeData = null;CompositeData body = null;if (map != null && map.size() > 0) {uniqueId = (String) map.get("uniqueId");compositeData = (CompositeData) map.get("compositeData");body = compositeData.getStruct("BODY");}else {logger.info("message is null");return null;}logger.info("===== unpack message end =====");try {if(StringUtils.isBlank(uniqueId)){logger.error("uniqueId is null,no method matches , message infomation:\r\n"+message);throw new Exception("uniqueId is null,no method matches , message infomation:\r\n"+message);}boolean isContainsKey = InitServiceMethodMapping.getMethodMap().containsKey(uniqueId);if (isContainsKey) {ServiceBean serviceBean = InitServiceMethodMapping.getMethodMap().get(uniqueId);logger.info("request biz interface's ID:["+uniqueId+"],biz interface description["+serviceBean.getDesc()+"]");return serviceBean.getMethod().invoke(serviceBean.getObject(),body);}else {logger.info("no method maches the request,message information\r\n" + compositeData );}} catch (Exception e) {logger.error("biz method exception,args:\r\n",e);throw e;}return null;}/*** <p>Description: 标准报文体解包,将报文中的消息子主题和交易码拼接后作为业务逻辑方法的唯一标识</p>* @param message* @return*/public static Map<String,Object> unpackMSG(Object message){Map<String, Object> retMap = new HashMap<String, Object>();if (message != null) {//解析出报文体,存到map中CompositeData compositeData  = PackUtil.unpackXmlStr((String)message);//打印此日志方便查看报文,生产环境去掉logger.debug("message content:\r\n" + compositeData);retMap.put("compositeData", compositeData);CompositeData body = compositeData.getStruct("BODY");retMap.put("body", body);Map<String, Object>dataMap = new HashMap<String, Object>();Enumeration<String> keys = resource.getKeys();while (keys.hasMoreElements()) {String key = (String) keys.nextElement();String value = getValue(key);CompositeData struct = compositeData.getStruct(value);if (struct != null && struct.size()>0) {logger.debug("key:value ——> " + key+":"+value);dataMap.put(key, struct);}}logger.debug("dataMap:\r\n"+dataMap);if (dataMap != null && dataMap.size()>0) {CompositeData iscSysHeadCompositeData = (CompositeData) dataMap.get("iscSysHead");CompositeData iscPubHeadCompositeData = (CompositeData) dataMap.get("iscPubHead");Field subTopicField = iscSysHeadCompositeData.getField("SUB_TOPIC");Field serviceCodeField = iscPubHeadCompositeData.getField("SERVICE_CODE");if (subTopicField != null) {String subTopic = subTopicField.getValue().toString();logger.debug("message subtopic: " + subTopic);retMap.put("subTopic", subTopic);}else {retMap.put("subTopic", "");}if (serviceCodeField != null) {String serviceCode = serviceCodeField.getValue().toString();logger.debug("message serviceCode: " + serviceCode);retMap.put("serviceCode", serviceCode);}else {retMap.put("serviceCode", "");}String subTopic = retMap.get("subTopic").toString();String serviceCode = retMap.get("serviceCode").toString();String uniqueId = subTopic + serviceCode;retMap.put("uniqueId", uniqueId);}else {logger.info("dataMap is null,uniqueId is null");}}return retMap;}
}

这个ServiceReceiveExecutor类会将消息中解析出来的报文body通过反射参数传到具体的业务逻辑处理类中,最后就是具体的业务逻辑处理类了,这个类或者方法可以按相同的方式进行任意扩展

package com.zy.isc.service;import java.util.HashMap;
import java.util.Map;import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;import com.dc.eai.data.CompositeData;
import com.dc.eai.data.Field;
import com.zy.isc.common.IscClassMapping;
import com.zy.isc.common.IscMethodMapping;/*** <p>Title: UserInfoService</p>* <p>Description: 接收BECIF广播用户信息Service</p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
@Service
@IscClassMapping
public class UserInfoService {private static final Logger logger = LoggerFactory.getLogger(UserInfoService.class);//服务类//@Resource(name = "userService")//private UserService userService;//name 唯一标识 = 子主题(20005)+ 交易码(000012)@IscMethodMapping(name="20005000012",desc="xxx业务需求描述")public void userInfoCombine(CompositeData compositeData) throws Exception{Field clientNoField = compositeData.getField("CLIENT_NO");Field clientNo1Field = compositeData.getField("CLIENT_NO1");String clientNo = null;String clientNo1 = null;if (clientNoField != null) {clientNo = clientNoField.getValue().toString();logger.info("combine becif:" + clientNo);}if (clientNo1Field != null) {clientNo1 = clientNo1Field.getValue().toString();logger.info("combined becif: " + clientNo1);}  if (StringUtils.isNotBlank(clientNo1) && StringUtils.isNotBlank(clientNo)) {Map<String, Object> paramMap = new HashMap<String, Object>();paramMap.put("clientNo", clientNo);paramMap.put("aClentNo", clientNo1);//boolean flag = scfpUserService.checkBecifExist(clientNo1);boolean flag = false;logger.info("becif exist:" + flag);if (flag) {try {//scfpUserService.combineUserBecif(paramMap);} catch (Exception e) {logger.error("userInfoCombine() method exception:" + e.getMessage());}}}}
}

最后还有一个容器销毁时释放缓存的监听器

package com.zy.isc.listener;import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.pingan.isc.ISCMessageBroker;/*** <p>Title: StartupListener</p>* <p>Description:</p>* <p>Company: * @author kjkfb_zy  2017-7-31 * <p>Just do it!!!</p>* @version v1.0*/
public class StartupListener implements ServletContextListener {private static final Logger LOGGER = LoggerFactory.getLogger(StartupListener.class);@Overridepublic void contextInitialized(ServletContextEvent arg0) {LOGGER.info("===== MSG listener preparation =====");}@Overridepublic void contextDestroyed(ServletContextEvent arg0) {ISCMessageBroker.destroyed();LOGGER.info("===== ISCMessageBroker destroyed,resource release =====");}}

这样整个小框架就完毕了,使用时只需要在spring的配置文件中将SpringContextReceiveUtil这个类的bean配置好,在web.xml中配置StartupListener这个监听器就可以使用了。接收消息按UserInfoService类方法上面注解唯一标识来区分。后续还有其他消息要接收,直接按照这种注解方式在UserInfoService类中扩展或者另外新增类似UserInfoService类都可以。

系统间通讯实现数据信息实时同步解决方案相关推荐

  1. java数据同步处理_[java数据库同步解决方案]系统间通讯实现数据信息实时同步解决方案...

    项目开发阶段遇到一个需求,描述大致就是同一个用户在A系统数据库保存的数据信息与在B系统数据库保存的数据信息要保持同步.当A系统用户修改了个人信息,A系统后台在将用户修改后的信息入库的同时也会向B系统发 ...

  2. rsync+sersync实现数据文件实时同步

    rsync+sersync实现数据文件实时同步 使用rsync+sersync,实现nfs与backup服务器间实时数据同步 一.数据同步工具介绍 rsync是一款开源的.快速的.多功能的.可实现全量 ...

  3. Oracle 数据怎么实时同步到 MySQL | 亲测干货分享建议收藏

      摘要: 这段时间负责一个老项目开发的数据库管理工作,这个项目中开发库与测试数据库分离,其中有些系统表数据与基础资料数据经常需要进行同步,相信很多 DBA 同学经常会遇到要从一个数据库实时同步到另一 ...

  4. Oracle 数据怎么实时同步到 PgSQL | 亲测干货分享建议收藏

    摘要: 这段时间负责一个老项目开发的数据库管理工作,这个项目中开发库与测试数据库分离,其中有些系统表数据与基础资料数据经常需要进行同步,相信很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数 ...

  5. Oracle 数据怎么实时同步到 Kafka | 亲测干货分享建议收藏

     摘要: 很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.我自己亲测了一种方式,可以非常方便地完成 Oracl ...

  6. Oracle 数据怎么实时同步到 SQL Server | 亲测干货分享建议收藏

    摘要: 很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.我自己亲测了一种方式,可以非常方便地完成 Oracle ...

  7. Oracle 数据怎么实时同步到 MongoDB | 亲测干货分享建议收藏

    摘要: 很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.我自己亲测了一种方式,可以非常方便地完成 Oracle ...

  8. Oracle 数据怎么实时同步到 Elasticsearch | 亲测干货建议收藏

    摘要: 很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.我自己亲测了一种方式,很快实现了Oracle 数据实时 ...

  9. DataPipeline与TiDB推出异构数据实时同步解决方案,共筑安全可信基础设施

    近日,DataPipeline数见科技与PingCAP正式宣布,经过联合测评双方已完成DataPipeline企业级实时数据融合平台与TiDB分布式数据库企业版联合解决方案的兼容认证,旨在为全球客户在 ...

  10. Oracle 数据怎么实时同步到 DM DB 达梦数据库 | 亲测干货建议收藏

    摘要 很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.我自己亲测了一种方式,可以很方便地完成 Oracle 数 ...

最新文章

  1. redis报错Windows error 0x70--记录一次服务器雪崩异常
  2. 面向对象的 JavaScript 编程及其 Scope 处理
  3. PTA L1_002 打印沙漏(C语言)
  4. [转]Android中dp,px,sp概念梳理以及如何做到屏幕适配
  5. BugkuCTF-PWN题pwn3-read_note超详细讲解
  6. Mac平台上的一个MarkDown编辑器和静态博客生成工具-mweb mac最新版下载
  7. 金币(信息学奥赛一本通-T1100)
  8. python计算今年第几天_Python三种方法计算指定日期是今年的第几天
  9. Spire.Doc for Java的jar包、maven库-全套free资源
  10. WebStrom 使用淘宝镜像
  11. mysqlbinlog抽取某个表的信息
  12. OpenCV---Canny边缘提取
  13. ubuntu 卸载python_如何在各种环境下正确安装python——Linux
  14. php ip纯真数据库Dat,PHP获取IP地址所在地信息的实例(使用纯真IP数据库qqwry.dat)
  15. jQuery写登录弹窗并居中显示
  16. 生产制造行业ERP系统十大品牌
  17. 网络安全应急响应----10、内存分析
  18. 前端AI语音方面的实现
  19. ubuntu-20.04-desktop-amd64.iso下载/rufus-3.10.exe下载
  20. matlab图片不覆盖,放置MATLAB图例,使其在图上不重叠

热门文章

  1. matlab正方形追及问题,行测常考题型讲解:环形相遇与追及问题
  2. 漫话:希尔排序,这篇还看不懂麻烦找我拿红包
  3. 可重入锁 ReentrantLock
  4. leetcode:1000. 合并石头的最低成本(区间dp,困难)
  5. mysql枚举类型enum用法6_mysql中的枚举类型ENUM的用法
  6. 微信小程序快速提升独立访客方法
  7. int函数使用方法c语言,int函数(int函数的使用方法举例)
  8. C语言醉酒,驾驶人在道路上饮酒醉酒驾驶机动车处3年以上有期徒刑
  9. 启动prometheus问题-opening storage failed
  10. github上Android常用第三方库