视频教程已经放在B站
请大家狠狠地三连我
虽然我没有稚晖君那么强

教程!基于树莓派+传感器+阿里云IoT的智能家居管理(2)

主文件


#!/usr/bin/python3import aliLink,mqttd,rpi
import time,json
import Adafruit_DHT
import time
import LCD1602
import flame_sensor
import buzzer_1
import rain_detector
import gas_sensor
import relay
from threading import Threadpin = 19  # DHT11 温湿度传感器管脚定义
Buzzer = 20    # 有源蜂鸣器管脚定义# GPIO口定义
sensor = Adafruit_DHT.DHT11# 三元素(iot后台获取)
ProductKey = 'a11lzCDSgZP'
DeviceName = 'IU6aSETyiImFPSkpcywm'
DeviceSecret = "2551eb5f630c372743c538e9b87bfe6d"
# topic (iot后台获取)
POST = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/event/property/post'  # 上报消息到云
POST_REPLY = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/event/property/post_reply'
SET = '/sys/a11lzCDSgZP/IU6aSETyiImFPSkpcywm/thing/service/property/set'  # 订阅云端指令#窗户开关
window = 0
window_status = 0
Thread(target=relay.close).start()# 消息回调(云端下发消息的回调函数)
def on_message(client, userdata, msg):#print(msg.payload)Msg = json.loads(msg.payload)global window,window_statuswindow = Msg['params']['window']print(msg.payload)  # 开关值if window_status != window:window_status = windowif window == 1:Thread(target=relay.open).start()else:Thread(target=relay.close).start()#连接回调(与阿里云建立链接后的回调函数)
def on_connect(client, userdata, flags, rc):pass# 链接信息
Server,ClientId,userNmae,Password = aliLink.linkiot(DeviceName,ProductKey,DeviceSecret)# mqtt链接
mqtt = mqttd.MQTT(Server,ClientId,userNmae,Password)
mqtt.subscribe(SET) # 订阅服务器下发消息topic
mqtt.begin(on_message,on_connect)# 信息获取上报,每2秒钟上报一次系统参数
while True:#获取指示灯状态power_stats=int(rpi.getLed())if(power_stats == 0):power_LED = 0else:power_LED = 1# CPU 信息CPU_temp = float(rpi.getCPUtemperature())  # 温度   ℃CPU_usage = float(rpi.getCPUuse())         # 占用率 %# RAM 信息RAM_stats =rpi.getRAMinfo()RAM_total =round(int(RAM_stats[0]) /1000,1)    #RAM_used =round(int(RAM_stats[1]) /1000,1)RAM_free =round(int(RAM_stats[2]) /1000,1)# Disk 信息DISK_stats =rpi.getDiskSpace()DISK_total = float(DISK_stats[0][:-1])DISK_used = float(DISK_stats[1][:-1])DISK_perc = float(DISK_stats[3][:-1])#温度,湿度humidity, temperature = Adafruit_DHT.read_retry(sensor, pin)# LCD显示LCD = 0try:LCD1602.init(0x27, 1)          # 初始化显示屏LCD1602.write(0, 0, 'humidity:    ' + str(int(humidity)) + '%')LCD1602.write(0, 1, 'temperature: ' + str(int(temperature)) + '\'')  # 在第二行显示world!LCD = 1except:print("显示屏连接不稳定,请检查")LCD = 0# 蜂鸣器buzzer = 0# 火焰传感器flame_sensor.setup()if flame_sensor.fire() == 0:flame = 1buzzer_1.buzzer_on() #让铃声叫buzzer = 1else:flame = 0buzzer_1.buzzer_off() #铃声不叫buzzer = 0# 烟雾传感器gas = gas_sensor.gas()# 雨滴传感器rain_detector.setup()if rain_detector.rain() == 0:rain = 1else:rain = 0# 构建与云端模型一致的消息结构updateMsn = {'cpu_temperature':CPU_temp,'cpu_usage':CPU_usage,'RAM_total':RAM_total,'RAM_used':RAM_used,'RAM_free':RAM_free,'DISK_total':DISK_total,'DISK_used_space':DISK_used,'DISK_used_percentage':DISK_perc,'PowerLed':power_LED,'temperature':temperature,'humidity':humidity,'window':window,'LCD':LCD,'buzzer':buzzer,'flame':flame,'rain':rain,'gas':gas}JsonUpdataMsn = aliLink.Alink(updateMsn)print(JsonUpdataMsn)mqtt.push(POST,JsonUpdataMsn) # 定时向阿里云IOT推送我们构建好的Alink协议数据time.sleep(3)

