Каков самый быстрый способ отправить 100 000 HTTP-запросов в Python?



Я открываю файл, который имеет 100 000 URL-адресов. Мне нужно отправить HTTP-запрос для каждого URL-адреса, и печатать код состояния. Я использую Python 2.6, и до сих пор смотрел на многие запутанные способы реализации Python threading/concurrency. Я даже посмотрел на питона согласие библиотеки, но не могу понять, как правильно написать эту программу. Кто-нибудь сталкивался с подобной проблемой? Я думаю, вообще мне нужно знать, как выполнять тысячи задач в Python так быстро, как возможно - я полагаю, что это означает "одновременно".

1029   13  

13 ответов:

решение без скручивания:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

Это один slighty быстрее, чем витой решение и использует меньше процессора.

решение с использованием торнадо асинхронная сетевая библиотека

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()

темы абсолютно не ответ здесь. Они будут обеспечивать как узкие места процесса, так и ядра, а также ограничения пропускной способности, которые неприемлемы, если общая цель - "самый быстрый способ".

немного twisted и его асинхронный HTTP клиента даст вам гораздо лучшие результаты.

использовать grequests, это комбинация запросов + модуль Gevent .

GRequests позволяет использовать запросы с Gevent, чтобы сделать asyncronous HTTP-запросы легко.

использование просто:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

создать набор неотправленных запросов:

>>> rs = (grequests.get(u) for u in urls)

отправить их все в то же время:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

все изменилось совсем немного с 2010 года, когда это было опубликовано, и я не пробовал все другие ответы, но я пробовал несколько, и я нашел, что это работает лучше всего для меня, используя python3.6.

я смог получить около ~150 уникальных доменов в секунду, работающих на AWS.

import pandas as pd
import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())

Если вы хотите получить максимальную производительность, вы можете рассмотреть возможность использования асинхронного ввода-вывода, а не потоков. Накладные расходы, связанные с тысячами потоков ОС, нетривиальны, и переключение контекста в интерпретаторе Python добавляет еще больше на него. Потоковая передача, безусловно, выполнит эту работу, но я подозреваю, что асинхронный маршрут обеспечит лучшую общую производительность.

в частности, я бы предложил асинхронный веб-клиент в Twisted библиотека (http://www.twistedmatrix.com). он имеет, по общему признанию, крутую кривую обучения, но его довольно легко использовать, как только вы получите хорошую ручку на стиле асинхронного программирования Twisted.

A HowTo на асинхронном API веб-клиента Twisted доступен по адресу:

http://twistedmatrix.com/documents/current/web/howto/client.html

в идеальном мире это просто означало бы одновременный запуск 100 000 потоков, которые выводят свои результаты в словарь или список для последующей обработки, но на практике вы ограничены в том, сколько параллельных HTTP-запросов вы можете выдать таким образом. Локально, у вас есть ограничения в том, сколько розеток вы можете откройте одновременно, сколько потоков выполнения позволит ваш интерпретатор Python. Удаленно вы можете быть ограничены в количестве одновременных подключений, если все запросы направлены против одного сервера или нескольких. Эти ограничения, вероятно, потребуют, чтобы вы написали сценарий таким образом, чтобы опрашивать только небольшую часть URL-адресов в любое время (100, как упоминалось в другом плакате, вероятно, является приличным размером пула потоков, хотя вы можете обнаружить, что вы можете успешно развернуть многие более.)

вы можете следовать этому шаблону проектирования, чтобы решить вышеуказанную проблему:

  1. запустите поток, который запускает новые потоки запроса до тех пор, пока количество запущенных потоков (вы можете отслеживать их с помощью threading.active_count () или путем нажатия объектов потока в структуру данных) - это >= максимальное количество одновременных запросов (скажем, 100), а затем спит в течение короткого тайм-аута. Этот поток должен завершиться, когда нет больше URL-адресов для обработки. Таким образом, поток будет продолжать просыпаться, запускать новые потоки и спать, пока вы не закончите.
  2. пусть потоки запросов хранят свои результаты в некоторой структуре данных для последующего извлечения и вывода. Если структура, в которой вы храните результаты, является list или dict в CPython, вы можете безопасно добавлять или вставлять уникальные элементы из ваших потоков без блокировки, но если вы пишете в файл или требуете более сложного межпоточного взаимодействия данных вы должны использовать взаимное исключение блокировки для защиты этого государства от коррупции.

Я бы предложил вам использовать резьбонарезной модуль. Вы можете использовать его для запуска и отслеживания запущенных потоков. Поддержка потоков Python является голой, но описание вашей проблемы предполагает, что она полностью достаточна для ваших нужд.

наконец, если вы хотите увидеть довольно простое приложение параллельного сетевого приложения, написанного на Python, проверьте ssh.py. это небольшая библиотека, которая использует Python threading для распараллеливания многих SSH-соединений. Конструкция достаточно близко к вашим требованиям, что вы можете найти его, чтобы быть хорошим ресурсом.

решение:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testtime:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

С помощью нить бассейн это хороший вариант, и сделает это довольно легко. К сожалению, python не имеет стандартной библиотеки, которая делает пулы потоков очень легкими. Но вот приличная библиотека, которая должна помочь вам начать: http://www.chrisarndt.de/projects/threadpool/

пример кода с их сайта:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

надеюсь, что это помогает.

для вашего случая резьба, вероятно, будет делать трюк, поскольку вы, вероятно, будете тратить большую часть времени на ожидание ответа. Есть полезные модули, такие как очереди в стандартной библиотеке, которая может помочь.

Я сделал аналогичную вещь с параллельной загрузкой файлов раньше, и это было достаточно хорошо для меня, но это было не в том масштабе, о котором вы говорите.

Если ваша задача была более привязана к процессору, вы можете посмотреть на многопроцессорная обработка модуль, который позволит вам использовать больше процессоров / ядер / потоков (больше процессов, которые не будут блокировать друг друга, так как блокировка выполняется для каждого процесса)

рассмотрите возможность использования Мельница, хотя ветряная мельница, вероятно, не может сделать это много потоков.

вы можете сделать это с помощью ручного скрипта Python на 5 машинах, каждый из которых подключается к исходящим портам 40000-60000, открывая 100 000 портов.

кроме того, это может помочь сделать образец теста с красиво резьбовым приложением QA, таким как OpenSTA для того, чтобы получить представление о том, сколько каждый сервер может обрабатывать.

кроме того, попробуйте посмотреть просто используя простой Perl с классом LWP:: ConnCache. Вы, вероятно, получите больше производительности (больше соединений) таким образом.

этот скрученный асинхронный веб-клиент работает довольно быстро.

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)

самый простой способ-использовать встроенную библиотеку потоков Python. они не "настоящие" / потоки ядра у них есть проблемы (например, сериализация), но они достаточно хороши. Вам нужен пул очереди и потоков. Один из вариантов -здесь, но это тривиально, чтобы написать свой собственный. Вы не можете распараллелить все 100 000 вызовов, но вы можете запустить 100 (или около того) из них одновременно.

Comments

    Ничего не найдено.