dpkt是一个用于解析和操作网络数据包的Python模块。它提供了对各种网络协议(如以太网、IP、TCP、UDP、ICMP等)的解析功能,并允许访问和操作数据包的各个字段和参数。可以用于解析从各种抓包工具捕获的数据包,并提取和分析包内的参数。

dpkt包的一些主要特点和用途:

  • 数据包解析:dpkt提供了解析网络数据包的功能,可以读取和解析从各种抓包工具(如Wireshark)导出的数据包文件(如pcap文件)。

  • 支持多种协议:dpkt支持解析和操作多种网络协议,包括以太网、IP、TCP、UDP、ICMP等。它可以识别和提取数据包中的各个协议头部,并提供了访问协议字段的方法。

  • 简单易用:dpkt的API设计简单易用,使得解析和操作数据包变得容易。您可以使用Python代码轻松地读取数据包文件,遍历数据包,并访问和处理数据包中的各个字段。

  • 数据包生成:除了解析数据包,dpkt还提供了创建和构造数据包的功能。您可以使用dpkt来生成特定协议的数据包,并设置各个字段的值。

  • 协议分析和网络流量分析:借助dpkt,您可以进行协议分析和网络流量分析。通过解析数据包并提取关键信息(如源IP地址、目标IP地址、端口号等),您可以进行网络流量统计、行为分析、异常检测等操作。

  • 兼容性:dpkt与Python的标准库兼容,并可以与其他Python库(如socket、struct等)结合使用。

总体而言,dpkt是一个功能强大且易于使用的Python模块,适用于网络数据包的解析、分析和操作。无论是进行网络安全研究、网络流量分析、网络协议分析还是开发网络应用程序,dpkt都可以为您提供便捷的工具和功能。

使用Dpkt分析数据包

代码使用了dpkt模块来读取和解析pcap文件(例如"D://aaa.pcap")中的数据包,并搜索其中的HTTP请求中是否包含指定的关键词(例如"wang.zip")。

#coding=utf-8
import dpkt
import socketdef FindPcapWord(pcap,WordKey):for ts,buf in pcap:try:eth = dpkt.ethernet.Ethernet(buf)ip = eth.datasrc = socket.inet_ntoa(ip.src)dst = socket.inet_ntoa(ip.dst)tcp = ip.datahttp = dpkt.http.Request(tcp.data)if(http.method == "GET"):uri = http.uri.lower()if WordKey in uri:print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri))except Exception:passfp = open("D://aaa.pcap","rb")
pcap = dpkt.pcap.Reader(fp)
FindPcapWord(pcap,"wang.zip")

也可以使用dpkt解析本机数据包中是否包含后门。

#coding=utf-8
import dpkt
import socketdef FindPcapWord(pcap,WordKey):for timestamp,packet in pcap:try:eth = dpkt.ethernet.Ethernet(packet)ip = eth.datasrc = socket.inet_ntoa(ip.src)dst = socket.inet_ntoa(ip.dst)tcp = ip.datahttp = dpkt.http.Request(tcp.data)if(http.method == "GET"):uri = http.uri.lower()if WordKey in uri:print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri))except Exception:passdef Banner():print("  _          ____  _                _    ")print(" | |   _   _/ ___|| |__   __ _ _ __| | __")print(" | |  | | | \___ \| '_ \ / _` | '__| |/ /")print(" | |__| |_| |___) | | | | (_| | |  |   < ")print(" |_____\__, |____/|_| |_|\__,_|_|  |_|\_\\")print("       |___/                             \n")print("E-Mail: me@lyshark.com")def FindHivemind(pcap):for timestamp,packet in pcap:try:eth = dpkt.ethernet.Ethernet(packet)ip = eth.datatcp = ip.datasrc = socket.inet_ntoa(ip.src)dst = socket.inet_ntoa(ip.dst)sport = tcp.sportdport = tcp.dport# print("[+] 源地址: {}:{} --> 目标地址:{}:{}".format(src,sport,dst,dport))if dport == 80 and dst == "125.39.247.226":# 如果数据流中存在cmd等明文命令则说明可能存在后门if '[cmd]# ' in tcp.data.lower():print("[+] {}:{}".format(dst,dport))except Exception:passBanner()
fp = open("D://aaa.pcap","rb")
pcap = dpkt.pcap.Reader(fp)
FindHivemind(pcap)

