1.19.10.Flink SQL工程案例
1.19.10.1.编写Pom.xml文件
1.19.10.2.java案例
1.19.10.3.案例1:Flink批式处理(建表、查询、插入、jdbc connector的使用)
1.19.10.4.案例2:StreamTableEnvironment的使用案例+数据模拟+自定义函数及其应用
1.19.10.5.案例3:StreamTableEnvironment应用+blink/Flink SQL应用
1.19.10.6.案例4:Flink SQL之Stream + Window窗口计算案例
1.19.10.7.案例5:将DataSet数据转成Table数据(word count)
1.19.10.8.案例6:将Table数据转成DataSet
1.19.10.11.模拟表数据+自定义函数
1.19.10.12.Stream SQL + blink/flink SQL切换使用案例
1.19.10.13.StreamTableExample案例
1.19.10.14.TPCHQuery3Table
1.19.10.15.WordCountSQL
1.19.10.16.WordCountTable
1.19.10.17.SQL案例
1.19.10.18.Flink之单流kafka写入mysql
1.19.10.19.Flink之双流kafka写入mysql
1.19.10.20.Flink之kafka和mysql维表实时关联写入mysql
1.19.10.21.Flink滚动窗口案例
1.19.10.22.Flink滑动窗口案例
1.19.10.23.Mysql-cdc,ElasticSearch connector
1.19.10.24.Flink datagen案例
1.19.10.25.Upsert Kafka SQL连接器
1.19.10.26.Flink Elasticsearch connector相关的案例

1.19.10.Flink SQL工程案例

1.19.10.1.编写Pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.toto.test</groupId><artifactId>flink-sql-demo</artifactId><version>1.0-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--maven properties --><maven.test.skip>false</maven.test.skip><maven.javadoc.skip>false</maven.javadoc.skip><!-- compiler settings properties --><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><flink.version>1.12.0</flink.version><commons-lang.version>2.5</commons-lang.version><scala.binary.version>2.11</scala.binary.version></properties><distributionManagement><repository><id>releases</id><layout>default</layout><url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url></repository><snapshotRepository><id>snapshots</id><name>snapshots</name><url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url></snapshotRepository></distributionManagement><repositories><repository><id>releases</id><layout>default</layout><url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url></repository><repository><id>snapshots</id><name>snapshots</name><url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url><snapshots><enabled>true</enabled><updatePolicy>always</updatePolicy><checksumPolicy>warn</checksumPolicy></snapshots></repository><repository><id>xxxx</id><name>xxxx</name><url>http://xxx.xxx.xxx/nexus/content/repositories/xxxx/</url></repository><repository><id>public</id><name>public</name><url>http://xxx.xxx.xxx/nexus/content/groups/public/</url></repository><!-- 新加 --><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository></repositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><!-- 取决于你使用的编程语言,选择Java或者Scala API来构建你的Table API和SQL程序 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><!-- 如果你想在 IDE 本地运行你的程序,你需要添加下面的模块,具体用哪个取决于你使用哪个 Planner --><!-- Either... (for the old planner that was available before Flink 1.9) --><!--如果遇到:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl问题,解决办法是去掉:<scope>provided</scope>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><!-- or.. (for the new Blink planner) --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><!--内部实现上,部分 table 相关的代码是用 Scala 实现的。所以,下面的依赖也需要添加到你的程序里,不管是批式还是流式的程序:--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><!-- 如果你想实现自定义格式来解析Kafka数据,或者自定义函数,下面的依赖就足够了,编译出来的jar文件可以直接给SQL Client使用 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency><!--***************************** scala依赖 *************************************--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><!--***************************** 用jdbc connector 的时候使用*************************--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.12.0</version></dependency></dependencies><build><finalName>flink-sql-demo</finalName><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target><encoding>UTF-8</encoding><compilerVersion>${maven.compiler.source}</compilerVersion><showDeprecation>true</showDeprecation><showWarnings>true</showWarnings></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.12.4</version><configuration><skipTests>${maven.test.skip}</skipTests></configuration></plugin><plugin><groupId>org.apache.rat</groupId><artifactId>apache-rat-plugin</artifactId><version>0.12</version><configuration><excludes><exclude>README.md</exclude></excludes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-javadoc-plugin</artifactId><version>2.10.4</version><configuration><aggregate>true</aggregate><reportOutputDirectory>javadocs</reportOutputDirectory><locale>en</locale></configuration></plugin><!-- scala编译插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.1.6</version><configuration><scalaCompatVersion>2.11</scalaCompatVersion><scalaVersion>2.11.12</scalaVersion><encoding>UTF-8</encoding></configuration><executions><execution><id>compile-scala</id><phase>compile</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>test-compile-scala</id><phase>test-compile</phase><goals><goal>add-source</goal><goal>testCompile</goal></goals></execution></executions></plugin><!-- 打jar包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.6</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><!-- 可以设置jar包的入口类(可选) --><mainClass></mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><!--<plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions><configuration><args><arg>-nobootcp</arg></args>&lt;!&ndash; 解决Error:(55, 38) Static methods in interface require -target:jvm-1.8问题 &ndash;&gt;<addScalacArgs>-target:jvm-1.8</addScalacArgs></configuration></plugin>--></plugins></build></project>

1.19.10.2.java案例

1.19.10.3.案例1:Flink批式处理(建表、查询、插入、jdbc connector的使用)

前置条件(创建MySQL的表):

