用Python实现ModbusTcp协议

去年2021年写了两篇用Python实现Modbus-RTU串行通信协议的文章,今年有个项目用现场上位计算机通过ModbusTcp网关来读写现场的Modbus-RTU协议的仪表设备和IO设备。我则写了一个Python程序来测试采购的这台ModbusTcp网关设备。

首先是需要设置一下这台ModbucTcp网关,其IP地址是192.168.16.253,ip地址暂时就不动了,将自己计算机的网卡IP地址改成同一网段下的192.168.16.10。打开浏览器进入网关的设置界面。这是一台4个RS485口,1个100M以太网口的ModbusTcp网关,我就测试第一个485口,在第一个485口上连接了一台Modbus-RTU协议仪表。打开网关设置页面,将此485口的工作模式设置为ModbusTcp服务器端,端口号是502,502端口是国际认可的ModbusTcp协议使用端口,当然也可以使用其它端口。将此网关485口的串行参数设定的和仪表一致即,9600,8,N,1。具体设置不同品牌的ModbusTcp网关不太一样请自行参考使用手册。
我们知道ModbusTcp协议是基于TCP/IP协议的现场总线协议,故要实现改协议则要使用socket编程。关于socket编程推荐这篇文章。更详细的手册则这篇翻译的手册很不错。在本文中,上位机作为TCP协议的客户端,ModbusTcp网关则作为TCP协议的服务器端(502端口)。由上位机客户端向网关发起连接请求,连接后使用ModbusTcp协议规定的数据帧格式进行通信。关于ModbucTcp协议的帧格式可以参考官方文档,其实就是在Modbus-RTU协议基础上增加了MBAP头,去掉了CRC校验(TCP协议自身已有校验)。下面我们用Python来实现ModbusTcp协议的客户端代码。

tcp客户端连接ModbusTcp网关函数

def connectserver(ip, port):try:mysocket = socket.socket()mysocket.settimeout(10)# mysocket.bind(("192.168.16.11",5000))ret = mysocket.connect((ip,port))if ret == socket.error:# print("Connect ModbusTcp server fail!")return Noneelse:# print("Connect ModbusTcp server sucess!")return mysocket;except Exception as e:logging.debug(e)return None

改函数有两个参数,一个是网关的ip地址,一个是端口号,成功则返回tcp客户端的socket对象实例,失败则返回None。此函数用于测试开始时连接网关时使用。

ModbusTcp协议的03和04号功能的发送帧打包和接收帧解析函数

03和04功能的发送帧打包函数

# Modbus-RTU协议的03或04读取保存或输入寄存器功能主-》从命令帧
def modbus03or04s(add, startregadd, regnum, funcode=3):if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D:print("Error: parameter error")returnif funcode != 3 and funcode != 4:print("Error: parameter error")return# MBAP的实现ranvalue = random.randint(0, 0xFFFF)sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)sendbytes = sendbytes + b"\x00\x00\x00\x06"sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)# PDU实现sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \regnum.to_bytes(2, byteorder="big", signed=False)# for b in list(sendbytes):#     print(f"{b:02x}")return sendbytes

此函数有4个参数,分别是Modbus-RTU设备的从站地址,开始寄存器地址,寄存器个数,功能号(03或04)默认03。函数根据几个参数构成ModbusTcp协议的发送帧,以字节串类型返回给调用者。发送帧由MBAP头和PDU两部分构成,MBAP头由7个字节构成,第1,2两个字节为一个帧标识,是一个两字节的随机整数用来标识此发送帧(字节顺序是高字节在前低字节在后),返回帧的MBAP头的第1,2两个字节和此标识一致则说明返回帧是此发送帧的响应帧。MBAP头的第3,4字节里是协议类型标识,都是0表示是Modbus协议。MBAP头的第5,6字节是第6字节后所有数据的字节个数,对于03和04功能号应该是6个字节。MBAP头的第7字节是如果发送对象是Modbus-RTU设备,这个字节是Modbus-RTU设备的地址。从第7字节开始其实就是ModBus-RTU协议的去掉CRC校验的部分,即:从站地址+PDU。

03或04功能的接收帧解析函数

