AI前线导读:Kafka Connect是一个简单但功能强大的工具,可用于Kafka和其他系统之间的集成。人们对Kafka Connect最常见的误解之一是它的转换器。这篇文章将告诉我们如何正确地使用消息的序列化格式,以及如何在Kafka Connect连接器中对其进行标准化。

Kafka Connect是Apache Kafka的一部分,为其他数据存储和Kafka提供流式集成。对于数据工程师来说,他们只需要配置一下JSON文件就可以了。Kafka提供了一些可用于常见数据存储的连接器,如JDBC、Elasticsearch、IBM MQ、S3和BigQuery,等等。

对于开发人员来说,Kafka Connect提供了丰富的API,如果有必要还可以开发其他连接器。除此之外,它还提供了用于配置和管理连接器的REST API。

Kafka Connect是一种模块化组件,提供了一种非常强大的集成方法。一些关键组件包括:

  • 连接器——定义如何与数据存储集成的JAR文件;
  • 转换器——处理数据的序列化和反序列化;
  • 变换——可选的运行时消息操作。

人们对Kafka Connect最常见的误解与数据的序列化有关。Kafka Connect使用转换器处理数据序列化。接下来让我们看看它们是如何工作的,并说明如何解决一些常见问题。

Kafka消息都是字节

Kafka消息被保存在主题中,每条消息就是一个键值对。当它们存储在Kafka中时,键和值都只是字节。Kafka因此可以适用于各种场景,但这也意味着开发人员需要决定如何序列化数据。

在配置Kafka Connect时,序列化格式是最关键的配置选项之一。你需要确保从主题读取数据时使用的序列化格式与写入主题的序列化格式相同,否则就会出现混乱和错误!

序列化格式有很多种,常见的包括:

  • JSON;
  • Avro;
  • Protobuf;
  • 字符串分隔(如CSV)。

选择序列化格式

选择序列化格式的一些指导原则:

  • schema。很多时候,你的数据都有对应的schema。你可能不喜欢,但作为开发人员,你有责任保留和传播schema。schema为服务之间提供了一种契约。某些消息格式(例如Avro和Protobuf)具有强大的schema支持,而其他消息格式支持较少(JSON)或根本没有(CVS)。
  • 生态系统兼容性。Avro是Confluent平台的一等公民,拥有来自Confluent Schema Registry、Kafka Connect、KSQL的原生支持。另一方面,Protobuf依赖社区为部分功能提供支持。
  • 消息大小。JSON是纯文本的,并且依赖了Kafka本身的压缩机制,Avro和Protobuf都是二进制格式,序列化的消息体积更小。
  • 语言支持。Avro在Java领域得到了强大的支持,但如果你的公司不是基于Java的,那么可能会觉得它不太好用。

如果目标系统使用JSON,Kafka主题也必须使用JSON吗?

完全不需要这样。从数据源读取数据或将数据写入外部数据存储的格式不需要与Kafka消息的序列化格式一样。

Kafka Connect中的连接器负责从源数据存储(例如数据库)获取数据,并以数据内部表示将数据传给转换器。然后,Kafka Connect的转换器将这些源数据对象序列化到主题上。

在使用Kafka Connect作为接收器时刚好相反——转换器将来自主题的数据反序列化为内部表示,传给连接器,以便能够使用特定于目标的适当方法将数据写入目标数据存储。

也就是说,主题数据可以是Avro格式,当你将数据写入HDFS时,指定接收器的连接器使用HDFS支持的格式即可。

配置转换器

Kafka Connect默认使用了worker级别的转换器配置,连接器可以对其进行覆盖。由于在整个管道中使用相同的序列化格式通常会更好,所以一般只需要在worker级别设置转换器,而不需要在连接器中指定。但你可能需要从别人的主题拉取数据,而他们使了用不同的序列化格式——对于这种情况,你需要在连接器配置中设置转换器。即使你在连接器的配置中进行了覆盖,它仍然是执行实际任务的转换器。

好的连接器一般不会序列化或反序列化存储在Kafka中的消息,它会让转换器完成这项工作。

