flinkSql的union all然后group by写入mysql
从两个流里面输入和一个流里面输入,效果一样
1、代码
package flinkSqlimport java.text.SimpleDateFormatimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.types.Row//视频链接 https://www.bilibili.com/video/BV1Qp4y1Y7YN?p=88
//case class FlinkSqlLession3EventTimeSum(name: String, price: Long, ts: Long)object FlinkSqlLession3_EventTime_unionAll_groupby_sum {def main(args: Array[String]): Unit = {val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentexecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //watermark周期性生成,默认是200msexecutionEnvironment.setParallelism(1)//ddl形式必须使用blink planer ,2.1 blink版本planer的流处理,有setting的情况val blinkStreamSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val tableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(executionEnvironment, blinkStreamSettings)// 第一个流val stream2: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 1111)val transforStream: DataStream[FlinkSqlLession3EventTimeSum] = stream2.map(data => {val simpleDateFormat = new SimpleDateFormat("dd/mm/yy:HH:mm:ss")val tmpList: Array[String] = data.split(",")val ts = simpleDateFormat.parse(tmpList(2)).getTimeFlinkSqlLession3EventTimeSum(tmpList(0), tmpList(1).toLong, ts)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[FlinkSqlLession3EventTimeSum](Time.seconds(0)) {override def extractTimestamp(t: FlinkSqlLession3EventTimeSum) = t.ts})//sourceTable FlinkSql,从流里面定义eventTime,转为table,执行sqltableEnvironment.createTemporaryView("FlinkSqlLession3EventTimeSumTable", transforStream, 'name, 'price, 'ts.rowtime)// 第二个流val stream3: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 2222)val transforStream3: DataStream[FlinkSqlLession3EventTimeSum] = stream3.map(data => {val simpleDateFormat = new SimpleDateFormat("dd/mm/yy:HH:mm:ss")val tmpList: Array[String] = data.split(",")val ts = simpleDateFormat.parse(tmpList(2)).getTimeFlinkSqlLession3EventTimeSum(tmpList(0), tmpList(1).toLong, ts)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[FlinkSqlLession3EventTimeSum](Time.seconds(0)) {override def extractTimestamp(t: FlinkSqlLession3EventTimeSum) = t.ts})//sourceTable FlinkSql,从流里面定义eventTime,转为table,执行sqltableEnvironment.createTemporaryView("FlinkSqlLession3EventTimeSumTable3", transforStream3, 'name, 'price, 'ts.rowtime)//sinkTableval sinkDDL: String ="""|create table FlinkSqlLession3Sum_test3 (| name string,| price bigint|) with (| 'connector.type' = 'jdbc',| 'connector.url' = 'jdbc:mysql://localhost:3306/mybatis?useSSL=false&allowPublicKeyRetrieval=true',| 'connector.table' = 'FlinkSqlLession3Sum_test5',| 'connector.driver' = 'com.mysql.jdbc.Driver',| 'connector.username' = 'root',| 'connector.password' = 'zhang925717',| 'connector.write.flush.max-rows' = '1'|)""".stripMargintableEnvironment.sqlUpdate(sinkDDL)// group aggregate,会有回撤策略,只能用toRetractStream进行转换,val tumbleSql: String ="""| select name,| sum(price) price_sum| from (| select * from FlinkSqlLession3EventTimeSumTable| union all| select * from FlinkSqlLession3EventTimeSumTable3| )| group by| name|""".stripMarginval sqlTable: Table = tableEnvironment.sqlQuery(tumbleSql)sqlTable.insertInto("FlinkSqlLession3Sum_test3")sqlTable.toRetractStream[Row].print("FlinkSqlLession3_EventTime_sum")executionEnvironment.execute("flink sql")}
}
2、mysql创建表
CREATE TABLE `FlinkSqlLession3Sum_test5` (`name` varchar(10) NOT NULL,`price` bigint DEFAULT NULL,PRIMARY KEY (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
3、数据
Bush,1000,17/05/2015:10:25:41
Carter,1600,17/05/2015:10:25:42
Bush,700,17/05/2015:10:25:43
Bush,300,17/05/2015:10:25:44
Adams,2000,17/05/2015:10:25:45
Carter,1600,17/05/2015:10:25:51
4、数据过程
A流输入:Bush,1000,17/05/2015:10:25:41
输出:FlinkSqlLession3_EventTime_sum> (true,Bush,1000)
mysql:
|Bush | 1000 |
B流输入:Carter,1600,17/05/2015:10:25:42
输出:FlinkSqlLession3_EventTime_sum> (true,Carter,1600)
mysql:
| Bush | 1000 |
| Carter | 1600|
B流输入:Bush,700,17/05/2015:10:25:43
输出:FlinkSqlLession3_EventTime_sum> (false,Bush,1000)
FlinkSqlLession3_EventTime_sum> (true,Bush,1700)
mysql:
| Bush | 1700 |
| Carter | 1600 |
flinkSql的union all然后group by写入mysql相关推荐
- Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL
上周六在深圳分享了<Flink SQL 1.9.0 技术内幕和最佳实践>,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码.希望 ...
- scala写入mysql_spark rdd转dataframe 写入mysql的实例讲解
dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍.spark在离线批处理或者 ...
- SparkStreaming读取Kafka数据源并写入Mysql数据库
SparkStreaming读取Kafka数据源并写入Mysql数据库 一.实验环境 本实验所用到的工具有 kafka_2.11-0.11.0.2: zookeeper-3.4.5: spark-2. ...
- 爬取数据并写入MySQL数据库
1.爬取思路总结概述: a.请求网页,获取json数据:request函数 b.使正则re 模块,提取出 { {--} }信息所在部分的字典 :re.search() c.为了防止连续请求中出现,IP ...
- Python实现maoyan票房数据并写入MySQL
学习记录: 目录 一.目标网址分析 二.代码实现 三.效果展示 一.目标网址分析 网页:aHR0cHM6Ly9waWFvZmFuZy5tYW95YW4uY29tL2Rhc2hib2FyZA== 接口: ...
- pandas读取大文件(chunksize)并通过sqlalchemy写入MySQL数据库
pandas读取大文件(chunksize)并通过sqlalchemy写入MySQL数据库 在pandas中读取表类文件的时候有一个参数chunksize,只要指定了这个参数的数值,那么得到的结果就不 ...
- python爬取新闻并归数据库_Python爬取数据并写入MySQL数据库操作示例
Python爬取数据并写入MySQL数据库的实例 首先我们来爬取 http://html-color-codes.info/color-names/ 的一些数据. 按 F12 或 ctrl+u 审查元 ...
- 关于log4net日志写入mysql数据库记录
网上关于log4net日志写入mysql数据库的博客感觉比较少,所以这边搞定之后先过来记录一下. 首先新建个项目,我命名是log4netDemo,然后需要引入两个dll,一个是mysql.dll,一个 ...
- python爬取mysql数据_Python爬取数据并写入MySQL数据库的实例
Python爬取数据并写入MySQL数据库的实例 来源:中文源码网 浏览: 次 日期:2018年9月2日 [下载文档: Python爬取数据并写入MySQL数据库的实例.txt ] (友 ...
最新文章
- Windows使用MSVC,命令行编译,链接64位dll,Python调用
- animateWithDuration:animations:completion:
- 纯css实现网页侧边栏弹窗滑进滑出
- 剑网三谜题终于揭开,药宗四系开合得当,有输出有治疗
- 实战Java内存泄漏问题分析 -- hazelcast2.0.3使用时内存泄漏 -- 2
- 利用MPI解决N体问题
- 苹果和谷歌在印度下架数十款中国应用;贾跃亭宣布破产重组完成;Tails 4.8 发布| 极客头条...
- 如何在XenDesktop中映射USB设备
- 2021-03-14Java大数据Week2
- 我真的还是18岁的那个我
- linux使用小命令使用汇集
- float的比较要慎重
- Python中.mat文件的读写操作
- PAT (Basic Level) 1045 柳婼、旭神两大思路分析【测试点】样例
- 超简单的自定义个性化网页鼠标光标样式 html+css+js
- 服务器宕机监控、检测、报警程序(139绑定手机短信报警)monitor_down.sh
- Python3自然语言处理(5)——预处理
- 对视频文件进行简单的加密
- AI经典书单:入门人工智能该读哪些书?
- HEVC编码器设计实战-梅奥-专题视频课程
热门文章
- pygame精灵组有哪些方法_pygame怎样实现精灵的行走及二段跳
- CSS source
- Mysql_sql存储过程
- chrome html导出pdf,使用Selenium实现HTML转PDF
- 将计算机系成绩置零,实验三数据更新操作_计算机软件及应用_IT计算机_专业资料...
- 玩回合制手游《问道》心得
- Glade+GTK+ 实现通讯录信息管理系统图形界面软件开发
- C语言_因数、因子_质数(素数)、合数
- PDAL:OSGeo4W安装配置测试PDAL
- python守护进程去中断子进程_04 Python并发编程(守护进程,进程锁,进程队列)