从两个流里面输入和一个流里面输入,效果一样

1、代码

package flinkSqlimport java.text.SimpleDateFormatimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.types.Row//视频链接 https://www.bilibili.com/video/BV1Qp4y1Y7YN?p=88
//case class FlinkSqlLession3EventTimeSum(name: String, price: Long, ts: Long)object FlinkSqlLession3_EventTime_unionAll_groupby_sum {def main(args: Array[String]): Unit = {val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentexecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //watermark周期性生成,默认是200msexecutionEnvironment.setParallelism(1)//ddl形式必须使用blink planer ,2.1 blink版本planer的流处理,有setting的情况val blinkStreamSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val tableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(executionEnvironment, blinkStreamSettings)//  第一个流val stream2: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 1111)val transforStream: DataStream[FlinkSqlLession3EventTimeSum] = stream2.map(data => {val simpleDateFormat = new SimpleDateFormat("dd/mm/yy:HH:mm:ss")val tmpList: Array[String] = data.split(",")val ts = simpleDateFormat.parse(tmpList(2)).getTimeFlinkSqlLession3EventTimeSum(tmpList(0), tmpList(1).toLong, ts)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[FlinkSqlLession3EventTimeSum](Time.seconds(0)) {override def extractTimestamp(t: FlinkSqlLession3EventTimeSum) = t.ts})//sourceTable  FlinkSql,从流里面定义eventTime,转为table,执行sqltableEnvironment.createTemporaryView("FlinkSqlLession3EventTimeSumTable", transforStream, 'name, 'price, 'ts.rowtime)// 第二个流val stream3: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 2222)val transforStream3: DataStream[FlinkSqlLession3EventTimeSum] = stream3.map(data => {val simpleDateFormat = new SimpleDateFormat("dd/mm/yy:HH:mm:ss")val tmpList: Array[String] = data.split(",")val ts = simpleDateFormat.parse(tmpList(2)).getTimeFlinkSqlLession3EventTimeSum(tmpList(0), tmpList(1).toLong, ts)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[FlinkSqlLession3EventTimeSum](Time.seconds(0)) {override def extractTimestamp(t: FlinkSqlLession3EventTimeSum) = t.ts})//sourceTable  FlinkSql,从流里面定义eventTime,转为table,执行sqltableEnvironment.createTemporaryView("FlinkSqlLession3EventTimeSumTable3", transforStream3, 'name, 'price, 'ts.rowtime)//sinkTableval sinkDDL: String ="""|create table FlinkSqlLession3Sum_test3 (| name string,| price bigint|) with (| 'connector.type' = 'jdbc',| 'connector.url' = 'jdbc:mysql://localhost:3306/mybatis?useSSL=false&allowPublicKeyRetrieval=true',| 'connector.table' = 'FlinkSqlLession3Sum_test5',| 'connector.driver' = 'com.mysql.jdbc.Driver',| 'connector.username' = 'root',| 'connector.password' = 'zhang925717',| 'connector.write.flush.max-rows' = '1'|)""".stripMargintableEnvironment.sqlUpdate(sinkDDL)//    group aggregate,会有回撤策略,只能用toRetractStream进行转换,val tumbleSql: String ="""| select name,|   sum(price) price_sum| from (|   select * from FlinkSqlLession3EventTimeSumTable|   union all|   select * from FlinkSqlLession3EventTimeSumTable3| )| group by|   name|""".stripMarginval sqlTable: Table = tableEnvironment.sqlQuery(tumbleSql)sqlTable.insertInto("FlinkSqlLession3Sum_test3")sqlTable.toRetractStream[Row].print("FlinkSqlLession3_EventTime_sum")executionEnvironment.execute("flink sql")}
}

2、mysql创建表

