本篇文章转自:https://blog.csdn.net/weixin_41461992/article/details/106790507

起因

由于需要做各种数据库摆渡到kafka的组件研究。
其中clickhouse和kafka间的数据摆渡,根据官方给出的kafka引擎文档,便有了我这篇实践记录。
相应的,该配置也非常简单。

官方传送门: kafka engine clickhouse

这边对数据库和kafka环境不再累述。

一、开发环境

kafka 2.4
zookeeper 3.4.5
clickhouse 20.4.5.36
centos7

二、 介绍

clickhouse支持kafka的表双向同步,其中提供的为Kafka引擎。

其大致情况为如下情况:Kafka主题中存在对应的数据格式,Clickhouse创建一个Kafka引擎表(即相当于一个消费者),当主题有消息进入时,获取该消息,将其进行消费,然后物化视图同步插入到MergeTree表中。

该引擎还支持反向写入到Kafka中,即往Kafka引擎表中插入数据,可以同步到Kafka中(同样可以使用物化视图将不同引擎需要的表数据同步插入到Kafka引擎表中)。

下面为Kafka Engine的一些配置:
老版本格式为:

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])

新版本格式为:

Kafka SETTINGSkafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic1,topic2',kafka_group_name = 'group1',kafka_format = 'JSONEachRow',kafka_row_delimiter = '\n',kafka_schema = '',kafka_num_consumers = 2

必填参数(例如topic、kafka集群、消费者组等):

  1. kafka_broker_list – 以逗号分隔的 brokers 列表 (例如kafka1:9092,kafka2:9092,kafka3:9092)。
  2. kafka_topic_list – topic 列表 (你的topic名字,也可以多个)。
  3. kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
  4. kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow、CSV、XML等。Formats格式传送门

非必填的参数:

  1. kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
  2. kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。
  3. kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。众所周知消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

三、实践

  1. 创建,使用引擎创建一个Kafka消费者并作为一条数据流。
CREATE TABLE queue (q_date Date,level String,message String) ENGINE = Kafka SETTINGS kafka_broker_list = 'k1:9092,k2:9092,k3:9092',kafka_topic_list = 'my_topic',kafka_group_name = 'kafka_group_test',kafka_format = 'CSV',kafka_num_consumers = 4;

消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。

消费组可以灵活配置并且在集群之间同步。例如,如果集群中有10个主题和5个表副本,则每个副本将获得2个主题。如果副本数量发生变化,主题将自动在副本中重新分配。

  1. 创建一个结构表
CREATE TABLE daily (day Date,level String,message String
) ENGINE = MergeTree(day, (day, level), 8192);
  1. 创建物化视图,该视图会在后台转换引擎中的数据并将其放入之前创建的表中。
CREATE MATERIALIZED VIEW consumer TO dailyAS SELECT q_date AS day, level, messageFROM queue;

其中AS后面的语句是自己根据实际需求进行调整的。
为了提高性能,接受的消息被分组为max_insert_block_size大小的块。如果未在stream_flush_interval_ms毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。

  1. 停止或者修改转换逻辑
    detach物化视图
DETACH TABLE consumer;
ATTACH TABLE consumer;
  1. 在创建表前,需要在kafka创建一个topic用于测试。
./bin/kafka-topic.sh --create --topic my_topic --partitions 3 --replication-factor 3

此处的topic需要与之前的kafka_topic_list对应,既然该参数为list,则可以配置多个topic

  1. 生产者往topic中生产数据
./bin/kafka-console-producer.sh --topic my_topic --broker-list k1:9092

按照CSV格式,生产数据输入

2020-06-28 level1 message
2020-06-28 level2 message
2020-06-28 level3 message

此时查看表daily,数据已同步

select * from daily
|--- day --- level --- message ---|
|--- 2020-06-28 --- level1 --- message ---|
|--- 2020-06-28 --- level2 --- message ---|
|--- 2020-06-28 --- level3 --- message ---|

同理,向kafka引擎表中插入数据,也可以在my_topic中可以消费到插入数据。

  1. 数据量测试
    在该版本,我在kafka生产者客户端进行for循环2000w数据进行测试,延迟不高,基本在10秒内同步完成,不知道在表字段数量和复杂sql语句时情况如何。还需要进一步进行实际使用

所遇问题

  1. 表结构变更
    对于数据同步问题,其中一个就是表同步之间的结构对应问题。
    由于表创建的时候,已经固定,所以ck的kafka引擎在遇到字段改变的时候,依然需要删表重建,或者修改物化视图进行不同的sql操作。

  2. 延迟问题
    在之前的版本中,社区有人提出该同步延迟太高,特别是数据量大的时候,但是在我实际测试中,大约2000w简单单表同步延迟可以接受。具体性能需要进一步测试。

  3. format格式
    对于自己规定的格式,一定要正确,比如csv就是csv,json就是json格式,不然会报错。

  4. 消费问题
    前面说过,kafka引擎其实是一个或者多个消费者进行topic的消费,那必然就涉及到消费问题,如何重新消费,如何在需要修改业务的时候重新连接。重置偏移量。
    场景:丢失数据,重新消费

