​ Integration系统表引擎主要用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。

1 Kafka

1.1 Kafka引擎

​ 将Kafka Topic中的数据直接导入到ClickHouse。

​ 语法如下:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],...
) ENGINE = Kafka()
SETTINGSkafka_broker_list = 'host:port',kafka_topic_list = 'topic1,topic2,...',kafka_group_name = 'group_name',kafka_format = 'data_format'[,][kafka_row_delimiter = 'delimiter_symbol',][kafka_schema = '',][kafka_num_consumers = N,][kafka_max_block_size = 0,][kafka_skip_broken_messages = N,][kafka_commit_every_batch = 0]

​ 参数说明:

​ ①必需的参数

参数 说明
kafka_broker_list Kafka broker列表,以逗号分隔
kafka_topic_list Kafka topic列表
kafka_group_name Kafka消费者组,如果不希望消息在集群中重复,使用相同的组名
kafka_format 消息格式。使用与SQL格式函数相同的符号,例如JSONEachRow

​ ②可选参数

参数 说明
kafka_row_delimiter 分隔符字符,用于一行的结束标识符号
kafka_schema 如果kafka_format参数需要schema定义,则通过该参数来支持
kafka_num_consumers 每张表的消费者个数。默认值:1。如果一个使用者的吞吐量不足,则指定更多使用者。使用者的总数不应该超过主题中的分区数,因为每个分区只能分配一个使用者。
kafka_max_block_size 轮询的最大批处理大小
kafka_skip_broken_messages 忽略无效记录的条数。默认值:0
kafka_commit_every_batch 在编写整个块之后提交每个使用和处理的批而不是单个提交(默认值:0)

​ 测试:(1)建表

  CREATE TABLE test_kafka (\timestamp UInt64,\level String,\message String\) ENGINE = Kafka() SETTINGS kafka_broker_list = 'ambari01:6667,ambari02:6667,ambari03:6667',\kafka_topic_list = 'test',\kafka_group_name = 'group1',\kafka_format = 'JSONEachRow',\kafka_row_delimiter = '\n'

​ 注意:如果后面在查询过程中报如下错误。是因为有些引擎版本存在的,消息中数据之间的分割符号未指定,导致无法处理。解决办法: 添加 kafka_row_delimiter = ‘\n’。