请记住,Kafka消息是键值对字节,你需要使用key.converter和value.converter为键和值指定转换器。在某些情况下,你可以为键和值使用不同的转换器。

这是使用String转换器的一个示例。

\u0026quot;key.converter\u0026quot;: \u0026quot;org.apache.kafka.connect.storage.StringConverter\u0026quot;,

有些转换器有一些额外的配置。对于Avro,你需要指定Schema Registry。对于JSON,你需要指定是否希望Kafka Connect将schema嵌入到JSON消息中。在指定特定于转换器的配置时,请始终使用key.converter.或value.converter.前缀。例如,要将Avro用于消息载荷,你需要指定以下内容:

\u0026quot;value.converter\u0026quot;: \u0026quot;io.confluent.connect.avro.AvroConverter\u0026quot;,\u0026quot;value.converter.schema.registry.url\u0026quot;: \u0026quot;http://schema-registry:8081\u0026quot;,

常见的转换器包括:

  • Avro——来自Confluent的开源项目
io.confluent.connect.avro.AvroConverter
  • String——Apache Kafka的一部分
org.apache.kafka.connect.storage.StringConverter
  • JSON——Apache Kafka的一部分
org.apache.kafka.connect.json.JsonConverter
  • ByteArray——Apache Kafka的一部分
org.apache.kafka.connect.converters.ByteArrayConverter
  • Protobuf——来自社区的开源项目
com.blueapron.connect.protobuf.ProtobufConverter

JSON和schema

虽然JSON默认不支持嵌入schema,但Kafka Connect提供了一种可以将schema嵌入到消息中的特定JSON格式。由于schema被包含在消息中,因此生成的消息大小可能会变大。

如果你正在设置Kafka Connect源,并希望Kafka Connect在写入Kafka消息包含schema,可以这样:

value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=true

生成的Kafka消息看起来像下面这样,其中包含schema和payload节点元素:

{  \u0026quot;schema\u0026quot;: {    \u0026quot;type\u0026quot;: \u0026quot;struct\u0026quot;,    \u0026quot;fields\u0026quot;: [      {        \u0026quot;type\u0026quot;: \u0026quot;int64\u0026quot;,        \u0026quot;optional\u0026quot;: false,        \u0026quot;field\u0026quot;: \u0026quot;registertime\u0026quot;      },      {        \u0026quot;type\u0026quot;: \u0026quot;string\u0026quot;,        \u0026quot;optional\u0026quot;: false,        \u0026quot;field\u0026quot;: \u0026quot;userid\u0026quot;      },      {        \u0026quot;type\u0026quot;: \u0026quot;string\u0026quot;,        \u0026quot;optional\u0026quot;: false,        \u0026quot;field\u0026quot;: \u0026quot;regionid\u0026quot;      },      {        \u0026quot;type\u0026quot;: \u0026quot;string\u0026quot;,        \u0026quot;optional\u0026quot;: false,        \u0026quot;field\u0026quot;: \u0026quot;gender\u0026quot;      }    ],    \u0026quot;optional\u0026quot;: false,    \u0026quot;name\u0026quot;: \u0026quot;ksql.users\u0026quot;  },  \u0026quot;payload\u0026quot;: {    \u0026quot;registertime\u0026quot;: 1493819497170,    \u0026quot;userid\u0026quot;: \u0026quot;User_1\u0026quot;,    \u0026quot;regionid\u0026quot;: \u0026quot;Region_5\u0026quot;,    \u0026quot;gender\u0026quot;: \u0026quot;MALE\u0026quot;  }}

请注意消息的大小,消息由playload和schema组成。每条消息中都会重复这些数据,这也就是为什么说Avro这样的格式会更好,因为它的schema是单独存储的,消息中只包含载荷(并进行了压缩)。

如果你正在使用Kafka Connect消费Kafka主题中的JSON数据,那么就需要知道数据是否包含了schema。如果包含了,并且它的格式与上述的格式相同,那么你可以这样设置:

value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=true