rpi

# 树莓派数据与控制import os
# Return CPU temperature as a character stringdef getCPUtemperature():res =os.popen('vcgencmd measure_temp').readline()return(res.replace("temp=","").replace("'C\n",""))# Return RAM information (unit=kb) in a list
# Index 0: total RAM
# Index 1: used RAM
# Index 2: free RAM
def getRAMinfo():p =os.popen('free')i =0while 1:i =i +1line =p.readline()if i==2:return(line.split()[1:4])# Return % of CPU used by user as a character string
def getCPUuse():data = os.popen("top -n1 | awk '/Cpu\(s\):/ {print $2}'").readline().strip()return(data)# Return information about disk space as a list (unit included)
# Index 0: total disk space
# Index 1: used disk space
# Index 2: remaining disk space
# Index 3: percentage of disk used
def getDiskSpace():p =os.popen("df -h /")i =0while True:i =i +1line =p.readline()if i==2:return(line.split()[1:5])
def  powerLed(swatch):led = open('/sys/class/leds/led1/brightness', 'w', 1)led.write(str(swatch))led.close()# LED灯状态检测
def getLed():led = open('/sys/class/leds/led1/brightness', 'r', 1)state=led.read()led.close()return stateif __name__ == "__main__":# CPU informatiomCPU_temp =getCPUtemperature()CPU_usage =getCPUuse()print(CPU_usage)# RAM information# Output is in kb, here I convert it in Mb for readabilityRAM_stats =getRAMinfo()RAM_total = round(int(RAM_stats[0]) /1000,1)RAM_used = round(int(RAM_stats[1]) /1000,1)RAM_free = round(int(RAM_stats[2]) /1000,1)print(RAM_total,RAM_used,RAM_free)# Disk informationDISK_stats =getDiskSpace()DISK_total = DISK_stats[0][:-1]DISK_used = DISK_stats[1][:-1]DISK_perc = DISK_stats[3][:-1]print(DISK_total,DISK_used,DISK_perc)

继电器

import RPi.GPIO as GPIO
import time
import asyncioGPIO.setmode(GPIO.BCM)              # 管脚映射,采用BCM编码
GPIO.setwarnings(False)             # 忽略GPIO 警告CH1 = 17
CH2 = 16 #继电器输入信号管脚GPIO.setup(CH1,GPIO.OUT)
GPIO.setup(CH2,GPIO.OUT)def open():GPIO.output(CH1, GPIO.LOW)time.sleep(10)GPIO.output(CH1, GPIO.HIGH)def close():GPIO.output(CH2, GPIO.LOW)time.sleep(10)GPIO.output(CH2, GPIO.HIGH)

阿里云连接

import time,json,random
import hmac,hashlibdef linkiot(DeviceName,ProductKey,DeviceSecret,server = 'iot-as-mqtt.cn-shanghai.aliyuncs.com'):serverUrl = serverClientIdSuffix = "|securemode=3,signmethod=hmacsha256,timestamp="# 拼合Times = str(int(time.time()))  # 获取登录时间戳Server = ProductKey+'.'+serverUrl    # 服务器地址ClientId = DeviceName + ClientIdSuffix + Times +'|'  # ClientIduserNmae = DeviceName + "&" + ProductKeyPasswdClear = "clientId" + DeviceName + "deviceName" + DeviceName +"productKey"+ProductKey + "timestamp" + Times  # 明文密码# 加密h = hmac.new(bytes(DeviceSecret,encoding= 'UTF-8'),digestmod=hashlib.sha256)  # 使用密钥h.update(bytes(PasswdClear,encoding = 'UTF-8'))Passwd = h.hexdigest()return Server,ClientId,userNmae,Passwd# 阿里Alink协议实现(字典传入,json str返回)
def Alink(params):AlinkJson = {}AlinkJson["id"] = random.randint(0,999999)AlinkJson["version"] = "1.0"AlinkJson["params"] = paramsAlinkJson["method"] = "thing.event.property.post"return json.dumps(AlinkJson)if __name__ == "__main__":pass

