一、背景介绍

公司自成立以来,一直以做项目为主,算是经累经验吧,自去年以来,我们部门准备将以前的项目做成产品,大概细分了几个小的产品,部们下面又分了几个团队,分别负责产品的研发,而我们属于平台团队,负责公用组件、开发平台的研发。

前期各个项目组使用的技术、框架等都不一样,想把技术、框架统一起来比较困难,并且在早期项目研发的时,各自为战,没有形成合力,有些共性的东西,都是各自做自己的,现在转将项目做成产品时,首先就是要将共性的东西,抽取出来,做成组件,通过SOA架构,将组件的服务和能力暴露出来,提高组件的重用性,例如邮件服务,任务一个产品或者系统通过标准的接口,即可发送邮件,不需要重新编写邮件的代码,短信服务、权限服务等

由于几个项目之间有些数据是共有的,例如人员、组织,HR系统已经有人员、组织的功能,在做其它项目时,人员、组织也需要,例如4A平台,这就需要将人员、组织的数据同步,将来的目标,是由ESB同步,由于时间紧,暂时选择了ActiveMQ的方式,HR系统中的人员、组织的数据项很多,而其它系统需要的很少,可能只需要人员和组织的名称及其标识列,并且数据量不大,不会一次性发送上百个人员或者组织的信息,基于这个考虑,通过将人员、组织信息的数据放在消息内放到消息中件上,各个系统通过订阅的方式获取消息中的数据。

二、实现

1、安装ActiveMQ

到ActiveMQ的官方网站下载ActiveMQ,我下载的5.7.0版,解压,例如D盘,打开bin目录,执行acticemq.bat,启动ActiveMQ。

我是基于spring编写的,新建两个Java工程,将Spring和ActiveMQ的包导入工程中,以下是pom文件:

<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>4.0.5.RELEASE</version></dependency><!-- https://mvnrepository.com/artifact/javax.jms/jms --><dependency><groupId>javax.jms</groupId><artifactId>jms</artifactId><version>1.1</version></dependency><!-- https://mvnrepository.com/artifact/log4j/log4j --><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.13.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.13.0</version></dependency></dependencies>

2、发送接收的前提

为发送和接收方式,把将要发送的信息封装成对象,分别为用户和组织的对象,包括了用户和组织的信息,我们来看看这两个对象

用户对象,BaseModel是一个基类,封装的用ID,创建人,创建时,最后更新人,最后更新时间,这个对象不再单独列出来

