Flink MapState实例
MapState的方法和Java的Map的方法极为相似,所以上手相对容易。
常用的有如下:
- get()方法获取值
- put(),putAll()方法更新值
- remove()删除某个key
- contains()判断是否存在某个key
- isEmpty() 判断是否为空
需求:统计每个用户的行为次数
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Random;public class MapStateTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setAutoWatermarkInterval(100);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStreamSource<Tuple3<String, String, Long>> tuple2DataStreamSource = env.addSource(new SourceFunction<Tuple3<String, String, Long>>() {boolean flag = true;@Overridepublic void run(SourceContext<Tuple3<String, String, Long>> sourceContext) throws Exception {String[] s = {"张三", "王五", "李四", "秋英"};String[] s1 = {"登录", "退出", "加购", "够买"};while (flag) {Thread.sleep(1000);int i = new Random().nextInt(4);sourceContext.collect(new Tuple3<String, String, Long>(s[i], s1[i], System.currentTimeMillis()));}}@Overridepublic void cancel() {flag = false;}});SingleOutputStreamOperator<Tuple3<String, String, Long>> tuple3SingleOutputStreamOperator = tuple2DataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple3<String, String, Long> stringLongTuple2, long l) {return stringLongTuple2.f2;}}));tuple3SingleOutputStreamOperator.keyBy(new KeySelector<Tuple3<String, String, Long>, String>() {@Overridepublic String getKey(Tuple3<String, String, Long> stringStringLongTuple3) throws Exception {return stringStringLongTuple3.f0;}}).process(new KeyedProcessFunction<String, Tuple3<String, String, Long>, String>() {MapState<String,Integer> mapState = null;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);MapStateDescriptor<String,Integer> mapStateDescriptor = new MapStateDescriptor<String, Integer>("mapstate",String.class,Integer.class);mapState = getRuntimeContext().getMapState(mapStateDescriptor);}@Overridepublic void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {// 初始化if(!mapState.contains(value.f1)){mapState.put(value.f1,1);}// "登录", "退出", "加购", "够买"mapState.put(value.f1,mapState.get(value.f1) + 1);out.collect(value.f0 + "[登录次数:" + mapState.get("登录") + ",退出次数:" + mapState.get("退出") + ",加购次数:" + mapState.get("加购") + ",够买次数:" + mapState.get("够买") + "]");}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);}}).print();env.execute("mapState");}
}
Flink MapState实例相关推荐
- Flink MapState TTL
Flink MapState TTL Flink MapState的失效时间是针对整个map还是单个key? Flink MapState 使用 MapState 失效时间验证 Flink MapSt ...
- flink window实例分析
window是处理数据的核心.按需选择你需要的窗口类型后,它会将传入的原始数据流切分成多个buckets,所有计算都在window中进行. flink本身提供的实例程序TopSpeedWindowin ...
- Flink ReducingState 实例
ReducingState介绍 ReducingState是和ReduceFunction配合使用 get() 获取状态的值 add(IN value)方法添加一个元素,触发reduceFunctio ...
- Flink MapState的实践
State通过用来保存中间状态 public class TestWindows extends RichWindowFunction<Long, Long, Long, Window> ...
- kafka+flink集成实例
kafka+flink集成 1.目的 1.1 Flink简介 Apache Flink是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,它基于同一个Flink流式执行模型(streaming ...
- flink中的WaterMark调研和具体实例
一些基本概念介绍: Event Time 事件时间是每个事件在其生产设备上发生的时间 Ingestion Time 摄取时间是数据进入Flink的时间 Processing Time 处理时间是是指正 ...
- Flink 读取 Kafka 消息并批量写入到 MySQL实例
前言说明 环境搭建可参考 <kafka+flink集成实例> 本实例主要实现功能如下: 模拟消息生成->Kafka->Flink->Mysql 其中Flink做数据流收集 ...
- Flink实际问题以及知识点
Flink实际问题以及知识点 1 状态原理 2.状态是什么东西?有了状态能做什么? 3.为什么离线计算中不提状态,实时计算老是提到状态这个概念?状态到底在实时计算中解决了什么问题? 4.有了状态.为什 ...
- flink入门实战总结
随着大数据技术在各行各业的广泛应用,要求能对海量数据进行实时处理的需求越来越多,同时数据处理的业务逻辑也越来越复杂,传统的批处理方式和早期的流式处理框架也越来越难以在延迟性.吞吐量.容错能力以及使用便 ...
最新文章
- Netty 和 RPC 框架线程模型分析
- awk命令扩展使用操作
- 2020前端面试(一面面试题)
- MYsql数据库误删mysql下的user内容,导致哪哪都不能登录
- 没有mysql支持时的替代方案
- 基于SSM的校园二手交易平台的设计与实现
- 一个数如果恰好等于它的因子之和,这个数就成为“完数”。 例如,28的因子为1,2,4,7,14。而28=1+2+4+7+14,因此28是“完数”。编程找出1000之内的所有完数,并按下面格
- 《计算机网络:自顶向下方法》阅读笔记
- 单元测试怎么就成了银弹?
- 解决 Windows USB 鼠标键盘断连掉线的问题 和 安全删除硬件并弹出媒体图标 没有弹出移动硬盘的选项
- NX二次开发-重命名装配组件
- Linux 环境保存【终端打印信息】到特定文件
- android地图入门,android 百度地图入门01 (史上最详没有之一)
- Java实现树状结构解析
- 工业相机内外触发以及控制频闪灯
- idea提示java: -source 1.5 中不支持修改
- 2023养老展,中福协养老展,中国国际养老服务业博览会
- [Unity安卓封装][C#版]Unity使用TextToSpeech
- 《Java Web开发实战经典》李兴华 王月清 第五章5.7.3 例5.26
- 完美攻略心得之圣魔大战3(Castle Fantisia)艾伦希亚战记(艾伦西亚战记)包括重做版(即新艾伦希亚战记)