作者 |小小明

来源 |杰哥的IT之旅

今天我们的目标是就是将Ping整个网段IP的总耗时降低到5秒以内,这样我们就能够在5秒内知道指定mac地址设备的上下线,例如开发一个BOSS来了的摸鱼神器,只要老板的手机一连上wifi,这边在5秒内收到通知,立马停止摸鱼,就保证了平时放心大胆的摸鱼⚡。

那么如何提速呢?经过我几天的苦思冥想,并在学习了一些网络知识后,自己实现了PING命令,成功的实现了放心大胆的摸鱼。于是,在我看了几本书,写了几千行代码,踩了几百个坑后,终于把相关知识理解透了。下面是我将涉及到的核心知识点总结成了这篇文章,所以这篇文章都是非常精简的干货,强烈❤️建议收藏❤️。

学完本文,你的力量将不仅仅止于此,还能够底层化开发任何基于IP协议的自定义协议,当然这要看你自己是否具有举一反三的能力。甚至你还能继续自己深挖,去研究开发比IP协议更底层的协议。

渴望吗?渴望那就学起来吧⁉️

01

socket 套接字核心知识

socket 简介

进程间通信指运行的程序之间的数据共享,在1台电脑上可以通过进程号(PID)来唯一标识一个进程进行通信。

在网络中,TCP/IP协议族网络层的“ip地址”可以唯一标识网络中的主机,而传输层的“协议+端口”可以唯一标识主机中的应用进程(进程)。网络中的进程通信就可以通过ip地址,协议,端口这个标志与其它进程进行交互。

socket(简称 套接字) 就是实现网络进程间通信的一种方式,网络上各种各样的服务大多都是基于 Socket 来完成通信的。为了建立通信通道,网络通信的每个端点拥有一个socket套接字对象,它们允许程序接受并进行连接,如发送和接受数据。

socket 链接

在 Python 中 使用socket 模块的函数 socket 就可以完成:

import socket
socket.socket(family=-1, type=-1, proto=-1, fileno=None)

参数说明:

family为指定的地址族,主要有三种:

  • socket.AF_UNIX :用于同一台机器进程间通信

  • socket.AF_INET :基于ipv4协议的Internet 进程间通信

  • socket.AF_INET6 :基于ipv6协议的Internet 进程间通信

更多的地址族还包括,socket.AF_BLUETOOTH蓝牙相关、socket.AF_VSOCK虚拟机通信、socket.AF_PACKET直连网络设备底层接口等。

type为指定的套接字类型,主要有三种:

  • socket.SOCK_STREAM :流式套接字,使用面向连接的TCP协议实现字节流的传输

  • socket.SOCK_DGRAM :数据报套接字,使用面向非连接的UDP实现数据报套接字

  • socket.SOCK_RAW:原始套接字,该套接字允许对较低层协议(如 IP或 ICMP)进行直接访问

更多套接字类型还包括socket.SOCK_RDMsocket.SOCK_SEQPACKET等。

TCP 与 UDP 通信模型

对于tcp或udp套接字可以直接使用以下方式进行创建:

import socket# 创建tcp的套接字
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 创建udp的套接字
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 不用的时候,关闭套接字
s.close()

UDP通信模型:在通信开始之前,不需要建立相关的链接,只需要发送数据即可。

UDP服务端示例代码:

from socket import *
# 创建套接字
udp_socket = socket(AF_INET, SOCK_DGRAM)
# 绑定本地的相关信息,不绑定系统会随机分配
udp_socket.bind(('0.0.0.0', 8080))
# 等待接收对方发送的数据
recv_data = udp_socket.recvfrom(1024) #  1024表示本次接收的最大字节数
# 显示接收到的数据,第1个元素是对方发送的数据,第2个元素是对方的ip和端口
print(recv_data[0].decode('u8'))
# 关闭套接字
udp_socket.close()

UDP客户端示例代码:

from socket import *
# 创建udp套接字
udp_socket = socket(AF_INET, SOCK_DGRAM)
# 发送数据到指定的电脑上的指定程序中
udp_socket.sendto("你好,服务器~".encode('u8'), ('192.168.1.103', 8080))
# 关闭套接字
udp_socket.close()

TCP通信模型:在通信开始之前,一定要先建立相关的链接,才能发送数据。

TCP服务端示例代码:

from socket import *# 创建socket
tcp_server_socket = socket(AF_INET, SOCK_STREAM)
# 服务器绑定本机ip和端口
tcp_server_socket.bind(('0.0.0.0', 8080))
# 监听端口,128表示最大同时接收128个客户端链接
tcp_server_socket.listen(128)
# 如果有新的客户端来链接服务器,那么就产生一个新的套接字专门为这个客户端服务
client_socket, clientAddr = tcp_server_socket.accept()
# 接收对方发送过来的数据
recv_data = client_socket.recv(1024)  # 接收1024个字节
print('接收到的数据为:', recv_data.decode('u8'))
# 发送一些数据到客户端
client_socket.send("你好客户端!".encode('u8'))
# 关闭为这个客户端服务的套接字
client_socket.close()

TCP客户端示例代码:

from socket import *# 创建socket
tcp_client_socket = socket(AF_INET, SOCK_STREAM)
# 链接服务器
tcp_client_socket.connect(('192.168.3.31', 8080))
tcp_client_socket.send("测试发送的内容".encode("u8"))
# 接收对方发送过来的数据,最大接收1024个字节
recvData = tcp_client_socket.recv(1024)
print('接收到的数据为:', recvData.decode('u8'))
# 关闭套接字
tcp_client_socket.close()

SOCK_RAW 原始套接字

上述两种套接字是常规的套接字模式,第三个参数省略或为零(IP协议)会自动选择正确的协议(TCP协议和UDP协议)。

当我们指定套接字类型为socket.SOCK_RAW原始套接字时,第三个参数就需要指定proto协议号。

python的socket库预定义的协议号有:

  • socket.IPPROTO_TCP:TCP传输协议,值为6

  • socket.IPPROTO_UDP:UDP传输协议,值为17

  • socket.IPPROTO_ICMP:ICMP协议,值为1

  • socket.IPPROTO_IP:IP协议,值为0

  • socket.IPPROTO_RAW:可自行构建IP头部构建更底层的协议,值为1

也可以通过协议名称获取协议号常量:

import socketprint(socket.IPPROTO_ICMP, socket.getprotobyname("icmp"),socket.IPPROTO_ICMP == socket.getprotobyname("icmp"))
1 1 True

可以看到两种方式获取协议号均可。

通过原始套接字我们可以使用ICMP或更底层的协议进行通讯从而实现更高级的功能。

我们需要使用ICMP协议进行网络通信就可以使用SOCK_RAW原始套接字:

icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

socket 模块和对象的其他常用方法

socket模块的其他常用方法:

socket.gethostbyname:将主机名转换为IPv4地址格式。IPv4地址以字符串形式返回

socket.gethostname:返回包含Python解释器当前正在执行的机器的主机名的字符串

socket.gethostbyaddr:根据IP地址获取主机名

socket.getprotobyname:将Internet协议名称转换为协议号常量

在主机字节顺序与网络字节顺序不相同的机器上,使用以下方法转换:

网络顺序转换为主机字节顺序 主机顺序转换为网络字节顺序
32位正整数
4字节的交换操作
socket.ntohl socket.htonl
16位正整数
2字节的交换操作
socket.ntohs socket.htons

在主机字节顺序与网络字节顺序相同的机器上,执行以上方法是无操作的。

socket.inet_aton:将字符串格式的IPv4地址打包为32位4字节的字节对象

获取本机ip地址方法1:先获取本机主机名,再通过主机名获取ip

import socketip = socket.gethostbyname(socket.gethostname())
print(ip)
192.168.3.31

获取本机所有网卡的IP:

ips = socket.gethostbyname_ex(socket.gethostname())[-1]
print(ips)
['192.168.3.31']

⚠️注意:如果本机没有正确设置主机名时可能无法获取本机ip地址。

socket套接字对象的公用函数套接字函数:

  • s.getpeername()  :返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)

  • s.getsockname()  :返回套接字自己的地址。通常是一个元组(ipaddr,port)

  • s.setsockopt(level,optname,value)  :设置给定套接字选项的值。

  • s.getsockopt(level,optname[.buflen])  :返回套接字选项的值。

  • s.settimeout(timeout)  :设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如connect)

  • s.gettimeout()  :返回当前超时期的值,单位是秒,如果没有设置超时期,则返回None

  • s.fileno()  :返回套接字的文件描述符

  • s.setblocking(flag)  :如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常。

  • s.makefile()  :创建一个与该套接字相关联的文件。

