Passed
Push — master ( d135c8...c0d002 )
by Ramon
05:03
created

build.bika.lims.idserver.get_variables()   C

Complexity

Conditions 7

Size

Total Lines 85
Code Lines 61

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 61
dl 0
loc 85
rs 6.8763
c 0
b 0
f 0
cc 7
nop 2

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by its authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
import itertools
9
import re
10
11
import transaction
12
from bika.lims import api
13
from bika.lims import logger
14
from bika.lims.alphanumber import Alphanumber
15
from bika.lims.alphanumber import to_alpha
16
from bika.lims.browser.fields.uidreferencefield import \
17
    get_backreferences as get_backuidreferences
18
from bika.lims.interfaces import IAnalysisRequest
19
from bika.lims.interfaces import IAnalysisRequestPartition
20
from bika.lims.interfaces import IAnalysisRequestRetest
21
from bika.lims.interfaces import IAnalysisRequestSecondary
22
from bika.lims.interfaces import IIdServer
23
from bika.lims.numbergenerator import INumberGenerator
24
from DateTime import DateTime
25
from Products.ATContentTypes.utils import DT2dt
26
from zope.component import getAdapters
27
from zope.component import getUtility
28
29
# Type ids that are treated as Analysis Requests when the ID formatting
# variables get prepared. Besides the real portal type, this includes the type
# ids that are derived from the marker interfaces of the context.
AR_TYPES = [
    "AnalysisRequest",
    "AnalysisRequestRetest",
    "AnalysisRequestPartition",
    "AnalysisRequestSecondary",
]
35
36
37
def get_objects_in_sequence(brain_or_object, ctype, cref):
    """Return the items that belong to the sequence of the given object

    :param brain_or_object: brain or object the items are looked up for
    :param ctype: counter type, either "backreference" or "contained"
    :param cref: relationship name for "backreference", or the spec handed
        over to `objectItems` for "contained"
    :raises ValueError: if the counter type is none of the supported ones
    """
    obj = api.get_object(brain_or_object)
    if ctype == "backreference":
        items = get_backreferences(obj, cref)
    elif ctype == "contained":
        items = get_contained_items(obj, cref)
    else:
        raise ValueError(
            "Reference value is mandatory for sequence type counter")
    return items
46
47
48
def get_backreferences(obj, relationship):
    """Returns the objects that reference `obj` through `relationship`

    The UIDReferenceField lookup is tried first. If it yields nothing, the
    traditional `getBackReferences` of the object is used instead.
    """
    # TODO remove the fallback after all ReferenceField get ported to
    # UIDReferenceField. At this moment, there are still some content types
    # that are using the ReferenceField.
    return (get_backuidreferences(obj, relationship) or
            obj.getBackReferences(relationship))
61
62
63
def get_contained_items(obj, spec):
    """Returns whatever `obj.objectItems(spec)` delivers

    This is expected to be a list of (id, subobject) tuples of the contained
    objects, restricted to those whose meta_type matches 'spec', if given.
    """
    contained = obj.objectItems(spec)
    return contained
68
69
70
def get_type_id(context, **kw):
    """Returns the type id to use for the context passed in

    An explicit (truthy) `portal_type` keyword always wins. Otherwise, the
    marker interfaces provided by the context are checked in order, and
    finally the portal type of the context is returned.
    """
    explicit = kw.get("portal_type", None)
    if explicit:
        return explicit

    # Marker interfaces override the portal type (first match wins)
    markers = (
        (IAnalysisRequestPartition, "AnalysisRequestPartition"),
        (IAnalysisRequestRetest, "AnalysisRequestRetest"),
        (IAnalysisRequestSecondary, "AnalysisRequestSecondary"),
    )
    for marker, type_id in markers:
        if marker.providedBy(context):
            return type_id

    return api.get_portal_type(context)
86
87
88
def get_suffix(id, regex="-[A-Z]{1}[0-9]{1,2}$"):
    """Return the suffix of the ID, e.g. '-R01' or '-P05'

    With the default regex, a suffix is a dash, followed by one uppercase
    character and one or two digits at the very end of the ID.

    :param id: the ID to inspect
    :param regex: pattern that describes the suffix
    :returns: the first match of the pattern, or an empty string
    """
    matches = re.findall(regex, id)
    return matches[0] if matches else ""
