一、单片机->Wifi

1整体框架图

2注意事项

1.重新上电=开始编程
复位=一直透传

2.将wifi同时连上USB转TTL,再连上电脑
用XCOM打开非单片机串口的串口,可以看到WiFi模块发出去的指令
用XCOM用单片机的串口,可以看到该串口的输出,即printf的内容

3遇到的坑

3.1给单片机编程的软件界面如下,一定要点“编程后执行”

3.2串口2一直发不出去数据给设备,可能是因为网口转串口模块被reset了

解决办法:

网口转串口模块连上USB转TTL,再连上电脑,打开上面的软件,将界面按照下面的来配置

二、单片机->4G

1花生壳的配置



2 XCOM的配置

二、WiFi->Logstash

1.配置WiFi,打开XCOM,用AT指令

1.1多条发送

AT          //测试模块是否能正常通讯,返回"ok"即为正常
AT+CWMODE=3         //配置成 AP+STA模式
AT+RST          //重启生效
AT+CWJAP="xuhui","11111111"         //连接电脑热点“xuhui”为热点的名称,"11111111"为密码

2.配置logstash目录下的itcast-wifi.conf

input {tcp {port => 5044mode => "server"ssl_enable => false}
}filter{mutate{split => ["message",","]   #按,进行split切割messageadd_field => {"name" => "%{[message][0]}"}add_field => {"age" => "%{[message][1]}"}}
}output {stdout { codec => rubydebug }
}

3.启动logstash

进入logstash-8.1.2主目录,cmd
写命令:

logstash -f itcast-wifi.conf

4.配置WiFi

4.1多条发送

AT+CIPSTART="TCP","192.168.137.1",5044          //本机IP,logstash端口号
AT+CIPMODE=1         //开启透传模式
AT+CIPSEND         //开始透传

4.2单条发送

xuhui,20

5.Logstash输出截图

三、Logstash->Kafka

1.启动Zookeeper

进入zookeeper的bin文件,cmd

zkServer

2.启动Kafka

2.1启动Kafka

进入Kafka主目录文件,cmd

.\bin\windows\kafka-server-start.bat .\config\server.properties

2.2创建一个Topic

进入Kafka的bin\windows文件,cmd

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafkatest0427

2.3查看创建的topic

kafka-topics.bat --list --bootstrap-server localhost:9092

2.4启动生产者 producer

直接在创建完Topic的命令行界面输入以下命令

kafka-console-producer.bat --broker-list localhost:9092 --topic kafkatest0427

2.5启动消费者customer

进入Kafka的bin\windows文件,cmd

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic kafkatest0427 --from-beginning

3.修改Logstash的SCM-wifi-logstash-kafka.conf

input {tcp {port => 5044mode => "server"ssl_enable => false}
}filter{mutate{split => ["message",","]   #按 , 进行split切割messageadd_field => {"name" => "%{[message][0]}"}add_field => {"age" => "%{[message][1]}"}}
}output {  kafka {bootstrap_servers => "192.168.137.1:9092" #kafka服务器地址topic_id => "kafkatest0427"batch_size => 5codec => "json" #写入的时候使用json编码,因为logstash收集后会转换成json格式}
}

4.启动Logstash

进入logstash-8.1.2主目录,cmd
写命令:

logstash -f SCM-wifi-logstash-kafka.conf

5.Wifi模块发送单条指令

xuhui,20

6.消费者界面截图

7.遇到的坑

7.1

 WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  1. 修改kafka中server.properties配置
  2. 同时,在kafka中创建topic 、生产者、消费者的时候,也要在指令中进行以下改动

7.2

ZooKeeper audit is disabled

在 zkServer.cmd 添加"-Dzookeeper.audit.enable=true",如下图所示

7.3

 ERROR Shutdown broker because all log dirs in D:\develop\Kafka\kafka_2.13-3.1.0\kafka-logs have failed (kafka.log.LogManager)

删除错误信息中的目录(D:\develop\Kafka\kafka_2.13-3.1.0\kafka-logs)下的所有文件,然后重启Kafka

使用maven官方仓库直接下载项目需要的jar包方法
1.首先我们得知道我们需要下载那个jar包,本文中我们以 mysql-connector-java.jar 包为例。

2.打开maven在线仓库,https://mvnrepository.com/

3.我们在搜索框输入jar包名:mysql-connector-java,如图能够找到对应的jar
4.然后我们点击上图标注位置(红色框)进入选择我们需要的jar版本

  1. 进入页面下载jar包,如下图(备注,有的jar不是点击jar进行,点击view All,大家可以自己试着去点)

java学习——IDEA导入jar包并调用
项目目录下新建 lib 目录,并将需要导入的 jar 包复制到该文件夹
如:(以导入 mysql驱动包为例)

将 lib 目录下的所有依赖导入到指定模块
打开 File -> Project Structure (Ctrl + Shift + Alt + S)-> 点击 加号 -> 选择我们创建的 lib 目录 -> 确认即可

然后选择需要该依赖的模块,点击 OK

此时我们打开该模块的依赖,可以看到 lib 下的依赖已经导入成功

导入成功后该jar包还可以打开查看源码 (▽)

如何导入 com.alibaba.fastjson.JSONObject
1: 先在百度搜索 : 从github上阿里 fastJson包
2: 点击下载 jar包 (注意maven是中央仓,可以在配置文件中配置自动下载)

注:记得下载.jar,不要下载javadoc.jar和sources.jar

四、写入服务

1.用java(Maven)写一个工程,从Kafka中获取数据,并发到ES

1.1参考链接

https://www.cnblogs.com/zling/p/10450259.html

1.2自己的代码

class KafkaPoolData

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.io.IOException;
import java.time.Duration;
import java.util.*;public class KafkaPoolData {private KafkaConsumer<String, String> consumer;private String topic;public KafkaPoolData(String topic) {Properties consumerConfig = new Properties();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//连接KafkaconsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "xh-source-group1"); //消费者组,保证要跟其他的不一样consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 这个参数指定了当消费者第一次读取分区或者上一次的位置太老(比如消费者下线时间太久)时的行为,可以取值为latest(读取消息队列中最新的消息)或者earliest(从最老的消息开始消费)/**kafka根据key值确定消息发往哪个分区(如果分区被指定则发往指定的分区),具有相同key的消息被发往同一个分区,如果key为NONE则随机选择分区,可以使用key_serializer参数序列化为字节类型value为要发送的消息值,必须为bytes类型,如果这个值为空,则必须有对应的key值,并且空值被标记为删除。可以通过配置value_serializer参数序列化为字节类型*///设置值反序列化器consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//设置key反序列化器consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  //用来做反序列化的,也就是将字节数组转换成对象//创建一个消费者consumer = new KafkaConsumer<>(consumerConfig);this.topic = topic;}public void testConsumer(long offset,int number) throws IOException {//与subscirbe方法不同,assign方法由用户直接手动consumer实例消费哪些具体分区,assign的consumer不会拥有kafka的group management机制,也就是当group内消费者数量变化的时候不会有reblance行为发生。assign的方法不能和subscribe方法同时使用。TopicPartition partition = new TopicPartition(topic, 0);//i是指定分区partitionconsumer.assign(Arrays.asList(partition));consumer.seek(partition, offset);// 指定从这个topic和partition的哪个位置获取//consumer.close(); //主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期int count = 0;System.out.println("\nConsumer begin!\n");System.out.println("\nConsumer data begin!\n");ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); //拉取消费记录System.out.println("开始读取消息");System.out.println("---------------"+records.count());for (ConsumerRecord<String, String> record : records) {//获取字符串数据String text = record.value();//输出字符串数据System.out.println(text);//生成随机idUUID uuid = UUID.randomUUID();String id = uuid.toString();//将text id输出到ESPutDataToEs02.PutToEs(text,id);count++;//count大于拉取数据时,停止拉取数据if (count > number) {break;}}//System.out.println("\nParse data Finished! count=" + count + "\n");System.out.println("\nConsumer Finished!\n");}public static void main(String[] args) throws IOException {//初始化ESPutDataToEs02.initClient();//Write2Es.initClient();//topic名称String topic = "kafkatest0427";//String topic = args[0];long offset =80; //从哪里开始// long offset = Long.parseLong(args[0]);int number=175;  //需要拉取多少数据// int number = Integer.parseInt(args[1]);KafkaPoolData kafkaconsumer = new KafkaPoolData(topic);kafkaconsumer.testConsumer(offset,number);PutDataToEs02.closeClient();//Write2Es.closeClient();}
}

class PutDataToEs02

import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;import org.elasticsearch.rest.RestStatus;import java.io.IOException;public class PutDataToEs02 {private static RestHighLevelClient client = null;public static void initClient() {//连接ESclient = new RestHighLevelClient(RestClient.builder(new HttpHost("192.168.137.1", 9200, "http")));//ES的本地端口号System.out.println(client);}//关闭服务public static void closeClient() throws IOException {client.close();}public static void PutToEs(String text, String id) throws IOException {//将字符串text转成JSONJSONObject jsonObject = JSONObject.parseObject(text);String message = "";String timestamp = "";//提取text里面的messageif (jsonObject.containsKey("message")) {message = jsonObject.getString("message");//System.out.println(message1);}//提取text里面的timestampif (jsonObject.containsKey("@timestamp")) {timestamp = jsonObject.getString("@timestamp");//System.out.println(timestamp);}//1.创建索引请求IndexRequest request = new IndexRequest("kafkatoes0429","data",id);//2.用XContentBuilder来构建文档XContentBuilder builder = XContentFactory.jsonBuilder();builder.startObject();{builder.field("timestamp", timestamp);builder.field("message", message);}builder.endObject();request.source(builder);//3.发送请求//3.1 同步方式发送请求IndexResponse indexResponse = null;try {indexResponse = client.index(request, RequestOptions.DEFAULT);} catch(ElasticsearchException e) {//捕获,并处理异常//判断是否版本冲突、create但文档已存在冲突if (e.status() == RestStatus.CONFLICT) {System.out.println("版本冲突\n" + e.getDetailedMessage());}System.out.println("索引异常");}//4.处理响应if(indexResponse != null) {String index = indexResponse.getIndex();String type = indexResponse.getType();String id1 = indexResponse.getId();//long version = indexResponse.getVersion();if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {System.out.println("新增文档成功\n" + "index = " + index + " type = " + type + " id = " + id1);} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {System.out.println("修改文档成功\n" + "index = " + index + " type = " + type + " id = " + id1);}// 分片处理信息/*ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();if (shardInfo.getTotal() != shardInfo.getSuccessful()) {}// 如果有分片副本失败,可以获得失败原因信息if (shardInfo.getFailed() > 0) {for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {String reason = failure.reason();System.out.println("副本失败原因:" + reason);}}*/}}
}

1.3java控制台的截图

2.输出到Elasticsearch的截图

五、grafana

1.启动grafana

进入bin目录,通过执行grafana-server.exe启动Grafana

2.登录grafana

打开浏览器并转到http://localhost:3000/,然后登录即可,默认用户和密码均为admin

单片机- >网络模块(WIFI or 4G)->logstash->kafka->写入服务->es->grafana相关推荐

  1. 15单片机通过WIFI模块ESP8266实现手机远程监控可燃气体浓度

    15单片机通过WIFI模块ESP8266实现手机远程监控可燃气体浓度 版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 一,系统方案 1.方案描 ...

  2. C51单片机连接wifi模块,发送AT指令

    一.AT指令 AT 指令集是从终端设备( Terminal Equipment , TE) 或 数据终端设备 ( Data Terminal Equipment , DTE) 向终端适配器 (Term ...

  3. 【十三】景区人流量统计:python日志生成+logstash+kafka+storm+mysql+springBoot+高德地图

    storm+kafka+logstash+springBoot+高德地图 项目概述: 作用:交通信息化,智慧城市 需求:实时统计人流量并通过热力图展示. 类似于腾讯热力图的景区人流量统计 如何采集某个 ...

  4. 苹果wifi网速慢怎么办_所以,WiFi和4G到底哪个更耗电?

    来源 | 中科院物理所(ID:cas-iop) 编辑 | 椒盐猫巨烦 现代人行走江湖,必备三件法宝:手机,网络,充电宝. 即便在4G基站遍布各个旮旮角角的今天,当你带着心仪的人儿走进一家咖啡店,第一件 ...

  5. plc单片机组态软件php_STC8单片机扩展WiFi通信实现功能与组态王组态软件详细组态...

    STC8单片机扩展WiFi通信模块和组态王组态软件实现Modbus TCP以太网通信 一.通信实现功能: 1.Modbus TCP Client:组态王组态软件 2.Modbus TCP Server ...

  6. Flinksql读取Kafka写入Iceberg 实践亲测

    Flink sql实时读取Kafka写入Iceberg 实践亲测 前言 本文记录了使用HDFS的一个路径作为iceberg 的结果表,使用Flink sql实时消费kafka中的数据并写入iceber ...

  7. 【Spark】Spark Stream读取kafka写入kafka报错 AbstractMethodError

    1.概述 根据这个博客 [Spark]Spark 2.4 Stream 读取kafka 写入kafka 报错如下 Exception in thread "main" java.l ...

  8. 【Spark】Spark 2.4 Stream 读取kafka 写入kafka

    1.概述 昨天一网友写了一个spark程序 读取kafka写入kafka,结果数据就是无法写入,然后交给我看看,这个程序是spark stream ,这个东东我都没玩过,我用过spark struct ...

  9. 最简单DIY基于STM32单片机的WIFI智能小车设计方案

    STM32库函数开发系列文章目录 第一篇:STM32F103ZET6单片机双串口互发程序设计与实现 第二篇:最简单DIY基于STM32单片机的蓝牙智能小车设计方案 第三篇:最简单DIY基于STM32F ...

  10. Swift获取当前网络状态Wifi/5G/4G/3G/2G

    通过Swift获取当前网络状态 通过第三方库Alamofire获取网络状态只能获取到ethernetOrWiFi.cellular.notReachable.unknown这几种网络状态,不能准确的获 ...

最新文章

  1. T2821 天使之城 codevs
  2. 【概念原理】四种SQL事务隔离级别和事务ACID特性
  3. 【Python】Python语言学习:pip工具使用知识,模型保存pickle,PDF与docx相互转换处理...
  4. jQuery对Table一个字段排序
  5. Servlet使用适配器模式进行增删改查案例(IDeptService.java)
  6. python寻找多数元素_寻找多数元素
  7. 【Python】Jupyter Notebook 配置路径
  8. 动态规划——乘积最大子数组(Leetcode 152)
  9. 三星调侃iPhone13苍岭绿配色:受宠若惊
  10. ajax control toolkit vs2013,VS2008 .net framework 3.5使用Ajax Control Toolkit完整解决方案
  11. html5触摸界面设计与开发_原生APP的开发步骤主要分为哪些?
  12. 朝鲜欲对韩国发起大规模网络攻击 但计划被韩方挫败
  13. 宏碁笔记本linux,Acer宏碁(Acer宏碁)Acer 4752G-2332G50Mnkk Linux笔记本电脑整体评测-ZOL中关村在线...
  14. 基本类型,指针,双指针作为函数参数
  15. 使用OpenCore引导黑苹果
  16. Redis系列——Redis实战
  17. openwrt - transmission
  18. 问答题库(路由与交换){简答版}
  19. 【整理】训练序列与导频序列的概念辨析
  20. “第二课堂”开课啦~

热门文章

  1. 如何快速转载网页博客
  2. rc3ctf 逆向logmein writeup
  3. python-破译密码
  4. 用python爬虫写一个属于自己的彩虹屁生成器!
  5. 名字作诗,增添你的印象分
  6. 2021-06-05按键精灵实现远程获取消息内容
  7. wlan、wifi、蜂窝、无线局域网的区别
  8. c语言牛顿法求整数平方根,牛顿法求平方根-编程练习
  9. C语言工程网络图,三分钟教你学会 双代号网络图的绘制
  10. 国外硕博论文下载网址