首先,我们可以将daily中的消息干掉,手动导致消息丢失(自己整一个消息丢失)TRUNCATE TABLE daily此时,daily数据被删除了,同步的数据丢失。
接下来,我们停止kafka引擎表,在clickhouse中执行DETACH TABLE queue最后在kafka中,执行重置偏移量的命令./bin/kafka-consumer-groups --bootstrap-server k1:9092 --topic my_topic --group kafka_group_test --reset-offsets --to-earliest --excute然后,重连queue表ATTACH TABLE queue

这样,就开始重新消费啦

  1. 错误日志
    clickhouse日志查看为ck目录下的log文件夹的clickhouse-server.log中。

Clickhouse Engine kafka 将kafka数据同步clickhouse相关推荐

  1. Hive 数据同步ClickHouse

    需求:按条件筛选Hive表中的数据同步到ClickHouse中 方法一: 按照ClickHouse 官方文档提供的方法,在ClickHouse 中创建HDFS引擎表,读取Hive的数据,将读取的数据插 ...

  2. kafka中副本数据同步策略 ,acknowledge的发送策略,kafka的数据可靠性保证

    ack(acknowledge)简介 为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的消息后,都需要向producer发送 ...

  3. 【大数据】如何将数据导入ClickHouse?

    最近在研究如何提升InfluxDB+Grafana的查询速度,因为随着数据量的上涨,查询速度逐渐变慢,采用了连续查询,也没有得到明显的效果,其实单从数据量上,采用连续查询,按道理是可以得到很好的效果的 ...

  4. canal+Kafka实现mysql与redis数据同步

    前言 上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用.在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化.如果这时候数据库数据发生变更操作,就不 ...

  5. Kafka实时数据同步

    目录 1 概述 2 捕获Oracle数据到Kafka 2.1 数据捕获设置 2.2 数据发布设置 2.3 捕获到发布数据流映射 2.4 查看任务执行日志 3 订阅Kafka数据到ClickHouse ...

  6. clickhouse + ProxySQL 单机部署及增量同步数据

    背景: 随着数据量的上升,OLAP一直是被讨论的话题,虽然druid,kylin能够解决OLAP问题,但是druid,kylin也是需要和hadoop全家桶一起用的,异常的笨重.故引进clickhou ...

  7. 利用OGG实现Oracle到Kafka到Greenplum的增量数据同步

    墨墨导读:本文来自墨天轮用户 肖杰 的投稿,介绍用OGG实现Oracle到Kafka到Greenplum的增量数据同步的全过程. 墨天轮主页:https://www.modb.pro/u/6722 背 ...

  8. ogg oracle 测试kafka_利用ogg实现oracle到kafka的增量数据实时同步

    前言 ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json. 下面是我的源端和目标 ...

  9. 【PostgreSQL逻辑复制数据同步到kafka】

    目前CDC(Change Data Capture)工具还是比较多的 ,现在有部分增量数据同步工具是基于触发器实现的,把数据库的变化记录到某个系统表中,然后在客户端建立缓冲,并定期将变化push到接收 ...

最新文章

  1. windows server 2016 安装指南
  2. python 分类变量xgboost_如何用XGBoost做时间序列预测?
  3. phpstudy php日志,phpstudy开启网站Apache日志并且按照日期划分创建
  4. had oop 链接mysql_php – 将MySQL连接查询与OOP和对象相关联的最佳实践方法
  5. 贪吃蛇程序设计报告python_20192116 2019-2020-2 《Python程序设计》实验四报告
  6. ubuntu命令之dpkg
  7. 物体抓取位姿估計算法綜述_3D视觉技术在机器人抓取作业中的应用
  8. 条件表达式计算个人所得税c语言,个税的计算C语言实现,结果为什么是负的?...
  9. Log4j日志使用记录
  10. Spark on Yarn查看删除日志
  11. Microsoft Office2010 安装包和安装方法
  12. 3个月的产品实习生,还不会画原型和做UI设计
  13. 计算机原理74181芯片,计算机组成原理
  14. HTML:tab页签
  15. PFC电源设计与电感设计计算学习笔记
  16. 进击的DApp:区块链上将长出怎么样的新事物?
  17. 【毕业设计】大数据股票分析与预测系统 - python LSTM
  18. BZOJ 3699 GAL的数组
  19. Android开发:手机震动工具类
  20. 27-什么是自旋锁?自旋的好处和后果是什么呢?

热门文章

  1. 计算机隐藏用户设置,在开始屏幕(欢迎屏幕、控制面板的用户账户设置)上隐藏用户账户和开机自动登录某个账户...
  2. adobe flash player plugin_Adobe的LOGO升级了! ps图标没有描边了
  3. 《推荐系统实践》附上Reference 中的干货 (Paper,Blog等资料的链接)
  4. 面向对象编程 object oriented programming(OOP)
  5. OpenCASCADE:建立Body
  6. 宏BOOST_TEST_TRAIT_TRUE的用法实例
  7. boost::contract模块实现name list名单的测试程序
  8. Boost:同步化的测试程序
  9. ITK:从二进制图像中的对象计算距离图
  10. VTK:Texture之TextureCutQuadric