Passed
Push — master ( 869927...b7ddba )
by Jordi
06:02 queued 01:49
created

bika.lims.idserver.get_generated_number()   B

Complexity

Conditions 5

Size

Total Lines 57
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 22
dl 0
loc 57
rs 8.8853
c 0
b 0
f 0
cc 5
nop 4

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by it's authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
import itertools
9
import re
10
11
import transaction
12
from bika.lims import api
13
from bika.lims import logger
14
from bika.lims.alphanumber import Alphanumber
15
from bika.lims.alphanumber import to_alpha
16
from bika.lims.browser.fields.uidreferencefield import \
17
    get_backreferences as get_backuidreferences
18
from bika.lims.interfaces import IAnalysisRequest
19
from bika.lims.interfaces import IAnalysisRequestPartition
20
from bika.lims.interfaces import IAnalysisRequestRetest
21
from bika.lims.interfaces import IIdServer
22
from bika.lims.numbergenerator import INumberGenerator
23
from DateTime import DateTime
24
from Products.ATContentTypes.utils import DT2dt
25
from zope.component import getAdapters
26
from zope.component import getUtility
27
28
AR_TYPES = [
29
    "AnalysisRequest",
30
    "AnalysisRequestRetest",
31
    "AnalysisRequestPartition",
32
]
33
34
35
def get_objects_in_sequence(brain_or_object, ctype, cref):
36
    """Return a list of items
37
    """
38
    obj = api.get_object(brain_or_object)
39
    if ctype == "backreference":
40
        return get_backreferences(obj, cref)
41
    if ctype == "contained":
42
        return get_contained_items(obj, cref)
43
    raise ValueError("Reference value is mandatory for sequence type counter")
44
45
46
def get_backreferences(obj, relationship):
47
    """Returns the backreferences
48
    """
49
    refs = get_backuidreferences(obj, relationship)
50
51
    # TODO remove after all ReferenceField get ported to UIDReferenceField
52
    # At this moment, there are still some content types that are using the
53
    # ReferenceField, so we need to fallback to traditional getBackReferences
54
    # for these cases.
55
    if not refs:
56
        refs = obj.getBackReferences(relationship)
57
58
    return refs
59
60
61
def get_contained_items(obj, spec):
62
    """Returns a list of (id, subobject) tuples of the current context.
63
    If 'spec' is specified, returns only objects whose meta_type match 'spec'
64
    """
65
    return obj.objectItems(spec)
66
67
68
def get_type_id(context, **kw):
69
    """Returns the type id for the context passed in
70
    """
71
    portal_type = kw.get("portal_type", None)
72
    if portal_type:
73
        return portal_type
74
75
    # Override by provided marker interface
76
    if IAnalysisRequestPartition.providedBy(context):
77
        return "AnalysisRequestPartition"
78
    elif IAnalysisRequestRetest.providedBy(context):
79
        return "AnalysisRequestRetest"
80
81
    return api.get_portal_type(context)
82
83
84
def get_suffix(id, regex="-[A-Z]{1}[0-9]{1,2}$"):
85
    """Get the suffix of the ID, e.g. '-R01' or '-P05'
86
87
    The current regex determines a pattern of a single uppercase character with
88
    at most 2 numbers following at the end of the ID as the suffix.
89
    """
90
    parts = re.findall(regex, id)
91
    if not parts:
92
        return ""
93
    return parts[0]
94
95
96
def strip_suffix(id):
97
    """Split off any suffix from ID
98
99
    This mimics the old behavior of the Sample ID.
100
    """
101
    suffix = get_suffix(id)
102
    if not suffix:
103
        return id
104
    return re.split(suffix, id)[0]
105
106
107
def get_retest_count(context, default=0):
108
    """Returns the number of retests of this AR
109
    """
110
    if not is_ar(context):
111
        return default
112
113
    invalidated = context.getInvalidated()
114
115
    count = 0
116
    while invalidated:
117
        count += 1
118
        invalidated = invalidated.getInvalidated()
119
120
    return count
121
122
123
def get_partition_count(context, default=0):
124
    """Returns the number of partitions of this AR
125
    """
126
    if not is_ar(context):
127
        return default
128
129
    parent = context.getParentAnalysisRequest()
130
131
    if not parent:
132
        return default
133
134
    return len(parent.getDescendants())
135
136
137
def is_ar(context):
138
    """Checks if the context is an AR
139
    """
140
    return IAnalysisRequest.providedBy(context)
141
142
143
def get_config(context, **kw):
144
    """Fetch the config dict from the Bika Setup for the given portal_type
145
    """
146
    # get the ID formatting config
147
    config_map = api.get_bika_setup().getIDFormatting()
148
149
    # allow portal_type override