Cannot parse input: expected { before: \0: (at row 2)

​ (2)在kafka建立一个新的topic

sh kafka-topics.sh --create --zookeeper ambari01:2181,ambari02:2181,ambari03:2181 --replication-factor 1 --partitions 3 --topic test

​ (3)在kafka建立发布者console-producer

sh kafka-console-producer.sh --broker-list ambari01:6667,ambari02:6667,ambari03:6667 --topic test

​ (4)发送数据

{"timestamp":1515897460,"level":"one","message":"aa"}

​ 注意:由于一个kafka的partition 只能由一个 group consumer 消费,所以clickhouse 节点数需要大于 topic 的 partition 数。

​ (5)第一次查询

SELECT *
FROM test_kafka ┌──timestamp─┬─level─┬─message─┐
│ 1515897460 │ one   │ aa      │
└────────────┴───────┴─────────┘

​ (6)第二次查询

SELECT *
FROM test_kafka Ok.

​ 发现第二次查询的时候没有数据了,因为 Kafka引擎 表只是 kafka 流的一个视图而已,当数据被 select 了一次之后,这个数据就会被认为已经消费了,下次 select 就不会再出现。所以Kafka表单独使用是没什么用的,一般是用来和 MaterialView 配合,将Kafka表里面的数据自动导入到 MaterialView 里面。

​ (7)与 MaterialView 集成

​ 我们现在每一节点建一个 MaterialView 保存 Kafka 里面的数据, 再建一个全局的Distributed表。

CREATE MATERIALIZED VIEW test_kafka_view ENGINE = SummingMergeTree() PARTITION BY day ORDER BY  (day, level) AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as count FROM test_kafka GROUP BY day, level;

​ (6)再次发送数据

{"timestamp":1515897461,"level":"2","message":'bb'}
{"timestamp":1515897462,"level":"3","message":'cc'}
{"timestamp":1515897462,"level":"3","message":'ee'}
{"timestamp":1515897463,"level":"4","message":'dd'}

​ (7)查询数据

SELECT *
FROM test_kafka Ok.0 rows in set. Elapsed: 2.686 sec.
---------------------------------------
SELECT *
FROM test_kafka_view Ok.0 rows in set. Elapsed: 0.002 sec.

​ 发现没有数据,原因:kafka 引擎默认消费根据条数与时间进行入库,不然肯定是没效率的。其中对应的参数有两个。 max_insert_block_size(默认值为: 1048576),stream_flush_interval_ms(默认值为: 7500)这两个参数都是全局性的。

​ 业务系统需要从kafka读取数据,按照官方文档建好表后,也能看到数据,但是延时很高。基本要延时15分钟左右。kafka的数据大约每秒50条左右。基本规律是累计到65535行以后(最小的块大小)才会在表中显示数据。尝试更改stream_flush_interval_ms 没有作用,但是有不想改max_block_size,因为修改以后影响到全局所有表,并且影响搜索效率。希望能每N秒保证不管block有没有写满都flush一次。

​ 虽然ClickHouse和 Kafka的配合可以说是十分的便利,只有配置好,但是相当的局限性对 kafka 数据格式的支持也有限。下面介绍WaterDrop这个中间件将Kafka的数据接入ClickHouse。

1.2 WaterDrop

​ WaterDrop: 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark和 Apache Flink之上。github地址:https://github.com/InterestingLab/waterdrop

​ ①下载并解压

wget https://github.com/InterestingLab/waterdrop/releases/download/v1.4.3/waterdrop-1.4.3.zip
unzip waterdrop-1.4.3.zip

​ ②修改配置文件waterdrop-env.sh

vim /opt/module/waterdrop-1.4.3/config/waterdrop-env.sh
SPARK_HOME=/usr/jdp/3.2.0.0-108/spark2  #配置为spark的路径

​ ③增加配置文件test.conf

spark {spark.streaming.batchDuration = 5spark.app.name = "test_waterdrop"spark.ui.port = 14020spark.executor.instances = 3spark.executor.cores = 1spark.executor.memory = "1g"
}input {kafkaStream  {topics = "test_wd"consumer.bootstrap.servers = "10.0.0.50:6667,10.0.0.52:6667,10.0.0.53:6667"consumer.zookeeper.connect = "10.0.0.50:2181,10.0.0.52:2181,10.0.0.53:2181"consumer.group.id = "group1"consumer.failOnDataLoss = falseconsumer.auto.offset.reset = latestconsumer.rebalance.max.retries = 100}
}
filter {json{source_field = "raw_message"}
}output {clickhouse {host = "10.0.0.50:8123"database = "test"table = "test_wd"fields = ["act","b_t","s_t"]username = "admin"password = "admin"retry_codes = [209, 210 ,1002]retry = 10bulk_size = 1000}
}

​ ④创建Clickhouse表

create table test.test_wd( act String, b_t String, s_t Date) ENGINE = MergeTree() partition by s_t order by s_t;

​ ⑤启动写入程序

cd /data/work/waterdrop-1.4.1
sh /opt/module/waterdrop-1.4.3/bin/start-waterdrop.sh --master yarn --deploy-mode client --config /opt/module/waterdrop-1.4.3/config/test.conf

​ ⑥插入数据

{"act":"aaaa","b_t":"100","s_t":"2019-12-22"}
{"act":"bxc","b_t":"200","s_t":"2020-01-01"}
{"act":"dd","b_t":"50","s_t":"2020-02-01"}

​ ⑦查看表数据

SELECT *
FROM test_wd ┌─act─┬─b_t─┬────────s_t─┐
│ dd  │ 50  │ 2020-02-01 │
└─────┴─────┴────────────┘
┌─act──┬─b_t─┬────────s_t─┐
│ aaaa │ 100 │ 2019-12-22 │
└──────┴─────┴────────────┘
┌─act─┬─b_t─┬────────s_t─┐
│ bxc │ 200 │ 2020-01-01 │
└─────┴─────┴────────────┘

2 MySQL

​ 将Mysql作为存储引擎,可以对存储在远程 MySQL 服务器上的数据执行 select查询

​ 语法:

MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);

​ 参数说明

