有一个使用场景:将 tar.gz 读成字节数组(byte array) 然后发往 Kafka。我要查看 Kafka 里面的消息, 直接查看的话就是二进制的乱码,首先想到的是把 Kafka 里面保存的字节数组存储到本地,每一行存成一个 .tar.gz 文件:

# -*- coding: utf-8 -*-

import sys,os

from confluent_kafka import Consumer, KafkaError

import gzip

def run(args):

c = Consumer({

"bootstrap.servers" : args[1], # broker

'group.id': args[2], # group id, 每次不一样

'default.topic.config': {

'auto.offset.reset': 'earliest'

}

})

c.subscribe([ args[3] ]); # topic

i=0

while True:

msg = c.poll(1.0)

if msg is None:

continue

if msg.error():

if msg.error().code() == KafkaError._PARTITION_EOF:

continue

else:

print(msg.error())

break

try:

outF = file( '/data/app/tar/' + str(i) + ".tar.gz", 'wb')

outF.write(msg.value())

outF.close()

i+=1

print i

except Exception,e:

print e

c.close()

if __name__ == '__main__':

print sys.argv

run(sys.argv)

这样就能打开压缩文件查看了。但是这样也不方便。所以要直接将字节数组解压成文本文件:

# -*- coding: utf-8 -*-

import sys,os

from confluent_kafka import Consumer, KafkaError

import gzip,io

import zlib

def run(args):

c = Consumer({

"bootstrap.servers" : args[1],

'group.id': args[2], # group id

'default.topic.config': {

'auto.offset.reset': 'earliest'

}

})

c.subscribe([ args[3] ]); # topic

i=0

while True:

msg = c.poll(1.0)

if msg is None:

continue

if msg.error():

if msg.error().code() == KafkaError._PARTITION_EOF:

continue

else:

print(msg.error())

break

try:

decompressed_data = zlib.decompress(msg.value(),zlib.MAX_WBITS|32) # header 自动检测

print decompressed_data

except Exception,e:

print e

c.close()

if __name__ == '__main__':

print sys.argv

run(sys.argv)

解压缩字节数组并读取文件内容

# -*- coding: utf-8 -*-

import sys,os

from confluent_kafka import Consumer, KafkaError

import tarfile, io

def run(args):

c = Consumer({

"bootstrap.servers" : args[1], # "10.10.20.11:9092"

'group.id': args[2], # group id, 每次不一样

'default.topic.config': {

'auto.offset.reset': 'earliest'

}

})

c.subscribe([ args[3] ]); # dc-diagnostic-report

i=0

while True:

msg = c.poll(1.0)

if msg is None:

continue

if msg.error():

if msg.error().code() == KafkaError._PARTITION_EOF:

continue

else:

print(msg.error())

break

try:

file_like_object = io.BytesIO(msg.value())

tar = tarfile.open(fileobj=file_like_object)

# use "tar" as a regular TarFile object

for member in tar.getmembers():

print(member)

f = tar.extractfile(member)

content = f.read()

print(content)

tar.close()

except Exception,e:

print e

c.close()

if __name__ == '__main__':

print sys.argv

run(sys.argv)

参考链接

python3 gzip解压_使用 Python 解压缩 gzip 数据流相关推荐

  1. gzip 解压_简简单单_百度空间

    gzip 解压_简简单单_百度空间 gzip 解压_简简单单_百度空间 问题: 入问题,问题答案立即呈现在您眼前! gzip格式rfc 1952 http://www.ietf.org/rfc/rfc ...

  2. linux gzip 解压 函数,获取http的gzip内容,并解压

    问题: 入问题,问题答案立即呈现在您眼前! 寻找gzip 获取一个网页数据返回的编码类型是gzip,我该怎么解压缩 HTTP头获取? 如何用vb获取网络上的xml文件,并解析内容 关于GZIP的解码 ...

  3. python调用winrar解压_批量文件解压缩脚本(Python3.5 + WinRAR)

    import os s = os.sep #全局变量 List_Err = [] #dirroot = "D:" + s + "实验报告" + s #要遍历的目 ...

  4. java字符串压缩js解压_接口实现后台GZIP压缩,pako.js 前端解压

    import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOExceptio ...

  5. bundle文件解压_通过sourcemap解压缩webpack 实战

    现在许多网站都使用webpack对网站打包,许多前端框架也默认配置好webpack.这会在渗透测试或挖洞过程中带来一些麻烦.这让我们极其痛苦.但是开发者忽视起潜在风险,在线上环境使用了开发环境的配置, ...

  6. httplib 和 httplib2区别之 gzip解压

    HTTP请求头Accept-encoding: gzip信息告诉服务器,如果它有任何新数据要发送给时,请以压缩的格式发送.如果服务器支持压缩,它将返回由 gzip 压缩的数据并且使用Content-e ...

  7. HTML中chunked解码和gzip解压

    chunked编码 chunked编码的的好处 当访问的时动态页面时,服务器则无法预知内容的大小,因此需要一遍产生数据,一边发送数据,将数据分块发送(服务器通过响应头'Transfer-Encodin ...

  8. linux环境下常用的打包、压缩、解压命令(tar、gzip、bzip2、zip)

    文章目录 前言 基础概念 打包/归档 压缩 解压 打包压缩 压缩解压命令 总结 前言 经常使用电脑的人常常会接触到压缩文件,不管是软件.数据还是资料,下载之后通常就是一个压缩包,在Windows平台上 ...

  9. 将多个文件压缩成gzip,将gzip解压成多个文件

          第一步:文件压缩和解压缩方法 //解压gzip文件public static boolean extractZip(File file, File parent) {ZipFile zf ...

  10. python怎么解压rar文件_用Python解压缩rar、zip文件的方法

    玩蛇网本文为大家提供关于用Python解压缩rar.zip文件的方法源码.Python语言对文件方面的处理还是很方便的,例如以前有为大家介绍过Python读取分割压缩TXT文本文件的方法.Python ...

最新文章

  1. 大神教你实现redis键空间通知
  2. 转行学python后悔了-转行学Python可以吗?
  3. 如何快速将Android库发布到JCenter
  4. Preload custom controller defined in runtime framework
  5. GDB and core
  6. 模块怎么用_IC设计方法:模块划分与overdesign
  7. 求助:关于sql如何统计时间的问题
  8. Blazor WebAssembly 3.2.0 已在塔架就位 将发射新一代前端SPA框架
  9. 高德地图轨迹回放_高德地图上线了一个新功能….
  10. 结合MSDN理解windows service 服务安装的三个类。
  11. Maven核心概念及Eclipse使用Maven
  12. two points
  13. micropython文件上传软件_MicroPython
  14. 中兴服务器 raid,中兴LIS等直通阵列卡卡硬盘检测工具
  15. java运行vbs_如何在Java中执行VBS脚本?
  16. html5视频在线剪辑,五种剪辑方法让视频更精彩
  17. 用canvas和原生JS写的一个flappy bird游戏
  18. cv::fitLine用法
  19. 对VC 一些常见问题的整理
  20. javaFX裁剪视频exe

热门文章

  1. Ubunut 下安装teamview
  2. 计算机科学与软件工程的关系
  3. 普华永道:2030年区块链价值将突破30万亿
  4. [贪心][区间dp]Zero-One Codeforces1733D1D2
  5. Python:实现拓扑排序算法(附完整源码)
  6. 薪酬体系:了解越多,满意越高
  7. 获取时间戳的4种方法
  8. python编写自动更换ip工具的代码
  9. 性能优化大牛 Brendan Gregg 的新书要来了
  10. TkMybatis的使用