在python3.5中使用serial模块进行串口通信,通过while 1循环进行持续性的设备通信,但是第一个设备完成之后对下一个设备进行通信会有问题:Attempting to use a port that is not open

一下是我的代码:

import serial

import time

import base64

import hmac

import pymysql

from esptool import ESP8266StubLoader

from esptool import erase_flash, read_mac, write_flash

from log import TNLog

values = ['0x01000', 'D:\esp8266\user1.4096.new.4(3).bin']

values = ['0x01000', '/home/pi/Interface/Anthink/esp8266/user1.4096.new.4(3).bin']

v1 = ['0x00000', 'D:\esp8266\boot_v1.7.bin']

v1 = ['0x00000', '/home/pi/Interface/Anthink/esp8266/boot_v1.7.bin']

v2 = ['0x3fb000', 'D:\esp8266\blank.bin']

v2 = ['0x3fb000', '/home/pi/Interface/Anthink/esp8266/blank.bin']

v3 = ['0x3fe000', 'D:\esp8266\blank.bin']

v3 = ['0x3fe000', '/home/pi/Interface/Anthink/esp8266/blank.bin']

v4 = ['0x3fc000', 'D:\esp8266\esp_init_data_default_v08.bin']

v4 = ['0x3fc000', '/home/pi/Interface/Anthink/esp8266/esp_init_data_default_v08.bin']

argfile = open(values[1], 'rb')

a1 = open(v1[1], 'rb')

a2 = open(v2[1], 'rb')

a3 = open(v3[1], 'rb')

a4 = open(v4[1], 'rb')

db = pymysql.connect("192.168.0.250", "dev_v7", "vvvvvv7", "db_iot_v7", charset='utf8')

cursor = db.cursor()

logger = TNLog()

prorts = serial.serial_for_url('COM3')

prorts = serial.serial_for_url('COM3')

prorts.baudrate = 115200

s = 1019300100006626 # 扫码得到的设备SN

自定义属性值

class WriteOpthons(object):

def init(self, **kwargs):

for k, v in kwargs.items():

setattr(self, k, v)

设备初始化

class MyClass(object):

def init(self):

self._port = prorts

self._trace_enabled = False

logger.info('接口初始化已经完成:{}'.format(self._port))

设备调试

class SerialPort():

logger.info('现在开始进行调试硬件设备操作!')

message = ''

def connectport(self, com, baud):

try:

super(SerialPort, self).__init__()

self.port = serial.Serial(com, baud)

try:

if self.port.isOpen():

logger.info('已经连接OK:blfadd:connect port is ok')

print("blfadd:connect port is ok")

return 1

except Exception as e: # 捕获异常输出并终止运行

logger.error('硬件连接失败:{}'.format(e))

return 0

raise

except Exception as e:

logger.error('硬件调试连接操作出错:{}'.format(e))

def restart(self):

self.port.write()

def restart(self):

self.port.setDTR(True) # 0wei gao

self.port.setRTS(True)

time.sleep(1.5)

self.port.setDTR(True)

self.port.setRTS(False)

def setDTR(self, state):

self.port.setDTR(state)

def read_data(self):

global timeStamp

test1 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

start_time = time.strptime(test1, "%Y-%m-%d %H:%M:%S")

# strftime是转换为特定格式输出,而strptime是将一个(时间)字符串解析为时间的一个类型对象。一个是按照想要的格式,去转换。重点是格式!另外一个不管什么格式,我只要把特定的时间字符串转成时间类型即可!

print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

while True:

self.message = self.port.readline().decode('utf8', 'ignore')

test2 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

end_time = time.strptime(test2, "%Y-%m-%d %H:%M:%S")

timeStamp = int(time.mktime(end_time)) - int(time.mktime(start_time))

if (self.message[0:10] == "uart begin" and timeStamp <= 80):

return 1

elif (timeStamp > 90):

raise Exception("blfadd:Time out")

def getwifiinfo(self, query_command, catchnum): # catchnum增加参数为捕获的wifi数量

try:

global count # 定义变量couunt用来计数

global macId

count = 0

self.port.write((query_command + '\n').encode())

huixian = self.port.readline().decode('utf8', 'ignore')

while (huixian[0:8] == "scandone"):

continue

while 100:

m = self.port.readline()

data_2 = m.decode('utf8', 'ignore')

data = m.decode('utf8', 'ignore')[0:-3]

if (data[0:5] == "data="): # 只读取开头为data=的信息

data_1 = data[6:]

# m1=data_1.split(',')[0:1]

m2 = data_1.split(',')[1:2]

m3 = data_1.split(',')[2:3]

m4 = data_1.split(',')[3:4]

# m5=data_1.split(',')[4:5]

wifiName = str(m2[0]).replace('"', '') # 替换掉“”

macId = str(m4[0]).replace('"', '')

# cursor =db.cursor()

count += 1 # if 语句每执行一次计数器加1

# 下面将wifi信息插入到数据库

# sql = """INSERT INTO WIFIDATA(INFO_1,INFO_2,INFO_3,INFO_4,INFO_5) VALUES(%s,%s,%s,%s,%s)"""

# try:

# val=(m1[0], wifiName, m3[0],macId, m5[0] )

# print("1111"+str(val))

# cursor.execute(sql,val)

# db.commit()

# except Exception as e:

# print("Error!"+e)

# db.rollback()

# db.close()

# print(m1[0],wifiName,m3[0],macId,m5[0])

print(wifiName, m3[0], macId) # blfalter:不展示搜索到的wifi

logger.info('获取wifi的信息:{}-{}-{}'.format(wifiName, m3[0], macId))

if (data_2[0:8] == "data_end"): # 这里没有进入if语句的原因是前面data收集信息时已经把

break # data_end截取掉了改用data_2收信息

if (count < catchnum): # 当前wifi数量与阈值作比

print("blfadd:Cannot catch nomal wifi singal") # 如果小于阈值将输出未能捕获wifi信号

elif (count >= catchnum):

logger.info('wifi模块完美检查完毕')

print("blfadd:Wifi Module checks ok")

print("blfadd:Now WifiNum:%d" % count) # 输出当前搜索到的wifi总数量

logger.info('搜索到的wifi个数:{}'.format(count))

return count

except Exception as e:

logger.error('硬件获取wifi操作报错:{}'.format(e))

def getcpuid(self, query_command):

try:

global cpuid

self.port.write((query_command + '\n').encode())

while 1:

m = self.port.readline()

data_2 = m.decode('utf8', 'ignore')

data = m.decode('utf8', 'ignore')[0:-2]

if (data[0:4] == "rec="): # 只读取开头为data=的信息

cpuid = data[4:]

if (data_2[0:16] == "command run over"):

return cpuid

except Exception as e:

logger.error('硬件获取cpuid操作报错:{}'.format(e))

def getmemid(self, query_command):

try:

global memid

self.port.write((query_command + '\n').encode())

while 1:

m = self.port.readline()

data_2 = m.decode('utf8', 'ignore')

data = m.decode('utf8', 'ignore')[0:-2]

if (data[0:4] == "rec="): # 只读取开头为data=的信息

memid = data[4:]

if (data_2[0:16] == "command run over"):

return memid

except Exception as e:

logger.error('硬件获取memid操作报错:{}'.format(e))

def ramdomId(self,s): # 获取20位产品序列号devicekey

res = {}

try:

sql = "SELECT * FROM device_status WHERE device = %s"

device_info = cursor.execute(sql, (s,))

if device_info:

devicekey = cursor.fetchone()[1]

# global devicekey

# devicekey = random.randint(10000000000000000000, 99999999999999999999) # random.sample()生成不相同的随机数

logger.info('数据库中与设备的SN匹配:{}'.format(devicekey))

sql = "UPDATE device_status SET `status` = 2 WHERE device = %s"

cursor.execute(sql, (s,))

db.commit()

logger.info('更改SN状态为2成功!!')

res.update({'status': 200, 'devicekey': devicekey})

