validate_pages_json_data()   F
last analyzed

Complexity

Conditions 19

Size

Total Lines 65

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 65
rs 2.8121
c 0
b 0
f 0
cc 19

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like validate_pages_json_data() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from django.db.models import Max
2
from django.utils.translation import ugettext_lazy as _
3
from django.contrib.sites.models import Site
4
from django.conf import settings as global_settings
5
from django.contrib.auth import get_user_model
6
7
from pages.models import Page, Content
8
from pages.managers import PageManager
9
from pages.utils import get_placeholders
10
from pages import settings
11
12
from datetime import datetime
13
import json as _json
14
15
ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'  # for parsing dates from JSON
16
JSON_PAGE_EXPORT_NAME = 'gerbi_cms_page_export_version'
17
JSON_PAGE_EXPORT_VERSION = 4
18
# make it readable -- there are better ways to save space
19
JSON_PAGE_EXPORT_INDENT = 2
20
21
22
def monkeypatch_remove_pages_site_restrictions():
23
    """
24
    monkeypatch PageManager to expose pages for all sites by
25
    removing customized get_query_set. Only actually matters
26
    if PAGE_HIDE_SITES is set
27
    """
28
    try:
29
        del PageManager.get_query_set
30
    except AttributeError:
31
        pass
32
33
34
def dump_json_data(page):
35
    """
36
    Return a python dict representation of this page for use as part of
37
    a JSON export.
38
    """
39
    def content_langs_ordered():
40
        """
41
        Return a list of languages ordered by the page content
42
        with the latest creation date in each.  This will be used
43
        to maintain the state of the language_up_to_date template
44
        tag when a page is restored or imported into another site.
45
        """
46
        params = {'page': page}
47
        if page.freeze_date:
48
            params['creation_date__lte'] = page.freeze_date
49
        cqs = Content.objects.filter(**params)
50
        cqs = cqs.values('language').annotate(latest=Max('creation_date'))
51
        return [c['language'] for c in cqs.order_by('latest')]
52
    languages = content_langs_ordered()
53
54
    def language_content(ctype):
55
        return dict(
56
            (lang, page.get_content(lang, ctype, language_fallback=False))
57
            for lang in languages)
58
59
    def placeholder_content():
60
        """Return content of each placeholder in each language."""
61
        out = {}
62
        for p in get_placeholders(page.get_template()):
63
            if p.ctype in ('title', 'slug'):
64
                continue  # these were already included
65
            out[p.name] = language_content(p.name)
66
        return out
67
68
    def isoformat(d):
69
        return None if d is None else d.strftime(ISODATE_FORMAT)
70
71
    def custom_email(user):
72
        """Allow a user's profile to return an email for the user."""
73
        return user.email
74
75
    tags = []
76
    if settings.PAGE_TAGGING:
77
        tags = [tag.name for tag in page.tags.all()]
78
79
    return {
80
        'complete_slug': dict(
81
            (lang, page.get_complete_slug(lang, hideroot=False))
82
            for lang in languages),
83
        'title': language_content('title'),
84
        'author_email': custom_email(page.author),
85
        'creation_date': isoformat(page.creation_date),
86
        'publication_date': isoformat(page.publication_date),
87
        'publication_end_date': isoformat(page.publication_end_date),
88
        'last_modification_date': isoformat(page.last_modification_date),
89
        'status': {
90
            Page.PUBLISHED: 'published',
91
            Page.HIDDEN: 'hidden',
92
            Page.DRAFT: 'draft'}[page.status],
93
        'template': page.template,
94
        'sites': (
95
            [site.domain for site in page.sites.all()]
96
            if settings.PAGE_USE_SITE_ID else []),
97
        'redirect_to_url': page.redirect_to_url,
98
        'redirect_to_complete_slug': dict(
99
            (lang, page.redirect_to.get_complete_slug(
100
                lang, hideroot=False))
101
            for lang in page.redirect_to.get_languages()
102
            ) if page.redirect_to is not None else None,
103
        'content': placeholder_content(),
104
        'content_language_updated_order': languages,
105
        'tags': tags,
106
    }
107
108
109
def update_redirect_to_from_json(page, redirect_to_complete_slugs):
110
    """
111
    The second pass of create_and_update_from_json_data
112
    used to update the redirect_to field.
113
114
    Returns a messages list to be appended to the messages from the
115
    first pass.
116
    """
117
    messages = []
118
    s = ''
119
    for lang, s in list(redirect_to_complete_slugs.items()):