实时检测DDoS攻击

主要通过设置检测不正常数据包数量的阈值来判断是否存在DDoS攻击。

#coding=utf-8
import dpkt
import socketdef FindPcapWord(pcap,WordKey):for timestamp,packet in pcap:try:eth = dpkt.ethernet.Ethernet(packet)ip = eth.datasrc = socket.inet_ntoa(ip.src)dst = socket.inet_ntoa(ip.dst)tcp = ip.datahttp = dpkt.http.Request(tcp.data)if(http.method == "GET"):uri = http.uri.lower()if WordKey in uri:print("[+] 源地址: {} --> 目标地址: {} 检索到URL中存在 {}".format(src,dst,uri))except Exception:passdef FindHivemind(pcap):for timestamp,packet in pcap:try:eth = dpkt.ethernet.Ethernet(packet)ip = eth.datatcp = ip.datasrc = socket.inet_ntoa(ip.src)dst = socket.inet_ntoa(ip.dst)sport = tcp.sportdport = tcp.dport# print("[+] 源地址: {}:{} --> 目标地址:{}:{}".format(src,sport,dst,dport))if dport == 80 and dst == "125.39.247.226":# 如果数据流中存在cmd等明文命令则说明可能存在后门if '[cmd]# ' in tcp.data.lower():print("[+] {}:{}".format(dst,dport))except Exception:passdef Banner():print("  _          ____  _                _    ")print(" | |   _   _/ ___|| |__   __ _ _ __| | __")print(" | |  | | | \___ \| '_ \ / _` | '__| |/ /")print(" | |__| |_| |___) | | | | (_| | |  |   < ")print(" |_____\__, |____/|_| |_|\__,_|_|  |_|\_\\")print("       |___/                             \n")print("E-Mail: me@lyshark.com")def FindDDosAttack(pcap):pktCount = {}for timestamp,packet in pcap:try:eth = dpkt.ethernet.Ethernet(packet)ip = eth.datatcp = ip.datasrc = socket.inet_ntoa(ip.src)dst = socket.inet_ntoa(ip.dst)sport = tcp.sport# 累计判断各个src地址对目标地址80端口访问次数if dport == 80:stream = src + ":" + dstif pktCount.has_key(stream):pktCount[stream] = pktCount[stream] + 1else:pktCount[stream] = 1except Exception:passfor stream in pktCount:pktSent = pktCount[stream]# 如果超过设置的检测阈值500,则判断为DDOS攻击行为if pktSent > 500:src = stream.split(":")[0]dst = stream.split(":")[1]print("[+] 源地址: {} 攻击: {} 流量: {} pkts.".format(src,dst,str(pktSent)))if __name__ == "__main__":Banner()fp = open("D://data.pcap","rb")pcap = dpkt.pcap.Reader(fp)FindPcapWord(pcap,"wang.zip")

DPKT动态抓包解析

首先使用scapy动态抓包,然后调用不同的函数对抓到的数据包进行处理提取出想要的数据.

