好记忆不如烂笔头,能记下点东西,就记下点,有时间拿出来看看,也会发觉不一样的感受.

简单描述一下自己通过学习nifi和查看源码,然后自己通过nifi的体现很基础的实现自己的业务的处理器开发。

适合刚入门的看,有nifi基础的,可以跳过!

目录

一、 获取一个json文件的文本信息.

1.Json文档的输入端,需要一个GenerateFlowFile处理器,做如下配置:

2.拖拽自己实现的处理器

3.使用PutFile

二、合并文本的内容

1.Json文档的输入端

2.拖拽自己实现的处理器

3.使用PutFile

4.整体的流程图如下所示:

三、给一个文件内容添加头信息

1.Json文档的输入端

2.拖拽自己实现的处理器

3.使用PutFile

4.整体的流程图如下所示


一、 获取一个json文件的文本信息.

假定我们需要在一段json文件中,获取json串中的某个key所对应的value的值,那么在nifi的架子上,该如何操作呐?共分为三步:

1.Json文档的输入端,需要一个GenerateFlowFile处理器,做如下配置:

Custom text 存放的是目标json串.json如下所示:{"error_code":0,"reason":"ok","result":{"items":[{"regStatus":"存续","estiblishTime":1495555200000,"regCapital":"","pencertileScore":4902,"type":1,"legalPersonName":"温旭颖","toco":2,"legalPersonId":2051255554,"name":"陕西蜂窝科技股份有限公司","logo":"","alias":"蜂窝科技","id":3053414776,"category":"723","personType":1,"base":"han"},{"regStatus":"注销","estiblishTime":1473264000000,"regCapital":"","pencertileScore":3860,"type":1,"legalPersonName":"常青","toco":8,"legalPersonId":1911055314,"name":"陕西蜂窝科技股份有限公司","logo":"","alias":"蜂窝科技","id":2958332903,"category":"721","personType":1,"base":"xj"}],"total":18}}

2.拖拽自己实现的处理器

根据要求配置好自己的json_path的路径,这样方便获取数据.如: $.result.items[*]

自己处理器的主要代码是:

