Passed
Push — 2.x ( a4d7e0...003ec3 )
by Jordi
07:40
created

senaite.core.idserver.idserver.to_int()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import itertools
22
import re
23
from datetime import datetime
24
25
import six
26
import transaction
27
from bika.lims import api
28
from bika.lims import logger
29
from bika.lims.browser.fields.uidreferencefield import \
30
    get_backreferences as get_backuidreferences
31
from bika.lims.interfaces import IAnalysisRequest
32
from bika.lims.interfaces import IAnalysisRequestPartition
33
from bika.lims.interfaces import IAnalysisRequestRetest
34
from bika.lims.interfaces import IAnalysisRequestSecondary
35
from bika.lims.interfaces import IARReport
36
from DateTime import DateTime
37
from Products.ATContentTypes.utils import DT2dt
38
from senaite.core.idserver.alphanumber import Alphanumber
39
from senaite.core.idserver.alphanumber import to_alpha
40
from senaite.core.interfaces import IIdServer
41
from senaite.core.interfaces import IIdServerTypeID
42
from senaite.core.interfaces import IIdServerVariables
43
from senaite.core.interfaces import INumberGenerator
44
from zope.component import getAdapters
45
from zope.component import getUtility
46
from zope.component import queryAdapter
47
48
AR_TYPES = [
49
    "AnalysisRequest",
50
    "AnalysisRequestRetest",
51
    "AnalysisRequestPartition",
52
    "AnalysisRequestSecondary",
53
]
54
55
56
def get_objects_in_sequence(brain_or_object, ctype, cref):
57
    """Return a list of items
58
    """
59
    obj = api.get_object(brain_or_object)
60
    if ctype == "backreference":
61
        return get_backreferences(obj, cref)
62
    if ctype == "contained":
63
        return get_contained_items(obj, cref)
64
    raise ValueError("Reference value is mandatory for sequence type counter")
65
66
67
def get_backreferences(obj, relationship):
68
    """Returns the backreferences
69
    """
70
    return get_backuidreferences(obj, relationship)
71
72
73
def get_contained_items(obj, spec):
74
    """Returns a list of (id, subobject) tuples of the current context.
75
    If 'spec' is specified, returns only objects whose meta_type match 'spec'
76
    """
77
    return obj.objectItems(spec)
78
79
80
def get_type_id(context, **kw):
81
    """Returns the type id for the context passed in, that is used for custom
82
    ID formatting, regardless of the real portal type of the context passed in.
83
    Thus, the function might return different type ids for two objects from
84
    same portal type (e.g AnalysisRequest). This type id is used later for
85
    ID server prefixes (in lowercase)
86
    """
87
    portal_type = kw.get("portal_type", None)
88
    if portal_type:
89
        return portal_type
90
91
    # Look for a get_type_id adapter
92
    adapter = queryAdapter(context, IIdServerTypeID)
93
    if adapter:
94
        type_id = adapter.get_type_id(**kw)
95
        if type_id:
96
            return type_id
97
98
    # Override by provided marker interface
99
    if IAnalysisRequestPartition.providedBy(context):
100
        return "AnalysisRequestPartition"
101
    elif IAnalysisRequestRetest.providedBy(context):
102
        return "AnalysisRequestRetest"
103
    elif IAnalysisRequestSecondary.providedBy(context):
104
        return "AnalysisRequestSecondary"
105
106
    return api.get_portal_type(context)
107
108
109
def get_suffix(id, regex="-[A-Z]{1}[0-9]{1,2}$"):
110
    """Get the suffix of the ID, e.g. '-R01' or '-P05'
111
112
    The current regex determines a pattern of a single uppercase character with
113
    at most 2 numbers following at the end of the ID as the suffix.
114
    """
115
    parts = re.findall(regex, id)
116
    if not parts:
117
        return ""
118
    return parts[0]
119
120
121
def strip_suffix(id):
122
    """Split off any suffix from ID
123
124
    This mimics the old behavior of the Sample ID.
125
    """
126
    suffix = get_suffix(id)
127
    if not suffix:
128
        return id
129
    return re.split(suffix, id)[0]
130
131
132
def get_retest_count(context, default=0):
133
    """Returns the number of retests of this AR
134
    """
135
    if not is_ar(context):
136
        return default
137
138
    invalidated = context.getInvalidated()
139
140
    count = 0
141
    while invalidated:
142
        count += 1
143
        invalidated = invalidated.getInvalidated()
144
145
    return count
146
147
148
def get_partition_count(context, default=0):
149
    """Returns the number of partitions of this AR
150
    """
151
    if not is_ar(context):
152
        return default
153
154
    parent = context.getParentAnalysisRequest()
155
156
    if not parent:
157
        return default
158
159
    return len(parent.getDescendants())
