эффективный круговой буфер?
Я хочу создать эффективную кольцевой буфер в python (с целью получения средних значений целых чисел в буфере).
это эффективный способ использовать список для сбора значений?
def add_to_buffer( self, num ):
self.mylist.pop( 0 )
self.mylist.append( num )
что было бы более эффективным (и почему)?
12 ответов:
Я хотел бы использовать
collections.dequeСmaxlenarg>>> import collections >>> d = collections.deque(maxlen=10) >>> d deque([], maxlen=10) >>> for i in xrange(20): ... d.append(i) ... >>> d deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)есть рецепт в документах для
dequeЭто похоже на то, что вы хотите. Мое утверждение, что это самый эффективный, полностью основано на том факте, что он реализован в C невероятно квалифицированной командой, которая имеет привычку проворачивать код высшего качества.
выскакивание из головы списка приводит к копированию всего списка, поэтому неэффективно
вместо этого вы должны использовать список / массив фиксированного размера и индекс, который перемещается через буфер при добавлении / удалении элементов
дек питона медленно. Вы также можете использовать numpy.ролл вместо этого Как вы вращаете числа в массиве numpy формы (n,) или (n, 1)?
в этот тест, дека 448ms. И NumPy.крен 29мс http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/
ок с использованием класса deque, но для requeriments вопроса (средний) это мое решение:
>>> from collections import deque >>> class CircularBuffer(deque): ... def __init__(self, size=0): ... super(CircularBuffer, self).__init__(maxlen=size) ... @property ... def average(self): # TODO: Make type check for integer or floats ... return sum(self)/len(self) ... >>> >>> cb = CircularBuffer(size=10) >>> for i in range(20): ... cb.append(i) ... print "@%s, Average: %s" % (cb, cb.average) ... @deque([0], maxlen=10), Average: 0 @deque([0, 1], maxlen=10), Average: 0 @deque([0, 1, 2], maxlen=10), Average: 1 @deque([0, 1, 2, 3], maxlen=10), Average: 1 @deque([0, 1, 2, 3, 4], maxlen=10), Average: 2 @deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2 @deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3 @deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3 @deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4 @deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4 @deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5 @deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6 @deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7 @deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8 @deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9 @deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10 @deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11 @deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12 @deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13 @deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
на основе ответ MoonCactus, вот
circularlistкласса. Разница с его версией в том, что здесьc[0]всегда будет давать самый старый добавленный элемент,c[-1]последний добавленный элемент,c[-2]предпоследний... Это более естественно для приложений.c = circularlist(4) c.append(1); print c, c[0], c[-1] #[1] (1 items) first 1, last 1 c.append(2); print c, c[0], c[-1] #[1, 2] (2 items) first 1, last 2 c.append(3); print c, c[0], c[-1] #[1, 2, 3] (3 items) first 1, last 3 c.append(8); print c, c[0], c[-1] #[1, 2, 3, 8] (4 items) first 1, last 8 c.append(10); print c, c[0], c[-1] #[10, 2, 3, 8] (4 items) first 2, last 10 c.append(11); print c, c[0], c[-1] #[10, 11, 3, 8] (4 items) first 3, last 11класс:
class circularlist(object): def __init__(self, size): """Initialization""" self.index = 0 self.size = size self._data = [] def append(self, value): """Append an element""" if len(self._data) == self.size: self._data[self.index] = value else: self._data.append(value) self.index = (self.index + 1) % self.size def __getitem__(self, key): """Get element by index, relative to the current index""" if len(self._data) == self.size: return(self._data[(key + self.index) % self.size]) else: return(self._data[key]) def __repr__(self): """Return string representation""" return self._data.__repr__() + ' (' + str(len(self._data))+' items)'
вы также можете увидеть это довольно старый питон рецепт.
вот моя собственная версия с массивом NumPy:
#!/usr/bin/env python import numpy as np class RingBuffer(object): def __init__(self, size_max, default_value=0.0, dtype=float): """initialization""" self.size_max = size_max self._data = np.empty(size_max, dtype=dtype) self._data.fill(default_value) self.size = 0 def append(self, value): """append an element""" self._data = np.roll(self._data, 1) self._data[0] = value self.size += 1 if self.size == self.size_max: self.__class__ = RingBufferFull def get_all(self): """return a list of elements from the oldest to the newest""" return(self._data) def get_partial(self): return(self.get_all()[0:self.size]) def __getitem__(self, key): """get element""" return(self._data[key]) def __repr__(self): """return string representation""" s = self._data.__repr__() s = s + '\t' + str(self.size) s = s + '\t' + self.get_all()[::-1].__repr__() s = s + '\t' + self.get_partial()[::-1].__repr__() return(s) class RingBufferFull(RingBuffer): def append(self, value): """append an element when buffer is full""" self._data = np.roll(self._data, 1) self._data[0] = value
У меня была эта проблема, прежде чем делать последовательное Программирование. В то время чуть более года назад я тоже не мог найти эффективных реализаций, поэтому я закончил писать один как расширение C и это также доступно на pypi под лицензией MIT. Это супер базовый, только обрабатывает буферы 8-битных подписанных символов, но имеет гибкую длину, поэтому вы можете использовать Struct или что-то поверх него, если вам нужно что-то другое, чем символы. Теперь я вижу с помощью поиска google, что есть несколько вариантов в эти дни, хотя, так что вы можете посмотреть на них тоже.
этот не требует никакой библиотеки. Он растет список, а затем цикл внутри по индексу.
след очень мал (без библиотеки), и он работает в два раза быстрее, чем dequeue по крайней мере. Это действительно хорошо для вычисления скользящих средних, но имейте в виду, что элементы не сортируются по возрасту, как указано выше.
class CircularBuffer(object): def __init__(self, size): """initialization""" self.index= 0 self.size= size self._data = [] def record(self, value): """append an element""" if len(self._data) == self.size: self._data[self.index]= value else: self._data.append(value) self.index= (self.index + 1) % self.size def __getitem__(self, key): """get element by index like a regular array""" return(self._data[key]) def __repr__(self): """return string representation""" return self._data.__repr__() + ' (' + str(len(self._data))+' items)' def get_all(self): """return a list of all the elements""" return(self._data)чтобы получить среднее значение, например:
q= CircularBuffer(1000000); for i in range(40000): q.record(i); print "capacity=", q.size print "stored=", len(q.get_all()) print "average=", sum(q.get_all()) / len(q.get_all())результаты:
capacity= 1000000 stored= 40000 average= 19999 real 0m0.024s user 0m0.020s sys 0m0.000sЭто примерно 1/3 времени эквивалент из очереди.
хотя здесь уже есть большое количество отличных ответов, я не мог найти никакого прямого сравнения таймингов для упомянутых вариантов. Поэтому, пожалуйста, найдите мою скромную попытку сравнения ниже.
только в целях тестирования класс может переключаться между a
listбуфера, аcollections.deque-буфера, иNumpy.rollбуфера.отметим, что
updateметод добавляет только одно значение за раз, чтобы сохранить его простым.import numpy import timeit import collections class CircularBuffer(object): buffer_methods = ('list', 'deque', 'roll') def __init__(self, buffer_size, buffer_method): self.content = None self.size = buffer_size self.method = buffer_method def update(self, scalar): if self.method == self.buffer_methods[0]: # Use list try: self.content.append(scalar) self.content.pop(0) except AttributeError: self.content = [0.] * self.size elif self.method == self.buffer_methods[1]: # Use collections.deque try: self.content.append(scalar) except AttributeError: self.content = collections.deque([0.] * self.size, maxlen=self.size) elif self.method == self.buffer_methods[2]: # Use Numpy.roll try: self.content = numpy.roll(self.content, -1) self.content[-1] = scalar except IndexError: self.content = numpy.zeros(self.size, dtype=float) # Testing and Timing circular_buffer_size = 100 circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size, buffer_method=method) for method in CircularBuffer.buffer_methods] timeit_iterations = 1e4 timeit_setup = 'from __main__ import circular_buffers' timeit_results = [] for i, cb in enumerate(circular_buffers): # We add a convenient number of convenient values (see equality test below) code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format( i, circular_buffer_size) # Testing eval(code) buffer_content = [item for item in cb.content] assert buffer_content == range(circular_buffer_size) # Timing timeit_results.append( timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations))) print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format( cb.method, timeit_results[-1], timeit_results[-1] / timeit_iterations * 1e3)на мой система эта дает:
list: total 1.06s (0.11ms per iteration) deque: total 0.87s (0.09ms per iteration) roll: total 6.27s (0.63ms per iteration)
исходный вопрос был: "эффективное" кольцевой буфер. Согласно этой эффективности, запрошенной, ответ от aaronasterling кажется окончательно правильным. Использование выделенного класса, запрограммированного в Python, и сравнение времени обработки с коллекциями.дека показывает Х5.2 раза ускорение с двухсторонней очереди! Вот очень простой код, чтобы проверить это:
class cb: def __init__(self, size): self.b = [0]*size self.i = 0 self.sz = size def append(self, v): self.b[self.i] = v self.i = (self.i + 1) % self.sz b = cb(1000) for i in range(10000): b.append(i) # called 200 times, this lasts 1.097 second on my laptop from collections import deque b = deque( [], 1000 ) for i in range(10000): b.append(i) # called 200 times, this lasts 0.211 second on my laptopчтобы преобразовать deque в список, просто используйте:
my_list = [v for v in my_deque]вы получите O (1) случайный доступ к элементам очереди. Конечно, это ценно только в том случае, если вам нужно сделать много случайных обращений к Деку после его установки один раз.
вы отвечаете не правильно. Круговой буфер основной имеют два priciples (https://en.wikipedia.org/wiki/Circular_buffer)
- устанавливается длина буфера;
- первый в первый выход;
- когда вы добавляете или удаляете элемент, другие элементы не должны перемещать свою позицию
код ниже:
def add_to_buffer( self, num ): self.mylist.pop( 0 ) self.mylist.append( num )давайте рассмотрим ситуацию, когда список полон, используя ваш код:
self.mylist = [1, 2, 3, 4, 5]Теперь мы добавляем 6, список изменяется на
self.mylist = [2, 3, 4, 5, 6]элементы ожидают, что 1 в списке изменил свою позицию
ваш код-это очередь, а не буферной круг.
ответ Basj, я думаю, является наиболее эффективным.
кстати, буфер круга может imporve представление деятельности чтобы добавить элемент.
этот принцип применяется к некоторым буферам, предназначенным для хранения последних текстовых сообщений.
import time import datetime import sys, getopt class textbffr(object): def __init__(self, size_max): #initialization self.posn_max = size_max-1 self._data = [""]*(size_max) self.posn = self.posn_max def append(self, value): #append an element if self.posn == self.posn_max: self.posn = 0 self._data[self.posn] = value else: self.posn += 1 self._data[self.posn] = value def __getitem__(self, key): #return stored element if (key + self.posn+1) > self.posn_max: return(self._data[key - (self.posn_max-self.posn)]) else: return(self._data[key + self.posn+1]) def print_bffr(bffr,bffer_max): for ind in range(0,bffer_max): stored = bffr[ind] if stored != "": print(stored) print ( '\n' ) def make_time_text(time_value): return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2) + str(time_value.hour).zfill(2) + str(time_value.minute).zfill(2) + str(time_value.second).zfill(2)) def main(argv): #Set things up starttime = datetime.datetime.now() log_max = 5 status_max = 7 log_bffr = textbffr(log_max) status_bffr = textbffr(status_max) scan_count = 1 #Main Loop # every 10 secounds write a line with the time and the scan count. while True: time_text = make_time_text(datetime.datetime.now()) #create next messages and store in buffers status_bffr.append(str(scan_count).zfill(6) + " : Status is just fine at : " + time_text) log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ") #print whole buffers so far print_bffr(log_bffr,log_max) print_bffr(status_bffr,status_max) time.sleep(2) scan_count += 1 if __name__ == '__main__': main(sys.argv[1:])
Comments