获取本机ip地址方法2:向任意网络地址发送一个无状态的UDP请求后,再通过套接字对象获取自己的地址从而获取本机地址

import socketdef get_local_ip():with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:s.connect(('1.1.1.1', 80))ip, port = s.getsockname()return ip
# 获取本机IP
ip = get_local_ip()
print(ip)
192.168.3.31

✅即使无法连接Internet目标地址无法访问(发出报文会丢失),也可以使用该方法获取本机ip地址。

struct 二进制数据的转换

Python提供了一个struct模块来解决bytes和其他二进制数据类型的转换。

struct的pack函数把任意数据类型变成bytes。

import struct
print(struct.pack('>I', 10240099))
b'\x00\x9c@c'

pack 的第一个参数是处理指令:

  • >:表示字节顺序是 big-endian,也就是网络序

  • I:表示 4 字节无符号整数

  • H:2 字节无符号整数。

后面的参数字节个数要和处理指令一致。

unpack 把 bytes 变成相应的数据类型:

>>> struct.unpack('>IH', b'\xf0\xf0\xf0\xf0\x80\x80')
(4042322160, 32896)

struct模块定义的数据类型可以参考Python官方文档:

https://docs.python.org/zh-cn/3/library/struct.html#format-characters

格式 C 类型 Python 类型 标准大小 注释
x 填充字节
c char 长度为 1 的字节串 1
b signed char 整数 1 (1), (2)
B unsigned char 整数 1 -2
? _Bool bool 1 -1
h short 整数 2 -2
H unsigned short 整数 2 -2
i int 整数 4 -2
I unsigned int 整数 4 -2
l long 整数 4 -2
L unsigned long 整数 4 -2
q long long 整数 8 -2
Q unsigned long long 整数 8 -2
n ssize_t 整数 -3
N size_t 整数 -3
e -6 浮点数 2 -4
f float 浮点数 4 -4
d double 浮点数 8 -4
s char [] 字节串
p char [] 字节串
P void * 整数 -5

02

Ping 的工作原理

ping 基于 ICMP 协议工作的,ICMP 全称是 Internet Control Message Protocol,也就是互联网控制报文协议。ping 发出的ICMP 报文实际上是以侦察网络状态的形式实现了控制,反馈网络状态,从而调整传输策略以此控制整个局面。

ICMP 主要的功能包括:确认 IP 包是否成功送达目标地址、报告发送过程中 IP 包被废弃的原因和改善网络设置等。ICMP 协议主要负责在 IP 通信中通知某个 IP 包未能达到目标地址的原因。

ICMP 报文格式

上述报文格式中,左边的IP头部分不需要太关心,因为我们使用socket的原始套接字模式会自动帮我们封装IP头部分,右边的ICMP报文才是我们需要关心的部分。

⚠️注意:相比原生的 ICMP,Ping命令发出的ICMP报文多出了标识符和序号两个字段。

对于ICMP报文的类型,有两大类:

  1. 查询报文类型:用于诊断的查询消息

  2. 差错报文类型:通知出错原因的错误消息

不过咱们使用的PING只需要使用查询报文类型中的回送应答和回送请求。

ICMP 查询报文类型

回送消息:0表示回送应答,8表示回送请求。用于进行通信的主机或路由器之间,判断所发送的数据包是否已经成功到达对端的一种消息。

ping 命令是通过ICMP协议的回送消息实现的。

发送端主机向接收端主机发送一个回送请求(ICMP Echo Request Message,类型 8),只要正常接收到接收端返回的回送响应(ICMP Echo Reply Message,类型 0),则代表发送端主机到接收端主机可达。

ICMP 差错报文类型

对于差错报文类型,在本次编码中不会用到,无需深究,简单了解一下即可。

ICMP 常见差错报文:

  • 目标不可达消息 —— 类型 为 3

  • 原点抑制消息 —— 类型 4

  • 重定向消息 —— 类型 5

  • 超时消息 —— 类型 11

目标不可达消息(Destination Unreachable Message):

IP 路由器无法将 IP 数据包发送给目标地址时,会给发送端主机返回一个目标不可达的 ICMP 消息,并在这个消息中显示不可达的具体原因,原因记录在 ICMP 包头的代码字段。

由此,根据 ICMP 不可达的具体消息,发送端主机也就可以了解此次发送不可达的具体原因