# return str(devicekey)

else:

logger.info('SN与数据库中的SN扫描结果不一致')

res.update({'status': 403})

except Exception as e:

logger.info('硬件获取SN操作报错:{}'.format(e))

finally:

return res

def generate_token(self, key, expire):

r'''

@Args:

key: str (用户给定的key,需要用户保存以便之后验证token,每次产生token时的key 都可以是同一个key)

expire: int(最大有效时间,单位为s)

@Return:

state: str

'''

try:

ts_str = str(time.time() + expire)

ts_byte = ts_str.encode("utf-8")

sha1_tshexstr = hmac.new(key.encode("utf-8"), ts_byte, 'sha1').hexdigest()

# hmac是密钥相关的哈希运算消息认证码,HMAC运算利用哈希算法,以一个密钥和一个消息为输入,生成一个消息摘要作为输出。

token = ts_str + ':' + sha1_tshexstr

b64_token = base64.urlsafe_b64encode(token.encode("utf-8")) # Base64是一种用64个字符来表示任意二进制数据的方法

return b64_token.decode("utf-8")

except Exception as e:

logger.error('硬件生成token操作报错:{}'.format(e))

def certify_token(self, key, token):

r'''

@Args:

key: str

token: str

@Returns:

boolean

'''

try:

token_str = base64.urlsafe_b64decode(token).decode('utf-8')

token_list = token_str.split(':')

if len(token_list) != 2:

return 0

ts_str = token_list[0]

if float(ts_str) < time.time():

print("blfadd222222222222222>") # bldadd buzou

# token expired

return 0

known_sha1_tsstr = token_list[1]

sha1 = hmac.new(key.encode("utf-8"), ts_str.encode('utf-8'), 'sha1')

calc_sha1_tsstr = sha1.hexdigest()

if calc_sha1_tsstr != known_sha1_tsstr:

# token certification failed

print("Not ok") # 不走这一步

return 0

print("ok") # bldadd zou

return 1

except Exception as e:

logger.error('硬件验证token操作报错:{}'.format(e))

def setdev(self, query_command):

try:

self.port.write((query_command).encode())

while 1:

m = self.port.readline()

data = m.decode('utf8', 'ignore')

if (data[0:16] == "command run over"):

break

except Exception as e:

logger.error('硬件TOKEN/SN写入板子操作报错:{}'.format(e))

def setflag(self, query_command):

try:

self.port.write((query_command + '\n').encode())

while 1:

m = self.port.readline()

data = m.decode('utf8', 'ignore')

if (data[0:16] == "command run over"):

break

except Exception as e:

logger.error('硬件完成调试操作报错:{}'.format(e))

def getwifiname1(self, query_command, Abvalue, valuechange):

try:

# wifiName

global Intensity

self.port.write((query_command + '\n').encode())

huixian = self.port.readline().decode('utf8', 'ignore')

while (huixian[0:8] == "scandone"):

continue

while 100:

m = self.port.readline()

data_2 = m.decode('utf8', 'ignore')

data = m.decode('utf8', 'ignore')[0:-3]

if (data[0:5] == "data="): # 只读取开头为data=的信息

data_1 = data[6:]

m2 = data_1.split(',')[1:2]

m3 = data_1.split(',')[2:3]

m4 = data_1.split(',')[3:4]

Intensity = m3[0]

wifiName = str(m2[0]).replace('"', '') # 替换掉“”

if (wifiName == "XunMingIOT" and Abvalue + valuechange >= int(

m3[0]) >= Abvalue - valuechange): # 判断输出wifi信号强度值

print("blfadd:Standard Intensity:" + m2[0], m3[0], m4[0])

return m2[0], m3[0], m4[0]

elif (wifiName == "XunMingIOT" and Abvalue - valuechange > int(m3[0])):

print("blfadd:Low Intensity:" + m2[0], m3[0], m4[0]) # 输出wifi名称

return m2[0], m3[0], m4[0]

elif (wifiName == "XunMingIOT"):

print("blfadd:High Intensity:" + m2[0], m3[0], m4[0])

