
package reduce;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * @Author you guess
 * @Date 2020/6/17 20:52
 * @Version 1.0
 * @Desc
 */
public class DataStreamReduceTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Tuple3<String, String, Integer>> src1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {@Overridepublic void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {ctx.collect(Tuple3.of("Lisi", "Math", 1));ctx.collect(Tuple3.of("Lisi", "English", 2));ctx.collect(Tuple3.of("Lisi", "Chinese", 3));ctx.collect(Tuple3.of("Zhangsan", "Math", 4));ctx.collect(Tuple3.of("Zhangsan", "English", 5));ctx.collect(Tuple3.of("Zhangsan", "Chinese", 6));}@Overridepublic void cancel() {}}, "source1");//        src1.print();
//        7> (Zhangsan,Chinese,6)
//        4> (Lisi,Chinese,3)
//        2> (Lisi,Math,1)
//        5> (Zhangsan,Math,4)
//        3> (Lisi,English,2)
//        6> (Zhangsan,English,5)/*** 代码段2*/
//        src1.keyBy(0).reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
//            @Override
//            public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1, Tuple3<String, String, Integer> value2) throws Exception {
//                return Tuple3.of(value1.f0, "总分:", value1.f2 + value2.f2);
//            }
//        }).print();
//        1> (Lisi,Math,1)
//        11> (Zhangsan,Math,4)
//        1> (Lisi,总分:,3)
//        11> (Zhangsan,总分:,9)
//        1> (Lisi,总分:,6)
//        11> (Zhangsan,总分:,15)/*** 代码段3,与代码段2 同义*/src1.keyBy(0).reduce((value1, value2) -> Tuple3.of(value1.f0, "总分:", value1.f2 + value2.f2)).print();
//        1> (Lisi,Math,1)
//        11> (Zhangsan,Math,4)
//        1> (Lisi,总分:,3)
//        11> (Zhangsan,总分:,9)
//        1> (Lisi,总分:,6)
//        11> (Zhangsan,总分:,15)env.execute("Flink DataStreamReduceTest by Java");}}



DataStreamSource（一种普通的 DataStream）没有聚合操作（min、minBy、max、maxBy、sum 等）和 reduce 操作；需要先通过 keyBy() 转换为 KeyedStream（或通过开窗得到窗口流）后才能调用这些操作。

KeyedStream、WindowedStream、AllWindowedStream 和 DataSet 都有聚合操作（min、minBy、max、maxBy、sum 等）和 reduce 操作。

Flink min、minBy、max、maxBy、sum 实例

环境：Flink 1.9.2，Java 1.8


/*** Base interface for Reduce functions. Reduce functions combine groups of elements to* a single value, by taking always two elements and combining them into one. Reduce functions* may be used on entire data sets, or on grouped data sets. In the latter case, each group is reduced* individually.** <p>For a reduce functions that work on an entire group at the same time (such as the* MapReduce/Hadoop-style reduce), see {@link GroupReduceFunction}. In the general case,* ReduceFunctions are considered faster, because they allow the system to use more efficient* execution strategies.** <p>The basic syntax for using a grouped ReduceFunction is as follows:* <pre>{@code* DataSet<X> input = ...;** DataSet<X> result = input.groupBy(<key-definition>).reduce(new MyReduceFunction());* }</pre>** <p>Like all functions, the ReduceFunction needs to be serializable, as defined in {@link}.** @param <T> Type of the elements that this function processes.*/
public interface ReduceFunction<T> extends Function, Serializable {/*** The core method of ReduceFunction, combining two values into one value of the same type.* The reduce function is consecutively applied to all values of a group until only a single value remains.** @param value1 The first value to combine.* @param value2 The second value to combine.* @return The combined value of both input values.** @throws Exception This method may throw exceptions. Throwing an exception will cause the operation*                   to fail and may trigger recovery.*/T reduce(T value1, T value2) throws Exception;