蜂鸣器

import RPi.GPIO as GPIO
import timeBuzzerPin = 20    # 有源蜂鸣器管脚定义# GPIO设置函数
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)                       # 关闭GPIO警告提示
GPIO.setup(BuzzerPin, GPIO.OUT)     # 设置有源蜂鸣器管脚为输出模式
GPIO.output(BuzzerPin, GPIO.HIGH)   # 蜂鸣器设置为高电平,关闭蜂鸟器#  打开蜂鸣器
def buzzer_on():GPIO.output(BuzzerPin, GPIO.LOW)  # 蜂鸣器为低电平触发,所以使能蜂鸣器让其发声
# 关闭蜂鸣器
def buzzer_off():GPIO.output(BuzzerPin, GPIO.HIGH) # 蜂鸣器设置为高电平,关闭蜂鸟器# 控制蜂鸣器鸣叫
def beep(x):buzzer_on()     # 打开蜂鸣器控制time.sleep(x)            # 延时时间buzzer_off()    # 关闭蜂鸣器控制time.sleep(x)            # 延时时间# 循环函数
def loop():while True:beep(1) # 控制蜂鸣器鸣叫,延时时间为500mmdef destroy():GPIO.output(BuzzerPin, GPIO.HIGH) # 关闭蜂鸣器鸣叫GPIO.cleanup()                     # 释放资源# 程序入口
if __name__ == '__main__':try:                            # 检测异常loop()                      # 调用循环函数except KeyboardInterrupt:  # 当按下Ctrl+C时,将执行destroy()子程序。destroy()              # 释放资源

火焰传感器

import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import mathDO = 26  # 火焰传感器数字IO口
GPIO.setmode(GPIO.BCM) # 管脚映射,采用BCM编码# 初始化工作
def setup():ADC.setup(0x48)    # 设置PCF8591模块地址GPIO.setup(DO, GPIO.IN) # 设置火焰传感器数字IO口为输入模式# 打印信息,打印出火焰传感器的状态值
def Print(x):if x == 1:      # 安全print ('')print ('   *******************')print ('   *  Makerobo Safe~ *')print ('   *******************')print ('')if x == 0:     # 有火焰print ('')print ('   ******************')print ('   * Makerobo Fire! *')print ('   ******************')print ('')# 功能函数
def fire():status = 1      # 状态值# 读取火焰传感器数字IO口return GPIO.input(DO)# 程序入口
if __name__ == '__main__':try:setup() # 初始化fire()except KeyboardInterrupt:pass

烟雾传感器

import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import mathDO = 18                    # 烟雾传感器数字IO口
GPIO.setmode(GPIO.BCM)              # 管脚映射,采用BCM编码
GPIO.setwarnings(False)             # 忽略GPIO 警告# 初始化工作
def setup():ADC.setup(0x48)                      # 设置PCF8591模块地址GPIO.setup (DO,GPIO.IN)    # 烟雾传感器数字IO口,设置为输入模式# 打印信息,打印出是否检测到烟雾信息
def Print(x):if x == 1:     # 安全print ('')print ('   ******************')print ('   * Makerobo Safe~ *')print ('   ******************')print ('')if x == 0:    # 检测到烟雾print ('')print ('   ************************')print ('   * Makerobo Danger Gas! *')print ('   ************************')print ('')# 循环函数
def gas():setup()return GPIO.input(DO)  # 读取GAS烟雾传感器数字IO口值# 程序入口
if __name__ == '__main__':setup()   # 初始化函数loop()    # 循环函数

LCD1602

