Java Flink Series, Part 1: Maven Project Setup and Multiple Ways to Write the Java Starter Example
1. Flink project dependency configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hainiu</groupId>
    <artifactId>hainiuflink</artifactId>
    <version>1.0</version>

    <properties>
        <java.version>1.8</java.version>
        <scala.version>2.11</scala.version>
        <flink.version>1.9.3</flink.version>
        <parquet.version>1.10.0</parquet.version>
        <hadoop.version>2.7.3</hadoop.version>
        <fastjson.version>1.2.72</fastjson.version>
        <redis.version>2.9.0</redis.version>
        <mysql.version>5.1.35</mysql.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.7</slf4j.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- use "compile" for local runs; switch to "provided" when packaging for a cluster -->
        <project.build.scope>compile</project.build.scope>
        <!-- <project.build.scope>provided</project.build.scope> -->
        <mainClass>com.hainiu.Driver</mainClass>
    </properties>

    <dependencies>
        <!-- logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Hadoop client, needed by Flink's Hadoop integration -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink's Hadoop compatibility layer -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink streaming Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Scala API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink streaming Scala API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink runtime web UI -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- RocksDB backend for Flink state -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- HBase connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Elasticsearch connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- writing files to HDFS -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Redis client -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Parquet file format support -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>${parquet.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- JSON handling -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <!-- maven-assembly-plugin is a build plugin, not a dependency, so it
                 belongs here (with its version) rather than in <dependencies> -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/assembly/assembly.xml</descriptor>
                    </descriptors>
                    <archive>
                        <manifest>
                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12</version>
                <configuration>
                    <skip>true</skip>
                    <forkMode>once</forkMode>
                    <excludes>
                        <exclude>**/**</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
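The assembly plugin above references `src/assembly/assembly.xml`, but the post does not show that file. A minimal descriptor along the following lines builds a runnable fat jar; the `id`, unpack, and scope settings here are assumptions to illustrate the shape, not taken from the original project:

```xml
<!-- src/assembly/assembly.xml — hypothetical minimal fat-jar descriptor -->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
    <id>jar-with-dependencies</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <!-- unpack all runtime dependencies into the root of the output jar -->
            <outputDirectory>/</outputDirectory>
            <useProjectArtifact>true</useProjectArtifact>
            <unpack>true</unpack>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
```

With this in place, `mvn package` produces a `*-jar-with-dependencies.jar` whose manifest `Main-Class` is the `${mainClass}` property from the pom.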
2. Java Flink examples
package com.linwj.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment scc = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> socket = scc.socketTextStream("localhost", 6666); // feed input with: nc -lk 6666

// // 1. Lambda style
// SingleOutputStreamOperator<String> t1_flatmap = socket.flatMap((String a, Collector<String> out) -> { // Collector<String> out collects the emitted elements; more examples: https://vimsky.com/examples/detail/java-method-org.apache.flink.util.Collector.collect.html
// Arrays.stream(a.split(" ")).forEach((x1) -> {
// out.collect(x1);
// });
// }).returns(Types.STRING);
// t1_flatmap.print();
//
// SingleOutputStreamOperator<Tuple3<String, Integer, String>> t2_map =
// t1_flatmap.map((x1) -> Tuple3.of(x1, 1, "gg")).returns(Types.TUPLE(Types.STRING, Types.INT, Types.STRING));
// t2_map.print();
//
// SingleOutputStreamOperator<Tuple3<String, Integer, String>> t3_keyBy_sum =
// t2_map.keyBy(0).sum(1); // no lambda here, so no explicit return-type declaration (.returns) is needed
// t3_keyBy_sum.print();
//
// scc.execute();
//
// // 2. Anonymous inner class (Function) style
// /*
// 1) Anonymous inner class syntax: new InterfaceOrParentClass() { override or define members }.method(); unlike a regular inner class, it has no class name of its own.
// 2) Omitting the generics on FlatMapFunction<String, String> causes a compile error: Class 'Anonymous class derived from FlatMapFunction' must either be declared abstract or implement abstract method 'flatMap(T, Collector<O>)' in 'FlatMapFunction'.
// Keep <String, String> consistent with the generic parameters of the interface or class being implemented.
// */
// SingleOutputStreamOperator<String> t1_flatmap = socket.flatMap(new FlatMapFunction<String,String>() {
// @Override
// public void flatMap(String value, Collector<String> out) throws Exception {
// String[] s = value.split(" ");
// for (String ss : s) {
// out.collect(ss);
// }
// }
// });
// t1_flatmap.print();
//
// SingleOutputStreamOperator<Tuple2<String, Integer>> t2_map = t1_flatmap.map(new MapFunction<String, Tuple2<String, Integer>>() {
// @Override
// public Tuple2<String, Integer> map(String s) throws Exception {
// return Tuple2.of(s, 1);
// }
// });
// t2_map.print();
//
// SingleOutputStreamOperator<Tuple2<String, Integer>> t3_keyBy_sum = t2_map.keyBy(0).sum(1);
// t3_keyBy_sum.print();

// // 3. Combined Function style (flatMap emits the tuples directly)
// SingleOutputStreamOperator<Tuple2<String, Integer>> flatmap = socket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
//
// @Override
// public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// String[] s = value.split(" ");
// for (String ss : s) {
// out.collect(Tuple2.of(ss, 1));
// }
// }
// });
//
// SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatmap.keyBy(0).sum(1);
// sum.print();

// // 4. RichFunction style
// SingleOutputStreamOperator<Tuple2<String, Integer>> flatmap = socket.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
// private String name = null;
//
// @Override
// public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// String[] s = value.split(" ");
// for (String ss : s) {
// System.out.println(getRuntimeContext().getIndexOfThisSubtask()); // rich functions provide getRuntimeContext(), which exposes state plus runtime info such as the subtask index, parallelism, and task name
// out.collect(Tuple2.of(name + ss, 1));
// }
//
// }
//
// @Override
// public void open(Configuration parameters) {
// name = "linwj_";
// }
//
// @Override
// public void close() {
// name = null;
// }
//
// });
// SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatmap.keyBy(0).sum(1);
// sum.print();

        // 5. ProcessFunction style
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socket
                .process(new ProcessFunction<String, Tuple2<String, Integer>>() {
                    private String name = null;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        name = "linwj";
                    }

                    @Override
                    public void close() throws Exception {
                        name = null;
                    }

                    @Override
                    public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] s = value.split(" ");
                        for (String ss : s) {
                            System.out.println(getRuntimeContext().getIndexOfThisSubtask());
                            out.collect(Tuple2.of(name + ss, 1));
                        }
                    }
                })
                .keyBy(0)
                .process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    private Integer num = 0; // running count per parallel instance

                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                        num += value.f1;
                        out.collect(Tuple2.of(value.f0, num));
                    }
                });
        sum.print();

        scc.execute();
    }
}
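All five variants implement the same word-count pipeline: split each line on spaces, pair each word with 1, and keep a running sum per key. Stripped of the Flink API, that core logic can be sketched in plain Java; the class and method names below are illustrative only, not part of the original code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of the word-count core shared by the Flink variants above:
// split -> (word, 1) -> keyed running sum (the keyBy(0).sum(1) step).
public class WordCountSketch {
    private final Map<String, Integer> counts = new LinkedHashMap<>();

    // Mirrors flatMap plus the keyed sum: updates and returns the running counts.
    public Map<String, Integer> processLine(String line) {
        for (String word : line.split(" ")) {
            counts.merge(word, 1, Integer::sum); // running sum per key, like sum(1)
        }
        return counts;
    }

    public static void main(String[] args) {
        WordCountSketch wc = new WordCountSketch();
        wc.processLine("a b a");
        System.out.println(wc.processLine("b c")); // prints {a=2, b=2, c=1}
    }
}
```

The difference between the Flink variants is only in where this logic lives: the lambda and Function styles split it across operators, the RichFunction adds lifecycle hooks (open/close) around it, and the ProcessFunction/KeyedProcessFunction pair keeps the running sum in an instance field per key group.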