这玩意能干什么?我只能说,这是一个物联网的方案,能通过java客户端监听来自单片机发送的消息,单片机有什么消息?常见的有:比如持久性的传感器数据上报,这你得1s上传一次吧,还有一些控制设备的信息,比如灯,电机之类一次操作持续运转的东西。

写在前面:

曾经用过的一种需要接入Internet的物联网方案(这是目前的主流):

我之前就受益与免费的云平台(云服务器),比如我以前博客里介绍过了的巴法云平台,还有我以前用过的小熊派华为云平台,它们的好处显而易见,就是不需要你去搭建服务器,不需要去了解数据传输的各种细节,你只需要用你的单片机去连上云就行了,用它们的API接口(特别是巴法云平台的接口真的适合小白实现前端和设备的控制),面向接口编程确实给人很舒服。缺点就是你必须得有网络吧,没网络你就连不上云。

本次主角:

一种局域网内物联网方案(非主流,但实用)

优点:不需要接入外网,除了这个优点没什么优点。

步骤:

首先,你得有个MQTT的服务器吧,怎么办,用EMQX搭建一个局域网的MQTT服务器。

然后,用MQTTX或paho或是它本身自带的web测试接口测试是否能正常订阅和发布消息以及能否正常收到消息。

然后,你得有单片机吧,单片机得支持MQTT协议吧,推荐用ESP32(ESP8266),基于开源的arduino,你能在太极创客的文档上轻松学会MQTT的相关知识。然后你得把单片机的数据格式转换成通用的JSON格式去发。

附上太极创客的相关文档链接地址:

零基础入门学用物联网 – MQTT基础篇 – 目录 – 太极创客 (taichi-maker.com)

然后,也就是本次的主题了,用JAVA建立一个客户端,由于要用MQTT,所以java是一个springboot的项目,用其去监听EMQX建立的MQTT服务器的报文(也就是对于主题所发布个各种消息),客户端不局限于用JAVA语言去写,具体可参考EMQX的文档,报文采用JSON格式,为什么要用JSON格式,因为方便用JSON格式反过来创建对象然后将对象的属性持久化到数据库。

MQTT Java 客户端库 | EMQX 文档

你可以用这文档里的相关代码,然后稍微修改,就以我的为参考(已经设置了断开服务器后客户端自动重连)

连接Mysql(主要是拿到数据库的操作对象connection)

