# No images in this repository's iceberg at this time
# Download raw (4.8 KB)
"""Small REST API client (GitLab-style endpoints) with a file-based cache.

Each endpoint is modelled as an :class:`ApiCall`; results are cached on disk
under ``CACHE_DIR`` keyed by the joined API path, and re-served from the cache
until it is invalidated or a forced call is requested.
"""

import datetime
import json
import os
import os.path
import time
from urllib.error import URLError  # fixed: URLError lives in urllib.error, not urllib.request
from urllib.parse import urlencode, urljoin
from urllib.request import urlopen

from settings import API_URL, CACHE_DIR, DEBUG, MAX_PAGES, PER_PAGE


def debug(msg):
    """Print *msg* only when the DEBUG setting is enabled."""
    if DEBUG:
        print(msg)


class ApiError(Exception):
    """Raised when an API request fails; carries the URL and the reason."""

    def __init__(self, url, what):
        self.url = url
        self.what = what

    def __str__(self):
        return '(%s) => %s' % (self.url, self.what)


class ApiCall(object):
    """One API endpoint call with transparent file-system caching.

    A way to make a call, cache it, and be able to remove the cache when
    needed: ``get()`` hits the network only on a cache miss (or when
    ``force_call=True``), otherwise it reads the cached file.
    """

    def __init__(self, api_path, query=None, api_url=API_URL,
                 cache_dir=CACHE_DIR, paged=True):
        self.paged = paged
        self._cache = None  # in-memory copy of the parsed cache content
        self.api_path = list(map(str, api_path))
        self.api_url = api_url
        self.cache_dir = cache_dir
        self.query = query

    @property
    def cache_location(self):
        """Path of the on-disk cache file for this call."""
        return os.path.join(
            self.cache_dir, '{}.json'.format('.'.join(self.api_path)))

    @property
    def url(self):
        """Full request URL (query string excluded)."""
        return urljoin(self.api_url, '/'.join(self.api_path))

    @property
    def has_cache(self):
        """Whether a cache file currently exists on disk."""
        return os.path.exists(self.cache_location)

    def invalidate_cache(self):
        """Delete the on-disk cache file if present."""
        if self.has_cache:
            debug('Invalidating cache `{}`'.format(self.cache_location))
            os.unlink(self.cache_location)

    def expire_cache_after(self, expiry_date):
        """Invalidate the cache if it was created before *expiry_date*.

        Both datetimes are naive.  TODO: implement timezone-info.
        """
        if self.has_cache:
            timestamp = os.path.getctime(self.cache_location)
            cache_date = datetime.datetime.fromtimestamp(timestamp)
            if expiry_date > cache_date:
                self.invalidate_cache()

    def write_cache(self, data):
        """Serialize *data* to the cache file and keep it in memory."""
        try:
            with open(self.cache_location, 'w') as h:
                h.write(self.prepare_cache(data))
        except ApiError as e:
            # Record the failure reason so a cache file still exists and
            # later `get()` calls do not retry endlessly.
            # (fixed: the second open() previously leaked its file handle)
            with open(self.cache_location, 'w') as h:
                json.dump({
                    'reason': e.what,
                    'timestamp': datetime.datetime.now().timestamp(),
                }, h)
            data = []
        self._cache = data

    def read_cache(self):
        """Return the cached data, loading it from disk on first access."""
        debug('Hit cache {}'.format(self.url))
        # fixed: was `if not self._cache`, which re-read the file on every
        # call whenever the cached result was empty ([]/''/None).
        if self._cache is None:
            with open(self.cache_location, 'r') as h:
                self._cache = self.parse_cache(h.read())
        return self._cache

    def _make_api_call(self, url, query=None):
        """Perform one HTTP GET; return ``(headers_dict, parsed_body)``.

        Raises:
            ApiError: on any URLError/HTTPError from the request.
        """
        if query:
            url = '{}?{}'.format(url, urlencode(query))
        try:
            debug('Loading `{}`'.format(url))
            # fixed: close the response via the context manager (was leaked)
            with urlopen(url) as res:
                headers = {k: v for (k, v) in res.getheaders()}
                return headers, self.parse_api_result(res.read())
        except URLError as e:
            # HTTPError is a URLError subclass; both expose `reason`.
            if hasattr(e, 'reason'):
                raise ApiError(url, e.reason)
            # fixed: previously `return None`, which made callers crash when
            # unpacking the expected (headers, data) tuple.
            raise ApiError(url, e)

    def read_from_api(self):
        """Fetch the endpoint once; return the parsed body, or None on error."""
        try:
            _, data = self._make_api_call(self.url, self.query)
            return data
        except ApiError:
            return None

    def get(self, force_call=False):
        """Return the endpoint data, calling the API only on a cache miss."""
        if force_call or not self.has_cache:
            data = self.read_from_api()
            self.write_cache(data)
        return self.read_cache()

    # --- serialization hooks, overridden by subclasses -------------------

    def parse_api_result(self, raw):
        """Convert the raw response bytes into the call's result value."""
        return raw.decode()

    def prepare_cache(self, raw):
        """Convert a result value into the text written to the cache file."""
        return raw

    def parse_cache(self, raw):
        """Convert cache-file text back into a result value."""
        return raw