98
99
100
def strip_suffix(id):
    """Split off the trailing suffix from the ID, if any

    This mimics the old behavior of the Sample ID.

    :param id: the ID to strip, e.g. 'WB-0001-R01'
    :returns: the ID without the suffix found by `get_suffix`, or the ID
        itself if it has no suffix
    """
    suffix = get_suffix(id)
    if not suffix:
        return id
    # Cut at the *last* occurrence of the suffix and treat it as plain text.
    # Splitting with the suffix as a regex would cut at its first occurrence
    # and truncate IDs like 'AB-R01-X-R01' down to 'AB'.
    return id.rpartition(suffix)[0]
109
110
111
def get_retest_count(context, default=0):
    """Returns the number of retests of this AR

    The chain of invalidated ARs is followed until it ends; each step counts
    as one retest. The default is returned if the context is not an AR.
    """
    if not is_ar(context):
        return default

    retests = 0
    ancestor = context.getInvalidated()
    while ancestor:
        ancestor = ancestor.getInvalidated()
        retests += 1
    return retests
125
126
127
def get_partition_count(context, default=0):
    """Returns the number of descendants of the parent AR of this AR

    The default is returned if the context is not an AR or has no parent AR.
    """
    if is_ar(context):
        parent = context.getParentAnalysisRequest()
        if parent:
            return len(parent.getDescendants())
    return default
139
140
def get_secondary_count(context, default=0):
    """Returns the number of secondary ARs of the primary AR of this AR

    The default is returned if the context is not an AR or has no primary AR.
    """
    if is_ar(context):
        primary = context.getPrimaryAnalysisRequest()
        if primary:
            return len(primary.getSecondaryAnalysisRequests())
    return default
152
153
154
def is_ar(context):
    """Checks if the context provides the `IAnalysisRequest` interface
    """
    provided = IAnalysisRequest.providedBy(context)
    return provided
158
159
160
def get_config(context, **kw):
    """Fetch the ID formatting config for the type id of the context

    The records of the ID formatting setting of the Bika Setup are compared
    case-insensitively against the type id (which honors a `portal_type`
    keyword override). If no record matches, a default config with a
    generated sequence is returned.
    """
    id_formatting = api.get_bika_setup().getIDFormatting()
    wanted = get_type_id(context, **kw).lower()

    # first record configured for this type wins
    for record in id_formatting:
        if record['portal_type'].lower() == wanted:
            return record

    # nothing configured, fall back to <type>-{seq}
    return {
        'form': '%s-{seq}' % wanted,
        'sequence_type': 'generated',
        'prefix': '%s' % wanted,
    }
181
182
183
def get_variables(context, **kw):
    """Prepares a dictionary of key->value pairs usable for ID formatting

    The base variables are `context`, `id`, `portal_type`, `year`, `parent`,
    `seq` and `alpha`. Types listed in `AR_TYPES` additionally get the
    Analysis Request variables, and an "ARReport" gets the `clientId` of its
    acquisition parent.

    :param context: the object the ID is generated for
    :param kw: a `portal_type` keyword overrides the type id of the context
    :returns: dict of variables for the string interpolation of the template
    """
    # allow portal_type override
    portal_type = get_type_id(context, **kw)

    # The variables map hold the values that might get into the constructed id
    variables = {
        "context": context,
        "id": api.get_id(context),
        "portal_type": portal_type,
        "year": get_current_year(),
        "parent": api.get_parent(context),
        "seq": 0,
        "alpha": Alphanumber(0),
    }

    # Augment the variables map depending on the portal type
    if portal_type in AR_TYPES:
        variables.update(_get_ar_variables(context, portal_type))
    elif portal_type == "ARReport":
        variables.update({
            "clientId": context.aq_parent.getClientID(),
        })

    return variables


def _get_ar_variables(context, portal_type):
    """Returns the variables shared by all AR types plus the type specifics
    """
    now = DateTime()
    # fall back to the current time if the dates are not set
    sampling_date = DT2dt(context.getSamplingDate() or now)
    date_sampled = DT2dt(context.getDateSampled() or now)

    variables = {
        "clientId": context.getClientID(),
        "dateSampled": date_sampled,
        "samplingDate": sampling_date,
        "sampleType": context.getSampleType().getPrefix(),
        "test_count": 1,
    }

    if portal_type == "AnalysisRequestPartition":
        variables.update(_get_partition_variables(context))
    elif portal_type == "AnalysisRequestRetest":
        variables.update(_get_retest_variables(context))
    elif portal_type == "AnalysisRequestSecondary":
        variables.update(_get_secondary_variables(context))

    return variables


def _get_partition_variables(context):
    """Returns the variables that refer to the parent AR of a partition
    """
    parent_ar = context.getParentAnalysisRequest()
    parent_ar_id = api.get_id(parent_ar)
    return {
        "parent_analysisrequest": parent_ar,
        "parent_ar_id": parent_ar_id,
        "parent_base_id": strip_suffix(parent_ar_id),
        "partition_count": get_partition_count(context),
    }