不过,如果你正在消费的JSON数据如果没有schema加payload这样的结构,例如:

{  \u0026quot;registertime\u0026quot;: 1489869013625,  \u0026quot;userid\u0026quot;: \u0026quot;User_1\u0026quot;,  \u0026quot;regionid\u0026quot;: \u0026quot;Region_2\u0026quot;,  \u0026quot;gender\u0026quot;: \u0026quot;OTHER\u0026quot;}

那么你必须通过设置schemas.enable = false告诉Kafka Connect不要查找schema:

value.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=false

和之前一样,转换器配置选项(这里是schemas.enable)需要使用前缀key.converter或value.converter。

常见错误

如果你错误地配置了转换器,将会遇到以下的一些常见错误。这些消息将显示在你为Kafka Connect配置的接收器中,因为你试图在接收器中反序列化Kafka消息。这些错误会导致连接器失败,主要错误消息如下所示:

ERROR WorkerSinkTask{id=sink-file-users-json-noschema-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator. execAndHandleError(RetryWithToleranceOperator.java:178)   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute (RetryWithToleranceOperator.java:104)

在错误消息的后面,你将看到堆栈信息,描述了发生错误的原因。请注意,对于连接器中的任何致命错误,都会抛出上述异常,因此你可能会看到与序列化无关的错误。要快速查看错误配置可能会导致的错误,请参考下表:

问题:使用JsonConverter读取非JSON数据

如果你的源主题上有非JSON数据,但尝试使用JsonConverter读取它,你将看到:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:…org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7)

这有可能是因为源主题使用了Avro或其他格式。

解决方案:如果数据是Avro格式的,那么将Kafka Connect接收器的配置改为:

\u0026quot;value.converter\u0026quot;: \u0026quot;io.confluent.connect.avro.AvroConverter\u0026quot;,\u0026quot;value.converter.schema.registry.url\u0026quot;: \u0026quot;http://schema-registry:8081\u0026quot;,

或者,如果主题数据是通过Kafka Connect填充的,那么你也可以这么做,让上游源也发送JSON数据:

\u0026quot;value.converter\u0026quot;: \u0026quot;org.apache.kafka.connect.json.JsonConverter\u0026quot;,\u0026quot;value.converter.schemas.enable\u0026quot;: \u0026quot;false\u0026quot;,

问题:使用AvroConverter读取非Avro数据

这可能是我在Confluent Community邮件组和Slack组等地方经常看到的错误。当你尝试使用Avro转换器从非Avro主题读取数据时,就会发生这种情况。这包括使用Avro序列化器而不是Confluent Schema Registry的Avro序列化器(它有自己的格式)写入的数据。

org.apache.kafka.connect.errors.DataException: my-topic-nameat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)…org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

解决方案:检查源主题的序列化格式,修改Kafka Connect接收器连接器,让它使用正确的转换器,或将上游格式切换为Avro。如果上游主题是通过Kafka Connect填充的,则可以按如下方式配置源连接器的转换器:

\u0026quot;value.converter\u0026quot;: \u0026quot;io.confluent.connect.avro.AvroConverter\u0026quot;,\u0026quot;value.converter.schema.registry.url\u0026quot;: \u0026quot;http://schema-registry:8081\u0026quot;,

问题:没有使用预期的schema/payload结构读取JSON消息

如前所述,Kafka Connect支持包含载荷和schema的JSON消息。如果你尝试读取不包含这种结构的JSON数据,你将收到这个错误:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \u0026quot;schema\u0026quot; and \u0026quot;payload\u0026quot; fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

需要说明的是,当schemas.enable=true时,唯一有效的JSON结构需要包含schema和payload这两个顶级元素(如上所示)。

如果你只有简单的JSON数据,则应将连接器的配置改为:

\u0026quot;value.converter\u0026quot;: \u0026quot;org.apache.kafka.connect.json.JsonConverter\u0026quot;,\u0026quot;value.converter.schemas.enable\u0026quot;: \u0026quot;false\u0026quot;,