@Tags({"first-example:fetch value from json string"})
@SideEffectFree
@CapabilityDescription("fetch value from json string.")
public class FirstProcessor extends AbstractProcessor {private List<PropertyDescriptor> properties;private Set<Relationship> relationships;private final String arrayFlag="true";/*** json路径.*/public static final PropertyDescriptor JSON_PATH = new PropertyDescriptor.Builder().name("Json Path").required(true).description("json path value,such as:$.test").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();/*** json路径.*/public static final PropertyDescriptor ARRAY_FLAG = new PropertyDescriptor.Builder().name("Array Flag").required(true).description("mark if the input json is array or not").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).allowableValues("true", "false").defaultValue("false").build();/*** 成功标识.*/public static final Relationship SUCCESS = new Relationship.Builder().name("SUCCESS").description("Succes relationship").build();@Overridepublic Set<Relationship> getRelationships(){return relationships;}@Overridepublic List<PropertyDescriptor> getSupportedPropertyDescriptors(){return properties;}/*** 初始化配置* @param context*/@Overridepublic void init(final ProcessorInitializationContext context){List<PropertyDescriptor> properties = new ArrayList<>();properties.add(JSON_PATH);properties.add(ARRAY_FLAG);this.properties = Collections.unmodifiableList(properties);Set<Relationship> relationships = new HashSet<>();relationships.add(SUCCESS);this.relationships = Collections.unmodifiableSet(relationships);}@Overridepublic void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {final AtomicReference<String> value = new AtomicReference<>();FlowFile flowfile = session.get();session.read(flowfile, new InputStreamCallback() {@Overridepublic void process(InputStream in) throws IOException {try{String json =IOUtils.toString(in, StandardCharsets.UTF_8.name());String flag = context.getProperty(ARRAY_FLAG).getValue();if (flag.equalsIgnoreCase(arrayFlag)){List<Object> dataList = JsonPath.read(json, context.getProperty(JSON_PATH).getValue());if (ObjectUtils.allNotNull(dataList)){StringBuilder all = new StringBuilder("[");int total = 0;for (Object object : dataList) {LinkedHashMap<String,Object> dataMap = (LinkedHashMap<String, Object>) object;Set<String> keys = dataMap.keySet();int count = 0;StringBuilder builder = new StringBuilder("{");for (String key :keys ) {if (count==keys.size()-1){builder.append("\""+key+"\":\""+dataMap.get(key)+"\"");}else{builder.append("\""+key+"\":\""+dataMap.get(key)+"\",");}count++;}if (total==dataList.size()-1){builder.append("}");}else {builder.append("},");}total++;all.append(builder.toString());builder.reverse();}all.append("]");value.set(all.toString());}}else {String result = JsonPath.read(json, context.getProperty(JSON_PATH).getValue());value.set(result);}}catch(Exception ex){ex.printStackTrace();getLogger().error("failed to read json string.");}}});//Write the results to an attributeString results = value.get();if(results != null && !results.isEmpty()){String flag = context.getProperty(ARRAY_FLAG).getValue();if (flag.equalsIgnoreCase(arrayFlag)){Map<String,String> data=new HashMap<>(16);data.put(NiFiConstant.MATCH_ATTR,value.toString());flowfile = session.putAllAttributes(flowfile,data);}else {flowfile = session.putAttribute(flowfile, NiFiConstant.MATCH_ATTR, results);}}//To write the results back out ot flow fileflowfile = session.write(flowfile, new OutputStreamCallback() {@Overridepublic void process(OutputStream out) throws IOException {out.write(value.get().getBytes());}});session.transfer(flowfile, SUCCESS);}}

要指明是不是一个json array ,因为jsonobject 和jsonarray 的解析和接收对象是不一样的。

3.使用PutFile

指定处理完成之后,文件的输出地址:

4.整个的流程图如下:

直接上代码,按照图所示来操作,就可以看见对应的文件输出到目录里了。

二、合并文本的内容

假定我们需要在把一个文本内容拼接上另外一个文本内容,那么在nifi的架子上,该如何操作呐?共分为三步:

1.Json文档的输入端

需要一个GenerateFlowFile处理器,做如下配置:

Custom text 存放的是操作的文本内容,如下所示: 你是哪个 ?

2.拖拽自己实现的处理器

根据要求配置好自己的input value的值,这样就可以将a中的文本内容进行拼接:

代码实现如下:

@Tags({"second-example:Combine two sentences!"})
@SeeAlso({})
@SideEffectFree
@CapabilityDescription("merge two content to one together")
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class SecondProcessor extends AbstractProcessor {/*** 属性描述对象集合*/private List<PropertyDescriptor> descriptors;/*** 关联关系集合*/private Set<Relationship> relationships;/*** 文件设置.*/private static final String FILE_NAME = "out-";private static final String FILE_SUFFIX = ".txt";public static final PropertyDescriptor INPUT_VALUE = new PropertyDescriptor.Builder().name("INPUT_VALUE").displayName("INPUT VALUE").description("input value for operating").required(true)//非空验证.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder().name("sucess").description("example relationship uccess").build();public static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder().name("failure").description("example relationship failure").build();public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder().name("character-set").displayName("Character Set").required(true).defaultValue("UTF-8").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).build();@Overrideprotected void init(final ProcessorInitializationContext context) {final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();descriptors.add(INPUT_VALUE);descriptors.add(CHARSET);this.descriptors = Collections.unmodifiableList(descriptors);final Set<Relationship> relationships = new HashSet<Relationship>();relationships.add(RELATIONSHIP_SUCCESS);relationships.add(RELATIONSHIP_FAILURE);this.relationships = Collections.unmodifiableSet(relationships);}@Overridepublic Set<Relationship> getRelationships() {return this.relationships;}@Overridepublic final List<PropertyDescriptor> getSupportedPropertyDescriptors() {return descriptors;}@OnScheduledpublic void onScheduled(final ProcessContext context) {getLogger().info("Processor-Name"+context.getName());Map<PropertyDescriptor, String> dataMap = context.getProperties();for (Map.Entry<PropertyDescriptor, String> entry : dataMap.entrySet()) {getLogger().info("key="+entry.getKey().toString()+",value="+entry.getValue());}}@Overridepublic void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {FlowFile flowFile = session.get();if ( flowFile == null ) {return;}final AtomicReference<String> value = new AtomicReference<>();session.read(flowFile, new InputStreamCallback() {@Overridepublic void process(InputStream inputStream) throws IOException {try{String inputVal = IOUtils.toString(inputStream, StandardCharsets.UTF_8.name());//utf8 的设置final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());getLogger().info("得到字符集结果是:"+charset.name());String current = new String(context.getProperty(INPUT_VALUE).getValue().getBytes(charset),StandardCharsets.UTF_8.name());String result = "处理结果:" + inputVal + current;//以 utf8 的方式把流信息写出去.getLogger().info("处理得到的结果是:"+result);value.set(result);}catch(Exception ex){ex.printStackTrace();getLogger().error("failed to read input string!");}}});String results = value.get();if(results != null && !results.isEmpty()){flowFile = session.putAttribute(flowFile, NiFiConstant.MATCH_ATTR, results);}//写入文件信息.flowFile = session.write(flowFile, new OutputStreamCallback() {@Overridepublic void process(OutputStream outputStream) throws IOException {getLogger().info("写出的消息是:"+value.get());byte[] content = value.get().getBytes();//远程的输出流outputStream.write(content);//重新定义本地输出流.outputStream = new FileOutputStream(new File(FILE_NAME+uuid()+FILE_SUFFIX));outputStream.write(content);}});session.transfer(flowFile, RELATIONSHIP_SUCCESS);}/*** 产生一个32位的GUID* @return*/public String uuid() {return getIdentifier().replace("-", "").toUpperCase();}
}

3.使用PutFile

指定处理完成之后,文件的输出地址:.

4.整体的流程图如下所示:

如此就可以完成两个文本内容的拼接并输出的操作。

三、给一个文件内容添加头信息

假定我们需要在把一个文件内部的内容拼接上另外一个文本内容,那么在nifi的架子上,该如何操作呐?共分为三步:

1.Json文档的输入端

需要一个GenerateFlowFile处理器,做如下配置:

注意file size 和字符集的设置。Custom text 存放的是操作的文本内容,如下所示:你是谁?

2.拖拽自己实现的处理器

根据要求配置好自己的文件的绝对路径的值,这样就可以将a中的文本内容进行拼接

需要一个绝对的文件路径,具体代码如下:

@Tags({"third-example:deal with content!"})
@SeeAlso({})
@SideEffectFree
@CapabilityDescription("add prefix to given content.")
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class ThirdProcessor extends AbstractProcessor {/*** 属性描述对象集合*/private List<PropertyDescriptor> descriptors;/*** 关联关系集合*/private Set<Relationship> relationships;/*** 文件设置.*/private static final String FILE_NAME = "combine-";private static final String FILE_SUFFIX = ".txt";public static final PropertyDescriptor ABSOLUTE_PATH = new PropertyDescriptor.Builder().name("ABSOLUTE_PATH").displayName("ABSOLUT PATH").description("input file path for operating").required(true)//非空验证.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();public static final Relationship SHIP_SUCCESS = new Relationship.Builder().name("sucess").description("example relationship uccess").build();public static final Relationship SHIP_FAILURE = new Relationship.Builder().name("failure").description("example relationship failure").build();public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder().name("character-set").displayName("Character Set").required(true).defaultValue("UTF-8").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).build();@Overrideprotected void init(final ProcessorInitializationContext context) {final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();descriptors.add(ABSOLUTE_PATH);descriptors.add(CHARSET);this.descriptors = Collections.unmodifiableList(descriptors);final Set<Relationship> ships = new HashSet<Relationship>();ships.add(SHIP_SUCCESS);ships.add(SHIP_FAILURE);this.relationships = Collections.unmodifiableSet(ships);}@Overridepublic Set<Relationship> getRelationships() {return this.relationships;}@Overridepublic final List<PropertyDescriptor> getSupportedPropertyDescriptors() {return descriptors;}@OnScheduledpublic void onScheduled(final ProcessContext context) {getLogger().info("Processor-Name"+context.getName());Map<PropertyDescriptor, String> dataMap = context.getProperties();for (Map.Entry<PropertyDescriptor, String> entry : dataMap.entrySet()) {getLogger().info("key="+entry.getKey().toString()+",value="+entry.getValue());}}@Overridepublic void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {FlowFile flowFile = session.get();if ( flowFile == null ) {return;}final AtomicReference<String> value = new AtomicReference<>();//utf8 的设置final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());session.read(flowFile, new InputStreamCallback() {@Overridepublic void process(InputStream inputStream) throws IOException {try{String headerDesc = IOUtils.toString(inputStream, StandardCharsets.UTF_8.name());String filePath = context.getProperty(ABSOLUTE_PATH).getValue();InputStreamReader inRd = new InputStreamReader(new FileInputStream(filePath),charset);BufferedReader reader = new BufferedReader(inRd);String line=null;StringBuilder  builder = new StringBuilder();while (null!=(line=reader.readLine())){getLogger().info("文件信息是:"+line);builder.append(headerDesc+new String(line.getBytes(charset),StandardCharsets.UTF_8.name())+"\n\t");}//以 utf8 的方式把流信息写出去.getLogger().info("处理得到的结果是:"+builder.toString());value.set(builder.toString());}catch(Exception ex){ex.printStackTrace();getLogger().error("failed to read input string!");}}});String results = value.get();if(results != null && !results.isEmpty()){flowFile = session.putAttribute(flowFile, NiFiConstant.MATCH_ATTR, results);}//写入文件信息.flowFile = session.write(flowFile, new OutputStreamCallback() {@Overridepublic void process(OutputStream outputStream) throws IOException {getLogger().info("写出的消息是:"+value.get());byte[] content = value.get().getBytes();//远程的输出流outputStream.write(content);//重新定义本地输出流.outputStream = new FileOutputStream(new File(FILE_NAME+uuid()+FILE_SUFFIX));outputStream.write(content);}});session.transfer(flowFile, SHIP_SUCCESS);}/*** 产生一个32位的GUID* @return*/public String uuid() {return getIdentifier().replace("-", "").toUpperCase();}
}

3.使用PutFile

指定处理完成之后,文件的输出地址

4.整体的流程图如下所示

如此挨个执行,不报错的情况下,就可以看见执行的结果了。

本文只是一个简单描述下如何基于nifi框架,来实现自己的业务逻辑,下一篇我就复杂使用下,看看如何操作。

如有不明白的,请微信搜索公众号 :codingba ,我会一一解答。

NiFi 学习 —自己实现处理器相关推荐