package emqx.demo.Console;import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.*;
import java.util.Properties;public class JDBC {static String driverClass=null;static String url=null;static String name=null;static String password=null;static {try {//读取配置文件的信息Properties properties= new Properties();InputStream is=new FileInputStream("MyInfo.properties");//放在工程文件下使用//InputStream is=JDBCUtil.class.getClassLoader().getResourceAsStream("jdbc.properties");//放在src下使用//导入输入流properties.load(is);//读取属性driverClass=properties.getProperty("driverClass");url=properties.getProperty("url");name=properties.getProperty("name");password=properties.getProperty("password");System.out.println(driverClass);} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}/*** 获取连接对象* @return* */public static Connection getConn() {Connection conn=null;try {Class.forName(driverClass);} catch (ClassNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();}try {conn = DriverManager.getConnection(url,name,password);} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();}return conn;}/**  释放资源****/public static void release(Connection conn,Statement st,ResultSet rs) {closeRs(rs);closeSt(st);closeConn(conn);}public static void release(Connection conn,Statement st) {closeSt(st);closeConn(conn);}private static void closeRs(ResultSet rs) {try {if(rs!=null)rs.close();} catch (SQLException e) {e.printStackTrace();}finally {rs=null;}}private static void closeSt(Statement st) {try {if(st!=null)st.close();} catch (SQLException e) {e.printStackTrace();}finally {st=null;}}private static void closeConn(Connection conn) {try {if(conn!=null)conn.close();} catch (SQLException e) {e.printStackTrace();}finally {conn=null;}}}

连接EMQX的MQTT服务器

package emqx.demo.Console;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class ConnectServer {String subTopic ="ESP32/DEVICE";//订阅主题int qos =1;//消息服务等级String broker ="tcp://127.0.0.1:1883";//emqx搭建的mqtt服务器的地址String clientId  ="JavaClient";//这个客户端的名字在emqx上显示的String content="Java Client is online";//发布消息String pubtopic="esp32";//发布主题public ConnectServer() {MemoryPersistence persistence = new MemoryPersistence();//保存形式以内容保存try {MqttClient client = new MqttClient(broker, clientId, persistence);//设置消息对象MqttMessage message = new MqttMessage(content.getBytes());//待发送的信息message.setQos(qos);// MQTT 连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_java");connOpts.setPassword("random".toCharArray());// 保留会话connOpts.setCleanSession(false);//设置超时时间connOpts.setConnectionTimeout(10);/*设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制*/connOpts.setKeepAliveInterval(20);// 设置回调client.setCallback(new MyMqttCallback());// 建立连接while (!client.isConnected()) {System.out.println(" client is Connecting to broker: " + broker);client.connect(connOpts);}System.out.println("连接成功");// 订阅client.subscribe(subTopic,qos  );System.out.println("订阅主题"+subTopic);// 发布上线消息client.publish(pubtopic,message);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}

编写回调函数类(这个根据自己的需求写就好了)

回调函数类必须实现MqttCallback接口

package emqx.demo.Console;import com.alibaba.fastjson.JSON;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;import java.sql.SQLException;
import java.sql.Statement;/*
* MQTT回调
* 收消息持久化到mysql
*
*
* */
public class MyMqttCallback  implements MqttCallback {public  static    Statement stmt= null;ESP32 myesp32;String mystring;public  MyMqttCallback()  {try {stmt= JDBC.getConn().createStatement();}catch (Exception e){System.out.println("无法创建statement实例化对象");}}@Overridepublic void connectionLost(Throwable cause) {System.out.println("连接断开,可以做重连");new ConnectServer();//每次连接断开就又创建一个连接对象}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面System.out.println("接收来自主题:" + topic+"的信息内容:");mystring=new String(message.getPayload());System.out.println(mystring);myesp32= JSON.parseObject(  mystring,ESP32.class);System.out.println(myesp32.toString());System.out.println("--------------------------------");/*** 更新瞬间性操作传感器比如灯,电机的状态* */if(myesp32.command==1){try {MysqlDao.updata(myesp32.name,myesp32.state,stmt);System.out.println("更新传感器状态");}catch (Exception e){System.out.println("更新数据失败");}}/** 插入传感器持久性上传的数据** */else if(myesp32.command==2) {try {MysqlDao.insert(myesp32.name, myesp32.data, stmt);System.out.println("插入新数据");}catch ( Exception e){System.out.println("插入新数据失败");}}mystring="";}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());}}

以及编写我的Mydao类,用于数据持久化到数据库

package emqx.demo.Console;import com.sun.source.tree.LineMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import sun.nio.cs.FastCharsetProvider;import java.security.Key;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Repository
public class MysqlDao  {/*** 供接口拿数据的方法** *///获取某传感器的全部数据public   static  Map   FindAll(Statement statement, String name, String tablename) throws SQLException {ResultSet rs=statement.executeQuery( "SELECT id, name, data FROM "+tablename+"  where  name ='"+name+"'");Map map=new HashMap();List list=new ArrayList();map.put(name,list);while (rs.next()) {// 通过字段检索list.add(rs.getString("data")) ;}map.put(name,list);return  map;}//获取传感器对应的最新数据public    static  String find(String Sensor_name,String key,Statement statement) throws SQLException {String sql=  "SELECT    "+  key+"   FROM devices.sensor   WHERE  id=(SELECT MAX(id) FROM sensor GROUP BY `name` HAVING `name`="+"\""+Sensor_name+"\")";System.out.println(sql);ResultSet rs=statement.executeQuery(sql);System.out.println(rs.toString());rs.next();System.out.println(rs.getString(key));return rs.getString(key);}public static String   GetState(String name ,Statement statement){String sql= "SELECT state  FROM action WHERE `sname` ="+"'"+name+"'";System.out.println(sql);try {ResultSet rs=statement.executeQuery(sql);rs.next();return rs.getString("state");} catch (SQLException e) {e.printStackTrace();return "error";}}/**** 监听单片机发出的数据** *///在数据库的表中插入数据public  static  boolean insert(String name,String dat,Statement sta) throws SQLException {String sql="insert  into  devices.sensor(name,data)  VALUES ('"+name+"" +"','"+dat+"')";System.out.println(sql);if (sta.execute(sql))return true;return false;}//更新持续性外设如灯,电机的状态一次改变永久使用public static  void  updata(String dev,String state,Statement statement) throws SQLException {String sql="UPDATE devices.action SET state='"+state+"'WHERE sname='"+dev+"'";System.out.println(sql);statement.executeUpdate(sql);}}

以上就做完了,Mqtt服务器的消息就JAVA客户端监听并被被持久化报存到了数据库中。

最后就是用JAVA的JDBC写数据库相关的API接口供安卓端,微信小程序端去调用,让它们拿数据放前端界面了就ok了。可以的话,可用JAVA的第三方框架Mybatis去简化API接口的编写。(主要简化的是sql语句)

局域网内用JAVA建立MQTT客户端监听MQTT服务器消息并持久化到数据库相关推荐

  1. java本地监听zk服务器节点【动态上下线】

    [README] java本地访问 zk cluster, refer 2 https://blog.csdn.net/PacosonSWJTU/article/details/111404364 [ ...

  2. 【java】画图和监听事件的应用

    [java]画图和监听事件的应用 (1)frame.getContentPane().add(new Change()); ***用getContentPane()方法获得JFrame的内容面板,再对 ...

  3. Java Swing 键盘事件监听

    Java Swing 键盘事件监听 开发工具与关键技术:java. elipse2019.jdk1.8 作者:Amewin 撰写时间:2019年9月16日 键盘事件的事件源一般丐组件相关,当一个组件处 ...

  4. Oracle客户端监听服务配置方法

    Oracle客户端监听服务配置方法 可直接修改tnsnames.oRA文件 Oracle客户端中一般有两个tnsnames.oRA文件,配置监听要修改的是Oracle客户端家目下的$ORACLE_HO ...

  5. 【解决】uniapp,无法在onLoad及onUnload生命周期内触发激光扫码的监听事件,可在created及beforeDestroy生命周期内触发

    [问题] uniapp开发App,发现无法在onLoad及onUnload生命周期内触发及销毁子组件内的激光扫码的监听事件. 子组件内示例代码如下: <script> export def ...

  6. java实现全局键盘监听

    java实现全局键盘监听 Java本身是无法对桌面进行全局键盘监听的,无法设置全局快捷键,当焦点从java程序面板失去时,自带的监听器就无法监听了,但是比如一些用java写的截图程序是需要全局快捷键操 ...

  7. Java 实现日志文件监听并读取相关数据

    Java 实现日志文件监听并读取相关数据 项目需求 由于所在数据中台项目组需要实现监听文件夹或者日志文件并读取对应格式的脏数据的需求,以便在文件.文件夹发生变化时进行相应的业务流程:所以在这里记录下相 ...

  8. android 按键消息,Android监听Home按键消息

    Android对屏幕下方常用的四个按键消息处理是不一致的: 搜索按键的消息在onKeyDown或者onKeyUp中接收: 菜单按键的消息在onCreateOptionsMenu.onKeyDown或o ...

  9. oracle数据库监听问题,分享一个有意思的Oracle19c数据库监听异常

    概述 今天主要分享一个最近排查的监听问题,还是有点意思的,一起来看看吧~ 环境:oracle19c 单实例 用plsql连接提示,这里排除防火墙.账号密码问题,连接字符串按监听文件格式写 1. 测试监 ...

  10. java获取局域网内主机,java怎么获取局域网内所有主机ip

    java怎么获取局域网内所有主机ip 关注:53  答案:2  mip版 解决时间 2021-01-27 21:31 提问者夢岭杺 2021-01-27 12:02 java怎么获取局域网内所有主机i ...

最新文章

  1. matlab做交互作用图,MatlabMatlab工程应用案例精要.ppt
  2. 1—YOLO2:环境搭建
  3. 计算机视觉与深度学习 | 基于多源传感器数据融合的动态场景SLAM研究
  4. ijkplayer 消息循环处理过程分析
  5. Windows平台下go编译器LiteIDE的安装和使用
  6. leaflet-webpack 入门开发系列三地图分屏对比(附源码下载)
  7. PHP ERROR : Call to undefined function curl_init()
  8. 新品Demo —— ZStack Mini 超融合一体机
  9. 易语言编写影视大全的整体思路及ACF浏览器和cheni纯组件列表灵活运用
  10. 姓名国别分类代码:PyTorch深度学习实践 - Lecture_13_RNN Classifier
  11. Mac大小写切换需长按caps lock键解决办法
  12. chrome清楚缓存并硬性重新加载
  13. 【专题5:硬件设计】 之 【50.运算放大器详解b - 负反馈电路、正反馈电路和共模干扰】
  14. 职场篇(一):明哥的职场礼仪七堂课笔记
  15. MyEclipse连接oracle数据库及代码
  16. LED电子时钟显示屏(NTP时间同步服务器)是如何完成授时服务的?
  17. Application的启动流程
  18. RIDE 访问数据库
  19. Linux命令大全总结(看这一篇就够了)
  20. HNUST OJ 2295 嘉嘉的队伍配置

热门文章

  1. USB_PD_R3_0 V1.1 PD协议 协议层 中文翻译
  2. 关于电脑使用的实用技巧
  3. JavaScript中的事件
  4. 计算机控制系统机器人,机器人的控制系统
  5. 很多次游戏的最后取胜实际上都有很强的偶然性
  6. 和刘备相关的人(八 )
  7. opengles加载obj格式3D模型含光照和纹理
  8. 人民币转换美金的c语言代码大全,人民币和美元大写格式在线工具,美元美金数字金额转换大写,外币大写金额...
  9. 记一次学习爬取豆瓣数据于Excel表的爬虫
  10. 人睡眠时做恶梦以及梦魇或鬼压身的原因