如果要在数据中包含schema,可以使用Avro(推荐),也可以修改上游的Kafka Connect配置,让它在消息中包含schema:

\u0026quot;value.converter\u0026quot;: \u0026quot;org.apache.kafka.connect.json.JsonConverter\u0026quot;,\u0026quot;value.converter.schemas.enable\u0026quot;: \u0026quot;true\u0026quot;,

故障排除技巧

查看Kafka Connect日志

要在Kafka Connect中查找错误日志,你需要找到Kafka Connect工作程序的输出。这个位置取决于你是如何启动Kafka Connect的。有几种方法可用于安装Kafka Connect,包括Docker、Confluent CLI、systemd和手动下载压缩包。你可以这样查找日志的位置:

  • Docker:docker logs container_name;
  • Confluent CLI:confluent log connect;
  • systemd:日志文件在/var/log/confluent/kafka-connect;
  • 其他:默认情况下,Kafka Connect将其输出发送到stdout,因此你可以在启动Kafka Connect的终端中找到它们。

查看Kafka Connect配置文件

  • Docker——设置环境变量,例如在Docker Compose中:
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverterCONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverterCONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  • Confluent CLI——使用配置文件etc/schema-registry/connect-avro-distributed.properties;
  • systemd(deb/rpm)——使用配置文件/etc/kafka/connect-distributed.properties;
  • 其他——在启动Kafka Connect时指定工作程序的属性文件,例如:
$ cd confluent-5.0.0$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties

检查Kafka主题

假设我们遇到了上述当中的一个错误,并且想要解决Kafka Connect接收器无法从主题读取数据的问题。

我们需要检查正在被读取的数据,并确保它使用了正确的序列化格式。另外,所有消息都必须使用这种格式,所以不要假设你现在正在以正确的格式向主题发送消息就不会出问题。Kafka Connect和其他消费者也会从主题上读取已有的消息。

下面,我将使用命令行进行故障排除,当然也可以使用其他的一些工具:

  • Confluent Control Center提供了可视化检查主题内容的功能;
  • KSQL的PRINT命令将主题的内容打印到控制台;
  • Confluent CLI工具提供了consume命令,可用于读取字符串和Avro数据。

如果你的数据是字符串或JSON格式

你可以使用控制台工具,包括kafkacat和kafka-console-consumer。我个人的偏好是使用kafkacat:

$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1{\u0026quot;registertime\u0026quot;:1493356576434,\u0026quot;userid\u0026quot;:\u0026quot;User_8\u0026quot;,\u0026quot;regionid\u0026quot;:\u0026quot;Region_2\u0026quot;,\u0026quot;gender\u0026quot;:\u0026quot;MALE\u0026quot;}

你也可以使用jq验证和格式化JSON:

$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1|jq '.'{  \u0026quot;registertime\u0026quot;: 1493356576434,  \u0026quot;userid\u0026quot;: \u0026quot;User_8\u0026quot;,  \u0026quot;regionid\u0026quot;: \u0026quot;Region_2\u0026quot;,  \u0026quot;gender\u0026quot;: \u0026quot;MALE\u0026quot;}

如果你得到一些“奇怪的”字符,你查看的很可能是二进制数据,这些数据是通过Avro或Protobuf写入的:

$ kafkacat -b localhost:9092 -t users-avro -C -c1ڝ���VUser_9Region_MALE

如果你的数据是Avro格式

你应该使用专为读取和反序列化Avro数据而设计的控制台工具。我使用的是kafka-avro-console-consumer。确保指定了正确的Schema Registry URL:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \\                              --property schema.registry.url=http://localhost:8081 \\                              --topic users-avro \\                              --from-beginning --max-messages 1{\u0026quot;registertime\u0026quot;:1505213905022,\u0026quot;userid\u0026quot;:\u0026quot;User_5\u0026quot;,\u0026quot;regionid\u0026quot;:\u0026quot;Region_4\u0026quot;,\u0026quot;gender\u0026quot;:\u0026quot;FEMALE\u0026quot;}