import os,argparse,dpkt
from scapy.all import *
pkts=[]
count=0# 检查数据包的IP层,提取出IP和TTL字段的值
def Get_TTL(pkt):try:if pkt.haslayer(IP):ip_src = pkt.getlayer(IP).srcip_sport = pkt.getlayer(IP).sportip_dst = pkt.getlayer(IP).dstip_dport = pkt.getlayer(IP).dportip_ttl = str(pkt.ttl)print("[+] 源地址: %-15s:%-5s --> 目标地址: %-15s:%-5s --> TTL: %-5s"%(ip_src,ip_sport,ip_dst,ip_dport,ip_ttl))except Exception:pass# 获取本机发送出去的DNS请求所对应的网站地址
def Get_DNSRR(pkt):if pkt.haslayer(DNSRR):rrname = pkt.getlayer(DNSRR).rrnamerdata = pkt.getlayer(DNSRR).rdatattl = pkt.getlayer(DNSRR).ttlprint("[+] 域名: {} --> 别名: {} --> TTL: {}".format(rrname,rdata,ttl))# 解析网页的DNS查询记录
def Get_DNSQR(pkt):# 判断是否含有DNSRR且存在UDP端口53if pkt.haslayer(DNSRR) and pkt.getlayer(UDP).sport == 53:rcode = pkt.getlayer(DNS).rcodeqname = pkt.getlayer(DNSQR).qname# 若rcode为3,则表示该域名不存在if rcode == 3:print("[-] 域名解析不存在")else:print("[+] 解析DNSQR存在:" + str(qname))# 检测主机是否被DDOS攻击了
def FindDDosAttack(pcap):pktCount = {}for timestamp,packet in pcap:try:eth = dpkt.ethernet.Ethernet(packet)ip = eth.datatcp = ip.datasrc = socket.inet_ntoa(ip.src)dst = socket.inet_ntoa(ip.dst)sport = tcp.sport# 累计判断各个src地址对目标地址80端口访问次数if dport == 80:stream = src + ":" + dstif pktCount.has_key(stream):pktCount[stream] = pktCount[stream] + 1else:pktCount[stream] = 1except Exception:passfor stream in pktCount:pktSent = pktCount[stream]# 如果超过设置的检测阈值500,则判断为DDOS攻击行为if pktSent > 500:src = stream.split(":")[0]dst = stream.split(":")[1]print("[+] 源地址: {} 攻击: {} 流量: {} pkts.".format(src,dst,str(pktSent)))# FindPcapURL 监控提取数据包中的所有URL
def FindPcapURL(pcap):Url = []for timestamp,packet in pcap:try:eth = dpkt.ethernet.Ethernet(packet)ip = eth.datasrc = socket.inet_ntoa(ip.src)tcp = ip.datahttp = dpkt.http.Request(tcp.data)if(http.method == "GET"):UrlHead = http.headersfor key,value in UrlHead.items():url = re.findall('^https*://.*',str(value))if url:print("[+] 源地址: %10s --> 访问URL: %-80s"%(src, url[0]))except Exception:passreturn set(Url)# 动态保存pcap文件(每1024字节保存一次pcap文件),并读取出其中的网址解析出来
def write_cap(pkt):global pktsglobal countpkts.append(pkt)count += 1if count == 1024:wrpcap("data.pcap",pkts)fp = open("./data.pcap","rb")pcap = dpkt.pcap.Reader(fp)FindPcapURL(pcap)fp.close()pkts,count = [],0def Banner():print("  _          ____  _                _    ")print(" | |   _   _/ ___|| |__   __ _ _ __| | __")print(" | |  | | | \___ \| '_ \ / _` | '__| |/ /")print(" | |__| |_| |___) | | | | (_| | |  |   < ")print(" |_____\__, |____/|_| |_|\__,_|_|  |_|\_\\")print("       |___/                             \n")print("E-Mail: me@lyshark.com")if __name__ == "__main__":Banner()parser = argparse.ArgumentParser()parser.add_argument("--mode",dest="mode",help="模式选择<TTL/DNSRR/DNSQR/URL>")args = parser.parse_args()if args.mode == "TTL":print("[*] 开始抓取本机TTL流量")sniff(prn=Get_TTL,store=0)elif args.mode == "DNSRR":print("[*] 开始抓取本机发送出去的DNS查询请求所对应的网站URL")sniff(prn=Get_DNSRR,store=0)elif args.mode == "DNSQR":print("[*] 解析网页的DNS查询记录")sniff(prn=Get_DNSQR,store=0)elif args.mode == "URL":print("[+] 开始抓包,pcap文件并读取出其中的网址")sniff(prn=write_cap,store=0)else:parser.print_help()

Geoip2定位IP来源

首先提取出Pcpa格式的数据包文件,然后通过使用离线数据库查询出指定IP地址的地理位置.

