gitlabculture

Inside this repository

api.py

from urllib.request import urlopen
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urljoin
from settings import CACHE_DIR, API_URL, DEBUG, MAX_PAGES, PER_PAGE

import os
import json
import datetime
import time

def debug(msg):
  if DEBUG:
    print(msg)

class ApiError(Exception):
  def __init__(self, url, what):
    self.url = url
    self.what = what

  def __str__(self):
    return '({}) => {}'.format(self.url, self.what)

class ApiCall(object):
  def __init__(self, api_path, query=None, api_url=API_URL, cache_dir=CACHE_DIR, paged=True):
    self.paged = paged
    self._cache = None
    self.api_path = list(map(str, api_path))
    self.api_url = api_url
    self.cache_dir = cache_dir
    self.query = query

  @property
  def cache_location(self):
    return os.path.join(self.cache_dir, '{}.json'.format('.'.join(self.api_path)))

  @property
  def url(self):
    return urljoin(self.api_url, '/'.join(self.api_path))

  @property
  def has_cache(self):
    return os.path.exists(self.cache_location)

  def invalidate_cache(self):
    if self.has_cache:
      debug('Invalidating cache `{}`'.format(self.cache_location))
      os.unlink(self.cache_location)

  def expire_cache_after(self, expiry_date):
    if self.has_cache:
      # mtime reflects when the cache file was written.
      timestamp = os.path.getmtime(self.cache_location)
      # TODO: implement timezone-info
      cache_date = datetime.datetime.fromtimestamp(timestamp)

      if expiry_date > cache_date:
        self.invalidate_cache()

  def write_cache(self, data):
    # A failed call yields no data; cache an empty payload so the failure
    # is not retried on every read.
    if data is None:
      data = ''
    with open(self.cache_location, 'w') as h:
      h.write(self.prepare_cache(data))

    self._cache = data

  def read_cache(self):
    debug('Hit cache {}'.format(self.url))
    if self._cache is None:
      with open(self.cache_location, 'r') as h:
        self._cache = self.parse_cache(h.read())

    return self._cache

  def _make_api_call(self, url, query=None):
    if query:
      url = '{}?{}'.format(url, urlencode(query))
    try:
      debug('Loading `{}`'.format(url))
      res = urlopen(url)
      return ({k: v for (k, v) in res.getheaders()}, self.parse_api_result(res.read()))
    except HTTPError as e:
      # Raise a uniform error that callers can catch.
      raise ApiError(url, e.code)
    except URLError as e:
      raise ApiError(url, e.reason)

  def read_from_api(self):
    try:
      _, data = self._make_api_call(self.url, self.query)
      return data
    except ApiError:
      return None

  def get(self, force_call=False):
    if force_call or not self.has_cache:
      data = self.read_from_api()
      self.write_cache(data)

    return self.read_cache()

  def parse_api_result(self, raw):
    return raw.decode()

  def prepare_cache(self, raw):
    return raw

  def parse_cache(self, raw):
    return raw

class ApiCallJson(ApiCall):
  def parse_api_result(self, raw):
    return json.loads(raw)

  def prepare_cache(self, raw):
    return json.dumps(raw)

  def parse_cache(self, raw):
    return json.loads(raw)

  """
    Returns values for the call. If the request is paginated go through
    all pages
  """  
  def read_from_api(self):
    query = self.query.copy() if self.query else {}

    if self.paged:
      page = 1
      data = []
      query['per_page'] = PER_PAGE
      max_pages = MAX_PAGES

      try:
        while page and page < max_pages:
          query['page'] = page
          headers, page_data = self._make_api_call(self.url, query)
          data.extend(page_data)
          page = int(headers['X-Next-Page']) if headers['X-Next-Page'] else None
          if page:
            time.sleep(1/3)
      except ApiError:
        return data
    else:
      query['per_page'] = 1
      _, data = self._make_api_call(self.url, query)
      time.sleep(1/3)
    
    return data
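
# Example (hypothetical group id): a paged call transparently collects
# every page of results into a single list:
#
#   projects = ApiCallJson(['groups', '42', 'projects']).get()
#   names = [p['name'] for p in projects]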



class ApiCallRaw(ApiCall):
  # Raw blobs are fetched in a single request and cached verbatim.

  @property
  def cache_location(self):
    return os.path.join(self.cache_dir, '{}.data'.format('.'.join(self.api_path)))

def get_group(group_id):
  return ApiCallJson(['groups', group_id], paged=False)

def get_project(project_id):
  return ApiCallJson(['projects', project_id], paged=False)

# A way to make a call, cache it, and remove the cache when needed:
# prepare a call from a set of URL segments and a local cache file
# (see the usage sketch at the end of this file).
def get_projects(group_id):
  return ApiCallJson(['groups', group_id, 'projects'])

def get_commits(project_id):
  return ApiCallJson(['projects', project_id, 'repository', 'commits'])

def get_tree(project_id, path=None):
  query = {'path': path} if path else None
  return ApiCallJson(['projects', project_id, 'repository', 'tree'], query=query)

def get_raw(project_id, file_id):
  return ApiCallRaw(['projects', project_id, 'repository', 'blobs', file_id, 'raw'])
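
# Usage sketch (not part of the original module): exercises the call/cache
# cycle. The group id '42' is a hypothetical example; API_URL and CACHE_DIR
# come from the local settings module and must point at a real instance.
if __name__ == '__main__':
  projects = get_projects('42')

  data = projects.get()   # first call hits the API and writes the cache
  data = projects.get()   # second call is served from the cache file

  # Drop the cache if it is older than a week, then force a fresh call.
  projects.expire_cache_after(datetime.datetime.now() - datetime.timedelta(days=7))
  data = projects.get(force_call=True)

  debug('Fetched {} projects'.format(len(data)))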