Apache Pulsar的Function流式计算框架使用
目录
- 1. 背景
- 2. 介绍
- 3. Pulsar Function的使用
- 3.1 Pulsar Function的启用
- 3.2 使用Pulsar Function
- 4. 自己编写一个Function
1. 背景
当从Pulsar中的一个topic消费数据,进行一些简单的ETL/聚合计算,然后将数据保存到另一个topic时。这个就可以使用Funtion流式计算框架。但是复杂的计算还是需要使用Spark/Flink等计算框架
2. 介绍
从多个Input Topic中消费数据,然后将计算后的数据发送到Output Topic。同时可以将日志写入到Log Topic中,主要用于Funtion出现问题时,定位错误并调试
Pulsar Funtion的编写方式有两种
- 本地模式:在集群外部进行本地运行
- 集群模式:在集群内部运行,支持独立模式和集成模式
3. Pulsar Function的使用
3.1 Pulsar Function的启用
修改Pulsar集群所有服务器的conf/broker.conf,如下内容
functionsWorkerEnabled=true
修改Pulsar集群所有服务器的conf/functions_worker.yml,如下内容
pulsarFunctionsCluster: pulsar-cluster
然后重启broker服务
3.2 使用Pulsar Function
运行官网提供的example包,先在集群模式下创建Function,创建完成的Function是运行的
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions create \
> --jar examples/api-examples.jar \
> --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
> --inputs persistent://public/default/exclamation-input \
> --output persistent://public/default/exclamation-output \
> --tenant public \
> --namespace default \
> --name exclamation
"Created successfully"
[root@bigdata001 apache-pulsar-2.9.1]#
然后触发Function运行,得到结果。原理是向exclamation-input这个topic发送消息,然后消费exclamation-output这个topic的消息
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions trigger --name exclamation --trigger-value "hello world"
hello world!
[root@bigdata001 apache-pulsar-2.9.1]#
查看Function状态
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions status --name exclamation
{"numInstances" : 1,"numRunning" : 1,"instances" : [ {"instanceId" : 0,"status" : {"running" : true,"error" : "","numRestarts" : 0,"numReceived" : 0,"numSuccessfullyProcessed" : 0,"numUserExceptions" : 0,"latestUserExceptions" : [ ],"numSystemExceptions" : 0,"latestSystemExceptions" : [ ],"averageLatency" : 0.0,"lastInvocationTime" : 0,"workerId" : "c-pulsar-cluster-fw-bigdata003-8086"}} ]
}
[root@bigdata001 apache-pulsar-2.9.1]#
stop Function
[root@bigdata001 apache-pulsar-2.9.1]#
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions stop --name exclamation
Stopped successfully
[root@bigdata001 apache-pulsar-2.9.1]#
start Function
[root@bigdata001 apache-pulsar-2.9.1]#
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions start --name exclamation
Started successfully
[root@bigdata001 apache-pulsar-2.9.1]#
delete Function
[root@bigdata001 apache-pulsar-2.9.1]#
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions delete --name exclamation
"Deleted successfully"
[root@bigdata001 apache-pulsar-2.9.1]#
**其它Function使用说明
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions [command]
command可选值:
- localrun:创建本地function运行
- get:获取function相关信息
- restart:重启
- stats:查看状态
- list:查看特点tenant和namespace下的所有function
4. 自己编写一个Function
需求:读取input topic,其中日期格式为yyyy/MM/dd HH/mm/ss,转换为格式yyyy-MM-dd HH:mm:ss,然后发送到output topic
- 添加依赖
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-functions-api</artifactId><version>2.9.1</version>
</dependency>
- 编写程序
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.pulsar.functions.api.{Context, Function}import java.util.Dateclass DateTransfromFunction extends Function[String, String] {private val oldDateFormat: FastDateFormat =FastDateFormat.getInstance("yyyy/MM/dd HH/mm/ss")private val newDateFormat: FastDateFormat =FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")// 每来一条消息,都会调用process进行处理// context表示上下文对象,用于执行一些相关的统计计算操作,以及获取相关的对象和元数据信息override def process(input: String, context: Context): String = {val oldDate: Date = oldDateFormat.parse(input)val newStringDate = newDateFormat.format(oldDate)newStringDate}}
- 然后将程序进行打包,上传到Pulsar集群中的一台服务器
- 创建Function
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions create \
> --jar /opt/pulsar_dev-1.0-SNAPSHOT.jar \
> --classname DateTransfromFunction \
> --inputs persistent://public/default/dateTransfrom-input \
> --output persistent://public/default/dateTransfrom-output \
> --tenant public \
> --namespace default \
> --name dateTransfrom
"Created successfully"
[root@bigdata001 apache-pulsar-2.9.1]#
- 触发Function
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions trigger --name dateTransfrom --trigger-value "2022/04/10 16/32/18"
2022-04-10 16:32:18
[root@bigdata001 apache-pulsar-2.9.1]#
Apache Pulsar的Function流式计算框架使用相关推荐
- python 流式计算框架_流式计算的三种框架:Storm、Spark和Flink
我们知道,大数据的计算模式主要分为批量计算(batch computing).流式计算(stream computing).交互计算(interactive computing).图计算(graph ...
- Storm 流式计算框架介绍
文章目录 1.Storm简介 1.1 DAG(有向无环图) 1.2 Storm介绍 1.2.1 Storm 简介 1.2.2 Storm的优点 1.2.3 Storm的特性 1.3 Storm与Had ...
- JStorm—实时流式计算框架入门介绍
JStorm介绍 JStorm是参考storm基于Java语言重写的实时流式计算系统框架,做了很多改进.如解决了之前的Storm nimbus节点的单点问题. JStorm类似于Hadoop ...
- 分布式流式计算框架Storm
Storm用于实时处理,就好比 Hadoop 用于批处理. --> 离线计算:批量获取数据,批量传输数据,周期性比量计算数据,数据展示(Sqoop-->HDFS--> ...
- Flink流式计算框架中的窗口函数
一.窗口(window) (1)一般真实的流都是无界的,怎样处理无界的数据? (2)可以把无限的数据流进行切分,得到有限的数据集进行处理--也就是得到有界流: (3)窗口(window)就是将无限流切 ...
- 流式计算框架Storm 编程案例部署Linux结果演示及pom依赖
使用maven方式创建storm项目: <?xml version="1.0" encoding="UTF-8"?> <project xml ...
- 【Flink流式计算框架】常见sink操作
007Flink print() / printToErr() writeAsText() Flink提供的sink 自定义sink 获取source的方式(自带的) 基于文件:read ...
- 流式计算框架Storm网站访问来源实时统计及存储到redis代码示例
- 流式计算框架Storm编程案例:实时给手机品牌转大写并加上时间戳后缀代码示例
导入jar包,保险起见,直接从storm安装目录拷贝,maven方式可能会因版本问题出现纰漏. 结果演示:
最新文章
- Python从零开始 day2
- Codeforces Round #469 (Div. 2) A/B
- WaitForMultipleObjects用法详解
- Java Iterator 接口简介和简单用法.
- 添加文件然后自动打开
- HDU - 1536 S-Nim(sg函数)
- python堆栈反向输出列表_python - IPython:将Python脚本的输出重定向到文件(如bash) - 堆栈内存溢出...
- python自动化接口测试excel用例串行之行_python 读取 Excel 自动化执行测试用例
- 记录下kaggle比赛经验
- (40)Xilinx PLL IP核配置(一)(第8天)
- python多进程运行死机_python多进程假死
- Concrete Mathematics A Foundation for Computer Science
- 梦幻内存辅助制作教程
- 计算机组装工具以及装机流程,如何组装电脑,图文教程详解电脑组装全过程
- 重庆大Vseo这么多年的seo心得
- 【时间转换】将秒转换成“时分秒”格式
- 为什么我不建议编程初学者使用Vim,一张图告诉你
- github创建tag
- python数据分析期末_Python数据分析期末作业
- Java课设对对碰_java课程设计实验报告
热门文章
- 买卖股票的最佳时机系列
- java 递归函数_java 递归函数
- 快乐打小鸟游戏的开发
- DirectshowLib摄像头拍照的”未找到可用于建立连接的介质筛选器组合“ 解决办法...
- Web开发人员月报2018年10月
- [ITIL]-ITIL4模拟题库1
- 过年帮同学做Windows 7/8系统的心得
- AXI,AXI_Lite,AXI_Stream
- python爬虫(廖雪峰商业爬虫)
- 兰州中考计算机考试,兰州城市四区2017年中考考务工作安排