120
        r = Page.objects.from_path(s, lang, exclude_drafts=False)
121
        if r:
122
            page.redirect_to = r
123
            page.save()
124
            break
125
    else:
126
        messages.append(_("Could not find page for redirect-to field"
127
            " '%s'") % (s,))
128
    return messages
129
130
131
def create_and_update_from_json_data(d, user):
132
    """
133
    Create or update page based on python dict d loaded from JSON data.
134
    This applies all data except for redirect_to, which is done in a
135
    second pass after all pages have been imported,
136
137
    user is the User instance that will be used if the author can't
138
    be found in the DB.
139
140
    returns (page object, created, messages).
141
142
    created is True if this was a new page or False if an existing page
143
    was updated.
144
145
    messages is a list of strings warnings/messages about this import
146
    """
147
    page = None
148
    parent = None
149
    parent_required = True
150
    created = False
151
    messages = []
152
153
    page_languages = set(lang[0] for lang in settings.PAGE_LANGUAGES)
154
155
    for lang, s in list(d['complete_slug'].items()):
156
        if lang not in page_languages:
157
            messages.append(_("Language '%s' not imported") % (lang,))
158
            continue
159
160
        page = Page.objects.from_path(s, lang, exclude_drafts=False)
161
        if page and page.get_complete_slug(lang) == s:
162
            break
163
        if parent_required and parent is None:
164
            if '/' in s:
165
                parent = Page.objects.from_path(s.rsplit('/', 1)[0], lang,
166
                    exclude_drafts=False)
167
            else:
168
                parent_required = False
169
    else:
170
        # can't find an existing match, need to create a new Page
171
        page = Page(parent=parent)
172
        created = True
173
174
    user_model = get_user_model()
175
176
    def custom_get_user_by_email(email):
177
        """
178
        Simplified version
179
        """
180
        return user_model.objects.get(email=email)
181
182
    try:
183
        page.author = custom_get_user_by_email(d['author_email'])
184
    except (user_model.DoesNotExist, user_model.MultipleObjectsReturned):
185
        page.author = user
186
        messages.append(_("Original author '%s' not found")
187
            % (d['author_email'],))
188
189
    page.creation_date = datetime.strptime(d['creation_date'],
190
        ISODATE_FORMAT)
191
    page.publication_date = datetime.strptime(d['publication_date'],
192
        ISODATE_FORMAT) if d['publication_date'] else None
193
    page.publication_end_date = datetime.strptime(d['publication_end_date'],
194
        ISODATE_FORMAT) if d['publication_end_date'] else None
195
    page.last_modification_date = datetime.strptime(
196
        d['last_modification_date'], ISODATE_FORMAT)
197
    page.status = {
198
        'published': Page.PUBLISHED,
199
        'hidden': Page.HIDDEN,
200
        'draft': Page.DRAFT,
201
        }[d['status']]
202
    page.template = d['template']
203
    page.redirect_to_url = d['redirect_to_url']
204
205
    page.save()
206
207
    # Add tags
208
    if settings.PAGE_TAGGING:
209
        from taggit.models import Tag
210
        tags = d.get('tags', [])
211
        page.tags.clear()
212
        if tags:
213
            for tag in tags:
214
                Tag.objects.get_or_create(name=tag)
215
                page.tags.add(tag)
216
            page.save()
217
218
    if settings.PAGE_USE_SITE_ID:
219
        if d['sites']:
220
            for site in d['sites']:
221
                try:
222
                    page.sites.add(Site.objects.get(domain=site))
223
                except Site.DoesNotExist:
224
                    messages.append(_("Could not add site '%s' to page")
225
                        % (site,))
226
        if not settings.PAGE_HIDE_SITES and not page.sites.count():
227
            # need at least one site
228
            page.sites.add(Site.objects.get(pk=global_settings.SITE_ID))
229
230
    def create_content(lang, ctype, body):
231
        Content.objects.create_content_if_changed(page, lang, ctype, body)
232
233
    for lang in d['content_language_updated_order']:
234
        if lang not in page_languages:
235
            continue
236
        create_content(lang, 'slug',
237
            d['complete_slug'][lang].rsplit('/', 1)[-1])
238
        create_content(lang, 'title', d['title'][lang])
239
        for ctype, langs_bodies in list(d['content'].items()):
240
            create_content(lang, ctype, langs_bodies[lang])
241
242
    return page, created, messages
243
244
245
def pages_to_json(queryset):
246
    """
247
    Return a JSON string export of the pages in queryset.
248
    """
249
    # selection may be in the wrong order, and order matters