# Modbus协议的03或04读取保持或输入寄存器功能从-》主的数据帧解析(浮点数2,1,4,3格式,16位短整形(定义正负数))
def modbus03or04p(recvdata, valueformat=0, intsigned=False):if not recvdata:print("Error: data error")returndatalist = list(recvdata)if datalist[7] != 0x3 and datalist[7] != 0x4:print("Error: recv data funcode error")returnbytenums = datalist[8]if bytenums % 2 != 0:print("Error: recv data reg data error")returnretdata = []if valueformat == 0:floatnums = bytenums / 4# print("float nums: ", str(floatnums))floatlist = [0, 0, 0, 0]for i in range(int(floatnums)):floatlist[1] = datalist[9+i*4]floatlist[0] = datalist[10+i*4]floatlist[3] = datalist[11+i*4]floatlist[2] = datalist[12+i*4]bfloatdata = bytes(floatlist)[fvalue] = struct.unpack('f', bfloatdata)retdata.append(fvalue)# print(f'Data{i+1}: {fvalue:.3f}')elif valueformat == 1:shortintnums = bytenums / 2# print("short int nums: ", str(shortintnums))for i in range(int(shortintnums)):btemp = recvdata[9+i*2:11+i*2]shortvalue = int.from_bytes(btemp, byteorder="big", signed=intsigned)retdata.append(shortvalue)# print(f"Data{i+1}: {shortvalue}")return retdata

此函数将03或04功能号的返回帧解析为单精度浮点数或短整型数。解析单精度数时,是按照2,1,4,3则字节顺序解析,这是现场设备用的浮点数字节顺序,一般仪表常用的是4,3,2,1顺序。此函数有3个参数,第1个是socket接收到完整帧数据。第2个参数是解析出数据的格式,0代表单精度数,1代表短整型数。第3个参数是当第2个参数为1时,短整型数是有符号还是无符号。此函数的返回一个列表,里面是读取的寄存器数据值。

ModbusTcp协议的01或02功能发送帧打包和接收帧解析函数

和03或04功能的打包和解析函数差不多就是Modbus协议的功能号之间的区别,01功能是读取线圈寄存器值,02功能是读取数字量输入寄存器值,读回的数据是用位bit表示一个寄存器值是0或1,具体参考Modbus文档吧,直接上代码。

# modbus的01或02功能号命令打包函数
def modbus01or02s(add, startregadd, regnum, funcode=2):if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D0:print("Error: parameter error")returnif funcode != 1 and funcode != 2:print("Error: parameter error")return# MBAP实现ranvalue = random.randint(0, 0xFFFF)sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)sendbytes = sendbytes + b"\x00\x00\x00\x06"sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)# PDU实现sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \regnum.to_bytes(2, byteorder="big", signed=False)# for b in list(sendbytes):#     print(f"{b:02x}")return sendbytes# modbus的01或02功能号的返回包解析函数
def modbus01or02p(recvdata):if not recvdata:print("Error: data error")returndatalist = list(recvdata)if datalist[7] != 0x1 and datalist[7] != 0x2:print("Error: recv data funcode error")returnbytenums = datalist[8]ret_data = []for i in range(bytenums):intvalue = int(recvdata[9+i])for bit in range(8):nowvalue = intvalue & 0x01intvalue = intvalue >> 1ret_data.append(nowvalue)return ret_data

读取Modbus寄存器数值函数

使用socket来通过ModbusTcp网关读取Modbus-RTU从站设备的数据。有两个函数,一个是用03或04功能号读取保持寄存器或输入寄存器的函数,一个是用01或02功能号读取线圈寄存器和数字量寄存器的函数。

# 读取仪表数据并解析返回
def readmeterdata(mysocket, meter_add, start_reg, reg_num):try:send_data = modbus03or04s(meter_add, start_reg, reg_num)if not send_data:print("读取命令处理错误!")returnstarttime = time.time()mysocket.send(send_data)recv_data = mysocket.recv(1024) #(reg_num*2+9)endtime = time.time()# print(f"Used time is {endtime-starttime:.3f}")if recv_data and len(recv_data) > 0:retdata = modbus03or04p(recv_data)if retdata:return retdataelse:returnelse:returnexcept Exception as e:# print(f"Exception : {e}")endtime = time.time()print(f"读取超时时间: {endtime-starttime:.3f}")        return# 读取仪表数据并解析返回
def readmeterdata2(mysocket, meter_add, start_reg, reg_num):try:send_data = modbus01or02s(meter_add, start_reg, reg_num)if not send_data:print("读取命令处理错误!")returnstarttime = time.time()mysocket.send(send_data)recv_data = mysocket.recv(1024) #(reg_num*2+9)endtime = time.time()# print(f"Used time is {endtime-starttime:.3f}")if recv_data and len(recv_data) > 0:retdata = modbus01or02p(recv_data)if retdata:return retdataelse:returnelse:returnexcept Exception as e:# print(f"Exception : {e}")endtime = time.time()print(f"读取超时时间: {endtime-starttime:.3f}")        return

