python大文件排序_Python 大文件排序
1.[代码][Python]代码
import gzip
import os
from multiprocessing import Process, Queue, Pipe, current_process, freeze_support
from datetime import datetime
def sort_worker(input,output):
while True:
lines = input.get().splitlines()
element_set = {}
for line in lines:
if line.strip() == 'STOP':
return
try:
element = line.split(' ')[0]
if not element_set.get(element): element_set[element] = ''
except:
pass
sorted_element = sorted(element_set)
#print sorted_element
output.put('\n'.join(sorted_element))
def write_worker(input, pre):
os.system('mkdir %s'%pre)
i = 0
while True:
content = input.get()
if content.strip() == 'STOP':
return
write_sorted_bulk(content, '%s/%s'%(pre, i))
i += 1
def write_sorted_bulk(content, filename):
f = file(filename, 'w')
f.write(content)
f.close()
def split_sort_file(filename, num_sort = 3, buf_size = 65536*64*4):
t = datetime.now()
pre, ext = os.path.splitext(filename)
if ext == '.gz':
file_file = gzip.open(filename, 'rb')
else:
file_file = open(filename)
bulk_queue = Queue(10)
sorted_queue = Queue(10)
NUM_SORT = num_sort
sort_worker_pool = []
for i in range(NUM_SORT):
sort_worker_pool.append( Process(target=sort_worker, args=(bulk_queue, sorted_queue)) )
sort_worker_pool[i].start()
NUM_WRITE = 1
write_worker_pool = []
for i in range(NUM_WRITE):
write_worker_pool.append( Process(target=write_worker, args=(sorted_queue, pre)) )
write_worker_pool[i].start()
buf = file_file.read(buf_size)
sorted_count = 0
while len(buf):
end_line = buf.rfind('\n')
#print buf[:end_line+1]
bulk_queue.put(buf[:end_line+1])
sorted_count += 1
if end_line != -1:
buf = buf[end_line+1:] + file_file.read(buf_size)
else:
buf = file_file.read(buf_size)
for i in range(NUM_SORT):
bulk_queue.put('STOP')
for i in range(NUM_SORT):
sort_worker_pool[i].join()
for i in range(NUM_WRITE):
sorted_queue.put('STOP')
for i in range(NUM_WRITE):
write_worker_pool[i].join()
print 'elasped ', datetime.now() - t
return sorted_count
from heapq import heappush, heappop
from datetime import datetime
from multiprocessing import Process, Queue, Pipe, current_process, freeze_support
import os
class file_heap:
def __init__(self, dir, idx = 0, count = 1):
files = os.listdir(dir)
self.heap = []
self.files = {}
self.bulks = {}
self.pre_element = None
for i in range(len(files)):
file = files[i]
if hash(file) % count != idx: continue
input = open(os.path.join(dir, file))
self.files[i] = input
self.bulks[i] = ''
heappush(self.heap, (self.get_next_element_buffered(i), i))
def get_next_element_buffered(self, i):
if len(self.bulks[i]) < 256:
if self.files[i] is not None:
buf = self.files[i].read(65536)
if buf:
self.bulks[i] += buf
else:
self.files[i].close()
self.files[i] = None
end_line = self.bulks[i].find('\n')
if end_line == -1:
end_line = len(self.bulks[i])
element = self.bulks[i][:end_line]
self.bulks[i] = self.bulks[i][end_line+1:]
return element
def poppush_uniq(self):
while True:
element = self.poppush()
if element is None:
return None
if element != self.pre_element:
self.pre_element = element
return element
def poppush(self):
try:
element, index = heappop(self.heap)
except IndexError:
return None
new_element = self.get_next_element_buffered(index)
if new_element:
heappush(self.heap, (new_element, index))
return element
def heappoppush(dir, queue, idx = 0, count = 1):
heap = file_heap(dir, idx, count)
while True:
d = heap.poppush_uniq()
queue.put(d)
if d is None: return
def heappoppush2(dir, queue, count = 1):
heap = []
procs = []
queues = []
pre_element = None
for i in range(count):
q = Queue(1024)
q_buf = queue_buffer(q)
queues.append(q_buf)
p = Process(target=heappoppush, args=(dir, q_buf, i, count))
procs.append(p)
p.start()
queues = tuple(queues)
for i in range(count):
heappush(heap, (queues[i].get(), i))
while True:
try:
d, i= heappop(heap)
except IndexError:
queue.put(None)
for p in procs:
p.join()
return
else:
if d is not None:
heappush(heap,(queues[i].get(), i))
if d != pre_element:
pre_element = d
queue.put(d)
def merge_file(dir):
heap = file_heap( dir )
os.system('rm -f '+dir+'.merge')
fmerge = open(dir+'.merge', 'a')
element = heap.poppush_uniq()
fmerge.write(element+'\n')
while element is not None:
element = heap.poppush_uniq()
fmerge.write(element+'\n')
class queue_buffer:
def __init__(self, queue):
self.q = queue
self.rbuf = []
self.wbuf = []
def get(self):
if len(self.rbuf) == 0:
self.rbuf = self.q.get()
r = self.rbuf[0]
del self.rbuf[0]
return r
def put(self, d):
self.wbuf.append(d)
if d is None or len(self.wbuf) > 1024:
self.q.put(self.wbuf)
self.wbuf = []
def diff_file(file_old, file_new, file_diff, buf = 268435456):
print 'buffer size', buf
from file_split import split_sort_file
os.system('rm -rf '+ os.path.splitext(file_old)[0] )
os.system('rm -rf '+ os.path.splitext(file_new)[0] )
t = datetime.now()
split_sort_file(file_old,5,buf)
split_sort_file(file_new,5,buf)
print 'split elasped ', datetime.now() - t
os.system('cat %s/* | wc -l'%os.path.splitext(file_old)[0])
os.system('cat %s/* | wc -l'%os.path.splitext(file_new)[0])
os.system('rm -f '+file_diff)
t = datetime.now()
zdiff = open(file_diff, 'a')
old_q = Queue(1024)
new_q = Queue(1024)
old_queue = queue_buffer(old_q)
new_queue = queue_buffer(new_q)
h1 = Process(target=heappoppush2, args=(os.path.splitext(file_old)[0], old_queue, 3))
h2 = Process(target=heappoppush2, args=(os.path.splitext(file_new)[0], new_queue, 3))
h1.start(), h2.start()
old = old_queue.get()
new = new_queue.get()
old_count, new_count = 0, 0
while old is not None or new is not None:
if old > new or old is None:
zdiff.write('< '+new+'\n')
new = new_queue.get()
new_count +=1
elif old < new or new is None:
zdiff.write('> '+old+'\n')
old = old_queue.get()
old_count +=1
else:
old = old_queue.get()
new = new_queue.get()
print 'new_count:', new_count
print 'old_count:', old_count
print 'diff elasped ', datetime.now() - t
h1.join(), h2.join()
python大文件排序_Python 大文件排序相关推荐
- python选择排序从大到小_Python实现选择排序
一.选择排序简介 选择排序(Selection sort)是一种简单直观的排序算法.选择排序首先从待排序列表中找到最小(大)的元素,存放到元素列表的起始位置(与起始位置进行交换),作为已排序序列,第一 ...
- python实现文件管理系统_Python使用文件操作实现一个XX信息管理系统的示例
大家好,我是第一次python学了一个学期,期末要完成一个毕业生信息管理系统大作业的小韩了,由于上次没有仔细看开发实现的要求,实现了一个简单的毕业生信息管理系统,而这次专门整理了两种使用文件进行保存数 ...
- python用户输入字符串串从小到大排序_python字符串从小到大排序
python字符串排序问题 a='AADFabddefgilmrsss' 请将该字符串a里的单词重新排序(a-z),并且重def char_cmp(a, b): #实现你的比较规则, 分太少,不值为你 ...
- python读写文件函数_Python读写文件
转载自:http://blog.csdn.net/adupt/article/details/4435615 1.open 使用open打开文件后一定要记得调用文件对象的close()方法.比如可以用 ...
- python pdf处理 图片_python PDF文件合并、图片处理
一.合并多个PDF文件 实例: #导入模块import codecs import os import PyPDF2 as PyPDF2 #建立一个装pdf文件的数组 files = list()#遍 ...
- python八大选择排序_Python实现选择排序
选择排序: 选择排序(Selection sort)是一种简单直观的 排序算法 .它的工作原理如下.首先在未排序序列中找到最小(大)元素,存放到排序序列的起始位置,然后,再从剩余未排序元素中继续寻找最 ...
- python如何进入文件夹_python之文件的读写和文件目录以及文件夹的操作实现代码...
这篇文章主要介绍了python之文件的读写和文件目录以及文件夹的操作实现代码,需要的朋友可以参考下 为了安全起见,最好还是给打开的文件对象指定一个名字,这样在完成操作之后可以迅速关闭文件,防止一些无用 ...
- python素材和代码_python之文件和素材
11.1 打开文件 open函数 open(name[,mode[,buffering]]) >>>f = open(r'C:\text\somefile.txt') 11.1.1 ...
- python读取坐标文本文件_Python 实现文件读写、坐标寻址、查找替换功能
读文件 打开文件(文件需要存在) #打开文件 f = open("data.txt","r") #设置文件对象 print(f)#文件句柄 f.close() ...
- python文本编码转换_python实现文件批量编码转换及注意事项
起因:大三做日本交换生期间在修一门C语言图像处理的编程课,在配套书籍的网站上下载了sample,但是由于我用的ubuntu18.04系统默认用utf-8编码,而文件源码是Shift_JIS编码,因而文 ...
最新文章
- jenkins配置git
- Python入门100题 | 第038题
- Maven 修改本地存储库位置--转
- 百度搜索结果 转换_如何让图片出现在百度搜索结果里出现?
- wsl设置c盘自动挂载到wsl中的/c/目录下
- Flink的主要特点及与Spark的对比
- boost::signals2::signal_type相关的测试程序
- boost的chrono模块周期计数延迟的测试程序
- js对象数组中的某属性值 拼接成字符串
- pat 乙级 1030 完美数列(C++)
- Hashtable和HashMap类的区别
- java网上订餐系统怎么做_基于Java的网上订餐系统
- html调用本地电脑应用,实现HTML调用打开本地软件文件
- 如何提升Javascript 基础
- Metasploit之——基本后渗透命令
- 北大AI公开课第十课--人工智能在生命科学中的应用by碳云智能李英睿
- 超好用的内网穿透工具【永久免费不限制流量】
- python绘制指数函数图像及性质_python实现画出e指数函数的图像
- mac 下禁止顽固的开机程序自启
- 网站建设SEO推广说明
热门文章
- selenium+python设置爬虫代理IP的方法
- laravel 运用
- Java并发和多线程3:线程调度和有条件取消调度
- webapp,ios Safari打开新窗口
- Oracle分析函数之FIRST_VALUE和LAST_VALUE
- ie 访问 java接口_2019年面试总结,100道Java程序员面试题(含答案)分享
- python之定制多种彩虹色爱心
- Gstreamer之video转码(七)
- android启动过程之init.rc文件浅析
- python3.7中Gluonts与Mxnet安装问题