目前关于databus的相关资料较少,特别是针对mysql的文档尤为稀少。本篇文章中介绍了databus相关组件及实现原理,初步实现了databus对mysql数据库的数据抓取,希望对后续使用者能提供一定的参考作用。

1. 简介

Databus是一个低延迟、可靠的、支持事务的、保持一致性的数据变更抓取系统。由LinkedIn于2013年开源。Databus通过挖掘数据库日志的方式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化client实时获取变更并进行其他业务逻辑。

Databus有以下特点:

数据源和消费者之间的隔离。

数据传输能保证顺序性和至少一次交付的高可用性。

从变化流的任意时间点进行消费,包括通过bootstrap获取所有数据。

分区消费

源一致性保存,消费不成功会一直消费直到消费成功

2. 功能&特性

来源独立:Databus支持多种数据来源的变更抓取,包括Oracle和MySQL。

可扩展、高度可用:Databus能扩展到支持数千消费者和事务数据来源,同时保持高度可用性。

事务按序提交:Databus能保持来源数据库中的事务完整性,并按照事务分组和来源的提交顺寻交付变更事件。

低延迟、支持多种订阅机制:数据源变更完成后,Databus能在毫秒级内将事务提交给消费者。同时,消费者使用Databus中的服务器端过滤功能,可以只获取自己需要的特定数据。

无限回溯:对消费者支持无限回溯能力,例如当消费者需要产生数据的完整拷贝时,它不会对数据库产生任何额外负担。当消费者的数据大大落后于来源数据库时,也可以使用该功能。

3. 使用场景举例

BUSSINESS1 和 BUSSINESS2 是两个不同的业务逻辑,他们的变更需要同时写入到 DB 和 CACHE ,那么当他们同时修改同一个数据的时候是否能保证数据的一致性呢?可以发现如果按照下图标明的顺序进行操作并不能保证数据的一致性!

还有一个问题是变更完DB之后,更新CACHE失败怎么办?如果忽略的话,会造成后续读取到CACHE中旧的数据,如果重试的话,业务代码会写得更加复杂。针对这些场景,如果没有一个强一致协议是很难解决掉的。如果要业务逻辑去实现这些晦涩的一致性协议,却又是不现实的。

现在,有了Databus,上面提到的这些一致性问题就都没有了,并且那些冗长的双写逻辑也可以去掉了,如下图所示:

4. 系统整体架构与主要组件

4.1 系统整体架构

上图中介绍了Databus系统的构成,包括Relays、bootstrap服务和Client lib等。Bootstrap服务中包括Bootstrap Producer和Bootstrap Server。快速变化的消费者直接从Relay中取事件。如果一个消费者的数据更新大幅落后,它要的数据就不在Relay的日志中,而是需要请求Bootstrap服务,返回的将会是自消费者上次处理变更之后的所有数据变更快照。

Source Databases:MySQL以及Oracle数据源

Relays:负责抓取和存储数据库变更,全内存存储,也可配置使用mmap内存映射文件方式

Schema Registry:数据库数据类型到Databus数据类型的一个转换表

Bootstrap Service:一个特殊的客户端,功能和Relays类似,负责存储数据库变更,主要是磁盘存储

Application:数据库变更消费逻辑,从Relay中拉取变更,并消费变更

Client Lib:提供挑选关注变更的API给消费逻辑

Consumer Code:变更消费逻辑,可以是自身消费或者再将变更发送至下游服务

4.2 主要组件及功能

上图系统整体架构图画的比较简略,下载源码观察项目结构后不难发现databus的主要由以下四个组件构成:

Databus Relay:

从源数据库中的Databus源中读取变化的行并序列化为Databus变化事件保存到内存缓冲区中。

监听Databus客户端的请求(包括引导程序的请求)并传输Databus数据变化事件。

Databus Client:

在Relay上检查新的数据变化事件和处理特定的业务逻辑的回调。

