目录

  • 1. 背景
  • 2. 介绍
  • 3. Pulsar Function的使用
    • 3.1 Pulsar Function的启用
    • 3.2 使用Pulsar Function
  • 4. 自己编写一个Function

1. 背景

当从Pulsar中的一个topic消费数据,进行一些简单的ETL/聚合计算,然后将数据保存到另一个topic时。这个就可以使用Funtion流式计算框架。但是复杂的计算还是需要使用Spark/Flink等计算框架

2. 介绍

从多个Input Topic中消费数据,然后将计算后的数据发送到Output Topic。同时可以将日志写入到Log Topic中,主要用于Funtion出现问题时,定位错误并调试

Pulsar Funtion的编写方式有两种

  1. 本地模式:在集群外部进行本地运行
  2. 集群模式:在集群内部运行,支持独立模式和集成模式

3. Pulsar Function的使用

3.1 Pulsar Function的启用

修改Pulsar集群所有服务器的conf/broker.conf,如下内容

functionsWorkerEnabled=true

修改Pulsar集群所有服务器的conf/functions_worker.yml,如下内容

pulsarFunctionsCluster: pulsar-cluster

然后重启broker服务

3.2 使用Pulsar Function

运行官网提供的example包,先在集群模式下创建Function,创建完成的Function是运行的