参数 说明
host:port MySQL 服务器地址
database 数据库的名称
table 表名称
user 数据库用户
password 用户密码
replace_query 将 INSERT INTO 查询是否替换为 REPLACE INTO 的标志。如果 replace_query=1,则替换查询
on_duplicate_clause 将 ON DUPLICATE KEY UPDATE on_duplicate_clause 表达式添加到 INSERT 查询语句中。

​ 测试:

​ 在Mysql中建表,并插入数据

CREATE TABLE `user` (`id` int(11) NOT NULL,`username` varchar(50) DEFAULT NULL,`sex` varchar(5) DEFAULT NULL
)INSERT INTO user values(11,"zs","0");
INSERT INTO user values(12,"ls","0");
INSERT INTO user values(13,"ww","0");
INSERT INTO user values(14,"ll","1");

​ 创建ClickHouse表,insert_time字段为默认字段

CREATE TABLE test.from_mysql(\id UInt64,\username String,\sex String,\insert_time Date DEFAULT toDate(now())\
) ENGINE = MergeTree()\
PARTITION BY insert_time \
ORDER BY (id,username)

​ 插入数据

INSERT INTO test.from_mysql (id,username,sex) SELECT id, username,sex FROM mysql('10.0.0.42:3306','test', 'user', 'root', 'admin');

​ 查询数据

SELECT *
FROM from_mysql ┌─id─┬─username─┬─sex─┬─insert_time─┐
│ 11 │ zs       │ 0   │  2020-05-24 │
│ 12 │ ls       │ 0   │  2020-05-24 │
│ 13 │ ww       │ 0   │  2020-05-24 │
│ 14 │ ll       │ 1   │  2020-05-24 │
└────┴──────────┴─────┴─────────────┘4 rows in set. Elapsed: 0.003 sec. 

3 HDFS

​ 用户通过执行SQL语句,可以在ClickHouse中直接读取HDFS的文件,也可以将读取的数据导入到ClickHouse本地表。

​ HDFS引擎:ENGINE = HDFS(URI, format)。URI:HDFS的URI,format:存储格式,格式链接https://clickhouse.tech/docs/en/interfaces/formats/#formats

3.1 查询文件

​ 这种使用场景相当于把HDFS做为ClickHouse的外部存储,当查询数据时,直接访问HDFS的文件,而不是把HDFS文件导入到ClickHouse再进行查询。相对于ClickHouse的本地存储查询,速度较慢。

​ 在HDFS上新建一个数据文件:user.csv,上传hadoop fs -cat /user/test/user.csv,内容如下:

1,zs,18
2,ls,19
4,wu,25
3,zl,22

​ 在ClickHouse上创建一个访问user.csv文件的表:

CREATE TABLE test_hdfs_csv(\id UInt64,\name String,\age UInt8\
)ENGINE = HDFS('hdfs://ambari01:8020/user/test/user.csv', 'CSV')

​ 查询hdfs_books_csv表

SELECT *
FROM test_hdfs_csv ┌─id─┬─name─┬─age─┐
│  1 │ zs   │  18 │
│  2 │ ls   │  19 │
│  4 │ wu   │  25 │
│  3 │ zl   │  22 │
└────┴──────┴─────┘

3.2 从HDFS导入数据

​ 从HDFS导入数据,数据在ClickHouse本地表,建本地表

CREATE TABLE test_hdfs_local(\id UInt64,\name String,\age UInt8\
)ENGINE = Log

​ 在数据存储目录下可以找到这个表的文件夹

/data/clickhouse/data/test/test_hdfs_local

​ 从HDFS导入数据

INSERT INTO test_hdfs_local SELECT * FROM test_hdfs_csv

​ 查询

SELECT *
FROM test_hdfs_local ┌─id─┬─name─┬─age─┐
│  1 │ zs   │  18 │
│  2 │ ls   │  19 │
│  4 │ wu   │  25 │
│  3 │ zl   │  22 │
└────┴──────┴─────┘