package model;import java.io.Serializable;
import java.util.Date;/*** 用户对象* * * @author Administrator* @version $Id$*/
public class JmsFaUser  extends BaseModel implements Serializable {private static final long serialVersionUID = 1L;private Long id;private String userNo;private String userName;private String userType;private String identity;private String region;private String userStatus;private String officeEmail;private String employeeWorkNo;private Long orgId;private String description;private String attribute1;private String attribute2;private String attribute3;private Long OId;private String userSex;private String mobileTel;private String officeTel;private String selfEmail;private  Date  lastUpdatedDate;// Constructors/** default constructor */public JmsFaUser() {}public String getUserNo() {return this.userNo;}public void setUserNo(String userNo) {this.userNo = userNo;}public String getUserName() {return this.userName;}public void setUserName(String userName) {this.userName = userName;}public String getUserType() {return this.userType;}public void setUserType(String userType) {this.userType = userType;}public String getIdentity() {return this.identity;}public void setIdentity(String identity) {this.identity = identity;}public String getRegion() {return this.region;}public void setRegion(String region) {this.region = region;}public String getUserStatus() {return this.userStatus;}public void setUserStatus(String userStatus) {this.userStatus = userStatus;}public String getOfficeEmail() {return this.officeEmail;}public void setOfficeEmail(String officeEmail) {this.officeEmail = officeEmail;}public String getEmployeeWorkNo() {return this.employeeWorkNo;}public void setEmployeeWorkNo(String employeeWorkNo) {this.employeeWorkNo = employeeWorkNo;}public String getDescription() {return this.description;}public void setDescription(String description) {this.description = description;}public String getAttribute1() {return this.attribute1;}public void setAttribute1(String attribute1) {this.attribute1 = attribute1;}public String getAttribute2() {return this.attribute2;}public void setAttribute2(String attribute2) {this.attribute2 = attribute2;}public String getAttribute3() {return this.attribute3;}public void setAttribute3(String attribute3) {this.attribute3 = attribute3;}public String getUserSex() {return this.userSex;}public void setUserSex(String userSex) {this.userSex = userSex;}/*** @return the oId*/public Long getOId() {return OId;}/*** @param oId*            the oId to set*/public void setOId(Long oId) {OId = oId;}public String getSelfEmail() {return this.selfEmail;}public void setSelfEmail(String selfEmail) {this.selfEmail = selfEmail;}/*** @param orgId*            the orgId to set*/public void setOrgId(Long orgId) {this.orgId = orgId;}/*** @return the orgId*/public Long getOrgId() {return orgId;}/*** @param officeTel*            the officeTel to set*/public void setOfficeTel(String officeTel) {this.officeTel = officeTel;}/*** @return the officeTel*/public String getOfficeTel() {return officeTel;}/*** @param mobileTel*            the mobileTel to set*/public void setMobileTel(String mobileTel) {this.mobileTel = mobileTel;}/*** @return the mobileTel*/public String getMobileTel() {return mobileTel;}/*** @param id*            the id to set*/public void setId(Long id) {this.id = id;}/*** @return the id*/public Long getId() {return id;}public Date getLastUpdatedDate() {return lastUpdatedDate;}public void setLastUpdatedDate(Date lastUpdatedDate) {this.lastUpdatedDate = lastUpdatedDate;}}

组织对象

package model;import java.io.Serializable;
import java.util.Date;/*** 组织对象* * * @author Administrator* @version $Id$*/
public class JmsOrganize extends  BaseModel  implements Serializable{private static final long serialVersionUID = 1L;  private Long id;  private String orgName; //中文名  private String orgFullName;  private String orgEngName;//英文名  private Long orgTypeNo;  private String orgLevel;  private Long parentOrgId;  private String orgCode;  private String orgDesc;  private String isbranch;private  Date lastUpdatedDate;// Constructors  /** default constructor */  public JmsOrganize() {  }  /** default constructor */  public JmsOrganize(Long id, String orgName) {  this.setId(id);  this.orgName = orgName;  }  public JmsOrganize(Long id, Long parendId) {  this.setId(id);  this.parentOrgId = parendId;  }  /** minimal constructor */  public JmsOrganize(String orgName) {  this.orgName = orgName;  }  public String getOrgName() {  return this.orgName;  }  public void setOrgName(String orgName) {  this.orgName = orgName;  }  public String getOrgFullName() {  return this.orgFullName;  }  public void setOrgFullName(String orgFullName) {  this.orgFullName = orgFullName;  }  public Long getOrgTypeNo() {  return this.orgTypeNo;  }  public void setOrgTypeNo(Long orgTypeNo) {  this.orgTypeNo = orgTypeNo;  }  public String getOrgLevel() {  return this.orgLevel;  }  public void setOrgLevel(String orgLevel) {  this.orgLevel = orgLevel;  }  public String getOrgDesc() {  return this.orgDesc;  }  public void setOrgDesc(String orgDesc) {  this.orgDesc = orgDesc;  }  public String getIsbranch() {  return this.isbranch;  }  public void setIsbranch(String isbranch) {  this.isbranch = isbranch;  }  /** * @param parentOrgId the parentOrgId to set */  public void setParentOrgId(Long parentOrgId) {  this.parentOrgId = parentOrgId;  }  /** * @return the parentOrgId */  public Long getParentOrgId() {  return parentOrgId;  }  /** * @param orgCode the orgCode to set */  public void setOrgCode(String orgCode) {  this.orgCode = orgCode;  }  /** * @return the orgCode */  public String getOrgCode() {  return orgCode;  }  /** * @param orgEngName the orgEngName to set */  public void setOrgEngName(String orgEngName) {  this.orgEngName = orgEngName;  }  /** * @return the orgEngName */  public String getOrgEngName() {  return orgEngName;  }  /** * @param id the id to set */  public void setId(Long id) {  this.id = id;  }  /** * @return the id */  public Long getId() {  return id;  }public Date getLastUpdatedDate() {return lastUpdatedDate;}public void setLastUpdatedDate(Date lastUpdatedDate) {this.lastUpdatedDate = lastUpdatedDate;}
}

由于发送的是对象,所以提供一个转换器,Convertor,该类要继承Spring的MessageConvertor

package com.wc82.util;import java.text.DateFormat;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;import model.JmsFaUser;
import model.JmsOrganize;import org.apache.log4j.Logger;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;public class FaJmsConverter    implements  MessageConverter {private static final Logger logger =Logger.getLogger(FaJmsConverter.class);public Message toMessage(Object obj, Session session)throws JMSException, MessageConversionException {ObjectMessage objMsg=session.createObjectMessage();      if(obj instanceof JmsFaUser){  JmsFaUser user = (JmsFaUser)obj;logger.info("The user message's userId is " + user.getId());  objMsg.setStringProperty("dataFlag", "FaUser");  objMsg.setStringProperty("userName", user.getUserName());  objMsg.setLongProperty("userId", user.getId());  objMsg.setStringProperty("officeEmail", user.getOfficeEmail());  objMsg.setStringProperty("selfEmail", user.getSelfEmail());  if(user.getOfficeTel() != null){  objMsg.setStringProperty("officeTel", user.getOfficeTel());  }else{  objMsg.setLongProperty("officeTel", new Long(0));  }  if(user.getMobileTel() != null){  objMsg.setStringProperty("mobileTel", user.getMobileTel());  }else{  objMsg.setLongProperty("mobileTel", new Long(0));  }  if(user.getLastUpdatedDate() != null){  objMsg.setStringProperty("lastUpdatedDate", DateFormat.getDateTimeInstance().format(user.getLastUpdatedDate()));  }  }else if(obj instanceof JmsOrganize){  JmsOrganize org = (JmsOrganize)obj;  logger.info("The org message's userId is " + org.getId());  objMsg.setStringProperty("dataFlag", "Organize");  objMsg.setLongProperty("orgId", org.getId());  objMsg.setStringProperty("orgName", org.getOrgName());  if(org.getLastUpdatedDate() != null){  objMsg.setStringProperty("lastUpdatedDate", DateFormat.getDateTimeInstance().format(org.getLastUpdatedDate()));  }  }  return objMsg;  }public Object fromMessage(Message message) throws JMSException,MessageConversionException {logger.info("from message");ObjectMessage objMessage = (ObjectMessage)message;  String dataFlag = objMessage.getStringProperty("dataFlag");  if("FaUser".equals(dataFlag)){  JmsFaUser user = new JmsFaUser();  user.setId(objMessage.getLongProperty("userId"));  user.setUserName(objMessage.getStringProperty("userName"));  user.setOfficeEmail(objMessage.getStringProperty("officeEmail"));  user.setSelfEmail(objMessage.getStringProperty("selfEmail"));  user.setOfficeTel(objMessage.getStringProperty("officeTel"));  user.setMobileTel(objMessage.getStringProperty("mobileTel"));  String lastDate = objMessage.getStringProperty("lastUpdatedDate");  try {  if(lastDate != null){  user.setLastUpdatedDate(DateFormat.getDateTimeInstance().parse(lastDate));  }  } catch (Exception e) {  e.printStackTrace();  }  return user;  }else if("Organize".equals(dataFlag)){  Long orgId = objMessage.getLongProperty("orgId");  String orgName = objMessage.getStringProperty("orgName");  JmsOrganize organize = new JmsOrganize(orgId, orgName);  String lastDate = objMessage.getStringProperty("lastUpdatedDate");  try {  organize.setLastUpdatedDate(DateFormat.getDateTimeInstance().parse(lastDate));  } catch (Exception e) {  e.printStackTrace();  }  return organize;  }  return null;}}

3、发送

Sprng为我们提供了JMSTemplate,基于这个发送消息,我们先来看看Spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:jms="http://www.springframework.org/schema/jms"xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsdhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"><!-- 外部属性文件的定义  -->  <bean id="propertyConfigurer"  class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  <property name="locations">   <list>   <value>classpath:sender.properties</value>   </list>  </property>  </bean>  <!-- 配置connectionFactory -->  <bean id="jmsSenderFactory" class="org.apache.activemq.pool.PooledConnectionFactory"  destroy-method="stop">  <property name="connectionFactory">  <bean class="org.apache.activemq.ActiveMQConnectionFactory">  <property name="brokerURL" value="${jms.sendBrokerURL}">  </property>  <property name="useAsyncSend" value="true"></property>  <property name="userName" value="admin"></property>  <property name="password" value="admin"></property> </bean>  </property>  <property name="maxConnections" value="100"></property>  </bean>  <bean id="jmsConverter" class="com.wc82.util.FaJmsConverter" />  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  <property name="connectionFactory" ref="jmsSenderFactory"></property>  <property name="defaultDestination" ref="destination" />  <!-- 区别它采用的模式为false是p2p为true是订阅 -->  <property name="pubSubDomain" value="true" />  <property name="messageConverter" ref="jmsConverter"></property>  </bean>  <!-- 发送消息的目的地(一个队列) -->  <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">  <!-- 设置消息队列的名字 -->  <constructor-arg index="0" value="${jms.sendDestinationName}" />  </bean>  <bean id="messageProducer" class="message.MessageProducer">  <property name="jmsTemplate" ref="jmsTemplate"></property>  </bean>  </beans>

根据上面的配置,只需要获得messageProducer这个Bean,便可以发送,下面我来看看MessageProducer这个类及其接口

package message.face;public interface IJMSMessageProducer {public abstract void converAndSendObjectMessage(Object obj);
}

实现类

package message;import message.face.IJMSMessageProducer;import org.apache.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;public class MessageProducer implements IJMSMessageProducer {private static final Logger logger =Logger.getLogger(MessageProducer.class);private JmsTemplate jmsTemplate;/*** @param jmsTemplate*            the jmsTemplate to set*/public void setJmsTemplate(JmsTemplate jmsTemplate) {this.jmsTemplate = jmsTemplate;}/** (non-Javadoc)* * @see* com.vispractice.faf.jms.IJMSMessageProducer#converAndSendObjectMessage* (java.lang.Object)* * @date 2012-10-25* * @user*/public void converAndSendObjectMessage(Object obj) {jmsTemplate.convertAndSend(obj);logger.info("The message pub success, the Object is " + obj);}}

发送测试类

package com.wc82.ActivemqTransData;import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;import message.face.IJMSMessageProducer;
import model.JmsFaUser;import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SendTest {public static void main(String[] args) {Properties pro=new Properties();try {InputStream input=SendTest.class.getResourceAsStream("/sender.properties");pro.load(input);} catch (FileNotFoundException e) {// TODO Auto-generated catch block
                e.printStackTrace();pro.setProperty("count", new Integer(0).toString());} catch (IOException e) {// TODO Auto-generated catch block
                e.printStackTrace();}Object obj=pro.get("jms.jmsSendConverterClass");ApplicationContext ac = new ClassPathXmlApplicationContext(  "jms-sender.xml");  IJMSMessageProducer messageProducer = (IJMSMessageProducer) ac.getBean("messageProducer");  JmsFaUser user = new JmsFaUser();  user.setUserName("ssss");  user.setId(new Long(111));  messageProducer.converAndSendObjectMessage(user);}
}

那发送到神马位置呢?在发送消息的Spring配置文件里面,有一个jms.sendBrokerURL,这个值是在sender.properties文件中配置的,方便修改,我们来看一下关于发送消息时所以配置的参数信息

jms.sendBrokerURL=tcp\://localhost\:61616
jms.sendDestinationName=faJMS

第一个指地址,因为ActiveMQ部署的是我本机,所以使用localhost,端口号在部署的时候,就是默认的

第二个JMS的名称,这个可以自取

如果发送,可以通过localhost:8161的控制台,看到你所发送的消息,这个地址是ActiveMQ的Web控制台,接下为我们看看接收

4、接收

MQ给我提供一种方式,当接收到消息的时候,自动去执行我们业务代码,

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:jms="http://www.springframework.org/schema/jms"xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsdhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"><description>jms receiver configuration</description>  <!-- 外部属性文件的定义  -->  <bean id="propertyConfigurer"  class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  <property name="locations">   <list>   <value>classpath:receiver.properties</value>   </list>  </property>  </bean> <!-- 配置connectionFactory -->  <bean id="jmsReceiverFactory" class="org.apache.activemq.pool.PooledConnectionFactory"  destroy-method="stop">  <property name="connectionFactory">  <bean class="org.apache.activemq.ActiveMQConnectionFactory">  <property name="brokerURL" value="${jms.receiveBrokerURL}" />
<!--                 <property name="userName" value="reader"></property> -->
<!--                 <property name="password" value="readeryxtech"></property> -->  </bean>  </property>  <property name="maxConnections" value="100"></property>  </bean>  <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">  <!-- 设置消息队列的名字 -->  <constructor-arg index="0" value="${jms.receiveDestinationName}" />  </bean>  <bean id="jmsConverter" class="com.wc82.util.FaJmsConverter" />  <!--异步调用消息 -->  <bean id="receive"  class="org.springframework.jms.listener.DefaultMessageListenerContainer">  <property name="connectionFactory" ref="jmsReceiverFactory"></property>  <property name="destination" ref="destination"></property>  <property name="messageListener" ref="messageListener"></property></bean>  <bean id="delegate" class="message.MessageConsumer"></bean><bean id="messageListener"  class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  <property name="delegate" ref="delegate"></property>  <property name="defaultListenerMethod" value="onMessage"></property>  <property name="messageConverter" ref="jmsConverter" />  </bean></beans>

先看一下messageListener,这个监听,是自己写的,其中delegate是当你接收消息之后所执行的业务代码的bean,我这里将这个bean做成配置,方便修改,而defaultListenerMethod是指定执行的方法,这里设置定了,执行omMessage方法,也就是说类你可以指定,但是onMessage方法必须要有,而convertor也是可配的,与发送的convertor是一样的,这个转换器发送与接收都必须要使用

注意:这里为神马要将执行的类的bean做成可配呢?因为我在做这一块的工作时,我不知道是由那个bean,但是类中的方法我可设置,这样做的目地就是在消息的消费者不在去关心这个消息,而只需要写一个类,里面有onMessage方法,在这个方法里做自己的业务逻辑即可,将其关注点放到业务处理上,那肿么样来设计这个方法的名字呢,我们可以设计了一个接口,如果消费者想消费这个消息,就必须实现这上接口,下面我们来看看这个接口

package message.face;public interface FaJmsReceiveListener {public void onMessage(Object baseModel);
}

这个接口中只有一个方法就是onMessage,而实现类则交由具体的消费者,因为消费可能在实现的时候可能引用别的bean,进而处理别的业务,例如入库,所以我们在做接收消息的配置上,只设计处理消息的bean的名字,不设置具体的class,只要消费者配置了这个bean的名字即可,当然这个名字也是可配的,配置在了init.propertis中,我们来看看这个接口的实现类

package message;import org.apache.log4j.Logger;import message.face.FaJmsReceiveListener;
import model.JmsFaUser;
import model.JmsOrganize;public class MessageConsumer   implements  FaJmsReceiveListener{private  static  final  Logger logger=Logger.getLogger(MessageConsumer.class);public void onMessage(Object baseModel) {if(baseModel instanceof JmsFaUser){  JmsFaUser user = (JmsFaUser)baseModel;  logger.info(user.getId());}else if (baseModel instanceof JmsOrganize){  JmsOrganize org = (JmsOrganize)baseModel;  logger.info(org.getId());}  }}

上述的代码可以共用,当发送的类变更时,只需要编写Convertor,并在init.properties中配置上即可,而消息的消费者只需要实现该接口即可

测试类

package com.wc82.ActivemqTransData;import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class ReceiveTest {public static void main(String[] args) {  ApplicationContext a2c = new ClassPathXmlApplicationContext("jms-receiver.xml");  System.out.println("receiver");  }
}

接下来,看看接收的receiver.properties

jms.receiveBrokerURL=tcp\://localhost\:61616
jms.receiveDestinationName=faJMS

第一个、第二个、第三个参数不再详细述说,第四就是执行业务逻辑的bean的名字,这个参数可以设定死也可以设定灵活

上述只是初步的能发送和接收消息,后续是考虑安全、性能的问题。

注意测试时:要先打开消息接收程序,然后再打开消息发送程序。activemq是实时性的,不会将消息保存下来,发送消息的程序在发送消息时,如果接收程序没有启动,那么这个接收程序就接收不到这个消息。下一节,就是用来解决这个持久订阅的问题。

基于ActiveMQ的Topic的数据同步——初步实现相关推荐