完整代码

这样基本构成函数都有了。然后就是在主程序中调用以上函数。下面是完整代码。
完整代码里使用了rich库,用于在终端构造一个实时数据表格进行显示,相应内容请参考我的另一篇文章:用Python实现Modbus-RTU协议及串口调试(二)

# ModbusTcp协议客户端模块import socket
import random
import struct
from rich.console import Console
from rich.table import Column, Table
from rich.live import Live
from rich.panel import Panel
import time, sys
import loggingLOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(filename='modbustcp.log', level=logging.DEBUG, format=LOG_FORMAT)def connectserver(ip, port):try:mysocket = socket.socket()mysocket.settimeout(10)# mysocket.bind(("192.168.16.11",5000))ret = mysocket.connect((ip,port))if ret == socket.error:# print("Connect ModbusTcp server fail!")return Noneelse:# print("Connect ModbusTcp server sucess!")return mysocket;except Exception as e:logging.debug(e)return None# Modbus-RTU协议的03或04读取保存或输入寄存器功能主-》从命令帧
def modbus03or04s(add, startregadd, regnum, funcode=3):if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D:print("Error: parameter error")returnif funcode != 3 and funcode != 4:print("Error: parameter error")return# MBAP的实现ranvalue = random.randint(0, 0xFFFF)sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)sendbytes = sendbytes + b"\x00\x00\x00\x06"sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)# PDU实现sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \regnum.to_bytes(2, byteorder="big", signed=False)# for b in list(sendbytes):#     print(f"{b:02x}")return sendbytes# Modbus协议的03或04读取保持或输入寄存器功能从-》主的数据帧解析(浮点数2,1,4,3格式,16位短整形(定义正负数))
def modbus03or04p(recvdata, valueformat=0, intsigned=False):if not recvdata:print("Error: data error")returndatalist = list(recvdata)if datalist[7] != 0x3 and datalist[7] != 0x4:print("Error: recv data funcode error")returnbytenums = datalist[8]if bytenums % 2 != 0:print("Error: recv data reg data error")returnretdata = []if valueformat == 0:floatnums = bytenums / 4# print("float nums: ", str(floatnums))floatlist = [0, 0, 0, 0]for i in range(int(floatnums)):floatlist[1] = datalist[9+i*4]floatlist[0] = datalist[10+i*4]floatlist[3] = datalist[11+i*4]floatlist[2] = datalist[12+i*4]bfloatdata = bytes(floatlist)[fvalue] = struct.unpack('f', bfloatdata)retdata.append(fvalue)# print(f'Data{i+1}: {fvalue:.3f}')elif valueformat == 1:shortintnums = bytenums / 2# print("short int nums: ", str(shortintnums))for i in range(int(shortintnums)):btemp = recvdata[9+i*2:11+i*2]shortvalue = int.from_bytes(btemp, byteorder="big", signed=intsigned)retdata.append(shortvalue)# print(f"Data{i+1}: {shortvalue}")return retdata    # modbus的01或02功能号命令打包函数
def modbus01or02s(add, startregadd, regnum, funcode=2):if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D0:print("Error: parameter error")returnif funcode != 1 and funcode != 2:print("Error: parameter error")return# MBAP实现ranvalue = random.randint(0, 0xFFFF)sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)sendbytes = sendbytes + b"\x00\x00\x00\x06"sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)# PDU实现sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \regnum.to_bytes(2, byteorder="big", signed=False)# for b in list(sendbytes):#     print(f"{b:02x}")return sendbytes# modbus的01或02功能号的返回包解析函数
def modbus01or02p(recvdata):if not recvdata:print("Error: data error")returndatalist = list(recvdata)if datalist[7] != 0x1 and datalist[7] != 0x2:print("Error: recv data funcode error")returnbytenums = datalist[8]ret_data = []for i in range(bytenums):intvalue = int(recvdata[9+i])for bit in range(8):nowvalue = intvalue & 0x01intvalue = intvalue >> 1ret_data.append(nowvalue)return ret_data# 读取仪表数据并解析返回
def readmeterdata(mysocket, meter_add, start_reg, reg_num):try:send_data = modbus03or04s(meter_add, start_reg, reg_num)if not send_data:print("读取命令处理错误!")returnstarttime = time.time()mysocket.send(send_data)recv_data = mysocket.recv(1024) #(reg_num*2+9)endtime = time.time()# print(f"Used time is {endtime-starttime:.3f}")if recv_data and len(recv_data) > 0:retdata = modbus03or04p(recv_data)if retdata:return retdataelse:returnelse:returnexcept Exception as e:# print(f"Exception : {e}")endtime = time.time()print(f"读取超时时间: {endtime-starttime:.3f}")        return# 读取仪表数据并解析返回
def readmeterdata2(mysocket, meter_add, start_reg, reg_num):try:send_data = modbus01or02s(meter_add, start_reg, reg_num)if not send_data:print("读取命令处理错误!")returnstarttime = time.time()mysocket.send(send_data)recv_data = mysocket.recv(1024) #(reg_num*2+9)endtime = time.time()# print(f"Used time is {endtime-starttime:.3f}")if recv_data and len(recv_data) > 0:retdata = modbus01or02p(recv_data)if retdata:return retdataelse:returnelse:returnexcept Exception as e:# print(f"Exception : {e}")endtime = time.time()print(f"读取超时时间: {endtime-starttime:.3f}")        return   def generate_table(regdata, nowdata) -> Table:table = Table(show_header=True, header_style="bold magenta")table.add_column("No", width = 4)table.add_column('RegAdd', width=12)table.add_column("Data", width=12)for i in range(len(nowdata)):table.add_row("[red]"+str(i + 1), f"[yellow]{regdata[i]}", f"[green]{nowdata[i]:.3f}")return table        if __name__ == "__main__":funcode = 2slaveadd = 2startreg = 1regnums = 15serverip = "192.168.16.253"serverport = 502regStartName = 40001logging.debug("Modbus/Tcp Start!")funcode = int(input("Modbus功能号(01或02或03或04):"))slaveadd = int(input("Modbus从站地址:"))startreg = int(input("开始寄存器地址:"))regnums = int(input("寄存器个数:"))# 连接MODBUSTCP服务器mysocket = connectserver(serverip, serverport)if not mysocket:print("Connect MoudbusTcp Server Fail!")else:# 读取寄存器数据值,用rich模块的表格实时显示数据,没有数据则模拟随机数据if funcode == 3 or funcode == 4 :if funcode == 3:regStartName = 40001else:regStartName = 30001now_data = readmeterdata(mysocket, slaveadd, startreg, regnums)if not now_data:now_data = []for i in range(int(regnums/2)):value = random.random() * 100now_data.append(value)readnums = 10errnums = 0regdata = [ regStartName+startreg+reg*2 for reg in range(int(regnums/2)) ]with Live(generate_table(regdata, now_data), refresh_per_second=4) as live:for _ in range(readnums):time.sleep(0.4)now_data = readmeterdata(mysocket, slaveadd, startreg, regnums)if not now_data:now_data = []for i in range(int(regnums / 2)):value = random.random() * 100now_data.append(value)errnums += 1live.update(generate_table(regdata, now_data))# print(f"\nread nums={readnums},  err nums={errnums}")console = Console()strmsg = f"读取次数={readnums}, 错误次数={errnums}"console.print(Panel("[yellow]" + strmsg, title="通信统计"))if funcode == 1 or funcode == 2 :if funcode == 1:regStartName = 0else:regStartName = 10000now_data = readmeterdata2(mysocket, slaveadd, startreg, regnums)if not now_data:for x in range(regnums):now_data.append(0)else:if len(now_data) > regnums:now_data = now_data[:regnums]            readnums = 15errnums = 0regdata = [ regStartName+startreg+reg for reg in range(int(regnums)) ]with Live(generate_table(regdata, now_data), refresh_per_second=4) as live:for _ in range(readnums):time.sleep(0.4)now_data = readmeterdata2(mysocket, slaveadd, startreg, regnums)if not now_data:errnums += 1for x in range(regnums):now_data.append(0)else:if len(now_data) > regnums:now_data = now_data[:regnums]live.update(generate_table(regdata, now_data))console = Console()strmsg = f"读取次数={readnums}, 错误次数={errnums}"console.print(Panel("[yellow]" + strmsg, title="通信统计"))mysocket.close()