如果它们在relay后面落下太远,到引导程序服务运行一个追溯查询。

单独的客户端可以处理全部的Databus流,它们也可以作为集群的一部分而每个客户端处理一部分流。

Databus Bootstrap Producer:

只是一个特殊的客户端。

检查Relay上的新的数据变化事件。

保存数据变化事件到Mysql数据库,Mysql数据库用于引导程序和为了客户端追溯数据。

Databus Bootstrap Server:

监听来自Databus客户端的请求并为了引导和追溯返回一个超长的回溯的数据变化事件。

5. Databus Relay和Databus Client详细分析

5.1 Databus Relay

5.1.1 架构与组件功能

Databus Event Producer(DBEP):定期从数据库中查询变更,如果检测到变更,它将读取数据库中的所有已更改的行,并将其转换为Avro记录。因为数据库数据类型和Databus数据类型不一致,因此需要 Schema Registry 做转换。

SCN(System Change Number):系统改变号,是数据库中非常重要的一个数据结构。SCN用以标识数据库在某个确切时刻提交的版本。在事务提交时,它被赋予一个唯一的标识事务的SCN。

Event Buffers:按照SCN的顺序存储databus事件,buffer可以是纯内存的,也可以是mmap到文件系统的。每个buffer在内存中还有一个对应的SCN Index和一个MaxSCN reader/writer,SCN Index可以加快查询指定事件的速度。

Request Processor:通过监听Netty的channel,实现收发client的请求。

MaxSCN Reader/Writer:用于跟踪DBEP的处理进度;Reader在Databus启动的时候会读取存储的文件上一次DBEP处理的位置,当Databus从DBEP中读取变更存储到Event Buffers时,Writer就会最后一个SCN写入到文件中存储,这样就能保证下次启动可以从正确的位置读取数据库变更。

JMX(Java Management Extensions):支持标准的Jetty容器,databus提供了多个Mbean来监控relay

ContainerStatsMBean

DbusEventsTotalStatsMBean

DbusEventsStatisticsCollectorMBean

RESTFul Interface:Realy提供了相关http接口供外部调用,Client与Relay建立http长连接,并从Relay拉取Event。

5.1.2 源码分析

ServerContainer._globalStatsThread:统计信息的线程

OpenReplicatorEventProducer.EventProducerThread:针对mysql binlog日志的Event生产者线程,每个source一个线程,持有_orListener,管理和数据库的连接,将变更写入到Event Buffer里。

EventProducerThread启动后会初始化类型为OpenReplicator的日志解析对象开始解析日志,同时初始化类型为ORListener的_orListener开始监听,代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

public void run()

{

_eventBuffer.start(_sinceScn);

_startPrevScn.set(_sinceScn);

initOpenReplicator(_sinceScn);

try

{

boolean started = false;

while (!started) {

try {

_or.start();

started = true;

}

catch (Exception e) {

_log.error("Failed to start OpenReplicator: " + e);

_log.warn("Sleeping for 1000 ms");

Thread.sleep(1000);

}

}

_orListener.start();

} catch (Exception e)

{

_log.error("failed to start open replicator: " + e.getMessage(), e);

return;

}

}

初始化方法如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

void initOpenReplicator(long scn)

{

int offset = offset(scn);

int logid = logid(scn);

String binlogFile = String.format("%s.%06d", _binlogFilePrefix, logid);

// we should use a new ORListener to drop the left events in binlogEventQueue and the half processed transaction.

_orListener = new ORListener(_sourceName, logid, _log, _binlogFilePrefix, _producerThread, _tableUriToSrcIdMap,

_tableUriToSrcNameMap, _schemaRegistryService, 200, 100L);

_or.setBinlogFileName(binlogFile);

_or.setBinlogPosition(offset);

_or.setBinlogEventListener(_orListener);

//must set transport and binlogParser to null to drop the old connection environment in reinit case

_or.setTransport(null);

_or.setBinlogParser(null);

_log.info("Connecting to OpenReplicator " + _or.getUser() + "@" + _or.getHost() + ":" + _or.getPort() + "/"

+ _or.getBinlogFileName() + "#" + _or.getBinlogPosition());

}

