ksql kafka
1、KSQL语法:https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html
KSQL kafka实战:https://my.oschina.net/guol/blog/2236817
KSQL REST API:https://docs.confluent.io/current/ksql/docs/developer-guide/api.html
2、基础语法
ssh -p5022 bdp_jt@10.138.46.222
-p 'bdp_jt'
kafka目录
/home/bdp_jt/confluent-5.1.2
手动生成topic
./bin/kafka-topics --create --zookeeper 10.138.225.199:2181/kafka_confluent_512 --replication-factor 1 --partitions 1 --topic zqdtest
查看kafka的topic
./bin/kafka-topics --zookeeper 10.138.225.199:2181/kafka_confluent_512 --list
生成生产者
(1)./bin/kafka-console-producer --broker-list 10.138.225.199:9092 --topic zqdtest
(2)./bin/ksql-datagen bootstrap-server=10.138.225.199:9092 quickstart=orders format=json topic=orders maxInterval=2000(自动生成topic并随机造数或往topic造数)
生成消费者
./bin/kafka-console-consumer --bootstrap-server 10.138.225.199:9092 --topic zqdtest
连接ksql
./bin/ksql http://10.138.225.199:8088
创建stream和table
(1)根据topic pageviews创建一个stream pageviews_original,value_format为DELIMITED
ksql>CREATE STREAM pageviews_original (userid varchar, pageid varchar) WITH \(kafka_topic='pageviews', value_format='DELIMITED');
(2)根据topic users创建一个table users_original,value_format为json
ksql>CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH \(kafka_topic='users', value_format='JSON', key = 'userid');
查询数据
ksql> SELECT * FROM USERS_ORIGINAL LIMIT 3;
持久化查询
ksql> CREATE STREAM pageviews2 AS SELECT userid FROM pageviews_original;
查询steam
ksql> SHOW STREAMS;
查询执行任务
ksql> SHOW QUERIES;
消费新数据
cd /opt/programs/confluent_5.0.0/bin
./kafka-console-consumer --bootstrap-server 10.205.151.145:9092 --from-beginning --topic PAGEVIEWS2
终止查询任务
ksql> TERMINATE query_id;
显示所有字段
DESCRIBE MONITOR_ORIGINAL;
{"userid":2,"username":"cvcv"}
10.138.225.199
ssh bdp_jt@10.138.225.198
ssh bdp_jt@10.138.46.224
./bin/kafka-topics --zookeeper 10.138.225.199:2181/kafka_confluent_512 --topic zqdtest
insert into MONITOR_ORIGINAL select 1 as MONITORID, '{''FACTORYCODE'':'''+FACTORYCODE+''','+'''PROD_CODE'':'''+PROD_CODE+''','+'''BARCODE'':'''+BARCODE+'''}' as MONITORCONTENT from JIEPAI_ORIGINAL where cast(BARCODE_TIMEDIFF as int) > 300;
3、ksql启动查询
@Override
public void saveKsql(DatastreamEntity mon) {logger.debug("saveKsql");String streamname = "";//根据选中的数据流,拼接所有表名String column = "";//根据选中的多个数据流,拼接所有要查询的字段//根据数据流ID获取所有选择的数据流for (DatastreamBymetaEntity streams : mon.getMetastreamidlist()) {//streams.getMeta_stream_id() 2385VTableMetaDataJson datajson = tableMetaDataService.findDetailByTableID(streams.getIntMetaStreamID());streamname += datajson.getStrTableNameEN() + ",";for (VTableColumnJson str : datajson.getvTableColumns()) {column += str.getStrColumnNameEN() + ",";}}streamname = streamname.substring(0, streamname.length() - 1);column = column.substring(0, column.length() - 1);//String column="ID,TX_NAME";//String streamname="TEST_HDP0118";//把要查询的字段拼接成Ksql需要的格式String[] strs = column.split(",");String strjson = "'{";for (String str : strs) {strjson += "''" + str + "'':'''+ cast(" + str + " as string ) +''','+'";}strjson = strjson.substring(0, strjson.length() - 4);strjson += "}'";System.out.println(strjson);for (DatastreamRuleconfigEntity rule : mon.getRulelist()) {String ksql = "insert into MONITOR_ORIGINAL2 "+ " select " + rule.getIntID() + " as monitorid, " + strjson + " as monitorcontent "+ " from " + streamname+ " where " + rule.getStrDefinedCondition() + ";";System.out.println(ksql);//获取query_idString query_id=ksqlServerService.createInsert(ksql);rule.setStrQueryID(query_id);rule.setStrKsqlContent(ksql);rule.setIntStreamID(mon.getIntID());monitorRuleDao.saveAndFlush(rule);}//logger.error();
}
@Override
public String createInsert(String ksql) {//得到当前的query 列表KSQLQueries ksqlQuery = this.getKSQLQuery();List<KSQLQuery> queries = ksqlQuery.getQueries();Set queryIDs = new HashSet<String>();for (int i = 0; i < queries.size(); i++) {queryIDs.add(queries.get(i));}System.out.println(ksql);String resJson = this.executeKSQL(ksql);JSONObject rep = JSON.parseObject(resJson);String status = rep.getJSONObject("commandStatus").getString("status");if (SUCCESSS.equals(status)) {//成功后,得到当前的query列表,与上一个进行比较,取出当前的query_id。//当前系统中,不允许重复创建相同的query内容KSQLQueries ksqlQuery2 = this.getKSQLQuery();List<KSQLQuery> queries2 = ksqlQuery2.getQueries();for (int i = 0; i < queries2.size(); i++) {KSQLQuery ksqlQuery1 = queries2.get(i);if (!queryIDs.contains(ksqlQuery1.getId()) && ksql.toUpperCase().equals(ksqlQuery1.getQueryString().toUpperCase())) {return ksqlQuery1.getId();}}return SUCCESSS;} else {logger.error(rep.getJSONObject("commandStatus").toString());return null;}
}
private String executeKSQL(String ksql) {HttpHeaders headers = new HttpHeaders();headers.set("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8");//具体执行的ksqlJSONObject postData = new JSONObject();postData.put("ksql", ksql);//返回值HttpEntity<String> requestEntity = new HttpEntity<String>(postData.toJSONString(), headers);String resJson = restTemplate.postForEntity(ksqlServerAddress, requestEntity, String.class).getBody();String resJson2 = resJson.substring(1, resJson.length() - 1).replaceAll("@", "a");return resJson2;
}private KSQLQueries getKSQLQuery() {String resJson = this.executeKSQL("show queries;");KSQLQueries ksqlQueries = JSON.parseObject(resJson, KSQLQueries.class);return ksqlQueries;
}
4、根据query_id终止查询
/*** 停掉单个规则配置的ksql语句** @param id*/
@Override
public void stopKsql(Integer id) {RestTemplate restTemplate = new RestTemplate();HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON_UTF8);JSONObject postData = new JSONObject();DatastreamRuleconfigEntity rule = monitorService.findDatastreamRuleByid(id);postData.put("ksql", "TERMINATE " + rule.getStrQueryID() + ";");String resJson = restTemplate.postForEntity(ksqlServerAddress, postData, String.class).getBody();String resJson2 = resJson.substring(1, resJson.length() - 1).replaceAll("@", "a");JSONObject rep = JSON.parseObject(resJson2);String status = rep.getJSONObject("commandStatus").getString("status");if (SUCCESSS.equals(status)) {monitorRuleDao.updateState1(id);//把数据流状态修改为未启动1} else {logger.error("stopKsql.停止Ksql", rep.getJSONObject("commandStatus").toString());}
}
5、kafka Topic创建、删除
import com.sdjictec.bdmpextend.monitor.service.KafkaServerService;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.*;@Service
public class KafkaServerServiceImpl implements KafkaServerService {protected static Logger logger = LoggerFactory.getLogger(KafkaServerServiceImpl.class);@Value("${kafka.bootstrap.servers}")private String bootstrapServers;@Value("${kafka.bootstrap.numPartitions}")private Integer numPartitions;@Value("${kafka.bootstrap.numReplicationFactor}")private Integer numReplicationFactor;@Overridepublic Integer createKafkaTopic(String topicName, Integer replicationFactor, Integer partitions, String bootstarpServers) {logger.info("topicName:"+topicName);logger.info("replicationFactor:"+replicationFactor);logger.info("partitions:"+partitions);logger.info("bootstarpServers:"+bootstarpServers);try {AdminClient adminClient = getAdminClient(bootstarpServers);NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor.shortValue());Collection<NewTopic> newTopicList = new ArrayList<>();newTopicList.add(newTopic);adminClient.createTopics(newTopicList);adminClient.close();return 0;} catch (Exception e) {logger.error(e.getLocalizedMessage(), e);e.printStackTrace();return 920007;}}@Overridepublic Integer createKafkaTopic(String topicName, Integer replicationFactor, Integer partitions) {return this.createKafkaTopic(topicName, partitions, replicationFactor, bootstrapServers);}@Overridepublic Integer createKafkaTopic(String topicName) {return this.createKafkaTopic(topicName, numPartitions, numReplicationFactor, bootstrapServers);}@Overridepublic Integer deleteKafkaTopic(String topicName, String bootstarpServers) {try {AdminClient adminClient = getAdminClient(bootstarpServers);Collection<String> topicList = new ArrayList<>();topicList.add(topicName);adminClient.deleteTopics(topicList);adminClient.close();return 0;} catch (Exception e) {logger.error(e.getLocalizedMessage(), e);e.printStackTrace();return 920008;}}@Overridepublic Integer deleteKafkaTopic(String topicName) {return this.deleteKafkaTopic(topicName, bootstrapServers);}private AdminClient getAdminClient(String bootstarpServers) {try {Properties properties = new Properties();properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstarpServers);AdminClient adminClient = AdminClient.create(properties);return adminClient;} catch (Exception e) {logger.error(e.getLocalizedMessage(), e);e.printStackTrace();return null;}}
}
6、stream创建、修改、删除
public String createStream(MetaStream metaStream,Integer userID){//List<MetaColumn> metaColumnList=new ArrayList<>();//生成KStream创建语句String sql="CREATE "+metaStream.getStrTypeName()+" "+metaStream.getStrStreamName()+" (";for(MetaColumn metaColumn:metaStream.getMetaColumnList()){sql+=metaColumn.getStrColumnName()+" "+metaColumn.getStrColumnType()+",";}sql=sql.substring(0,sql.length()-1);sql+=") WITH (kafka_topic='"+metaStream.getStrTopicName()+"', value_format='JSON');";//创建topicInteger flag=kafkaServerService.createKafkaTopic(metaStream.getStrTopicName());if(flag!=0){return ResponseMsgEnum.getMsg(flag);}//创建StreamString info=ksqlServerService.createInsert(sql);if(null==info){return "元数据创建失败!";}//保存数据流metaStream.setUserCreate(userService.findUserByID(userID));metaStream.setTimCreateTime(new Timestamp(System.currentTimeMillis()));metaStream.setIntIsDel(0);metaStream=metaStreamDao.save(metaStream);for(MetaColumn metaColumn:metaStream.getMetaColumnList()){metaColumn.setIntStreamID(metaStream.getIntID());metaColumn.setIntCreateUserID(userID);metaColumn.setTimCreateTime(new Timestamp(System.currentTimeMillis()));metaColumn.setIntIsDel(0);metaColumnDao.save(metaColumn);}//生成权限Gson gson = new Gson();Type type = new TypeToken<List<Integer>>() {}.getType();List<Integer> groupList = gson.fromJson(metaStream.getStrGroupList(), type);streamGroupService.createUserGroupTaskList(metaStream.getIntID(), groupList, metaStream.getUserCreate().getIntID(),0);return "0"; }public String updateStream(MetaStream metaStream,Integer userID){//生成KStream创建语句String sql="DROP "+metaStream.getStrTypeName()+" "+metaStream.getStrStreamName()+";";//删除StreamString info=ksqlServerService.createInsert(sql);if(null==info){return "元数据删除错误,更新失败!";}//生成KStream创建语句sql="CREATE "+metaStream.getStrTypeName()+" "+metaStream.getStrStreamName()+" (";for(MetaColumn metaColumn:metaStream.getMetaColumnList()){sql+=metaColumn.getStrColumnName()+" "+metaColumn.getStrColumnType()+",";}sql=sql.substring(0,sql.length()-1);sql+=") WITH (kafka_topic='"+metaStream.getStrTopicName()+"', value_format='JSON');";//创建Streaminfo=ksqlServerService.createInsert(sql);if(null==info){return "元数据创建错误,更新失败!";}//保存数据流metaStream.setUserUpdate(userService.findUserByID(userID));metaStream.setTimUpdateTime(new Timestamp(System.currentTimeMillis()));metaStream=metaStreamDao.save(metaStream);metaColumnDao.deleteByStreamID(metaStream.getIntID());for(MetaColumn metaColumn:metaStream.getMetaColumnList()){metaColumn.setIntStreamID(metaStream.getIntID());metaColumn.setIntCreateUserID(userID);metaColumn.setTimCreateTime(new Timestamp(System.currentTimeMillis()));metaColumn.setIntIsDel(0);metaColumnDao.save(metaColumn);}//生成权限Gson gson = new Gson();Type type = new TypeToken<List<Integer>>() {}.getType();List<Integer> groupList = gson.fromJson(metaStream.getStrGroupList(), type);streamGroupService.createUserGroupTaskList(metaStream.getIntID(), groupList, metaStream.getUserCreate().getIntID(),0);return "0"; }public String deleteStream(Integer streamID,Integer userID){MetaStream metaStream=metaStreamDao.findOne(streamID);//生成KStream创建语句String sql="DROP "+metaStream.getStrTypeName()+" "+metaStream.getStrStreamName()+";";//删除StreamString info=ksqlServerService.createInsert(sql);if(null==info){return "元数据删除失败!";}//删除topicInteger flag=kafkaServerService.deleteKafkaTopic(metaStream.getStrTopicName());if(flag!=0){return ResponseMsgEnum.getMsg(flag);}//删除Stream数据metaStream.setIntIsDel(1);metaStream.setUserUpdate(userService.findUserByID(userID));metaStream.setTimUpdateTime(new Timestamp(System.currentTimeMillis()));metaStreamDao.save(metaStream);//删除Column数据List<MetaColumn> metaColumnList=metaColumnDao.findMetaColumnByStreamID(streamID);for(MetaColumn metaColumn:metaColumnList){metaColumn.setIntUpdateUserID(userID);metaColumn.setTimUpdateTime(new Timestamp(System.currentTimeMillis()));metaColumn.setIntIsDel(1);metaColumnDao.save(metaColumn);}return "0"; }
ksql kafka相关推荐
- 大数据发行版本+组件中的竞品/等同地位关系(持续更新中)
开源大数据版本 公司/开源组织 是否付费 备注 Apache Hadoop Apache 否 大数据组件的最初发型版 CDH Cloudera's Distribution Including Apa ...
- 数据可视化分析平台开源方案集锦
B/S 架构的数据可视化分析平台开源方案不完全集锦,供各位参考. 排名不分先后.欢迎补充. kibana Elasticsearch 专用的数据分析检索仪表盘.ELK Stack 中的 K. 日志系统 ...
- 如何解决数据科学家、数据工程师和生产工程师的阻抗失配问题
构建一个可扩展.可靠和高性能的机器学习(ML)基础架构并不容易.这比用Python构建一个分析模型要花费更多的精力. Uber已经为许多生产中的用例运行了其可扩展和不依赖框架的机器学习平台Michel ...
- Kafka团队修改KSQL开源许可,怒怼云厂商
AI前线导读: 今天,Confluent公司(为Apache Kafka开源软件提供商业化服务支持的初创公司,由Kafka的几位创立者离开LinkedIn后成立)联合创始人兼CEO Jay Kreps ...
- 重磅开源 KSQL:用于 Apache Kafka 的流数据 SQL 引擎 2017.8.29
Kafka 的作者 Neha Narkhede 在 Confluent 上发表了一篇博文,介绍了Kafka 新引入的KSQL 引擎--一个基于流的SQL.推出KSQL 是为了降低流式处理的门槛,为处理 ...
- springboot 物联网_Confluent Kafka,KSQL,Spring Boot和分布式SQL开发物联网实战
引言 展示了如何集成Confluent Kafka,KSQL,Spring Boot和YugaByte DB来开发用于管理物联网(IoT)传感器数据的应用程序. 场景 - 支持物联网的车队管理 一家货 ...
- 大数据Hadoop之——EFAK和Confluent KSQL简单使用(kafka listeners 和 advertised.listeners)
文章目录 一.EFAK概述和安装 二.listeners和advertised.listeners配置详解 三.KSQL使用 1)KSQL架构 2)Confluent安装(ZK/KAFKA/KSQL) ...
- KSQL:Apache Kafka的流式SQL
更新:KSQL 现在可作为Confluent Platform的一个组件提供. 我很高兴地宣布KSQL,为Apache kafka流SQL引擎®.KSQL降低了流处理世界的入口,提供了一个简单而完全 ...
- Kafka KSQL实战
点击上方蓝色字体,选择"设为星标" 回复"资源"获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! ...
最新文章
- python控制git版本库
- 使用MyEclipse简单调用WebServices
- WPF依赖属性(续)(1)
- MVC设计模式-学习笔记
- rocketmq怎么保证数据不会重复_阿里架构师亲授:Kafka和RocketMQ的消息复制实现的差异点在哪?...
- mysql中如何迁移数据文件,迁移mysql数据文件存放位置
- Vue项目中使用浏览器同步测试工具 browersync
- JVM-类加载、GC回收机制
- 为了拿Ph.D而做出的诺贝尔奖
- python地理空间_Python批量下载地理空间数据云数据!Python无所不能!-站长资讯中心...
- 状态转移矩阵 matlab,状态转移矩阵计算.PPT
- ROS学习之工作空间与创建过程
- Arcgis中图像裁剪
- 数据挖掘总结之消极学习与积极学习
- 中职计算机应用专业(云计算方向)建设实践
- AKAZE/KAZE局部特征
- 不写代码?程序员必看的那些电影
- 如何能成为一个自由职业者?先做好这几方面
- [u(x)v(x)]的n阶导数,莱布尼兹公式,利用python简化
- Oracle数据库经典案例之学生选课四表联合查询
热门文章
- java抠图人物背景图片_如何进行人物抠图?让你快速完成复杂背景人像的在线抠图...
- 华为交换机SEP双半环设计方案及配置详细步骤
- 悼念:黑白素色网页背景设置教程
- Tensorflow-- 第一天使用过程中的报错records
- Productivity Power Tools工具
- ElasticSearch学习(四)—— 中文按拼音排序拼音检索
- 完美用Nlite添加ACHI SATA驱动至XP镜像
- 一起飞系列之:腾讯云配置Ubuntu16.04, Nginx, PHP 7, MySql, PhpMyAdmin, 域名
- oracle 创建索引 CREATE INDEX
- Unity 骨骼动画