  1. Spring MVC学习(8)—HandlerInterceptor处理器拦截器机制全解

    基于最新Spring 5.x,详细介绍了Spring MVC的HandlerInterceptor处理器拦截器机制,以及它的一系列拦截方法. 本次我们来学习Sring MVC的HandlerInter ...

  2. C语言再学习 -- C 预处理器

    gcc/cc xxx.c  可以编译链接C源程序生成一个可执行文件 a.out 整个过程中可以划分为以下的4步流程: (1)预处理/预编译: 主要用于包含头文件的扩展,以及执行宏替换等 //加上 -E ...

  3. CEVA推出用于前端设备深度学习的AI处理器NeuPro系列

    CEVA 发布了用于前端设备深度学习推理,而且功能强大的专用人工智能(AI)处理器系列 NeuPro™.NeuPro 系列处理器专为智能和互联的前端设备供应商而设计,寻求快速利用深度神经网络技术提供的 ...

  4. Linux内核学习--ARM系列处理器(指令集等)性能全面分析

    目录 一.引言 二.ARM处理器类型 ------> 2.1.Classic系列 ------> 2.2.Cortex系列 ------------> 2.2.1.Cortex-M系 ...

  5. Cortex-M3/M4学习随笔——一些处理器基本信息

    cortex-m3和cortex-m4处理器的一般信息 处理器类型 ARM Cortex-M为32位RISC处理器,具有以下特征: 1.32位寄存器 2.32位内部数据通路 3.32位总线接口 当然, ...