和之前一样,如果要格式化,可以使用jq:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \\                              --property schema.registry.url=http://localhost:8081 \\                              --topic users-avro \\                              --from-beginning --max-messages 1 | \\                              jq '.'{  \u0026quot;registertime\u0026quot;: 1505213905022,  \u0026quot;userid\u0026quot;: \u0026quot;User_5\u0026quot;,  \u0026quot;regionid\u0026quot;: \u0026quot;Region_4\u0026quot;,  \u0026quot;gender\u0026quot;: \u0026quot;FEMALE\u0026quot;}

内部转换器

在分布式模式下运行时,Kafka Connect使用Kafka来存储有关其操作的元数据,包括连接器配置、偏移量等。

可以通过internal.key.converter/internal.value.converter让这些Kafka使用不同的转换器。不过这些设置只在内部使用,实际上从Apache Kafka 2.0开始就已被弃用。你不应该更改这些配置,从Apache Kafka 2.0版开始,如果你这么做了将会收到警告。

将schema应用于没有schema的消息

很多时候,Kafka Connect会从已经存在schema的地方引入数据,并使用合适的序列化格式(例如Avro)来保留这些schema。然后,这些数据的所有下游用户都可以使用这些schema。但如果没有提供显式的schema该怎么办?

或许你正在使用FileSourceConnector从普通文件中读取数据(不建议用于生产环境中,但可用于PoC),或者正在使用REST连接器从REST端点提取数据。由于它们都没有提供schema,因此你需要声明它。

有时候你只想传递你从源读取的字节,并将它们保存在一个主题上。但大多数情况下,你需要schema来使用这些数据。在摄取时应用一次schema,而不是将问题推到每个消费者,这才是一种更好的处理方式。

你可以编写自己的Kafka Streams应用程序,将schema应用于Kafka主题中的数据上,当然你也可以使用KSQL。下面让我们来看一下将schema应用于某些CSV数据的简单示例。

假设我们有一个Kafka主题testdata-csv,保存着一些CSV数据,看起来像这样:

$ kafkacat -b localhost:9092 -t testdata-csv -C1,Rick Astley,Never Gonna Give You Up2,Johnny Cash,Ring of Fire

我们可以猜测它有三个字段,可能是:

  • ID
  • Artist
  • Song

如果我们将数据保留在这样的主题中,那么任何想要使用这些数据的应用程序——无论是Kafka Connect接收器还是自定义的Kafka应用程序——每次都需要都猜测它们的schema是什么。或者,每个消费应用程序的开发人员都需要向提供数据的团队确认schema是否发生变更。正如Kafka可以解耦系统一样,这种schema依赖让团队之间也有了硬性耦合,这并不是一件好事。

因此,我们要做的是使用KSQL将schema应用于数据上,并使用一个新的派生主题来保存schema。这样你就可以通过KSQL检查主题数据:

ksql\u0026gt; PRINT 'testdata-csv' FROM BEGINNING;Format:STRING11/6/18 2:41:23 PM UTC , NULL , 1,Rick Astley,Never Gonna Give You Up11/6/18 2:41:23 PM UTC , NULL , 2,Johnny Cash,Ring of Fire

前两个字段(11/6/18 2:41:23 PM UTC和NULL)分别是Kafka消息的时间戳和键。其余字段来自CSV文件。现在让我们用KSQL注册这个主题并声明schema:

ksql\u0026gt; CREATE STREAM TESTDATA_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR) \\WITH (KAFKA_TOPIC='testdata-csv', VALUE_FORMAT='DELIMITED');Message----------------Stream created----------------

可以看到,KSQL现在有一个数据流schema:

ksql\u0026gt; DESCRIBE TESTDATA_CSV;Name                 : TESTDATA_CSV Field   | Type------------------------------------- ROWTIME | BIGINT (system) ROWKEY  | VARCHAR(STRING) (system) ID      | INTEGER ARTIST  | VARCHAR(STRING) SONG    | VARCHAR(STRING)-------------------------------------For runtime statistics and query details run: DESCRIBE EXTENDED \u0026lt;Stream,Table\u0026gt;;