EventProducerThread._orListener:监听数据库变更,将变更转换为Avro记录,写入到transaction里面,最终调用_producerThread的onEndTransaction()方法将事务里的事件写入到Event Buffer里,代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

public void onEndTransaction(Transaction txn) throws DatabusException

{

try

{

addTxnToBuffer(txn);

_maxSCNReaderWriter.saveMaxScn(txn.getIgnoredSourceScn()!=-1 ? txn.getIgnoredSourceScn() : txn.getScn());

}

catch (UnsupportedKeyException e)

{

_log.fatal("Got UnsupportedKeyException exception while adding txn (" + txn + ") to the buffer", e);

throw new DatabusException(e);

}

catch (EventCreationException e)

{

_log.fatal("Got EventCreationException exception while adding txn (" + txn + ") to the buffer", e);

throw new DatabusException(e);

}

}

FileMaxSCNHandler负责读写SCN,注意在写入文件时会将原有文件重命名为XXX.temp,原因是为了防止在更新文件的时候发生错误,导致SCN丢失,代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

private void writeScnToFile() throws IOException

{

long scn = _scn.longValue();

File dir = _staticConfig.getScnDir();

if (! dir.exists() && !dir.mkdirs())

{

throw new IOException("unable to create SCN file parent:" + dir.getAbsolutePath());

}

// delete the temp file if one exists

File tempScnFile = new File(_scnFileName + TEMP);

if (tempScnFile.exists() && !tempScnFile.delete())

{

LOG.error("unable to erase temp SCN file: " + tempScnFile.getAbsolutePath());

}

File scnFile = new File(_scnFileName);

if (scnFile.exists() && !scnFile.renameTo(tempScnFile))

{

LOG.error("unable to backup scn file");

}

if (!scnFile.createNewFile())

{

LOG.error("unable to create new SCN file:" + scnFile.getAbsolutePath());

}

FileWriter writer = new FileWriter(scnFile);

writer.write(Long.toString(scn));

writer.write(SCN_SEPARATOR + new Date().toString());

writer.flush();

writer.close();

LOG.debug("scn persisted: " + scn);

}

以源码例子中PersonRelayServer的主类启动为起点,大致的启动流程如下:

PersonRelayServer主方法 -> new DatabusRelayMain实例 -> 调用initProducers方法初始化生产者->根据配置调用addOneProducer增加生产者->new DbusEventBufferAppendable获得Event Buffer->new EventProducerServiceProvider实例->

调用createProducer获得OpenReplicatorEventProducer->OpenReplicatorEventProducer中包含

EventProducerThread->启动线程开始获取Event

5.2 Databus Client

5.2.1 架构与组件功能

Relay Puller:负责从relay拉取数据,具体工作有挑选relay,请求source,请求Register,校验schema,设置dispatcher等。

Dispatcher:从event buffers中读取事件,调用消费逻辑的回调,主要职责有:

判断回调是否正确,回调失败后会进行重试,重试次数超限后抛出异常

监控错误和超时

持久化checkpoint

Checkpoint persistence Provider:checkpoint是消费者消费变更记录点的位置,负责将checkpoint持久化到本地,保证下次启动后可以从正常的位置pull event。

Event Callback:调用消费者自定义业务逻辑代码。

Bootstrap Puller:负责从Bootstrap servers拉取数据,功能类似Relay Puller。

5.2.2 源码分析

执行Client的启动脚本后会调用main方法,main方法会根据命令行参数中指定的属性文件创建StaticConfig类,然后配置类创建dbusHttpClient实例来与Relay进行通信,参数defaultConfigBuilder为默认配置类信息,可以为空,代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