160
161
162
def get_secondary_count(context, default=0):
163
    """Returns the number of secondary ARs of this AR
164
    """
165
    if not is_ar(context):
166
        return default
167
168
    primary = context.getPrimaryAnalysisRequest()
169
170
    if not primary:
171
        return default
172
173
    return len(primary.getRawSecondaryAnalysisRequests())
174
175
176
def is_ar(context):
177
    """Checks if the context is an AR
178
    """
179
    return IAnalysisRequest.providedBy(context)
180
181
182
def get_config(context, **kw):
183
    """Fetch the config dict from the Bika Setup for the given portal_type
184
    """
185
    # get the ID formatting config
186
    config_map = api.get_bika_setup().getIDFormatting()
187
188
    # allow portal_type override
189
    portal_type = get_type_id(context, **kw)
190
191
    # check if we have a config for the given portal_type
192
    for config in config_map:
193
        if config['portal_type'].lower() == portal_type.lower():
194
            return config
195
196
    # return a default config
197
    default_config = {
198
        'form': '%s-{seq}' % portal_type.lower(),
199
        'sequence_type': 'generated',
200
        'prefix': '%s' % portal_type.lower(),
201
    }
202
    return default_config
203
204
205
def get_variables(context, **kw):
206
    """Prepares a dictionary of key->value pairs usable for ID formatting
207
    """
208
    # allow portal_type override
209
    portal_type = get_type_id(context, **kw)
210
211
    parent = kw.get("container") or api.get_parent(context)
212
213
    # The variables map hold the values that might get into the constructed id
214
    variables = {
215
        "context": context,
216
        "id": api.get_id(context),
217
        "portal_type": portal_type,
218
        "year": get_current_year(),
219
        "yymmdd": get_yymmdd(),
220
        "parent": parent,
221
        "seq": 0,
222
        "alpha": Alphanumber(0),
223
    }
224
225
    # Augment the variables map depending on the portal type
226
    if IAnalysisRequest.providedBy(context):
227
        now = DateTime()
228
        sampling_date = context.getSamplingDate()
229
        sampling_date = sampling_date and DT2dt(sampling_date) or DT2dt(now)
230
        date_sampled = context.getDateSampled()
231
        date_sampled = date_sampled and DT2dt(date_sampled) or DT2dt(now)
232
        test_count = 1
233
234
        variables.update({
235
            "clientId": context.getClientID(),
236
            "dateSampled": date_sampled,
237
            "samplingDate": sampling_date,
238
            "sampleType": context.getSampleType().getPrefix(),
239
            "test_count": test_count
240
        })
241
242
        # Partition
243
        if IAnalysisRequestPartition.providedBy(context):
244
            parent_ar = context.getParentAnalysisRequest()
245
            parent_ar_id = api.get_id(parent_ar)
246
            parent_base_id = strip_suffix(parent_ar_id)
247
            partition_count = get_partition_count(context)
248
            variables.update({
249
                "parent_analysisrequest": parent_ar,
250
                "parent_ar_id": parent_ar_id,
251
                "parent_base_id": parent_base_id,
252
                "partition_count": partition_count,
253
            })
254
255
        # Retest
256
        elif IAnalysisRequestRetest.providedBy(context):
257
            # Note: we use "parent" instead of "invalidated" for simplicity
258
            parent_ar = context.getInvalidated()
259
            parent_ar_id = api.get_id(parent_ar)
260
            parent_base_id = strip_suffix(parent_ar_id)
261
            # keep the full ID if the retracted AR is a partition
262
            if context.isPartition():
263
                parent_base_id = parent_ar_id
264
            retest_count = get_retest_count(context)
265
            test_count = test_count + retest_count
266
            variables.update({
267
                "parent_analysisrequest": parent_ar,
268
                "parent_ar_id": parent_ar_id,
269
                "parent_base_id": parent_base_id,
270
                "retest_count": retest_count,
271
                "test_count": test_count,
272
            })
273
274
        # Secondary
275
        elif IAnalysisRequestSecondary.providedBy(context):
276
            primary_ar = context.getPrimaryAnalysisRequest()
277
            primary_ar_id = api.get_id(primary_ar)
278
            parent_base_id = strip_suffix(primary_ar_id)
279
            secondary_count = get_secondary_count(context)
280
            variables.update({
281
                "parent_analysisrequest": primary_ar,
282
                "parent_ar_id": primary_ar_id,
283
                "parent_base_id": parent_base_id,
284
                "secondary_count": secondary_count,
285
            })
286
287
    elif IARReport.providedBy(context):
288
        variables.update({
289
            "clientId": parent.getClientID(),
290
        })
291
292
    # Look for a variables adapter
293
    adapter = queryAdapter(context, IIdServerVariables)
294
    if adapter:
295
        vars = adapter.get_variables(**kw)
296
        variables.update(vars)
297
298
    return variables
299
300
301
def split(string, separator="-"):
302
    """ split a string on the given separator
303
    """
304
    if not isinstance(string, six.string_types):
305
        return []
306
    return string.split(separator)
307
308
309
def to_int(thing, default=0):
310
    """Convert a thing to an integer
311
    """
312
    try:
313
        return int(thing)
314
    except (TypeError, ValueError):
315
        return default
316
317
318
def slice(string, separator="-", start=None, end=None):
319
    """Slice out a segment of a string, which is splitted on both the wildcards
320
    and the separator passed in, if any
321
    """
322
    # split by wildcards/keywords first
323
    # AR-{sampleType}-{parentId}{alpha:3a2d}
324
    segments = filter(None, re.split(r'(\{.+?})', string))
325
    # ['AR-', '{sampleType}', '-', '{parentId}', '{alpha:3a2d}']
326
    if separator:
327
        # Keep track of singleton separators as empties
328
        # We need to do this to prevent duplicates later, when splitting
329
        segments = map(lambda seg: seg!=separator and seg or "", segments)
330
        # ['AR-', '{sampleType}', '', '{parentId}', '{alpha:3a2d}']
331
        # Split each segment at the given separator
332
        segments = map(lambda seg: split(seg, separator), segments)
333
        # [['AR', ''], ['{sampleType}'], [''], ['{parentId}'], ['{alpha:3a2d}']]
334
        # Flatten the list
335
        segments = list(itertools.chain.from_iterable(segments))
336
        # ['AR', '', '{sampleType}', '', '{parentId}', '{alpha:3a2d}']
337
        # And replace empties with separator
338
        segments = map(lambda seg: seg!="" and seg or separator, segments)
339
        # ['AR', '-', '{sampleType}', '-', '{parentId}', '{alpha:3a2d}']
340
341
    # Get the start and end positions from the segments without separator
342
    cleaned_segments = filter(lambda seg: seg!=separator, segments)
343
    start_pos = to_int(start, 0)
344
    # Note "end" is not a position, but the number of elements to join!
345
    end_pos = to_int(end, len(cleaned_segments) - start_pos) + start_pos - 1
346
347
    # Map the positions against the segments with separator
348
    start = segments.index(cleaned_segments[start_pos])
349
    end = segments.index(cleaned_segments[end_pos]) + 1
350
351
    # Return all segments joined
352
    sliced_parts = segments[start:end]
353
    return "".join(sliced_parts)
354
355
356
def get_current_year():
357
    """Returns the current year as a two digit string
358
    """
359
    return DateTime().strftime("%Y")[2:]
360
361
def get_yymmdd():
362
    """Returns the current date in yymmdd format
363
    """
364
    return datetime.now().strftime("%y%m%d")
365
366
def make_storage_key(portal_type, prefix=None):
367
    """Make a storage (dict-) key for the number generator
368
    """
369
    key = portal_type.lower()
370
    if prefix:
371
        key = "{}-{}".format(key, prefix)
372
    return key
373
374
375
def get_seq_number_from_id(id, id_template, prefix, **kw):
376
    """Return the sequence number of the given ID
377
    """
378
    separator = kw.get("separator", "-")
379
    postfix = id.replace(prefix, "").strip(separator)
380
    postfix_segments = postfix.split(separator)
381
    seq_number = 0
382
    possible_seq_nums = filter(lambda n: n.isalnum(), postfix_segments)
383
    if possible_seq_nums:
384
        seq_number = possible_seq_nums[-1]
385
386
    # Check if this id has to be expressed as an alphanumeric number
387
    seq_number = get_alpha_or_number(seq_number, id_template)
388
    seq_number = to_int(seq_number)
389
    return seq_number
390
391
392
def get_alpha_or_number(number, template):
393
    """Returns an Alphanumber that represents the number passed in, expressed
394
    as defined in the template. Otherwise, returns the number
395
    """
396
    match = re.match(r".*\{alpha:(\d+a\d+d)\}$", template.strip())
397
    if match and match.groups():
398
        format = match.groups()[0]
399
        return to_alpha(number, format)
400
    return number
401
402
403
def get_counted_number(context, config, variables, **kw):
404
    """Compute the number for the sequence type "Counter"
405
    """
406
    # This "context" is defined by the user in the Setup and can be actually
407
    # anything. However, we assume it is something like "sample" or similar
408
    ctx = config.get("context")
409
410
    # get object behind the context name (falls back to the current context)
411
    obj = variables.get(ctx, context)
412
413
    # get the counter type, which is either "backreference" or "contained"
414
    counter_type = config.get("counter_type")
415
    if counter_type == "backreference":