def _get_retest_variables(context):
    """Returns the variables that refer to the invalidated AR of a retest
    """
    # Note: we use "parent" instead of "invalidated" for simplicity
    parent_ar = context.getInvalidated()
    parent_ar_id = api.get_id(parent_ar)
    if context.isPartition():
        # keep the full ID if the retracted AR is a partition
        parent_base_id = parent_ar_id
    else:
        parent_base_id = strip_suffix(parent_ar_id)
    retest_count = get_retest_count(context)
    return {
        "parent_analysisrequest": parent_ar,
        "parent_ar_id": parent_ar_id,
        "parent_base_id": parent_base_id,
        "retest_count": retest_count,
        # the initial test plus all retests
        "test_count": 1 + retest_count,
    }


def _get_secondary_variables(context):
    """Returns the variables that refer to the primary AR of a secondary
    """
    primary_ar = context.getPrimaryAnalysisRequest()
    primary_ar_id = api.get_id(primary_ar)
    return {
        "parent_analysisrequest": primary_ar,
        "parent_ar_id": primary_ar_id,
        "parent_base_id": strip_suffix(primary_ar_id),
        "secondary_count": get_secondary_count(context),
    }
268
269
270
def split(string, separator="-"):
    """Split a string on the given separator

    :param string: the value to split
    :param separator: the separator to split on
    :returns: list of the splitted parts, or an empty list if the value
        passed in is not a string
    """
    try:
        string_types = basestring  # noqa: Python 2 (str and unicode)
    except NameError:
        # `basestring` is gone in Python 3
        string_types = str
    if not isinstance(string, string_types):
        return []
    return string.split(separator)
276
277
278
def to_int(thing, default=0):
    """Convert a thing to an integer

    :returns: `int(thing)`, or the default if the conversion fails with a
        TypeError or a ValueError
    """
    try:
        number = int(thing)
    except (TypeError, ValueError):
        number = default
    return number
285
286
287
def slice(string, separator="-", start=None, end=None):
    """Slice out a segment of a string, which is splitted on both the wildcards
    and the separator passed in, if any

    E.g. for 'AR-{sampleType}-{parentId}{alpha:3a2d}' and `end=2`, the result
    is 'AR-{sampleType}'.

    NOTE: a separator is only restored next to a wildcard or at the edge of a
    static chunk. A separator between two static parts of the same chunk
    (e.g. 'A-B') is not part of the result.

    :param string: the template to slice
    :param separator: separator to split the static parts on
    :param start: position of the first element (separators not counted)
    :param end: the *number* of elements to join, not a position
    :returns: the joined segments, or an empty string if the template holds
        no element at all
    :raises IndexError: if start/end point beyond the available elements
    """
    # split by wildcards/keywords first
    # AR-{sampleType}-{parentId}{alpha:3a2d}
    segments = [seg for seg in re.split(r"(\{.+?\})", string) if seg]
    # ['AR-', '{sampleType}', '-', '{parentId}', '{alpha:3a2d}']
    if separator:
        parts = []
        for seg in segments:
            if seg == separator:
                # Keep track of singleton separators as empties
                # We need to do this to prevent duplicates when splitting
                parts.append("")
            else:
                # Split each segment at the given separator
                parts.extend(split(seg, separator))
        # ['AR', '', '{sampleType}', '', '{parentId}', '{alpha:3a2d}']
        # And replace empties with separator
        segments = [seg if seg != "" else separator for seg in parts]
        # ['AR', '-', '{sampleType}', '-', '{parentId}', '{alpha:3a2d}']

    # Remember where the elements (all but the separators) are located.
    # Working with indexes instead of looking up the values again keeps the
    # mapping correct when the same segment occurs more than once.
    positions = [idx for idx, seg in enumerate(segments) if seg != separator]
    if not positions:
        return ""

    start_pos = to_int(start, 0)
    # Note "end" is not a position, but the number of elements to join!
    end_pos = to_int(end, len(positions) - start_pos) + start_pos - 1

    # Map the positions against the segments with separator
    first = positions[start_pos]
    last = positions[end_pos] + 1

    # Return all segments joined
    return "".join(segments[first:last])
323
324
325
def get_current_year():
    """Returns the current year as a two digit string

    The first two characters of the four digit year are cut off.
    """
    full_year = DateTime().strftime("%Y")
    return full_year[2:]
