1. Camus configuration (camus.properties)

camus.job.name=Camus Job
# HDFS destination path
etl.destination.path=/user/hive/warehouse/binlog.db
# Storage path for offsets, error logs, and count files
etl.execution.base.path=/camus/exec
# Output path for completed jobs
etl.execution.history.path=/camus/exec/history
# Kafka message decoder; the messages delivered by Canal are in JSON format
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
# Same as the fs.defaultFS parameter in core-site.xml
fs.defaultFS=hdfs://nameservice1
# Record writer used when landing data on HDFS; Avro, SequenceFile and plain strings are supported by default
# Here we use a custom RecordWriterProvider; the code is in the next section
#etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.CanalBinlogRecordWriterProvider
# Field that holds the message timestamp
camus.message.timestamp.field=es
# Format of the timestamp field
camus.message.timestamp.format=unix_milliseconds
# Type and format of the time partitions; hourly and daily are supported by default, custom partitioners are also possible
etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.TimeBasedPartitioner
etl.destination.path.topic.sub.dirformat='pt_hour'=YYYYMMddHH
# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=30
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=-1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1
# Topic blacklist / whitelist
kafka.blacklist.topics=Report2,Trade,caojia,ddmgServer,default-flume-topic,ddmg_biz.test_canal1,ddmg_biz.test_canal
kafka.whitelist.topics=ddmg_biz.test_canal2
log4j.configuration=true
# Client (connection) name
kafka.client.name=camus
# Kafka broker addresses
kafka.brokers=xxxx
etl.run.tracking.post=false
kafka.monitor.tier=
etl.counts.path=
kafka.monitor.time.granularity=10
etl.hourly=hourly
etl.daily=daily
etl.ignore.schema.errors=false
# configure output compression for deflate or snappy. Defaults to deflate
mapred.output.compress=false
#etl.output.codec=deflate
#etl.deflate.level=6
#etl.output.codec=snappy
etl.default.timezone=Asia/Shanghai
#etl.output.file.time.partition.mins=60
#etl.keep.count.files=false
#etl.execution.history.max.of.quota=.8
# Configures a custom reporter which extends BaseReporter to send etl data
#etl.reporter.class
mapred.map.max.attempts=1
kafka.client.buffer.size=20971520
#kafka.client.buffer.size=20480
kafka.client.so.timeout=60000
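To make the timestamp and partition settings concrete, here is a rough, self-contained illustration (the class name is made up, and this is not Camus's actual decoder or partitioner code): the "es" field of Canal's JSON is read as epoch milliseconds, and formatting it with the configured sub.dirformat in the configured timezone gives the hourly directory the record ends up in.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

import com.alibaba.fastjson.JSON;

// Hypothetical demo class; it only mimics what the decoder/partitioner settings above do.
public class CamusTimePartitionDemo {
    public static void main(String[] args) {
        // A trimmed Canal flat message (full examples appear in the "Results" section)
        String msg = "{\"database\":\"ddmg_biz\",\"table\":\"test_canal2\","
                + "\"es\":1585814771000,\"ts\":1585814771154,\"type\":\"INSERT\"}";
        long es = JSON.parseObject(msg).getLongValue("es");  // camus.message.timestamp.field=es, unix_milliseconds

        SimpleDateFormat dirFormat = new SimpleDateFormat("'pt_hour'=YYYYMMddHH");  // etl.destination.path.topic.sub.dirformat
        dirFormat.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));               // etl.default.timezone
        System.out.println(dirFormat.format(new Date(es)));
        // prints pt_hour=2020040216, i.e. the record would land under something like
        // /user/hive/warehouse/binlog.db/<topic>/pt_hour=2020040216/
    }
}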

2. Camus custom parsing code

package com.linkedin.camus.etl.kafka.common;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.etl.IEtlKey;
import com.linkedin.camus.etl.RecordWriterProvider;
import com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat;

/**
 * RecordWriterProvider that turns Canal flat-message JSON into tab-separated text rows on HDFS.
 */
public class CanalBinlogRecordWriterProvider implements RecordWriterProvider {

    public static final String ETL_OUTPUT_RECORD_DELIMITER = "etl.output.record.delimiter";
    public static final String DEFAULT_RECORD_DELIMITER = "\n";

    protected String recordDelimiter = null;

    private boolean isCompressed = false;
    private CompressionCodec codec = null;
    private String extension = "";

    public CanalBinlogRecordWriterProvider(TaskAttemptContext context) {
        Configuration conf = context.getConfiguration();
        if (recordDelimiter == null) {
            recordDelimiter = conf.get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
        }
        // Resolve the output codec; note that getDataRecordWriter() below writes plain text,
        // so these fields only matter if compression is enabled and wired in.
        isCompressed = FileOutputFormat.getCompressOutput(context);
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = null;
            if ("snappy".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
                codecClass = SnappyCodec.class;
            } else if ("gzip".equals(EtlMultiOutputFormat.getEtlOutputCodec(context))) {
                codecClass = GzipCodec.class;
            } else {
                codecClass = DefaultCodec.class;
            }
            codec = ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
    }

    static class CanalBinlogRecordWriter extends RecordWriter<IEtlKey, CamusWrapper> {
        private DataOutputStream outputStream;
        private String fieldDelimiter;
        private String rowDelimiter;

        public CanalBinlogRecordWriter(DataOutputStream outputStream, String fieldDelimiter, String rowDelimiter) {
            this.outputStream = outputStream;
            this.fieldDelimiter = fieldDelimiter;
            this.rowDelimiter = rowDelimiter;
        }

        @Override
        public void write(IEtlKey key, CamusWrapper value) throws IOException, InterruptedException {
            if (value == null) {
                return;
            }
            String recordStr = (String) value.getRecord();
            // Keep the field order of the JSON written by Canal
            JSONObject record = JSON.parseObject(recordStr, Feature.OrderedField);
            // Skip DDL events; only row data is written out
            if (record.getString("isDdl").equals("true")) {
                return;
            }
            JSONArray data = record.getJSONArray("data");
            for (int i = 0; i < data.size(); i++) {
                JSONObject obj = data.getJSONObject(i);
                if (obj != null) {
                    StringBuilder fieldsBuilder = new StringBuilder();
                    // Binlog metadata first: id, execute time (es), dump time (ts), event type
                    fieldsBuilder.append(record.getLong("id"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getLong("es"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getLong("ts"));
                    fieldsBuilder.append(fieldDelimiter);
                    fieldsBuilder.append(record.getString("type"));
                    // Then the column values of the changed row
                    for (Entry<String, Object> entry : obj.entrySet()) {
                        fieldsBuilder.append(fieldDelimiter);
                        fieldsBuilder.append(entry.getValue());
                    }
                    fieldsBuilder.append(rowDelimiter);
                    outputStream.write(fieldsBuilder.toString().getBytes());
                }
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            outputStream.close();
        }
    }

    @Override
    public String getFilenameExtension() {
        return "";
    }

    @Override
    public RecordWriter<IEtlKey, CamusWrapper> getDataRecordWriter(TaskAttemptContext context,
                                                                   String fileName,
                                                                   CamusWrapper data,
                                                                   FileOutputCommitter committer)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        String rowDelimiter = conf.get("etl.output.record.delimiter", "\n");
        Path path = new Path(committer.getWorkPath(),
                EtlMultiOutputFormat.getUniqueFile(context, fileName, getFilenameExtension()));
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream outputStream = fs.create(path, false);
        return new CanalBinlogRecordWriter(outputStream, "\t", rowDelimiter);
    }
}
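Before deploying the jar, the writer can be sanity-checked locally. Below is a minimal sketch (the test class name is made up; it assumes it is compiled in the same package as the provider above, with the Camus, Hadoop, and fastjson jars on the classpath): it pushes one of the Canal flat messages shown later through CanalBinlogRecordWriter and prints the tab-separated row that would be written to HDFS.

package com.linkedin.camus.etl.kafka.common;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import com.linkedin.camus.coders.CamusWrapper;

// Hypothetical local check, not part of the actual Camus job.
public class CanalBinlogRecordWriterLocalCheck {
    public static void main(String[] args) throws Exception {
        // One of the INSERT messages produced by Canal (see the consumer output below)
        String msg = "{\"data\":[{\"id\":\"5\",\"osr\":\"dsf\",\"tesposr\":\"aaa\"}],"
                + "\"database\":\"ddmg_biz\",\"es\":1585814771000,\"id\":263,\"isDdl\":false,"
                + "\"table\":\"test_canal2\",\"ts\":1585814771154,\"type\":\"INSERT\"}";

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        CanalBinlogRecordWriterProvider.CanalBinlogRecordWriter writer =
                new CanalBinlogRecordWriterProvider.CanalBinlogRecordWriter(
                        new DataOutputStream(buffer), "\t", "\n");

        // The IEtlKey is not used by write(), so null is acceptable for a local check
        writer.write(null, new CamusWrapper<String>(msg));
        System.out.print(buffer.toString());
        // Expected: 263<TAB>1585814771000<TAB>1585814771154<TAB>INSERT<TAB>5<TAB>dsf<TAB>aaa
    }
}

The expected line matches the row produced by the real Camus run shown at the end of this post.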

Canal main configuration (canal.properties)

# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# ZooKeeper addresses
canal.zkServers = xxx
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = /opt/canal/data
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
# maximum supported transaction size; transactions larger than this are split into multiple deliveries
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# Monitoring / filtering
# binlog filter config
#canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
#canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.format = ROW
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# Names of the instance sub-directories under the conf directory
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#### Message queue settings
canal.mq.servers = MQ (Kafka broker) addresses
canal.mq.retries = 1
canal.mq.batchSize = 4096
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 10
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
#canal.mq.producerGroup = test   # not used when the MQ is Kafka
# Set this value to "cloud" if you want to enable the message trace feature on Aliyun
#canal.mq.accessChannel = local  # not used when the MQ is Kafka
# aliyun mq namespace
#canal.mq.namespace =

Canal instance configuration (instance.properties)

## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1234
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address= MySQL host and port
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password for the database
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=ddmg_biz\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=ddmg_biz
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic=.*\\..*
canal.mq.partition=0
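With canal.serverMode=kafka and canal.mq.flatMessage=true, each change event arrives as a JSON string, and with the dynamicTopic rule above the test table's events go to the topic ddmg_biz.test_canal2 that the Camus whitelist points at. The next section verifies this with kafka-console-consumer; an equivalent programmatic check could look like the following sketch (standard kafka-clients consumer API; the class name and group id are placeholders, the broker list is the one used in the console command below).

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CanalTopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cdh6:9092,cdh7:9092,cdh8:9092");
        props.put("group.id", "canal-topic-check");   // placeholder consumer group
        props.put("auto.offset.reset", "earliest");   // same effect as --from-beginning for a new group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("ddmg_biz.test_canal2"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());   // Canal flat-message JSON
            }
        }
    }
}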

Results

Consume the topic with kafka-console-consumer:

kafka-console-consumer --bootstrap-server  cdh6:9092,cdh7:9092,cdh8:9092  --from-beginning --topic ddmg_biz.test_canal2

The output is as follows:

{"data":null,"database":"ddmg_biz","es":1585796772000,"id":144,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `ddmg_biz`.`test_canal2`  (\r\n  `id` int(0) NOT NULL,\r\n  `osr` varchar(255) NULL,\r\n  `tesposr` varchar(255) NULL,\r\n  PRIMARY KEY (`id`)\r\n)","sqlType":null,"table":"test_canal2","ts":1585796772708,"type":"CREATE"}
{"data":[{"id":"1","osr":"dfs","tesposr":"bcc"}],"database":"ddmg_biz","es":1585806475000,"id":178,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585806476174,"type":"INSERT"}
{"data":[{"id":"2","osr":"dfs","tesposr":"dcxv"}],"database":"ddmg_biz","es":1585809008000,"id":234,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809008337,"type":"INSERT"}
{"data":[{"id":"3","osr":"dsdsd","tesposr":"dsfs"}],"database":"ddmg_biz","es":1585809856000,"id":238,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809856890,"type":"INSERT"}
{"data":[{"id":"3","osr":"aaa","tesposr":"dsfs"}],"database":"ddmg_biz","es":1585809936000,"id":239,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":[{"osr":"dsdsd"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809936902,"type":"UPDATE"}
{"data":[{"id":"4","osr":"dfs","tesposr":"dddd"}],"database":"ddmg_biz","es":1585809945000,"id":240,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809945419,"type":"INSERT"}
{"data":[{"id":"2","osr":"dfs","tesposr":"dcxv"}],"database":"ddmg_biz","es":1585809949000,"id":241,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585809949729,"type":"DELETE"}
{"data":[{"id":"5","osr":"dsf","tesposr":"aaa"}],"database":"ddmg_biz","es":1585814771000,"id":263,"isDdl":false,"mysqlType":{"id":"int","osr":"varchar(255)","tesposr":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"osr":12,"tesposr":12},"table":"test_canal2","ts":1585814771154,"type":"INSERT"}

Run the Camus job; the parsed result written to HDFS is as follows:

hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties
263  1585814771000   1585814771154   INSERT  5   dsf aaa