public static DatabusHttpClientImpl createFromCli(String[] args, Config defaultConfigBuilder) throws Exception

{

Properties startupProps = ServerContainer.processCommandLineArgs(args);

if (null == defaultConfigBuilder) defaultConfigBuilder = new Config();

ConfigLoader staticConfigLoader =

new ConfigLoader("databus.client.", defaultConfigBuilder);

StaticConfig staticConfig = staticConfigLoader.loadConfig(startupProps);

DatabusHttpClientImpl dbusHttpClient = new DatabusHttpClientImpl(staticConfig);

return dbusHttpClient;

}

设置要连接的Relay信息,然后通过参数defaultConfigBuilder传递给dbusHttpClient,代码如下:

1

2

3

4

5

DatabusHttpClientImpl.Config configBuilder = new DatabusHttpClientImpl.Config();

configBuilder.getRuntime().getRelay("1").setHost("localhost");

configBuilder.getRuntime().getRelay("1").setPort(11115);

configBuilder.getRuntime().getRelay("1").setSources(PERSON_SOURCE);

}

启动databus client过程如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

protected void doStart()

{

_controlLock.lock();

try

{

// 绑定并开始接收来到的连接

int portNum = getContainerStaticConfig().getHttpPort();

_tcpChannelGroup = new DefaultChannelGroup();

_httpChannelGroup = new DefaultChannelGroup();

_httpServerChannel = _httpBootstrap.bind(new InetSocketAddress(portNum));

InetSocketAddress actualAddress = (InetSocketAddress)_httpServerChannel.getLocalAddress();

_containerPort = actualAddress.getPort();

// 持久化端口号 (文件名对容器来说必须唯一)

File portNumFile = new File(getHttpPortFileName());

portNumFile.deleteOnExit();

try {

FileWriter portNumFileW = new FileWriter(portNumFile);

portNumFileW.write(Integer.toString(_containerPort));

portNumFileW.close();

LOG.info("Saving port number in " + portNumFile.getAbsolutePath());

} catch (IOException e) {

throw new RuntimeException(e);

}

_httpChannelGroup.add(_httpServerChannel);

LOG.info("Serving container " + getContainerStaticConfig().getId() +

" HTTP listener on port " + _containerPort);

if (_containerStaticConfig.getTcp().isEnabled())

{

int tcpPortNum = _containerStaticConfig.getTcp().getPort();

_tcpServerChannel = _tcpBootstrap.bind(new InetSocketAddress(tcpPortNum));

_tcpChannelGroup.add(_tcpServerChannel);

LOG.info("Serving container " + getContainerStaticConfig().getId() +

" TCP listener on port " + tcpPortNum);

}

_nettyShutdownThread = new NettyShutdownThread();

Runtime.getRuntime().addShutdownHook(_nettyShutdownThread);

// 5秒后开始producer线程

if (null != _jmxConnServer && _containerStaticConfig.getJmx().isRmiEnabled())

{

try

{

_jmxShutdownThread = new JmxShutdownThread(_jmxConnServer);

Runtime.getRuntime().addShutdownHook(_jmxShutdownThread);

_jmxConnServer.start();

LOG.info("JMX server listening on port " + _containerStaticConfig.getJmx().getJmxServicePort());

}

catch (IOException ioe)

{

if (ioe.getCause() != null && ioe.getCause() instanceof NameAlreadyBoundException)

{

LOG.warn("Unable to bind JMX server connector. Likely cause is that the previous instance was not cleanly shutdown: killed in Eclipse?");

if (_jmxConnServer.isActive())

{

LOG.warn("JMX server connector seems to be running anyway. ");

}

else

{

LOG.warn("Unable to determine if JMX server connector is running");

}

}

else

{

LOG.error("Unable to start JMX server connector", ioe);

}

}

}

_globalStatsThread.start();

}

catch (RuntimeException ex)

{

LOG.error("Got runtime exception :" + ex, ex);

throw ex;

}

finally

{

_controlLock.unlock();

}

}