可以通过查询KSQL流来检查数据是否符合预期。请注意,这个时候我们只是作为现有Kafka主题的消费者——并没有更改或复制任何数据。

ksql\u0026gt; SET 'auto.offset.reset' = 'earliest';Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'ksql\u0026gt; SELECT ID, ARTIST, SONG FROM TESTDATA_CSV;1 | Rick Astley | Never Gonna Give You Up2 | Johnny Cash | Ring of Fire

最后,创建一个新的Kafka主题,使用带有schema的数据进行填充。KSQL查询是持续的,因此除了将现有的数据从源主题发送到目标主题之外,KSQL还将向目标主题发送未来将生成的数据。

ksql\u0026gt; CREATE STREAM TESTDATA WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM TESTDATA_CSV;Message----------------------------Stream created and running----------------------------

使用Avro控制台消费者验证数据:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \\                                --property schema.registry.url=http://localhost:8081 \\                                --topic TESTDATA \\                                --from-beginning | \\                                jq '.'{  \u0026quot;ID\u0026quot;: {    \u0026quot;int\u0026quot;: 1},  \u0026quot;ARTIST\u0026quot;: {    \u0026quot;string\u0026quot;: \u0026quot;Rick Astley\u0026quot;},  \u0026quot;SONG\u0026quot;: {    \u0026quot;string\u0026quot;: \u0026quot;Never Gonna Give You Up\u0026quot;  }}[…]

你甚至可以在Schema Registry中查看已注册的schema:

$ curl -s http://localhost:8081/subjects/TESTDATA-value/versions/latest|jq '.schema|fromjson'{  \u0026quot;type\u0026quot;: \u0026quot;record\u0026quot;,  \u0026quot;name\u0026quot;: \u0026quot;KsqlDataSourceSchema\u0026quot;,  \u0026quot;namespace\u0026quot;: \u0026quot;io.confluent.ksql.avro_schemas\u0026quot;,  \u0026quot;fields\u0026quot;: [    {      \u0026quot;name\u0026quot;: \u0026quot;ID\u0026quot;,      \u0026quot;type\u0026quot;: [        \u0026quot;null\u0026quot;,        \u0026quot;int\u0026quot;      ],      \u0026quot;default\u0026quot;: null    },    {      \u0026quot;name\u0026quot;: \u0026quot;ARTIST\u0026quot;,      \u0026quot;type\u0026quot;: [        \u0026quot;null\u0026quot;,        \u0026quot;string\u0026quot;      ],      \u0026quot;default\u0026quot;: null    },    {      \u0026quot;name\u0026quot;: \u0026quot;SONG\u0026quot;,      \u0026quot;type\u0026quot;: [        \u0026quot;null\u0026quot;,        \u0026quot;string\u0026quot;      ],      \u0026quot;default\u0026quot;: null    }  ]}

写入原始主题(testdata-csv)的任何新消息都由KSQL自动处理,并以Avro格式写入新的TESTDATA主题。现在,任何想要使用这些数据的应用程序或团队都可以使用TESTDATA主题。你还可以更改主题的分区数、分区键和复制系数。

英文原文:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