[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions create \
>   --jar examples/api-examples.jar \
>   --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
>   --inputs persistent://public/default/exclamation-input \
>   --output persistent://public/default/exclamation-output \
>   --tenant public \
>   --namespace default \
>   --name exclamation
"Created successfully"
[root@bigdata001 apache-pulsar-2.9.1]#

然后触发Function运行,得到结果。原理是向exclamation-input这个topic发送消息,然后消费exclamation-output这个topic的消息

[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions trigger --name exclamation --trigger-value "hello world"
hello world!
[root@bigdata001 apache-pulsar-2.9.1]#

查看Function状态

[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions status --name exclamation
{"numInstances" : 1,"numRunning" : 1,"instances" : [ {"instanceId" : 0,"status" : {"running" : true,"error" : "","numRestarts" : 0,"numReceived" : 0,"numSuccessfullyProcessed" : 0,"numUserExceptions" : 0,"latestUserExceptions" : [ ],"numSystemExceptions" : 0,"latestSystemExceptions" : [ ],"averageLatency" : 0.0,"lastInvocationTime" : 0,"workerId" : "c-pulsar-cluster-fw-bigdata003-8086"}} ]
}
[root@bigdata001 apache-pulsar-2.9.1]#

stop Function

[root@bigdata001 apache-pulsar-2.9.1]#
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions stop --name exclamation
Stopped successfully
[root@bigdata001 apache-pulsar-2.9.1]#

start Function

[root@bigdata001 apache-pulsar-2.9.1]#
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions start --name exclamation
Started successfully
[root@bigdata001 apache-pulsar-2.9.1]#

delete Function

[root@bigdata001 apache-pulsar-2.9.1]#
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions delete --name exclamation
"Deleted successfully"
[root@bigdata001 apache-pulsar-2.9.1]#

**其它Function使用说明

[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions [command]

command可选值:

  1. localrun:创建本地function运行
  2. get:获取function相关信息
  3. restart:重启
  4. stats:查看状态
  5. list:查看特点tenant和namespace下的所有function

4. 自己编写一个Function

需求:读取input topic,其中日期格式为yyyy/MM/dd HH/mm/ss,转换为格式yyyy-MM-dd HH:mm:ss,然后发送到output topic

  1. 添加依赖
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-functions-api</artifactId><version>2.9.1</version>
</dependency>
  1. 编写程序
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.pulsar.functions.api.{Context, Function}import java.util.Dateclass DateTransfromFunction extends Function[String, String] {private val oldDateFormat: FastDateFormat =FastDateFormat.getInstance("yyyy/MM/dd HH/mm/ss")private val newDateFormat: FastDateFormat =FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")// 每来一条消息,都会调用process进行处理// context表示上下文对象,用于执行一些相关的统计计算操作,以及获取相关的对象和元数据信息override def process(input: String, context: Context): String = {val oldDate: Date = oldDateFormat.parse(input)val newStringDate = newDateFormat.format(oldDate)newStringDate}}
  1. 然后将程序进行打包,上传到Pulsar集群中的一台服务器
  2. 创建Function
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions create \
> --jar /opt/pulsar_dev-1.0-SNAPSHOT.jar \
> --classname DateTransfromFunction \
> --inputs persistent://public/default/dateTransfrom-input \
> --output persistent://public/default/dateTransfrom-output \
> --tenant public \
> --namespace default \
> --name dateTransfrom
"Created successfully"
[root@bigdata001 apache-pulsar-2.9.1]#
  1. 触发Function
[root@bigdata001 apache-pulsar-2.9.1]# bin/pulsar-admin functions trigger --name dateTransfrom --trigger-value "2022/04/10 16/32/18"
2022-04-10 16:32:18
[root@bigdata001 apache-pulsar-2.9.1]#

Apache Pulsar的Function流式计算框架使用相关推荐

  1. python 流式计算框架_流式计算的三种框架:Storm、Spark和Flink

    我们知道,大数据的计算模式主要分为批量计算(batch computing).流式计算(stream computing).交互计算(interactive computing).图计算(graph ...

  2. Storm 流式计算框架介绍

    文章目录 1.Storm简介 1.1 DAG(有向无环图) 1.2 Storm介绍 1.2.1 Storm 简介 1.2.2 Storm的优点 1.2.3 Storm的特性 1.3 Storm与Had ...

  3. JStorm—实时流式计算框架入门介绍

    JStorm介绍   JStorm是参考storm基于Java语言重写的实时流式计算系统框架,做了很多改进.如解决了之前的Storm nimbus节点的单点问题.   JStorm类似于Hadoop ...

  4. 分布式流式计算框架Storm

    Storm用于实时处理,就好比 Hadoop 用于批处理.         --> 离线计算:批量获取数据,批量传输数据,周期性比量计算数据,数据展示(Sqoop-->HDFS--> ...

  5. Flink流式计算框架中的窗口函数

    一.窗口(window) (1)一般真实的流都是无界的,怎样处理无界的数据? (2)可以把无限的数据流进行切分,得到有限的数据集进行处理--也就是得到有界流: (3)窗口(window)就是将无限流切 ...

  6. 流式计算框架Storm 编程案例部署Linux结果演示及pom依赖

    使用maven方式创建storm项目: <?xml version="1.0" encoding="UTF-8"?> <project xml ...

  7. 【Flink流式计算框架】常见sink操作

    007Flink print() / printToErr() writeAsText() Flink提供的sink 自定义sink 获取source的方式(自带的)        基于文件:read ...

  8. 流式计算框架Storm网站访问来源实时统计及存储到redis代码示例

  9. 流式计算框架Storm编程案例:实时给手机品牌转大写并加上时间戳后缀代码示例

    导入jar包,保险起见,直接从storm安装目录拷贝,maven方式可能会因版本问题出现纰漏. 结果演示:

最新文章

  1. Python从零开始 day2
  2. Codeforces Round #469 (Div. 2) A/B
  3. WaitForMultipleObjects用法详解
  4. Java Iterator 接口简介和简单用法.
  5. 添加文件然后自动打开
  6. HDU - 1536 S-Nim(sg函数)
  7. python堆栈反向输出列表_python - IPython:将Python脚本的输出重定向到文件(如bash) - 堆栈内存溢出...
  8. python自动化接口测试excel用例串行之行_python 读取 Excel 自动化执行测试用例
  9. 记录下kaggle比赛经验
  10. (40)Xilinx PLL IP核配置(一)(第8天)
  11. python多进程运行死机_python多进程假死
  12. Concrete Mathematics A Foundation for Computer Science
  13. 梦幻内存辅助制作教程
  14. 计算机组装工具以及装机流程,如何组装电脑,图文教程详解电脑组装全过程
  15. 重庆大Vseo这么多年的seo心得
  16. 【时间转换】将秒转换成“时分秒”格式
  17. 为什么我不建议编程初学者使用Vim,一张图告诉你
  18. github创建tag
  19. python数据分析期末_Python数据分析期末作业
  20. Java课设对对碰_java课程设计实验报告

热门文章

  1. 买卖股票的最佳时机系列
  2. java 递归函数_java 递归函数
  3. 快乐打小鸟游戏的开发
  4. DirectshowLib摄像头拍照的”未找到可用于建立连接的介质筛选器组合“ 解决办法...
  5. Web开发人员月报2018年10月
  6. [ITIL]-ITIL4模拟题库1
  7. 过年帮同学做Windows 7/8系统的心得
  8. AXI,AXI_Lite,AXI_Stream
  9. python爬虫(廖雪峰商业爬虫)
  10. 兰州中考计算机考试,兰州城市四区2017年中考考务工作安排