250
    queryset = queryset.order_by('tree_id', 'lft')
251
    return _json.dumps(
252
        {JSON_PAGE_EXPORT_NAME: JSON_PAGE_EXPORT_VERSION,
253
            'pages': [dump_json_data(page) for page in queryset]},
254
        indent=JSON_PAGE_EXPORT_INDENT, sort_keys=True)
255
256
257
def json_to_pages(json, user, preferred_lang=None):
258
    """
259
    Attept to create/update pages from JSON string json.  user is the
260
    user that will be used when creating a page if a page's original
261
    author can't be found.  preferred_lang is the language code of the
262
    slugs to include in error messages (defaults to
263
    settings.PAGE_DEFAULT_LANGUAGE).
264
265
    Returns (errors, pages_created) where errors is a list of strings
266
    and pages_created is a list of: (page object, created bool,
267
    messages list of strings) tuples.
268
269
    If any errors are detected there the error list will contain
270
    information for the user and no pages will be created/updated.
271
    """
272
    from pages.models import Page
273
    if not preferred_lang:
274
        preferred_lang = settings.PAGE_DEFAULT_LANGUAGE
275
276
    d = _json.loads(json)
277
    try:
278
        errors = validate_pages_json_data(d, preferred_lang)
279
    except KeyError as e:
280
        errors = [_('JSON file is invalid: %s') % (e.args[0],)]
281
282
    pages_created = []
283
    if not errors:
284
        # pass one
285
        for p in d['pages']:
286
            pages_created.append(
287
                create_and_update_from_json_data(p, user))
288
        # pass two
289
        for p, results in zip(d['pages'], pages_created):
290
            page, created, messages = results
291
            rtcs = p['redirect_to_complete_slug']
292
            if rtcs:
293
                messages.extend(update_redirect_to_from_json(page, rtcs))
294
        # clean up MPTT links
295
        Page.objects.rebuild()
296
297
    return errors, pages_created
298
299
300
def validate_pages_json_data(d, preferred_lang):
301
    """
302
    Check if an import of d will succeed, and return errors.
303
304
    errors is a list of strings.  The import should proceed only if errors
305
    is empty.
306
    """
307
    from pages.models import Page
308
    errors = []
309
310
    seen_complete_slugs = dict(
311
        (lang[0], set()) for lang in settings.PAGE_LANGUAGES)
312
313
    valid_templates = set(t[0] for t in settings.get_page_templates())
314
    valid_templates.add(settings.PAGE_DEFAULT_TEMPLATE)
315
316
    if d[JSON_PAGE_EXPORT_NAME] != JSON_PAGE_EXPORT_VERSION:
317
        return [_('Unsupported file version: %s') % repr(
318
            d[JSON_PAGE_EXPORT_NAME])], []
319
    pages = d['pages']
320
    for p in pages:
321
        # use the complete slug as a way to identify pages in errors
322
        slug = p['complete_slug'].get(preferred_lang, None)
323
        seen_parent = False
324
        for lang, s in list(p['complete_slug'].items()):
325
            if lang not in seen_complete_slugs:
326
                continue
327
            seen_complete_slugs[lang].add(s)
328
329
            if '/' not in s:  # root level, no parent req'd
330
                seen_parent = True
331
            if not seen_parent:
332
                parent_slug, ignore = s.rsplit('/', 1)
333
                if parent_slug in seen_complete_slugs[lang]:
334
                    seen_parent = True
335
                else:
336
                    parent = Page.objects.from_path(parent_slug, lang,
337
                        exclude_drafts=False)
338
                    if parent and parent.get_complete_slug(lang) == parent_slug:
339
                        # parent not included, but exists on site
340
                        seen_parent = True
341
            if not slug:
342
                slug = s
343
344
        if not slug:
345
            errors.append(_("%s has no common language with this site")
346
                % (list(p['complete_slug'].values())[0],))
347
            continue
348
349
        if not seen_parent:
350
            errors.append(_("%s did not include its parent page and a matching"
351
                " one was not found on this site") % (slug,))
352
353
        if p['template'] not in valid_templates:
354
            errors.append(_("%s uses a template not found on this site: %s")
355
                % (slug, p['template']))
356
            continue
357
358
        if set(p.ctype for p in get_placeholders(p['template']) if
359
                p.ctype not in ('title', 'slug')) != set(p['content'].keys()):
360
            errors.append(_("%s template contents are different than our "
361
                "template: %s") % (slug, p['template']))
362
            continue
363
364
    return errors
365