一、Flink项目依赖配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns="http://maven.apache.org/POM/4.0.0"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.hainiu</groupId><artifactId>hainiuflink</artifactId><version>1.0</version><properties><java.version>1.8</java.version><scala.version>2.11</scala.version><flink.version>1.9.3</flink.version><parquet.version>1.10.0</parquet.version><hadoop.version>2.7.3</hadoop.version><fastjson.version>1.2.72</fastjson.version><redis.version>2.9.0</redis.version><mysql.version>5.1.35</mysql.version><log4j.version>1.2.17</log4j.version><slf4j.version>1.7.7</slf4j.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.build.scope>compile</project.build.scope><!--        <project.build.scope>provided</project.build.scope>--><mainClass>com.hainiu.Driver</mainClass></properties><dependencies><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>${project.build.scope}</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>${log4j.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink的hadoop兼容 --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink的hadoop兼容 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink的java的api --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink streaming的java的api --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink的scala的api --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink streaming的scala的api --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink运行时的webUI --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- 使用rocksdb保存flink的state --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink操作hbase --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-hbase_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink操作es --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch5_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink 的kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink 写文件到HDFS --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- mysql连接驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version><scope>${project.build.scope}</scope></dependency><!-- redis连接 --><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>${redis.version}</version><scope>${project.build.scope}</scope></dependency><!-- flink操作parquet文件格式 --><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>${parquet.version}</version><scope>${project.build.scope}</scope></dependency><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId><version>${parquet.version}</version><scope>${project.build.scope}</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-parquet_${scala.version}</artifactId><version>${flink.version}</version><scope>${project.build.scope}</scope></dependency><!-- json操作 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version><scope>${project.build.scope}</scope></dependency><dependency><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version></dependency></dependencies><build><resources><resource><directory>src/main/resources</directory></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><configuration><descriptors><descriptor>src/assembly/assembly.xml</descriptor></descriptors><archive><manifest><mainClass>${mainClass}</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.12</version><configuration><skip>true</skip><forkMode>once</forkMode><excludes><exclude>**/**</exclude></excludes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>${project.build.sourceEncoding}</encoding></configuration></plugin></plugins></build></project>

二、JavaFlink案例

package com.linwj.flink;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;import java.util.Arrays;public class test {public static void main(String[] args) throws Exception {StreamExecutionEnvironment scc = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> socket = scc.socketTextStream("localhost", 6666);  // nc -lk 6666//        // 1.lambda写法
//        SingleOutputStreamOperator<String> t1_flatmap = socket.flatMap((String a, Collector<String> out) -> {   // 传入一个收集工具 Collector<String> out,更多案例见https://vimsky.com/examples/detail/java-method-org.apache.flink.util.Collector.collect.html
//            Arrays.stream(a.split(" ")).forEach((x1) -> {
//                out.collect(x1);
//            });
//        }).returns(Types.STRING);t1_flatmap.print();
//
//        SingleOutputStreamOperator<Tuple3<String, Integer, String>> t2_map =
//                t1_flatmap.map((x1) -> Tuple3.of(x1, 1, "gg")).returns(Types.TUPLE(Types.STRING,Types.INT,Types.STRING));t2_map.print();
//
//        SingleOutputStreamOperator<Tuple3<String, Integer, String>> t3_keyBy_sum =
//                t2_map.keyBy(0).sum(1); //没有->lambda表达式无需再声明返回类型
//        t3_keyBy_sum.print();
//
//        scc.execute();
//
//        // 2.function写法
//        /*
//        1)匿名内部类的格式: new 父类名&接口名(){ 定义子类成员或者覆盖父类方法 }.方法。而内部类是有自己定的类名的。
//        2) FlatMapFunction<String,String>如果不写泛型会编译报错:Class 'Anonymous class derived from FlatMapFunction' must either be declared abstract or implement abstract method 'flatMap(T, Collector<O>)' in 'FlatMapFunction'
//        保持<String,String>对应实现接口或类的泛型
//         */
//        SingleOutputStreamOperator<String> t1_flatmap = socket.flatMap(new FlatMapFunction<String,String>() {
//            @Override
//            public void flatMap(String value, Collector<String> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    out.collect(ss);
//                }
//            }
//        });t1_flatmap.print();
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> t2_map = t1_flatmap.map(new MapFunction<String, Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> map(String s) throws Exception {
//                return Tuple2.of(s, 1);
//            }
//        });t2_map.print();
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> t3_keyBy_sum = t2_map.keyBy(0).sum(1);
//        t3_keyBy_sum.print();//        // 3.function组合写法
//        SingleOutputStreamOperator<Tuple2<String, Integer>> flatmap = socket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
//
//            @Override
//            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    out.collect(Tuple2.of(ss, 1));
//                }
//            }
//        });
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatmap.keyBy(0).sum(1);
//        sum.print();//        // 4.richfunction组合写法
//        SingleOutputStreamOperator<Tuple2<String, Integer>> flatmap = socket.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
//            private String name = null;
//
//            @Override
//            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    System.out.println(getRuntimeContext().getIndexOfThisSubtask()); //RichMapFunction富函数,额外提供了获取运行时上下文的方法 getRuntimeContext(),可以拿到状态,还有并行度、任务名称之类的运行时信息。
//                    out.collect(Tuple2.of(name + ss, 1));
//                }
//
//            }
//
//            @Override
//            public void open(Configuration parameters) {
//                name = "linwj_";
//            }
//
//            @Override
//            public void close() {
//                name = null;
//            }
//
//        });
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatmap.keyBy(0).sum(1);
//        sum.print();//5.processfunction组合写法SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socket.process(new ProcessFunction<String, Tuple2<String, Integer>>() {private String name = null;@Overridepublic void open(Configuration parameters) throws Exception {name = "linwj";}@Overridepublic void close() throws Exception {name = null;}@Overridepublic void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
//                getRuntimeContext()String[] s = value.split(" ");for (String ss : s) {System.out.println(getRuntimeContext().getIndexOfThisSubtask());out.collect(Tuple2.of(name + ss, 1));}}}).keyBy(0).process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {private Integer num = 0;@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {num += value.f1;out.collect(Tuple2.of(value.f0,num));}});sum.print();scc.execute();}
}

JavaFlink系列之一:Maven程序搭建及Java入门案例多种写法相关推荐

  1. 智慧物业小程序_智慧小区物业管理小程序搭建开发有现成案例

    智慧小区物业管理小程序搭建开发有现成案例 [欢迎手机致电:沈经理153.1556.5651 微信同步]你我您社区团购模式平台开发,你我您社区商城购物便捷取货模式开发,你我您社区app小程序系统开发,支 ...

  2. 新手小白入门编程第3讲 JAVA入门案例

    1 HelloWorld案例 1.1 工作空间设置 工作空间就是一个文件夹,用来保存我们所有的开发文件和代码等等. 工作空间也是可以切换的,但是切换了工作空间,我们对于eclipse的设置就重置了. ...

  3. Mybatis学习IDEA(1)-环境搭建以及入门案例

    Mybatis的环境搭建: 第一步:创建Maven项目 new Project->Maven->next->GroupId(反写域名如:com.itheima)ArtifactID( ...

  4. 给新入坑的小伙伴们的郑氏Java入门指南

    欢迎入坑郑老师的Java课堂 1. 引言 又是一年的Java季,又有一批新的小伙伴们要在郑老师的指引下进入Java课程的学习之路啦. 首先我想告诉大家的是,无论你大一的编程学习好与坏,都不用担心,Ja ...

  5. 我爱Java系列---【 maven依赖版本冲突的解决方法】

    我爱Java系列---[ maven依赖版本冲突的解决方法] 参考文章: (1)我爱Java系列---[ maven依赖版本冲突的解决方法] (2)https://www.cnblogs.com/hu ...

  6. 使用LINUX云服务器搭建web、小程序服务器MySql+Java+Tomcat

    记小白第一次使用LINUX搭建web.小程序服务器 MySql+Java+Tomcat (很全的安装过程收集整合) 一.使用LINUX搭建服务器的原因 准备工具 二.下载配置 2.1 阿里云服务器的购 ...

  7. SpringBoot简介、SpringBoot 入门程序搭建、与JDBC、Druid、Mybatis和SpringData JPA的整合

    一.SpringBoot 简介: spring boot并不是一个全新的框架,它不是spring解决方案的一个替代品,而是spring的一个封装.所以,你以前可以用spring做的事情,现在用spri ...

  8. springmvc与mysql实例_Spring+Mybatis+SpringMVC+Maven+MySql搭建实例

    摘要:本文主要讲了如何使用Maven来搭建Spring+Mybatis+SpringMVC+MySql的搭建实例,文章写得很详细,有代码有图片,最后也带有运行的效果. 一.准备工作 1. 首先创建一个 ...

  9. 在Mac上使用idea搭建flink java开发环境

    1.环境 本文档记录的是使用flink的java API简单地创建应用的过程. 前置条件:需要安装Java.maven和flink. 1.1 Java环境变量 Java需要jdk.path.class ...

最新文章

  1. Maven Eclipse Run as 命令
  2. C++11中std::function的使用
  3. 第三届全国大学生智能汽车竞赛获奖名单
  4. python程序设计报告-20183122 实验一《Python程序设计》实验报告
  5. shellcode 编码技术
  6. 本地tomcat的start.bat启动时访问不出现小猫图标
  7. greenplum配置高可用_GREENPLUM介绍之数据库管理(七)- 配置数据库高可用性之master镜像 | 学步园...
  8. C语言指针类型和类型转换
  9. Catalent康泰伦特上海外高桥临床供应中心扩建温控产能;富士胶片发布2021财年第三季度财报 | 医药健闻...
  10. 10000个怎么用js写 创建li_JavaScript给每一个li节点绑定点击事件的实现方法
  11. python 基础系列(四) — Python中的面向对象
  12. pageoffice如何离线申请_pageoffice常见问题
  13. 独孤木专栏Delayed Project(中)
  14. 宝藏级别的负数取模,让你关于负数取模不在陌生 >o< 进来看看吧
  15. 摄像头录像时出现连接错误
  16. html调用头尾html,合并html头尾 - 基础支持 - 用gulp搭建前后分离的开发环�? - KK的小故事...
  17. python的名字空间(namespace)与作用域(scope)
  18. TKinter布局之pack
  19. 完美国际2找回老账号服务器,我告诉大家一个找回账号的办法
  20. nyist--周期串

热门文章

  1. 十五天学会Autodesk Inventor,看完这一系列就够了(十一),放样和螺旋扫掠(绘弹簧)
  2. 如何提交高质量的缺陷报告
  3. IOS 笔记大全 (UI控件到网络协议)
  4. 计算机网络判断题(详细解析)
  5. 阿里P6架构师的成长之路,我只用了5个月.....
  6. 简单网页设计静态成品分享
  7. Android可更换布局的换肤方案
  8. 自考本科计算机有哪些专业可以选,自考本科有什么专业可以选择
  9. 创业的捷径!打造黄金人脉!
  10. 三本类计算机行色专业,高考倒计时50天:物理34个易错易忘知识点