150
    portal_type = get_type_id(context, **kw)
151
152
    # check if we have a config for the given portal_type
153
    for config in config_map:
154
        if config['portal_type'].lower() == portal_type.lower():
155
            return config
156
157
    # return a default config
158
    default_config = {
159
        'form': '%s-{seq}' % portal_type.lower(),
160
        'sequence_type': 'generated',
161
        'prefix': '%s' % portal_type.lower(),
162
    }
163
    return default_config
164
165
166
def get_variables(context, **kw):
167
    """Prepares a dictionary of key->value pairs usable for ID formatting
168
    """
169
    # allow portal_type override
170
    portal_type = get_type_id(context, **kw)
171
172
    # The variables map hold the values that might get into the constructed id
173
    variables = {
174
        "context": context,
175
        "id": api.get_id(context),
176
        "portal_type": portal_type,
177
        "year": get_current_year(),
178
        "parent": api.get_parent(context),
179
        "seq": 0,
180
        "alpha": Alphanumber(0),
181
    }
182
183
    # Augment the variables map depending on the portal type
184
    if portal_type in AR_TYPES:
185
        now = DateTime()
186
        sampling_date = context.getSamplingDate()
187
        sampling_date = sampling_date and DT2dt(sampling_date) or DT2dt(now)
188
        date_sampled = context.getDateSampled()
189
        date_sampled = date_sampled and DT2dt(date_sampled) or DT2dt(now)
190
        test_count = 1
191
192
        variables.update({
193
            "clientId": context.getClientID(),
194
            "dateSampled": date_sampled,
195
            "samplingDate": sampling_date,
196
            "sampleType": context.getSampleType().getPrefix(),
197
            "test_count": test_count
198
        })
199
200
        # Partition
201
        if portal_type == "AnalysisRequestPartition":
202
            parent_ar = context.getParentAnalysisRequest()
203
            parent_ar_id = api.get_id(parent_ar)
204
            parent_base_id = strip_suffix(parent_ar_id)
205
            partition_count = get_partition_count(context)
206
            variables.update({
207
                "parent_analysisrequest": parent_ar,
208
                "parent_ar_id": parent_ar_id,
209
                "parent_base_id": parent_base_id,
210
                "partition_count": partition_count,
211
            })
212
213
        # Retest
214
        elif portal_type == "AnalysisRequestRetest":
215
            # Note: we use "parent" instead of "invalidated" for simplicity
216
            parent_ar = context.getInvalidated()
217
            parent_ar_id = api.get_id(parent_ar)
218
            parent_base_id = strip_suffix(parent_ar_id)
219
            # keep the full ID if the retracted AR is a partition
220
            if context.isPartition():
221
                parent_base_id = parent_ar_id
222
            retest_count = get_retest_count(context)
223
            test_count = test_count + retest_count
224
            variables.update({
225
                "parent_analysisrequest": parent_ar,
226
                "parent_ar_id": parent_ar_id,
227
                "parent_base_id": parent_base_id,
228
                "retest_count": retest_count,
229
                "test_count": test_count,
230
            })
231
232
    elif portal_type == "ARReport":
233
        variables.update({
234
            "clientId": context.aq_parent.getClientID(),
235
        })
236
237
    return variables
238
239
240
def split(string, separator="-"):
241
    """ split a string on the given separator
242
    """
243
    if not isinstance(string, basestring):
244
        return []
245
    return string.split(separator)
246
247
248
def to_int(thing, default=0):
249
    """Convert a thing to an integer
250
    """
251
    try:
252
        return int(thing)
253
    except (TypeError, ValueError):
254
        return default
255
256
257
def slice(string, separator="-", start=None, end=None):
258
    """Slice out a segment of a string, which is splitted on both the wildcards
259
    and the separator passed in, if any
260
    """
261
    # split by wildcards/keywords first
262
    # AR-{sampleType}-{parentId}{alpha:3a2d}
263
    segments = filter(None, re.split('(\{.+?\})', string))
264
    # ['AR-', '{sampleType}', '-', '{parentId}', '{alpha:3a2d}']
265
    if separator:
266
        # Keep track of singleton separators as empties
267
        # We need to do this to prevent duplicates later, when splitting
268
        segments = map(lambda seg: seg!=separator and seg or "", segments)
269
        # ['AR-', '{sampleType}', '', '{parentId}', '{alpha:3a2d}']
270
        # Split each segment at the given separator
271
        segments = map(lambda seg: split(seg, separator), segments)
272
        # [['AR', ''], ['{sampleType}'], [''], ['{parentId}'], ['{alpha:3a2d}']]
273
        # Flatten the list
274
        segments = list(itertools.chain.from_iterable(segments))