return m2[0], m3[0], m4[0]

if (data_2[0:8] == "data_end"): # 检测到data end结尾时候跳出循环

break

except Exception as e:

logger.error('硬件获取wifi操作报错:{}'.format(e))

# def mysql_hardinfo(self):

# key = s

# cpu = cpuid

# mac = macId

# mem = memid

# wificount = count

# time = timeStamp

# wifivalue = Intensity

# return key, mac, cpu, mem, wificount, wifivalue, time

主函数

def _main():

while 1:

count = 0

s = input('请输入SN号:').strip()

try:

obj = ESP8266StubLoader(obA)

except Exception as e:

print(type(e))

print(e)

pass

continue

obj.connect()

read_mac(obj, None) # mac地址

obj.IS_STUB = False

esp = obj.run_stub()

print('>>>>>>>>>>>>>>>>>>')

print(obj)

print(prorts)

print(esp)

res = erase_flash(esp, None) # 擦除操作

if res['status'] == int(404):

logger.info('设备擦除命令执行失败,失败的原因是:擦除任务超时')

logger.info('接口已经关闭')

count = count+1

if count < 3:

logger.info('现在设备执行次数为:{}'.format(count))

return _main()

else:

logger.info('设备已经执行3次还是有问题现在怀疑是设备硬件的问题!')

obj._port.close()

prorts.close()

prorts.isOpen()

else:

# 下载文件操作

write_option = WriteOpthons(

addr_filename=[(4096, argfile), (0, a1), (4173824, a2), (4186112, a3), (4177920, a4)],

after='hard_reset', baud=115200, before='default_reset', chip='auto', compress=None,

erase_all=False, flash_freq='40m', flash_mode='dio', flash_size='4MB',

no_compress=False, no_progress=False, no_stub=False, operation='write_flash',

override_vddsdio=None, port='COM3', spi_connection=None, trace=False, verify=False)

res = write_flash(esp=esp, args=write_option)

if res:

logger.info('文件下载完成')

obj._port.close()

prorts.close()

print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')

nb = prorts.isOpen() # False 关闭端口了

print(nb)

else:

logger.info('文件下载失败')

obj._port.close()

prorts.close()

prorts.isOpen()

# 调试命令操作

f1 = SerialPort() # f1为实例化接口

f1.connectport('COM3', 74880)

f1.read_data() # 初始化串口返回串口信息 # 1

f1.getwifiinfo('command: getwifiinfo', 2) # 读取wifi信息并插入数据库

f1.getwifiname1('command: getwifiinfo', -50, 4) # 读取wifi信息并插入数据库

cpuid = f1.getcpuid('command: getcpuid')

logger.info('设备cpuid:{}'.format(cpuid))

memid = f1.getmemid('command: getmemid')

logger.info('设备memid是:{}'.format(memid))

serial_number = f1.ramdomId(s)

if serial_number['status'] == 403:

logger.info('获取sn不一致导致串口关闭')

f1.port.close()

prorts.close()

else:

logger.info('生成产品SN:{}'.format(serial_number['devicekey']))

key = cpuid + memid # 使用cpuid与memid拼接作为key

token = f1.generate_token(key, 3600) # 生成token

f1.certify_token(key, token) # 验证token成功返回ok

token_str = token[20:60]

logger.info('依据cpuid+memid生成的token值为:{}'.format(token_str))

f1.setdev('command: settockenkey' + str(token_str))

logger.info('token已经写入到板子之中')

f1.setdev('command: setdevkey' + str(serial_number['devicekey']))

logger.info('SN已经写入到板子之中了')

f1.setflag('command: setflag')

logger.info('调试已经完成')

f1.port.close() # 关闭串口

# prorts.close()

ss = f1.port.isOpen()

print(ss) # 串口已经关闭

print('<>')

# print(prorts.is_open())

# obj._port.close()

logger.info('串口已经关闭!')

print(obj._port.isOpen())

obj._port.open()

print(obj._port.isOpen())

continue

if name == 'main':

