Flink offers two APIs: DataSet (batch processing) and DataStream (stream processing). Programs written with either API are organized into three stages: Source, Transformation, and Sink:

Source: reads the input data
Transformation: transforms the data
Sink: writes out the computed results
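
To see how the three stages fit together, here is a minimal sketch (the socket source on port 8888 and the uppercase map are illustrative assumptions, not one of the examples below):

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTransformSinkSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source: read lines from a socket (assumed host/port)
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        // Transformation: uppercase every line
        SingleOutputStreamOperator<String> upper = lines.map(String::toUpperCase);
        // Sink: print the results
        upper.print();
        env.execute("SourceTransformSinkSkeleton");
    }
}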

一、Source categories

Non-parallel sources (their parallelism is always 1) implement the SourceFunction interface directly:

1、socketTextStream("localhost", 8888)    // unbounded stream
2、fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))    // bounded stream (stops once everything is read)
3、fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)    // bounded stream

Parallel sources implement the ParallelSourceFunction interface:

4、generateSequence(1, 100)    // bounded stream
5、fromParallelCollection(new NumberSequenceIterator(1L, 20L), Long.class)    // bounded stream
6、readFile(new TextInputFormat(null), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 2000)    // unbounded stream (keeps monitoring the path)
   readFile(new TextInputFormat(null), path, FileProcessingMode.PROCESS_ONCE, 2000)    // bounded stream (reads the path once)
7、readTextFile(path)    // bounded stream
8、addSource()    // custom source, see below

Kafka Source:

package cn._51doit.flink.day01;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSourceDemo {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // port of the local web UI
        configuration.setInteger("rest.port", 8082);
        // create a local environment with a web UI, so the job can be inspected without submitting it to a cluster
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // Kafka parameters
        Properties properties = new Properties();
        // Kafka broker addresses and ports
        properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
        // offset reset strategy: start from the beginning if no offset was committed, otherwise resume from the committed offset
        properties.setProperty("auto.offset.reset", "earliest");
        // consumer group id
        properties.setProperty("group.id", "g1");
        // checkpointing is not enabled, so let the Kafka consumer commit offsets automatically at a fixed interval
        properties.setProperty("enable.auto.commit", "true");

        // create the FlinkKafkaConsumer with the required arguments
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "wordcount18",            // topic to read from
                new SimpleStringSchema(), // deserialization schema
                properties                // Kafka properties
        );
        // do not write offsets to Kafka's special topic on checkpoints
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        // add the consumer with addSource
        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        lines.print();

        env.execute();
    }
}

Custom sources:

1、Non-parallel (single parallelism):

package cn._51doit.flink.day02;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.List;

public class CustomSource01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // parallelism of the environment

        DataStreamSource<String> streamSource = env.addSource(new MySource1());
        System.out.println("Parallelism of this custom source: " + streamSource.getParallelism());

        streamSource.print();

        env.execute();
    }

    // a source that implements SourceFunction always has parallelism 1, i.e. it is non-parallel
    public static class MySource1 implements SourceFunction<String> {

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            List<String> words = Arrays.asList("aaa", "bbb", "ccc", "ddd", "eee");
            for (String word : words) {
                ctx.collect(word); // emit the data produced by the source
            }
        }

        @Override
        public void cancel() {
        }
    }
}

2、Parallel:

package cn._51doit.flink.day02;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Arrays;
import java.util.List;

public class CustomSource02 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1); // parallelism of the environment

        DataStreamSource<String> streamSource = env.addSource(new MySource2());
        System.out.println("Parallelism of this custom MySource2: " + streamSource.getParallelism());

        streamSource.print();

        env.execute();
    }

    // a source that implements ParallelSourceFunction can run with parallelism greater than 1
    public static class MySource2 implements ParallelSourceFunction<String> {

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            List<String> words = Arrays.asList("aaa", "bbb", "ccc", "ddd", "eee");
            for (String word : words) {
                ctx.collect(word); // emit the data produced by the source
            }
        }

        @Override
        public void cancel() {
        }
    }
}

3、Parallel (recommended):

package cn._51doit.flink.day02;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.UUID;

public class CustomSource03 {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(3); // parallelism of the environment

        DataStreamSource<String> streamSource = env.addSource(new MySource3());
        System.out.println("Parallelism of this custom MySource3: " + streamSource.getParallelism());

        streamSource.print();

        env.execute();
    }

    // a source that extends the RichParallelSourceFunction abstract class is a parallel source
    // lifecycle:
    // 1. the MySource3 constructor is invoked
    // 2. open() is invoked, once per subtask
    // 3. run() is invoked
    // 4. cancel() is invoked to stop the source
    // 5. close() is invoked to release resources
    public static class MySource3 extends RichParallelSourceFunction<String> {

        public MySource3() {
            System.out.println("constructor invoked");
        }

        private boolean flag = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("subTask: " + indexOfThisSubtask + " open method invoked");
        }

        @Override
        public void close() throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("subTask: " + indexOfThisSubtask + " close method invoked");
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("subTask: " + indexOfThisSubtask + " run method invoked");
            while (flag) {
                ctx.collect("subTask: " + indexOfThisSubtask + " " + UUID.randomUUID().toString());
                Thread.sleep(2000);
            }
        }

        @Override
        public void cancel() {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("subTask: " + indexOfThisSubtask + " cancel method invoked");
            flag = false;
        }
    }
}

Note: using createLocalEnvironmentWithWebUI requires an extra dependency, so that the Flink Web UI can be accessed locally.
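
A sketch of the Maven dependency that is typically required (the artifact is flink-runtime-web; the Scala suffix and the version shown here are assumptions and must match the Flink build used by the project):

<!-- assumed coordinates: adjust the Scala suffix (_2.11 / _2.12) and the version to your Flink build -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>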

二、Sink categories

1、print()    -- print to standard output
2、writeToSocket("localhost", 9999, new SimpleStringSchema())    -- write the data to a socket
3、addSink()    -- add a custom or connector sink

4、Kafka Sink:

package cn._51doit.flink.day02;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkDemo {

    public static void main(String[] args) throws Exception {
        // in local mode the default parallelism is the number of logical cores of the machine
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        int parallelism0 = env.getParallelism();
        System.out.println("Default parallelism of the execution environment: " + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // parallelism of the DataStream
        int parallelism = lines.getParallelism();
        System.out.println("Parallelism of the socket source: " + parallelism);

        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092", // brokers
                "wordcount18",            // target topic
                new SimpleStringSchema()  // serialization schema
        );

        lines.addSink(kafkaProducer);

        env.execute();
    }
}

5、Redis Sink:

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

/**
 * Read data from the specified socket, count the words, and write the results to Redis.
 */
public class RedisSinkDemo {

    public static void main(String[] args) throws Exception {
        // create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        // group by word
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        // aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        // Sink
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("node-3.51doit.cn")
                .setPassword("123456")
                .setDatabase(8)
                .build();
        summed.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));

        // execute
        env.execute("StreamingWordCount");
    }

    // custom RedisMapper: write each (word, count) pair into the Redis hash WORD_COUNT
    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}

6、StreamingFileSink

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class StreamingFileSinkDemo {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");

        // in local mode the default parallelism is the number of logical cores of the machine
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.enableCheckpointing(5000);

        int parallelism0 = env.getParallelism();
        System.out.println("Default parallelism of the execution environment: " + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // parallelism of the DataStream
        int parallelism = lines.getParallelism();
        System.out.println("Parallelism of the socket source: " + parallelism);

        // rolling policy for the generated files
        DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy
                .builder()
                .withRolloverInterval(30 * 1000L)      // roll to a new file every 30 seconds
                .withMaxPartSize(1024L * 1024L * 100L) // roll to a new file when it reaches 100 MB
                .build();

        // create a StreamingFileSink that writes the data in row format
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(
                        new Path("hdfs://node-1.51doit.cn:9000/out84"), // target directory
                        new SimpleStringEncoder<String>("UTF-8"))       // file encoding
                .withRollingPolicy(rollingPolicy) // pass in the rolling policy
                .build();

        // add the sink to the DataStream
        lines.addSink(sink);

        env.execute();
    }

    public static class MyPrintSink extends RichSinkFunction<String> {

        private int indexOfThisSubtask;

        @Override
        public void open(Configuration parameters) throws Exception {
            RuntimeContext runtimeContext = getRuntimeContext();
            indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(indexOfThisSubtask + 1 + "> " + value);
        }
    }
}

7、JDBC Sink

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Read data from the specified socket, count the words, and write the results to MySQL.
 */
public class JDBCSinkDemo {

    public static void main(String[] args) throws Exception {
        // create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        // Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        // group by word
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        // aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        // Sink: upsert each (word, count) pair into MySQL
        summed.addSink(JdbcSink.sink(
                "INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?",
                (ps, t) -> {
                    ps.setString(1, t.f0);
                    ps.setInt(2, t.f1);
                    ps.setInt(3, t.f1);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/bigdata?characterEncoding=utf-8")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));

        // execute
        env.execute("StreamingWordCount");
    }
}

三、Transformation

1、The map operator

Takes one input record and, after processing, returns exactly one output record.
For example: read lines from a file and return the length of each line.

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapDemo1 {

    public static void main(String[] args) throws Exception {
        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> words = env.socketTextStream("localhost", 8888);

        // turn each input word into upper case
        SingleOutputStreamOperator<String> upperWord = words.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });

        upperWord.print();

        env.execute();
    }
}

2、The flatMap operator

Takes one input record and can emit multiple (or zero) output records.
For example: split each line by a delimiter and emit one record per field.

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

public class FlatMapDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    if (!"error".equals(word)) {
                        out.collect(word);
                    }
                }
            }
        });

        // the same logic implemented with a low-level stream operator (see MyStreamFlatMap below):
//        SingleOutputStreamOperator<String> words = lines.transform(
//                "MyFlatMap",
//                TypeInformation.of(String.class),
//                new MyStreamFlatMap()
//        );

        words.print();

        env.execute();
    }

    // a hand-written flatMap operator
    public static class MyStreamFlatMap extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {

        @Override
        public void processElement(StreamRecord<String> element) throws Exception {
            String line = element.getValue();
            String[] words = line.split(" ");
            for (String word : words) {
                if (!word.equals("error")) {
                    output.collect(element.replace(word));
                }
            }
        }
    }
}

3、The filter operator

Filters the data, keeping only the records that satisfy the predicate.

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class FilterDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<Integer> nums = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // keep only the even numbers
        SingleOutputStreamOperator<Integer> even = nums.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value % 2 == 0;
            }
        });

        // equivalent lambda:
        //SingleOutputStreamOperator<Integer> even = nums.filter(i -> i % 2 == 0);

        // equivalent low-level operator (see MyStreamFilter below):
        //SingleOutputStreamOperator<Integer> even = nums.transform("MyFilter", TypeInformation.of(Integer.class),
        //        new MyStreamFilter()
        //);

        even.print();

        env.execute();
    }

    // a hand-written filter operator
    public static class MyStreamFilter extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {

        @Override
        public void processElement(StreamRecord<Integer> element) throws Exception {
            Integer value = element.getValue();
            if (value % 2 == 0) {
                output.collect(element);
            }
        }
    }
}

4、The keyBy operator

Grouping prepares the stream for aggregation: keyBy hash-partitions the stream by key, so records with the same key land in the same partition, and each partition can then be aggregated independently.

keyBy on a single field:

package cn._51doit.flink.day02;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedDemo01 {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        DataStreamSource<String> words = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words
                .map(w -> Tuple2.of(w, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // keyBy by tuple index, only usable on tuple types
        //KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        // keyBy with an anonymous KeySelector
//        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
//            @Override
//            public String getKey(Tuple2<String, Integer> value) throws Exception {
//                return value.f0;
//            }
//        });

        // keyBy with a lambda expression
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(t -> t.f0);

        keyed.sum(1).print();

        env.execute();
    }
}

keyBy on multiple fields:

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedDemo02 {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // input lines: province,city,money
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple3<String, String, Integer>> provinceCityAndMoney = lines.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple3.of(fields[0], fields[1], Integer.parseInt(fields[2]));
            }
        });

        // keyBy by tuple indices
        //KeyedStream<Tuple3<String, String, Integer>, Tuple> keyed = provinceCityAndMoney.keyBy(0, 1);

        // keyBy with an anonymous KeySelector
//        KeyedStream<Tuple3<String, String, Integer>, Tuple2<String, String>> keyed = provinceCityAndMoney.keyBy(new KeySelector<Tuple3<String, String, Integer>, Tuple2<String, String>>() {
//            @Override
//            public Tuple2<String, String> getKey(Tuple3<String, String, Integer> value) throws Exception {
//                return Tuple2.of(value.f0, value.f1);
//            }
//        });

        // keyBy with a lambda expression
        KeyedStream<Tuple3<String, String, Integer>, Tuple2<String, String>> keyed =
                provinceCityAndMoney.keyBy(t -> Tuple2.of(t.f0, t.f1));

        keyed.sum(2).print();

        env.execute();
    }
}

keyBy on the field names of a POJO class:

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedDemo05 {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // input lines: province,city,money
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<DataBean> provinceCityAndMoney = lines.map(new MapFunction<String, DataBean>() {
            @Override
            public DataBean map(String value) throws Exception {
                String[] fields = value.split(",");
                return new DataBean(fields[0], fields[1], Integer.parseInt(fields[2]));
            }
        });

        // keyBy on the field names of the POJO
        KeyedStream<DataBean, Tuple> keyed = provinceCityAndMoney.keyBy("province", "city");

        keyed.sum("money").print();

        env.execute();
    }

    public static class DataBean {

        public String province;
        public String city;
        public Integer money;

        public DataBean() {
        }

        public DataBean(String province, String city, Integer money) {
            this.province = province;
            this.city = city;
            this.money = money;
        }

        @Override
        public String toString() {
            return "DataBean{" +
                    "province='" + province + '\'' +
                    ", city='" + city + '\'' +
                    ", money=" + money +
                    '}';
        }
    }
}

5、The reduce operator

Custom aggregation.
In production, simple sum/max aggregations are rarely enough; reduce covers the more complex cases by letting us define the aggregation logic ourselves.

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Read data from the specified socket and count the words.
 */
public class ReduceDemo01 {

    public static void main(String[] args) throws Exception {
        // create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        // group by word
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        // aggregate with reduce (sum(1) would produce the same result here)
        //SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduced = keyed.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                // value1: the first record, or the intermediate accumulated result
                // value2: the next record that has the same key
                value1.f1 = value1.f1 + value2.f1;
                return value1;
            }
        });

        // Sink
        reduced.print();

        // execute
        env.execute("StreamingWordCount");
    }
}

6、Aggregation operators (min/minBy, max/maxBy)

If the record only contains the grouping field and the field being compared, min and minBy behave the same. If there are additional fields, min keeps the other fields from the first record of the group (only the compared field is updated), while minBy returns the entire record that holds the minimum value.

For example, with the input:

辽宁,沈阳,1000
辽宁,大连,3000
辽宁,铁岭,500

Grouping by province and using min returns:
辽宁,沈阳,500

Grouping by province and using minBy returns:
辽宁,铁岭,500
package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// group the stream, then find the minimum within each group
public class MinMinByDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // example input:
        // spark,2
        // spark,5
        // hadoop,7
        // hadoop,3
        DataStream<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[0];
                int count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(w -> w.f0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> res = keyed.minBy(1);

        res.print();

        env.execute();
    }
}

minBy/maxBy with the "first" parameter

For example, with the input:

辽宁,沈阳,1000
辽宁,铁岭,500
辽宁,长春,500

Grouping by province, minBy with the default setting (first = true) returns the first record that holds the minimum value:
辽宁,铁岭,500

Grouping by province, minBy with first = false returns the latest record that holds the minimum value:
辽宁,长春,500
package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// group the stream, then find the minimum within each group
public class MinMinByDemo2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // example input:
        // 辽宁,沈阳,1000
        // 北京,朝阳,8000
        // 辽宁,朝阳,1000
        // 辽宁,沈阳,2000
        // 北京,朝阳,1000
        // 辽宁,大连,3000
        // 辽宁,铁岭,500
        DataStream<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple3<String, String, Double>> pcm = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> map(String value) throws Exception {
                String[] fields = value.split(",");
                String province = fields[0];
                String city = fields[1];
                double money = Double.parseDouble(fields[2]);
                return Tuple3.of(province, city, money);
            }
        });

        KeyedStream<Tuple3<String, String, Double>, String> keyed = pcm.keyBy(t -> t.f0);

        // minBy(position, first): first = false returns the latest record with the minimum value
        SingleOutputStreamOperator<Tuple3<String, String, Double>> res = keyed.minBy(2, false);

        res.print();

        env.execute();
    }
}

7、The union operator

Merges several data streams into one; all streams must carry elements of the same type, and the parallelism of the unioned stream follows the first stream.

package cn._51doit.flink.day03;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> lines1 = env.socketTextStream("localhost", 8888);
        DataStreamSource<String> lines2 = env.socketTextStream("localhost", 9999);

        //DataStreamSource<Integer> nums = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
        // union requires that both streams carry elements of the same type,
        // so lines1.union(nums) would not compile
        DataStream<String> unioned = lines1.union(lines2);

        unioned.print();

        env.execute();
    }
}

8、The connect operator

Combines two streams whose element types may differ; the two streams can share state. After connect you can call map (CoMapFunction), flatMap (CoFlatMapFunction), or process. A sketch of two connected, keyed streams sharing state follows the demo below.

package cn._51doit.flink.day03;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> lines1 = env.socketTextStream("localhost", 8888);
        DataStream<Integer> lines2 = env.socketTextStream("localhost", 9999).map(s -> Integer.parseInt(s));

        // connect the two streams
        ConnectedStreams<String, Integer> connected = lines1.connect(lines2);

        SingleOutputStreamOperator<String> result = connected.map(new CoMapFunction<String, Integer, String>() {

            // map method applied to elements of the first stream
            @Override
            public String map1(String value) throws Exception {
                return value.toUpperCase();
            }

            // map method applied to elements of the second stream
            @Override
            public String map2(Integer value) throws Exception {
                return (value * 10) + "";
            }

            // the return values of both map methods go into the same output stream
        });

        result.print();

        env.execute();
    }
}
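
The demo above only transforms the two streams independently. As a sketch of the shared-state point (this example is an assumption, not part of the original post): key both streams by the same key before connecting them, and both callbacks of the co-function then see the same keyed state.

package cn._51doit.flink.day03;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConnectSharedStateDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // stream 1: count updates such as "spark,3"
        DataStream<Tuple2<String, Integer>> updates = env.socketTextStream("localhost", 8888)
                .map(s -> {
                    String[] fields = s.split(",");
                    return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // stream 2: query words such as "spark"
        DataStream<String> queries = env.socketTextStream("localhost", 9999);

        updates.keyBy(t -> t.f0)                 // key both streams by the word
                .connect(queries.keyBy(w -> w))
                .flatMap(new RichCoFlatMapFunction<Tuple2<String, Integer>, String, String>() {

                    // keyed state shared by flatMap1 and flatMap2 for the same key
                    private transient ValueState<Integer> countState;

                    @Override
                    public void open(Configuration parameters) {
                        countState = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("count", Types.INT));
                    }

                    @Override
                    public void flatMap1(Tuple2<String, Integer> value, Collector<String> out) throws Exception {
                        // accumulate the count for this word into the shared state
                        Integer current = countState.value();
                        countState.update(current == null ? value.f1 : current + value.f1);
                    }

                    @Override
                    public void flatMap2(String word, Collector<String> out) throws Exception {
                        // answer the query from the state written by the other stream
                        out.collect(word + " -> " + countState.value());
                    }
                })
                .print();

        env.execute();
    }
}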

9、The iterate operator

Iterative computation: records that still satisfy the feedback condition are fed back into the loop and processed again.

package cn._51doit.flink.day03;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// iterative stream computation
public class IterateDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // e.g. enter 10
        DataStreamSource<String> strs = env.socketTextStream("localhost", 8888);
        DataStream<Long> numbers = strs.map(Long::parseLong);

        // DataStream -> IterativeStream: start iterating over the numbers
        IterativeStream<Long> iteration = numbers.iterate();

        // IterativeStream -> DataStream: the iteration body, i.e. the processing applied to each input
        DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("iterate input =>" + value);
                return value -= 2;
            }
        });

        // as long as value > 0, the record is fed back into the loop and goes through the iteration body again
        DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value > 0;
            }
        });

        // register the feedback stream, i.e. the iteration condition
        iteration.closeWith(feedback);

        // records that no longer satisfy the iteration condition are emitted as the final output
        DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value <= 0;
            }
        });

        // result
        output.print("output value:");

        env.execute();
    }
}

10、The project operator

Only works on Tuple types; it maps each tuple onto a subset of its fields.

package cn._51doit.flink.day03;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProjectDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> lines = env.fromElements("laozhao,18,9999.99", "laoduan,28,99.99");

        SingleOutputStreamOperator<Tuple3<String, Integer, Double>> tpDataStream = lines
                .map(e -> {
                    String[] fields = e.split(",");
                    return Tuple3.of(fields[0], Integer.parseInt(fields[1]), Double.parseDouble(fields[2]));
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT, Types.DOUBLE));

        // keep only fields 2 and 0 of each tuple, in that order
        SingleOutputStreamOperator<Tuple> projected = tpDataStream.project(2, 0);

        projected.print();

        env.execute();
    }
}

11、The split/select operators

Splits one stream into several streams by tagging records according to custom logic; split/select was later replaced by side outputs (SideOutput), as shown in the sketch after the demo below.

package com.lxk.test;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class TestSplitSelect {

    public static void main(String[] args) throws Exception {
        // execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // input data
        DataStreamSource<Tuple3<Integer, String, String>> source = env.fromElements(
                new Tuple3<>(1, "1", "AAA"),
                new Tuple3<>(1, "1", "BBB"),
                new Tuple3<>(2, "2", "BBB"));

        // define the split logic: tag each record with "A" or "B"
        SplitStream<Tuple3<Integer, String, String>> splitStream = source.split(new OutputSelector<Tuple3<Integer, String, String>>() {
            @Override
            public Iterable<String> select(Tuple3<Integer, String, String> value) {
                ArrayList<String> output = new ArrayList<>();
                if (value.f2.equals("AAA")) {
                    output.add("A");
                } else if (value.f2.equals("BBB")) {
                    output.add("B");
                }
                return output;
            }
        });

        // actually pull out the tagged sub-stream
        splitStream.select("B").print("B output:");

        env.execute();
    }
}
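
Since split/select is deprecated, here is a sketch of the side-output replacement (the OutputTag names and the ProcessFunction below are illustrative assumptions, not code from the original post):

package com.lxk.test;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TestSideOutput {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple3<Integer, String, String>> source = env.fromElements(
                new Tuple3<>(1, "1", "AAA"),
                new Tuple3<>(1, "1", "BBB"),
                new Tuple3<>(2, "2", "BBB"));

        // tags for the "split" streams; anonymous subclasses keep the generic type information
        OutputTag<Tuple3<Integer, String, String>> aTag = new OutputTag<Tuple3<Integer, String, String>>("A") {};
        OutputTag<Tuple3<Integer, String, String>> bTag = new OutputTag<Tuple3<Integer, String, String>>("B") {};

        SingleOutputStreamOperator<Tuple3<Integer, String, String>> mainStream =
                source.process(new ProcessFunction<Tuple3<Integer, String, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public void processElement(Tuple3<Integer, String, String> value,
                                               Context ctx,
                                               Collector<Tuple3<Integer, String, String>> out) {
                        if ("AAA".equals(value.f2)) {
                            ctx.output(aTag, value);   // goes to the "A" side output
                        } else if ("BBB".equals(value.f2)) {
                            ctx.output(bTag, value);   // goes to the "B" side output
                        } else {
                            out.collect(value);        // everything else stays on the main stream
                        }
                    }
                });

        // the equivalent of splitStream.select("B")
        DataStream<Tuple3<Integer, String, String>> bStream = mainStream.getSideOutput(bTag);
        bStream.print("B output:");

        env.execute();
    }
}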