码字不易,如果本文对您有用请随手点个赞,谢谢!^_^

用Python实现ModbusTcp协议相关推荐

  1. python语言编写的modbus协议_基于Python的ModbusTCP客户端实现

    Modbus协议是由Modicon公司(现在的施耐德电气Schneider Electric)推出,主要建立在物理串口.以太网TCP/IP层之上,目前已经成为工业领域通信协议的业界标准,广泛应用在工业 ...

  2. 基于Python的ModbusTCP客户端实现

    Modbus协议是由Modicon公司(现在的施耐德电气Schneider Electric)推出,主要建立在物理串口.以太网TCP/IP层之上,目前已经成为工业领域通信协议的业界标准,广泛应用在工业 ...

  3. 通过python基于netconf协议获取网络中网元的配置数据,助力企业网络控制自动化轻松实现!

    摘要:在当今信息化时代,大多数企业都需要网络支撑企业的ICT运行,提升企业运行效率,针对企业网络中的网元设备(包括交换机,路由器,防火墙等),很多企业希望根据自身的业务特点定制网络管理,比如可以实现网 ...

  4. 从入门到入土:基于Python采用TCP协议实现通信功能的程序

    此博客仅用于记录个人学习进度,学识浅薄,若有错误观点欢迎评论区指出.欢迎各位前来交流.(部分材料来源网络,若有侵权,立即删除) 本人博客所有文章纯属学习之用,不涉及商业利益.不合适引用,自当删除! 若 ...

  5. python 下载文件-Python实现HTTP协议下的文件下载方法总结

    本文介绍了几种常用的python下载文件的方法,具体使用到了htttplib2,urllib等包,希望对大家有帮忙. 1.简单文件下载 使用htttplib2,具体代码如下: h = httplib2 ...

  6. python应用系列教程——python使用smtp协议发送邮件:html文本邮件、图片邮件、文件附件邮件

    全栈工程师开发手册 (作者:栾鹏) python教程全解 python使用smtp协议发送电子邮件.包含发送html文本邮件.包含图片附件的邮件,包含其他文件附件的邮件.可设置邮件的收发人,主题,内容 ...

  7. 如何快速实现西门子S7-200/300 PLC转Modbus-TCP协议与第三方数据对接

    如何快速实现西门子S7-200/300 PLC转Modbus-TCP协议与第三方数据对接 引言 西门子 SIMATIC 自动化控制系统在工业控制市场应用相当广泛,凭借其安全可靠性.全集成产品线和优异的 ...

  8. Python通过MQTT协议上传物联网数据给ThingsBoard

    第一步: 用租户账号登录,配置ThingsBoard 设备,使用MQTT协议 第二步: 参考这篇文章,使用MQTT客户端 发消息给ThingsBoard 主要是一个username的参数 第三步: 使 ...

  9. ModbusTCP协议报文解析

    ModbusTCP协议报文解析 报文格式 交互(通信)标识:2个字节 为此次通信事务处理标识符,一般每次通信之后将被要求加1以区别不同的通信数据报文. 协议标识:2个字节 表示该条指令遵循Modbus ...

