pythonserial函数_python3.5 中serial模块的问题
在python3.5中使用serial模块进行串口通信,通过while 1循环进行持续性的设备通信,但是第一个设备完成之后对下一个设备进行通信会有问题:Attempting to use a port that is not open
一下是我的代码:
import serial
import time
import base64
import hmac
import pymysql
from esptool import ESP8266StubLoader
from esptool import erase_flash, read_mac, write_flash
from log import TNLog
values = ['0x01000', 'D:\esp8266\user1.4096.new.4(3).bin']
values = ['0x01000', '/home/pi/Interface/Anthink/esp8266/user1.4096.new.4(3).bin']
v1 = ['0x00000', 'D:\esp8266\boot_v1.7.bin']
v1 = ['0x00000', '/home/pi/Interface/Anthink/esp8266/boot_v1.7.bin']
v2 = ['0x3fb000', 'D:\esp8266\blank.bin']
v2 = ['0x3fb000', '/home/pi/Interface/Anthink/esp8266/blank.bin']
v3 = ['0x3fe000', 'D:\esp8266\blank.bin']
v3 = ['0x3fe000', '/home/pi/Interface/Anthink/esp8266/blank.bin']
v4 = ['0x3fc000', 'D:\esp8266\esp_init_data_default_v08.bin']
v4 = ['0x3fc000', '/home/pi/Interface/Anthink/esp8266/esp_init_data_default_v08.bin']
argfile = open(values[1], 'rb')
a1 = open(v1[1], 'rb')
a2 = open(v2[1], 'rb')
a3 = open(v3[1], 'rb')
a4 = open(v4[1], 'rb')
db = pymysql.connect("192.168.0.250", "dev_v7", "vvvvvv7", "db_iot_v7", charset='utf8')
cursor = db.cursor()
logger = TNLog()
prorts = serial.serial_for_url('COM3')
prorts = serial.serial_for_url('COM3')
prorts.baudrate = 115200
s = 1019300100006626 # 扫码得到的设备SN
自定义属性值
class WriteOpthons(object):
def init(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
设备初始化
class MyClass(object):
def init(self):
self._port = prorts
self._trace_enabled = False
logger.info('接口初始化已经完成:{}'.format(self._port))
设备调试
class SerialPort():
logger.info('现在开始进行调试硬件设备操作!')
message = ''
def connectport(self, com, baud):
try:
super(SerialPort, self).__init__()
self.port = serial.Serial(com, baud)
try:
if self.port.isOpen():
logger.info('已经连接OK:blfadd:connect port is ok')
print("blfadd:connect port is ok")
return 1
except Exception as e: # 捕获异常输出并终止运行
logger.error('硬件连接失败:{}'.format(e))
return 0
raise
except Exception as e:
logger.error('硬件调试连接操作出错:{}'.format(e))
def restart(self):
self.port.write()
def restart(self):
self.port.setDTR(True) # 0wei gao
self.port.setRTS(True)
time.sleep(1.5)
self.port.setDTR(True)
self.port.setRTS(False)
def setDTR(self, state):
self.port.setDTR(state)
def read_data(self):
global timeStamp
test1 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
start_time = time.strptime(test1, "%Y-%m-%d %H:%M:%S")
# strftime是转换为特定格式输出,而strptime是将一个(时间)字符串解析为时间的一个类型对象。一个是按照想要的格式,去转换。重点是格式!另外一个不管什么格式,我只要把特定的时间字符串转成时间类型即可!
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
while True:
self.message = self.port.readline().decode('utf8', 'ignore')
test2 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
end_time = time.strptime(test2, "%Y-%m-%d %H:%M:%S")
timeStamp = int(time.mktime(end_time)) - int(time.mktime(start_time))
if (self.message[0:10] == "uart begin" and timeStamp <= 80):
return 1
elif (timeStamp > 90):
raise Exception("blfadd:Time out")
def getwifiinfo(self, query_command, catchnum): # catchnum增加参数为捕获的wifi数量
try:
global count # 定义变量couunt用来计数
global macId
count = 0
self.port.write((query_command + '\n').encode())
huixian = self.port.readline().decode('utf8', 'ignore')
while (huixian[0:8] == "scandone"):
continue
while 100:
m = self.port.readline()
data_2 = m.decode('utf8', 'ignore')
data = m.decode('utf8', 'ignore')[0:-3]
if (data[0:5] == "data="): # 只读取开头为data=的信息
data_1 = data[6:]
# m1=data_1.split(',')[0:1]
m2 = data_1.split(',')[1:2]
m3 = data_1.split(',')[2:3]
m4 = data_1.split(',')[3:4]
# m5=data_1.split(',')[4:5]
wifiName = str(m2[0]).replace('"', '') # 替换掉“”
macId = str(m4[0]).replace('"', '')
# cursor =db.cursor()
count += 1 # if 语句每执行一次计数器加1
# 下面将wifi信息插入到数据库
# sql = """INSERT INTO WIFIDATA(INFO_1,INFO_2,INFO_3,INFO_4,INFO_5) VALUES(%s,%s,%s,%s,%s)"""
# try:
# val=(m1[0], wifiName, m3[0],macId, m5[0] )
# print("1111"+str(val))
# cursor.execute(sql,val)
# db.commit()
# except Exception as e:
# print("Error!"+e)
# db.rollback()
# db.close()
# print(m1[0],wifiName,m3[0],macId,m5[0])
print(wifiName, m3[0], macId) # blfalter:不展示搜索到的wifi
logger.info('获取wifi的信息:{}-{}-{}'.format(wifiName, m3[0], macId))
if (data_2[0:8] == "data_end"): # 这里没有进入if语句的原因是前面data收集信息时已经把
break # data_end截取掉了改用data_2收信息
if (count < catchnum): # 当前wifi数量与阈值作比
print("blfadd:Cannot catch nomal wifi singal") # 如果小于阈值将输出未能捕获wifi信号
elif (count >= catchnum):
logger.info('wifi模块完美检查完毕')
print("blfadd:Wifi Module checks ok")
print("blfadd:Now WifiNum:%d" % count) # 输出当前搜索到的wifi总数量
logger.info('搜索到的wifi个数:{}'.format(count))
return count
except Exception as e:
logger.error('硬件获取wifi操作报错:{}'.format(e))
def getcpuid(self, query_command):
try:
global cpuid
self.port.write((query_command + '\n').encode())
while 1:
m = self.port.readline()
data_2 = m.decode('utf8', 'ignore')
data = m.decode('utf8', 'ignore')[0:-2]
if (data[0:4] == "rec="): # 只读取开头为data=的信息
cpuid = data[4:]
if (data_2[0:16] == "command run over"):
return cpuid
except Exception as e:
logger.error('硬件获取cpuid操作报错:{}'.format(e))
def getmemid(self, query_command):
try:
global memid
self.port.write((query_command + '\n').encode())
while 1:
m = self.port.readline()
data_2 = m.decode('utf8', 'ignore')
data = m.decode('utf8', 'ignore')[0:-2]
if (data[0:4] == "rec="): # 只读取开头为data=的信息
memid = data[4:]
if (data_2[0:16] == "command run over"):
return memid
except Exception as e:
logger.error('硬件获取memid操作报错:{}'.format(e))
def ramdomId(self,s): # 获取20位产品序列号devicekey
res = {}
try:
sql = "SELECT * FROM device_status WHERE device = %s"
device_info = cursor.execute(sql, (s,))
if device_info:
devicekey = cursor.fetchone()[1]
# global devicekey
# devicekey = random.randint(10000000000000000000, 99999999999999999999) # random.sample()生成不相同的随机数
logger.info('数据库中与设备的SN匹配:{}'.format(devicekey))
sql = "UPDATE device_status SET `status` = 2 WHERE device = %s"
cursor.execute(sql, (s,))
db.commit()
logger.info('更改SN状态为2成功!!')
res.update({'status': 200, 'devicekey': devicekey})
# return str(devicekey)
else:
logger.info('SN与数据库中的SN扫描结果不一致')
res.update({'status': 403})
except Exception as e:
logger.info('硬件获取SN操作报错:{}'.format(e))
finally:
return res
def generate_token(self, key, expire):
r'''
@Args:
key: str (用户给定的key,需要用户保存以便之后验证token,每次产生token时的key 都可以是同一个key)
expire: int(最大有效时间,单位为s)
@Return:
state: str
'''
try:
ts_str = str(time.time() + expire)
ts_byte = ts_str.encode("utf-8")
sha1_tshexstr = hmac.new(key.encode("utf-8"), ts_byte, 'sha1').hexdigest()
# hmac是密钥相关的哈希运算消息认证码,HMAC运算利用哈希算法,以一个密钥和一个消息为输入,生成一个消息摘要作为输出。
token = ts_str + ':' + sha1_tshexstr
b64_token = base64.urlsafe_b64encode(token.encode("utf-8")) # Base64是一种用64个字符来表示任意二进制数据的方法
return b64_token.decode("utf-8")
except Exception as e:
logger.error('硬件生成token操作报错:{}'.format(e))
def certify_token(self, key, token):
r'''
@Args:
key: str
token: str
@Returns:
boolean
'''
try:
token_str = base64.urlsafe_b64decode(token).decode('utf-8')
token_list = token_str.split(':')
if len(token_list) != 2:
return 0
ts_str = token_list[0]
if float(ts_str) < time.time():
print("blfadd222222222222222>") # bldadd buzou
# token expired
return 0
known_sha1_tsstr = token_list[1]
sha1 = hmac.new(key.encode("utf-8"), ts_str.encode('utf-8'), 'sha1')
calc_sha1_tsstr = sha1.hexdigest()
if calc_sha1_tsstr != known_sha1_tsstr:
# token certification failed
print("Not ok") # 不走这一步
return 0
print("ok") # bldadd zou
return 1
except Exception as e:
logger.error('硬件验证token操作报错:{}'.format(e))
def setdev(self, query_command):
try:
self.port.write((query_command).encode())
while 1:
m = self.port.readline()
data = m.decode('utf8', 'ignore')
if (data[0:16] == "command run over"):
break
except Exception as e:
logger.error('硬件TOKEN/SN写入板子操作报错:{}'.format(e))
def setflag(self, query_command):
try:
self.port.write((query_command + '\n').encode())
while 1:
m = self.port.readline()
data = m.decode('utf8', 'ignore')
if (data[0:16] == "command run over"):
break
except Exception as e:
logger.error('硬件完成调试操作报错:{}'.format(e))
def getwifiname1(self, query_command, Abvalue, valuechange):
try:
# wifiName
global Intensity
self.port.write((query_command + '\n').encode())
huixian = self.port.readline().decode('utf8', 'ignore')
while (huixian[0:8] == "scandone"):
continue
while 100:
m = self.port.readline()
data_2 = m.decode('utf8', 'ignore')
data = m.decode('utf8', 'ignore')[0:-3]
if (data[0:5] == "data="): # 只读取开头为data=的信息
data_1 = data[6:]
m2 = data_1.split(',')[1:2]
m3 = data_1.split(',')[2:3]
m4 = data_1.split(',')[3:4]
Intensity = m3[0]
wifiName = str(m2[0]).replace('"', '') # 替换掉“”
if (wifiName == "XunMingIOT" and Abvalue + valuechange >= int(
m3[0]) >= Abvalue - valuechange): # 判断输出wifi信号强度值
print("blfadd:Standard Intensity:" + m2[0], m3[0], m4[0])
return m2[0], m3[0], m4[0]
elif (wifiName == "XunMingIOT" and Abvalue - valuechange > int(m3[0])):
print("blfadd:Low Intensity:" + m2[0], m3[0], m4[0]) # 输出wifi名称
return m2[0], m3[0], m4[0]
elif (wifiName == "XunMingIOT"):
print("blfadd:High Intensity:" + m2[0], m3[0], m4[0])
return m2[0], m3[0], m4[0]
if (data_2[0:8] == "data_end"): # 检测到data end结尾时候跳出循环
break
except Exception as e:
logger.error('硬件获取wifi操作报错:{}'.format(e))
# def mysql_hardinfo(self):
# key = s
# cpu = cpuid
# mac = macId
# mem = memid
# wificount = count
# time = timeStamp
# wifivalue = Intensity
# return key, mac, cpu, mem, wificount, wifivalue, time
主函数
def _main():
while 1:
count = 0
s = input('请输入SN号:').strip()
try:
obj = ESP8266StubLoader(obA)
except Exception as e:
print(type(e))
print(e)
pass
continue
obj.connect()
read_mac(obj, None) # mac地址
obj.IS_STUB = False
esp = obj.run_stub()
print('>>>>>>>>>>>>>>>>>>')
print(obj)
print(prorts)
print(esp)
res = erase_flash(esp, None) # 擦除操作
if res['status'] == int(404):
logger.info('设备擦除命令执行失败,失败的原因是:擦除任务超时')
logger.info('接口已经关闭')
count = count+1
if count < 3:
logger.info('现在设备执行次数为:{}'.format(count))
return _main()
else:
logger.info('设备已经执行3次还是有问题现在怀疑是设备硬件的问题!')
obj._port.close()
prorts.close()
prorts.isOpen()
else:
# 下载文件操作
write_option = WriteOpthons(
addr_filename=[(4096, argfile), (0, a1), (4173824, a2), (4186112, a3), (4177920, a4)],
after='hard_reset', baud=115200, before='default_reset', chip='auto', compress=None,
erase_all=False, flash_freq='40m', flash_mode='dio', flash_size='4MB',
no_compress=False, no_progress=False, no_stub=False, operation='write_flash',
override_vddsdio=None, port='COM3', spi_connection=None, trace=False, verify=False)
res = write_flash(esp=esp, args=write_option)
if res:
logger.info('文件下载完成')
obj._port.close()
prorts.close()
print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
nb = prorts.isOpen() # False 关闭端口了
print(nb)
else:
logger.info('文件下载失败')
obj._port.close()
prorts.close()
prorts.isOpen()
# 调试命令操作
f1 = SerialPort() # f1为实例化接口
f1.connectport('COM3', 74880)
f1.read_data() # 初始化串口返回串口信息 # 1
f1.getwifiinfo('command: getwifiinfo', 2) # 读取wifi信息并插入数据库
f1.getwifiname1('command: getwifiinfo', -50, 4) # 读取wifi信息并插入数据库
cpuid = f1.getcpuid('command: getcpuid')
logger.info('设备cpuid:{}'.format(cpuid))
memid = f1.getmemid('command: getmemid')
logger.info('设备memid是:{}'.format(memid))
serial_number = f1.ramdomId(s)
if serial_number['status'] == 403:
logger.info('获取sn不一致导致串口关闭')
f1.port.close()
prorts.close()
else:
logger.info('生成产品SN:{}'.format(serial_number['devicekey']))
key = cpuid + memid # 使用cpuid与memid拼接作为key
token = f1.generate_token(key, 3600) # 生成token
f1.certify_token(key, token) # 验证token成功返回ok
token_str = token[20:60]
logger.info('依据cpuid+memid生成的token值为:{}'.format(token_str))
f1.setdev('command: settockenkey' + str(token_str))
logger.info('token已经写入到板子之中')
f1.setdev('command: setdevkey' + str(serial_number['devicekey']))
logger.info('SN已经写入到板子之中了')
f1.setflag('command: setflag')
logger.info('调试已经完成')
f1.port.close() # 关闭串口
# prorts.close()
ss = f1.port.isOpen()
print(ss) # 串口已经关闭
print('<>')
# print(prorts.is_open())
# obj._port.close()
logger.info('串口已经关闭!')
print(obj._port.isOpen())
obj._port.open()
print(obj._port.isOpen())
continue
if name == 'main':
while 1:
# obA = MyClass()
class MyClass(object):
def init(self):
self._port = prorts
self._trace_enabled = False
logger.info('接口初始化已经完成:{}'.format(self._port))
obA = MyClass()
_main()
# count = 0
# s = input('请输入SN号:').strip()
# try:
# obj = ESP8266StubLoader(obA)
# except Exception as e:
# print(type(e))
# print(e)
# pass
# continue
# obj.connect()
# read_mac(obj, None) # mac地址
# obj.IS_STUB = False
# esp = obj.run_stub()
#
# res = erase_flash(esp, None) # 擦除操作
# if res['status'] == int(404):
# logger.info('设备擦除命令执行失败,失败的原因是:擦除任务超时')
# logger.info('接口已经关闭')
# count = count + 1
# if count < 3:
# logger.info('现在设备执行次数为:{}'.format(count))
# # return _main()
# _main()
# else:
# logger.info('设备已经执行3次还是有问题现在怀疑是设备硬件的问题!')
# obj._port.close()
# prorts.close()
#
#
# else:
# # 下载文件操作
# write_option = WriteOpthons(
# addr_filename=[(4096, argfile), (0, a1), (4173824, a2), (4186112, a3), (4177920, a4)],
# after='hard_reset', baud=115200, before='default_reset', chip='auto', compress=None,
# erase_all=False, flash_freq='40m', flash_mode='dio', flash_size='4MB',
# no_compress=False, no_progress=False, no_stub=False, operation='write_flash',
# override_vddsdio=None, port='COM3', spi_connection=None, trace=False, verify=False)
# res = write_flash(esp=esp, args=write_option)
# if res:
# logger.info('文件下载完成')
# obj._port.close()
# prorts.close()
# print('<<<<<<<<<<<<<<<<<<<<<<<<<<<<
# print(prorts.isOpen())
# else:
# logger.info('文件下载失败')
# obj._port.close()
# prorts.close()
#
# # 调试命令操作
# f1 = SerialPort() # f1为实例化接口
# f1.connectport('COM3', 74880)
# f1.read_data() # 初始化串口返回串口信息 # 1
#
# f1.getwifiinfo('command: getwifiinfo', 2) # 读取wifi信息并插入数据库
# f1.getwifiname1('command: getwifiinfo', -50, 4) # 读取wifi信息并插入数据库
#
# cpuid = f1.getcpuid('command: getcpuid')
# logger.info('设备cpuid:{}'.format(cpuid))
#
# memid = f1.getmemid('command: getmemid')
# logger.info('设备memid是:{}'.format(memid))
#
# serial_number = f1.ramdomId(s)
# if serial_number['status'] == 403:
# logger.info('获取sn不一致导致串口关闭')
# prorts.close()
# else:
# logger.info('生成产品SN:{}'.format(serial_number['devicekey']))
#
# key = cpuid + memid # 使用cpuid与memid拼接作为key
# token = f1.generate_token(key, 3600) # 生成token
# f1.certify_token(key, token) # 验证token成功返回ok
# token_str = token[20:60]
# logger.info('依据cpuid+memid生成的token值为:{}'.format(token_str))
#
# f1.setdev('command: settockenkey' + str(token_str))
# logger.info('token已经写入到板子之中')
#
# f1.setdev('command: setdevkey' + str(serial_number['devicekey']))
# logger.info('SN已经写入到板子之中了')
#
# f1.setflag('command: setflag')
# logger.info('调试已经完成')
#
# f1.port.close() # 关闭串口
# # prorts.close()
# ss = f1.port.isOpen()
# print(ss) # 串口已经关闭
# print('<>')
# # print(prorts.is_open())
# # obj._port.close()
# logger.info('串口已经关闭!')
pythonserial函数_python3.5 中serial模块的问题相关推荐
- python中serial模块的使用_python中pyserial模块使用方法
一.概述 pyserial模块封装了对串口的访问. 二.特性 在支持的平台上有统一的接口. 通过python属性访问串口设置. 支持不同的字节大小.停止位.校验位和流控设置. 可以有或者没有接收超时. ...
- 通过mem函数在MicroPython中访问模块寄存器
简 介: 通过mem函数直接访问MCU内部的寄存器,可以完成一些在原来的MicroPython中内核没有实现的模块.通过测试可以看到,通过mem访问GPIO并没有明显增加访问的速度.使用mem访问CR ...
- python isalpha函数用法_python中string模块各属性以及函数的用法
任何语言都离不开字符,那就会涉及对字符的操作,尤其是脚本语言更是频繁,不管是生产环境还是面试考验都要面对字符串的操作. python的字符串操作通过2部分的方法函数基本上就可以解决所有的字符串操作需求 ...
- python中的模块_python3.0中重载模块
在python中,每一个以 .py结尾的Python文件都是一个模块.其他的文件可以通过导入一个模块来读取该模块的内容.导入从本质上来讲,就是载入另一个文件,并能够读取那个文件的内容.一个模块的内容通 ...
- python中time函数用法_python 中time模块使用
在开始之前,首先要说明这几点: 1.在Python中,通常有这几种方式来表示时间:1)时间戳 2)格式化的时间字符串 3)元组(struct_time)共九个元素.由于Python的time模块实现主 ...
- python3 random函数_Python3 中 random模块
Python3 中 random模块 Python中的random模块用于生成随机数. 下面具体介绍random模块的功能: 1.random.random() 用于生成一个0到1的 随机浮点数:0& ...
- python中search用法_Python3中正则模块re.compile、re.match及re.search函数用法详解
本文实例讲述了Python3中正则模块re.compile.re.match及re.search函数用法.分享给大家供大家参考,具体如下: re模块 re.compile.re.match. re.s ...
- python threading join_Python中threading模块join函数用法实例分析
本文实例讲述了Python中threading模块join函数用法.分享给大家供大家参考.具体分析如下: join的作用是众所周知的,阻塞进程直到线程执行完毕.通用的做法是我们启动一批线程,最后joi ...
- Lua中的模块与module函数详解
很快就要开始介绍Lua里的"面向对象"了,在此之前,我们先来了解一下Lua的模块. 1.编写一个简单的模块 Lua的模块是什么东西呢?通常我们可以理解为是一个table,这个tab ...
最新文章
- protobuf message定义_ProtoBuf 协议设计与开发
- 在狮驼岭,孙悟空救了猪八戒,猪八戒为何不帮孙悟空?
- 再分享 5 个 vs 调试技巧
- 【iVX 初级工程师培训教程 10篇文拿证】09 聊天室制作
- pycharm引入其他目录的包报错,import报错
- Go embed 简明教程
- zz:NETCONF协议详解
- Java手写线程池(不带返回值、带返回值)
- cmake的一些小经验
- JDBC05 ResultSet结果集
- [转自华尔街的强帖]怎样才能嫁给有钱人
- python-83:公务员时间源码 V-0.1
- wowza 降低延迟
- (十)DSP28335基础教程——ECAP实验(超声波测距)
- 二手交易app manifest.xml
- 对webkit-font-smoothing和-moz-osx-font-smoothing的理解
- Ubuntu下添加开机启动项的2种方法
- android多个module打包aar,Android 多 Module 合并打包 AAR
- 怎样买保险才不会被坑?用亲身经历告诉你!
- springboot整合websocket异常集合
热门文章
- linux+tar+man,Linux常用命令
- pandas判断dataframe中一列是否为日期格式
- pandas从dataframe中选择部分行、列
- Neo4j配置安装与测试
- java实现马尔科夫链_java – 马尔可夫链文本生成
- python范围运算符_Python的海象运算符
- js中自执行函数(function(){})()和(function(){}())区别
- PyCharm Active Code Generator
- mysql_用户_操作
- angularjs ui-router