while 1:

# obA = MyClass()

class MyClass(object):

def init(self):

self._port = prorts

self._trace_enabled = False

logger.info('接口初始化已经完成:{}'.format(self._port))

obA = MyClass()

_main()

# count = 0

# s = input('请输入SN号:').strip()

# try:

# obj = ESP8266StubLoader(obA)

# except Exception as e:

# print(type(e))

# print(e)

# pass

# continue

# obj.connect()

# read_mac(obj, None) # mac地址

# obj.IS_STUB = False

# esp = obj.run_stub()

#

# res = erase_flash(esp, None) # 擦除操作

# if res['status'] == int(404):

# logger.info('设备擦除命令执行失败,失败的原因是:擦除任务超时')

# logger.info('接口已经关闭')

# count = count + 1

# if count < 3:

# logger.info('现在设备执行次数为:{}'.format(count))

# # return _main()

# _main()

# else:

# logger.info('设备已经执行3次还是有问题现在怀疑是设备硬件的问题!')

# obj._port.close()

# prorts.close()

#

#

# else:

# # 下载文件操作

# write_option = WriteOpthons(

# addr_filename=[(4096, argfile), (0, a1), (4173824, a2), (4186112, a3), (4177920, a4)],

# after='hard_reset', baud=115200, before='default_reset', chip='auto', compress=None,

# erase_all=False, flash_freq='40m', flash_mode='dio', flash_size='4MB',

# no_compress=False, no_progress=False, no_stub=False, operation='write_flash',

# override_vddsdio=None, port='COM3', spi_connection=None, trace=False, verify=False)

# res = write_flash(esp=esp, args=write_option)

# if res:

# logger.info('文件下载完成')

# obj._port.close()

# prorts.close()

# print('<<<<<<<<<<<<<<<<<<<<<<<<<<<<

# print(prorts.isOpen())

# else:

# logger.info('文件下载失败')

# obj._port.close()

# prorts.close()

#

# # 调试命令操作

# f1 = SerialPort() # f1为实例化接口

# f1.connectport('COM3', 74880)

# f1.read_data() # 初始化串口返回串口信息 # 1

#

# f1.getwifiinfo('command: getwifiinfo', 2) # 读取wifi信息并插入数据库

# f1.getwifiname1('command: getwifiinfo', -50, 4) # 读取wifi信息并插入数据库

#

# cpuid = f1.getcpuid('command: getcpuid')

# logger.info('设备cpuid:{}'.format(cpuid))

#

# memid = f1.getmemid('command: getmemid')

# logger.info('设备memid是:{}'.format(memid))

#

# serial_number = f1.ramdomId(s)

# if serial_number['status'] == 403:

# logger.info('获取sn不一致导致串口关闭')

# prorts.close()

# else:

# logger.info('生成产品SN:{}'.format(serial_number['devicekey']))

#

# key = cpuid + memid # 使用cpuid与memid拼接作为key

# token = f1.generate_token(key, 3600) # 生成token

# f1.certify_token(key, token) # 验证token成功返回ok

# token_str = token[20:60]

# logger.info('依据cpuid+memid生成的token值为:{}'.format(token_str))

#

# f1.setdev('command: settockenkey' + str(token_str))

# logger.info('token已经写入到板子之中')

#

# f1.setdev('command: setdevkey' + str(serial_number['devicekey']))

# logger.info('SN已经写入到板子之中了')

#

# f1.setflag('command: setflag')

# logger.info('调试已经完成')

#

# f1.port.close() # 关闭串口

# # prorts.close()

# ss = f1.port.isOpen()

# print(ss) # 串口已经关闭

# print('<>')

# # print(prorts.is_open())

# # obj._port.close()

# logger.info('串口已经关闭!')