6. Databus for Mysql实践

6.1 相关解释

实现原理:通过解析mysql的binlog日志来获取变更事件,解析过程利用Java开源工具OpenReplicator,Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用,所有的Event实现了BinlogEventV4接口。

binlog 格式:Databus设计为针对Row格式日志进行解析

Statement:基于SQL语句的复制(statement-based replication,SBR)

Row:基于行的复制(row-based replication,RBR)

Mixed:混合模式复制(mixed-based replication,MBR)

SCN的确定:64bits组成,高32位表示binlog的文件序号,低32位代表event在binlog文件的offset,例如在 mysql-bin.000001文件中 offset为 4的scn表示为(1 << 32) | 4 = 4294967300

6.2 数据库环境配置

安装mysql数据库,本次使用mysql-5.5.56版本。

查看数据库是否开启binlog,如果binlog没有开启,可以通过set sql_log_bin=1命令来启用;如果想停用binlog,可以使用set sql_log_bin=0。

配置数据库binlog_format=ROW, show variables like ‘binlog_format‘可查看日志格式, set globle binlog_format=ROW’可设置,通过修改my.cnf文件也可以,增加或修改行binlog_format=ROW即可。

binlog_checksum设置为空,show global variables like ‘binlog_checksum’命令可查看,set binlog_checksum=none可设置。

在mysql上创建名为or_test的数据库,or_test上创建表名为person的表,定义如下:

6.3 Demo配置与运行

6.3.1 下载源码

database:数据库模拟相关的脚本和工具

databus2-example-bst-producer-pkg:bootstrap producer的属性配置文件夹,包括bootstrap producer和log4j属性文件,build脚本以及bootstrap producer的启动和停止脚本。

databus2-example-client-pkg:client的属性配置文件夹,包括各种属性文件和启动和停止脚本。

databus2-example-client:client源代码,包含启动主类和消费者代码逻辑。

databus2-example-relay-pkg:relay的属性配置文件夹,包含监控的表的source信息和Avro schema。

databus2-example-relay:relay的启动主类。

schemas_registry:存放表的avsc文件。

6.3.2 Relay端的操作

配置Relay属性文件:databus2-example-relay-pkg/conf/relay-or-person.properties的内容如下配置,包括端口号,buffer存储策略,maxScn存放地址等信息:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

databus.relay.container.httpPort=11115

databus.relay.container.jmx.rmiEnabled=false

databus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORY

databus.relay.eventBuffer.queuePolicy=OVERWRITE_ON_WRITE

databus.relay.eventLogReader.enabled=false

databus.relay.eventLogWriter.enabled=false

databus.relay.schemaRegistry.type=FILE_SYSTEM

databus.relay.schemaRegistry.fileSystem.schemaDir=./schemas_registry

databus.relay.eventBuffer.maxSize=1024000000

databus.relay.eventBuffer.readBufferSize=10240

databus.relay.eventBuffer.scnIndexSize=10240000

databus.relay.physicalSourcesConfigsPattern=../../databus2-example/databus2-example-relay-pkg/conf/sources-or-person.json

databus.relay.dataSources.sequenceNumbersHandler.file.scnDir=/tmp/maxScn

databus.relay.startDbPuller=true

配置被监控表的source信息:databus2-example-relay-pkg/conf/sources-or-person.json的内容如下配置,其中URI format:mysql://username/password@mysql_host[:mysql_port]/mysql_serverid/binlog_prefix,注意%2F为转义字符,用户名为root,数据库密码为123。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

{

"name" : "person",

"id" : 1,

"uri" : "mysql://root%2F123@localhost:3306/1/mysql-bin",

"slowSourceQueryThreshold" : 2000,

"sources" :

[

{

"id" : 40,

"name" : "com.linkedin.events.example.or_test.Person",

"uri": "or_test.person",

"partitionFunction" : "constant:1"

}

]

}