# pip install geoip2
# github地址下载:https://github.com/maxmind/GeoIP2-python
# 离线数据库:https://www.maxmind.com/en/accounts/current/geoip/downloads
import argparse
import socket,dpkt
import geoip2.databasedef AnalysisPace(DpktPack,Filter):respon = []with open(DpktPack,"rb") as fp:pcap = dpkt.pcap.Reader(fp)for timestamp, packet in pcap:try:eth = dpkt.ethernet.Ethernet(packet)# 解析过滤出网络层(三层)中的IP数据包if eth.data.__class__.__name__ == "IP":ip = eth.datasrc = socket.inet_ntoa(ip.src)dst = socket.inet_ntoa(ip.dst)# 解析过滤出传输层(四层)中的TCP数据包if eth.data.data.__class__.__name__ == "TCP":sport = eth.data.data.sportdport = eth.data.data.dport# 过滤出源地址是192.168.1.2且目的端口是80或者443的流量# if src == "192.168.1.2" and dport == 80 or dport == 443:if eval(Filter):dic = { "src":"None","sport":0 , "dst":"None","dport":0 }#print("[+] 时间戳: %-17s 源地址: %-14s:%-2s ---> 目标地址: %-16s:%-2s" %(timestamp,src, sport, dst, dport))RecvData = eth.data.data.dataif len(RecvData) and b"GET" in RecvData:#print("[*] 时间戳: {} 源地址: {} <--- 访问网页: {}".format(timestamp,src,bytes.decode(RecvData).split("\n")[1]))passdic['src'] = srcdic['dst'] = dstdic['sport'] = sportdic['dport'] = dportrespon.append(dic)except Exception:passreturn respondef AnalysisIP_To_Address(PcapFile,MmdbFile):IPDict = AnalysisPace(PcapFile,"dport ==80 or dport == 443")NoRepeat = []for item in range(len(IPDict)):NoRepeat.append(IPDict[item].get("dst"))NoRepeat = set(NoRepeat)reader = geoip2.database.Reader(MmdbFile)for item in NoRepeat:try:response = reader.city(item)print("[+] IP地址: %-16s --> " %item,end="")print("网段: %-16s --> " %response.traits.network,end="")print("经度: %-10s 纬度: %-10s --> " %(response.location.latitude, response.location.longitude),end="")print("定位: {} {} {}".format(response.country.names["zh-CN"],response.subdivisions.most_specific.name,response.city.name),end="\n")except Exception:print("定位: None None None")passdef Banner():print("  _          ____  _                _    ")print(" | |   _   _/ ___|| |__   __ _ _ __| | __")print(" | |  | | | \___ \| '_ \ / _` | '__| |/ /")print(" | |__| |_| |___) | | | | (_| | |  |   < ")print(" |_____\__, |____/|_| |_|\__,_|_|  |_|\_\\")print("       |___/                             \n")print("E-Mail: me@lyshark.com\n")if __name__ == '__main__':Banner()parser = argparse.ArgumentParser()parser.add_argument("-p", "--pcap", dest="pcap", help="设置抓到的数据包 *.pcap")parser.add_argument("-d", "--mmdb", dest="mmdb", help="设置城市数据库 GeoLite2-City.mmdb")args = parser.parse_args()# 使用方式: main.py -p data.pcap -d GeoLite2-City.mmdb (分析数据包中IP)if args.pcap and args.mmdb:AnalysisIP_To_Address(args.pcap,args.mmdb)else:parser.print_help()