CREATE TABLE `stu` (`name` varchar(60) DEFAULT NULL,`speciality` varchar(60) DEFAULT NULL,`id` bigint(20) NOT NULL AUTO_INCREMENT,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8;INSERT INTO stu (name,speciality,id) VALUES('张三','美术',1),('张三','音乐',2),('李四','篮球',3),('小明','美术',4),('李四','美术',5),('小明','音乐',6),('赵六','数学',7),('张三','美术',8),('张三','音乐',9),('李四','篮球',10);
INSERT INTO stu (name,speciality,id) VALUES('小明','美术',11),('李四','美术',12),('小明','音乐',13),('田七','语文',14);CREATE TABLE `stu_tmp` (`name` varchar(60) DEFAULT NULL,`num` bigint(20) DEFAULT '0',`id` bigint(20) NOT NULL AUTO_INCREMENT,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
package flinksqldemo;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;/*** @author tuzuoquan* @version 1.0* @ClassName flinksqldemo.BatchCreateSelectInsertDemoWithJdbcConnector* @description* 1、引包:pom.xml中使用Blink的* 2、流式包 还是 Batch包* 3、jdk为1.8* 4、必须指定主键* 5、使用ROW_NUMBER() OVER (PARTITION BY t.flag ORDER BY t.num DESC) as id方式获取行号* 6、要引入mysql的connector* 7、要引入mysql driver** @date 2021/3/11 17:47**/
public class BatchCreateSelectInsertDemoWithJdbcConnector {public static void main(String[] args) {//        //StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//        //StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
//        TableEnvironment tableEnv = TableEnvironment.create(bsSettings);ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment tableEnv = TableEnvironment.create(bbSettings);// 1、创建表tableEnv.executeSql("CREATE TABLE flink_stu (" +"      id BIGINT, " +"      name STRING, " +"      speciality STRING, " +"      PRIMARY KEY (id) NOT ENFORCED" +") WITH (" +"'connector' = 'jdbc'," +"'url'='jdbc:mysql://xxx.xxx.xxx.xxx:3306/test'," +"'table-name' = 'stu'," +"'username' = 'root', " +"'password' = 'xxxxxx'" +")");//        tableEnv.executeSql("select * from stu");// 2、查询表TableResult tableResult = tableEnv.sqlQuery("SELECT id, name,speciality FROM flink_stu").execute();tableResult.print();CloseableIterator<Row> it = tableResult.collect();while (it.hasNext()) {Row row = it.next();System.out.println(row.getField(0));}TableSchema tableSchema = tableResult.getTableSchema();System.out.println(tableSchema.getFieldName(1).toString());// 3、创建输出表tableEnv.executeSql("CREATE TABLE flink_stu_tmp (" +"    id BIGINT," +"    name STRING, " +"    num BIGINT," +"    PRIMARY KEY (id) NOT ENFORCED" +") WITH (" +"'connector' = 'jdbc'," +"'url'='jdbc:mysql://xxx.xxx.xxx.xxx:3306/test'," +"'table-name' = 'stu_tmp'," +"'username' = 'root', " +"'password' = 'xxxxxx'" +")");tableEnv.executeSql("INSERT INTO flink_stu_tmp(id,name, num) " +"SELECT " +"ROW_NUMBER() OVER (PARTITION BY t.flag ORDER BY t.num DESC) as id, " +"t.name as name, " +"t.num as num " +"from ( " +"select name, count(name) as num, '1' as flag from flink_stu group by name) t");//tableResult1.print();String[] tables = tableEnv.listTables();for(String t : tables) {System.out.println(t);}try {env.execute("jobName");} catch (Exception e) {e.printStackTrace();}}}

1.19.10.4.案例2:StreamTableEnvironment的使用案例+数据模拟+自定义函数及其应用

package flinksqldemo;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;import java.time.LocalDate;
import java.util.HashSet;
import java.util.Set;import static org.apache.flink.table.api.Expressions.*;/*** Example for getting started with the Table & SQL API.** <p>The example shows how to create, transform, and query a table. It should give a first* impression about the look-and-feel of the API without going too much into details. See the other* examples for using connectors or more complex operations.** <p>In particular, the example shows how to** <ul>*   <li>setup a {@link TableEnvironment},*   <li>use the environment for creating example tables, registering views, and executing SQL*       queries,*   <li>transform tables with filters and projections,*   <li>declare user-defined functions,*   <li>and print/collect results locally.* </ul>** <p>The example executes two Flink jobs. The results are written to stdout.*/
public class StreamDataTableAndRowTypeAndUDF {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// BLINK STREAMING QUERYStreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);// create a table with example data without a connector requiredfinal Table rawCustomers = tableEnv.fromValues(Row.of("Guillermo Smith",LocalDate.parse("1992-12-12"),"4081 Valley Road","08540","New Jersey","m",true,0,78,3),Row.of("Valeria Mendoza",LocalDate.parse("1970-03-28"),"1239  Rainbow Road","90017","Los Angeles","f",true,9,39,0),Row.of("Leann Holloway",LocalDate.parse("1989-05-21"),"2359 New Street","97401","Eugene",null,true,null,null,null),Row.of("Brandy Sanders",LocalDate.parse("1956-05-26"),"4891 Walkers-Ridge-Way","73119","Oklahoma City","m",false,9,39,0),Row.of("John Turner",LocalDate.parse("1982-10-02"),"2359 New Street","60605","Chicago","m",true,12,39,0),Row.of("Ellen Ortega",LocalDate.parse("1985-06-18"),"2448 Rodney STreet","85023","Phoenix", "f",true,0,78,3));final Table truncatedCustomers = rawCustomers.select(withColumns(range(1, 7)));// name columnsfinal Table namedCustomers =truncatedCustomers.as("name","date_of_birth","street","zip_code","city","gender","has_newsletter");tableEnv.createTemporaryView("customers", namedCustomers);// use SQL whenever you like// call execute() and print() to get insightstableEnv.sqlQuery("SELECT "+ "  COUNT(*) AS `number of customers`, "+ "  AVG(YEAR(date_of_birth)) AS `average birth year` "+ "FROM `customers`").execute().print();// or further transform the data using the fluent Table API// e.g. filter, project fields, or call a user-defined functionfinal Table youngCustomers =tableEnv.from("customers").filter($("gender").isNotNull()).filter($("has_newsletter").isEqual(true)).filter($("date_of_birth").isGreaterOrEqual(LocalDate.parse("1980-01-01"))).select($("name").upperCase(),$("date_of_birth"),call(AddressNormalizer.class, $("street"), $("zip_code"), $("city")).as("address"));rawCustomers.execute().print();System.out.println("============================================================");youngCustomers.execute().print();System.out.println("===========================================================");// use execute() and collect() to retrieve your results from the cluster// this can be useful for testing before storing it in an external systemtry (CloseableIterator<Row> iterator = youngCustomers.execute().collect()) {final Set<Row> expectedOutput = new HashSet<>();expectedOutput.add(Row.of("GUILLERMO SMITH",LocalDate.parse("1992-12-12"),"4081 VALLEY ROAD, 08540, NEW JERSEY"));expectedOutput.add(Row.of("JOHN TURNER",LocalDate.parse("1982-10-02"),"2359 NEW STREET, 60605, CHICAGO"));expectedOutput.add(Row.of("ELLEN ORTEGA",LocalDate.parse("1985-06-18"),"2448 RODNEY STREET, 85023, PHOENIX"));final Set<Row> actualOutput = new HashSet<>();iterator.forEachRemaining(actualOutput::add);if (actualOutput.equals(expectedOutput)) {System.out.println("SUCCESS!");} else {System.out.println("FAILURE!");}}}public static class AddressNormalizer extends ScalarFunction {// the 'eval()' method defines input and output types (reflectively extracted)// and contains the runtime logicpublic String eval(String street, String zipCode, String city) {return normalize(street) + ", " + normalize(zipCode) + ", " + normalize(city);}private String normalize(String s) {return s.toUpperCase().replaceAll("\\W", " ").replaceAll("\\s+", " ").trim();}}}

1.19.10.5.案例3:StreamTableEnvironment应用+blink/Flink SQL应用

package flinksqldemo;import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.Arrays;
import java.util.Objects;import static org.apache.flink.table.api.Expressions.$;/*** Simple example for demonstrating the use of SQL on a Stream Table in Java.* <p>Usage: <code>flinksqldemo.StreamSQLExample --planner &lt;blink|flink&gt;</code><br>** <p>This example shows how to: - Convert DataStreams to Tables - Register a Table under a name -* Run a StreamSQL query on the registered Table**/
public class StreamSQLExample {public static void main(String[] args) throws Exception {// 在idea中的 Program arguments上填写:--planner blinkfinal ParameterTool params = ParameterTool.fromArgs(args);String planner = params.has("planner") ? params.get("planner") : "blink";//System.out.println(planner);// set up execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv;if (Objects.equals(planner, "blink")) {EnvironmentSettings settings =EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();tEnv = StreamTableEnvironment.create(env, settings);}// use flink planner in streaming modeelse if (Objects.equals(planner, "flink")) {EnvironmentSettings settings =EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build();tEnv = StreamTableEnvironment.create(env, settings);} else {System.err.println("The planner is incorrect. Please run 'flinksqldemo.StreamSQLExample --planner <planner>', "+ "where planner (it is either flink or blink, and the default is blink) indicates whether the "+ "example uses flink planner or blink planner.");return;}DataStream<Order> orderA =env.fromCollection(Arrays.asList(new Order(1L, "beer", 3),new Order(1L, "diaper", 4),new Order(3L, "rubber", 2)));DataStream<Order> orderB =env.fromCollection(Arrays.asList(new Order(2L, "pen", 3),new Order(2L, "rubber", 3),new Order(4L, "beer", 1)));// convert DataStream to TableTable tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));// register DataStream as TabletEnv.createTemporaryView("OrderB", orderB, $("user"), $("product"), $("amount"));// union the two tablesTable result =tEnv.sqlQuery("SELECT * FROM "+ tableA+ " WHERE amount > 2 UNION ALL "+ "SELECT * FROM OrderB WHERE amount < 2");// System.out.println(result.execute());tEnv.toAppendStream(result, Order.class).print();// after the table program is converted to DataStream program,// we must use `env.execute()` to submit the job.env.execute("jobNameTest");}// *************************************************************************//     USER DATA TYPES// *************************************************************************/** Simple POJO. */public static class Order {public Long user;public String product;public int amount;public Order() {}public Order(Long user, String product, int amount) {this.user = user;this.product = product;this.amount = amount;}@Overridepublic String toString() {return "Order{"+ "user="+ user+ ", product='"+ product+ '\''+ ", amount="+ amount+ '}';}}}

1.19.10.6.案例4:Flink SQL之Stream + Window窗口计算案例

package flinksqldemo;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.FileUtils;import java.io.File;
import java.io.IOException;/*** Simple example for demonstrating the use of SQL in Java.** <p>Usage: {@code ./bin/flink run ./examples/table/flinksqldemo.StreamWindowSQLExample.jar}** <p>This example shows how to: - Register a table via DDL - Declare an event time attribute in the* DDL - Run a streaming window aggregate on the registered table**/
public class StreamWindowSQLExample {public static void main(String[] args) throws Exception {// set up execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// write source data into temporary file and get the absolute pathString contents ="1,beer,3,2019-12-12 00:00:01\n"+ "1,diaper,4,2019-12-12 00:00:02\n"+ "2,pen,3,2019-12-12 00:00:04\n"+ "2,rubber,3,2019-12-12 00:00:06\n"+ "3,rubber,2,2019-12-12 00:00:05\n"+ "4,beer,1,2019-12-12 00:00:08";String path = createTempFile(contents);// register table via DDL with watermark,// the events are out of order, hence, we use 3 seconds to wait the late eventsString ddl ="CREATE TABLE orders ("+ "  user_id INT, "+ "  product STRING, "+ "  amount INT, "+ "  ts TIMESTAMP(3), "+ "  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND "+ ") WITH ("+ "  'connector.type' = 'filesystem', "+ "  'connector.path' = '"+ path+ "',"+ "  'format.type' = 'csv' "+ ")";tEnv.executeSql(ddl);// run a SQL query on the table and retrieve the result as a new TableString query ="SELECT"+ "  CAST(TUMBLE_START(ts, INTERVAL '5' SECOND) AS STRING) window_start, "+ "  COUNT(*) order_num, "+ "  SUM(amount) total_amount, "+ "  COUNT(DISTINCT product) unique_products "+ "FROM orders "+ "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)";Table result = tEnv.sqlQuery(query);tEnv.toAppendStream(result, Row.class).print();// after the table program is converted to DataStream program,// we must use `env.execute()` to submit the jobenv.execute("Streaming Window SQL Job");// should output:// 2019-12-12 00:00:05.000,3,6,2// 2019-12-12 00:00:00.000,3,10,3}/** Creates a temporary file with the contents and returns the absolute path. */private static String createTempFile(String contents) throws IOException {File tempFile = File.createTempFile("orders", ".csv");tempFile.deleteOnExit();FileUtils.writeFileUtf8(tempFile, contents);return tempFile.toURI().toString();}}

1.19.10.7.案例5:将DataSet数据转成Table数据(word count)

package flinksqldemo;import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;import static org.apache.flink.table.api.Expressions.$;/*** @author tuzuoquan* @version 1.0* @ClassName flinksqldemo.WordCountSQL* @description TODO* @date 2021/3/15 16:12**/
public class WordCountSQL {public static void main(String[] args) throws Exception {// set up execution environmentExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);DataSet<WC> input =env.fromElements(new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1));// register the DataSet as a view "WordCount"tEnv.createTemporaryView("WordCount", input, $("word"), $("frequency"));// run a SQL query on the Table and retrieve the result as a new TableTable table =tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");DataSet<WC> result = tEnv.toDataSet(table, WC.class);result.print();}// *************************************************************************//     USER DATA TYPES// *************************************************************************public static class WC {public String word;public long frequency;// public constructor to make it a Flink POJOpublic WC() {}public WC(String word, long frequency) {this.word = word;this.frequency = frequency;}@Overridepublic String toString() {return "WC " + word + " " + frequency;}}}

1.19.10.8.案例6:将Table数据转成DataSet

package flinksqldemo;/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;import static org.apache.flink.table.api.Expressions.$;/*** Simple example for demonstrating the use of the Table API for a Word Count in Java.** <p>This example shows how to: - Convert DataSets to Tables - Apply group, aggregate, select, and* filter operations*/
public class WordCountTable {// *************************************************************************//     PROGRAM// *************************************************************************public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);DataSet<WC> input =env.fromElements(new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1));Table table = tEnv.fromDataSet(input);Table filtered =table.groupBy($("word")).select($("word"), $("frequency").sum().as("frequency")).filter($("frequency").isEqual(2));DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);result.print();}// *************************************************************************//     USER DATA TYPES// *************************************************************************/** Simple POJO containing a word and its respective count. */public static class WC {public String word;public long frequency;// public constructor to make it a Flink POJOpublic WC() {}public WC(String word, long frequency) {this.word = word;this.frequency = frequency;}@Overridepublic String toString() {return "WC " + word + " " + frequency;}}
}

1.19.10.9.Scala案例

1.19.10.10.注意事项

## Flink程序报:
`Error:(55, 38) Static methods in interface require -target:jvm-1.8val env = TableEnvironment.create(settings)`解决办法:
https://blog.csdn.net/tototuzuoquan/article/details/114841671

1.19.10.11.模拟表数据+自定义函数

package org.apache.flink.table.examples.scala.basicsimport java.time.LocalDateimport org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, _}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
import org.apache.flink.util.CloseableIteratorimport scala.collection.JavaConverters._/*** Example for getting started with the Table & SQL API in Scala.** The example shows how to create, transform, and query a table. It should give a first impression* about the look-and-feel of the API without going too much into details. See the other examples for* using connectors or more complex operations.** In particular, the example shows how to*   - setup a [[TableEnvironment]],*   - use the environment for creating example tables, registering views, and executing SQL queries,*   - transform tables with filters and projections,*   - declare user-defined functions,*   - and print/collect results locally.** The example executes two Flink jobs. The results are written to stdout.*/
object GettingStartedExample {def main(args: Array[String]): Unit = {// setup the unified API// in this case: declare that the table programs should be executed in batch modeval settings = EnvironmentSettings.newInstance().inBatchMode().build()val env = TableEnvironment.create(settings)// create a table with example data without a connector requiredval rawCustomers = env.fromValues(row("Guillermo Smith", LocalDate.parse("1992-12-12"), "4081 Valley Road", "08540", "New Jersey", "m", true, 0, 78, 3),row("Valeria Mendoza", LocalDate.parse("1970-03-28"), "1239  Rainbow Road", "90017", "Los Angeles", "f", true, 9, 39, 0),row("Leann Holloway", LocalDate.parse("1989-05-21"), "2359 New Street", "97401", "Eugene", null, true, null, null, null),row("Brandy Sanders", LocalDate.parse("1956-05-26"), "4891 Walkers-Ridge-Way", "73119", "Oklahoma City", "m", false, 9, 39, 0),row("John Turner", LocalDate.parse("1982-10-02"), "2359 New Street", "60605", "Chicago", "m", true, 12, 39, 0),row("Ellen Ortega", LocalDate.parse("1985-06-18"), "2448 Rodney STreet", "85023", "Phoenix", "f", true, 0, 78, 3))// handle ranges of columns easilyval truncatedCustomers = rawCustomers.select(withColumns(1 to 7))// name columnsval namedCustomers = truncatedCustomers.as("name", "date_of_birth", "street", "zip_code", "city", "gender", "has_newsletter")// register a view temporarilyenv.createTemporaryView("customers", namedCustomers)// use SQL whenever you like// call execute() and print() to get insightsenv.sqlQuery("""|SELECT|  COUNT(*) AS `number of customers`,|  AVG(YEAR(date_of_birth)) AS `average birth year`|FROM `customers`|""".stripMargin).execute().print()// or further transform the data using the fluent Table API// e.g. filter, project fields, or call a user-defined functionval youngCustomers = env.from("customers").filter($"gender".isNotNull).filter($"has_newsletter" === true).filter($"date_of_birth" >= LocalDate.parse("1980-01-01")).select($"name".upperCase(),$"date_of_birth",call(classOf[AddressNormalizer], $"street", $"zip_code", $"city").as("address"))// use execute() and collect() to retrieve your results from the cluster// this can be useful for testing before storing it in an external systemvar iterator: CloseableIterator[Row] = nulltry {iterator = youngCustomers.execute().collect()val actualOutput = iterator.asScala.toSetval expectedOutput = Set(Row.of("GUILLERMO SMITH", LocalDate.parse("1992-12-12"), "4081 VALLEY ROAD, 08540, NEW JERSEY"),Row.of("JOHN TURNER", LocalDate.parse("1982-10-02"), "2359 NEW STREET, 60605, CHICAGO"),Row.of("ELLEN ORTEGA", LocalDate.parse("1985-06-18"), "2448 RODNEY STREET, 85023, PHOENIX"))if (actualOutput == expectedOutput) {println("SUCCESS!")} else {println("FAILURE!")}} finally {if (iterator != null) {iterator.close()}}}/*** We can put frequently used procedures in user-defined functions.** It is possible to call third-party libraries here as well.*/class AddressNormalizer extends ScalarFunction {// the 'eval()' method defines input and output types (reflectively extracted)// and contains the runtime logicdef eval(street: String, zipCode: String, city: String): String = {normalize(street) + ", " + normalize(zipCode) + ", " + normalize(city)}private def normalize(s: String) = {s.toUpperCase.replaceAll("\\W", " ").replaceAll("\\s+", " ").trim}}
}

1.19.10.12.Stream SQL + blink/flink SQL切换使用案例

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.flink.table.examples.scala.basicsimport org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._/*** Simple example for demonstrating the use of SQL on a Stream Table in Scala.** <p>Usage: <code>flinksqldemo.StreamSQLExample --planner &lt;blink|flink&gt;</code><br>** <p>This example shows how to:*  - Convert DataStreams to Tables*  - Register a Table under a name*  - Run a StreamSQL query on the registered Table*/
object StreamSQLExample {// *************************************************************************//     PROGRAM// *************************************************************************def main(args: Array[String]): Unit = {val params = ParameterTool.fromArgs(args)val planner = if (params.has("planner")) params.get("planner") else "blink"// set up execution environmentval env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = if (planner == "blink") {  // use blink planner in streaming modeval settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()StreamTableEnvironment.create(env, settings)} else if (planner == "flink") {  // use flink planner in streaming modeval settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()StreamTableEnvironment.create(env, settings)} else {System.err.println("The planner is incorrect. Please run 'flinksqldemo.StreamSQLExample --planner <planner>', " +"where planner (it is either flink or blink, and the default is blink) indicates whether the " +"example uses flink planner or blink planner.")return}val orderA: DataStream[Order] = env.fromCollection(Seq(Order(1L, "beer", 3),Order(1L, "diaper", 4),Order(3L, "rubber", 2)))val orderB: DataStream[Order] = env.fromCollection(Seq(Order(2L, "pen", 3),Order(2L, "rubber", 3),Order(4L, "beer", 1)))// convert DataStream to Tableval tableA = tEnv.fromDataStream(orderA, $"user", $"product", $"amount")// register DataStream as TabletEnv.createTemporaryView("OrderB", orderB, $"user", $"product", $"amount")// union the two tablesval result = tEnv.sqlQuery(s"""|SELECT * FROM$tableAWHERE amount > 2|UNION ALL|SELECT * FROM OrderB WHERE amount < 2""".stripMargin)result.toAppendStream[Order].print()env.execute()}// *************************************************************************//     USER DATA TYPES// *************************************************************************case class Order(user: Long, product: String, amount: Int)}

1.19.10.13.StreamTableExample案例

package org.apache.flink.table.examples.scala.basicsimport org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._/*** Simple example for demonstrating the use of Table API on a Stream Table.** This example shows how to:*  - Convert DataStreams to Tables*  - Apply union, select, and filter operations*/
object StreamTableExample {// *************************************************************************//     PROGRAM// *************************************************************************def main(args: Array[String]): Unit = {// set up execution environmentval env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)val orderA = env.fromCollection(Seq(Order(1L, "beer", 3),Order(1L, "diaper", 4),Order(3L, "rubber", 2))).toTable(tEnv)val orderB = env.fromCollection(Seq(Order(2L, "pen", 3),Order(2L, "rubber", 3),Order(4L, "beer", 1))).toTable(tEnv)// union the two tablesval result: DataStream[Order] = orderA.unionAll(orderB).select('user, 'product, 'amount).where('amount > 2).toAppendStream[Order]result.print()env.execute()}// *************************************************************************//     USER DATA TYPES// *************************************************************************case class Order(user: Long, product: String, amount: Int)}

1.19.10.14.TPCHQuery3Table

package org.apache.flink.table.examples.scala.basicsimport org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._/*** This program implements a modified version of the TPC-H query 3. The* example demonstrates how to assign names to fields by extending the Tuple class.* The original query can be found at* [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)* (page 29).** This program implements the following SQL equivalent:** {{{* SELECT*      l_orderkey,*      SUM(l_extendedprice*(1-l_discount)) AS revenue,*      o_orderdate,*      o_shippriority* FROM customer,*      orders,*      lineitem* WHERE*      c_mktsegment = '[SEGMENT]'*      AND c_custkey = o_custkey*      AND l_orderkey = o_orderkey*      AND o_orderdate < date '[DATE]'*      AND l_shipdate > date '[DATE]'* GROUP BY*      l_orderkey,*      o_orderdate,*      o_shippriority* ORDER BY*      revenue desc,*      o_orderdate;* }}}** Input files are plain text CSV files using the pipe character ('|') as field separator* as generated by the TPC-H data generator which is available at* [http://www.tpc.org/tpch/](a href="http://www.tpc.org/tpch/).** Usage:* {{{* TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>* }}}** This example shows how to:*  - Convert DataSets to Tables*  - Use Table API expressions**/
object TPCHQuery3Table {// *************************************************************************//     PROGRAM// *************************************************************************def main(args: Array[String]) {if (!parseParameters(args)) {return}// set filter dateval date = "1995-03-12".toDate// get execution environmentval env = ExecutionEnvironment.getExecutionEnvironmentval tEnv = BatchTableEnvironment.create(env)val lineitems = getLineitemDataSet(env).toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate).filter('shipDate.toDate > date)val customers = getCustomerDataSet(env).toTable(tEnv, 'id, 'mktSegment).filter('mktSegment === "AUTOMOBILE")val orders = getOrdersDataSet(env).toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio).filter('orderDate.toDate < date)val items =orders.join(customers).where('custId === 'id).select('orderId, 'orderDate, 'shipPrio).join(lineitems).where('orderId === 'id).select('orderId,'extdPrice * (1.0f.toExpr - 'discount) as 'revenue,'orderDate,'shipPrio)val result = items.groupBy('orderId, 'orderDate, 'shipPrio).select('orderId, 'revenue.sum as 'revenue, 'orderDate, 'shipPrio).orderBy('revenue.desc, 'orderDate.asc)// emit resultresult.writeAsCsv(outputPath, "\n", "|")// execute programenv.execute("Scala TPCH Query 3 (Table API Expression) Example")}// *************************************************************************//     USER DATA TYPES// *************************************************************************case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)case class Customer(id: Long, mktSegment: String)case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)// *************************************************************************//     UTIL METHODS// *************************************************************************private var lineitemPath: String = _private var customerPath: String = _private var ordersPath: String = _private var outputPath: String = _private def parseParameters(args: Array[String]): Boolean = {if (args.length == 4) {lineitemPath = args(0)customerPath = args(1)ordersPath = args(2)outputPath = args(3)true} else {System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +" Due to legal restrictions, we can not ship generated data.\n" +" You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +" Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +"<orders-csv path> <result path>")false}}private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {env.readCsvFile[Lineitem](lineitemPath,fieldDelimiter = "|",includedFields = Array(0, 5, 6, 10) )}private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {env.readCsvFile[Customer](customerPath,fieldDelimiter = "|",includedFields = Array(0, 6) )}private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {env.readCsvFile[Order](ordersPath,fieldDelimiter = "|",includedFields = Array(0, 1, 4, 7) )}}

1.19.10.15.WordCountSQL

package org.apache.flink.table.examples.scala.basicsimport org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._/*** Simple example that shows how the Batch SQL API is used in Scala.** This example shows how to:*  - Convert DataSets to Tables*  - Register a Table under a name*  - Run a SQL query on the registered Table**/
object WordCountSQL {// *************************************************************************//     PROGRAM// *************************************************************************def main(args: Array[String]): Unit = {// set up execution environmentval env = ExecutionEnvironment.getExecutionEnvironmentval tEnv = BatchTableEnvironment.create(env)val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))// register the DataSet as a view "WordCount"tEnv.createTemporaryView("WordCount", input, $"word", $"frequency")// run a SQL query on the Table and retrieve the result as a new Tableval table = tEnv.sqlQuery("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")table.toDataSet[WC].print()}// *************************************************************************//     USER DATA TYPES// *************************************************************************case class WC(word: String, frequency: Long)}

1.19.10.16.WordCountTable

package org.apache.flink.table.examples.scala.basicsimport org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._/*** Simple example for demonstrating the use of the Table API for a Word Count in Scala.** This example shows how to:*  - Convert DataSets to Tables*  - Apply group, aggregate, select, and filter operations**/
object WordCountTable {// *************************************************************************//     PROGRAM// *************************************************************************def main(args: Array[String]): Unit = {// set up execution environmentval env = ExecutionEnvironment.getExecutionEnvironmentval tEnv = BatchTableEnvironment.create(env)val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))val expr = input.toTable(tEnv)val result = expr.groupBy($"word").select($"word", $"frequency".sum as "frequency").filter($"frequency" === 2).toDataSet[WC]result.print()}// *************************************************************************//     USER DATA TYPES// *************************************************************************case class WC(word: String, frequency: Long)}

1.19.10.17.SQL案例

1.19.10.18.Flink之单流kafka写入mysql

配置参考: [jdbc](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html)[kafka](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html)触发方式: 针对每条触发一次source kafka json 数据格式  topic: flink_testmsg: {"day_time": "20201009","id": 7,"amnount":20}创建topicbin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink_test查看topic是否存在了bin/kafka-topics.sh --list --zookeeper localhost:2181[root@flink01 flink-1.12.1]# cd $KAFKA_HOME[root@middleware kafka_2.12-2.6.0]# bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flink_test>{"day_time": "20201009","id": 7,"amnount":20}>

sink mysql 创建语句

CREATE TABLE sync_test_1 (`day_time` varchar(64) NOT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`day_time`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
[root@flink01 ~]# cd $FLINK_HOME
[root@flink01 flink-1.12.1]# bin/sql-client.sh  embedded

配置语句

create table flink_test_1 ( id BIGINT,day_time VARCHAR,amnount BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'flink_test','properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092', 'properties.group.id' = 'flink_gp_test1','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = 'xxx.xxx.xxx.xxx:2181/kafka');CREATE TABLE sync_test_1 (day_time string,total_gmv bigint,PRIMARY KEY (day_time) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/test?characterEncoding=UTF-8','table-name' = 'sync_test_1','username' = 'root','password' = '123456');INSERT INTO sync_test_1
SELECT day_time,SUM(amnount) AS total_gmv
FROM flink_test_1
GROUP BY day_time;

1.19.10.19.Flink之双流kafka写入mysql

cd $FLINK_HOME
./bin/sql-client.sh embedded

source kafka json 数据格式

创建topic

cd $KAFKA_HOME
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink_test_1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink_test_2bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flink_test_1
topic  flink_test_1  {"day_time": "20201011","id": 8,"amnount":211}{"day_time": "20211011","id": 1,"amnount":211}{"day_time": "20211012","id": 2,"amnount":211}bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flink_test_2
topic  flink_test_2  {"id": 8,"coupon_amnount":100}{"id": 1,"coupon_amnount":100}{"id": 2,"coupon_amnount":100}

注意:针对双流中的每条记录都发触发

sink mysql 创建语句

CREATE TABLE `sync_test_2` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`day_time` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`day_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

配置语句

create table flink_test_2_1 ( id BIGINT,day_time VARCHAR,amnount BIGINT,proctime AS PROCTIME ()
)with ( 'connector' = 'kafka','topic' = 'flink_test_1','properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092', 'properties.group.id' = 'flink_gp_test2-1','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = 'xxx.xxx.xxx.xxx:2181/kafka');create table flink_test_2_2 ( id BIGINT,coupon_amnount BIGINT,proctime AS PROCTIME ()
)with ( 'connector' = 'kafka','topic' = 'flink_test_2','properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092', 'properties.group.id' = 'flink_gp_test2-2','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = 'xxx.xxx.xxx.xxx:2181/kafka');CREATE TABLE sync_test_2 (day_time string,total_gmv bigint,PRIMARY KEY (day_time) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/test?characterEncoding=UTF-8','table-name' = 'sync_test_2','username' = 'root','password' = '123456');INSERT INTO sync_test_2
SELECT day_time, SUM(amnount - coupon_amnount) AS total_gmv
FROM (SELECTa.day_time as day_time, a.amnount as amnount, b.coupon_amnount as coupon_amnount FROM flink_test_2_1 as a LEFT JOIN flink_test_2_2 b on b.id = a.id)
GROUP BY day_time;

1.19.10.20.Flink之kafka和mysql维表实时关联写入mysql

source kafka json 数据格式

topic  flink_test_1  {"day_time": "20201011","id": 8,"amnount":211}

dim test_dim

CREATE TABLE `test_dim` (`id` bigint(11) NOT NULL,`coupon_amnount` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;-- ----------------------------
-- Records of test_dim
-- ----------------------------
BEGIN;
INSERT INTO `test_dim` VALUES (1, 1);
INSERT INTO `test_dim` VALUES (3, 1);
INSERT INTO `test_dim` VALUES (8, 1);
COMMIT;

sink mysql 创建语句

CREATE TABLE `sync_test_3` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`day_time` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`day_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

配置语句


create table flink_test_3 ( id BIGINT,day_time VARCHAR,amnount BIGINT,proctime AS PROCTIME ()
)with ( 'connector' = 'kafka','topic' = 'flink_test_1','properties.bootstrap.servers' = '172.25.20.76:9092', 'properties.group.id' = 'flink_gp_test3','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '172.25.20.76:2181/kafka');create table flink_test_3_dim ( id BIGINT,coupon_amnount BIGINT
)with ( 'connector' = 'jdbc','url' = 'jdbc:mysql://172.25.21.10:3306/flink_web?characterEncoding=UTF-8','table-name' = 'test_dim','username' = 'videoweb','password' = 'suntek','lookup.max-retries' = '3','lookup.cache.max-rows' = 1000);CREATE TABLE sync_test_3 (day_time string,total_gmv bigint,PRIMARY KEY (day_time) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://172.25.21.10:3306/flink_web?characterEncoding=UTF-8','table-name' = 'sync_test_3','username' = 'videoweb','password' = 'suntek');INSERT INTO sync_test_3
SELECT day_time, SUM(amnount - coupon_amnount) AS total_gmv
FROM (SELECT a.day_time as day_time, a.amnount as amnount, b.coupon_amnount as coupon_amnount FROM flink_test_3 as a LEFT JOIN flink_test_3_dim  FOR SYSTEM_TIME AS OF  a.proctime  as bON b.id = a.id)
GROUP BY day_time;

1.19.10.21.Flink滚动窗口案例

source kafka json 数据格式

topic flink_test_4

{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2021-01-05 11:12:12"}
{"username":"zhp","click_url":"https://www.infoq.cn/video/BYSSg4hGR5oZmUFsL8Kb","ts":"2020-01-05 11:12:15"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2020-01-05 11:12:18"}
{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2021-01-05 11:12:55"}
{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2021-01-05 11:13:25"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2021-01-05 11:13:25"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2021-01-05 11:13:26"}

sink mysql 创建语句


CREATE TABLE `sync_test_tumble_output` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`window_start` datetime DEFAULT NULL,`window_end` datetime DEFAULT NULL,`username` varchar(255) DEFAULT NULL,`clicks` bigint(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

配置语句

-- -- 开启 mini-batch 指定是否启用小批量优化 (相关配置说明 https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/config.html)
SET table.exec.mini-batch.enabled=true;
-- -- mini-batch的时间间隔,即作业需要额外忍受的延迟
SET table.exec.mini-batch.allow-latency=60s;
-- -- 一个 mini-batch 中允许最多缓存的数据
SET table.exec.mini-batch.size=5;create table user_clicks ( username varchar,click_url varchar,ts timestamp,
--   ts BIGINT,
--   ts2 AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),WATERMARK FOR ts AS ts - INTERVAL '20' SECOND )
with ( 'connector' = 'kafka','topic' = 'flink_test_4','properties.bootstrap.servers' = '172.25.20.76:9092', 'properties.group.id' = 'flink_gp_test4','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '172.25.20.76:2181/kafka');CREATE TABLE sync_test_tumble_output (window_start TIMESTAMP(3),window_end TIMESTAMP(3),username VARCHAR,clicks BIGINT) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://172.25.21.10:3306/flink_web?characterEncoding=UTF-8','table-name' = 'sync_test_tumble_output','username' = 'videoweb','password' = 'suntek');INSERT INTO sync_test_tumble_outputSELECTTUMBLE_START(ts, INTERVAL '60' SECOND) as window_start,TUMBLE_END(ts, INTERVAL '60' SECOND) as window_end,username,COUNT(click_url)FROM user_clicksGROUP BY TUMBLE(ts, INTERVAL '60' SECOND), username;

1.19.10.22.Flink滑动窗口案例

source kafka json 数据格式

topic flink_test_5

{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2020-01-05 11:12:12"}
{"username":"zhp","click_url":"https://www.infoq.cn/video/BYSSg4hGR5oZmUFsL8Kb","ts":"2020-01-05 11:12:15"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2020-01-05 11:12:18"}
{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2020-01-05 11:12:55"}
{"username":"zhp","click_url":"https://www.infoq.cn/","ts":"2020-01-05 11:13:25"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2020-01-05 11:13:25"}
{"username":"zhp","click_url":"https://www.infoq.cn/talks","ts":"2020-01-05 11:13:26"}

sink mysql 创建语句


CREATE TABLE `sync_test_hop_output` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`window_start` datetime DEFAULT NULL,`window_end` datetime DEFAULT NULL,`username` varchar(255) DEFAULT NULL,`clicks` bigint(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8mb4;

配置语句

-- -- 开启 mini-batch (相关配置说明 https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/config.html)
SET table.exec.mini-batch.enabled=true;
-- -- mini-batch的时间间隔,即作业需要额外忍受的延迟
SET table.exec.mini-batch.allow-latency=60s;
-- -- 一个 mini-batch 中允许最多缓存的数据
SET table.exec.mini-batch.size=5;create table user_clicks ( username varchar,click_url varchar,ts timestamp,--   ts BIGINT,--   ts2 AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),WATERMARK FOR ts AS ts - INTERVAL '5' SECOND )
with ( 'connector' = 'kafka','topic' = 'flink_test_5','properties.bootstrap.servers' = '172.25.20.76:9092', 'properties.group.id' = 'flink_gp_test5','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '172.25.20.76:2181/kafka');CREATE TABLE sync_test_hop_output (window_start TIMESTAMP(3),window_end TIMESTAMP(3),username VARCHAR,clicks BIGINT) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://172.25.21.10:3306/flink_web?characterEncoding=UTF-8','table-name' = 'sync_test_hop_output','username' = 'videoweb','password' = 'suntek');--统计每个用户过去1分钟的单击次数,每30秒更新1次,即1分钟的窗口,30秒滑动1次INSERT INTO sync_test_hop_outputSELECTHOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_start,HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_end,username,COUNT(click_url)FROM user_clicksGROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username;

1.19.10.23.Mysql-cdc,ElasticSearch connector

1. datagen简介
在flink 1.11中,内置提供了一个DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。

  • DataGen 连接器允许按数据生成规则进行读取。
  • DataGen 连接器可以使用计算列语法。 这使您可以灵活地生成记录。
  • DataGen 连接器是内置的。

具体的使用方法可以先看下官网的概述

使用时注意如下:

  • 目前随机生成只支持基本数据类型:数字类型(TINYINT、SMALLINT、int、bigint、FLOAT、double)、字符串类型(VARCHAR、char),
    以及boolean类型,不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型
  • 目前有两种数据生成器,一种是随机生成器(默认),这个是无界的,另一个是序列生成器,是有界的。
  • 字段中只要有一个是按序列生成的,也就是有界的,程序就会在序列结束的时候退出。如果所有字段都是随机生成的,则程序最终不会结束。
  • 计算列是一个使用 “column_name AS computed_column_expression” 语法生成的虚拟列。
    它由使用同一表中其他列的非查询表达式生成,并且不会在表中进行物理存储。例如,一个计算列可以使用 cost AS price * quantity 进行定义,
    这个表达式可以包含物理列、常量、函数或变量的任意组合,但这个表达式不能存在任何子查询。
  • 在 Flink 中计算列一般用于为 CREATE TABLE 语句定义 时间属性。 处理时间属性 可以简单地通过使用了系统函数 PROCTIME() 的 proc AS PROCTIME() 语句进行定义
  1. 使用示例
    2.1. 在flink sql-client中使用
    进入客户端
bin/sql-client.sh embedded
或
bin/sql-client.sh embedded -l 依赖的jar包路径

flink SQL测试

CREATE TABLE datagen (f_sequence INT,f_random INT,f_random_str STRING,ts AS localtimestamp,WATERMARK FOR ts AS ts
) WITH ('connector' = 'datagen',-- optional options --'rows-per-second'='5','fields.f_sequence.kind'='sequence','fields.f_sequence.start'='1','fields.f_sequence.end'='1000','fields.f_random.min'='1','fields.f_random.max'='1000','fields.f_random_str.length'='10'
);select * from datagen;

2.2 参数解释
DDL的with属性中,除了connector是必填之外,其他都是可选的。
rows-per-second 每秒生成的数据条数
f_sequence字段的生成策略是按序列生成,并且指定了起始值,所以该程序将会在到达序列的结束值之后退出
f_random 字段是按照随机生成,并指定随机生成的范围
f_random_str是一个字符串类型,属性中指定了随机生成字符串的长度是10
ts列是一个计算列,返回当前的时间.

1.19.10.24.Flink datagen案例

  1. datagen简介
    在flink 1.11中,内置提供了一个DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。
  • DataGen 连接器允许按数据生成规则进行读取。
  • DataGen 连接器可以使用计算列语法。 这使您可以灵活地生成记录。
  • DataGen 连接器是内置的。

具体的使用方法可以先看下官网的概述

使用时注意如下:

  • 目前随机生成只支持基本数据类型:数字类型(TINYINT、SMALLINT、int、bigint、FLOAT、double)、字符串类型(VARCHAR、char),
    以及boolean类型,不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型
  • 目前有两种数据生成器,一种是随机生成器(默认),这个是无界的,另一个是序列生成器,是有界的。
  • 字段中只要有一个是按序列生成的,也就是有界的,程序就会在序列结束的时候退出。如果所有字段都是随机生成的,则程序最终不会结束。
  • 计算列是一个使用 “column_name AS computed_column_expression” 语法生成的虚拟列。
    它由使用同一表中其他列的非查询表达式生成,并且不会在表中进行物理存储。例如,一个计算列可以使用 cost AS price * quantity 进行定义,
    这个表达式可以包含物理列、常量、函数或变量的任意组合,但这个表达式不能存在任何子查询。
  • 在 Flink 中计算列一般用于为 CREATE TABLE 语句定义 时间属性。 处理时间属性 可以简单地通过使用了系统函数 PROCTIME() 的 proc AS PROCTIME() 语句进行定义
  1. 使用示例
    2.1. 在flink sql-client中使用
    进入客户端
bin/sql-client.sh embedded
或
bin/sql-client.sh embedded -l 依赖的jar包路径

flink SQL测试

CREATE TABLE datagen (f_sequence INT,f_random INT,f_random_str STRING,ts AS localtimestamp,WATERMARK FOR ts AS ts
) WITH ('connector' = 'datagen',-- optional options --'rows-per-second'='5','fields.f_sequence.kind'='sequence','fields.f_sequence.start'='1','fields.f_sequence.end'='1000','fields.f_random.min'='1','fields.f_random.max'='1000','fields.f_random_str.length'='10'
);select * from datagen;

2.2 参数解释
DDL的with属性中,除了connector是必填之外,其他都是可选的。
rows-per-second 每秒生成的数据条数
f_sequence字段的生成策略是按序列生成,并且指定了起始值,所以该程序将会在到达序列的结束值之后退出
f_random 字段是按照随机生成,并指定随机生成的范围
f_random_str是一个字符串类型,属性中指定了随机生成字符串的长度是10
ts列是一个计算列,返回当前的时间.

1.19.10.25.Upsert Kafka SQL连接器

将flink的connector的jar包、flink-json、flink avro相关的包放到flink的lib目录下
创建两个topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pageviews
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pageviews_per_region

查看topic列表

bin/kafka-topics.sh --zookeeper localhost:2181 --list

生产者生产数据

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic pageviews
>{"user_id":11,"page_id":1,"viewtime":"2007-12-03 10:15:30","user_region":"hangzhou"}
>{"user_id":12,"page_id":2,"viewtime":"2008-12-03 10:15:30","user_region":"hangzhou"}
>{"user_id":13,"page_id":3,"viewtime":"2009-12-03 10:15:30","user_region":"hangzhou"}
>{"user_id":14,"page_id":4,"viewtime":"2010-12-03 10:15:30","user_region":"henan"}

消费者消费数据

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pageviews_per_region
CREATE TABLE pageviews_per_region (user_region STRING,pv BIGINT,uv BIGINT,PRIMARY KEY (user_region) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'pageviews_per_region','properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092','key.format' = 'avro','value.format' = 'avro'
);CREATE TABLE pageviews (user_id BIGINT,page_id BIGINT,viewtime TIMESTAMP,user_region STRING,WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH ('connector' = 'kafka','topic' = 'pageviews','properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092','format' = 'json'
);-- 计算 pv、uv 并插入到 upsert-kafka sink
INSERT INTO pageviews_per_region
SELECTuser_region,COUNT(*),COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

1.19.10.26.Flink Elasticsearch connector相关的案例

ES数据操作查询工具
http://xxx.xxx.xxx.xxx:5601/ ,进入Dev Tools

ES数据操作
创建记录index visit ,此记录id为testx1

POST /visit/_doc/testx1
{"user_id":1,"user_name":"zhangsan","uv":10,"pv":20
}

查询记录

get /visit/_doc/testx1

删除记录

delete /visit/_doc/testx1

创建目标表,ES只能作为目标表

CREATE TABLE myUserTable (user_id STRING,user_name STRING,u_v BIGINT,pv BIGINT,PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('connector' = 'elasticsearch-6','hosts' = 'http://xxx.xxx.xxx.xxx:9200','index' = 'visit','document-type'='_doc'
);

模拟数据:
插入数据后,数据将在ES中出现

INSERT INTO default_database.myUserTable VALUES ('2','wangwu',11,21),('3','zhaoliu',21,31);

1.19.10.Flink SQL工程案例\Flink批式处理\自定义函数\Window窗口计算\将DataSet数据转成Table数据\将Table数据转成DataSet等相关推荐

  1. Flink实时仓库-DWS层(关键词搜索分析-自定义函数,窗口操作,FlinkSql设置水位线,保存数据到Clickhouse)模板代码

    简介 关键词搜索实时分析,技术要点,自定义FlinkSql函数,使用聚合功能把结果输出到clickhourse 前置知识 ik分词 <dependency><groupId>c ...

  2. flink sql实战案例

    目录 一.背景 二.流程 三.案例 1.flink sql读取 Kafka 并写入 MySQL source sink insert 2.flinksql读kafka写入kudu source sin ...

  3. python如何定义一个函数选择题_Python逻辑题(以下10个选择题有唯一答案,请编写自定义函数judge遍历答案列表并报告正确答案),python,道,汇报...

    python逻辑题(以下10道选择题有唯一答案,请编写自定义函数judge遍历答案列表并汇报正确答案) 1.题目.代码及答案演示 代码: 逻辑题 说明 :个人没用过 Jupyter Notebook ...

  4. python编写的函数调用说法正确的是_python逻辑题(以下10道选择题有唯一答案,请编写自定义函数judge遍历答案列表并汇报正确答案)...

    python逻辑题(以下10道选择题有唯一答案,请编写自定义函数judge遍历答案列表并汇报正确答案) 1.题目.代码及答案演示 代码: 逻辑题 说明:个人没用过Jupyter Notebook,题目 ...

  5. Flink SQL 功能解密系列 —— 流式 TopN 挑战与实现

    TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜.流式的 TopN 不同于批处理的 TopN,它的特点是持续的在内存中按照某个统计指标(如出现次数)计算 TopN 排行榜,然后当排行榜 ...

  6. flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)

    感谢您的关注  +  点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!! 1.序篇-本文结构 背景篇-为啥需要 redis 维表 目标篇-做 redis 维表的预期效果是什么 ...

  7. Flink实战(八十):flink-sql使用(七)Flink SQL Clien读取Kafka数据流式写入Hive(用hive 管理kafka元数据)

    声明:本系列博客是根据SGG的视频整理而成,非常适合大家入门学习. <2021年最新版大数据面试题全面开启更新> 版本说明: Flink 1.11.2 Kafka 2.4.0 Hive 3 ...

  8. 案例解析丨Spark Hive自定义函数应用

    摘要:Spark目前支持UDF,UDTF,UDAF三种类型的自定义函数. 1. 简介 Spark目前支持UDF,UDTF,UDAF三种类型的自定义函数.UDF使用场景:输入一行,返回一个结果,一对一, ...

  9. flink sql client讀取kafka數據的timestamp(DDL方式)

    实验目的 Kafka的数据能让Flink SQL Client读取到 本文是对[1]的详细记载 具体操作步骤 ①啓動hadoop集羣,離開安全模式 ②各个节点都关闭防火墙: service firew ...

  10. Flink入门第十二课:DataStream api/Flink sql实现每隔5分钟统计最近一小时热门商品小案例

    用到的数据文件 用到的数据文件 链接:https://pan.baidu.com/s/1uCk-IF4wWVfUkuuTAKaD0w 提取码:2hmu 1.需求 & 数据 用户行为数据不断写入 ...

最新文章

  1. python基础学习笔记(六)
  2. 控制~Matlab非线性模型分析
  3. Python自动化3.0-------学习之路-------函数!
  4. 意想不到的有趣linux命令
  5. 分布式系统原理 之4 Quorum 机制
  6. adb android源码分析,Android源码分析(十六)----adb shell 命令进行OTA升级
  7. 滑动翻页效果_Flutter实现3D效果,一个字,炫!
  8. [Web开发] 如何改变IE滚动条的颜色
  9. Docker客户端与守护进程
  10. 什么技术才值得你长期投入? | 凌云时刻
  11. linux 编码转换iconv命令
  12. Java实现随机验证码和验证码图片渲染功能
  13. 金蝶服务器存在但无账套信息,金蝶KIS专业版
  14. 利用IDM下载QQ群文件
  15. 【西祠日志】【19】【20】有人说,21天可以养一种习惯
  16. 受限玻尔兹曼机(RBM)学习笔记(四)对数似然函数
  17. ROS配置多机器人导航
  18. html代码硬件加速优化,详解CSS3开启硬件加速的使用和坑
  19. teamview更换Id
  20. 解读波卡崛起之路:DOT大涨,只是顺势而为罢了

热门文章

  1. Dubbo-Adaptive实现解析
  2. shuipfcms二次开发之图片上传
  3. 史上超级详细:银行外包java面试题目
  4. SQL对象名无效的解决
  5. python-爬取东方财富网期货市场大商所数据
  6. 获取文件哈希值_哈希(hash)是个啥?
  7. 数字电视标准5种规格720p、1080i和…
  8. 加权平均np.average()
  9. 2021-1-30最短路入门
  10. Spark多版本共存