在前一章节中,Billow介绍了如何通过1.1.1以上的canal配置将binlog数据投递到kafka。在实际的生产环境中,我们的kafka很多都会集成Kerberos作为安全认证。那么在本节,Billow将介绍如何通过修改源码使Canal可配置为投递数据到Kerberos认证的Kafka集群。

##1.导入Canal源码
canal已开源到github。下载地址为:https://github.com/alibaba/canal.git
####1.1 在idea中导入git项目。

导入后的项目目录为:

####1.2 修改canal启动类

canal独立版本的入口类为:com.alibaba.otter.canal.deployer.CanalLauncher

在该类的main方法中,做了以下几件事情:
1、加载配置。
2、根据配置启动CanalStater

...
...
logger.info("## load canal configurations");String conf = System.getProperty("canal.conf", "classpath:canal.properties");Properties properties = new Properties();RemoteConfigLoader remoteConfigLoader = null;if (conf.startsWith(CLASSPATH_URL_PREFIX)) {conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));} else {properties.load(new FileInputStream(conf));}
...
...
final CanalStater canalStater = new CanalStater();canalStater.start(properties);

在CanalStater.start方法中,通过配置项初始化MQ的生产者。此处Billow配置为Kafka,因此我们只关注kafka。

在初始化CanalKafkaProducer之后,会读取配置文件中的mq配置。

在canal.properties中的mq配置如下:

##################################################
#########            MQ              #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = falsecanal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "/usr/keytab/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "/usr/keytab/jaas.conf"

其中canal.mq.kafka.kerberos为前缀的配置是Billow的自定义kerberos配置项。说明:

  • canal.mq.kafka.kerberos.enable
    此配置项为true跟false,为true时表示kafka集群开启了kerberos认证,那么会读取接下来的两个配置项内容。

  • canal.mq.kafka.kerberos.krb5FilePath
    此配置项当canal.mq.kafka.kerberos.enable为true时才会读取,配置为kerberos集群中的krb5.conf文件。示例:

[logging]default = FILE:/var/log/krb5libs.logkdc = FILE:/var/log/krb5kdc.logadmin_server = FILE:/var/log/kadmind.log[libdefaults]default_realm = HADOOP.COMdns_lookup_realm = falsedns_lookup_kdc = falseticket_lifetime = 24hrenew_lifetime = 7dforwardable = true[realms]BETA.COM = {kdc = hadoop1.comadmin_server = hadoop1.com}[domain_realm].hadoop1.com = HADOOP.COMhadoop1.com = HADOOP.COM
  • canal.mq.kafka.kerberos.jaasFilePath
    此配置项当canal.mq.kafka.kerberos.enable为true时才会读取,配置为连接kafka时的jaas配置项。示例:
KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="E:/resources/billow.keytab"principal="billow@HADOOP.COM"client=true;
};

此处Billow在配置文件中配置了自定义的配置项,那么在代码中,需要添加这几项配置项的读取。
CanalStater的buildMQProperties方法中添加配置项的读取。

/*** 构造MQ对应的配置* * @param properties canal.properties 配置* @return*/private static MQProperties buildMQProperties(Properties properties) {MQProperties mqProperties = new MQProperties();............String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);if (!StringUtils.isEmpty(kafkaKerberosEnable)) {mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));}String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);}String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);}return mqProperties;}

对应的CanalConstants类中,添加常量信息配置:

/*** 启动常用变量** @author jianghang 2012-11-8 下午03:15:55* @version 1.0.0*/
public class CanalConstants {
...
...public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE    = ROOT + "." + "mq.kafka.kerberos.enable";public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH  = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH  = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
...
...

####1.3 配置CanalKafkaProducer
上一小节中,Billow介绍了如何添加关于Kerberos的开关配置。在这节我们来看看如何配置kafkaProducer为安全模式。

观察源码发现,在CanalStater的start方法中初始化了一个CanalKafkaProducer对象。在此对象的init方法里面,有关于kafkaproduct的相关配置。
在此处,Billow添加了判断,如果配置文件中开启了kerberos认证,那么就会配置kafkaProperty为安全模式。并添加了系统环境配置。

