案例解析 GBase8s 在工业互联网平台中的应用
目录
工业互联网平台
工业4.0 和 中国智造2025
工业大数据
以 GBase8s 为核心的工业互联网数据平台逻辑架构
案例解析
模拟MQTT环境
创建MQTT Source
从 GBase8s 获取设备表信息
将处理结果写回数据库
流批混合
总结与展望
参考文献
工业互联网平台
工业互联网作为新一代信息技术与制造业深度融合的产物,通过实现人、机、物的全面互联,构建了一个全要素、全产业链、全价值链连接的、新型的生产制造和服务体系,是促进传统产业转型升级、实现高质量发展的重要驱动。企业工业互联网包括网络、平台、安全三大体系,其中工业互联网平台是工业全要素、全产业链、全价值链连接的枢纽,是实现制造业数字化、网络化、智能化过程中工业资源配置的核心,已成为当下各行各业瞩目的焦点。
工业4.0 和 中国智造2025
工信部长苗圩在讲到德国工业4.0与中国制造2025时,曾这样概括:如出一辙、异曲同工、殊途同归。因此,两者表述不同,但内涵基本一致。
工业4.0 : 由德国提出,主要指提升制造业的智能化水平,建立具有适应性、资源效率及基因工程学的智慧工厂,在商业流程及价值流程中整合客户及商业伙伴。其技术基础是网络实体系统及物联网 。
中国智造2025 : 中国制造2025,是中国政府实施制造强国战略第一个十年的行动纲领。《中国制造2025》提出,坚持“创新驱动、质量为先、绿色发展、结构优化、人才为本”的基本方针,坚持“市场主导、政府引导,立足当前、着眼长远,整体推进、重点突破,自主发展、开放合作”的基本原则,通过“三步走”实现制造强国的战略目标:
第一步,到2025年迈入制造强国行列;
第二步,到2035年中国制造业整体达到世界制造强国阵营中等水平;
第三步,到新中国成立一百年时,综合实力进入世界制造强国前列。
围绕实现制造强国的战略目标,《中国制造2025》明确了9项战略任务和重点,提出了8个方面的战略支撑和保障。
2016年4月6日国务院总理李克强主持召开国务院常务会议,会议通过了《装备制造业标准化和质量提升规划》,要求对接《中国制造2025》。7月19日国务院常务会议部署创建“中国制造2025”国家级示范区,专家指出,“中国制造2025”提至国家级,较以前城市试点有所升级。“7月19日部署的‘中国制造2025’国家级示范区相当于此前‘中国制造2025’城市试点示范的升级版,”工信部赛迪研究院规划所副所长张洪国对《21世纪经济报道》表示,此前是以工信部为主来批复“中国制造2025”试点示范城市,在国家制造强国建设领导小组指导下开展相关工作的;今后将由国务院来审核、批复国家级的示范区,相关文件也将由国务院来统一制定。
工业大数据
在中国智造2025的大前提下,工业大数据也就应运而生了。那么什么是“大数据“,什么又是“工业大数据“呢?
所谓“大数据”,指的是所涉及的数据量规模巨大到无法通过目前主流软件工具,在合理时间内达到截取、管理、处理、并整理成为帮助企业经营决策更积极目的的信息。其特征,数据容量大、多样、快速和价值密度低。
工业大数据除具有一般大数据的特征(数据容量大、多样、快速和价值密度低)外,还具有时序性、强关联性、准确性、闭环性等特征。
数据容量大(volume):数据的大小决定所考虑的数据的价值和潜在的信息。工业数据体量比较大,大量机器设备的高频数据和互联网数据持续涌入,大型工业企业的数据集将达到PB级甚至EB级别。
多样(variety):指数据类型的多样性和来源广泛。工业数据分布广泛,分布于机器设备、工业产品、管理系统、互联网等各个环节,并且结构复杂,既有结构化和半结构化的传感数据,也有非结构化数据。
快速(velocity):指获得和处理数据的速度。工业数据处理速度需求多样,生产现场级要求分析时限达到毫秒级,管理与决策应用需要支持交互式或批量数据分析。
价值密度低(value):工业大数据更强调用户价值驱动和数据本身的可用性,包括:提升创新能力和生产经营效率及促进个性化定制、服务化转型等智能制造新模式变革。
时序性(sequence):工业大数据具有较强的时序性,如订单、设备状态数据等。
强关联性(strong-relevance):一方面,产品生命周期同一阶段的数据具有强关联性,如产品零部件组成、工况、设备状态、维修情况、零部件补充采购等;另一方面,产品生命周期的研发设计、生产、服务等不同环节的数据之间需要进行关联。
准确性(accuracy):主要指数据的真实性、完整性和可靠性,更加关注数据质量以及处理、分析技术和方法的可靠性。对数据分析的置信度要求较高,仅依靠统计相关性分析不足以支撑故障诊断、预测预警等工业应用,需要将物理模型与数据模型结合,挖掘因果关系。
闭环性(closed-loop):包括产品全生命周期横向过程中数据链条的封闭和关联以及智能制造纵向数据采集和处理过程中,需要支撑状态感知、分析、反馈、控制等闭环场景下的动态持续调整和优化。
工业大数据作为大数据的一个应用行业,在具有广阔应用前景的同时,对传统的数据管理技术与数据分析技术也提出了很大的挑战。
以 GBase8s 为核心的工业互联网数据平台逻辑架构
工业互联网数据平台应该以提升产品智能化和深入拓展行业应用为己任。我们的逻辑架构也以此展开,首先我们从边缘(端)回去数据,通过技术手段将数据重新整合,并对外提供价值。
传统批处理式的数据平台,已经不再适应现代信息发展的要求的了,就更无法适配工业互联网数据平台的要求。所以我们采取了一种类 lambda 架构的结构形式,基于GBase8s 强大的事务处理能力,为整个平台提供数据支撑。
案例解析
接下来,我们将通过一个保姆级教程,来模拟一个简单的工业互联网数据处理模型。
设备通过mqtt向平台发送数据,但是一般设备数据都不会携带具体的设备信息,只会携带mac,或是一些其他标识,这样我们入库的时候就需要一次清理工作。基于上一章节我们的架构设计,我们只需要在8s里记录一个设备表,来存储必要信息,然后通过flink向mqtt流广播数据,这样我们就可以无缝添加设备,并进行处理了。处理后的数据sink到8s,再通过 8s 提供的 cdc(Change Data Capture 变更数据捕获) 功能。我们可以对增量数据进行更多应用需要的处理工作。
模拟MQTT环境
这里我们使用 docker 创建一个mqtt服务,并通过程序模拟,向其发送数据
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
下面是用于模拟发送数据
import cn.gbase.mqtt.MQTTSource; import com.google.gson.Gson; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.Hashtable; import java.util.Random; //数据模拟器 public class Emmit { public static void main(String[] args) {String[] gmacs = new String[]{"127.0.0.1","127.0.0.2","127.0.0.3","127.0.0.4","127.0.0.5","127.0.0.6"};for(;;){int i = new Random().nextInt(gmacs.length);String gmac = gmacs[i];Hashtable<String,String> data = new Hashtable<>();data.put("gmac",gmac);data.put("v1",new Random().nextInt(300)+"");data.put("v2",new Random().nextInt(100)+""); Gson g = new Gson();String jsonData = g.toJson(data); Emmit.emmit(jsonData);} } public static void emmit(String content){MemoryPersistence persistence = new MemoryPersistence(); try {MqttClient client = new MqttClient(MQTTSource.broker,"client2",persistence); // MQTT 连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("admin");connOpts.setPassword("public".toCharArray());// 保留会话connOpts.setCleanSession(true);// 建立连接System.out.println("Connecting to broker: " + MQTTSource.broker);client.connect(connOpts); System.out.println("Connected"); // 消息发布所需参数System.out.println("Publishing message: " + content);MqttMessage message = new MqttMessage(content.getBytes());message.setQos(MQTTSource.qos);client.publish(MQTTSource.pubTopic, message);System.out.println("Message published");client.disconnect();System.out.println("Disconnected");client.close(); } catch (MqttException me) {me.printStackTrace();}} }
创建MQTT Source
这个类的主要目的是从mqtt里回去输入数据。并将其封装成一个元组(Tuple)返回给下一个步骤。
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.eclipse.paho.client.mqttv3.*; import java.nio.charset.StandardCharsets; public class MQTTSource implements SourceFunction<Tuple2<String, String>> { public static String subTopic = "testtopic/#";public static String pubTopic = "testtopic/1";public static int qos = 2;public static String broker = "tcp://127.0.0.1:1883";public static String clientId = "emqx_test"; public static final String USERNAME = "admin";public static final String PASSWORD = "public"; private transient MqttClient client;private transient volatile boolean running;private transient Object waitLock; public void stop() {close();} @Overridepublic void run(final SourceContext<Tuple2<String, String>> ctx) throws Exception {MqttConnectOptions connectOptions = new MqttConnectOptions();connectOptions.setCleanSession(true);connectOptions.setAutomaticReconnect(true);connectOptions.setUserName(USERNAME);connectOptions.setPassword(PASSWORD.toCharArray());client = new MqttClient(broker, clientId);client.connect(connectOptions); client.subscribe(subTopic, (topic, message) -> {System.out.println(message);String msg = new String(message.getPayload(), StandardCharsets.UTF_8);ctx.collect(Tuple2.of(msg,topic));}); running = true;waitLock = new Object(); while (running) {synchronized (waitLock) {waitLock.wait(100L);} }} @Overridepublic void cancel() {close();} private void close() {try {if (client != null) {client.disconnect();}} catch (MqttException exception) { } finally {this.running = false;} // leave main methodsynchronized (waitLock) {waitLock.notify();}} }
从 GBase8s 获取设备表信息
这里我们会定时从信息表里面获取到一些数据,并将其广播出去。
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.*; import java.util.Hashtable; import java.util.Map; //RichSourceFunction RichParallelSourceFunction public class GBase8sSource extends RichSourceFunction<Map<String, String>> {private static final Logger logger = LoggerFactory.getLogger(GBase8sSource.class);public static String driverClassName = "com.gbasedbt.jdbc.IfxDriver";public static String db_url = "jdbc:gbasedbt-sqli://172.30.232.114:1533/test1:GBASEDBTSERVER=ol_gbasedbt1210;NEWCODESET=UTF-8,cp1252,819;DB_LOCALE=zh_cn.utf8;CLIENT_LOCALE=zh_cn.utf8;";public static String user = "gbasedbt";public static String password = "dafei1288"; private Connection connection = null;private Statement s = null;private volatile boolean isRunning = true; @Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);Class.forName(driverClassName);connection = DriverManager.getConnection(db_url, user, password);//获取连接s = connection.createStatement();} @Overridepublic void run(SourceContext<Map<String, String>> ctx) throws Exception {Map<String, String> deviceMap = new Hashtable<String, String>();try {while (isRunning) {ResultSet resultSet = s.executeQuery("select mid, gmac,name from mtable");resultSet.beforeFirst();while (resultSet.next()) { String gmac = resultSet.getString(2);String name = resultSet.getString(3);//System.out.println("gmac = "+gmac + " , name = "+name );deviceMap.put(gmac, name); }System.out.println();System.out.println("flash map ==> " + deviceMap);System.out.println();ctx.collect(deviceMap);deviceMap.clear();Thread.sleep(1000 * 60);}} catch (Exception e) {logger.error("runException:{}", e);}} //关闭数据库连接@Overridepublic void cancel() {try {super.close();if (connection != null) {connection.close();}if (s != null) {s.close();}} catch (Exception e) {logger.error("runException:{}", e);}isRunning = false;} }
将处理结果写回数据库
此类的作用,将数据结果写回
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class GBase8sSink extends RichSinkFunction<DeviceData> {private PreparedStatement ps = null;private Connection connection = null; @Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = getConn();ps = connection.prepareStatement("insert into factable(name,gmac,v1,v2,updatetime) values (?,?,?,?,?);");}private Connection getConn() {try {Class.forName(GBase8sSource.driverClassName);connection = DriverManager.getConnection(GBase8sSource.db_url, GBase8sSource.user, GBase8sSource.password);} catch (Exception e) {e.printStackTrace();}return connection;} //每一个元素的插入,都会被调用一次invoke方法@Overridepublic void invoke(DeviceData dd, Context context) throws Exception {ps.setString(1,dd.getName());ps.setString(2,dd.getGmac());ps.setString(3,dd.getV1());ps.setString(4,dd.getV2());ps.setString(5,dd.getUpdatetime());ps.execute();} @Overridepublic void close() throws Exception {super.close();if(connection != null){connection.close();}if (ps != null){ps.close();}} }
这个类仅作为DTO使用
public class DeviceData { private String name;private String gmac;private String v1;private String v2;private String updatetime; @Overridepublic String toString() {return "DeviceData{" + // "mid='" + mid + '\'' +", name='" + name + '\'' +", gmac='" + gmac + '\'' +", v1='" + v1 + '\'' +", v2='" + v2 + '\'' +", updatetime='" + updatetime + '\'' +'}';} public String getName() {return name;} public void setName(String name) {this.name = name;} public String getGmac() {return gmac;} public void setGmac(String gmac) {this.gmac = gmac;} public String getV1() {return v1;} public void setV1(String v1) {this.v1 = v1;} public String getV2() {return v2;} public void setV2(String v2) {this.v2 = v2;} public String getUpdatetime() {return updatetime;} public void setUpdatetime(String updatetime) {this.updatetime = updatetime;} }
流批混合
此类的作用,混合流与批,并将处理的数据写回数据库。
import cn.gbase.mqtt.MQTTSource; import com.google.gson.Gson; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.*; public class MqttFlinkMain {private static Map<String, String> deviceMap = new Hashtable<String, String>(); public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 广播更新设备对照表DataStream<Map<String, String>> deviceStream = env.addSource(new GBase8sSource());deviceStream.broadcast().map(new MapFunction<Map<String, String>, Object>() {@Overridepublic Object map(Map<String, String> value) {deviceMap = value;return null;}}); // mqtt 数据输入DataStream<Tuple2<String, String>> inputStream = env.addSource(new MQTTSource()); DataStream<DeviceData> dataStream = inputStream.rebalance().flatMap(new FlatMapFunction<Tuple2<String, String>, DeviceData>() {@Overridepublic void flatMap(Tuple2<String, String> value, Collector<DeviceData> out) {String message = value.f0;String topic = value.f1;List<DeviceData> d = dataHandle(message, topic);for (DeviceData line : d) {out.collect(line);}}});//打印输出dataStream.print();//添加 8s sinkdataStream.addSink(new GBase8sSink()).name("gbase8s").setParallelism(2); env.execute("mqttFlinkMain");} private static List<DeviceData> dataHandle(String message, String topic) {List<DeviceData> d = new ArrayList<>();try {System.out.println("message = "+message);System.out.println("topic = "+topic);Gson g = new Gson();DeviceData dd = new DeviceData();Hashtable<String,String> mdata = g.fromJson(message,Hashtable.class); dd.setGmac(mdata.get("gmac"));dd.setV1(mdata.get("v1"));dd.setV2(mdata.get("v2"));String name = deviceMap.get(mdata.get("gmac"));dd.setName(name);dd.setUpdatetime(System.currentTimeMillis()+""); d.add(dd); } catch (Throwable t) {t.printStackTrace();}return d;} }
总结与展望
启动flink开始接受数据
启动数据模拟程序,用于模拟mqtt提交
好了,我们可以看到数据已经开始源源不断灌进来了。
通过上面的一个简单模拟,我们构建了一个简单的工业互联网数据处理模型。
我们不难发现,CDC并没有提到,CDC部分其实也是非常有意思的部分,但不在我们本次的讨论范围之内,未来会带来相关部分的内容。
参考文献
https://www.sohu.com/a/339738486_99916165
https://blog.csdn.net/u012447842/article/details/89175772
https://zhuanlan.zhihu.com/p/143696144
http://blog.itpub.net/69953737/viewspace-2741287/
https://pingcap.com/blog-cn/when-tidb-and-flink-are-combined/
https://blog.csdn.net/qq_30187071/article/details/110429771
案例解析 GBase8s 在工业互联网平台中的应用相关推荐
- 解析:GE工业互联网平台Predix
来源:赛迪智库 摘要:当前,工业互联网平台作为我国构建工业互联网生态的核心载体,成为推动制造业与互联网融合的重要抓手.早在2012年GE提出工业互联网的概念,随后推出Predix,要将GE在工业领域的 ...
- 2020年跨行业跨领域工业互联网平台
2020年跨行业跨领域工业互联网平台 来源:工信部 2020年12月,工信部信发司公示"2020年跨行业跨领域工业互联网平台".公示的双跨平台共15家,比2019年十大双跨平台增长 ...
- (摘要)新基建风口下,今年工业互联网平台将呈现十大新特征
目录 一是提升核心能力成为平台发展的主攻方向 二是垂直行业和产业集聚区应用爆发式增长 四是数据驱动的制造范式正在形成 五是平台加速推动大中小企业融通发展 六是平台 "双创"生态体系 ...
- 指令集参编的《工业互联网平台 数据管理通用要求》团体标准正式发布
为提高工业互联网平台的智能化服务水平.降低工业应用开发和使用成本.推动工业互联网平台在智能制造和工业软件研发等领域的落地应用.浙江省智能技术标准创新促进会提出并发布T/ZAITS 10101-2022 ...
- 全球工业互联网平台应用案例分析报告
近年来,全球工业经济形势发生深刻变化,能源和原材料价格持续攀升,资源环境约束进一步强化,工业竞争格局深度调整,以人工智能.物联网.云计算等新一代信息技术与工业融合为主的第四次工业革命悄然来袭,工业企业 ...
- GE失去Predix,阿里云推出飞象,中美工业互联网平台逆转时刻到了?
8月初,业界传出GE出售工业数字资产的消息,包括最为人熟悉的明星产品工业互联网平台Predix,即基于云服务的软件平台Predix.业界由此发出"世上从此再无GE Predix,工业互联网何 ...
- 工业云到工业互联网平台的发展之路|中机智库
工业互联网发展行动计划(2018-2020年)指出,在鼓励支持各省(区.市)和有条件的行业协会建设本区域.本行业的工业互联网平台基础上,分期分批遴选10 个左右跨行业跨领域平台,培育一批独立经营的企业 ...
- 阿里工业互联网平台“思考”:一场从0到1的蜕变
阿里云总裁胡晓明(花名孙权)曾在一场媒体采访中透露了自己的业务秘籍,说他永远记住两句话,一是"拓展商业边界",二是"商业驱动技术进步". 这句话不失为阿里的真实 ...
- 工业互联网平台发展概况
应用.技术.产业和商业等方面研究分析工业互联网平台发展脉络,为业界厂商.政府机构和投资者提供相关参考. (一)互联网平台驱动工业数字化转型 平台模式.平台经济正在变革传统工业形态,对于多个企业来说,( ...
最新文章
- [.net 面向对象编程基础] (18) 泛型
- 想学python有什么用-Python为什么这么火?学习python有什么用?
- 【Linux】10_存储管理EXT4文件系统详解
- VC++制作DLL具体解释
- linux 相关零碎知识整理
- python根据时间序列画折线图_Python:matplotlib 和 Seaborn 之折线图 (三十七)
- ubuntu linux 批量部署,使用Cobbler批量部署Linux和Windows:CentOS/Ubuntu批量安装(二)...
- jpa和hibernate_JPA和Hibernate级联类型的初学者指南
- 网关过滤器验证token
- matlab水蒸气焓值计算_焓变 反应热-化学选修4同步优质系列教案(人教版)
- 【vim新手心得】最常用快捷键、编辑器vim插件使用心得(VsVim、IdeaVim、Vimium)
- weAdmin(layuiAdmin)
- python数列的平方_python数组平方
- CET-6--2018.6--2
- KDD 2021 | 基于多智能体协同竞价博弈的电商搜索广告多目标竞价优化
- soundwire修改服务器,SoundWire Server,电脑声音实时同步到移动手机
- 计算机毕业设计Java东理咨询交流论坛(源码+系统+mysql数据库+lw文档)
- 2021Java面经:最便宜java培训机构
- 关于DOTS的个人总结
- LVDS,LCD调试总结(持续更新)
热门文章
- 一文读懂:私有云与公有云、混合云有什么区别?
- 2021肖秀荣强化班
- 【每日早报】2019/09/23
- Flink流式计算从入门到实战 四
- 产品OR运营,杉车网、懂车帝们成车企品牌营销必争点?
- 解压后java文字乱码_怎么解决java解压zip包出现乱码
- 什么是激励函数(Activation Functions)
- 英特尔NUC 11板载USB3.0座子接口定义
- 论文《Intelligent Computing: The Latest Advances, Challenges and Future》 思维导图-仅供参考
- MySQL 的数据库错误日志设置