pythonserial函数_python3.5 中serial模块的问题相关推荐

  1. python中serial模块的使用_python中pyserial模块使用方法

    一.概述 pyserial模块封装了对串口的访问. 二.特性 在支持的平台上有统一的接口. 通过python属性访问串口设置. 支持不同的字节大小.停止位.校验位和流控设置. 可以有或者没有接收超时. ...

  2. 通过mem函数在MicroPython中访问模块寄存器

    简 介: 通过mem函数直接访问MCU内部的寄存器,可以完成一些在原来的MicroPython中内核没有实现的模块.通过测试可以看到,通过mem访问GPIO并没有明显增加访问的速度.使用mem访问CR ...

  3. python isalpha函数用法_python中string模块各属性以及函数的用法

    任何语言都离不开字符,那就会涉及对字符的操作,尤其是脚本语言更是频繁,不管是生产环境还是面试考验都要面对字符串的操作. python的字符串操作通过2部分的方法函数基本上就可以解决所有的字符串操作需求 ...

  4. python中的模块_python3.0中重载模块

    在python中,每一个以 .py结尾的Python文件都是一个模块.其他的文件可以通过导入一个模块来读取该模块的内容.导入从本质上来讲,就是载入另一个文件,并能够读取那个文件的内容.一个模块的内容通 ...

  5. python中time函数用法_python 中time模块使用

    在开始之前,首先要说明这几点: 1.在Python中,通常有这几种方式来表示时间:1)时间戳 2)格式化的时间字符串 3)元组(struct_time)共九个元素.由于Python的time模块实现主 ...

  6. python3 random函数_Python3 中 random模块

    Python3 中 random模块 Python中的random模块用于生成随机数. 下面具体介绍random模块的功能: 1.random.random() 用于生成一个0到1的 随机浮点数:0& ...

  7. python中search用法_Python3中正则模块re.compile、re.match及re.search函数用法详解

    本文实例讲述了Python3中正则模块re.compile.re.match及re.search函数用法.分享给大家供大家参考,具体如下: re模块 re.compile.re.match. re.s ...

  8. python threading join_Python中threading模块join函数用法实例分析

    本文实例讲述了Python中threading模块join函数用法.分享给大家供大家参考.具体分析如下: join的作用是众所周知的,阻塞进程直到线程执行完毕.通用的做法是我们启动一批线程,最后joi ...

  9. Lua中的模块与module函数详解

    很快就要开始介绍Lua里的"面向对象"了,在此之前,我们先来了解一下Lua的模块. 1.编写一个简单的模块 Lua的模块是什么东西呢?通常我们可以理解为是一个table,这个tab ...

最新文章

  1. protobuf message定义_ProtoBuf 协议设计与开发
  2. 在狮驼岭,孙悟空救了猪八戒,猪八戒为何不帮孙悟空?
  3. 再分享 5 个 vs 调试技巧
  4. 【iVX 初级工程师培训教程 10篇文拿证】09 聊天室制作
  5. pycharm引入其他目录的包报错,import报错
  6. Go embed 简明教程
  7. zz:NETCONF协议详解
  8. Java手写线程池(不带返回值、带返回值)
  9. cmake的一些小经验
  10. JDBC05 ResultSet结果集
  11. [转自华尔街的强帖]怎样才能嫁给有钱人
  12. python-83:公务员时间源码 V-0.1
  13. wowza 降低延迟
  14. (十)DSP28335基础教程——ECAP实验(超声波测距)
  15. 二手交易app manifest.xml
  16. 对webkit-font-smoothing和-moz-osx-font-smoothing的理解
  17. Ubuntu下添加开机启动项的2种方法
  18. android多个module打包aar,Android 多 Module 合并打包 AAR
  19. 怎样买保险才不会被坑?用亲身经历告诉你!
  20. springboot整合websocket异常集合

热门文章

  1. linux+tar+man,Linux常用命令
  2. pandas判断dataframe中一列是否为日期格式
  3. pandas从dataframe中选择部分行、列
  4. Neo4j配置安装与测试
  5. java实现马尔科夫链_java – 马尔可夫链文本生成
  6. python范围运算符_Python的海象运算符
  7. js中自执行函数(function(){})()和(function(){}())区别
  8. PyCharm Active Code Generator
  9. mysql_用户_操作
  10. angularjs ui-router