Apache Kafka的流式SQL引擎——KSQL
1. KSQL 介绍
KSQL 引擎——一个基于流的 SQL。推出 KSQL 是为了降低流式处理的门槛,为处理 Kafka 数据提供简单而完整的可交互式 SQL 接口。KSQL 目前可以支持多种流式操作,包括聚合(aggregate)、连接(join)、时间窗口(window)、会话(session),等等。
2. KSQL 与传统数据库的区别
KSQL 与关系型数据库中的 SQL 还是有很大不同的。传统的 SQL 都是即时的一次性操作,不管是查询还是更新都是在当前的数据集上进行。而 KSQL 则不同,KSQL 的查询和更新是持续进行的,而且数据集可以源源不断地增加。KSQL 所做的其实是转换操作,也就是流式处理。
3. KSQL 的适用场景
1). 实时监控
一方面,可以通过 KSQL 自定义业务层面的度量指标,这些指标可以实时获得。底层的度量指标无法告诉我们应用程序的实际行为,所以基于应用程序生成的原始事件来自定义度量指标可以更好地了解应用程序的运行状况。另一方面,可以通过 KSQL 为应用程序定义某种标准,用于检查应用程序在生产环境中的行为是否达到预期。
2). 安全检测
KSQL 把事件流转换成包含数值的时间序列数据,然后通过可视化工具把这些数据展示在 UI 上,这样就可以检测到很多威胁安全的行为,比如欺诈、入侵,等等。KSQL 为此提供了一种实时、简单而完备的方案。
3). 在线数据集成
大部分的数据处理都会经历 ETL(Extract—Transform—Load)这样的过程,而这样的系统通常都是通过定时的批次作业来完成数据处理的,但批次作业所带来的延时在很多时候是无法被接受的。而通过使用 KSQL 和 Kafka 连接器,可以将批次数据集成转变成在线数据集成。比如,通过流与表的连接,可以用存储在数据表里的元数据来填充事件流里的数据,或者在将数据传输到其他系统之前过滤掉数据里的敏感信息。
4). 应用开发
对于复杂的应用来说,使用 Kafka 的原生 Streams API 或许会更合适。不过,对于简单的应用来说,或者对于不喜欢 Java 编程的人来说,KSQL 会是更好的选择。
4. KSQL 架构
KSQL 是一个独立运行的服务器,多个 KSQL 服务器可以组成集群,可以动态地添加服务器实例。集群具有容错机制,如果一个服务器失效,其他服务器就会接管它的工作。KSQL 命令行客户端通过 REST API 向集群发起查询操作,可以查看流和表的信息、查询数据以及查看查询状态。因为是基于 Streams API 构建的,所以 KSQL 也沿袭了 Streams API 的弹性、状态管理和容错能力,同时也具备了仅一次(exactly once)语义。KSQL 服务器内嵌了这些特性,并增加了一个分布式 SQL 引擎、用于提升查询性能的自动字节码生成机制,以及用于执行查询和管理的 REST API。
Kafka+KSQL 要颠覆传统数据库
传统关系型数据库以表为核心,日志只不过是实现手段。而在以事件为中心的世界里,情况却恰好相反。日志成为了核心,而表几乎是以日志为基础,新的事件不断被添加到日志里,表的状态也因此发生变化。将 Kafka 作为中心日志,配置 KSQL 这个引擎,我们就可以创建出我们想要的物化视图,而且视图也会持续不断地得到更新。
5. KSQL 的核心抽象
KSQL 是基于 Kafka 的 Streams API 进行构建的,所以它的两个核心概念是流(Stream)和表(Table)。流是没有边界的结构化数据,数据可以被源源不断地添加到流当中,但流中已有的数据是不会发生变化的,即不会被修改也不会被删除。表即是流的视图,或者说它代表了可变数据的集合。它与传统的数据库表类似,只不过具备了一些流式语义,比如时间窗口,而且表中的数据是可变的。KSQL 将流和表集成在一起,允许将代表当前状态的表与代表当前发生事件的流连接在一起。
6. Quick start
Important: This release is a developer preview and is free and open-source from Confluent under the Apache 2.0 license. Do not run KSQL against a production cluster.
环境准备: Docker环境与非Docker环境
以下采用第二种环境方式,但是kafka也是搭建在Docker环境呢,上述环境区别是:
Docker环境,镜像会包含一份confluentinc公司的docker发行版和ksql环境。此次测试非Docker环境采用kafka官方版本做成的容器,ksql为单独编译版本
Kafka地址192.168.102.51:9092
Zookeeper地址 192.168.102.51:2181
Topic:test1
1). 代码新建测试所用topic
object CreateTopic {def main(args: Array[String]): Unit = {val config = new java.util.Properties()config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.102.51:9092")val admin = AdminClient.create(config)val topic = new NewTopic("test1", 1, 1)admin.createTopics(Collections.singletonList(topic))// for (tt <- admin.listTopics().names().get()) {// println(tt)// }admin.close()}
}
2). 进去ksql命令行新建一个stream
CREATE stream stream_test (key string, c1 bigint,c2 bigint,c3 bigint) WITH (kafka_topic='test1', value_format='JSON');
制定格式为json, 与kafka producer发送消息类型要对应
3). 给kafka topic 发送消息
object KsqlTest {def main(args: Array[String]): Unit = {val topic = "test1"val props = new Properties();props.put("bootstrap.servers", "192.168.102.51:9092")props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")// props.put("property", "parse.key=true")// props.put("property", "key.separator=:")val producer = new KafkaProducer[String, String](props)while (true) {(1 to 3).foreach(i => {val date = new java.util.Date()val record = new ProducerRecord[String, String](topic, "{\"key\":\"" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)+ "\",\"c1\":" + i + ",\"c2\":" + i + ",\"c3\":" + i + "}")producer.send(record)producer.flush()})Thread.sleep(1000)}}
}
发送的消息如下:
{"key":"2017-09-19 14:45:26","c1":2,"c2":2,"c3":2}
{"key":"2017-09-19 14:45:26","c1":3,"c2":3,"c3":3}
{"key":"2017-09-19 14:45:26","c1":3,"c2":3,"c3":3}
标准的json格式与stream里的字段对应
4). 命令行执行 select * from stream_test;
5). 查看stream
Show streams: 查看所有stream
Describe (stream name): 查看制定stream的类型
6). 根据已有的stream建新的stream
create stream stream_test2 as select date, c1 from stream_test WINDOW TUMBLING (size 10 second);
Show queries; 结果
Select * from stream_test2 结果
7). 支持简单的函数
- -暂不支持自定义函数
7. ksql-clickstream-demo
https://github.com/confluentinc/ksql/tree/0.1.x/ksql-clickstream-demo
8. 代码部分
1). 消息获取部分
Ksqlcontext
public void sql(String sql) throws Exception {List<QueryMetadata> queryMetadataList = ksqlEngine.buildMultipleQueries(false, sql, Collections.emptyMap());for (QueryMetadata queryMetadata: queryMetadataList) {if (queryMetadata instanceof PersistentQueryMetadata) {PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;persistentQueryMetadata.getKafkaStreams().start();ksqlEngine.getPersistentQueries().put(persistentQueryMetadata.getId(), persistentQueryMetadata);} else {System.err.println("Ignoring statemenst: " + sql);System.err.println("Only CREATE statements can run in KSQL embedded mode.");log.warn("Ignoring statemenst: {}", sql);log.warn("Only CREATE statements can run in KSQL embedded mode.");}}
}
直接调用的kafka stream 代码 获取record :
public void run() {log.info("{} Starting", logPrefix);boolean cleanRun = false;try {runLoop();cleanRun = true;} catch (final KafkaException e) {// just re-throw the exception as it should be logged alreadythrow e;} catch (final Exception e) {// we have caught all Kafka related exceptions, and other runtime exceptions// should be due to user application errorslog.error("{} Streams application error during processing: ", logPrefix, e);throw e;} finally {shutdown(cleanRun);}
}/*** Main event loop for polling, and processing records through topologies.*/
private void runLoop() {long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;consumer.subscribe(sourceTopicPattern, rebalanceListener);while (stillRunning()) {timerStartedMs = time.milliseconds();// try to fetch some records if necessaryfinal ConsumerRecords<byte[], byte[]> records = pollRequests();if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);addRecordsToTasks(records);final long totalProcessed = processAndPunctuate(activeTasks, recordsProcessedBeforeCommit);if (totalProcessed > 0) {final long processLatency = computeLatency();streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed,timerStartedMs);recordsProcessedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed,processLatency, commitTimeMs);}}maybeCommit(timerStartedMs);maybeUpdateStandbyTasks(timerStartedMs);maybeClean(timerStartedMs);}log.info("{} Shutting down at user request", logPrefix);
}/*** Get the next batch of records by polling.* @return Next batch of records or null if no records available.*/
private ConsumerRecords<byte[], byte[]> pollRequests() {ConsumerRecords<byte[], byte[]> records = null;try {records = consumer.poll(pollTimeMs);} catch (final InvalidOffsetException e) {resetInvalidOffsets(e);}if (rebalanceException != null) {if (!(rebalanceException instanceof ProducerFencedException)) {throw new StreamsException(logPrefix + " Failed to rebalance.", rebalanceException);}}return records;
}
2). sql解析
Antlr
3) API
暂未发现有java api调用,quickstart为命令行形式
最后:
推荐文章1:Introducing KSQL: Streaming SQL for Apache Kafka | Confluent
推荐文章2:KSQL Tutorials and Examples - 知乎
快速入门指南:https://github.com/confluentinc/ksql/tree/0.1.x/docs/quickstart#quick-start
GitHub 地址:https://github.com/confluentinc/ksql/issues
Apache Kafka的流式SQL引擎——KSQL相关推荐
- 重磅开源 KSQL:用于 Apache Kafka 的流数据 SQL 引擎 2017.8.29
Kafka 的作者 Neha Narkhede 在 Confluent 上发表了一篇博文,介绍了Kafka 新引入的KSQL 引擎--一个基于流的SQL.推出KSQL 是为了降低流式处理的门槛,为处理 ...
- KSQL:Apache Kafka的流式SQL
更新:KSQL 现在可作为Confluent Platform的一个组件提供. 我很高兴地宣布KSQL,为Apache kafka流SQL引擎®.KSQL降低了流处理世界的入口,提供了一个简单而完全 ...
- kafka处理流式数据_通过Apache Kafka集成流式传输大数据
kafka处理流式数据 从实时过滤和处理大量数据,到将日志数据和度量数据记录到不同来源的集中处理程序中,Apache Kafka越来越多地集成到各种系统和解决方案中. 使用CData Sync ,可以 ...
- 通过Apache Kafka集成流式传输大数据
从实时实时过滤和处理大量数据,到将日志数据和度量数据记录到不同来源的集中处理程序中,Apache Kafka日益集成到各种系统和解决方案中. 使用CData Sync ,可以轻松地将此类解决方案应用于 ...
- 为什么阿里会选择 Flink 作为新一代流式计算引擎?
本文由 [AI前线]原创,ID:ai-front,原文链接:t.cn/ROISIr3 [AI前线导读]2017 年 10 月 19日,阿里巴巴的高级技术专家王绍翾(花名"大沙")将 ...
- 大数据之Flink流式计算引擎
Flink DataFlow 数据的分类 有界数据 离线的计算 效率:硬件相同的情况下:时间 无界数据 实时的计算 效率:硬件环境相同的情况下,吞吐量:数据处理的数量.延迟:结果输出的时间-数据接收的 ...
- Flink系列之Flink流式计算引擎基础理论
声明: 文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除.感谢.转载请注明出处,感谢. By luoye ...
- 深度解读!时序数据库HiTSDB:分布式流式聚合引擎
阿里妹导读:高性能时间序列数据库 (High-Performance Time Series Database , 简称 HiTSDB) 是一种高性能,低成本,稳定可靠的在线时序数据库服务, 提供高效 ...
- TDengine3.0流式计算引擎语法规则介绍
小 T 导读:TDengine 3.0 引入了全新的流式计算引擎,既支持时间驱动的流式计算,也支持事件驱动的流式计算.本文将对新的流式计算引擎的语法规则进行详细介绍,方便开发者及企业使用. TDeng ...
最新文章
- 华为在5.5G未来技术演进的六个方向!
- android class newinstance 构造函数 参数,android Fragment里的newInstance和构造函数
- 阿里云交通数据中台解决方案,打造“数字化生产力”
- 听说面试又挂在计算机操作系统了?仔细看看这个!!!【强烈推荐】
- 5万字长文:Stream和Lambda表达式最佳实践-附PDF下载
- 中国量子云计算机,量子云平台“中国版”拉开帷幕:国际首个基于核磁共振的量子计算云平台 | Science Bulletin...
- unity多人联机插件_Mirror ---Unity多人联机游戏API(一)
- 从0开始的Python学习006流程控制
- 大学计算机课代表竞选稿,课代表竞选演讲稿
- Java多线程 ReentrantLock、Condition 实现生产者、消费者协作模式
- PBRT-V3体渲染笔记
- 工业3D互联网可视化三维数字化智能工厂管理系统
- word里双横线怎么打_在word中怎么画直线、双直线、虚线
- 二进制数与十进制数的相互转换、二进制数的乘除运算、移位运算
- 智慧医疗管理系统解决方案:医药电商系统实现智能化改造
- ppt怎么设置页面比例为4:3
- 获取微信聊天窗口的小程序入口参数
- Linux——boot lodaer:grub2核心载入工具
- dbeaver连接gaussdb
- 分布式存储系统 之 数据备份
热门文章
- 计算机网络教程第五版|微课版 - 第五章 运输层 - 习题【补充】
- 毕业旅行 | 用一场纽约5日游告别难忘青葱岁月
- 企业——nginx的图片压缩、https模块、重写规则、盗链
- 845透色android10,和平精英TCA845透色
- 数据可视化之大数据可视化
- Win10环境下安装TensorFlow 2.0简明教程
- 汽车制动盘的全球与中国市场2022-2028年:技术、参与者、趋势、市场规模及占有率研究报告
- 高等代数 具有度量的线性空间(第10章)4 酉空间,酉变换,埃尔米特变换,正规变换
- 应用在汽车倒车影像中的环境光传感芯片
- 如何用css只做一个div鼠标点击变色