import time
import smbusBUS = smbus.SMBus(1)# IIC LCD1602 液晶模块写入字
def write_word(addr, data):global BLENtemp = dataif BLEN == 1:temp |= 0x08else:temp &= 0xF7BUS.write_byte(addr ,temp) # 设置IIC LCD1602 液晶模块地址# IIC LCD1602 发送命令
def  send_command(comm):# 首先发送 bit7-4 位lcd_buf = comm & 0xF0lcd_buf |= 0x04               # RS = 0, RW = 0, EN = 1write_word(LCD_ADDR ,lcd_buf)time.sleep(0.002)lcd_buf &= 0xFB               # Make EN = 0write_word(LCD_ADDR ,lcd_buf)# 其次发送 bit3-0 位lcd_buf = (comm & 0x0F) << 4lcd_buf |= 0x04               # RS = 0, RW = 0, EN = 1write_word(LCD_ADDR ,lcd_buf)time.sleep(0.002)lcd_buf &= 0xFB               # Make EN = 0write_word(LCD_ADDR ,lcd_buf)def send_data(data):# 首先发送 bit7-4 位lcd_buf = data & 0xF0lcd_buf |= 0x05               # RS = 1, RW = 0, EN = 1write_word(LCD_ADDR ,lcd_buf)time.sleep(0.002)lcd_buf &= 0xFB               # Make EN = 0write_word(LCD_ADDR ,lcd_buf)# 其次发送 bit3-0 位lcd_buf = (data & 0x0F) << 4lcd_buf |= 0x05               # RS = 1, RW = 0, EN = 1write_word(LCD_ADDR ,lcd_buf)time.sleep(0.002)lcd_buf &= 0xFB               # Make EN = 0write_word(LCD_ADDR ,lcd_buf)# IIC LCD1602 初始化
def init(addr, bl):global LCD_ADDRglobal BLENLCD_ADDR = addrBLEN = bltry:send_command(0x33) # 必须先初始化到8线模式time.sleep(0.005)send_command(0x32) # 然后初始化为4行模式time.sleep(0.005)send_command(0x28) # 2 行 & 5*7 点位time.sleep(0.005)send_command(0x0C) # 启用无光标显示time.sleep(0.005)send_command(0x01) # 清除显示BUS.write_byte(LCD_ADDR, 0x08)except:return Falseelse:return True# LCD 1602 清空显示函数
def clear():send_command(0x01)  # 清除显示# LCD 1602 使能背光显示
def openlight():BUS.write_byte(0x27,0x08)  # 使能背光显示命令BUS.close()                # 关闭总线# LCD 1602 显示函数
def write(lcd_x, lcd_y, lcd_str):# 选择行与列if lcd_x < 0:lcd_x = 0if lcd_x > 15:lcd_x = 15if lcd_y <0:lcd_y = 0if lcd_y > 1:lcd_y = 1# 移动光标lcd_addr = 0x80 + 0x40 * lcd_y + lcd_xsend_command(lcd_addr)    # 发送地址for chr in lcd_str:                  # 获取字符串长度send_data(ord(chr)) # 发送显示# 程序入口
if __name__ == '__main__':init(0x27, 1)          # 初始化显示屏write(0, 0, 'Hello')   # 在第一行显示Hellowrite(0, 1, 'world!')  # 在第二行显示world!

mqtt

#!/usr/bin/python3# pip install paho-mqtt
import paho.mqtt.client# =====初始化======
class MQTT():def __init__(self,host,CcientID,username=None,password=None,port=1883,timeOut=60):self.Host = hostself.Port = portself.timeOut = timeOutself.username =usernameself.password = passwordself.CcientID = CcientIDself.mqttc = paho.mqtt.client.Client(self.CcientID)    #配置IDif self.username is not None:    #判断用户名密码是否为空self.mqttc.username_pw_set(self.username, self.password)    #不为空则配置账号密码self.mqttc.connect(self.Host, self.Port, self.timeOut) #初始化服务器  IP  端口  超时时间# 初始化def begin(self,message,connect):self.mqttc.on_connect = connectself.mqttc.on_message = messageself.mqttc.loop_start()  # 后台新进程循环监听# =====发送消息==========def push(self,tag,date,_Qos = 0):self.mqttc.publish(tag,date,_Qos)#print('OK',date)# =======订阅tips=====def subscribe(self,_tag):self.mqttc.subscribe(_tag)   #监听标签

