单片机- >网络模块(WIFI or 4G)->logstash->kafka->写入服务->es->grafana
一、单片机->Wifi
1整体框架图
2注意事项
1.重新上电=开始编程
复位=一直透传
2.将wifi同时连上USB转TTL,再连上电脑
用XCOM打开非单片机串口的串口,可以看到WiFi模块发出去的指令
用XCOM用单片机的串口,可以看到该串口的输出,即printf的内容
3遇到的坑
3.1给单片机编程的软件界面如下,一定要点“编程后执行”
3.2串口2一直发不出去数据给设备,可能是因为网口转串口模块被reset了
解决办法:
网口转串口模块连上USB转TTL,再连上电脑,打开上面的软件,将界面按照下面的来配置
二、单片机->4G
1花生壳的配置
2 XCOM的配置
二、WiFi->Logstash
1.配置WiFi,打开XCOM,用AT指令
1.1多条发送
AT //测试模块是否能正常通讯,返回"ok"即为正常
AT+CWMODE=3 //配置成 AP+STA模式
AT+RST //重启生效
AT+CWJAP="xuhui","11111111" //连接电脑热点“xuhui”为热点的名称,"11111111"为密码
2.配置logstash目录下的itcast-wifi.conf
input {tcp {port => 5044mode => "server"ssl_enable => false}
}filter{mutate{split => ["message",","] #按,进行split切割messageadd_field => {"name" => "%{[message][0]}"}add_field => {"age" => "%{[message][1]}"}}
}output {stdout { codec => rubydebug }
}
3.启动logstash
进入logstash-8.1.2主目录,cmd
写命令:
logstash -f itcast-wifi.conf
4.配置WiFi
4.1多条发送
AT+CIPSTART="TCP","192.168.137.1",5044 //本机IP,logstash端口号
AT+CIPMODE=1 //开启透传模式
AT+CIPSEND //开始透传
4.2单条发送
xuhui,20
5.Logstash输出截图
三、Logstash->Kafka
1.启动Zookeeper
进入zookeeper的bin文件,cmd
zkServer
2.启动Kafka
2.1启动Kafka
进入Kafka主目录文件,cmd
.\bin\windows\kafka-server-start.bat .\config\server.properties
2.2创建一个Topic
进入Kafka的bin\windows文件,cmd
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafkatest0427
2.3查看创建的topic
kafka-topics.bat --list --bootstrap-server localhost:9092
2.4启动生产者 producer
直接在创建完Topic的命令行界面输入以下命令
kafka-console-producer.bat --broker-list localhost:9092 --topic kafkatest0427
2.5启动消费者customer
进入Kafka的bin\windows文件,cmd
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic kafkatest0427 --from-beginning
3.修改Logstash的SCM-wifi-logstash-kafka.conf
input {tcp {port => 5044mode => "server"ssl_enable => false}
}filter{mutate{split => ["message",","] #按 , 进行split切割messageadd_field => {"name" => "%{[message][0]}"}add_field => {"age" => "%{[message][1]}"}}
}output { kafka {bootstrap_servers => "192.168.137.1:9092" #kafka服务器地址topic_id => "kafkatest0427"batch_size => 5codec => "json" #写入的时候使用json编码,因为logstash收集后会转换成json格式}
}
4.启动Logstash
进入logstash-8.1.2主目录,cmd
写命令:
logstash -f SCM-wifi-logstash-kafka.conf
5.Wifi模块发送单条指令
xuhui,20
6.消费者界面截图
7.遇到的坑
7.1
WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
- 修改kafka中server.properties配置
- 同时,在kafka中创建topic 、生产者、消费者的时候,也要在指令中进行以下改动
7.2
ZooKeeper audit is disabled
在 zkServer.cmd 添加"-Dzookeeper.audit.enable=true",如下图所示
7.3
ERROR Shutdown broker because all log dirs in D:\develop\Kafka\kafka_2.13-3.1.0\kafka-logs have failed (kafka.log.LogManager)
删除错误信息中的目录(D:\develop\Kafka\kafka_2.13-3.1.0\kafka-logs)下的所有文件,然后重启Kafka
使用maven官方仓库直接下载项目需要的jar包方法
1.首先我们得知道我们需要下载那个jar包,本文中我们以 mysql-connector-java.jar 包为例。
2.打开maven在线仓库,https://mvnrepository.com/
3.我们在搜索框输入jar包名:mysql-connector-java,如图能够找到对应的jar
4.然后我们点击上图标注位置(红色框)进入选择我们需要的jar版本
- 进入页面下载jar包,如下图(备注,有的jar不是点击jar进行,点击view All,大家可以自己试着去点)
java学习——IDEA导入jar包并调用
项目目录下新建 lib 目录,并将需要导入的 jar 包复制到该文件夹
如:(以导入 mysql驱动包为例)
将 lib 目录下的所有依赖导入到指定模块
打开 File -> Project Structure (Ctrl + Shift + Alt + S)-> 点击 加号 -> 选择我们创建的 lib 目录 -> 确认即可
然后选择需要该依赖的模块,点击 OK
此时我们打开该模块的依赖,可以看到 lib 下的依赖已经导入成功
导入成功后该jar包还可以打开查看源码 (▽)
如何导入 com.alibaba.fastjson.JSONObject
1: 先在百度搜索 : 从github上阿里 fastJson包
2: 点击下载 jar包 (注意maven是中央仓,可以在配置文件中配置自动下载)
注:记得下载.jar,不要下载javadoc.jar和sources.jar
四、写入服务
1.用java(Maven)写一个工程,从Kafka中获取数据,并发到ES
1.1参考链接
https://www.cnblogs.com/zling/p/10450259.html
1.2自己的代码
class KafkaPoolData
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.io.IOException;
import java.time.Duration;
import java.util.*;public class KafkaPoolData {private KafkaConsumer<String, String> consumer;private String topic;public KafkaPoolData(String topic) {Properties consumerConfig = new Properties();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//连接KafkaconsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "xh-source-group1"); //消费者组,保证要跟其他的不一样consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 这个参数指定了当消费者第一次读取分区或者上一次的位置太老(比如消费者下线时间太久)时的行为,可以取值为latest(读取消息队列中最新的消息)或者earliest(从最老的消息开始消费)/**kafka根据key值确定消息发往哪个分区(如果分区被指定则发往指定的分区),具有相同key的消息被发往同一个分区,如果key为NONE则随机选择分区,可以使用key_serializer参数序列化为字节类型value为要发送的消息值,必须为bytes类型,如果这个值为空,则必须有对应的key值,并且空值被标记为删除。可以通过配置value_serializer参数序列化为字节类型*///设置值反序列化器consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//设置key反序列化器consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //用来做反序列化的,也就是将字节数组转换成对象//创建一个消费者consumer = new KafkaConsumer<>(consumerConfig);this.topic = topic;}public void testConsumer(long offset,int number) throws IOException {//与subscirbe方法不同,assign方法由用户直接手动consumer实例消费哪些具体分区,assign的consumer不会拥有kafka的group management机制,也就是当group内消费者数量变化的时候不会有reblance行为发生。assign的方法不能和subscribe方法同时使用。TopicPartition partition = new TopicPartition(topic, 0);//i是指定分区partitionconsumer.assign(Arrays.asList(partition));consumer.seek(partition, offset);// 指定从这个topic和partition的哪个位置获取//consumer.close(); //主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期int count = 0;System.out.println("\nConsumer begin!\n");System.out.println("\nConsumer data begin!\n");ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); //拉取消费记录System.out.println("开始读取消息");System.out.println("---------------"+records.count());for (ConsumerRecord<String, String> record : records) {//获取字符串数据String text = record.value();//输出字符串数据System.out.println(text);//生成随机idUUID uuid = UUID.randomUUID();String id = uuid.toString();//将text id输出到ESPutDataToEs02.PutToEs(text,id);count++;//count大于拉取数据时,停止拉取数据if (count > number) {break;}}//System.out.println("\nParse data Finished! count=" + count + "\n");System.out.println("\nConsumer Finished!\n");}public static void main(String[] args) throws IOException {//初始化ESPutDataToEs02.initClient();//Write2Es.initClient();//topic名称String topic = "kafkatest0427";//String topic = args[0];long offset =80; //从哪里开始// long offset = Long.parseLong(args[0]);int number=175; //需要拉取多少数据// int number = Integer.parseInt(args[1]);KafkaPoolData kafkaconsumer = new KafkaPoolData(topic);kafkaconsumer.testConsumer(offset,number);PutDataToEs02.closeClient();//Write2Es.closeClient();}
}
class PutDataToEs02
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;import org.elasticsearch.rest.RestStatus;import java.io.IOException;public class PutDataToEs02 {private static RestHighLevelClient client = null;public static void initClient() {//连接ESclient = new RestHighLevelClient(RestClient.builder(new HttpHost("192.168.137.1", 9200, "http")));//ES的本地端口号System.out.println(client);}//关闭服务public static void closeClient() throws IOException {client.close();}public static void PutToEs(String text, String id) throws IOException {//将字符串text转成JSONJSONObject jsonObject = JSONObject.parseObject(text);String message = "";String timestamp = "";//提取text里面的messageif (jsonObject.containsKey("message")) {message = jsonObject.getString("message");//System.out.println(message1);}//提取text里面的timestampif (jsonObject.containsKey("@timestamp")) {timestamp = jsonObject.getString("@timestamp");//System.out.println(timestamp);}//1.创建索引请求IndexRequest request = new IndexRequest("kafkatoes0429","data",id);//2.用XContentBuilder来构建文档XContentBuilder builder = XContentFactory.jsonBuilder();builder.startObject();{builder.field("timestamp", timestamp);builder.field("message", message);}builder.endObject();request.source(builder);//3.发送请求//3.1 同步方式发送请求IndexResponse indexResponse = null;try {indexResponse = client.index(request, RequestOptions.DEFAULT);} catch(ElasticsearchException e) {//捕获,并处理异常//判断是否版本冲突、create但文档已存在冲突if (e.status() == RestStatus.CONFLICT) {System.out.println("版本冲突\n" + e.getDetailedMessage());}System.out.println("索引异常");}//4.处理响应if(indexResponse != null) {String index = indexResponse.getIndex();String type = indexResponse.getType();String id1 = indexResponse.getId();//long version = indexResponse.getVersion();if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {System.out.println("新增文档成功\n" + "index = " + index + " type = " + type + " id = " + id1);} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {System.out.println("修改文档成功\n" + "index = " + index + " type = " + type + " id = " + id1);}// 分片处理信息/*ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();if (shardInfo.getTotal() != shardInfo.getSuccessful()) {}// 如果有分片副本失败,可以获得失败原因信息if (shardInfo.getFailed() > 0) {for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {String reason = failure.reason();System.out.println("副本失败原因:" + reason);}}*/}}
}
1.3java控制台的截图
2.输出到Elasticsearch的截图
五、grafana
1.启动grafana
进入bin目录,通过执行grafana-server.exe启动Grafana
2.登录grafana
打开浏览器并转到http://localhost:3000/,然后登录即可,默认用户和密码均为admin
单片机- >网络模块(WIFI or 4G)->logstash->kafka->写入服务->es->grafana相关推荐
- 15单片机通过WIFI模块ESP8266实现手机远程监控可燃气体浓度
15单片机通过WIFI模块ESP8266实现手机远程监控可燃气体浓度 版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 一,系统方案 1.方案描 ...
- C51单片机连接wifi模块,发送AT指令
一.AT指令 AT 指令集是从终端设备( Terminal Equipment , TE) 或 数据终端设备 ( Data Terminal Equipment , DTE) 向终端适配器 (Term ...
- 【十三】景区人流量统计:python日志生成+logstash+kafka+storm+mysql+springBoot+高德地图
storm+kafka+logstash+springBoot+高德地图 项目概述: 作用:交通信息化,智慧城市 需求:实时统计人流量并通过热力图展示. 类似于腾讯热力图的景区人流量统计 如何采集某个 ...
- 苹果wifi网速慢怎么办_所以,WiFi和4G到底哪个更耗电?
来源 | 中科院物理所(ID:cas-iop) 编辑 | 椒盐猫巨烦 现代人行走江湖,必备三件法宝:手机,网络,充电宝. 即便在4G基站遍布各个旮旮角角的今天,当你带着心仪的人儿走进一家咖啡店,第一件 ...
- plc单片机组态软件php_STC8单片机扩展WiFi通信实现功能与组态王组态软件详细组态...
STC8单片机扩展WiFi通信模块和组态王组态软件实现Modbus TCP以太网通信 一.通信实现功能: 1.Modbus TCP Client:组态王组态软件 2.Modbus TCP Server ...
- Flinksql读取Kafka写入Iceberg 实践亲测
Flink sql实时读取Kafka写入Iceberg 实践亲测 前言 本文记录了使用HDFS的一个路径作为iceberg 的结果表,使用Flink sql实时消费kafka中的数据并写入iceber ...
- 【Spark】Spark Stream读取kafka写入kafka报错 AbstractMethodError
1.概述 根据这个博客 [Spark]Spark 2.4 Stream 读取kafka 写入kafka 报错如下 Exception in thread "main" java.l ...
- 【Spark】Spark 2.4 Stream 读取kafka 写入kafka
1.概述 昨天一网友写了一个spark程序 读取kafka写入kafka,结果数据就是无法写入,然后交给我看看,这个程序是spark stream ,这个东东我都没玩过,我用过spark struct ...
- 最简单DIY基于STM32单片机的WIFI智能小车设计方案
STM32库函数开发系列文章目录 第一篇:STM32F103ZET6单片机双串口互发程序设计与实现 第二篇:最简单DIY基于STM32单片机的蓝牙智能小车设计方案 第三篇:最简单DIY基于STM32F ...
- Swift获取当前网络状态Wifi/5G/4G/3G/2G
通过Swift获取当前网络状态 通过第三方库Alamofire获取网络状态只能获取到ethernetOrWiFi.cellular.notReachable.unknown这几种网络状态,不能准确的获 ...
最新文章
- T2821 天使之城 codevs
- 【概念原理】四种SQL事务隔离级别和事务ACID特性
- 【Python】Python语言学习:pip工具使用知识,模型保存pickle,PDF与docx相互转换处理...
- jQuery对Table一个字段排序
- Servlet使用适配器模式进行增删改查案例(IDeptService.java)
- python寻找多数元素_寻找多数元素
- 【Python】Jupyter Notebook 配置路径
- 动态规划——乘积最大子数组(Leetcode 152)
- 三星调侃iPhone13苍岭绿配色:受宠若惊
- ajax control toolkit vs2013,VS2008 .net framework 3.5使用Ajax Control Toolkit完整解决方案
- html5触摸界面设计与开发_原生APP的开发步骤主要分为哪些?
- 朝鲜欲对韩国发起大规模网络攻击 但计划被韩方挫败
- 宏碁笔记本linux,Acer宏碁(Acer宏碁)Acer 4752G-2332G50Mnkk Linux笔记本电脑整体评测-ZOL中关村在线...
- 基本类型,指针,双指针作为函数参数
- 使用OpenCore引导黑苹果
- Redis系列——Redis实战
- openwrt - transmission
- 问答题库(路由与交换){简答版}
- 【整理】训练序列与导频序列的概念辨析
- “第二课堂”开课啦~