329
330
331
def search_by_prefix(portal_type, prefix):
    """Returns brains which share the same portal_type and ID prefix

    The uid_catalog is queried by portal type only; the prefix is checked
    afterwards against the ID of each brain found.
    """
    uid_catalog = api.get_tool("uid_catalog")
    candidates = uid_catalog({"portal_type": portal_type})
    # Keep only the brains whose ID starts with the prefix
    return [brain for brain in candidates
            if api.get_id(brain).startswith(prefix)]
338
339
340
def get_ids_with_prefix(portal_type, prefix):
    """Return a list of ids sharing the same portal type and prefix
    """
    return [api.get_id(brain)
            for brain in search_by_prefix(portal_type, prefix)]
346
347
348
def make_storage_key(portal_type, prefix=None):
    """Make a storage (dict-) key for the number generator

    :param portal_type: type id, gets lowercased
    :param prefix: optional ID prefix, appended as-is after a dash
    :returns: '<portal_type>' or '<portal_type>-<prefix>'
    """
    base = portal_type.lower()
    if not prefix:
        return base
    return "{}-{}".format(base, prefix)
355
356
357
def get_seq_number_from_id(id, id_template, prefix, **kw):
    """Return the sequence number of the given ID

    The prefix is cut off the ID and the remainder is splitted on the
    separator. The last alphanumeric part is taken as the sequence number and
    converted to an integer, honoring an alphanumeric template.

    :param id: the ID to extract the sequence number from
    :param id_template: the ID formatting template, e.g. 'WS-{seq:03d}'
    :param prefix: the static part of the ID
    :param kw: `separator` defines where to split the ID (defaults to '-')
    :returns: the sequence number as integer, 0 if none was found
    """
    separator = kw.get("separator", "-")
    # Cut off the first occurrence of the prefix only. Removing all of them
    # would also eat up parts of the sequence that equal the prefix.
    postfix = id.replace(prefix, "", 1).strip(separator)
    candidates = [seg for seg in postfix.split(separator) if seg.isalnum()]
    seq_number = candidates[-1] if candidates else 0

    # Check if this id has to be expressed as an alphanumeric number
    seq_number = get_alpha_or_number(seq_number, id_template)
    return to_int(seq_number)
372
373
374
def get_alpha_or_number(number, template):
    """Returns the number expressed as defined in the template

    If the (stripped) template ends with an alpha wildcard like
    '{alpha:3a2d}', the number is handed over to `to_alpha` together with the
    format found ('3a2d'). Otherwise, the number is returned untouched.
    """
    found = re.match(r".*\{alpha:(\d+a\d+d)\}$", template.strip())
    if not (found and found.groups()):
        return number
    alpha_format = found.groups()[0]
    return to_alpha(number, alpha_format)
383
384
385
def get_counted_number(context, config, variables, **kw):
    """Compute the number for the sequence type "Counter"

    :param context: the object the ID is generated for
    :param config: ID formatting config; `context`, `counter_type` and
        `counter_reference` are read from it
    :param variables: the variables map prepared for the ID formatting
    :returns: the number of items found in the sequence
    """
    # The "context" of the config is defined by the user in the Setup and can
    # be actually anything. It is looked up in the variables map and falls
    # back to the current context.
    context_name = config.get("context")
    target = variables.get(context_name, context)

    # Either "backreference" or "contained"
    counter_type = config.get("counter_type")

    # The "relationship" for "backreference", or the meta type for contained
    # objects
    counter_reference = config.get("counter_reference")

    # This should be a list of existing items, including the current context
    # object
    return len(get_objects_in_sequence(target, counter_type, counter_reference))