 if (kafkaProperties.isKerberosEnable()){//kafka集群开启了kerberos认证System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");properties.put("security.protocol", "SASL_PLAINTEXT");properties.put("sasl.kerberos.service.name", "kafka");}

具体位置为:

public class CanalKafkaProducer implements CanalMQProducer {...
...@Overridepublic void init(MQProperties kafkaProperties) {this.kafkaProperties = kafkaProperties;Properties properties = new Properties();properties.put("bootstrap.servers", kafkaProperties.getServers());properties.put("acks", kafkaProperties.getAcks());properties.put("compression.type", kafkaProperties.getCompressionType());properties.put("batch.size", kafkaProperties.getBatchSize());properties.put("linger.ms", kafkaProperties.getLingerMs());properties.put("max.request.size", kafkaProperties.getMaxRequestSize());properties.put("buffer.memory", kafkaProperties.getBufferMemory());properties.put("key.serializer", StringSerializer.class.getName());if(kafkaProperties.getTransaction()){properties.put("transactional.id", "canal-transactional-id");} else {properties.put("retries", kafkaProperties.getRetries());}if (kafkaProperties.isKerberosEnable()){//kafka集群开启了kerberos认证System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");properties.put("security.protocol", "SASL_PLAINTEXT");properties.put("sasl.kerberos.service.name", "kafka");}if (!kafkaProperties.getFlatMessage()) {properties.put("value.serializer", MessageSerializer.class.getName());producer = new KafkaProducer<String, Message>(properties);} else {properties.put("value.serializer", StringSerializer.class.getName());producer2 = new KafkaProducer<String, String>(properties);}if (kafkaProperties.getTransaction()) {if (!kafkaProperties.getFlatMessage()) {producer.initTransactions();} else {producer2.initTransactions();}}}...
...
}

##2、测试
修改好源码后,编译打包。

mvn clean install -Dmaven.test.skip -Denv=release

命令执行成功后会在项目的target文件夹下面生成压缩包:

将deployer包拷贝至服务器,配置好集群环境的krb5.conf、jaas.conf以及canal.properties文件。启动canal,查看日志,并启动kafka消费者进行数据的消费。
Billow已测试成功,有不懂的童鞋可以私信公众号问~

阿里开源Canal--⑤投递到Kerberos认证的Kafka相关推荐

  1. CDH Kerberos 认证下Kafka 消费方式

    集群Kerberos认证安装参考:https://datamining.blog.csdn.net/article/details/98480008 目录 环境: 配置 Java Producer 代 ...

  2. 【kafka】kerberos认证下 kafka 报错Bootstrap broker host:ip (id: -1 rack: null) disconnected

    文章目录 1.概述 1.概述 本博文中的IP都是随便写的. 公司整了一个新的环境,然后我要进行适配,结果发现对方有认证,无法直接使用,使用控制台生产和消费的时候报错. [domain_realm][r ...

  3. matlab 写入 MYSQL_阿里开源MySQL中间件Canal快速入门

    前言 距离上一篇文章发布又过去了两周,这次先填掉上一篇秒杀系统文章结尾处开的坑,介绍一下数据库中间件Canal的使用. 「Canal用途很广,并且上手非常简单,小伙伴们在平时完成公司的需求时,很有可能 ...

  4. 实战!Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步!

    数据同步一直是一个令人头疼的问题.在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例如从多个表将数据查出来,再汇总处理,再插入到相应的地方. 但是随着业务 ...

  5. Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步!

    数据同步一直是一个令人头疼的问题.在业务量小,场景不多,数据量不大的情况下我们可能会选择在项目中直接写一些定时任务手动处理数据,例如从多个表将数据查出来,再汇总处理,再插入到相应的地方. 但是随着业务 ...

  6. 阿里开源数据同步组件Canal

    一.简介 canal是阿里开源的数据同步组件 这个是是git地址 二.使用步骤 1.安装配置mysql 安装一个数据库(这个数据库是被监听的对象,我这里用的是mysql5.7) 创建一个用户专门用于数 ...

  7. 开发者干货合集!阿里开源,移动开发,机器学习等海量资源限时开放!...

    2019阿里云云上Hi购季活动已经于2月25日正式开启,从已开放的活动页面来看,活动分为三个阶段: 2月25日-3月04日的活动报名阶段.3月04日-3月16日的新购满返+5折抢购阶段.3月16日-3 ...

  8. 阿里开源数据同步神器DataX异构数据源间数据同步同步MySQL与HDFS相互实战

    Datax 实战使用 继上一篇 阿里开源数据同步神器DataX异构数据源间数据同步基础介绍与快速入门之后的实战篇 1.MySQL-To-HDFS 环境 & 准备说明: 描述: 为了快速搭建测试 ...

  9. 阿里开源:思考,演进和发展

    2016云栖大会·北京峰会于8月9号在国家会议中心拉开帷幕,在云栖社区开发者技术专场中,首位登场的阿里云技术专家唐容为大家带来来题为<阿里开源:思考,演进和发展>的精彩演讲. 关于分享者: ...

最新文章

  1. 非常好用的Python图像增强工具,适用多个框架
  2. 在64位Ubuntu上编译32位程序常见错误
  3. 吉士丁与新潮传媒达成亿级战略合作,打造国产奶酪新势力
  4. 【C语言】数据结构C语言版 实验7 二叉树
  5. dw2020表格不可见_【分享表格模板】手把手教你算小目标实际本金和综合年化收益率...
  6. Python 使用标准库根据进程名获取进程PID
  7. R-CNN学习笔记5:Faster R-CNN
  8. 水泵smart200编程_第453期丨PLC梯形图编程很low?星三角启动,转换角型时电机反转是真的吗?...
  9. c#窗体程序 内嵌浏览器
  10. 用java写图形验证码,超级简单
  11. mac appium环境搭建
  12. 音乐专业如何利用计算机思维,太神奇了!带学生“玩音乐”居然可以打开思维创新...
  13. 直通车点击软件测试自学,如何用直通车测出高点击好图
  14. html中单元格向下合并单元格,html中单元格合并 HTML 怎么给合并单元格设置宽度...
  15. python图像处理(三)波形叠加模拟
  16. Specificity and sensitivity
  17. 不花钱一样可以引流获客?这6招功劳不小
  18. C语言中的整型变量与实行常量
  19. 计算机应用基础教学内容,计算机应用基础教学大纲
  20. 10进制转37进制c语言程序,十进制数37转换成二进制数是( )。A.(100001)2B.(100101)2C.(101001)2D.(110001)2_考题宝...

热门文章

  1. Mac 安装Jupyter Notebook和使用Jupyter Notebook
  2. anaconda :An unexpected error has occurred. Conda has prepared the above report
  3. VCS仿真和多个test用urg工具生成coverage文件verdi查看--转载
  4. c语言异或运算编程,C语言异或运算的一些特性及巧妙应用
  5. python升级或降级pip版本以及镜像源的设置
  6. 怎么给excel表格加密_excel如何给表格加密?
  7. C17500铍钴铜棒C17500铍钴铜锻件C17500圆饼
  8. python 读取pdf图片_使用Python从pdf中提取图像
  9. 解决cmd运行Java程序乱码问题
  10. 机器学习笔记(6.1)