Source code for rate_limited_queue.queue

import traceback
from collections import Iterable
from warnings import warn

from rate_limit import RateLimit


[docs]class RateLimitedQueue(object): def __init__(self, items, processing_function, rate_limits): """ Takes a list of items to be processed, along with a function with which to process them, and a list of ``RateLimit`` objects. :param items: Items to be processed. :type items: list or other iterable :param processing_function: The function to call when processing an item. :type processing_function: function :param rate_limits: One or more ``RateLimit`` objects. :type rate_limits: list or other iterable """ self.items = [x for x in items] self.processing_function = processing_function self.rate_limits = rate_limits self._validate_params() def _validate_params(self): if not isinstance(self.rate_limits, Iterable): raise ValueError("The 'rate_limits' parameter must be a list!") if not all([isinstance(x, RateLimit) for x in self.rate_limits]): raise ValueError("The 'rate_limits' parameter must be a list of RateLimits!") if not hasattr(self.processing_function, '__call__'): raise ValueError("The 'processing_function' parameter must be a function!") def can_process(self): return all([not r.is_exceeded() for r in self.rate_limits])
[docs] def process(self): """ Processes the list of ``items`` via application of ``processing_function``, but without violating any of the ``rate_limits``. :rtype: list of processed items """ result = [] while self.items: if self.can_process(): item = self.items.pop(0) processed_item = None try: processed_item = self.processing_function(item) except: traceback.print_exc() warn("Could not process this item: '%s'" % item) result.append(processed_item) for rl in self.rate_limits: rl.num_processed_this_interval += 1 return result