目标不可达的原因有:

  • 网络不可达代码为 0

  • 主机不可达代码为 1

  • 协议不可达代码为 2

  • 端口不可达代码为 3

  • 需要进行分片但设置了不分片位代码为 4

原点抑制消息(ICMP Source Quench Message):

ICMP 原点抑制消息的目是为了缓和网络拥堵的问题,当路由器向低速线路发送数据时,其发送队列的缓存变为零而无法发送出去时,可以向 IP 包的源地址发送一个 ICMP 原点抑制消息

但是收到这种 ICMP 消息的主机并不见得真的会增大 IP 包的传输间隔,还可能会引起不公平的网络通信,所以一般不被使用。

重定向消息(ICMP Redirect Message):

在路由器持有更好的路由信息时,发现发送端主机使用了不是最优的路径发送数据,那么路由器会返回一个 ICMP 重定向消息给这个主机。这个消息中包含了最合适的路由信息和源数据,发送端下次可以发给另外一个更近的路由器。

超时消息(ICMP Time Exceeded Message):

IP 包中有一个8位的字段叫做 TTLTime To Live,生存周期),它的值随着每经过一次路由器就会减 1,直到减到 0 时该 IP 包会被丢弃。

此时,IP 路由器将会发送一个ICMP超时消息给发送端主机,并通知该包已被丢弃。设置 IP 包生存周期的主要目的是为了在路由控制遇到问题发生循环状况时,避免 IP 包无休止地在网络上被转发。

也可以通过设置一个较小的 TTL 值 控制包的到达范围。

03

socket 原始套接字实现 ping 命令

学了这么多基础的网络知识,我们最终为了什么?就是为了能够自己实现PING命令。相关的网络知识还有很多,但对于我们实现PING命令并没有太大关系,就暂不做深究。

下面我们从实战出现,一步步调试继续深挖PING命令的实现原理。

首先我们创建ICMP协议的原始套接字链接:

import socketicmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

发送回送请求

然后需要向目标发送一个回送请求

下面开始组织报文数据(对于系列号,我们可以自行决定要发送的值):

import os
import time
import struct# 校验需要后面再计算,这里先设置为0
ICMP_ECHO_REQUEST, code, checksum, identifier, serial_num = 8, 0, 0, os.getpid() & 0xFFFF, 0
# 初步打包ICMP头部
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,checksum, identifier, serial_num)
# 打包选项数据,包含当前时间戳,后面用Q补齐到192位
data = struct.pack("d", time.time()).ljust(192, b"Q")

计算校验和的规则这里我已经写成代码,大家可以直接看代码:

def calc_checksum(src_bytes):"""用于计算ICMP报文的校验和"""total = 0max_count = len(src_bytes)count = 0while count < max_count:val = src_bytes[count + 1]*256 + src_bytes[count]total = total + valtotal = total & 0xffffffffcount = count + 2if max_count < len(src_bytes):total = total + ord(src_bytes[len(src_bytes) - 1])total = total & 0xfffffffftotal = (total >> 16) + (total & 0xffff)total = total + (total >> 16)answer = ~totalanswer = answer & 0xffffanswer = answer >> 8 | (answer << 8 & 0xff00)return socket.htons(answer)

⚠️注意:最终返回时通过socket.htons方法将数据从主机序转换为网络序。

然后就可以计算出校验和重新打包header:

checksum = calc_checksum(header + data)
header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,checksum, identifier, serial_num)

然后就可以发送了:

# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填
target_addr = "192.168.3.31"
icmp_socket.sendto(header + data, (target_addr, 1))

⚠️注意:虽然发送给了1号端口,但其实发送给任意端口都可以。

接收回送响应

回送响应与回送请求结构一致

发送完消息后,我们就可以接收回送相应:

# 接收回送请求
recv_packet, addr = icmp_socket.recvfrom(1024)
# 前20字节是ip协议的ip头
icmp_header = recv_packet[20:28]
data = recv_packet[28:]
ICMP_Echo_Reply, code, checksum, identifier, serial_num = struct.unpack("bbHHh", icmp_header
)
time_sent, = struct.unpack("d", data[:struct.calcsize("d")])

⚠️注意:我们接收的回送请求中包含了前20自己的IP头。

从选项数据中可解析出了这个包发送的时间(之前发出时写入的时间)。

完善 ping 命令的开发

虽然标准的PING命令是用以上协议规则实现的,但我们并不需要完全按照上述规范,例如标识符可以发送任何16位的值,序号可以从任意数值开始,选项数据192位的空间也可以用来存放任何数据。