CREATE TABLE `FlinkSqlLession3Sum_test5` (`name` varchar(10) NOT NULL,`price` bigint DEFAULT NULL,PRIMARY KEY (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

3、数据

Bush,1000,17/05/2015:10:25:41
Carter,1600,17/05/2015:10:25:42
Bush,700,17/05/2015:10:25:43
Bush,300,17/05/2015:10:25:44
Adams,2000,17/05/2015:10:25:45
Carter,1600,17/05/2015:10:25:51

4、数据过程

A流输入:Bush,1000,17/05/2015:10:25:41

输出:FlinkSqlLession3_EventTime_sum> (true,Bush,1000)

mysql:

|Bush |  1000 |

B流输入:Carter,1600,17/05/2015:10:25:42

输出:FlinkSqlLession3_EventTime_sum> (true,Carter,1600)

mysql:

| Bush   |  1000 |

| Carter |  1600|

B流输入:Bush,700,17/05/2015:10:25:43

输出:FlinkSqlLession3_EventTime_sum> (false,Bush,1000)
            FlinkSqlLession3_EventTime_sum> (true,Bush,1700)

mysql:

| Bush   |  1700 |

| Carter |  1600 |

flinkSql的union all然后group by写入mysql相关推荐

  1. Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

    上周六在深圳分享了<Flink SQL 1.9.0 技术内幕和最佳实践>,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码.希望 ...

  2. scala写入mysql_spark rdd转dataframe 写入mysql的实例讲解

    dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍.spark在离线批处理或者 ...

  3. SparkStreaming读取Kafka数据源并写入Mysql数据库

    SparkStreaming读取Kafka数据源并写入Mysql数据库 一.实验环境 本实验所用到的工具有 kafka_2.11-0.11.0.2: zookeeper-3.4.5: spark-2. ...

  4. 爬取数据并写入MySQL数据库

    1.爬取思路总结概述: a.请求网页,获取json数据:request函数 b.使正则re 模块,提取出 { {--} }信息所在部分的字典 :re.search() c.为了防止连续请求中出现,IP ...

  5. Python实现maoyan票房数据并写入MySQL

    学习记录: 目录 一.目标网址分析 二.代码实现 三.效果展示 一.目标网址分析 网页:aHR0cHM6Ly9waWFvZmFuZy5tYW95YW4uY29tL2Rhc2hib2FyZA== 接口: ...

  6. pandas读取大文件(chunksize)并通过sqlalchemy写入MySQL数据库

    pandas读取大文件(chunksize)并通过sqlalchemy写入MySQL数据库 在pandas中读取表类文件的时候有一个参数chunksize,只要指定了这个参数的数值,那么得到的结果就不 ...

  7. python爬取新闻并归数据库_Python爬取数据并写入MySQL数据库操作示例

    Python爬取数据并写入MySQL数据库的实例 首先我们来爬取 http://html-color-codes.info/color-names/ 的一些数据. 按 F12 或 ctrl+u 审查元 ...

  8. 关于log4net日志写入mysql数据库记录

    网上关于log4net日志写入mysql数据库的博客感觉比较少,所以这边搞定之后先过来记录一下. 首先新建个项目,我命名是log4netDemo,然后需要引入两个dll,一个是mysql.dll,一个 ...

  9. python爬取mysql数据_Python爬取数据并写入MySQL数据库的实例

    Python爬取数据并写入MySQL数据库的实例 来源:中文源码网    浏览: 次    日期:2018年9月2日 [下载文档:  Python爬取数据并写入MySQL数据库的实例.txt ] (友 ...

最新文章

  1. Windows使用MSVC,命令行编译,链接64位dll,Python调用
  2. animateWithDuration:animations:completion:
  3. 纯css实现网页侧边栏弹窗滑进滑出
  4. 剑网三谜题终于揭开,药宗四系开合得当,有输出有治疗
  5. 实战Java内存泄漏问题分析 -- hazelcast2.0.3使用时内存泄漏 -- 2
  6. 利用MPI解决N体问题
  7. 苹果和谷歌在印度下架数十款中国应用;贾跃亭宣布破产重组完成;Tails 4.8 发布| 极客头条...
  8. 如何在XenDesktop中映射USB设备
  9. 2021-03-14Java大数据Week2
  10. 我真的还是18岁的那个我
  11. linux使用小命令使用汇集
  12. float的比较要慎重
  13. Python中.mat文件的读写操作
  14. PAT (Basic Level) 1045 柳婼、旭神两大思路分析【测试点】样例
  15. 超简单的自定义个性化网页鼠标光标样式 html+css+js
  16. 服务器宕机监控、检测、报警程序(139绑定手机短信报警)monitor_down.sh
  17. Python3自然语言处理(5)——预处理
  18. 对视频文件进行简单的加密
  19. AI经典书单:入门人工智能该读哪些书?
  20. HEVC编码器设计实战-梅奥-专题视频课程

热门文章

  1. pygame精灵组有哪些方法_pygame怎样实现精灵的行走及二段跳
  2. CSS source
  3. Mysql_sql存储过程
  4. chrome html导出pdf,使用Selenium实现HTML转PDF
  5. 将计算机系成绩置零,实验三数据更新操作_计算机软件及应用_IT计算机_专业资料...
  6. 玩回合制手游《问道》心得
  7. Glade+GTK+ 实现通讯录信息管理系统图形界面软件开发
  8. C语言_因数、因子_质数(素数)、合数
  9. PDAL:OSGeo4W安装配置测试PDAL
  10. python守护进程去中断子进程_04 Python并发编程(守护进程,进程锁,进程队列)