PCF8591数模转化模块

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 说明:这是一个PCF8591模块的程序。
#      警告:模拟输入不能超过3.3V!
# 在这个程序中,我们使用电位计进行模拟输入和控制一个模拟电压
# 的LED灯,你可以导入这个程序到另一个程序中使用:
# import PCF8591 as ADC
# ADC.Setup(Address)  # 通过 sudo i2cdetect -y -1 可以获取到IIC的地址
# ADC.read(channal) # 通道选择范围为0-3
# ADC.write(Value)  # 值的范围为:0-255
#####################################################
import smbus
import time# 对应比较旧的版本如RPI V1 版本,则 "bus = smbus.SMBus(0)"
bus = smbus.SMBus(1)#通过 sudo i2cdetect -y -1 可以获取到IIC的地址
def setup(Addr):global addressaddress = Addr# 读取模拟量信息
def read(chn): #通道选择,范围是0-3之间try:if chn == 0:bus.write_byte(address,0x40)if chn == 1:bus.write_byte(address,0x41)if chn == 2:bus.write_byte(address,0x42)if chn == 3:bus.write_byte(address,0x43)bus.read_byte(address) # 开始进行读取转换except Exception as e:print ("Address: %s" % address)print (e)return bus.read_byte(address)# 模块输出模拟量控制,范围为0-255
def write(val):try:temp = val # 将数值赋给temmp 变量temp = int(temp) # 将字符串转换为整型# 在终端上打印temp以查看,否则将注释掉bus.write_byte_data(address, 0x40, temp)except Exception as e:print ("Error: Device address: 0x%2X" % address)print (e)if __name__ == "__main__":setup(0x48)while True:print ('AIN0 = ', read(0))print ('AIN1 = ', read(1))tmp = read(0)tmp = tmp*(255-125)/255+125 # 低于125时LED不会亮,所以请将“0-255”转换为“125-255”write(tmp)
#       time.sleep(0.3)

雨滴传感器

import PCF8591 as ADC
import RPi.GPIO as GPIO
import time
import mathDO = 22       # 雨滴传感器数字管脚GPIO.setmode(GPIO.BCM) # 采用BCM管脚给GPIO口# GPIO口定义
def setup():ADC.setup(0x48)      # 设置PCF8591模块地址GPIO.setup(DO, GPIO.IN)  # 设置雨滴传感器管脚为输入模式# 打印出雨滴传感器提示信息
def Print(x):if x == 1:          # 没有雨滴print ('')print ('   ************************')print ('   * Not raining *')print ('   ************************')print ('')if x == 0:          # 有雨滴print ('')print ('   **********************')print ('   * Raining!! *')print ('   **********************')print ('')
# 循环函数
def loop():status = 1      # 雨滴传感器状态while True:print (ADC.read(2))  # 打印出AIN3的模拟量数值tmp = GPIO.input(DO)      # 读取数字IO口电平,读取数字雨滴传感器DO端口if tmp != status:         # 状态发生改变Print(tmp)   # 打印出雨滴传感器检测信息status = tmp # 状态值重新赋值time.sleep(0.2)                    # 延时200ms# 功能函数
def rain():status = 1      # 雨滴传感器状态# 读取数字IO口电平,读取数字雨滴传感器DO端口return GPIO.input(DO)# 程序入口
if __name__ == '__main__':try:setup()   # GPIO定义loop()    # 调用循环函数except KeyboardInterrupt:pass

