kafka的connect实现数据写入到kafka和从kafka写出
在官网第七步
Step 7: 使用 Kafka Connect 来 导入/导出 数据
从控制台写入和写回数据是一个方便的开始,但你可能想要从其他来源导入或导出数据到其他系统。对于大多数系统,可以使用kafka Connect,而不需要编写自定义集成代码。
Kafka Connect
是导入和导出数据的一个工具。它是一个可扩展的工具,运行连接器,实现与自定义的逻辑的外部系统交互。在这个快速入门里,我们将看到如何运行Kafka Connect用简单的连接器从文件导入数据到Kafka主题,再从Kafka主题导出数据到文件。
首先,我们首先创建一些“种子”数据用来测试,:
echo -e "foo\nbar" > test.txt
windows上:
> echo foo> test.txt
> echo bar>> test.txt
接下来,我们开始2个连接器运行在独立的模式,这意味着它们运行在一个单一的,本地的,专用的进程。我们提供3个配置文件作为参数。首先是Kafka Connect处理的配置,包含常见的配置,例如要连接的Kafka broker和数据的序列化格式。其余的配置文件都指定了要创建的连接器。包括连接器唯一名称,和要实例化的连接器类。以及连接器所需的任何其他配置。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
kafka附带了这些示例的配置文件,并且使用了刚才我们搭建的本地集群配置并创建了2个连接器:第一个是源连接器,从输入文件中读取并发布到Kafka主题中,第二个是接收连接器,从kafka主题读取消息输出到外部文件。
在启动过程中,你会看到一些日志消息,包括一些连接器实例化的说明。一旦kafka Connect进程已经开始,导入连接器应该读取从
test.txt
和写入到topic
connect-test
,导出连接器从主题
connect-test
读取消息写入到文件
test.sink.txt
. 我们可以通过验证输出文件的内容来验证数据数据已经全部导出:
more test.sink.txtfoobar
注意,导入的数据也已经在Kafka主题
connect-test
里,所以我们可以使用该命令查看这个主题:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
连接器继续处理数据,因此我们可以添加数据到文件并通过管道移动:
echo "Another line" >> test.txt
你应该会看到出现在消费者控台输出一行信息并导出到文件。
出现ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:304)
解决办法:
1、确保zookeeper和kafka服务启动
2、修改connect-standalone.properties 文件,将bootstrap.servers=localhost:9092改为ip:9092
kafka的connect实现数据写入到kafka和从kafka写出相关推荐
- Kafka精品教学(入门,安装,Springboot整合Kafka)
ps:本文是博主结合视频和博客学习之后,自己实验总结编写的,如果侵权请联系删除. 要学习kafka首先要了解什么是消息队列,因为Kafka 是一个分布式的基于发布 / 订阅模式的消息队列(Messag ...
- flink源码分析_Flink源码分析之深度解读流式数据写入hive
前言 前段时间我们讲解了flink1.11中如何将流式数据写入文件系统和hive [flink 1.11 使用sql将流式数据写入hive],今天我们来从源码的角度深入分析一下.以便朋友们对flink ...
- Stream流、FiLe和IO流、IO流(字节流-拷贝文件_和_字符流-读取文本中的数据写入文本文件中)9-10-11
package com.streamdemo; import java.util.ArrayList; import java.util.List; /*** 体验Stream流** 创建一个集合,存 ...
- python 写入excel数据xlwt_用python包xlwt将数据写入Excel中
一般用两种格式的数据写入,不多说放上demo. 1.列表形式写入 import xlwt def data_write(file_path, datas): f = xlwt.Workbook() s ...
- kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ
前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...
- Logstash读取Kafka数据写入HDFS详解
强大的功能,丰富的插件,让logstash在数据处理的行列中出类拔萃 通常日志数据除了要入ES提供实时展示和简单统计外,还需要写入大数据集群来提供更为深入的逻辑处理,前边几篇ELK的文章介绍过利用lo ...
- 1.30.Flink SQL案例将Kafka数据写入hive
1.30.Flink SQL案例将Kafka数据写入hive 1.30.1.1.场景,环境,配置准备 1.30.1.2.案例代码 1.30.1.2.1.编写pom.xml文件 1.30.1.2.2.M ...
- logstash读取Elasticsearch数据保存为json,logstash接收log数据写入kafka生产者
[提前声明] 文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章 写作不易,转载请注明,谢谢! 代码案例地址: ?https://github.com/Mydreamandrea ...
- Kafka工作原理-数据写入、ACK、查询、消费原理
为什么需要消息队列 周末无聊刷着手机,某宝网APP突然蹦出来一条消息"为了回馈老客户,女朋友买一送一,活动仅限今天!".买一送一还有这种好事,那我可不能错过!忍不住立马点了去.于是 ...
- flink实时消费kafka中oracle的DML数据写入mysql
1.需要环境 zookeeper,小编安装环境为zookeeper-3.4.10 kakfa,小编安装环境为kafka_2.13-2.8.0 kafka-connect-oracle,此为kafka- ...
最新文章
- mac apache 配置
- 目标检测的二十年发展史—从传统方法到深度学习
- VMware虚拟机里面无法直接访问主机磁盘怎么办?
- random输出1到10之间_第43P,随机数,Python内置库之random
- boost::prior用法的测试程序
- hello-world
- 深入理解 SAP Fiori Elements 工作原理系列之二:如何给 SAP Fiori Elements 应用添加自定义按钮
- node 简繁体转换_简体繁体转换
- signature=7bfc4e6c1dbcfddf5237122a73885e6d,Bending receiver using heat-shrinkable film
- [BZOJ3717] [PA2014] Pakowanie [状态压缩][dp]
- mysql高性能学习笔记03_【MySQL】《高性能MySQL》 学习笔记,第三章,服务器性能剖析...
- nums和nums[:]
- android 自动亮屏解锁,android 点亮手机屏幕与屏幕解锁方法
- Android案例手册 - 定位点圆形水波纹和椭圆水波纹
- DM达梦数据库使用druid时提示:dbType not support : dm
- 基于 Webpack4 的可插拔式微前端架构 - Puzzle
- Week 4.2 | Lecture 10 | 多态性设计通用方法、比较器 | CS61B-Spring-2018
- 系统日志管理——journalctl
- 黑马旅游网——旅游路线详情展示和旅游路线收藏功能(完结)
- python | 批量文件名字汉字转拼音