class ApiCallJson(ApiCall):
    """API call whose payload is JSON; cached as JSON text."""

    def parse_api_result(self, raw):
        return json.loads(raw)

    def prepare_cache(self, raw):
        return json.dumps(raw)

    def parse_cache(self, raw):
        return json.loads(raw)

    def read_from_api(self):
        """Returns values for the call.

        If the request is paginated go through all pages.
        (fixed: this text was a misplaced bare-string statement between
        methods; it is now the docstring it was clearly meant to be.)
        """
        query = self.query.copy() if self.query else {}
        if not self.paged:
            # NOTE(review): requests a single item — confirm intent.
            query['per_page'] = 1
            _, data = self._make_api_call(self.url, query)
            time.sleep(1 / 3)  # crude rate limiting
            return data

        page = 1
        data = []
        query['per_page'] = PER_PAGE
        try:
            # NOTE(review): `page < MAX_PAGES` never fetches page MAX_PAGES
            # itself — confirm whether `<=` was intended.
            while page and page < MAX_PAGES:
                query['page'] = page
                headers, page_data = self._make_api_call(self.url, query)
                data.extend(page_data)
                # fixed: .get() — a missing X-Next-Page header no longer
                # raises KeyError; it simply ends pagination.
                next_page = headers.get('X-Next-Page')
                page = int(next_page) if next_page else None
                if page:
                    time.sleep(1 / 3)  # crude rate limiting between pages
        except ApiError:
            # Best effort: return whatever pages were fetched so far.
            pass
        return data


class ApiCallRaw(ApiCall):
    """API call whose payload is raw text; cached verbatim as a `.data` file."""

    # NOTE(review): nothing in this file reads `multi_page`; the attribute
    # ApiCall consults is `paged` — possibly dead code, confirm before removal.
    multi_page = False

    @property
    def cache_location(self):
        return os.path.join(
            self.cache_dir, '{}.data'.format('.'.join(self.api_path)))


# --- factory helpers: prepare a call (URL + local cache file) ---------------

def get_group(group_id):
    """Single group object (not paginated)."""
    return ApiCallJson(['groups', group_id], paged=False)


def get_project(project_id):
    """Single project object (not paginated)."""
    return ApiCallJson(['projects', project_id], paged=False)


def get_projects(group_id):
    """All projects of a group (paginated)."""
    return ApiCallJson(['groups', group_id, 'projects'])


def get_commits(project_id):
    """All commits of a project's repository (paginated)."""
    return ApiCallJson(['projects', project_id, 'repository', 'commits'])


def get_tree(project_id, path=None):
    """Repository tree listing, optionally scoped to *path*."""
    query = {'path': path} if path else None
    return ApiCallJson(['projects', project_id, 'repository', 'tree'],
                       query=query)


def get_raw(project_id, file_id):
    """Raw blob content for *file_id*."""
    return ApiCallRaw(
        ['projects', project_id, 'repository', 'blobs', file_id, 'raw'])