基于树莓派+传感器+阿里云IoT的智能家居管理(代码实现)相关推荐

  1. 基于树莓派+STM32+OneNET云平台打造智能家居系统(一)硬件设计篇

      本次分享的是之前一个课程设计, 会分为几篇博文进行分享.智能家居是目前研究与发展的一大热点,本设计是结合STM32微处理器/树莓派(Raspberry Pi)3b+.温湿度传感器.继电器以及ESP ...

  2. 手把手实践丨基于STM32+NBIOT+华为云IOT设计智能井盖

    摘要:本文介绍基于STM32微控制器.BC26 NBIOT模组和华为云IOT平台,实现了一款智能井盖系统. 本文分享自华为云社区<基于STM32+NBIOT+华为云IOT设计的智能井盖>, ...

  3. 【STM32】基于stm32的阿里云智能家居

    摘 要 智能家居是一种通过物联网将家里的各种电器设备连接在一起,并由中心控制器统一管理的信息系统.系统的核心是各类家居信息的采集与处理.阿里云能够提供云端的数据存储和分析功能,可以作为智能家居中心控制 ...

  4. 揭秘阿里云IoT安全平台Link Security如何实现物联网产品全生命周期管理

      全世界99%的物体尚未联网,一场由物联网(IoT)技术引发的"万物智联"革命正在加速到来.但是随之而来的物联网安全问题,也显得非常重要.这里既包括物联网终端的安全,也包括物联网 ...

  5. 4G Modbus Json边缘网关接入阿里云IoT平台

    LTE-669P 4G边缘网关系列 阿里云平台连接教程 今天介绍物联设备 LTE-669P 4G边缘网关如何接入阿里云平台系列. LTE-669P是一款工业级4G JSON无线边缘解析网关,支持RS4 ...

  6. 10分钟虚拟设备接入阿里云IoT平台实战

    10分钟虚拟设备接入阿里云IoT平台实战 1. 准备工作 1.1 注册阿里云账号 使用个人淘宝账号或手机号,开通阿里云账号,并通过实名认证(可以用支付宝认证) 1.2 免费开通IoT物联网套件 产品官 ...

  7. 基于阿里云IOT Studio和STM32的电机远程监测设计

    今天来总结一下用阿里云的IOT Studio做的一个电机远程监控的小系统吧! 说来话长,在去年九月份的时候,我踏入了研究生的行列.我的导师是搞电机方向的,但我本科是测控的,考虑我的基础,导师给我推荐了 ...

  8. 文末赠书5本 | 附源码 | 三等奖作品 | 基于RA4M2和阿里云物联网平台的智能卧室小管家

    [RA4M2设计挑战赛]基于RA4M2和阿里云物联网平台的智能卧室小管家 摘要 本项目已RA4M2为主控,搭载了RT-Thread实时操作系统,配合Wi-Fi模块来提供网络通讯能力,外设接入了继电器. ...

  9. 阿里云IoT何云飞:智物Cloud AIoT Native 为何能让设备智能更快一步

    简介:在10月21日举行的2021云栖大会-IoT云端一体硬件与应用创新峰会上,阿里云智能IoT产品总经理何云飞在会上做了主题为"阿里云 Cloud AIoT Native 年度升级&quo ...

最新文章

  1. 自学入门不在困难,初学者挑战学习Python编程30天 (三)
  2. Nacos配置中心原理
  3. 经验传承:谈色彩设计方法
  4. 成功解决ModuleNotFoundError: No module named 'torch.utils.tensorboard'
  5. PHP版本如何选择?应该使用哪个版本?
  6. Spring IOC 组件概述
  7. Mac下给sublime text3配置Nodejs
  8. 全程软件测试之测试需求分析与计划(2)
  9. 经典排序算法(11)——计数排序算法详解
  10. 获得picker选项的当前年月值_如果你用OPPO手机!千万记得开启开发者选项,手机性能大幅度提升...
  11. 二手房买卖砍价最新攻略 帮你花少钱买好房
  12. Android 系统(223)---Android-打包与快速打包
  13. React学习(8)—— 高阶应用:不使用ES6、JSX实现React
  14. 字符串对象的charAt函数存在的意义
  15. python创建类mymath_构建DLL(MyMathFuncs)以在Python Ctypes中使用
  16. 基于Swing与JavaFx的音乐播放器——轻音
  17. stm32单片机驱动L298N模块
  18. Google Earth Engine(GEE)——设置经纬格网(日本东京)
  19. HE4484E原厂升压8.4v锂电池充电芯片
  20. 怎么把PDF页面删除?教你两种方法

热门文章

  1. 关于无法添加微信有隐私设置的好友
  2. uni-app自定义摄像头拍照添加人物框
  3. 网页实现掷骰子小游戏
  4. 一分钟读懂阿里云产品:ECS概述
  5. 只有在日本便利店才能吃到的『独家美食』
  6. 如何让打开mysql数据库
  7. matplotlib--------简单的折线图之x轴y轴刻度
  8. 词袋模型与TF-IDF模型
  9. 用Python批量重命名文件
  10. uniapp全网最简单实现夜间模式