Passed
Push — develop ( 1f6ef3...42aeec )
by Koen
01:25
created

atramhasis.scripts.sitemap_generator.main()   A

Complexity

Conditions 3

Size

Total Lines 32
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 32
rs 9.28
c 0
b 0
f 0
cc 3
nop 0
1
import argparse
2
import contextlib
3
import datetime
4
import logging
5
import os
6
import xml.etree.ElementTree as ElementTree
7
from builtins import input
8
from os import listdir
9
from os.path import isfile
10
11
from pyramid.paster import bootstrap
12
from pyramid.paster import get_appsettings
13
from pyramid.paster import setup_logging
14
from pytz import timezone
15
from sqlalchemy import engine_from_config
16
from sqlalchemy.orm import sessionmaker
17
18
from atramhasis.errors import SkosRegistryNotFoundException
19
20
# Time zone used when computing the ``lastmod`` date for the sitemap index.
timezone_brussels = timezone('Europe/Brussels')
21
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
22
23
24
@contextlib.contextmanager
def db_session(settings):
    """
    Provide a transactional SQLAlchemy session as a context manager.

    An engine is built from the ``sqlalchemy.``-prefixed entries of
    `settings` and a new session is bound to it. The session is committed
    when the managed block finishes normally, rolled back (with the exception
    re-raised) when the block or the commit raises, and always closed.

    :param settings: Settings passed to ``engine_from_config``.
    :return: A context manager yielding the open session.
    """
    engine = engine_from_config(settings, 'sqlalchemy.')
    session_maker = sessionmaker(bind=engine)
    session = session_maker()
    try:
        yield session
        # The commit stays inside the ``try`` so a failing commit is also
        # rolled back before the error propagates.
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
37
38
39
def write_element_to_xml(filename, sitemap_dir, element):
    """
    Write an XML element tree to a file.

    The element is wrapped in an ``ElementTree`` and written UTF-8 encoded,
    with an XML declaration, to `filename` inside `sitemap_dir`.

    :param filename: Name of the file to write.
    :param sitemap_dir: Directory the file is written into.
    :param element: Root ``Element`` of the document.
    """
    tree = ElementTree.ElementTree(element)
    file_path_name = os.path.join(sitemap_dir, filename)
    tree.write(file_path_name, encoding='utf-8', xml_declaration=True)
43
44
45
def create_sitemaps(settings, limit_per_deel, directory, env):
    """
    Create the conceptscheme sitemaps, concept sitemaps and the sitemap index.

    URLs are collected from every provider in the request's SKOS registry,
    skipping providers whose ``subject`` metadata contains ``external`` or
    ``hidden``. The partial sitemaps are written first, then the index.

    :param settings: Application settings; ``atramhasis.url`` is read as the
        base URL.
    :param limit_per_deel: Maximum number of URLs per partial sitemap file.
    :param directory: Directory the sitemap files are written to.
    :param env: Mapping with a ``request`` entry that carries the
        ``skos_registry``.
    :raises SkosRegistryNotFoundException: If the request has no
        ``skos_registry`` or it is ``None``.
    """
    base_url = settings.get("atramhasis.url")
    schemes_url = f"{base_url}/conceptschemes/{{}}"
    concepts_url = f"{schemes_url}/c/{{}}"

    skos_registry = getattr(env['request'], 'skos_registry', None)
    if skos_registry is None:
        raise SkosRegistryNotFoundException()  # pragma: no cover

    scheme_urls = []
    concept_urls = []
    for provider in skos_registry.get_providers():
        # Fetch the metadata once per provider instead of once per lookup.
        metadata = provider.get_metadata()
        if any(not_shown in metadata['subject']
               for not_shown in ('external', 'hidden')):
            continue
        scheme_id = metadata['id']
        scheme_urls.append(schemes_url.format(scheme_id))
        concept_urls.extend(
            concepts_url.format(scheme_id, concept['id'])
            for concept in provider.get_all()
        )

    create_deel_sitemaps(scheme_urls, limit_per_deel, directory, 'conceptschemes')
    create_deel_sitemaps(concept_urls, limit_per_deel, directory, 'concepts')

    # The index lists the files in `directory`, so it must be built last.
    create_index_sitemap(base_url, directory)
72
73
74
def _new_urlset():
    """Return an empty ``urlset`` root element in the sitemap namespace."""
    return ElementTree.Element(
        "urlset", xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
    )


