
Sorting Large Files in Python

Source: 动视网 | Editor: 小采 | 2020-11-27 14:34:21
This article walks through a worked example of sorting very large files in Python, shared for your reference. The approach is a parallel external sort: phase one splits the input into buffer-sized bulks that worker processes deduplicate, sort, and write out as chunk files; phase two performs a heap-based k-way merge of those chunks; and a diff_file helper builds on both phases to diff two huge files. The implementation is as follows:

import gzip
import os
from datetime import datetime
from multiprocessing import Process, Queue

def sort_worker(input, output):
    # Pull raw text bulks off the input queue, keep the first
    # space-separated field of each line (deduplicated via dict keys),
    # and emit the fields sorted. A 'STOP' line ends the worker.
    while True:
        lines = input.get().splitlines()
        element_set = {}
        for line in lines:
            if line.strip() == 'STOP':
                return
            try:
                element = line.split(' ')[0]
                if not element_set.get(element):
                    element_set[element] = ''
            except Exception:
                pass
        sorted_element = sorted(element_set)
        output.put('\n'.join(sorted_element))

def write_worker(input, pre):
    # Drain sorted bulks from the queue, writing each to a numbered
    # chunk file in a directory named after the input file.
    os.makedirs(pre, exist_ok=True)
    i = 0
    while True:
        content = input.get()
        if content.strip() == 'STOP':
            return
        write_sorted_bulk(content, '%s/%s' % (pre, i))
        i += 1

def write_sorted_bulk(content, filename):
    with open(filename, 'w') as f:
        f.write(content)

def split_sort_file(filename, num_sort=3, buf_size=65536 * 64 * 4):
    # Phase one of the external sort: read the (optionally gzipped) input
    # in big buffers, cut each buffer at its last newline so no line is
    # split, and feed the bulks to the sorting processes and one writer.
    t = datetime.now()
    pre, ext = os.path.splitext(filename)
    if ext == '.gz':
        file_file = gzip.open(filename, 'rt')
    else:
        file_file = open(filename)
    bulk_queue = Queue(10)
    sorted_queue = Queue(10)
    NUM_SORT = num_sort
    sort_worker_pool = []
    for i in range(NUM_SORT):
        sort_worker_pool.append(Process(target=sort_worker, args=(bulk_queue, sorted_queue)))
        sort_worker_pool[i].start()
    NUM_WRITE = 1
    write_worker_pool = []
    for i in range(NUM_WRITE):
        write_worker_pool.append(Process(target=write_worker, args=(sorted_queue, pre)))
        write_worker_pool[i].start()
    buf = file_file.read(buf_size)
    sorted_count = 0
    while len(buf):
        end_line = buf.rfind('\n')
        bulk_queue.put(buf[:end_line + 1])
        sorted_count += 1
        if end_line != -1:
            buf = buf[end_line + 1:] + file_file.read(buf_size)
        else:
            buf = file_file.read(buf_size)
    for i in range(NUM_SORT):
        bulk_queue.put('STOP')
    for i in range(NUM_SORT):
        sort_worker_pool[i].join()
    for i in range(NUM_WRITE):
        sorted_queue.put('STOP')
    for i in range(NUM_WRITE):
        write_worker_pool[i].join()
    print('elapsed', datetime.now() - t)
    return sorted_count
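
A hedged usage sketch for phase one (the file name and worker count are made up for illustration; the __main__ guard matters because multiprocessing may re-import the module):

if __name__ == '__main__':
    # For 'data.txt.gz', the sorted, deduplicated chunks land in data.txt/.
    chunks = split_sort_file('data.txt.gz', num_sort=3)
    print(chunks, 'bulks queued for sorting')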
The merging half below is written as a separate module; the diff_file function further down assumes the phase-one code above was saved as file_split.py.

from heapq import heappush, heappop
from datetime import datetime
from multiprocessing import Process, Queue
import os
class file_heap:
    # Buffered k-way merge over the sorted chunk files in a directory.
    # Each worker (idx of count) only opens the files hashed to it.
    # Note: str hashes are randomized per interpreter in Python 3, so the
    # partitioning relies on forked children sharing the parent's hash seed.
    def __init__(self, dir, idx=0, count=1):
        files = os.listdir(dir)
        self.heap = []
        self.files = {}
        self.bulks = {}
        self.pre_element = None
        for i in range(len(files)):
            fname = files[i]
            if hash(fname) % count != idx:
                continue
            fp = open(os.path.join(dir, fname))
            self.files[i] = fp
            self.bulks[i] = ''
            heappush(self.heap, (self.get_next_element_buffered(i), i))

    def get_next_element_buffered(self, i):
        # Refill file i's buffer when it runs low, then return the next
        # newline-terminated element from it.
        if len(self.bulks[i]) < 256:
            if self.files[i] is not None:
                buf = self.files[i].read(65536)
                if buf:
                    self.bulks[i] += buf
                else:
                    self.files[i].close()
                    self.files[i] = None
        end_line = self.bulks[i].find('\n')
        if end_line == -1:
            end_line = len(self.bulks[i])
        element = self.bulks[i][:end_line]
        self.bulks[i] = self.bulks[i][end_line + 1:]
        return element

    def poppush_uniq(self):
        # Like poppush, but skip consecutive duplicates.
        while True:
            element = self.poppush()
            if element is None:
                return None
            if element != self.pre_element:
                self.pre_element = element
                return element

    def poppush(self):
        # Pop the globally smallest element and push its file's successor.
        try:
            element, index = heappop(self.heap)
        except IndexError:
            return None
        new_element = self.get_next_element_buffered(index)
        if new_element:
            heappush(self.heap, (new_element, index))
        return element
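
For reference, this buffered heap hand-rolls what the standard library's heapq.merge does over iterators. A minimal single-process sketch of merging a chunk directory that way (the function name and arguments are illustrative, not part of the original code):

import heapq
import os

def merge_with_stdlib(dirname, out_path):
    # k-way merge the already-sorted chunk files line by line.
    files = [open(os.path.join(dirname, name)) for name in os.listdir(dirname)]
    last = None
    with open(out_path, 'w') as out:
        for line in heapq.merge(*files):
            if line != last:  # drop consecutive duplicates, like poppush_uniq
                out.write(line)
                last = line
    for f in files:
        f.close()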
def heappoppush(dir, queue, idx=0, count=1):
    # Worker process: merge this worker's share of the chunk files and
    # stream unique elements into the queue; None signals exhaustion.
    heap = file_heap(dir, idx, count)
    while True:
        d = heap.poppush_uniq()
        queue.put(d)
        if d is None:
            return
def heappoppush2(dir, queue, count=1):
    # Fan out `count` heappoppush workers, then heap-merge their already
    # sorted streams and forward unique elements to `queue`. Exhausted
    # streams are never pushed, since Python 3 cannot order None against str.
    heap = []
    procs = []
    queues = []
    pre_element = None
    for i in range(count):
        q = Queue(1024)
        q_buf = queue_buffer(q)
        queues.append(q_buf)
        p = Process(target=heappoppush, args=(dir, q_buf, i, count))
        procs.append(p)
        p.start()
    queues = tuple(queues)
    for i in range(count):
        d = queues[i].get()
        if d is not None:
            heappush(heap, (d, i))
    while True:
        try:
            d, i = heappop(heap)
        except IndexError:
            # All streams exhausted: propagate the end marker and clean up.
            queue.put(None)
            for p in procs:
                p.join()
            return
        nxt = queues[i].get()
        if nxt is not None:
            heappush(heap, (nxt, i))
        if d != pre_element:
            pre_element = d
            queue.put(d)
def merge_file(dir):
    # Single-process variant: k-way merge one chunk directory into
    # dir + '.merge', dropping duplicates.
    heap = file_heap(dir)
    os.system('rm -f ' + dir + '.merge')
    fmerge = open(dir + '.merge', 'a')
    element = heap.poppush_uniq()
    while element is not None:
        fmerge.write(element + '\n')
        element = heap.poppush_uniq()
    fmerge.close()
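
Assuming phase one already produced a chunk directory such as data.txt/ (the name is illustrative), the single-process merge is then one call:

merge_file('data.txt')  # writes the merged, deduplicated output to data.txt.merge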
class queue_buffer:
    # Wrap a multiprocessing.Queue so elements travel in batches,
    # amortizing the per-item IPC cost.
    def __init__(self, queue):
        self.q = queue
        self.rbuf = []
        self.wbuf = []

    def get(self):
        if len(self.rbuf) == 0:
            self.rbuf = self.q.get()
        r = self.rbuf[0]
        del self.rbuf[0]
        return r

    def put(self, d):
        self.wbuf.append(d)
        # None is the end marker, so it always flushes the pending batch.
        if d is None or len(self.wbuf) > 1024:
            self.q.put(self.wbuf)
            self.wbuf = []
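
The class trades latency for throughput: each underlying q.put carries a list of up to 1024 elements instead of one. A minimal single-process sketch of the pattern (the values are illustrative):

from multiprocessing import Queue

q = Queue(1024)
buffered = queue_buffer(q)
for word in ['apple', 'banana', 'cherry']:
    buffered.put(word)
buffered.put(None)  # the end marker also flushes the pending batch
while True:
    item = buffered.get()
    if item is None:
        break
    print(item)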
def diff_file(file_old, file_new, file_diff, buf=268435456):
    # Split and sort both inputs, then walk the two merged, sorted
    # streams in lockstep and write the differences.
    print('buffer size', buf)
    from file_split import split_sort_file  # phase one, saved as file_split.py
    os.system('rm -rf ' + os.path.splitext(file_old)[0])
    os.system('rm -rf ' + os.path.splitext(file_new)[0])
    t = datetime.now()
    split_sort_file(file_old, 5, buf)
    split_sort_file(file_new, 5, buf)
    print('split elapsed', datetime.now() - t)
    os.system('cat %s/* | wc -l' % os.path.splitext(file_old)[0])
    os.system('cat %s/* | wc -l' % os.path.splitext(file_new)[0])
    os.system('rm -f ' + file_diff)
    t = datetime.now()
    zdiff = open(file_diff, 'a')
    old_q = Queue(1024)
    new_q = Queue(1024)
    old_queue = queue_buffer(old_q)
    new_queue = queue_buffer(new_q)
    h1 = Process(target=heappoppush2, args=(os.path.splitext(file_old)[0], old_queue, 3))
    h2 = Process(target=heappoppush2, args=(os.path.splitext(file_new)[0], new_queue, 3))
    h1.start(), h2.start()
    old = old_queue.get()
    new = new_queue.get()
    old_count, new_count = 0, 0
    while old is not None or new is not None:
        # None marks an exhausted stream; check it before comparing,
        # since Python 3 cannot order None against str.
        if old is None or (new is not None and old > new):
            zdiff.write('< ' + new + '\n')  # line only in file_new
            new = new_queue.get()
            new_count += 1
        elif new is None or old < new:
            zdiff.write('> ' + old + '\n')  # line only in file_old
            old = old_queue.get()
            old_count += 1
        else:
            old = old_queue.get()
            new = new_queue.get()
    print('new_count:', new_count)
    print('old_count:', old_count)
    print('diff elapsed', datetime.now() - t)
    h1.join(), h2.join()
    zdiff.close()
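
Finally, a hedged end-to-end sketch of the diff (the file names are invented; again, run it behind a __main__ guard so multiprocessing can safely re-import the module):

if __name__ == '__main__':
    # Lines only in new.txt.gz come out as '< ...', lines only in
    # old.txt.gz as '> ...', written to changes.diff.
    diff_file('old.txt.gz', 'new.txt.gz', 'changes.diff')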

Hopefully what is described here is helpful to your own Python programming.