275
        # ['AR', '', '{sampleType}', '', '{parentId}', '{alpha:3a2d}']
276
        # And replace empties with separator
277
        segments = map(lambda seg: seg!="" and seg or separator, segments)
278
        # ['AR', '-', '{sampleType}', '-', '{parentId}', '{alpha:3a2d}']
279
280
    # Get the start and end positions from the segments without separator
281
    cleaned_segments = filter(lambda seg: seg!=separator, segments)
282
    start_pos = to_int(start, 0)
283
    # Note "end" is not a position, but the number of elements to join!
284
    end_pos = to_int(end, len(cleaned_segments) - start_pos) + start_pos - 1
285
286
    # Map the positions against the segments with separator
287
    start = segments.index(cleaned_segments[start_pos])
288
    end = segments.index(cleaned_segments[end_pos]) + 1
289
290
    # Return all segments joined
291
    sliced_parts = segments[start:end]
292
    return "".join(sliced_parts)
293
294
295
def get_current_year():
296
    """Returns the current year as a two digit string
297
    """
298
    return DateTime().strftime("%Y")[2:]
299
300
301
def search_by_prefix(portal_type, prefix):
302
    """Returns brains which share the same portal_type and ID prefix
303
    """
304
    catalog = api.get_tool("uid_catalog")
305
    brains = catalog({"portal_type": portal_type})
306
    # Filter brains with the same ID prefix
307
    return filter(lambda brain: api.get_id(brain).startswith(prefix), brains)
308
309
310
def get_ids_with_prefix(portal_type, prefix):
311
    """Return a list of ids sharing the same portal type and prefix
312
    """
313
    brains = search_by_prefix(portal_type, prefix)
314
    ids = map(api.get_id, brains)
315
    return ids
316
317
318
def make_storage_key(portal_type, prefix=None):
319
    """Make a storage (dict-) key for the number generator
320
    """
321
    key = portal_type.lower()
322
    if prefix:
323
        key = "{}-{}".format(key, prefix)
324
    return key
325
326
327
def get_seq_number_from_id(id, id_template, prefix, **kw):
328
    """Return the sequence number of the given ID
329
    """
330
    separator = kw.get("separator", "-")
331
    postfix = id.replace(prefix, "").strip(separator)
332
    postfix_segments = postfix.split(separator)
333
    seq_number = 0
334
    possible_seq_nums = filter(lambda n: n.isalnum(), postfix_segments)
335
    if possible_seq_nums:
336
        seq_number = possible_seq_nums[-1]
337
338
    # Check if this id has to be expressed as an alphanumeric number
339
    seq_number = get_alpha_or_number(seq_number, id_template)
340
    seq_number = to_int(seq_number)
341
    return seq_number
342
343
344
def get_alpha_or_number(number, template):
345
    """Returns an Alphanumber that represents the number passed in, expressed
346
    as defined in the template. Otherwise, returns the number
347
    """
348
    match = re.match(r".*\{alpha:(\d+a\d+d)\}$", template.strip())
349
    if match and match.groups():
350
        format = match.groups()[0]
351
        return to_alpha(number, format)
352
    return number
353
354
355
def get_counted_number(context, config, variables, **kw):
356
    """Compute the number for the sequence type "Counter"
357
    """
358
    # This "context" is defined by the user in the Setup and can be actually
359
    # anything. However, we assume it is something like "sample" or similar
360
    ctx = config.get("context")
361
362
    # get object behind the context name (falls back to the current context)
363
    obj = variables.get(ctx, context)
364
365
    # get the counter type, which is either "backreference" or "contained"
366
    counter_type = config.get("counter_type")
367
368
    # the counter reference is either the "relationship" for
369
    # "backreference" or the meta type for contained objects
370
    counter_reference = config.get("counter_reference")
371
372
    # This should be a list of existing items, including the current context
373
    # object
374
    seq_items = get_objects_in_sequence(obj, counter_type, counter_reference)
375
376
    number = len(seq_items)
377
    return number
378
379
380
def get_generated_number(context, config, variables, **kw):
381
    """Generate a new persistent number with the number generator for the
382
    sequence type "Generated"
383
    """
384
    # separator where to split the ID
385
    separator = kw.get('separator', '-')
386
387
    # allow portal_type override
388
    portal_type = get_type_id(context, **kw)
389
390
    # The ID format for string interpolation, e.g. WS-{seq:03d}
391
    id_template = config.get("form", "")
392
393
    # The split length defines where the key is splitted from the value
394
    split_length = config.get("split_length", 1)
395
396
    # The prefix template is the static part of the ID
397
    prefix_template = slice(id_template, separator=separator, end=split_length)
398
399
    # get the number generator
400
    number_generator = getUtility(INumberGenerator)