  1. 基于Java方式如何实现数据同步

    本篇内容介绍了"基于Java方式如何实现数据同步"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读, ...

  2. elasticsearch 数据类型_基于 MySQL Binlog 的 Elasticsearch 数据同步实践

    来源;马蜂窝 一.背景 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存 ...

  3. 基于 MySQL Binlog 的 Elasticsearch 数据同步实践

    一.为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存储业务数 ...

  4. 基于 MySQL Binlog 的 Elasticsearch 数据同步实践 原

    一.背景 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品.订单等数据的多维度检索. 使用 Elasticsearch 存储业务数据可以 ...

  5. php阿里的同步工具canal,基于阿里的Canal实现数据同步

    一.开启同步数据库的binlog功能 (1)开启同步数据端的数据库服务(比如我的将一号虚拟机上的mysql数据库作为同步操作数据库) systemctl start mysql.service mys ...

  6. 基于虚拟日志压缩的数据同步方案

    来源:http://wenku.baidu.com/view/61976c005f0e7cd184253698.html 转载于:https://www.cnblogs.com/AngelLee200 ...

  7. 基于Canal的MySQL=>ES数据同步方案

    文章目录 1.MySQL和ES的主要区别? 1.1 功能性 1.2 性能指标 1.3 在搜索业务上的区别 1.3.1 查询 1.3.2 检索 2.为什么要做数据同步 2.1 检索性能 2.2 写入性能 ...

