Flink java模拟生成自定义流式数据
思路如下:
- 定义一个POJO类,注意flink里使用的类必须有一个无参的构造方法
- 自定义DataSource实现SourceFunction接口
- 使用
ctx.collect()
传入想要发送的数据就可以了
首先定义一个POJO类:
class MyData {public int keyId;public long timestamp;public double value;public MyData() {}public MyData(int accountId, long timestamp, double value) {this.keyId = accountId;this.timestamp = timestamp;this.value = value;}public long getKeyId() {return keyId;}public void setKeyId(int keyId) {this.keyId = keyId;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public double getValue() {return value;}public void setValue(double value) {this.value = value;}@Overridepublic String toString() {return "MyData{" +"keyId=" + keyId +", timestamp=" + timestamp +", value=" + value +'}';}
}
生成自己的数据:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;public class CreateMyData {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<MyData> sourceStream = env.addSource(new MyDataSource());env.setParallelism(3);sourceStream.print();env.execute();}private static class MyDataSource implements SourceFunction<MyData> {// 定义标志位,用来控制数据的产生private boolean isRunning = true;private final Random random = new Random(0);@Overridepublic void run(SourceContext ctx) throws Exception {while (isRunning) {ctx.collect(new MyData(random.nextInt(5), System.currentTimeMillis(), random.nextFloat()));Thread.sleep(1000L); // 1s生成1个数据}}@Overridepublic void cancel() {isRunning = false;}}
}
Flink java模拟生成自定义流式数据相关推荐
- 小白学习Flink系列--第二篇-01(流式数据概念)
导读 要想彻底理解Flink,就要了解流数据的前世今生,流数据的语义.特点,以及如何处理,以下文章就能很好的解释流数据的概念和模型,对了解Flink有很大的帮助 前言 今天流式数据处理在大数据领域是一 ...
- flink源码分析_Flink源码分析之深度解读流式数据写入hive
前言 前段时间我们讲解了flink1.11中如何将流式数据写入文件系统和hive [flink 1.11 使用sql将流式数据写入hive],今天我们来从源码的角度深入分析一下.以便朋友们对flink ...
- Apache Griffin+Flink+Kafka实现流式数据质量监控实战
点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 八股文教给我,你们专心刷题和面试 Hi,我是王知无,一个大数据领域的原创作者. 放心关注我,获取更 ...
- Iceberg 在基于 Flink 的流式数据入库场景中的应用
本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点. 应用场景 流式数据入库,是大数据和数据湖的典型应用场景.上 ...
- 使用 Flink Hudi 构建流式数据湖
简介: 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增量计算模型的不断优化演进. 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增 ...
- 大数据Hadoop之——新一代流式数据湖平台 Apache Hudi
文章目录 一.概述 二.Hudi 架构 三.Hudi的表格式 1)Copy on Write(写时复制) 2)Merge On Read(读时合并) 3)COW vs MOR 四.元数据表(Metad ...
- DataSketch求流式数据分位数
类似的功能Spark.Flink等都实现了,但是使用起来比较繁琐.Apache下的DataSketch用起来稍微简单一些.先导包: <dependency><groupId>o ...
- kafka处理流式数据_通过Apache Kafka集成流式传输大数据
kafka处理流式数据 从实时过滤和处理大量数据,到将日志数据和度量数据记录到不同来源的集中处理程序中,Apache Kafka越来越多地集成到各种系统和解决方案中. 使用CData Sync ,可以 ...
- python 可视化监控平台_python可视化篇之流式数据监控的实现
preface 流式数据的监控,以下主要是从算法的呈现出发,提供一种python的实现思路 其中: 1.python是2.X版本 2.提供两种实现思路,一是基于matplotlib的animation ...
最新文章
- python实现图结构github_Github项目+代码:新型深度网络体系结构去除图像中的雨水痕迹...
- python面试-python简单面试题
- command line
- 移动硬盘无法弹出的问题
- (231)DPU数据处理单元现有产品介绍
- Python 3.65 安装geopandas
- 两球完全弹性碰撞反弹方向问题
- 使用PS2019制作明信片
- 黑白双色背景图java_黄色背景黑白双色简洁ppt图表
- Python实现isPrime函数----新手
- 众测、专属、渗透测试捡破烂小tips
- delphi xe7 EMS是什么 什么作用,怎么使用?
- xcode 软件˙∆集~
- Cursor攻略,吃个螃蟹
- 2022年新一代最强开源UI自动化测试神器 ——Playwright(三)
- 关于内外网数据同步解决方案
- intellij idea报错:类文件具有错误的版本 61.0, 应为 52.0
- 计算机公式SUBSTITUTE,全了,SUBSTITUTE函数常用套路集合!
- Storm_Storm主要特点
- internal/modules/cjs/loader.js:584