gitlabculture

Inside this repository

api.py

from urllib.request import urlopen
from urllib.error import URLError
from urllib.parse import urlencode, urljoin
import os
import json
import datetime

CACHE_DIR = os.path.join(os.path.dirname(__file__), 'cache')
API_URL = "https://gitlab.constantvzw.org/api/v4/"
GROUP_ID = 8

DEBUG = True
MAX_PAGES = 100
PER_PAGE = 100

def debug (msg):
  if DEBUG:
    print(msg)

class ApiError(Exception):
  def __init__(self, url, what):
    self.url = url
    self.what = what

  def __str__(self):
    return '(%s) => %s' % (self.url, self.what)

class ApiCall (object):
  def __init__ (self, api_path, api_url = API_URL, cache_dir = CACHE_DIR):
    self._cache = None
    self.api_path = list(map(str, api_path))
    self.api_url = api_url
    self.cache_dir = cache_dir

  @property
  def cache_file(self):
    return os.path.join(self.cache_dir, '{}.json'.format('.'.join(self.api_path)))

  @property
  def url(self):
    return urljoin(self.api_url, '/'.join(self.api_path))

  @property
  def has_cache(self):
    return os.path.exists(self.cache_file)

  def invalidate_cache(self):
    if self.has_cache:
      os.unlink(self.cache_file)

  def make_cache(self):
    # Make sure the cache directory exists before writing to it
    os.makedirs(self.cache_dir, exist_ok=True)
    try:
      obj = self.get_api()
      with open(self.cache_file, 'w') as f:
        json.dump(obj, f)
    except ApiError as e:
      # Cache the failure too, so the call is not retried on every hit
      with open(self.cache_file, 'w') as f:
        json.dump({
          'reason': e.what,
          'timestamp': datetime.datetime.now().timestamp()
        }, f)
      obj = []

    self._cache = obj

  def load_cache(self):
    debug('Hit cache {}'.format(self.url))
    if not self._cache:
      with open(self.cache_file, 'r') as f:
        obj = json.load(f)

      # A cached ApiError is stored as a dict with a 'reason' key
      if isinstance(obj, dict) and 'reason' in obj:
        self._cache = []
      else:
        self._cache = obj

    return self._cache

  """
    Returns values for the call. If the request is paginated go through
    all pages
  """
  def get_api(self):
    page = 1
    items = []
    while page < MAX_PAGES:
      headers, pageitems = self.get_api_page(page)
      items.extend(pageitems)
      if page >= int(headers['X-Total-Pages']):
        return items
      else:
        page += 1

  def get_api_page(self, page=1):
    debug('{}, page {}'.format(self.url, page))

    q = urlencode({'page': page, 'per_page': PER_PAGE})
    url = '{}?{}'.format(self.url, q)
    try:
      res = urlopen(url)
      return (dict(res.getheaders()), json.loads(res.read()))
    except URLError as e:
      # HTTPError carries a code, a plain URLError only a reason; str()
      # keeps the value JSON-serialisable when make_cache stores it
      raise ApiError(url, str(getattr(e, 'reason', getattr(e, 'code', e))))

  def get (self):
    if not self.has_cache:
      self.make_cache()
    return self.load_cache()
    
# Helpers to prepare a call: each returns an ApiCall with its URL and
# local cache file set up, so the cache can be invalidated when needed.
def group_projects (group = GROUP_ID):
  return ApiCall(['groups', group, 'projects'])

def commits (project_id = None):
  if project_id is not None:
    return ApiCall(['projects', project_id, 'repository', 'commits'])

def tree (project_id = None):
  if project_id is not None:
    return ApiCall(['projects', project_id, 'repository', 'tree'])
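
# Example usage: a minimal sketch. The field names ('id', 'name', 'title')
# are standard GitLab v4 API fields; the actual projects depend on the
# live data at gitlab.constantvzw.org.
if __name__ == '__main__':
  for project in group_projects().get():
    print(project['name'])
    # Drop the cached commit list, then fetch a fresh copy
    call = commits(project['id'])
    call.invalidate_cache()
    for commit in call.get():
      print('  {}'.format(commit['title']))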