le75
clone your own copy | download snapshot

Snapshots | iceberg

Inside this repository

scrape.py
text/x-python

Download raw (3.3 KB)

from django.core.management.base import BaseCommand
from django.utils.html import strip_tags
from django.utils.text import Truncator
from django.core.files import File
from django.core.files.temp import NamedTemporaryFile

import os
import os.path
from urllib.parse import urlparse

import requests
import html5lib
from cssselect import GenericTranslator, SelectorError
from scraper.models import Feed, FeedEntry


def clean(txt):
    """Normalize scraped text for storage.

    Strips leading/trailing whitespace, then flattens interior newlines
    to double spaces so multi-line extracts fit on a single line.
    """
    return txt.strip().replace("\n", "  ")

def get_content(parent, selector, attr=None):
    """Extract cleaned text (or an attribute value) from the first
    element under *parent* matching the CSS *selector*.

    Returns None when the selector is empty or invalid, when nothing
    matches, or when the requested attribute is missing.
    """
    if not selector:
        return None

    try:
        xpath_expr = GenericTranslator().css_to_xpath(selector)
    except SelectorError:
        # Bad CSS in the feed config — treat as "nothing found".
        return None

    matches = parent.xpath(xpath_expr)
    if not matches:
        return None

    node = matches[0]
    if not attr:
        # No attribute requested: take the element's full text content.
        return clean(node.xpath("string()"))
    if attr in node.attrib:
        return clean(node.attrib[attr])
    return None


class Command(BaseCommand):
    """Scrape each configured Feed and store newly discovered posts
    (and their main image) as FeedEntry rows."""

    help = 'Updates the satellite feeds'

    # Network timeout (seconds) so one unresponsive site cannot hang
    # the whole management command.
    REQUEST_TIMEOUT = 30

    def handle(self, *args, **options):
        """Iterate over all feeds; fetch, parse, and index each one.

        A failure on one feed (network error, bad selector) is logged
        and skipped so the remaining feeds are still processed.
        """
        self.stdout.write('Scraping satellites')

        for feed in Feed.objects.all():
            self.stdout.write('Scraping %s' % feed.url)

            try:
                response = requests.get(feed.url, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
            except requests.RequestException as exc:
                self.stderr.write('Failed to fetch %s: %s' % (feed.url, exc))
                continue

            tree = html5lib.parse(
                response.text, treebuilder="lxml", namespaceHTMLElements=False)

            try:
                post_exp = GenericTranslator().css_to_xpath(feed.post_selector)
            except SelectorError:
                self.stderr.write('Invalid post selector for %s' % feed.url)
                continue

            # Reverse so posts are created oldest-first, preserving
            # publication order in the database.
            entry_list = tree.xpath(post_exp)
            entry_list.reverse()
            for entry in entry_list:
                self._index_entry(feed, entry)

            self.stdout.write('Successfully scraped %s' % feed.url)

    def _index_entry(self, feed, entry):
        """Create a FeedEntry for one scraped post element, if it is new."""
        permalink = get_content(
            entry, feed.permalink_selector, attr=feed.permalink_attribute)
        title = get_content(
            entry, feed.title_selector, attr=feed.title_attribute)
        main_image_url = get_content(
            entry, feed.main_image_selector, attr=feed.main_image_attribute)

        if not title:
            self.stdout.write(
                'Could not find a title for this entry {}... Skipping.'.format(permalink))
            return

        self.stdout.write('Indexing {} {}'.format(title, permalink))

        # Permalink is the dedupe key; defaults only apply on creation.
        obj, created = FeedEntry.objects.get_or_create(permalink=permalink, defaults={
            "title": title,
            "feed": feed,
            "is_published": True
        })

        if not created:
            return

        if main_image_url:
            self._attach_main_image(obj, main_image_url)
        obj.save()

    def _attach_main_image(self, obj, url):
        """Download the entry's main image and save it on the model.

        Download failures are logged and ignored so a broken image URL
        does not prevent the entry itself from being stored.
        """
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
        except requests.RequestException as exc:
            self.stderr.write('Failed to fetch image %s: %s' % (url, exc))
            return

        # Spool the bytes through a temp file so Django's File wrapper
        # can stream them into storage; closed (and deleted) on exit.
        with NamedTemporaryFile(delete=True) as img_temp:
            img_temp.write(response.content)
            img_temp.flush()
            filename = os.path.basename(urlparse(url).path)
            obj.main_image_original.save(filename, File(img_temp))