Passed
Push — master ( c88b19...443080 )
by Koen
02:19
created

atramhasis.scripts.sitemap_generator.main()   A

Complexity

Conditions 3

Size

Total Lines 32
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 32
rs 9.28
c 0
b 0
f 0
cc 3
nop 0
1
import argparse
2
import datetime
3
import logging
4
import os
5
import xml.etree.ElementTree as ElementTree
6
from builtins import input
7
from os import listdir
8
from os.path import isfile
9
10
from pyramid.paster import bootstrap
11
from pyramid.paster import get_appsettings
12
from pyramid.paster import setup_logging
13
14
from atramhasis.errors import SkosRegistryNotFoundException
15
16
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
17
18
19
def write_element_to_xml(filename, sitemap_dir, element):
    """
    Serialise `element` as an XML document inside `sitemap_dir`.

    The file is written UTF-8 encoded and starts with an XML declaration.

    :param filename: Name of the file to write, relative to `sitemap_dir`.
    :param sitemap_dir: Directory the file is written to.
    :param element: Root element of the document to write.
    """
    target = os.path.join(sitemap_dir, filename)
    document = ElementTree.ElementTree(element)
    document.write(target, xml_declaration=True, encoding='utf-8')
23
24
25
def create_sitemaps(settings, limit_per_deel, directory, env):
    """
    Generate all sitemap files for the visible conceptschemes and concepts.

    One URL per conceptscheme and one URL per concept is collected from
    every provider in the SKOS registry. Providers whose metadata subject
    contains ``external`` or ``hidden`` are skipped. The collected URLs are
    written as partial sitemaps, after which the sitemap index is created.

    :param settings: Application settings; ``atramhasis.url`` is read as the
        base URL of every generated link.
    :param limit_per_deel: Maximum number of URLs per partial sitemap file.
    :param directory: Directory the sitemap files are written to.
    :param env: Bootstrapped environment; ``env['request']`` is expected to
        carry a ``skos_registry`` attribute.
    :raises SkosRegistryNotFoundException: If the request has no
        ``skos_registry`` or it is ``None``.
    """
    base_url = settings.get("atramhasis.url")
    # The doubled braces survive the f-string as ``{}`` placeholders that
    # are filled in later through ``str.format``.
    schemes_url = f"{base_url}/conceptschemes/{{}}"
    concepts_url = f"{schemes_url}/c/{{}}"

    skos_registry = getattr(env['request'], 'skos_registry', None)
    if skos_registry is None:
        raise SkosRegistryNotFoundException()  # pragma: no cover

    not_shown_subjects = ('external', 'hidden')
    scheme_urls = []
    concept_urls = []
    for provider in skos_registry.get_providers():
        # Fetch the metadata once per provider instead of once per lookup.
        metadata = provider.get_metadata()
        if any(subject in metadata['subject'] for subject in not_shown_subjects):
            continue
        scheme_id = metadata['id']
        scheme_urls.append(schemes_url.format(scheme_id))
        concept_urls.extend(
            concepts_url.format(scheme_id, concept['id'])
            for concept in provider.get_all()
        )

    create_deel_sitemaps(scheme_urls, limit_per_deel, directory, 'conceptschemes')
    create_deel_sitemaps(concept_urls, limit_per_deel, directory, 'concepts')

    # The index lists the files in `directory`, so it must be built last.
    create_index_sitemap(base_url, directory)
52
53
54
def create_deel_sitemaps(objecturls, limit_per_deel, sitemap_dir, name):
    """
    Sitemaps have a maximum amount of items. This method creates partial
    sitemaps with up to `limit_per_deel` items per file.

    Files are named ``<name>_sitemap_deel_<n>.xml`` where ``n`` starts at 1.
    No file is written when `objecturls` is empty.

    :param objecturls: Iterable of URLs to put in the sitemaps.
    :param limit_per_deel: Number of URLs after which a file is written and
        a new partial sitemap is started.
    :param sitemap_dir: Directory the partial sitemaps are written to.
    :param name: Prefix of the generated file names; also used in the log
        messages to tell which kind of object is being processed.
    """
    def new_urlset():
        return ElementTree.Element(
            "urlset", xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
        )

    def deel_filename(number):
        return f'{name}_sitemap_deel_{number}.xml'

    log.info("Beginning creation of sitemaps...")
    urlset = new_urlset()
    sitemap_counter = 1

    for counter, objecturl in enumerate(objecturls, 1):
        url = ElementTree.SubElement(urlset, "url")
        ElementTree.SubElement(url, "loc").text = objecturl

        if counter % limit_per_deel == 0:
            filename = deel_filename(sitemap_counter)
            # Report the kind of object actually being processed rather
            # than always claiming "conceptschemes".
            log.info("Processed %s %s, writing %s", counter, name, filename)
            write_element_to_xml(filename, sitemap_dir, urlset)
            sitemap_counter += 1
            urlset = new_urlset()

    # Flush the remainder; an urlset without children means the last file
    # was exactly full (or there was nothing to write at all).
    if len(urlset):
        write_element_to_xml(deel_filename(sitemap_counter), sitemap_dir, urlset)
    log.info("All %s sitemaps created.", name)
