用Python实现ModbusTcp协议
用Python实现ModbusTcp协议
去年2021年写了两篇用Python实现Modbus-RTU串行通信协议的文章,今年有个项目用现场上位计算机通过ModbusTcp网关来读写现场的Modbus-RTU协议的仪表设备和IO设备。我则写了一个Python程序来测试采购的这台ModbusTcp网关设备。
首先是需要设置一下这台ModbucTcp网关,其IP地址是192.168.16.253,ip地址暂时就不动了,将自己计算机的网卡IP地址改成同一网段下的192.168.16.10。打开浏览器进入网关的设置界面。这是一台4个RS485口,1个100M以太网口的ModbusTcp网关,我就测试第一个485口,在第一个485口上连接了一台Modbus-RTU协议仪表。打开网关设置页面,将此485口的工作模式设置为ModbusTcp服务器端,端口号是502,502端口是国际认可的ModbusTcp协议使用端口,当然也可以使用其它端口。将此网关485口的串行参数设定的和仪表一致即,9600,8,N,1。具体设置不同品牌的ModbusTcp网关不太一样请自行参考使用手册。
我们知道ModbusTcp协议是基于TCP/IP协议的现场总线协议,故要实现改协议则要使用socket编程。关于socket编程推荐这篇文章。更详细的手册则这篇翻译的手册很不错。在本文中,上位机作为TCP协议的客户端,ModbusTcp网关则作为TCP协议的服务器端(502端口)。由上位机客户端向网关发起连接请求,连接后使用ModbusTcp协议规定的数据帧格式进行通信。关于ModbucTcp协议的帧格式可以参考官方文档,其实就是在Modbus-RTU协议基础上增加了MBAP头,去掉了CRC校验(TCP协议自身已有校验)。下面我们用Python来实现ModbusTcp协议的客户端代码。
tcp客户端连接ModbusTcp网关函数
def connectserver(ip, port):try:mysocket = socket.socket()mysocket.settimeout(10)# mysocket.bind(("192.168.16.11",5000))ret = mysocket.connect((ip,port))if ret == socket.error:# print("Connect ModbusTcp server fail!")return Noneelse:# print("Connect ModbusTcp server sucess!")return mysocket;except Exception as e:logging.debug(e)return None
改函数有两个参数,一个是网关的ip地址,一个是端口号,成功则返回tcp客户端的socket对象实例,失败则返回None。此函数用于测试开始时连接网关时使用。
ModbusTcp协议的03和04号功能的发送帧打包和接收帧解析函数
03和04功能的发送帧打包函数
# Modbus-RTU协议的03或04读取保存或输入寄存器功能主-》从命令帧
def modbus03or04s(add, startregadd, regnum, funcode=3):if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D:print("Error: parameter error")returnif funcode != 3 and funcode != 4:print("Error: parameter error")return# MBAP的实现ranvalue = random.randint(0, 0xFFFF)sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)sendbytes = sendbytes + b"\x00\x00\x00\x06"sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)# PDU实现sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \regnum.to_bytes(2, byteorder="big", signed=False)# for b in list(sendbytes):# print(f"{b:02x}")return sendbytes
此函数有4个参数,分别是Modbus-RTU设备的从站地址,开始寄存器地址,寄存器个数,功能号(03或04)默认03。函数根据几个参数构成ModbusTcp协议的发送帧,以字节串类型返回给调用者。发送帧由MBAP头和PDU两部分构成,MBAP头由7个字节构成,第1,2两个字节为一个帧标识,是一个两字节的随机整数用来标识此发送帧(字节顺序是高字节在前低字节在后),返回帧的MBAP头的第1,2两个字节和此标识一致则说明返回帧是此发送帧的响应帧。MBAP头的第3,4字节里是协议类型标识,都是0表示是Modbus协议。MBAP头的第5,6字节是第6字节后所有数据的字节个数,对于03和04功能号应该是6个字节。MBAP头的第7字节是如果发送对象是Modbus-RTU设备,这个字节是Modbus-RTU设备的地址。从第7字节开始其实就是ModBus-RTU协议的去掉CRC校验的部分,即:从站地址+PDU。
03或04功能的接收帧解析函数
# Modbus协议的03或04读取保持或输入寄存器功能从-》主的数据帧解析(浮点数2,1,4,3格式,16位短整形(定义正负数))
def modbus03or04p(recvdata, valueformat=0, intsigned=False):if not recvdata:print("Error: data error")returndatalist = list(recvdata)if datalist[7] != 0x3 and datalist[7] != 0x4:print("Error: recv data funcode error")returnbytenums = datalist[8]if bytenums % 2 != 0:print("Error: recv data reg data error")returnretdata = []if valueformat == 0:floatnums = bytenums / 4# print("float nums: ", str(floatnums))floatlist = [0, 0, 0, 0]for i in range(int(floatnums)):floatlist[1] = datalist[9+i*4]floatlist[0] = datalist[10+i*4]floatlist[3] = datalist[11+i*4]floatlist[2] = datalist[12+i*4]bfloatdata = bytes(floatlist)[fvalue] = struct.unpack('f', bfloatdata)retdata.append(fvalue)# print(f'Data{i+1}: {fvalue:.3f}')elif valueformat == 1:shortintnums = bytenums / 2# print("short int nums: ", str(shortintnums))for i in range(int(shortintnums)):btemp = recvdata[9+i*2:11+i*2]shortvalue = int.from_bytes(btemp, byteorder="big", signed=intsigned)retdata.append(shortvalue)# print(f"Data{i+1}: {shortvalue}")return retdata
此函数将03或04功能号的返回帧解析为单精度浮点数或短整型数。解析单精度数时,是按照2,1,4,3则字节顺序解析,这是现场设备用的浮点数字节顺序,一般仪表常用的是4,3,2,1顺序。此函数有3个参数,第1个是socket接收到完整帧数据。第2个参数是解析出数据的格式,0代表单精度数,1代表短整型数。第3个参数是当第2个参数为1时,短整型数是有符号还是无符号。此函数的返回一个列表,里面是读取的寄存器数据值。
ModbusTcp协议的01或02功能发送帧打包和接收帧解析函数
和03或04功能的打包和解析函数差不多就是Modbus协议的功能号之间的区别,01功能是读取线圈寄存器值,02功能是读取数字量输入寄存器值,读回的数据是用位bit表示一个寄存器值是0或1,具体参考Modbus文档吧,直接上代码。
# modbus的01或02功能号命令打包函数
def modbus01or02s(add, startregadd, regnum, funcode=2):if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D0:print("Error: parameter error")returnif funcode != 1 and funcode != 2:print("Error: parameter error")return# MBAP实现ranvalue = random.randint(0, 0xFFFF)sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)sendbytes = sendbytes + b"\x00\x00\x00\x06"sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)# PDU实现sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \regnum.to_bytes(2, byteorder="big", signed=False)# for b in list(sendbytes):# print(f"{b:02x}")return sendbytes# modbus的01或02功能号的返回包解析函数
def modbus01or02p(recvdata):if not recvdata:print("Error: data error")returndatalist = list(recvdata)if datalist[7] != 0x1 and datalist[7] != 0x2:print("Error: recv data funcode error")returnbytenums = datalist[8]ret_data = []for i in range(bytenums):intvalue = int(recvdata[9+i])for bit in range(8):nowvalue = intvalue & 0x01intvalue = intvalue >> 1ret_data.append(nowvalue)return ret_data
读取Modbus寄存器数值函数
使用socket来通过ModbusTcp网关读取Modbus-RTU从站设备的数据。有两个函数,一个是用03或04功能号读取保持寄存器或输入寄存器的函数,一个是用01或02功能号读取线圈寄存器和数字量寄存器的函数。
# 读取仪表数据并解析返回
def readmeterdata(mysocket, meter_add, start_reg, reg_num):try:send_data = modbus03or04s(meter_add, start_reg, reg_num)if not send_data:print("读取命令处理错误!")returnstarttime = time.time()mysocket.send(send_data)recv_data = mysocket.recv(1024) #(reg_num*2+9)endtime = time.time()# print(f"Used time is {endtime-starttime:.3f}")if recv_data and len(recv_data) > 0:retdata = modbus03or04p(recv_data)if retdata:return retdataelse:returnelse:returnexcept Exception as e:# print(f"Exception : {e}")endtime = time.time()print(f"读取超时时间: {endtime-starttime:.3f}") return# 读取仪表数据并解析返回
def readmeterdata2(mysocket, meter_add, start_reg, reg_num):try:send_data = modbus01or02s(meter_add, start_reg, reg_num)if not send_data:print("读取命令处理错误!")returnstarttime = time.time()mysocket.send(send_data)recv_data = mysocket.recv(1024) #(reg_num*2+9)endtime = time.time()# print(f"Used time is {endtime-starttime:.3f}")if recv_data and len(recv_data) > 0:retdata = modbus01or02p(recv_data)if retdata:return retdataelse:returnelse:returnexcept Exception as e:# print(f"Exception : {e}")endtime = time.time()print(f"读取超时时间: {endtime-starttime:.3f}") return
完整代码
这样基本构成函数都有了。然后就是在主程序中调用以上函数。下面是完整代码。
完整代码里使用了rich库,用于在终端构造一个实时数据表格进行显示,相应内容请参考我的另一篇文章:用Python实现Modbus-RTU协议及串口调试(二)
# ModbusTcp协议客户端模块import socket
import random
import struct
from rich.console import Console
from rich.table import Column, Table
from rich.live import Live
from rich.panel import Panel
import time, sys
import loggingLOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(filename='modbustcp.log', level=logging.DEBUG, format=LOG_FORMAT)def connectserver(ip, port):try:mysocket = socket.socket()mysocket.settimeout(10)# mysocket.bind(("192.168.16.11",5000))ret = mysocket.connect((ip,port))if ret == socket.error:# print("Connect ModbusTcp server fail!")return Noneelse:# print("Connect ModbusTcp server sucess!")return mysocket;except Exception as e:logging.debug(e)return None# Modbus-RTU协议的03或04读取保存或输入寄存器功能主-》从命令帧
def modbus03or04s(add, startregadd, regnum, funcode=3):if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D:print("Error: parameter error")returnif funcode != 3 and funcode != 4:print("Error: parameter error")return# MBAP的实现ranvalue = random.randint(0, 0xFFFF)sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)sendbytes = sendbytes + b"\x00\x00\x00\x06"sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)# PDU实现sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \regnum.to_bytes(2, byteorder="big", signed=False)# for b in list(sendbytes):# print(f"{b:02x}")return sendbytes# Modbus协议的03或04读取保持或输入寄存器功能从-》主的数据帧解析(浮点数2,1,4,3格式,16位短整形(定义正负数))
def modbus03or04p(recvdata, valueformat=0, intsigned=False):if not recvdata:print("Error: data error")returndatalist = list(recvdata)if datalist[7] != 0x3 and datalist[7] != 0x4:print("Error: recv data funcode error")returnbytenums = datalist[8]if bytenums % 2 != 0:print("Error: recv data reg data error")returnretdata = []if valueformat == 0:floatnums = bytenums / 4# print("float nums: ", str(floatnums))floatlist = [0, 0, 0, 0]for i in range(int(floatnums)):floatlist[1] = datalist[9+i*4]floatlist[0] = datalist[10+i*4]floatlist[3] = datalist[11+i*4]floatlist[2] = datalist[12+i*4]bfloatdata = bytes(floatlist)[fvalue] = struct.unpack('f', bfloatdata)retdata.append(fvalue)# print(f'Data{i+1}: {fvalue:.3f}')elif valueformat == 1:shortintnums = bytenums / 2# print("short int nums: ", str(shortintnums))for i in range(int(shortintnums)):btemp = recvdata[9+i*2:11+i*2]shortvalue = int.from_bytes(btemp, byteorder="big", signed=intsigned)retdata.append(shortvalue)# print(f"Data{i+1}: {shortvalue}")return retdata # modbus的01或02功能号命令打包函数
def modbus01or02s(add, startregadd, regnum, funcode=2):if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D0:print("Error: parameter error")returnif funcode != 1 and funcode != 2:print("Error: parameter error")return# MBAP实现ranvalue = random.randint(0, 0xFFFF)sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)sendbytes = sendbytes + b"\x00\x00\x00\x06"sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)# PDU实现sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \regnum.to_bytes(2, byteorder="big", signed=False)# for b in list(sendbytes):# print(f"{b:02x}")return sendbytes# modbus的01或02功能号的返回包解析函数
def modbus01or02p(recvdata):if not recvdata:print("Error: data error")returndatalist = list(recvdata)if datalist[7] != 0x1 and datalist[7] != 0x2:print("Error: recv data funcode error")returnbytenums = datalist[8]ret_data = []for i in range(bytenums):intvalue = int(recvdata[9+i])for bit in range(8):nowvalue = intvalue & 0x01intvalue = intvalue >> 1ret_data.append(nowvalue)return ret_data# 读取仪表数据并解析返回
def readmeterdata(mysocket, meter_add, start_reg, reg_num):try:send_data = modbus03or04s(meter_add, start_reg, reg_num)if not send_data:print("读取命令处理错误!")returnstarttime = time.time()mysocket.send(send_data)recv_data = mysocket.recv(1024) #(reg_num*2+9)endtime = time.time()# print(f"Used time is {endtime-starttime:.3f}")if recv_data and len(recv_data) > 0:retdata = modbus03or04p(recv_data)if retdata:return retdataelse:returnelse:returnexcept Exception as e:# print(f"Exception : {e}")endtime = time.time()print(f"读取超时时间: {endtime-starttime:.3f}") return# 读取仪表数据并解析返回
def readmeterdata2(mysocket, meter_add, start_reg, reg_num):try:send_data = modbus01or02s(meter_add, start_reg, reg_num)if not send_data:print("读取命令处理错误!")returnstarttime = time.time()mysocket.send(send_data)recv_data = mysocket.recv(1024) #(reg_num*2+9)endtime = time.time()# print(f"Used time is {endtime-starttime:.3f}")if recv_data and len(recv_data) > 0:retdata = modbus01or02p(recv_data)if retdata:return retdataelse:returnelse:returnexcept Exception as e:# print(f"Exception : {e}")endtime = time.time()print(f"读取超时时间: {endtime-starttime:.3f}") return def generate_table(regdata, nowdata) -> Table:table = Table(show_header=True, header_style="bold magenta")table.add_column("No", width = 4)table.add_column('RegAdd', width=12)table.add_column("Data", width=12)for i in range(len(nowdata)):table.add_row("[red]"+str(i + 1), f"[yellow]{regdata[i]}", f"[green]{nowdata[i]:.3f}")return table if __name__ == "__main__":funcode = 2slaveadd = 2startreg = 1regnums = 15serverip = "192.168.16.253"serverport = 502regStartName = 40001logging.debug("Modbus/Tcp Start!")funcode = int(input("Modbus功能号(01或02或03或04):"))slaveadd = int(input("Modbus从站地址:"))startreg = int(input("开始寄存器地址:"))regnums = int(input("寄存器个数:"))# 连接MODBUSTCP服务器mysocket = connectserver(serverip, serverport)if not mysocket:print("Connect MoudbusTcp Server Fail!")else:# 读取寄存器数据值,用rich模块的表格实时显示数据,没有数据则模拟随机数据if funcode == 3 or funcode == 4 :if funcode == 3:regStartName = 40001else:regStartName = 30001now_data = readmeterdata(mysocket, slaveadd, startreg, regnums)if not now_data:now_data = []for i in range(int(regnums/2)):value = random.random() * 100now_data.append(value)readnums = 10errnums = 0regdata = [ regStartName+startreg+reg*2 for reg in range(int(regnums/2)) ]with Live(generate_table(regdata, now_data), refresh_per_second=4) as live:for _ in range(readnums):time.sleep(0.4)now_data = readmeterdata(mysocket, slaveadd, startreg, regnums)if not now_data:now_data = []for i in range(int(regnums / 2)):value = random.random() * 100now_data.append(value)errnums += 1live.update(generate_table(regdata, now_data))# print(f"\nread nums={readnums}, err nums={errnums}")console = Console()strmsg = f"读取次数={readnums}, 错误次数={errnums}"console.print(Panel("[yellow]" + strmsg, title="通信统计"))if funcode == 1 or funcode == 2 :if funcode == 1:regStartName = 0else:regStartName = 10000now_data = readmeterdata2(mysocket, slaveadd, startreg, regnums)if not now_data:for x in range(regnums):now_data.append(0)else:if len(now_data) > regnums:now_data = now_data[:regnums] readnums = 15errnums = 0regdata = [ regStartName+startreg+reg for reg in range(int(regnums)) ]with Live(generate_table(regdata, now_data), refresh_per_second=4) as live:for _ in range(readnums):time.sleep(0.4)now_data = readmeterdata2(mysocket, slaveadd, startreg, regnums)if not now_data:errnums += 1for x in range(regnums):now_data.append(0)else:if len(now_data) > regnums:now_data = now_data[:regnums]live.update(generate_table(regdata, now_data))console = Console()strmsg = f"读取次数={readnums}, 错误次数={errnums}"console.print(Panel("[yellow]" + strmsg, title="通信统计"))mysocket.close()
码字不易,如果本文对您有用请随手点个赞,谢谢!^_^
用Python实现ModbusTcp协议相关推荐
- python语言编写的modbus协议_基于Python的ModbusTCP客户端实现
Modbus协议是由Modicon公司(现在的施耐德电气Schneider Electric)推出,主要建立在物理串口.以太网TCP/IP层之上,目前已经成为工业领域通信协议的业界标准,广泛应用在工业 ...
- 基于Python的ModbusTCP客户端实现
Modbus协议是由Modicon公司(现在的施耐德电气Schneider Electric)推出,主要建立在物理串口.以太网TCP/IP层之上,目前已经成为工业领域通信协议的业界标准,广泛应用在工业 ...
- 通过python基于netconf协议获取网络中网元的配置数据,助力企业网络控制自动化轻松实现!
摘要:在当今信息化时代,大多数企业都需要网络支撑企业的ICT运行,提升企业运行效率,针对企业网络中的网元设备(包括交换机,路由器,防火墙等),很多企业希望根据自身的业务特点定制网络管理,比如可以实现网 ...
- 从入门到入土:基于Python采用TCP协议实现通信功能的程序
此博客仅用于记录个人学习进度,学识浅薄,若有错误观点欢迎评论区指出.欢迎各位前来交流.(部分材料来源网络,若有侵权,立即删除) 本人博客所有文章纯属学习之用,不涉及商业利益.不合适引用,自当删除! 若 ...
- python 下载文件-Python实现HTTP协议下的文件下载方法总结
本文介绍了几种常用的python下载文件的方法,具体使用到了htttplib2,urllib等包,希望对大家有帮忙. 1.简单文件下载 使用htttplib2,具体代码如下: h = httplib2 ...
- python应用系列教程——python使用smtp协议发送邮件:html文本邮件、图片邮件、文件附件邮件
全栈工程师开发手册 (作者:栾鹏) python教程全解 python使用smtp协议发送电子邮件.包含发送html文本邮件.包含图片附件的邮件,包含其他文件附件的邮件.可设置邮件的收发人,主题,内容 ...
- 如何快速实现西门子S7-200/300 PLC转Modbus-TCP协议与第三方数据对接
如何快速实现西门子S7-200/300 PLC转Modbus-TCP协议与第三方数据对接 引言 西门子 SIMATIC 自动化控制系统在工业控制市场应用相当广泛,凭借其安全可靠性.全集成产品线和优异的 ...
- Python通过MQTT协议上传物联网数据给ThingsBoard
第一步: 用租户账号登录,配置ThingsBoard 设备,使用MQTT协议 第二步: 参考这篇文章,使用MQTT客户端 发消息给ThingsBoard 主要是一个username的参数 第三步: 使 ...
- ModbusTCP协议报文解析
ModbusTCP协议报文解析 报文格式 交互(通信)标识:2个字节 为此次通信事务处理标识符,一般每次通信之后将被要求加1以区别不同的通信数据报文. 协议标识:2个字节 表示该条指令遵循Modbus ...
最新文章
- 高校人工智能热的“冷”思考
- 安卓开发网络资源汇总
- 用 Go 语言实现 Raft 选主
- 小白教你一步一步安装Scrapy(西瓜皮)(带图带资源)
- spark从hbase读取写入数据
- Node.js构建可扩展的Web应用1
- 全局事件总线 (GlobalEventBus)
- 对Spring Boot还陌生吗?
- Lua4.0 实现#操作,获取table大小
- 2019-12-17 drivers/clocksource/arm_arch_timer.c
- Java多线程之同步与阻塞队列
- C++ 函数其实可以不用写返回值的声明!
- SqlServer 2008出现远程过程调用失败,错误代码[Ox800706be]
- 《数据通信与网络》笔记--广域网SONET/SDH
- 解决mount.nfs: /home/xxxx/mpi-install is busy or already mounted问题
- 【Cesium】加载互联网地图服务——天地图
- Linux命令之dos2unix
- “住过一晚两万的ICU后,我还是建议你不要轻易买保险”
- 英雄联盟 无法服务器未响应,Win7英雄联盟登陆服务器未响应的解决方法
- 建筑工程单位材料成本设计管理软件系统