416
        logger.warn("Counter type 'backreference' is obsolete! "
417
                    "Please use 'contained' instead. Alternatively, use "
418
                    "'generated' instead of 'counter' as the sequence type")
419
420
    # the counter reference is either the "relationship" for
421
    # "backreference" or the meta type for contained objects
422
    counter_reference = config.get("counter_reference")
423
424
    # This should be a list of existing items, including the current context
425
    # object
426
    seq_items = get_objects_in_sequence(obj, counter_type, counter_reference)
427
428
    number = len(seq_items)
429
    return number
430
431
432
def get_generated_number(context, config, variables, **kw):
433
    """Generate a new persistent number with the number generator for the
434
    sequence type "Generated"
435
    """
436
    # separator where to split the ID
437
    separator = kw.get('separator', '-')
438
439
    # allow portal_type override
440
    portal_type = get_type_id(context, **kw)
441
442
    # The ID format for string interpolation, e.g. WS-{seq:03d}
443
    id_template = config.get("form", "")
444
445
    # The split length defines where the key is splitted from the value
446
    split_length = config.get("split_length", 1)
447
448
    # The prefix template is the static part of the ID
449
    prefix_template = slice(id_template, separator=separator, end=split_length)
450
451
    # get the number generator
452
    number_generator = getUtility(INumberGenerator)
453
454
    # generate the key for the number generator storage
455
    prefix = prefix_template.format(**variables)
456
457
    # normalize out any unicode characters like Ö, É, etc. from the prefix
458
    prefix = api.normalize_filename(prefix)
459
460
    # The key used for the storage
461
    key = make_storage_key(portal_type, prefix)
462
463
    if not kw.get("dry_run", False):
464
        # Generate a new number
465
        # NOTE Even when the number exceeds the given ID sequence format,
466
        #      it will overflow gracefully, e.g.
467
        #      >>> {sampleId}-R{seq:03d}'.format(sampleId="Water", seq=999999)
468
        #      'Water-R999999‘
469
        number = number_generator.generate_number(key=key)
470
    else:
471
        # => This allows us to "preview" the next generated ID in the UI
472
        # TODO Show the user the next generated number somewhere in the UI
473
        number = number_generator.get(key, 1)
474
475
    # Return an int or Alphanumber
476
    return get_alpha_or_number(number, id_template)
477
478
479
def generateUniqueId(context, **kw):
480
    """ Generate pretty content IDs.
481
    """
482
483
    # get the config for this portal type from the system setup
484
    config = get_config(context, **kw)
485
486
    # get the variables map for later string interpolation
487
    variables = get_variables(context, **kw)
488
489
    # The new generate sequence number
490
    number = 0
491
492
    # get the sequence type from the global config
493
    sequence_type = config.get("sequence_type", "generated")
494
495
    # Sequence Type is "Counter", so we use the length of the backreferences or
496
    # contained objects of the evaluated "context" defined in the config
497
    if sequence_type in ["counter"]:
498
        number = get_counted_number(context, config, variables, **kw)
499
500
    # Sequence Type is "Generated", so the ID is constructed according to the
501
    # configured split length
502
    if sequence_type in ["generated"]:
503
        number = get_generated_number(context, config, variables, **kw)
504
505
    # store the new sequence number to the variables map for str interpolation
506
    if isinstance(number, Alphanumber):
507
        variables["alpha"] = number
508
    variables["seq"] = to_int(number)
509
510
    # The ID formatting template from user config, e.g. {sampleId}-R{seq:02d}
511
    id_template = config.get("form", "")
512
513
    # Interpolate the ID template
514
    try:
515
        new_id = id_template.format(**variables)
516
    except KeyError as e:
517
        logger.error('KeyError: {} not in id_template {}'.format(
518
            e, id_template))
519
        raise
520
    normalized_id = api.normalize_filename(new_id)
521
    logger.info("generateUniqueId: {}".format(normalized_id))
522
523
    return normalized_id
524
525
526
def renameAfterCreation(obj):
527
    """Rename the content after it was created/added
528
    """
529
    # Can't rename without a subtransaction commit when using portal_factory
530
    transaction.savepoint(optimistic=True)
531
532
    # The id returned should be normalized already
533
    new_id = None
534
535
    # Checking if an adapter exists for this content type. If yes, we will
536
    # get new_id from adapter.
537
    for name, adapter in getAdapters((obj, ), IIdServer):
538
        if new_id:
539
            logger.warn(('More than one ID Generator Adapter found for'
540
                         'content type -> %s') % obj.portal_type)
541
        new_id = adapter.generate_id(obj.portal_type)
542
    if not new_id:
543
        new_id = generateUniqueId(obj)
544
545
    # rename the object to the new id
546
    parent = api.get_parent(obj)
547
    parent.manage_renameObject(obj.id, new_id)
548
549
    return new_id
550