81
82
83
def create_index_sitemap(base_url, directory):
    """
    Loop over all the created sitemaps, and create an index file.

    Every regular file in `directory` whose name contains ``sitemap`` is
    listed, except the index file itself. The result is written to
    ``sitemap_index.xml`` in the same directory, with today's date as the
    ``lastmod`` of every entry.

    :param base_url: Base URL of the application; sitemap locations are
        built as ``<base_url>/sitemaps/<file name>``.
    :param directory: Directory holding the partial sitemaps and receiving
        the index file.
    """
    log.info("Beginning creation of the final sitemap index...")
    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # generated index identical between runs over the same set of files.
    list_sitemaps = sorted(
        f for f in listdir(directory)
        if isfile(os.path.join(directory, f))
        and "sitemap" in f and "sitemap_index.xml" not in f
    )
    sitemapindex = ElementTree.Element(
        "sitemapindex", xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
    )
    today = datetime.date.today().isoformat()

    for file_name in list_sitemaps:
        sitemap_static_url = f"{base_url}/sitemaps/{file_name}"
        sitemap_area = ElementTree.SubElement(sitemapindex, "sitemap")
        ElementTree.SubElement(sitemap_area, "loc").text = sitemap_static_url
        ElementTree.SubElement(sitemap_area, "lastmod").text = today

    write_element_to_xml("sitemap_index.xml", directory, sitemapindex)
    log.info("Sitemap index created.")
104
105
106
def main():
    """
    Command line entry point: generate the sitemaps of an application.

    Arguments read from the command line:

    * ``settings_file``: location of the settings file (config URI).
    * ``--limit``: maximum number of URLs per sitemap file, default 50000.
    * ``--no-input``: do not pause for confirmation when the target
      directory already contains files.

    The sitemaps are written to ``../static/_sitemaps`` relative to this
    module. The directory is created when it does not exist yet.
    """
    parser = argparse.ArgumentParser(
        description="Process some command line arguments. ",
        usage="sitemap_generator development.ini#atramhasis "
              "--limit=1000")
    parser.add_argument('settings_file',
                        help="<The location of the settings file>")
    parser.add_argument("--limit", type=int,
                        help="range of objects in sitemap", default=50000)
    parser.add_argument("--no-input", action='store_true',
                        help="Don't stop script for user input")
    args, _ = parser.parse_known_args()
    # The limit is used as a modulus when splitting the sitemaps; reject
    # values that cannot describe a file size before doing any work.
    if args.limit < 1:
        parser.error("--limit must be a positive integer")

    config_uri = args.settings_file
    limit = args.limit
    setup_logging(config_uri)
    settings = get_appsettings(config_uri)
    env = bootstrap(config_uri)
    try:
        here = os.path.dirname(__file__)
        sitemap_dir = os.path.join(here, "..", "static", "_sitemaps")
        # os.listdir() raises FileNotFoundError on a missing directory.
        os.makedirs(sitemap_dir, exist_ok=True)
        if os.listdir(sitemap_dir):
            print(
                "[WARNING]The target sitemap directory ({}) is not empty.\n"
                "Existing sitemaps may get overridden. But the sitemap index file "
                "will contain all sitemaps in the folder, even old ones that are "
                "no longer needed. Consider deleting the contents of the folder "
                "first.".format(os.path.abspath(sitemap_dir))
            )
            if not args.no_input:
                input("Press [Enter] to continue.")

        create_sitemaps(settings, limit, sitemap_dir, env)
    finally:
        # bootstrap() hands back a ``closer`` callable that tears down the
        # environment it prepared; call it even when generation fails.
        closer = env.get('closer')
        if closer is not None:
            closer()
138
139
140
# Run the generator only when this file is executed as a script, not when
# it is imported.
if __name__ == '__main__':  # pragma: no cover
    main()
142