我们在接收回送响应时需要检查包的标识符,确定是自己发出的包才接收。

最终封装出如下方法:

import struct
import time
import os
import socket
import selectdef calc_checksum(src_bytes):"""用于计算ICMP报文的校验和"""total = 0max_count = len(src_bytes)count = 0while count < max_count:val = src_bytes[count + 1]*256 + src_bytes[count]total = total + valtotal = total & 0xffffffffcount = count + 2if max_count < len(src_bytes):total = total + ord(src_bytes[len(src_bytes) - 1])total = total & 0xfffffffftotal = (total >> 16) + (total & 0xffff)total = total + (total >> 16)answer = ~totalanswer = answer & 0xffffanswer = answer >> 8 | (answer << 8 & 0xff00)return socket.htons(answer)def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,serial_num=0, data=None):# 校验需要后面再计算,这里先设置为0ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0# 初步打包ICMP头部header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,checksum, identifier, serial_num)# 打包选项数据if data:data = data.ljust(192, b"Q")else:data = struct.pack("d", time.time()).ljust(192, b"Q")checksum = calc_checksum(header + data)header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,checksum, identifier, serial_num)# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填icmp_socket.sendto(header + data, (target_addr, 1))def receive_pong(icmp_socket, identifier=os.getpid() & 0xFFFF, serial_num=0, timeout=2):icmp_socket.settimeout(timeout)time_remaining = timeoutwhile True:start_time = time.time()# 接收回送请求recv_packet, (ip, port) = icmp_socket.recvfrom(1024)time_received = time.time()time_spent = time_received-start_time# 前20字节是ip协议的ip头icmp_header = recv_packet[20:28]data = recv_packet[28:]ICMP_Echo_Reply, code, checksum, identifier_reciver, serial_num_reciver = struct.unpack("bbHHh", icmp_header)if identifier_reciver != identifier or serial_num != serial_num_reciver:# 不是当前自己发的包则忽略time_remaining -= time_spentif time_remaining <= 0:raise socket.timeoutcontinuetime_sent, = struct.unpack("d", data[:struct.calcsize("d")])return int((time_received - time_sent)*1000), ip

192.168.3.31是我当前本机的局域网IP地址,测试一下:

icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
ip = '192.168.3.31'
sent_ping(icmp_socket, ip)
try:delay, ip_received = receive_pong(icmp_socket, timeout=2)print(f"延迟:{delay}ms,对方ip:{ip_received}")
except socket.timeout as e:print("超时")
延迟:0ms,对方ip:192.168.3.31

然后再批量ping一下指定当前网段的所有IP:

def get_local_ip():with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:s.connect(('1.1.1.1', 80))ip, port = s.getsockname()return ipicmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)local_ip = get_local_ip()
net_segment = local_ip[:local_ip.rfind(".")]
ips = []
for i in range(1, 255):ip = f"{net_segment}.{i}"sent_ping(icmp_socket, ip)print("ping", ip, end=" ")try:delay, ip_received = receive_pong(icmp_socket, timeout=0.1)print(f"延迟:{delay}ms,对方ip:{ip_received}")ips.append(ip)except socket.timeout as e:print("超时")
print(ips)
icmp_socket.close()

超时时间0.1秒时,总耗时30秒。

超时时间设置为0.01秒时,总耗时则为2.59秒。

借助 arp 表获取当前网段在线设备

如何尽量快的获取到当前在线的设备?经过测试发现,被ping后,ping不通的机器,arp表能够自动删除对应的条目,那么思路1就是快速的向全网段发送回送请求不等待回送响应,然后2秒后去查arp表,即可看到最新的在线设备。

实现思路1:

import struct
import time
import os
import re
import socket
import pandas as pddef calc_checksum(src_bytes):"""用于计算ICMP报文的校验和"""total = 0max_count = len(src_bytes)count = 0while count < max_count:val = src_bytes[count + 1]*256 + src_bytes[count]total = total + valtotal = total & 0xffffffffcount = count + 2if max_count < len(src_bytes):total = total + ord(src_bytes[len(src_bytes) - 1])total = total & 0xfffffffftotal = (total >> 16) + (total & 0xffff)total = total + (total >> 16)answer = ~totalanswer = answer & 0xffffanswer = answer >> 8 | (answer << 8 & 0xff00)return socket.htons(answer)def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,serial_num=0, data=None):# 校验需要后面再计算,这里先设置为0ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0# 初步打包ICMP头部header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,checksum, identifier, serial_num)# 打包选项数据if data:data = data.ljust(192, b"Q")else:data = struct.pack("d", time.time()).ljust(192, b"Q")checksum = calc_checksum(header + data)header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,checksum, identifier, serial_num)# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填icmp_socket.sendto(header + data, (target_addr, 1))def get_arp_ip_mac():header = Nonewith os.popen("arp -a") as res:for line in res:line = line.strip()if not line or line.startswith("接口"):continueif header is None:header = re.split(" {2,}", line.strip())breakdf = pd.read_csv(res, sep=" {2,}",names=header, header=0, engine='python')return dfdef ping_net_segment_all(net_segment):with socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) as icmp_socket:for i in range(1, 255):ip = f"{net_segment}.{i}"sent_ping(icmp_socket, ip)net_segment = "192.168.3"
ping_net_segment_all(net_segment)
# 等待回送响应的到来,预计1秒之内
time.sleep(1)
# 读取最新的arp表
df = get_arp_ip_mac()
df

于是我们获取到了当前网段在线的设备列表。

双线程获取指定网段的在线设备

不过使用arp表查看有个缺陷,只能查看当前网段的,跨网段的在线设备似乎看不到。经分析我使用的台式机通过有线连接到3网段,而手机通过WiFi连接到2网段,所以必须能够分析2网段设备的在线设备才有意义。

思路2:用两个线程一个线程专门发回送请求,一个线程专门接收回送响应,可以通过回送响应获取IP地址,于是就可以得到指定网段的当前在线的设备的ip。

先完成获取在线设备列表:

from concurrent.futures import ThreadPoolExecutor
import _thread
import struct
import time
import os
import re
import socket
import pandas as pddef calc_checksum(src_bytes):"""用于计算ICMP报文的校验和"""total = 0max_count = len(src_bytes)count = 0while count < max_count:val = src_bytes[count + 1]*256 + src_bytes[count]total = total + valtotal = total & 0xffffffffcount = count + 2if max_count < len(src_bytes):total = total + ord(src_bytes[len(src_bytes) - 1])total = total & 0xfffffffftotal = (total >> 16) + (total & 0xffff)total = total + (total >> 16)answer = ~totalanswer = answer & 0xffffanswer = answer >> 8 | (answer << 8 & 0xff00)return socket.htons(answer)def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,serial_num=0, data=None):# 校验需要后面再计算,这里先设置为0ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0# 初步打包ICMP头部header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,checksum, identifier, serial_num)# 打包选项数据if data:data = data.ljust(192, b"Q")else:data = struct.pack("d", time.time()).ljust(192, b"Q")checksum = calc_checksum(header + data)header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,checksum, identifier, serial_num)# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填icmp_socket.sendto(header + data, (target_addr, 1))def receive_pong(icmp_socket, net_segment, timeout=2):icmp_socket.settimeout(timeout)ips = set()while True:start_time = time.time()try:recv_packet, (ip, port) = icmp_socket.recvfrom(1024)if ip.startswith(net_segment):ips.add(ip)except socket.timeout as e:breakreturn ipsdef ping_net_segment_all(icmp_socket, net_segment):for i in range(1, 255):ip = f"{net_segment}.{i}"sent_ping(icmp_socket, ip)icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
with ThreadPoolExecutor() as p:p.submit(ping_net_segment_all, icmp_socket, "192.168.2")future = p.submit(receive_pong, icmp_socket, "192.168.2", 3)ips = future.result()ips

运行结果,目前我的手机ip为192.168.2.122,运行后被顺利检测到:

{'192.168.2.1','192.168.2.122','192.168.2.17','192.168.2.18','192.168.2.19','192.168.2.20','192.168.2.21','192.168.2.22','192.168.2.23','192.168.2.49'}

关闭手机WiFi后,再次运行,顺利看到该IP的下线。

完成 BOSS 来了的摸鱼神器

在已经将更新时间缩短到5秒以内时,咱们就可以PING指定网段,最后完成分析设备上下线的功能,从而达到最终的目的完成BOSS来了的摸鱼神器。