最新文章

  1. 高校人工智能热的“冷”思考
  2. 安卓开发网络资源汇总
  3. 用 Go 语言实现 Raft 选主
  4. 小白教你一步一步安装Scrapy(西瓜皮)(带图带资源)
  5. spark从hbase读取写入数据
  6. Node.js构建可扩展的Web应用1
  7. 全局事件总线 (GlobalEventBus)
  8. 对Spring Boot还陌生吗?
  9. Lua4.0 实现#操作,获取table大小
  10. 2019-12-17 drivers/clocksource/arm_arch_timer.c
  11. Java多线程之同步与阻塞队列
  12. C++ 函数其实可以不用写返回值的声明!
  13. SqlServer 2008出现远程过程调用失败,错误代码[Ox800706be]
  14. 《数据通信与网络》笔记--广域网SONET/SDH
  15. 解决mount.nfs: /home/xxxx/mpi-install is busy or already mounted问题
  16. 【Cesium】加载互联网地图服务——天地图
  17. Linux命令之dos2unix
  18. “住过一晚两万的ICU后,我还是建议你不要轻易买保险”
  19. 英雄联盟 无法服务器未响应,Win7英雄联盟登陆服务器未响应的解决方法
  20. 建筑工程单位材料成本设计管理软件系统

热门文章

  1. 计算机视觉2021年3月28最新论文
  2. EF 使用事务SqlTransaction
  3. 面向初学者的蒙特卡洛树搜索MCTS详解及其实现
  4. 经典SQL题练习(MySQL版)
  5. Tomcat和SpringMVC的关系
  6. 绘画彩铅网课课程教程素描水彩教学画画手绘美术零基础国画视频课
  7. OLED如何播放badapple
  8. 手机定位技术术语收录之-MCC(Mobile Country Code,移动国家号码)
  9. 如何在html中快速的做出锚点图效果
  10. word中论文三段页码设置