Source code for swimlane.core.cursor
import itertools
[docs]class Cursor(object):
def __init__(self):
self._elements = []
def __len__(self):
return len(list(self._evaluate()))
def __iter__(self):
for element in self._evaluate():
yield element
def __getitem__(self, item):
return self._evaluate()[item]
def _evaluate(self):
"""Hook to allow lazy evaluation or retrieval of cursor's elements
Defaults to simply returning list of self._elements
"""
return self._elements
[docs]class PaginatedCursor(Cursor):
"""Handle paginated lists, exposes hooks to simplify retrieval and parsing of paginated data"""
default_limit = 0
page_size = 10
def __init__(self, limit=None):
super(PaginatedCursor, self).__init__()
if limit is None:
limit = self.default_limit
self.__limit = limit
if self.__limit:
self.page_size = min(self.page_size, self.__limit)
def _evaluate(self):
"""Lazily retrieve and paginate report results and build Record instances from returned data"""
if self._elements:
for element in self._elements:
yield element
else:
for page in itertools.count():
raw_elements = self._retrieve_raw_elements(page)
for raw_element in raw_elements:
element = self._parse_raw_element(raw_element)
self._elements.append(element)
yield element
if self.__limit and len(self._elements) >= self.__limit:
break
if any([
len(raw_elements) < self.page_size,
(self.__limit and len(self._elements) >= self.__limit)
]):
break
def _retrieve_raw_elements(self, page):
"""Send request and return response for single page of data"""
raise NotImplementedError
def _parse_raw_element(self, raw_element):
"""Hook to override parsing individual raw elements just before yielding"""
return raw_element