  8. 基于数据库数据增量同步_基于 Flink SQL CDC 的实时数据同步方案

    简介:Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的 ...

  9. cdc工具 postgresql_基于 Flink SQL CDC 的实时数据同步方案

    作者:伍翀 (云邪) 整理:陈政羽(Flink 社区志愿者) Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化?本文由 Apache Flink P ...

最新文章

  1. SQLMAP使用笔记
  2. 如何用python进行量化交易_从零开始学习Python和量化交易
  3. 14周课堂测试---找水王
  4. IE8下submit表单没反应
  5. php 对象 final,PHP7_OOP_对象重载以及魔术方法_对象遍历_final关键字
  6. 上周热点回顾(6.14-6.20)
  7. 反射,System.Type类
  8. 第3课 攀天梯(ladder)--记忆化搜索(python3实现)
  9. prev php,PHP prev() 函数 ——jQuery中文网
  10. react中form可以嵌套一个form吗_Ant-Design从v3升级到v4的Form适配
  11. 本地与服务器文件同步软件哪个好,同步软件哪个好,亲身体验的3款免费同步软件介绍...
  12. 软件测试实例:登录功能怎么设计测试用例
  13. flutter 弹幕 yzl_flutter_bulletchat的使用
  14. laravel:如何快速实现数据填充,创建模拟数据(使用seeder)
  15. 《深度学习从0开始》
  16. 浙江大学计算机学霸作息,浙大顶级学霸作息表曝光:世界本不公平,你有多努力,就有多特殊...
  17. CAE-仿真案例学习
  18. 服务注册eureka上显示ip地址出现的问题
  19. DOTA-PEG-WSW/DOTA-PEG-NGR/DOTA-PEG-R8/DOTA-PEG-YIGSR 大环配体PEG化偶联多肽
  20. 安卓机器人做图软件_绘画机器人andy app下载-美图秀秀绘画机器人v7.0安卓版_5577安卓网...

热门文章

  1. Maven配置之pom.xml(一)
  2. War3窗口限定小工具发布
  3. Eclipse编写第一个Java程序
  4. android 多个语音合成,android实现语音合成
  5. Spring MVC拦截器(Interceptor)的配置及使用
  6. python怎么实现黑客攻击英国_注意!你的隐私就是这样被黑客获取的
  7. python计算在月球的体重_NumPy-快速处理数据--矩阵运算
  8. asp.net 设置 excel alignment_Python 进阶(六): Excel 基本操作
  9. 「译文」你必须掌握的 7 种 JavaScript 错误类型
  10. 关于excel表的生成