这是每个密钥和窗口

state和

timers的新功能的典型用例.

状态在a Beam blog post中描述,而对于计时器,您将不得不依赖于Javadoc.没关系javadoc所说的支持他们的跑步者,真实的状态可以在Beam的capability matrix中找到.

该模式非常类似于您所编写的模式,但是状态允许它与窗口以及捆绑包一起使用,因为它们在流式传输中可能非常小.由于必须以某种方式对state进行分区以保持并行性,因此您需要添加某种键.目前没有自动分片.

private static final class Function extends DoFn, Void> implements Serializable {

private static final long serialVersionUID = 2417984990958377700L;

private static final int LIMIT = 500;

@StateId("bufferedSize")

private final StateSpec> bufferedSizeSpec =

StateSpecs.value(VarIntCoder.of());

@StateId("buffered")

private final StateSpec> bufferedSpec =

StateSpecs.bag(StringUtf8Coder.of());

@TimerId("expiry")

private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement

public void processElement(

ProcessContext context,

BoundedWindow window,

@StateId("bufferedSize") ValueState bufferedSizeState,

@StateId("buffered") BagState bufferedState,

@TimerId("expiry") Timer expiryTimer) {

int size = firstNonNull(bufferedSizeState.read(), 0);

bufferedState.add(context.element().getValue());

size += 1;

bufferedSizeState.write(size);

expiryTimer.set(w.maxTimestamp().plus(allowedLateness));

if (size > LIMIT) {

flush(context, bufferedState, bufferedSizeState);

}

}

@OnTimer("expiry")

public void onExpiry(

OnTimerContext context,

@StateId("bufferedSize") ValueState bufferedSizeState,

@StateId("buffered") BagState bufferedState) {

flush(context, bufferedState, bufferedSizeState);

}

private void flush(

Context context,

BagState bufferedState,

ValueState bufferedSizeState) {

Iterable buffered = bufferedState.read();

// build batch request from buffered

...

// clear things

bufferedState.clear();

bufferedSizeState.clear();

}

}

在这里做几点说明:

> State取代了你的DoFn的实例变量,因为

实例变量在窗口之间没有内聚力.

>缓冲区和大小只是根据需要进行初始化

@StartBundle.

> BagState支持“盲”写入,因此不需要

任何读 – 修改 – 写,只需提交相同的新元素

当你输出时的方式.

>在同一时间重复设置一个计时器就好了;

它应该主要是一个noop.

> @OnTimer(“expiry”)取代@FinishBundle,因为

完成一个包不是每个窗口的东西,而是一个神器

跑步者如何执行您的管道.

所有这一切,如果你正在写一个外部系统,也许你会想要重新启用窗口并重新窗口进入全局窗口,然后才进行写入,其中写入的方式取决于窗口,因为“外部世界是全球窗口“.