from concurrent.futures import ThreadPoolExecutor
import _thread
import struct
import time
import os
import re
import socket
import pandas as pddef calc_checksum(src_bytes):"""用于计算ICMP报文的校验和"""total = 0max_count = len(src_bytes)count = 0while count < max_count:val = src_bytes[count + 1]*256 + src_bytes[count]total = total + valtotal = total & 0xffffffffcount = count + 2if max_count < len(src_bytes):total = total + ord(src_bytes[len(src_bytes) - 1])total = total & 0xfffffffftotal = (total >> 16) + (total & 0xffff)total = total + (total >> 16)answer = ~totalanswer = answer & 0xffffanswer = answer >> 8 | (answer << 8 & 0xff00)return socket.htons(answer)def sent_ping(icmp_socket, target_addr, identifier=os.getpid() & 0xFFFF,serial_num=0, data=None):# 校验需要后面再计算,这里先设置为0ICMP_ECHO_REQUEST, code, checksum = 8, 0, 0# 初步打包ICMP头部header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,checksum, identifier, serial_num)# 打包选项数据if data:data = data.ljust(192, b"Q")else:data = struct.pack("d", time.time()).ljust(192, b"Q")checksum = calc_checksum(header + data)header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, code,checksum, identifier, serial_num)# 发送给目标地址,ICMP协议没有端口的概念端口可以随便填icmp_socket.sendto(header + data, (target_addr, 1))def receive_pong(icmp_socket, net_segment, timeout=2):icmp_socket.settimeout(timeout)ips = set()while True:start_time = time.time()try:recv_packet, (ip, port) = icmp_socket.recvfrom(1024)if ip.startswith(net_segment):ips.add(ip)except socket.timeout as e:breakreturn ipsdef ping_net_segment_all(icmp_socket, net_segment):for i in range(1, 255):ip = f"{net_segment}.{i}"sent_ping(icmp_socket, ip)last = None
while 1:icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)with ThreadPoolExecutor() as p:p.submit(ping_net_segment_all, icmp_socket, "192.168.2")future = p.submit(receive_pong, icmp_socket, "192.168.2")ips = future.result()if last is None:print("当前在线设备:", ips)if last:up = ips-lastif up:print("\r新上线设备:", up, end=" "*100)down = last-ipsif down:print("\r刚下线设备:", down, end=" "*100)last = ipstime.sleep(3)

结果示例:

当前在线设备: {'192.168.2.122', '192.168.2.18', '192.168.2.20', '192.168.2.1', '192.168.2.23', '192.168.2.49', '192.168.2.21', '192.168.2.17', '192.168.2.22', '192.168.2.19'}
刚下线设备: {'192.168.2.122'}

经测试,手工关闭或打开手机WiFi能够顺利看到设备IP的打印信息。这种方法虽然无法获取MAC地址,但是经测试,同一台机器都会被分配同一个IP,在我当前的网络下是满足要求的,只需要知道老板手机连接的IP就行了。或者观察一下,老板走之后,到底哪个IP下线了,专门去监控这个IP。

更安全的做法就是每看到有新的IP上线都额外警惕一点,如果你是win10系统可以使用如下方法实现系统通知:

from win10toast import ToastNotifiertoaster = ToastNotifier()
toaster.show_toast("通知标题", "通知内容!", duration=10)

上述三个参数分别是通知标题,通知的内容和通知持续的时间,对于摸鱼这种事持续时间可以调大掉,再手工关闭通知,通过pip install win10toast安装。

04

总结

总算做成了这个摸鱼神器,不过虽然我上面一本正经的讲的津津有味,但不会真有人打算拿这个代码去应用于实际去对付老板吧⁉️不会吧,不会吧⁉️

真打算做摸鱼神器的童鞋,我个人推荐搞个网络摄像头,写个人物图像识别的代码,发现有人进来了都自动提醒,这样才可以更放心的摸鱼。万一老板没连wifi就过来了,这就有点坑。

开发摸鱼神器不是本文本身的目的,学习网络知识自主实现网络协议,从通过实际例子理解网络协议才是本文真正的目的。

资讯

OpenAI开放GPT-3微调功能

技术

LTSM实现多元素时序数据预测

技术

9个好用的python操作文件方法

资讯

云游戏、AR等给元宇宙提供了哪些?

分享

点收藏

点点赞

点在看