databus2-example-relay-pkg/schemas_registry/下定义person的Avro schema文件

com.linkedin.events.example.or_test.Person.1.avsc,其中1表示版本(Databus目前没有针对mysql提供生成Avro schema文件的工具,所以只能手工编写)具体内容如下所示:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

{

"name" : "Person_V1",

"doc" : "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST",

"type" : "record",

"meta" : "dbFieldName=person;pk=id;",

"namespace" : "com.linkedin.events.example.or_test",

"fields" : [ {

"name" : "id",

"type" : [ "long", "null" ],

"meta" : "dbFieldName=ID;dbFieldPosition=0;"

}, {

"name" : "firstName",

"type" : [ "string", "null" ],

"meta" : "dbFieldName=FIRST_NAME;dbFieldPosition=1;"

}, {

"name" : "lastName",

"type" : [ "string", "null" ],

"meta" : "dbFieldName=LAST_NAME;dbFieldPosition=2;"

}, {

"name" : "birthDate",

"type" : [ "long", "null" ],

"meta" : "dbFieldName=BIRTH_DATE;dbFieldPosition=3;"

}, {

"name" : "deleted",

"type" : [ "string", "null" ],

"meta" : "dbFieldName=DELETED;dbFieldPosition=4;"

} ]

}

注册Avro schema到index.schemas_registry文件,databus2-example-relay-pkg/schemas_registry/index.schemas_registry文件中添加行com.linkedin.events.example.or_test.Person.1.avsc ,每定义一个Avro schema都需要添加进去,relay运行时会到此文件中查找表对应的定义的Avro schema。

6.3.3 Client端的操作

配置Client属性文件:databus2-example-client-pkg/conf/client-person.properties的内容如下配置,包括端口号,buffer存储策略,checkpoint持久化等信息:

1

2

3

4

5

6

7

8

9

10

11

12

databus.relay.container.httpPort=11125

databus.relay.container.jmx.rmiEnabled=false

databus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORY

databus.relay.eventBuffer.queuePolicy=BLOCK_ON_WRITE

databus.relay.schemaRegistry.type=FILE_SYSTEM

databus.relay.eventBuffer.maxSize=10240000

databus.relay.eventBuffer.readBufferSize=1024000

databus.relay.eventBuffer.scnIndexSize=1024000

databus.client.connectionDefaults.pullerRetries.initSleep=1

databus.client.checkpointPersistence.fileSystem.rootDirectory=./personclient-checkpoints

databus.client.checkpointPersistence.clearBeforeUse=false

databus.client.connectionDefaults.enablePullerMessageQueueLogging=true

databus2-example-client/src/main/java下的PersonConsumer类是消费逻辑回调代码,主要是取出每一个event后依次打印每个字段的名值对,主要代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

private ConsumerCallbackResult processEvent(DbusEvent event, DbusEventDecoder eventDecoder)

{

GenericRecord decodedEvent = eventDecoder.getGenericRecord(event, null);

try {

Utf8 firstName = (Utf8)decodedEvent.get("firstName");

Utf8 lastName = (Utf8)decodedEvent.get("lastName");

Long birthDate = (Long)decodedEvent.get("birthDate");

Utf8 deleted = (Utf8)decodedEvent.get("deleted");

LOG.info("firstName: " + firstName.toString() +

", lastName: " + lastName.toString() +

", birthDate: " + birthDate +

", deleted: " + deleted.toString());

} catch (Exception e) {

LOG.error("error decoding event ", e);

return ConsumerCallbackResult.ERROR;

}

return ConsumerCallbackResult.SUCCESS;

}

databus2-example-client/src/main/java下的PersonClient类是relay的启动主类,主要是设置启动Client的配置信息,将消费者实例注册到监听器中,后续可对其进行回调,主要代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public static void main(String[] args) throws Exception