java 缓冲流 刷新_java – 缓冲和刷新Apache Beam流数据相关推荐

  1. java excel 展开折叠_Java 创建、刷新Excel透视表/设置透视表行折叠、展开

    透视表是依据已有数据源来创建的交互式表格,我们可在excel中创建透视表,也可编辑已有透视表.本文以创建透视表.刷新透视表以及设置透视表的行展开或折叠为例,介绍具体的操作方法. 所需工具:Free S ...

  2. java excel 展开折叠_java创建和刷新excel透视表,还可设置透视表行折叠和展开的实例...

    码农公社  210.net.cn  210是何含义?10月24日是程序员节,1024 =210.210既 210 之意. java创建和刷新excel透视表,还可设置透视表行折叠和展开的实例 透视表是 ...

  3. java io文件流序列化_Java——Properties集合,Object序列化流与反序列化流,打印流,commons-IO文件工具类...

    一.properties集合 集合对象Properties类,继承Hashtable,实现Map接口,可以和IO对象结合使用,实现数据的持久存储. 特点: Hashtable的子类,map集合中的方法 ...

  4. java读取二进制流文件_java分别通过字节流、字符流、二进制读取文件的代码

    将做工程过程中比较好的一些内容段做个备份,下面的资料是关于 java分别通过字节流.字符流.二进制读取文件的内容,应该是对小伙伴们有些用途. public class Start { public s ...

  5. java 下载图片流_java下载图片(通用)httpClient,io流

    httpClient下载图片 public static void downImage(CloseableHttpClient client, String imgUrl, String savePa ...

  6. java memcached 存储对象_java – 从Memcache中获取低级别数据存储区实体对象时的慢速反序列化...

    事实证明,检索存储内存缓存的低级数据存储实体非常缓慢.由于objectify将实体缓存为低级数据存储区实体类型,因此当使用objectify从memcache中获取许多实体时,这会导致性能不佳. 真正 ...

  7. java后台接收数据格式_Java后台基于POST获取JSON格式数据

    1.直接使用request.getParamater()的方法获取(这种取参方式对于POST和GET的提交方式均适用): 2.通过请求体的IO流获取参数(这种方式只能用于POST,因为GET方式没有请 ...

  8. java并发数据共享机制_Java并发编程:核心理论之数据共享性

    原标题:Java并发编程:核心理论之数据共享性 并发编程是Java程序员最重要的技能之一,也是最难掌握的一种技能.它要求编程者对计算机最底层的运作原理有深刻的理解,同时要求编程者逻辑清晰.思维缜密,这 ...

  9. java hive建表_java jdbc 操作 hive 建表 load 数据

    // 需要引入 hadoop & hive jar import java.sql.Connection; import java.sql.DriverManager; import java ...

最新文章

  1. ASP .NET 如何在 SQL 查询层面实现分页
  2. 认证登录时代来临,主流验证登录方式盘点
  3. SMARTFORM的使用BSIS会计凭证中的字段DMBTR(本币金额)报错
  4. android自带下拉阻尼动画,android 有阻尼下拉刷新列表的实现方法
  5. .htaccess 重定向_如何使用.htaccess将HTTP重定向到HTTPS
  6. 使用TensorFlow.js进行人脸触摸检测第2部分:使用BodyPix
  7. Getting Started with STM32 in Segger Embedded Studio
  8. WebLogic的下载与安装(图文教程)
  9. php筛选怎么做,thinkphp条件筛选 例子
  10. 金九银十正确打开方式!那些年我们一起踩过算法与数据结构的坑
  11. 大数据学习路线图 让你精准掌握大数据技术学习
  12. Java 语法 索引 ----- 继承(Inheritance) 和重写(Overriding)
  13. if __name__ == '__main__' 的正确理解
  14. 【微信页面】移动端微信页面禁止字体放大
  15. 网页开发(三)——实现网页前端和数据库的数据交换
  16. 聊天室应用开发实践(二):实现基于 Web 的聊天室
  17. Python爬虫获取网易云歌单封面(带Cookie)
  18. PHP 出现 The requested URL was not found on this server 怎么办?
  19. Markdown (CSDN) MD编辑器(三)- 图片缩放、指定尺寸、居中、左对齐、右对齐
  20. snowboy嵌入式_树莓派3B+使用snowboy唤醒

热门文章

  1. 想尝试搭建图像识别系统?这里有一份TensorFlow速成教程
  2. spring security系列一:架构概述
  3. 【★★★★★】提高PHP代码质量的36个技巧
  4. 洛谷 P1313 计算系数 Label:杨辉三角形 多项式计算
  5. grub配置文件丢失的解决方法
  6. 营销团队管理必备101招
  7. 关于不使用漫游配置文件解决方案保持登陆域后保持原来的配置文件不变
  8. 我是新来的,希望大家以后能多指教.
  9. 转:过度疲劳的27个信号与预防方法
  10. gcc oracle mysql_[转]Windows下用GCC连接MySQL数据库