Обработка одного файла из нескольких процессов в python
У меня есть один большой текстовый файл, в котором я хочу обрабатывать каждую строку ( делать некоторые операции ) и хранить их в базе данных. Поскольку одна простая программа занимает слишком много времени, я хочу, чтобы это было сделано с помощью нескольких процессов или потоков.
Каждый поток / процесс должен считывать разные данные (разные строки) из этого одного файла и выполнять некоторые операции над их фрагментом данных (строк) и помещать их в базу данных, чтобы в конце концов у меня были все обработанные данные, и моя база данных сбрасывается с помощью данные, которые мне нужны.
но я не в состоянии понять, что, как подойти к этому.
3 ответов:
то, что вы ищете производитель/потребитель шаблон
простой пример многопоточности
вот основной пример использования резьбонарезной модуль (вместо мультипроцессирование)
import threading import Queue import sys def do_work(in_queue, out_queue): while True: item = in_queue.get() # process result = item out_queue.put(result) in_queue.task_done() if __name__ == "__main__": work = Queue.Queue() results = Queue.Queue() total = 20 # start for workers for i in xrange(4): t = threading.Thread(target=do_work, args=(work, results)) t.daemon = True t.start() # produce data for i in xrange(total): work.put(i) work.join() # get the results for i in xrange(total): print results.get() sys.exit()вы не будете делиться файловым объектом с потоками. Вы бы производить работу для них, поставляя очереди со строками данных. Затем каждый поток возьмет строку, обработает ее, а затем вернет ее в очередь.
есть несколько более продвинутых объектов, встроенных в модуль многопроцессорной для обмена данными, как списки и особый вид очереди. Есть компромиссы с использованием многопроцессорных vs-потоков, и это зависит от того, связана ли ваша работа с процессором или связана с IO.
базовая многопроцессорная обработка.Например, бассейн
вот действительно простой пример многопроцессорного пула
from multiprocessing import Pool def process_line(line): return "FOO: %s" % line if __name__ == "__main__": pool = Pool(4) with open('file.txt') as source_file: # chunk the work into batches of 4 lines at a time results = pool.map(process_line, source_file, 4) print resultsБассейн это удобный объект, который управляет своими собственными процессами. Поскольку открытый файл может перебирать свои строки, вы можете передать его в
pool.map(), который будет перебирать его и доставлять строки в рабочую функцию. карта блоки и возвращает весь результат, когда его сделали. Имейте в виду, что это слишком упрощенный пример, и чтоpool.map()будет читать весь файл в память сразу, прежде чем выполнять работу. Если вы ожидаете иметь большие файлы, имейте это в виду. Есть более продвинутые способы разработки настройки производителя / потребителя.ручной " пул " с ограничением и повторной сортировкой строк
это ручной пример бассейн.карта, но вместо того, чтобы потреблять всю итерацию за один раз, вы можете установить размер очереди, чтобы вы только подавали его по частям так быстро, как он может обрабатывать. Я также добавил номера строк, чтобы вы могли отслеживать их и ссылаться на них, если хотите, позже.
from multiprocessing import Process, Manager import time import itertools def do_work(in_queue, out_list): while True: item = in_queue.get() line_no, line = item # exit signal if line == None: return # fake work time.sleep(.5) result = (line_no, line) out_list.append(result) if __name__ == "__main__": num_workers = 4 manager = Manager() results = manager.list() work = manager.Queue(num_workers) # start for workers pool = [] for i in xrange(num_workers): p = Process(target=do_work, args=(work, results)) p.start() pool.append(p) # produce data with open("source.txt") as f: iters = itertools.chain(f, (None,)*num_workers) for num_and_line in enumerate(iters): work.put(num_and_line) for p in pool: p.join() # get the results # example: [(1, "foo"), (10, "bar"), (0, "start")] print sorted(results)
вот действительно глупый пример, который я приготовил:
import os.path import multiprocessing def newlinebefore(f,n): f.seek(n) c=f.read(1) while c!='\n' and n > 0: n-=1 f.seek(n) c=f.read(1) f.seek(n) return n filename='gpdata.dat' #your filename goes here. fsize=os.path.getsize(filename) #size of file (in bytes) #break the file into 20 chunks for processing. nchunks=20 initial_chunks=range(1,fsize,fsize/nchunks) #You could also do something like: #initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too. with open(filename,'r') as f: start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks])) end_byte=[i-1 for i in start_byte] [1:] + [None] def process_piece(filename,start,end): with open(filename,'r') as f: f.seek(start+1) if(end is None): text=f.read() else: nbytes=end-start+1 text=f.read(nbytes) # process text here. createing some object to be returned # You could wrap text into a StringIO object if you want to be able to # read from it the way you would a file. returnobj=text return returnobj def wrapper(args): return process_piece(*args) filename_repeated=[filename]*len(start_byte) args=zip(filename_repeated,start_byte,end_byte) pool=multiprocessing.Pool(4) result=pool.map(wrapper,args) #Now take your results and write them to the database. print "".join(result) #I just print it to make sure I get my file back ...сложная часть здесь состоит в том, чтобы убедиться, что мы разделили файл на символы новой строки, чтобы вы не пропустили ни одной строки (или только читали частичные строки). Затем каждый процесс считывает свою часть файла и возвращает объект, который может быть помещен в базу данных основным потоком. Конечно, вы даже можете сделать эту часть в кусках, так что вам не придется держать всю информацию в памяти сразу. (вот довольно легко выполнить - просто разделите список "args" на X кусков и вызовите
pool.map(wrapper,chunk)-- см. здесь)
хорошо разбить один большой файл на несколько небольших файлов и каждый из них обрабатывается в отдельных потоках.
Comments