在官网第七步

Step 7: 使用 Kafka Connect 来 导入/导出 数据

从控制台写入和写回数据是一个方便的开始,但你可能想要从其他来源导入或导出数据到其他系统。对于大多数系统,可以使用kafka Connect,而不需要编写自定义集成代码。

Kafka Connect是导入和导出数据的一个工具。它是一个可扩展的工具,运行连接器,实现与自定义的逻辑的外部系统交互。在这个快速入门里,我们将看到如何运行Kafka Connect用简单的连接器从文件导入数据到Kafka主题,再从Kafka主题导出数据到文件。

首先,我们首先创建一些“种子”数据用来测试,:

echo -e "foo\nbar" > test.txt

windows上:

> echo foo> test.txt
> echo bar>> test.txt

接下来,我们开始2个连接器运行在独立的模式,这意味着它们运行在一个单一的,本地的,专用的进程。我们提供3个配置文件作为参数。首先是Kafka Connect处理的配置,包含常见的配置,例如要连接的Kafka broker和数据的序列化格式。其余的配置文件都指定了要创建的连接器。包括连接器唯一名称,和要实例化的连接器类。以及连接器所需的任何其他配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

kafka附带了这些示例的配置文件,并且使用了刚才我们搭建的本地集群配置并创建了2个连接器:第一个是源连接器,从输入文件中读取并发布到Kafka主题中,第二个是接收连接器,从kafka主题读取消息输出到外部文件。

在启动过程中,你会看到一些日志消息,包括一些连接器实例化的说明。一旦kafka Connect进程已经开始,导入连接器应该读取从

test.txt

和写入到topic

connect-test

,导出连接器从主题

connect-test

读取消息写入到文件

test.sink.txt

. 我们可以通过验证输出文件的内容来验证数据数据已经全部导出:

more test.sink.txtfoobar

注意,导入的数据也已经在Kafka主题

connect-test

里,所以我们可以使用该命令查看这个主题:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

连接器继续处理数据,因此我们可以添加数据到文件并通过管道移动:

echo "Another line" >> test.txt

你应该会看到出现在消费者控台输出一行信息并导出到文件。

出现ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:304)

解决办法:

1、确保zookeeper和kafka服务启动

2、修改connect-standalone.properties 文件,将bootstrap.servers=localhost:9092改为ip:9092

kafka的connect实现数据写入到kafka和从kafka写出相关推荐

  1. Kafka精品教学(入门,安装,Springboot整合Kafka)

    ps:本文是博主结合视频和博客学习之后,自己实验总结编写的,如果侵权请联系删除. 要学习kafka首先要了解什么是消息队列,因为Kafka 是一个分布式的基于发布 / 订阅模式的消息队列(Messag ...

  2. flink源码分析_Flink源码分析之深度解读流式数据写入hive

    前言 前段时间我们讲解了flink1.11中如何将流式数据写入文件系统和hive [flink 1.11 使用sql将流式数据写入hive],今天我们来从源码的角度深入分析一下.以便朋友们对flink ...

  3. Stream流、FiLe和IO流、IO流(字节流-拷贝文件_和_字符流-读取文本中的数据写入文本文件中)9-10-11

    package com.streamdemo; import java.util.ArrayList; import java.util.List; /*** 体验Stream流** 创建一个集合,存 ...

  4. python 写入excel数据xlwt_用python包xlwt将数据写入Excel中

    一般用两种格式的数据写入,不多说放上demo. 1.列表形式写入 import xlwt def data_write(file_path, datas): f = xlwt.Workbook() s ...

  5. kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

    前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...

  6. Logstash读取Kafka数据写入HDFS详解

    强大的功能,丰富的插件,让logstash在数据处理的行列中出类拔萃 通常日志数据除了要入ES提供实时展示和简单统计外,还需要写入大数据集群来提供更为深入的逻辑处理,前边几篇ELK的文章介绍过利用lo ...

  7. 1.30.Flink SQL案例将Kafka数据写入hive

    1.30.Flink SQL案例将Kafka数据写入hive 1.30.1.1.场景,环境,配置准备 1.30.1.2.案例代码 1.30.1.2.1.编写pom.xml文件 1.30.1.2.2.M ...

  8. logstash读取Elasticsearch数据保存为json,logstash接收log数据写入kafka生产者

    [提前声明] 文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章 写作不易,转载请注明,谢谢! 代码案例地址: ?https://github.com/Mydreamandrea ...

  9. Kafka工作原理-数据写入、ACK、查询、消费原理

    为什么需要消息队列 周末无聊刷着手机,某宝网APP突然蹦出来一条消息"为了回馈老客户,女朋友买一送一,活动仅限今天!".买一送一还有这种好事,那我可不能错过!忍不住立马点了去.于是 ...

  10. flink实时消费kafka中oracle的DML数据写入mysql

    1.需要环境 zookeeper,小编安装环境为zookeeper-3.4.10 kakfa,小编安装环境为kafka_2.13-2.8.0 kafka-connect-oracle,此为kafka- ...

最新文章

  1. mac apache 配置
  2. 目标检测的二十年发展史—从传统方法到深度学习
  3. VMware虚拟机里面无法直接访问主机磁盘怎么办?
  4. random输出1到10之间_第43P,随机数,Python内置库之random
  5. boost::prior用法的测试程序
  6. hello-world
  7. 深入理解 SAP Fiori Elements 工作原理系列之二:如何给 SAP Fiori Elements 应用添加自定义按钮
  8. node 简繁体转换_简体繁体转换
  9. signature=7bfc4e6c1dbcfddf5237122a73885e6d,Bending receiver using heat-shrinkable film
  10. [BZOJ3717] [PA2014] Pakowanie [状态压缩][dp]
  11. mysql高性能学习笔记03_【MySQL】《高性能MySQL》 学习笔记,第三章,服务器性能剖析...
  12. nums和nums[:]
  13. android 自动亮屏解锁,android 点亮手机屏幕与屏幕解锁方法
  14. Android案例手册 - 定位点圆形水波纹和椭圆水波纹
  15. DM达梦数据库使用druid时提示:dbType not support : dm
  16. 基于 Webpack4 的可插拔式微前端架构 - Puzzle
  17. Week 4.2 | Lecture 10 | 多态性设计通用方法、比较器 | CS61B-Spring-2018
  18. 系统日志管理——journalctl
  19. 黑马旅游网——旅游路线详情展示和旅游路线收藏功能(完结)
  20. python | 批量文件名字汉字转拼音

热门文章

  1. 编译器各个步骤的介绍
  2. 【转】Swig 使用指南
  3. js中map和python中的map
  4. 一个关于finally和return的面试题
  5. 网奇iwms插件之“我浏览过的文章”
  6. Windows服务器nginx多个二级域名配置多端口无效问题的解决方案
  7. web集群之Ngnix相关配置
  8. 考虑云计算来降成本:良药或毒药?
  9. 《淘宝网开店 进货 运营 管理 客服 实战200招》——1.11 开店前要做好调查,预测市场...
  10. 十年工龄的程序员为你揭示最危害程序员职业生涯的三大观念