NiFi进行统一nar包组件升级

auth : Hadi
since : 2022-3-9 10:54:29

NiFi界面上,每次更新了nar包后,总是会涉及到手动changeVersion的操作:

但,如果这个nar包涉及到的组件实例过多,每次都一个一个的去点击,那实在是太麻烦了。所以身为程序员不能做这种机器化的工作,所以写了简单的模拟请求给直接供大家参考。由于代码在公司开发,这里仅仅发送第一个测试版本,但放心可靠!

模拟请求

这个版本仅使用http请求,且无认证的方式进行更改,如果需要包装,那么就包装一下就可以了。

通过手动发送请求,可以在浏览器F12界面看到对应的请求体,我们模拟请求体就可以了。(或者直接nifi doc api中也有相关介绍)

模拟一下json请求:

一共需要6个参数,clientId/cmdVersion/identifier/group/artifact/version。后面三项是我们nar包的签名,前面三项是可以通过修改前访问组件获取。(直接访问http://NIFI_IP/nifi-api/processId)


搜寻对应Processors

通过nifi doc api查询可以得到访问/resources 可以获取到页面上的所有组件的信息,筛选出processors部分,然后判断nar包签名是否匹配。

完成后,保存在HashMap中,等待执行。

功能菜单

由于可能有多个nar需要更改,或者多个NiFi集群需要更改,那么我们就弄个交互页面来独立成为单独的NiFi Util工具包吧!

总结

如果需要HTTPS 和 认证版本只需要进行很简单的包装即可。本工具需要在nifi集群健康,且组件STOP或DISABLE状态,如果想添加功能,只需要模拟STOP请求即可(也很简单)。框架已经搭好,各位随意使用,不过转载或使用请标注作者和出处,谢谢。

注意

  • 无法更改的组件不会被更改(RUNNING状态)。

  • 如果组件的relationship和配置项不一致,还是需要手动处理。

  • 如果对应版本没加载到NiFi中,不会进行版本更改。

  • 代码简单,各位想要的东西自寻更改。

  • 如果该代码涉及到部分隐私泄露,请立即私信联系,谢谢各位。

源码

简单的构建就弄完了,具体免费源代码请移步[这里]https://download.csdn.net/download/qq_36610426/84050357)。

主要实现类:

package com.huangyichun.nifi.util;import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;import java.io.*;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Scanner;/*** @author huangyichun107* @since 2021-11-23 10:40:03* <p>* 用于NiFi的nar包统一升级改造*/
public class ChangeNiFiNarVersion {static {Properties prop = System.getProperties();prop.setProperty("http.proxyHost", "127.0.0.1"); //如果不需要代理请注释静态方法。prop.setProperty("http.proxyPort", "3128");}/*** testIp 127.0.0.1*/static String NIFI_IP = "";static String NIFI_PORT = "8848";static String NIFI_URL = "http://" + NIFI_IP + ":" + NIFI_PORT + "/nifi-api";final static Map<String, Processor> PROCESSOR_MAP = new HashMap<>();final static String STRING_LIST = "\tq | exit | out :\n" +"\t    out the program.\n" +"\t    \n" +"\tip $nifiIp:\n" +"\t    set nifi ip.\n" +"\n" +"\tversion $version:\n" +"\t    set version.\n" +"\n" +"\tartifact $artifact:\n" +"\t    set artifact.\n" +"\n" +"\tgroup $group:\n" +"\t    set group.\n" +"\n" +"\tsearch:\n" +"\t    need ip/version/artifact/group to seach older processors.\n" +"\n" +"\tchange:\n" +"\t    need ip/version/artifact/group to change older processors.\n" +"\n" +"\tcheck:\n" +"\t    to list parameters now.";final static Scanner scanner = new Scanner(System.in);public static void main(String[] args) {System.out.println(STRING_LIST);System.out.println("尽量输入慢一点,java嘛,不寒碜");String version = "";String artifact = "";String group = "";String cmd;while (true) {System.out.println("cmd plz.");cmd = scanner.nextLine();String in = cmd.split(" ")[0];switch (in) {case "q":case "exit":case "out":System.out.println("log out");return;case "ip":final String[] s = cmd.split(" ");if (s.length != 2) {System.out.println("ip $nifiIp:\n  set nifi ip.\n  ip cmd need 1 parameter");break;}if (!s[1].matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {System.out.println("wrong nifi ip");break;}NIFI_IP = s[1];NIFI_URL = "http://" + NIFI_IP + ":10111/nifi-api";PROCESSOR_MAP.clear();break;case "version":final String[] versions = cmd.split(" ");if (versions.length != 2) {System.out.println("version $version:\n  set nifi version.\n  version cmd need 1 parameter");break;}version = versions[1];break;case "artifact":final String[] artifacts = cmd.split(" ");if (artifacts.length != 2) {System.out.println("artifact $artifact:\n  set nifi artifact.\n  artifact cmd need 1 parameter");break;}artifact = artifacts[1];break;case "group":final String[] groups = cmd.split(" ");if (groups.length != 2) {System.out.println("group $group:\n  set nifi group.\n  group cmd need 1 parameter");break;}group = groups[1];break;case "search":if (NIFI_IP.isEmpty() || version.isEmpty() || artifact.isEmpty() || group.isEmpty()) {System.out.println("search need ip/version/artifact/group to change older processors.");}System.out.println("+----------------------------- search start -------------------------------+");search(artifact, group);if (PROCESSOR_MAP.isEmpty()) {System.out.println("find no processors");}System.out.println("+----------------------------- search done --------------------------------+");break;case "change":if (NIFI_IP.isEmpty() || version.isEmpty() || artifact.isEmpty() || group.isEmpty()) {System.out.println("change need ip/version/artifact/group to change older processors.");}if (PROCESSOR_MAP.isEmpty()) {System.out.println("find no processors");break;}System.out.println("+----------------------------- change start -------------------------------+");change(artifact, group, version);System.out.println("+----------------------------- change done --------------------------------+");break;case "check":System.out.println("-----------------------------------------------------------------------------------------------------------");System.out.println("\tip:" + NIFI_IP + "\n\tversion:" + version + "\n\tartifact:" + artifact + "\n\tgroup:" + group);System.out.println("-----------------------------------------------------------------------------------------------------------");break;default:System.out.println("wrong cmd.");System.out.println(STRING_LIST);}}}public static void change(String artifact, String group, String version) {if (PROCESSOR_MAP.isEmpty()) {System.out.println("find 0 processor to change, plz search again.");}for (Map.Entry<String, Processor> stringProcessorEntry : PROCESSOR_MAP.entrySet()) {final Processor value = stringProcessorEntry.getValue();System.out.println(value.identifier + ":" + doChangeVersionJsonObject(value.identifier, artifact, group, version));}}public static void search(String myArtifact, String myGroup) {PROCESSOR_MAP.clear();final String resourcesJson = doGetRequest(NIFI_URL + "/resources");final JSONObject jsonObject = JSONObject.parseObject(resourcesJson);final JSONArray resourcesArray = jsonObject.getJSONArray("resources");for (Object o : resourcesArray) {JSONObject processorId = o instanceof JSONObject ? ((JSONObject) o) : null;if (processorId == null) {continue;}final String identifier = processorId.getString("identifier");// 判断是否是 processorsif (!identifier.startsWith("/processors")) {continue;}// 获取组件信息 jsonfinal String processorsJson = doGetRequest(NIFI_URL + identifier);final JSONObject processorsJsonObject = JSONObject.parseObject(processorsJson);// 获取组件 componentfinal JSONObject component = processorsJsonObject.getJSONObject("component");final String type = component.getString("type");// 获取组件 bundlefinal JSONObject bundle = component.getJSONObject("bundle");final String artifact = bundle.getString("artifact");final String version = bundle.getString("version");final String group = bundle.getString("group");if (artifact.equals(myArtifact) && group.equals(myGroup)) {// 获取配置 configfinal JSONObject config = component.getJSONObject("config");final JSONObject properties = config.getJSONObject("properties");System.out.println(String.join("\t", "find", artifact, version, group, identifier, type));PROCESSOR_MAP.put(identifier, new Processor(identifier, type, config, properties, artifact));}}}static boolean doChangeVersionJsonObject(String identifier, String artifact, String group, String version) {// 在更新的时候,必须获取到最新的 Processors versionfinal String path = NIFI_URL + identifier;final JSONObject result = JSONObject.parseObject(doGetRequest(path));final JSONObject revision = result.getJSONObject("revision");final String clientId = revision.getString("clientId");int cmdVersion = revision.getInteger("version");identifier = identifier.replaceFirst("/processors/", "");String json = JSONObject.parseObject("" +"{\n" +"    \"revision\": {\n" +"        \"clientId\": \"" + clientId + "\",\n" +"        \"version\": " + cmdVersion + "\n" +"    },\n" +"    \"disconnectedNodeAcknowledged\": false,\n" +"    \"component\": {\n" +"        \"id\": \"" + identifier + "\",\n" +"        \"bundle\": {\n" +"            \"group\": \"" + group + "\",\n" +"            \"artifact\": \"" + artifact + "\",\n" +"            \"version\": \"" + version + "\"\n" +"        }\n" +"    }\n" +"}").toString();doPutRequest(path, json);return true;}/*** 发起一个url GET 请求并反回String** @param path http://www.baidu.com* @return the html or data*/public static String doGetRequest(String path) {StringBuilder stringBuilder = new StringBuilder();final String http = "http";if (!path.startsWith(http)) {System.out.println("the url NOT begin with http. plz check: " + path);return stringBuilder.toString();}URL url;try {url = new URL(path);} catch (MalformedURLException e) {System.out.println("the url create failed. plz check: " + path);return stringBuilder.toString();}try {HttpURLConnection conn = (HttpURLConnection) url.openConnection();conn.setRequestProperty("accept", "*/*");conn.setRequestProperty("connection", "Keep-Alive");conn.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)");conn.setDoOutput(true);conn.setDoInput(true);conn.setRequestMethod("GET");conn.setConnectTimeout(50000);conn.connect();try (InputStream is = conn.getInputStream()) {// 返回的就是一个json 不会很长直接推string吧, 后续也不想用JSON来解析了,直接String解析吧BufferedReader br = new BufferedReader(new InputStreamReader(is));String str;while ((str = br.readLine()) != null) {str = new String(str.getBytes(), StandardCharsets.UTF_8);stringBuilder.append(str);}} catch (IOException e) {e.printStackTrace();}conn.disconnect();} catch (MalformedURLException e) {System.out.println("IOException plz check:" + e);e.printStackTrace();} catch (IOException e) {System.out.println("IOException plz check:" + e);}return stringBuilder.toString();}/*** 发起一个url put 请求并反回String** @param path http://www.baidu.com* @return the html or data*/public static String doPutRequest(String path, String json) {StringBuilder stringBuilder = new StringBuilder();final String http = "http";if (!path.startsWith(http)) {System.out.println("the url NOT begin with http. plz check: " + path);return stringBuilder.toString();}URL url;try {url = new URL(path);} catch (MalformedURLException e) {System.out.println("the url create failed. plz check: " + path);return stringBuilder.toString();}try {HttpURLConnection conn = (HttpURLConnection) url.openConnection();conn.setRequestProperty("accept", "*/*");conn.setRequestProperty("connection", "Keep-Alive");conn.setRequestProperty("Content-Type", "application/json");conn.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)");conn.setDoOutput(true);conn.setDoInput(true);conn.setRequestMethod("PUT");conn.setConnectTimeout(50000);conn.connect();OutputStream outputStream = conn.getOutputStream();outputStream.write(json.getBytes(StandardCharsets.UTF_8));outputStream.flush();outputStream.close();try (InputStream is = conn.getInputStream()) {BufferedReader br = new BufferedReader(new InputStreamReader(is));String str;while ((str = br.readLine()) != null) {str = new String(str.getBytes(), StandardCharsets.UTF_8);stringBuilder.append(str);}} catch (IOException e) {e.printStackTrace();}conn.disconnect();} catch (MalformedURLException e) {System.out.println("IOException plz check:" + e);e.printStackTrace();} catch (IOException e) {System.out.println("IOException plz check:" + e);}return stringBuilder.toString();}static class Processor {String identifier;String type;JSONObject config;JSONObject properties;String artifact;Processor(String identifier, String type, JSONObject config, JSONObject properties, String artifact) {this.identifier = identifier;this.type = type;this.config = config;this.properties = properties;this.artifact = artifact;}}}

NiFi 一键自动升级Nar包相关推荐

  1. 设置Maven自动升级jar包

    通常,Maven中配置依赖使用以下形式 <dependency><groupId>org.seleniumhq.selenium</groupId><arti ...

  2. 预警系统一键自动升级_东风风行新风行T5购车手册,首推1.5T自动尊贵型

    "东风风行汽车这两年在国内汽车市场中的表现是出色的." 国产紧凑型SUV车型市场的竞争变得愈发的激烈起来了,不少汽车厂商都在发力这个级别的车型市场.紧凑型SUV车型市场一直以来都是 ...

  3. 预警系统一键自动升级程序v2.5.2_一汽大众全新高尔夫上市 售价12.9816.58万

    [太平洋汽车网新车频道]11月7日,一汽-大众全新数字高尔夫(MK8)在佛灵湖车迷大会上上市,共四款配置车型, 售价12.98-16.58万 .新车为全球首款基于MQBEvo平台打造的大众车,采用全新 ...

  4. 移动开发:Android Ant一键自动打多渠道包

    build.xml =========================================================================== <?xml versi ...

  5. 移动开发:Android Ant一键自动打多渠道包(xmltask)

    build.xml =========================================================================== <?xml versi ...

  6. tomcat升级_「shell脚本」懒人运维之自动升级tomcat应用(war包)

    准备: 提前修改war包里的相关配置,并上传到服务器: 根据要自动升级的tomcat应用修改或添加脚本相关内容: tomcat启动脚本如是自己写的,要统一格式命名,如:xxx.xxxTomcat 等: ...

  7. 自动将 NuGet 包的引用方式从 packages.config 升级为 PackageReference

    在前段时间我写了一篇迁移 csproj 格式的博客 将 WPF.UWP 以及其他各种类型的旧 csproj 迁移成基于 Microsoft.NET.Sdk 的新 csproj,不过全过程是手工进行的, ...

  8. 本地tomcat启动war包_「shell脚本」懒人运维之自动升级tomcat应用(war包)

    准备: 提前修改war包里的相关配置,并上传到服务器: 根据要自动升级的tomcat应用修改或添加脚本相关内容: tomcat启动脚本如是自己写的,要统一格式命名,如:xxx.xxxTomcat 等: ...

  9. LNMP一键安装包 PHP自动升级脚本

    前一段时间完成了lnmp一键安装包的PHP自动升级脚本,今天发布出来,如果想升级PHP版本的lnmp用户可以试用一下.支持目前lnmp的所有版本. 只能有低版本升级到高版本不能降级.可以升级到现有PH ...

  10. 安卓端一键自动设置WiFi代理的APP,配合Fiddler、Burp、Charles等抓包工具使用,懒人必备!

    本文为原创文章,转载请注明出处!!! 前言 在安卓逆向.软件测试等工作过程中,使用Fiddler.Burp.Charles等抓包工具,需要经常设置和取消手机的WiFi代理. 因为一个字"懒& ...

最新文章

  1. jackson.ObjectMapper里enableDefaultTyping方法过期
  2. 0-1背包(及初始化问题)
  3. 五行中的土在哪个方位_土命人适合往哪个方向发展
  4. mysql 赋给用户权限 grant all privileges on
  5. [css] 为什么说不提倡用1px的小尺寸图片做背景平铺?
  6. Android 获取设备ID,手机厂商,运营商,联网方式,获取系统语言,获取时区
  7. ap计算机科学a买什么书,准备AP*计算机科学A考试-第1部分
  8. 在创业之路上,每个人都会有很多的老师
  9. CodeForces Round #290 Div.2
  10. 关于tomcat那些事情 - tomcat6.0 配置ip地址访问不用加端口和项目名
  11. Latex添加矢量图/将visio画图转换成矢量图
  12. 【Tomcat】修改密码
  13. STM32 CAN编程详解
  14. linux串口ttys1,linux ttySx 应用
  15. Ubuntu开发嵌入式串口权限问题
  16. Camouflaged Object Detection阅读笔记
  17. 别再说你不会!java嵌入式开发教程
  18. 引入video.js并使用
  19. 十独吟 之一 李清照
  20. 【20210409期AI简报】INT8加速训练方案、用树莓派打造的寄居蟹机器人

热门文章

  1. SharePoint下载大文件失败 异常信息: system.OutOfMemoryException
  2. 云医院HIS系统—医院挂号模块
  3. Eclipse启动Tomcat 警告: 基于APR的本地库加载失败.错误报告为
  4. 随滚动条移动的QQ在线客服代码
  5. 软件工程专业大学四年学什么
  6. 用PPT就可以做印章?是的,超简单超逼真,教你一分钟搞定
  7. IntelliJ IDEA入门教程:如何使用工具窗口
  8. Revit二次开发——链接模型坐标系与模型坐标系转换
  9. 传统运维 VS 互联网运维 框架体系大观
  10. 豆瓣电影TOP250全套下载