MapState的方法和Java的Map的方法极为相似,所以上手相对容易。
常用的有如下:

  • get()方法获取值
  • put(),putAll()方法更新值
  • remove()删除某个key
  • contains()判断是否存在某个key
  • isEmpty() 判断是否为空

需求:统计每个用户的行为次数

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Random;public class MapStateTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setAutoWatermarkInterval(100);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStreamSource<Tuple3<String, String, Long>> tuple2DataStreamSource = env.addSource(new SourceFunction<Tuple3<String, String, Long>>() {boolean flag = true;@Overridepublic void run(SourceContext<Tuple3<String, String, Long>> sourceContext) throws Exception {String[] s = {"张三", "王五", "李四", "秋英"};String[] s1 = {"登录", "退出", "加购", "够买"};while (flag) {Thread.sleep(1000);int i = new Random().nextInt(4);sourceContext.collect(new Tuple3<String, String, Long>(s[i], s1[i], System.currentTimeMillis()));}}@Overridepublic void cancel() {flag = false;}});SingleOutputStreamOperator<Tuple3<String, String, Long>> tuple3SingleOutputStreamOperator = tuple2DataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple3<String, String, Long> stringLongTuple2, long l) {return stringLongTuple2.f2;}}));tuple3SingleOutputStreamOperator.keyBy(new KeySelector<Tuple3<String, String, Long>, String>() {@Overridepublic String getKey(Tuple3<String, String, Long> stringStringLongTuple3) throws Exception {return stringStringLongTuple3.f0;}}).process(new KeyedProcessFunction<String, Tuple3<String, String, Long>, String>() {MapState<String,Integer> mapState = null;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);MapStateDescriptor<String,Integer> mapStateDescriptor = new MapStateDescriptor<String, Integer>("mapstate",String.class,Integer.class);mapState = getRuntimeContext().getMapState(mapStateDescriptor);}@Overridepublic void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {// 初始化if(!mapState.contains(value.f1)){mapState.put(value.f1,1);}// "登录", "退出", "加购", "够买"mapState.put(value.f1,mapState.get(value.f1) + 1);out.collect(value.f0 + "[登录次数:" + mapState.get("登录") + ",退出次数:" + mapState.get("退出") + ",加购次数:" + mapState.get("加购") + ",够买次数:" + mapState.get("够买") + "]");}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);}}).print();env.execute("mapState");}
}

Flink MapState实例相关推荐

  1. Flink MapState TTL

    Flink MapState TTL Flink MapState的失效时间是针对整个map还是单个key? Flink MapState 使用 MapState 失效时间验证 Flink MapSt ...

  2. flink window实例分析

    window是处理数据的核心.按需选择你需要的窗口类型后,它会将传入的原始数据流切分成多个buckets,所有计算都在window中进行. flink本身提供的实例程序TopSpeedWindowin ...

  3. Flink ReducingState 实例

    ReducingState介绍 ReducingState是和ReduceFunction配合使用 get() 获取状态的值 add(IN value)方法添加一个元素,触发reduceFunctio ...

  4. Flink MapState的实践

    State通过用来保存中间状态 public class TestWindows extends RichWindowFunction<Long, Long, Long, Window> ...

  5. kafka+flink集成实例

    kafka+flink集成 1.目的 1.1 Flink简介 Apache Flink是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,它基于同一个Flink流式执行模型(streaming ...

  6. flink中的WaterMark调研和具体实例

    一些基本概念介绍: Event Time 事件时间是每个事件在其生产设备上发生的时间 Ingestion Time 摄取时间是数据进入Flink的时间 Processing Time 处理时间是是指正 ...

  7. Flink 读取 Kafka 消息并批量写入到 MySQL实例

    前言说明 环境搭建可参考 <kafka+flink集成实例> 本实例主要实现功能如下: 模拟消息生成->Kafka->Flink->Mysql 其中Flink做数据流收集 ...

  8. Flink实际问题以及知识点

    Flink实际问题以及知识点 1 状态原理 2.状态是什么东西?有了状态能做什么? 3.为什么离线计算中不提状态,实时计算老是提到状态这个概念?状态到底在实时计算中解决了什么问题? 4.有了状态.为什 ...

  9. flink入门实战总结

    随着大数据技术在各行各业的广泛应用,要求能对海量数据进行实时处理的需求越来越多,同时数据处理的业务逻辑也越来越复杂,传统的批处理方式和早期的流式处理框架也越来越难以在延迟性.吞吐量.容错能力以及使用便 ...

最新文章

  1. Netty 和 RPC 框架线程模型分析
  2. awk命令扩展使用操作
  3. 2020前端面试(一面面试题)
  4. MYsql数据库误删mysql下的user内容,导致哪哪都不能登录
  5. 没有mysql支持时的替代方案
  6. 基于SSM的校园二手交易平台的设计与实现
  7. 一个数如果恰好等于它的因子之和,这个数就成为“完数”。 例如,28的因子为1,2,4,7,14。而28=1+2+4+7+14,因此28是“完数”。编程找出1000之内的所有完数,并按下面格
  8. 《计算机网络:自顶向下方法》阅读笔记
  9. 单元测试怎么就成了银弹?
  10. 解决 Windows USB 鼠标键盘断连掉线的问题 和 安全删除硬件并弹出媒体图标 没有弹出移动硬盘的选项
  11. NX二次开发-重命名装配组件
  12. Linux 环境保存【终端打印信息】到特定文件
  13. android地图入门,android 百度地图入门01 (史上最详没有之一)
  14. Java实现树状结构解析
  15. 工业相机内外触发以及控制频闪灯
  16. idea提示java: -source 1.5 中不支持修改
  17. 2023养老展,中福协养老展,中国国际养老服务业博览会
  18. [Unity安卓封装][C#版]Unity使用TextToSpeech
  19. 《Java Web开发实战经典》李兴华 王月清 第五章5.7.3 例5.26
  20. 完美攻略心得之圣魔大战3(Castle Fantisia)艾伦希亚战记(艾伦西亚战记)包括重做版(即新艾伦希亚战记)

热门文章

  1. 保存的流程图怎么找不到_别找了,找不到了!最赞的流程图工具都在这!
  2. Junit 入门级使用教程
  3. 常用测试用例设计方法之判定表法详解
  4. mysql sqlserver 性能优化_SQLServer性能优化之---数据库级日记监控
  5. 安装WinXPSP2后BT速度却变慢的原因(转)
  6. 1005. 存款收益
  7. 基于php003飞机票航空售票查询预定系统
  8. 估计理论(3):充分统计量的完备性
  9. 金属触摸化妆镜控制器方案
  10. 智能客服机器人系统的优势及提供哪些服务?