408
409
410
def get_generated_number(context, config, variables, **kw):
    """Generate a new persistent number with the number generator for the
    sequence type "Generated"

    :param context: the object the ID is generated for
    :param config: ID formatting config; `form` and `split_length` are read
    :param variables: the variables map used to interpolate the prefix
    :param kw: `separator` (defaults to '-'), `portal_type` override and
        `dry_run` to read the stored number instead of generating a new one
    :returns: an int or Alphanumber, depending on the ID template
    """
    # separator where to split the ID
    separator = kw.get('separator', '-')

    # allow portal_type override
    portal_type = get_type_id(context, **kw)

    # The ID format for string interpolation, e.g. WS-{seq:03d}
    id_template = config.get("form", "")

    # The split length defines where the key is splitted from the value
    split_length = config.get("split_length", 1)

    # The prefix template is the static part of the ID
    prefix_template = slice(id_template, separator=separator, end=split_length)

    # get the number generator
    number_generator = getUtility(INumberGenerator)

    # generate the key for the number generator storage
    prefix = prefix_template.format(**variables)

    # normalize out any unicode characters like Ö, É, etc. from the prefix
    prefix = api.normalize_filename(prefix)

    # The key used for the storage
    key = make_storage_key(portal_type, prefix)

    # Handle flushed storage
    if key not in number_generator:
        existing = get_ids_with_prefix(portal_type, prefix)
        # split the existing IDs on the same separator as the template
        numbers = [
            get_seq_number_from_id(
                existing_id, id_template, prefix, separator=separator)
            for existing_id in existing
        ]
        # figure out the highest number in the sequence
        max_num = max(numbers) if numbers else 0
        # set the number generator
        logger.info("*** SEEDING Prefix '{}' to {}".format(prefix, max_num))
        number_generator.set_number(key, max_num)

    if kw.get("dry_run", False):
        # => This allows us to "preview" the next generated ID in the UI
        # TODO Show the user the next generated number somewhere in the UI
        number = number_generator.get(key, 1)
    else:
        # Generate a new number
        # NOTE Even when the number exceeds the given ID sequence format,
        #      it will overflow gracefully, e.g.
        #      >>> '{sampleId}-R{seq:03d}'.format(sampleId="Water", seq=999999)
        #      'Water-R999999'
        number = number_generator.generate_number(key=key)

    # Return an int or Alphanumber
    return get_alpha_or_number(number, id_template)
467
468
469
def generateUniqueId(context, **kw):
    """Generate pretty content IDs.

    :param context: the object the ID is generated for
    :param kw: handed over to the config, variables and number lookups
    :returns: the interpolated ID template, normalized
    :raises KeyError: if the template uses a wildcard that is not part of the
        variables map (the error is logged and re-raised)
    """
    # get the config for this portal type from the system setup
    config = get_config(context, **kw)

    # get the variables map for later string interpolation
    variables = get_variables(context, **kw)

    # The new generate sequence number
    number = 0

    # get the sequence type from the global config
    sequence_type = config.get("sequence_type", "generated")

    if sequence_type == "counter":
        # Sequence Type is "Counter", so we use the length of the
        # backreferences or contained objects of the evaluated "context"
        # defined in the config
        number = get_counted_number(context, config, variables, **kw)
    elif sequence_type == "generated":
        # Sequence Type is "Generated", so the ID is constructed according to
        # the configured split length
        number = get_generated_number(context, config, variables, **kw)

    # store the new sequence number to the variables map for str interpolation
    if isinstance(number, Alphanumber):
        variables["alpha"] = number
    variables["seq"] = to_int(number)

    # The ID formatting template from user config, e.g. {sampleId}-R{seq:02d}
    id_template = config.get("form", "")

    # Interpolate the ID template
    try:
        new_id = id_template.format(**variables)
    except KeyError as e:
        logger.error('KeyError: {} not in id_template {}'.format(
            e, id_template))
        raise
    normalized_id = api.normalize_filename(new_id)
    logger.info("generateUniqueId: {}".format(normalized_id))

    return normalized_id
514
515
516
def renameAfterCreation(obj):
    """Rename the content after it was created/added

    :param obj: the object to rename
    :returns: the `_bika_id` of the object if already set, otherwise the new
        ID the object was renamed to
    :raises KeyError: if the new ID is already taken in the parent folder
    """
    # Check if the _bika_id was already set
    bika_id = getattr(obj, "_bika_id", None)
    if bika_id is not None:
        return bika_id
    # Can't rename without a subtransaction commit when using portal_factory
    transaction.savepoint(optimistic=True)
    # The id returned should be normalized already
    new_id = None
    # Checking if an adapter exists for this content type. If yes, we will
    # get new_id from adapter. If more than one adapter is registered, the
    # ID generated by the last one is used.
    for _name, adapter in getAdapters((obj, ), IIdServer):
        if new_id:
            logger.warning(('More than one ID Generator Adapter found for '
                            'content type -> %s') % obj.portal_type)
        new_id = adapter.generate_id(obj.portal_type)
    if not new_id:
        new_id = generateUniqueId(obj)

    # TODO: This is a naive check just in current folder
    # -> this should check globally for duplicate objects with same prefix
    # N.B. a check like `search_by_prefix` each time would probably slow things
    # down too much!
    # -> A solution could be to store all IDs with a certain prefix in a storage
    parent = api.get_parent(obj)
    if new_id in parent.objectIds():
        # XXX We could do the check in a `while` loop and generate a new one.
        raise KeyError("The ID {} is already taken in the path {}".format(
            new_id, api.get_path(parent)))
    # rename the object to the new id
    parent.manage_renameObject(obj.id, new_id)

    return new_id
551