Python 运用Dpkt库解析数据包相关推荐

  1. 基于python的npcap库与dpkt库实现抓包及存储

    基于python的npcap库与dpkt库实现抓包及存储 import pcap import dpkt import socket import sys import getopt import o ...

  2. python解析数据包_python – 解析UDP数据包

    我正在构建一个UDP服务器来解析和验证传入的UDP数据包.我能够接收和解析数据包,但标头值不是我所期望的. 这是传入数据包的结构 包ID(4个字节) 包序列(4个字节) XOR密钥(2个字节) 数据包 ...

  3. Python中pandas库实现数据缺失值判断isnull()函数

    [小白从小学Python.C.Java] [Python全国计算机等级考试] [Python数据分析考试必会题] ● 标题与摘要 Python中pandas库实现数据缺失值判断 isnull()函数 ...

  4. 使用python的turtle库画表情包

    使用python的turtle库画表情包 话不多说,先上效果图,然后是代码 代码如下: import turtleturtle.penup() turtle.goto(-80,20) turtle.c ...

  5. 【Python爬虫学习笔记4】结合Xpath与lxml库解析数据

    在之前的学习中了解了如何使用爬虫向目标服务器发送请求并获取响应,而此后便是要对响应进行处理,这里的处理在爬虫中通常指的是数据解析,即将相应内容数据化以方便我们进行有效数据的提取.在此过程中,有许多解析 ...

  6. python爬虫解析数据包_Python网络爬虫之三种数据解析方式

    引入 回顾requests实现数据爬取的流程 指定url 基于requests模块发起请求 获取响应对象中的数据 进行持久化存储 其实,在上述流程中还需要较为重要的一步,就是在持久化存储之前需要进行指 ...

  7. winpcap编程 解析数据包

    WinPcap和Libpcap的最强大的特性之一,是拥有过滤数据包的引擎. 它提供了有效的方法去获取网络中的某些数据包,这也是WinPcap捕获机制中的一个组成部分. 用来过滤数据包的函数是 pcap ...

  8. Qt工作笔记-发送端发送Json格式的数据包,接收端解析数据包

    原理以及运行 原理是因为QJsonDocument提供了一个函数toJson 可以直接把Json数据转成QByteArray,这样就可以直接发送数据包,然后再按照常规方法解析即可,本源码中含解析内容, ...

  9. java 解析数据包_java - 如何在Java中正确解析TCP数据包? - 堆栈内存溢出

    我目前有一个简单的TCP服务器,该服务器调用一个函数,并在每个新的传入数据包上以字节数组的形式将二进制TCP有效负载传递给它,解析它的正确方法是什么? 我试图通过将其切成不同的字节数组并分别进行处理来 ...

最新文章

  1. (五)Maven目录结构及常用命令说明
  2. python从零基础到项目实战怎么样-2018完整Python零基础到项目精通的学习书籍
  3. 第4章 与缓冲区有关的函数
  4. [Android]AndroidBucket增加碎片SubLayout功能及AISubLayout的注解支持
  5. 解析mysqlbinlog日志_mysqlbinlog日志分析 日志挖掘 违规操作
  6. c++ sleep函数_《PHP扩展开发》-hook-(hook原来的sleep)
  7. iOS25个性能优化,和内存优化
  8. HLA程序:HelloWorld.hla
  9. l#039;oracle 酒,2011 Stellenbosch Vineyards Oracle of the Sun Shiraz, Stellenbosch, South Africa
  10. 图像卷积详细解释 常用卷积核解释说明
  11. Atitit 架构师之道 attilax著 1.1. 认和评估系统需求, 2 1.2. 给出开发规范 2 1.3. ,搭建系统实现的核心构架, 2 1.4. 扫清主要难点的技术人员 2 1.5. 核
  12. 十分好用PDF转换成PPT转换器
  13. 区块链应用 | 高盛报告深度解读:区块链在未来的5大应用
  14. oracle错误信息提示中英文显示切换
  15. 如何用matlab编写分段函数_matlab 如何写分段函数
  16. matlab矩阵按位取反,第二章matlab的理基本使用方法.ppt
  17. FLV 格式:为什么直播首选这个流媒体格式?丨音视频基础
  18. python interpreter 中没有torch_python自动化办公之 Python 解析 PDF
  19. 【啊哈算法学习笔记】2.栈
  20. iOS 文件下载及断点续传

热门文章

  1. maven配置成功,但显示'cmd' 不是内部或外部命令,也不是可运行的程序 或批处理文件。...
  2. 微服务链路追踪-SkyWalking
  3. 智慧工厂用到的技术_规划智慧工厂的十种解决方案
  4. android麦克风的权限是什么,和平精英麦克风权限怎么开?麦克风权限开启方法[视频][多图]...
  5. outlook2010查看邮箱服务器,怎么设置Microsoft Outlook2010邮箱
  6. Doocker ubuntu 16.04 学习总结(四)- 使用 docker搭建 Jupyter 环境 运行 handson_ml 机器学习程序
  7. 为什么Chrome比其他浏览器快
  8. 换行与回车----\n与\enter的区别
  9. Mac如何通过 Multi-Touch Bar使用“切换控制”
  10. 胎压监测系统TPMS