ClickHouse表引擎之Integration系列相关推荐

  1. clickhouse表引擎-合并树系列

    目录 1 clickhouse表引擎-合并树系列简介 2 MergeTree引擎 2.1 建表语法 2.2 创建最简单的MergerTree引擎表 2.3 插入数据 2.4 查看目录结构 2.5 指定 ...

  2. 【Clickhouse】Clickhouse 表引擎之 Log系列

    文章目录 1.概述 5.1 TinyLog引擎(数据不分快) 5.2 StripeLog 5.3 Log 1.概述 Log家族具有最小功能的轻量级引擎.当您需要快速写入许多小表(最多约100万行)并在 ...

  3. ClickHouse表引擎到底怎么选

    引言 表引擎在ClickHouse中的作用十分关键,直接决定了数据如何存储和读取.是否支持并发读写.是否支持index.支持的query种类.是否支持主备复制等. ClickHouse提供了大约28种 ...

  4. 大数据培训ClickHouse表引擎

    表引擎 表引擎(即表的类型)决定了: 1)数据的存储方式和位置,写到哪里以及从哪里读取数据 2)支持哪些查询以及如何支持. 3)并发数据访问. 4)索引的使用(如果存在). 5)是否可以执行多线程请求 ...

  5. 3、ClickHouse表引擎-MergeTree引擎

    ClickHouse系列文章 1.ClickHouse介绍 2.clickhouse安装与简单验证(centos) 3.ClickHouse表引擎-MergeTree引擎 4.clickhouse的L ...

  6. ClickHouse 表引擎 ClickHouse性能调优

    https://clickhouse.com/ 引子 什么是"更快"? 顺序读/写吞吐量? 随机读/写延迟? 特定并行性和工作负载下的IOPS. 显然RAM可能比磁盘慢,例如单个c ...

  7. ClickHouse表引擎详解

    ClickHouse 表引擎 ClickHouse 的表引擎是 ClickHouse 服务的核心,它们决定了 ClickHouse 的以下行为: 1.数据的存储方式和位置. 2.支持哪些查询操作以及如 ...

  8. 【clickhouse】ClickHouse表引擎 MergeTree 数据生命周期

    1.概述 转载:ClickHouse表引擎 MergeTree 数据生命周期 TTL(Time To Live)表示数据的存活时间,在 Merge 中可以为某个字段或者整个表设置TTL. 如果设置列级 ...

  9. 【clickhouse】ClickHouse表引擎 MergeTree 索引与数据存储方式 一级索引 二级索引

    1.概述 转载:ClickHouse表引擎 MergeTree 索引与数据存储方式 2.一级索引 MergeTree 主键使用 primary key 定义,定义主键后,会将数据依据 index_gr ...

最新文章

  1. LED,硅光电池的光能-电能转换是可逆的吗?
  2. k8s 命令 重启_快速入门Kubernetes(K8S)——资源清单
  3. 如何调试bash脚本
  4. VisualStudio配置中文提示
  5. CDOJ--1668
  6. 将ubuntu配置为路由器_“名酒为王”时代来临但资源将尽,看泸州老窖如何进行“名酒资源再配置”丨深度观察...
  7. 【迁移2018-05-08 14:14:27】全局唯一ID生成
  8. github下载慢时可采用码云快速下载资源
  9. mysql linux内核_mysql 5.7.15 vs mysql 5.6.31性能测试以及不同linux内核性能比较
  10. 深入理解jvm虚拟机笔记
  11. uniapp文件体积超过 500KB报错
  12. 坐标拾取器功能实现demo
  13. 【JZOJ】给水(water)
  14. js移除Array中指定元素
  15. 「项目管理」如何制定详细的项目计划?
  16. 电力电子应用技术的matlab仿真
  17. 我的架构梦:(七十) 消息中间件之RabbitMQ的消息可靠性
  18. 充电管理芯片中的一些特性说明
  19. 2021牛客多校1——J:Journey of Railway Stations(线段树)
  20. 人工智能发展的核心——机器学习

热门文章

  1. 175. Combine Two Tables
  2. USB设备驱动之设备初始化(设备枚举)
  3. iOS 5.1.1 设备不能安装AdHoc问题版本号
  4. HDOJ 3415 Max Sum of Max-K-sub-sequence
  5. C#中的类型转换大总结
  6. Android开发实践:常用NDK命令行参数
  7. TCP传输过程中丢包问题
  8. STM32F4 串口DMA
  9. 程序的内存分配模式(堆栈以及静态存储区,文字常量区,代码区)
  10. 硬件基础 —— 电阻