手写了个 BOSS 来了的摸鱼神器!相关推荐

  1. 【摸鱼神器】基于python的BOSS识别系统

    [摸鱼神器]基于python的BOSS识别系统 前言 一.整体设计 二.调用摄像头 三.人脸识别 1. 构建白名单库 2. 人脸匹配 四.切换屏幕 五.完整代码 写在最后 前言 Tip:本文仅供技术学 ...

  2. 十行代码写个摸鱼神器,帮你自动化操作Excel

    十行代码写个摸鱼神器,帮你自动化操作Excel 为啥要做这个? 代码基于Python,行数很少,不重复代码十行左右 相关的视频: 为啥要做这个? 现在很多公司会监控员工行为,包括上厕所的时长,不在座位 ...

  3. 划水摸鱼的时候写一篇自己是怎么划水摸鱼的

    划水摸鱼的时候写一篇自己是怎么划水摸鱼的 划水摸鱼这种事情,实在是我等打工人的必备技能.往好了说,我们这是劳逸结合,既有助于我们的身心健康,使我们心情愉悦.往坏了说,咱们这是在剥削资本家(好家伙!!! ...

  4. 上班聊天,摸鱼神器,手写一款即时通讯工具(附源码!!!)

    文章目录 即时通讯工具 客户端 服务端 1.链接 2.登录 3.其他方法 3.1.读取客户端的消息 3.2.给客户端发送消息 3.3.日志记录 3.4.工具集合 3.5.ChatSocket 服务端部 ...

  5. 用 Python 写一个天天酷跑,在线摸鱼不烦恼

    来源丨Python小二 写出来的效果图就是这样了: 下面就更新一下全部的代码吧~ 还是老样子先定义 import pygame,sys import random 写一下游戏配置 width = 12 ...

  6. 一口气用 Python 写了13个小游戏,摸鱼达人!

    来源 | Python小二 1.吃金币 源码分享: import os import cfg import sys import pygame import random from modules i ...

  7. ⁉️socket实现Ping命令打造⚡BOSS来了⚡摸鱼神器⭐干货巨多❤️建议收藏❤️

    大家好,我是

  8. 手写带注册中心的rpc框架(Netty版和Socket版)

    之前使用socket实现了一个简单的RPC框架调用,不了解RPC的实现原理的可以看下那篇文章 手写实现RPC框架基础功能 之前的客户端里是写死了服务端的ip和端口号,这里代码做了个优化,使用zooke ...

  9. 网络与IO知识扫盲(七):仿照Netty工作架构图,手写多路复用模型

    Netty工作架构图 从图上看来: 一个线程在 Boss Group 中负责接收 另外两个线程在 Worker Group 中由接收之后的连接分配过去,负责读写 根据上图模型,仿照Netty手写一个多 ...

最新文章

  1. 遍历JavaScript中的数组
  2. c# 另存为excel
  3. 使用命令创建mysql_用命令创建MySQL数据库
  4. C++学习笔记目录链接(持续更新中)
  5. Qt数据库编程_基本
  6. YouTube键盘快捷键:速查表
  7. awk是命令还是编程语言
  8. 华为P30 Pro外观无悬念:双曲面水滴屏 屏占比超高
  9. 20145201 20145227 《信息安全系统设计基础》实验二 固件开发
  10. 蓝桥杯 基础练习 字母图形
  11. ActiveMQ常见配置
  12. 计算机视频不小心删了怎么恢复,误删电脑硬盘视频文件要怎么恢复
  13. SpringBoot日志logback-spring.xml分环境
  14. C 数据类型 常量 变量
  15. 一、了解JavaScript
  16. 电机学重读(一)基础知识
  17. 疯狂膜拜!万字长文轻松彻底入门spring
  18. 机器学习之深度学习简介
  19. 软件领域专利申请的基本特点
  20. c语言如何画简单图形,如何用C语言画基本图形

热门文章

  1. 个人使用linux软件分享
  2. Wget/httrack 爬取整站资源
  3. 触摸板右键不能用?搞了半天原来是……
  4. CRM客户管理软件系统有哪些功能模块
  5. 鲸鱼优化算法与大数据:高效网站分析优化技术
  6. 失业3个月终于上岸,分享几点经验:不要转行,不要一份简历通吃,要重复投简历,要主动降薪!...
  7. 声音编程(Voice Coding) Talon(一) 基本介绍与安装使用
  8. matlab 图片保存失真,如何将matlab画出的图片保存为要求精度
  9. git 查看当前分支是基于那个分支拉出来的
  10. access 英文是什么意思_有用的三个单词:access,crawl,maintain,快来学学吧!