深入理解Kafka Connect:转换器和序列化相关推荐

  1. 新书《深入理解Kafka:核心设计与实践原理》上架,感谢支持~

    新书上架 初识 Kafka 时,笔者接触的还是 0.8.1 版本,Kafka 发展到目前的 2.x 版本,笔者也见证了Kafka的蜕变,比如旧版客户端的淘汰.新版客户端的设计.Kafka 控制器的迭代 ...

  2. Kafka Connect官网说明

    文章目录 Kafka Connect Overview 运行Kafka Connect 配置 Connectors Transformations REST API Kafka Connect 原文地 ...

  3. 1.3 Quick Start中 Step 7: Use Kafka Connect to import/export data官网剖析(博主推荐)

    不多说,直接上干货! 一切来源于官网 http://kafka.apache.org/documentation/ Step 7: Use Kafka Connect to import/export ...

  4. Kafka: Connect

    转自:http://www.cnblogs.com/f1194361820/p/6108025.html Kafka Connect 简介 Kafka Connect 是一个可以在Kafka与其他系统 ...

  5. SQL Server CDC配合Kafka Connect监听数据变化

    写在前面 好久没更新Blog了,从CRUD Boy转型大数据开发,拉宽了不少的知识面,从今年年初开始筹备.组建.招兵买马,到现在稳定开搞中,期间踏过无数的火坑,也许除了这篇还很写上三四篇. 进入主题, ...

  6. 深入理解Kafka核心设计与实践原理_01

    深入理解Kafka核心设计与实践原理_01 01_初识Kafka 1.1 基本概念 1.2 安装与配置 1.3 生产与消费 1.4 服务端参数配置 01_初识Kafka 1.1 基本概念 一个典型的 ...

  7. 【Kafka Connect】

    Kafka Connect 概念 特点 组件 Connectors Tasks Workers Converters Transforms Dead Letter Queue rebalance触发场 ...

  8. 《深入理解Kafka:核心设计与实践原理》笔误及改进记录

    2019年2月下旬笔者的有一本新书--<深入理解Kafka:核心设计与实践原理>上架,延续上一本<RabbitMQ实战指南>的惯例,本篇博文用来记录现在发现的一些笔误,一是给购 ...

  9. Kafka科普系列 | 轻松理解Kafka中的延时操作

    欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-basic-knowledge-of-delay-operation/ 本文起源于之前去面试的一道面试题,面试题大 ...

最新文章

  1. 图像的放大与缩小(2)——双线性插值放大与均值缩小
  2. 关于cn.jedisoft.framework.annotations 的增删改查
  3. 汇编程序:将字符串中所有大写字符转为小写
  4. web大作业介绍自己的家乡_襄阳市恒大名都小学2018—2019年度寒假实践作业
  5. php kibana查询,搜索您的数据 | Kibana 用户手册 | Elastic
  6. java定义一个course类,java集合,定义两个类,学生Student和课程Course,课程被学生选修,请在课程类中提供以下功能:...
  7. Android 系统性能优化(52)---移动端性能监控方案Hertz
  8. java 异常 出口_java语言中,下列哪一子句是异常处理的出口java语言中,下列哪一子句是异常处理的出口Java语言中,下列哪一子句是异常处理的出口()。...
  9. jdk String类源码解析
  10. 考勤打卡记录数据库表结构_中控zktime5.0考勤管理系统数据库表结构
  11. php劳务派遣系统,劳务派遣系统搭建
  12. 74cms前台getshell漏洞
  13. NMPA已注册肿瘤小Panel试剂盒生物信息学分析内容对比
  14. 实现电脑同时上内网和外网(或通过外网访问到该电脑通过该电脑访问内网)
  15. 螺旋线的画法---matlab代码
  16. 送给女朋友的java程序_逗女朋友开心的玫瑰花Java web程序
  17. npm install安装失败,报错记录之The operation was rejected by your operating system. node-sass无法安装,且禁用淘宝镜像
  18. 前端进阶-ES6函数
  19. 学习通课程章节被关闭时怎样查看已关闭章节内容?
  20. 虚拟机的特点,什么是虚拟机

热门文章

  1. “AI赋能,驱动未来”—— 2018中国人工智能峰会(南京)圆满落幕
  2. 好玩,新版微信除了“炸屎”,还可以和她亲亲
  3. 数据库连接池为什么要用threadlocal呢?不用会怎样?
  4. 你知道怎么分库分表吗?如何做到永不迁移数据和避免热点吗?
  5. 面试官扎心一问:数据量很大,分页查询很慢,有什么优化方案?
  6. 超详细的Guava RateLimiter限流原理解析
  7. 2021数据挖掘赛题方案来了!
  8. Datahwhale第三期集训团队成员
  9. LoveLive!出了一篇AI论文:生成模型自动写曲谱
  10. 95后程序员月薪2万背着电脑送外卖,送单途中改Bug