{

DatabusHttpClientImpl.Config configBuilder = new DatabusHttpClientImpl.Config();

//Try to connect to a relay on localhost

configBuilder.getRuntime().getRelay("1").setHost("localhost");

configBuilder.getRuntime().getRelay("1").setPort(11115);

configBuilder.getRuntime().getRelay("1").setSources(PERSON_SOURCE);

//Instantiate a client using command-line parameters if any

DatabusHttpClientImpl client = DatabusHttpClientImpl.createFromCli(args, configBuilder);

//register callbacks

PersonConsumer personConsumer = new PersonConsumer();

client.registerDatabusStreamListener(personConsumer, null, PERSON_SOURCE);

client.registerDatabusBootstrapListener(personConsumer, null, PERSON_SOURCE);

//fire off the Databus client

client.startAndBlock();

}

6.3.4 build-启动-测试

Build:Databus采用gradle进行编译,所以需要安装gradle环境,安装安成后进入databus根目录执行命令gradle -Dopen_source=true assemble即可完成build,成功后在databus根目录下生成名为build的文件夹

启动Relay:

cd build/databus2-example-relay-pkg/distributions

tar -zxvf databus2-example-relay-pkg.tar.gz解压

执行启动脚本 ./bin/start-example-relay.sh or_person -Y ./conf/sources-or-person.json

启动Client:

cd build/databus2-example-client-pkg/distributions

tar -zxvf databus2-example-client-pkg.tar.gz解压

执行启动脚本 ./bin/start-example-client.sh person

测试:

Relay和Client启动成功后,就已经开始对person表进行数据变更捕获了,现在向person表插入一条如下记录:

databus2-example-relay-pkg/distributions/logs下的relay.log记录如下:

databus2-example-client-pkg/distributions/logs下的client.log记录如下:

可以看到已经可以抓取到改变的数据了!

7. 总结

遇到的问题:

主要是属性文件的配置问题,包括source-or-person.json, schemas_registry的文件缺失或配置错误。

脚本方式启动时JVM无法创建,由于脚本启动时包含了自定义的JVM参数,与系统环境不符导致启动失败,去掉相关参数后正常启动。

Relay可以获取增删改查的Event,但Client只能解析到更新操作的Event,主要原因是Mysql默认的binlog_format=MIXED,而databus的设计是针对ROW格式的binlog,修改格式后可正常解析。

Windows平台无法使用,启动方式是用脚本启动,脚本启动时包含命令行参数较多,启动后无法进行调试,只能通过对日志观察的方式来进行。

需要进一步实验:

使用bootstrap produces和bootstrap servers模式来进行大批量事件的获取

配置多个relay进行事件抓取

结合zookeeper来配置客户端集群进行消费

参考资料