  6. 【新智元人工智能创业家】陈天石:寒武纪,全球首款深度学习专用处理器

    日前,GPU巨头企业NVIDIA宣布推出致力于加速人工智能和深度学习的芯片Tesla P100,以及全球首款面向深度学习的超级计算机DGX-1.不过,在陈天石看来,NVIDIA的这一系列组合拳对服务器 ...

  7. 20135306黄韧 信息安全系统设计基础期中学习总结

    信息安全系统设计基础第八周学习总结 知识点总结 第1章 计算机系统漫游 △计算机系统是由硬件和系统软件组成的,它们共同协作以运行应用程序.计算机内部的信息被表示为一组组的位.它们依据上下文有不同的解释 ...

  8. 从深度学习选择什么样的gpu来谈谈gpu的硬件架构

    2016-06-27 23:39 2511人阅读 评论(0) 收藏 举报 分类: GPU 从深度学习在2012年大放异彩,gpu计算也走入了人们的视线之中,它使得大规模计算神经网络成为可能.人们可以通 ...

  9. DSP学习 -- Visual Studio 操作

    如需转载请注明出处 https://juyou.blog.csdn.net/article/details/107862686 最近一直使用Visual Studio写代码了,其中用到一些东西感觉有必 ...

  10. UNIX再学习 -- 错误和警告

