这里面介绍一下kafka connect的一些使用。

kafka connect的使用

一、在config目录下面复制一个file-srouce.properties并且修改内容

huhx@gohuhx:~/server/kafka_2.11-1.1.0/config$ cp connect-file-source.properties connect-file-source-test.propertieshuhx@gohuhx:~/server/kafka_2.11-1.1.0/config$ cp connect-standalone.properties connect-standalone-test.properties

修改huhx目录下面的connect-standalone-test.properties文件里面的内容如下:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

connect-file-source-test.properties的内容如下:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/home/huhx/Documents/linux.txt
topic=connect-linuxtransforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

指定了topic为connect-test,指定了读取的文件为/home/huhx/Documents/linux.txt。其中linux.txt的内容如下

I love you.
my name is huhx.
code for me?

二、运行connect-standalone.sh命令,可以将对huhx.txt文件处理之后的内容发布到connect-test的topic上

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/connect-standalone.sh config/connect-standalone-test.properties config/connect-file-source-test.properties

运行之后,我们在connect-test主题里面可以看到如下的输出:

huhx@gohuhx:~/server/kafka_2.11-1.1.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092     --topic connect-linux    --from-beginning
{"line":"I love you.","data_source":"test-file-source"}
{"line":"my name is huhx.","data_source":"test-file-source"}
{"line":"code for me?","data_source":"test-file-source"}

关于上述配置的transforms可以参考下述的文档https://kafka.apache.org/documentation/#connect

友情链接

转载于:https://www.cnblogs.com/huhx/p/baseusekafkaconnect1.html

kafka----kafka connect的使用(一)相关推荐

  1. [Kafka] Kafka基本架构

    [Kafka] Kafka基本架构 [Kafka] Kafka基本架构 [Kafka] Kafka基本架构 生产者Producer :生产信息: 消费者Consumer :订阅主题.消费信息: 代理B ...

  2. kafka connect_Kafka Connect在MapR上

    kafka connect 在本周的白板演练中,MapR的高级产品营销经理Ankur Desai描述了Apache Kafka Connect和REST API如何简化和提高在处理来自包括旧数据库或数 ...

  3. [kafka]kafka集群实践

    环境 ip hostname server_id 192.168.1.111 UAT04 2 192.168.1.112 UAT03 1 192.168.1.102 UAT05 3 配置hosts: ...

  4. Kafka : Kafka入门教程和JAVA客户端使用

    目录 目录 Kafka简介 环境介绍 术语介绍 消费模式 下载 集群安装配置 命令使用 JAVA实战 参考文献 Kafka简介 由Scala和Java编写,Kafka是一种高吞吐量的分布式发布订阅消息 ...

  5. [Big Data - Kafka] kafka学习笔记:知识点整理

    一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余:消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许 ...

  6. linux上卸载kafka,kafka安装在linux上的安装

    kafka安装 第一关 java的安装 捞得嘛,不谈 第二关 zookeeper的安装及配置 1. 直接打开Apach zookeeper进行下载 Tips: source 是源文件,需要编译后才能继 ...

  7. 部署kafka kafka的service容器和zookeeper kafka客户端 Elasticsearch的客户端

    创建network docker network create -d overlay --attachable loc_net   docker stack up -c  kafka.yml kafk ...

  8. [Big Data - Kafka] Kafka设计解析(四):Kafka Consumer解析

    High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被 ...

  9. 【kafka】kafka Kafka分区leader迁移

    1.概述 本篇文章是:https://articles.zsxq.com/id_7srom6n6b947.html 文章的读后感,文章中写的不详细,想做一个详细的案例. 2.原文 月 2021年10月 ...

  10. 【kafka】JMX 监控kafka kafka rmi NoSuchObjectException no such object in table

    1.背景 使用Jmx监控kafka相关信息,但是运行的时候报错如下 我的代码大致逻辑是 JMXServiceUrl jmx = new JMXServiceUrl(url) JMXConnector ...

最新文章

  1. 《Linux命令行与shell脚本编程大全 第3版》Shell脚本编程基础---43
  2. 如何设计一款大学生找实习的APP?
  3. JVM之类加载器ClassLoader
  4. elasticsearch api中的Delete By Query API操作
  5. linux proftpd mysql_Linux安装ProFTPd、MySQL以及Apache后有感
  6. Hue添加RDBMS(关系型数据库)
  7. 防止头文件重复包含之pragma once与#ifndef
  8. 概率论——随机变量和的期望
  9. ffmpeg转码html5,FFMPEG转码技术在HTML5视频系统中的研究与应用
  10. 三角矩阵的逆矩阵怎么求_「线性代数」求可逆矩阵P,使得相似矩阵对角化
  11. 统一门户建设项目最佳实践
  12. [转载]JSP利用组件实现文件上传的全攻略
  13. linux命令大全 load,linux命令TOP参数load average详解[转]
  14. 有房没房,日子过的都是心态
  15. 初中级前端面试复习总结(浏览器、HTTP、前端安全)
  16. js动态修改浏览器title标题
  17. c strtok函数用法
  18. form表单AJAX提交
  19. 现代统计的思想飞跃,因果推断!
  20. crash处理core文件

热门文章

  1. 系统学习深度学习(三十二)--Double DQN (DDQN)
  2. npoi 未将对象引用设置到对象的实例_new一个对象到底占了多少内存?
  3. 【数据库原理实验(openGauss)】视图
  4. 我们该用什么分布式数据库?
  5. UDP协议的收发操作
  6. clickhouse MergeTree系列引擎
  7. Linux内核开发_内核模块
  8. HTTP协议下GET与POST的区别
  9. Failure to find com.rongpd:rpd:pom:1.0 in xxx was cached in the local repository, resolution will no
  10. Java mongo入门