def create_deel_sitemaps(objecturls, limit_per_deel, sitemap_dir, name):
    """
    Sitemaps have a maximum amount of items. This method creates partial
    sitemaps with up to `limit_per_deel` items per file.

    Files are named ``{name}_sitemap_deel_{n}.xml`` with ``n`` starting at 1.
    No file is written when `objecturls` is empty.

    :param objecturls: Iterable of URLs to put in the sitemaps.
    :param limit_per_deel: Number of URLs per file; must be non-zero because
        it is used as a modulus.
    :param sitemap_dir: Directory the files are written to.
    :param name: Prefix for the file names, also used in the log messages.
    """
    log.info("Beginning creation of sitemaps...")
    urlset = _new_urlset()
    sitemap_counter = 1

    for counter, objecturl in enumerate(objecturls, 1):
        url = ElementTree.SubElement(urlset, "url")
        ElementTree.SubElement(url, "loc").text = objecturl

        if counter % limit_per_deel == 0:
            filename = f'{name}_sitemap_deel_{sitemap_counter}.xml'
            # Report the kind of object actually being processed rather than
            # always claiming "conceptschemes".
            log.info("Processed %s %s, writing %s", counter, name, filename)
            write_element_to_xml(filename, sitemap_dir, urlset)
            sitemap_counter += 1
            urlset = _new_urlset()
    # Flush the remainder that did not fill a complete file.
    if len(urlset):
        filename = f'{name}_sitemap_deel_{sitemap_counter}.xml'
        write_element_to_xml(filename, sitemap_dir, urlset)
    log.info("All %s sitemaps created.", name)
101
102
103
def create_index_sitemap(base_url, directory):
    """
    Loop over all the created sitemaps, and create an index file.

    Every regular file in `directory` whose name contains ``sitemap`` (except
    the index itself) is listed under ``{base_url}/sitemaps/`` with today's
    date as ``lastmod``. The result is written to ``sitemap_index.xml`` in
    `directory`.

    :param base_url: Base URL the sitemap locations are built from.
    :param directory: Directory holding the sitemaps and receiving the index.
    """
    log.info("Beginning creation of the final sitemap index...")
    # os.listdir returns entries in arbitrary order; sort them so the index
    # is reproducible between runs.
    list_sitemaps = sorted(
        f for f in listdir(directory)
        if isfile(os.path.join(directory, f))
        and "sitemap" in f and "sitemap_index.xml" not in f
    )
    sitemapindex = ElementTree.Element(
        "sitemapindex", xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
    )

    # Computed once so every entry carries the same date.
    today = datetime.datetime.now(timezone_brussels).strftime("%Y-%m-%d")
    for file_name in list_sitemaps:
        sitemap_static_url = f"{base_url}/sitemaps/{file_name}"
        sitemap_area = ElementTree.SubElement(sitemapindex, "sitemap")
        ElementTree.SubElement(sitemap_area, "loc").text = sitemap_static_url
        ElementTree.SubElement(sitemap_area, "lastmod").text = today

    write_element_to_xml("sitemap_index.xml", directory, sitemapindex)
    log.info("Sitemap index created.")
124
125
126
def main():
    """
    Command line entry point: generate the sitemaps for an application.

    Reads the settings file location and the optional ``--limit`` and
    ``--no-input`` flags from the command line, bootstraps the application
    and writes the sitemaps to ``../static/_sitemaps`` relative to this
    module. When that directory already has content, a warning is printed
    and, unless ``--no-input`` is given, the user is asked to confirm.
    """
    parser = argparse.ArgumentParser(
        description="Process some command line arguments. ",
        usage="sitemap_generator development.ini#atramhasis "
              "--limit=1000")
    parser.add_argument('settings_file',
                        help="<The location of the settings file>")
    parser.add_argument("--limit", type=int,
                        help="range of objects in sitemap", default=50000)
    parser.add_argument("--no-input", action='store_true',
                        help="Don't stop script for user input")
    args, _ = parser.parse_known_args()

    config_uri = args.settings_file
    limit = args.limit
    setup_logging(config_uri)
    settings = get_appsettings(config_uri)
    env = bootstrap(config_uri)
    try:
        here = os.path.dirname(__file__)
        sitemap_dir = os.path.join(here, "..", "static", "_sitemaps")
        # Make sure the target exists so the listing below cannot fail on a
        # fresh checkout or installation.
        os.makedirs(sitemap_dir, exist_ok=True)
        if os.listdir(sitemap_dir):
            print(
                "[WARNING]The target sitemap directory ({}) is not empty.\n"
                "Existing sitemaps may get overridden. But the sitemap index file "
                "will contain all sitemaps in the folder, even old ones that are "
                "no longer needed. Consider deleting the contents of the folder "
                "first.".format(os.path.abspath(sitemap_dir))
            )
            if not args.no_input:
                input("Press [Enter] to continue.")

        create_sitemaps(settings, limit, sitemap_dir, env)
    finally:
        # Pyramid's bootstrap environment documents a ``closer`` callable that
        # should be invoked when the script is done; tolerate its absence.
        closer = env.get('closer')
        if closer is not None:
            closer()
158
159
160
# Run the generator when this module is executed as a script.
if __name__ == '__main__':  # pragma: no cover
    main()
162