401
402
    # generate the key for the number generator storage
403
    prefix = prefix_template.format(**variables)
404
405
    # normalize out any unicode characters like Ö, É, etc. from the prefix
406
    prefix = api.normalize_filename(prefix)
407
408
    # The key used for the storage
409
    key = make_storage_key(portal_type, prefix)
410
411
    # Handle flushed storage
412
    if key not in number_generator:
413
        max_num = 0
414
        existing = get_ids_with_prefix(portal_type, prefix)
415
        numbers = map(lambda id: get_seq_number_from_id(id, id_template, prefix), existing)
416
        # figure out the highest number in the sequence
417
        if numbers:
418
            max_num = max(numbers)
419
        # set the number generator
420
        logger.info("*** SEEDING Prefix '{}' to {}".format(prefix, max_num))
421
        number_generator.set_number(key, max_num)
422
423
    if not kw.get("dry_run", False):
424
        # Generate a new number
425
        # NOTE Even when the number exceeds the given ID sequence format,
426
        #      it will overflow gracefully, e.g.
427
        #      >>> {sampleId}-R{seq:03d}'.format(sampleId="Water", seq=999999)
428
        #      'Water-R999999‘
429
        number = number_generator.generate_number(key=key)
430
    else:
431
        # => This allows us to "preview" the next generated ID in the UI
432
        # TODO Show the user the next generated number somewhere in the UI
433
        number = number_generator.get(key, 1)
434
435
    # Return an int or Alphanumber
436
    return get_alpha_or_number(number, id_template)
437
438
439
def generateUniqueId(context, **kw):
440
    """ Generate pretty content IDs.
441
    """
442
443
    # get the config for this portal type from the system setup
444
    config = get_config(context, **kw)
445
446
    # get the variables map for later string interpolation
447
    variables = get_variables(context, **kw)
448
449
    # The new generate sequence number
450
    number = 0
451
452
    # get the sequence type from the global config
453
    sequence_type = config.get("sequence_type", "generated")
454
455
    # Sequence Type is "Counter", so we use the length of the backreferences or
456
    # contained objects of the evaluated "context" defined in the config
457
    if sequence_type in ["counter"]:
458
        number = get_counted_number(context, config, variables, **kw)
459
460
    # Sequence Type is "Generated", so the ID is constructed according to the
461
    # configured split length
462
    if sequence_type in ["generated"]:
463
        number = get_generated_number(context, config, variables, **kw)
464
465
    # store the new sequence number to the variables map for str interpolation
466
    if isinstance(number, Alphanumber):
467
        variables["alpha"] = number
468
    variables["seq"] = to_int(number)
469
470
    # The ID formatting template from user config, e.g. {sampleId}-R{seq:02d}
471
    id_template = config.get("form", "")
472
473
    # Interpolate the ID template
474
    try:
475
        new_id = id_template.format(**variables)
476
    except KeyError, e:
477
        logger.error('KeyError: {} not in id_template {}'.format(
478
            e, id_template))
479
        raise
480
    normalized_id = api.normalize_filename(new_id)
481
    logger.info("generateUniqueId: {}".format(normalized_id))
482
483
    return normalized_id
484
485
486
def renameAfterCreation(obj):
487
    """Rename the content after it was created/added
488
    """
489
    # Check if the _bika_id was already set
490
    bika_id = getattr(obj, "_bika_id", None)
491
    if bika_id is not None:
492
        return bika_id
493
    # Can't rename without a subtransaction commit when using portal_factory
494
    transaction.savepoint(optimistic=True)
495
    # The id returned should be normalized already
496
    new_id = None
497
    # Checking if an adapter exists for this content type. If yes, we will
498
    # get new_id from adapter.
499
    for name, adapter in getAdapters((obj, ), IIdServer):
500
        if new_id:
501
            logger.warn(('More than one ID Generator Adapter found for'
502
                         'content type -> %s') % obj.portal_type)
503
        new_id = adapter.generate_id(obj.portal_type)
504
    if not new_id:
505
        new_id = generateUniqueId(obj)
506
507
    # TODO: This is a naive check just in current folder
508
    # -> this should check globally for duplicate objects with same prefix
509
    # N.B. a check like `search_by_prefix` each time would probably slow things
510
    # down too much!
511
    # -> A solution could be to store all IDs with a certain prefix in a storage
512
    parent = api.get_parent(obj)
513
    if new_id in parent.objectIds():
514
        # XXX We could do the check in a `while` loop and generate a new one.
515
        raise KeyError("The ID {} is already taken in the path {}".format(
516
            new_id, api.get_path(parent)))
517
    # rename the object to the new id
518
    parent.manage_renameObject(obj.id, new_id)
519
520
    return new_id
521