    错误和警告是常会出现的现象,了解它对以后解决问题会很有帮助.下面我们就重点来详细介绍它们. 一.错误 1.回忆错误 我们之前讲解其他内容时有涉及到错误的部分,下面让我们来回忆一下: (1)参看:C语言 ...

最新文章

  1. python100个必背知识-python学习必备知识汇总
  2. 股票连续投资历史收益计算
  3. 北斗导航 | 复杂环境下卫星导航算法(理论)
  4. caj转pdf_CAJ转PDF文件,这恐怕是最全的方法总结了!
  5. 如何删除eclipse多余的工作空间
  6. java爬虫-简单爬取网页图片
  7. 16.通过实现“函数一个返回值出口,返回两个返回值”。来了解指针与普通地址的区别...
  8. 并行、并发和代码性能优化
  9. 关于函数式编程的思考(1)
  10. busybox inittab
  11. bzoj2839 集合计数
  12. rest-framework解析器,url控制,分页,响应器,渲染器,版本控制
  13. 【sklearn第二十七讲】模型持久性
  14. LeetCode题解——Reverse Integer
  15. 服务器系统启用flash,基础设置:Windows Server 2012及2012R2 启用IE Flash
  16. 利用反射等离激元超表面的模拟光计算
  17. 解决QQ空间说说自动被发广告信息办法:取消第三方授权
  18. win10打开视频显示服务器运行失败,apache启动失败,详细教您快速解决Win10系统apache启动失败...
  19. 域名怎样实现自动跳转
  20. 英伟达JETSON XAVIER NX使用小记

热门文章

  1. 离线ROS API文档(Zeal或Dash)
  2. windows的那些好软件
  3. 淘宝运营到底是做什么的?
  4. mtk手机的联机方法
  5. 高通QCOM 8610平台电量计算
  6. Struts中动态ActionForm与静态ActionForm有什么区别?_
  7. 计算机输入什么指令关机,电脑自动关机命令是什么【详细介绍】
  8. 兄弟连php课程,LAMP兄弟连PHP课程学习笔记 第一天 PHP基本语法
  9. java 安卓 物联网_面向物联网的Android应用开发与实践
  10. html链接打开word文档,一招教你在Word文档中创建超链接打开其他文件