databus mysql搭建_Databus架构分析与初步实践(for mysql)相关推荐

  1. mysql databus_Databus架构分析与初步实践(for mysql)(上篇)

    作者:徐和东 description: 目前关于databus的相关资料较少,特别是针对mysql的文档尤为稀少.本篇文章中介绍了databus相关组件及实现原理,初步实现了databus对mysql ...

  2. 分布式MySQL数据库TDSQL架构分析

     分布式MySQL数据库TDSQL架构分析 发表于11小时前| 次阅读| 来源程序员电子刊| 0 条评论| 作者雷海林 MySQLTDSQL腾讯架构 width="22" he ...

  3. MySQL 高可用架构 之 MHA (Centos 7.5 MySQL 5.7.18 MHA 0.58)

    目录 简介 环境准备 秘钥互信 安装基础依赖包 安装MHA组件 安装 MHA Node组件 安装 MHA Manager 组件 建立 MySQL 一主三从 初始化 MySQL 启动MySQL 并简单配 ...

  4. MySQL集群架构:MHA+MySQL-PROXY+LVS实现MySQL集群架构高可用/高性能-技术流ken

    MHA简介 MHA可以自动化实现主服务器故障转移,这样就可以快速将从服务器晋级为主服务器(通常在10-30s),而不影响复制的一致性,不需要花钱买更多的新服务器,不会有性能损耗,容易安装,不必更改现有 ...

  5. 阿里高可用mysql搭建方案_阿里云环境迁移记录 - MYSQL高可用搭建

    MYSQL高可用方案主要分为两大类,一类是前置管理,一类是后置管理. 前置管理的思想是利用各种前置管理工具,动态切换或者分发请求到不同的实例并切换master,如keepalived.MHA.pack ...

  6. mysql高可用架构介绍_介绍详细的MySQL高可用方案

    概述 MySQL高可用,顾名思义就是当MySQL主机或服务发生任何故障时能够立马有其他主机顶替其工作,并且最低要求是要保证数据一致性.因此,对于一个MySQL高可用系统需要达到的目标有以下几点: 数据 ...

  7. mysql支持arm64架构吗_arm64(aarch64)下使用mysql

    建议直接考虑docker方案,目前官方的mysql server docker支持arm64,注意只有mysql8支持,也就是latest标签的 这个和在x86下最常用的mysql镜像有点区别,简单翻 ...

  8. MySQL性能优化的21个最佳实践 和 mysql使用索引

    1. 为查询缓存优化查询 当有很多相同的查询被执行了多次的时候,这些查询结果会被放到一个缓存中,这样,后续的相同的查询就不用操作表而直接访问缓存结果了. 2. EXPLAIN   SELECT 查询 ...

  9. MySQL 集群 3副本,Kubernetes经典实践——运行MySQL多副本集群

    JFrog 在线课堂 Kubernetes经典实践--运行MySQL多副本集群 课程背景 Kubernetes以其先进的理念.活跃的社区,已成为当前容器集群化编排.部署和运行的事实标准.越来越多的企业 ...

最新文章

  1. linux 文件拷贝并替换,Linux_cmd replace 文件替换使用说明,帮助信息: 复制代码 代码如 - phpStudy...
  2. 博客堂也遇DotText经典Exception
  3. 141. Linked List Cycle
  4. 角谷定理python每次输出数_角谷定理C++递归问题,求问步数为什么总输出0?
  5. css设置不允许复制文本内容
  6. 超融合和服务器关系_关于超融合一体机,联想有话说
  7. 如何使处于不同局域网的计算机实现远程通信_小区自来水二次加压泵站远程监控系统方案...
  8. SQUID优化重要参数
  9. pyltp实体识别_哈工大 PYLTP 安装 排坑指南
  10. 【Java】恶搞程序实现桌面无限弹窗
  11. 使用软件或Python编程时EIS拟合的底层逻辑(EIS拟合的原理/过程)
  12. 高德地图经纬度检索校验
  13. 【渝粤教育】电大中专建设工程法规_1作业 题库
  14. 1.8万字详解实时数仓建设方案
  15. css js 简单的径向菜单学习笔记
  16. 口袋理财:“来了就是深圳人?”全国均价最高的房租了解一下
  17. 从控制台输入两个英文字母,输出这两个英文字母之间的所有的字母(包含大小写)
  18. [转...转] 国内软件破解下载网站列表!
  19. Flare Network,跨越互操作性三难困境
  20. 连接器选型,一不留神就容易踩到的坑

热门文章

  1. (一)推荐算法概述——以协同过滤为主
  2. (最详细)红米Note 3的USB调试模式在哪里开启的教程
  3. exam_UNIX and relational database(0)
  4. php 停留3秒,php怎么实现停留几秒后跳转
  5. 厦门哪里好玩 这几个地方必去
  6. JS鼠标事件onMouseOver和onMouseOut的坑
  7. MySQL中az是什么意思_Mysql这些语句是什么意思?
  8. WIFI智能电子标牌的优势
  9. 微软收购暴雪,这跟元宇宙有锤子关系?
  10. android常用库