Completed
Pull Request — master (#41)
by Paolo
06:52
created

common.helpers.format_attribute()   A

Complexity

Conditions 5

Size

Total Lines 27
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 27
rs 9.1832
c 0
b 0
f 0
cc 5
nop 4
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Wed Mar 27 12:50:16 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import re
10
import logging
11
import datetime
12
import websockets
13
import time
14
import json
15
16
from dateutil.relativedelta import relativedelta
17
18
from django.conf import settings
19
from django.contrib.admin.utils import NestedObjects
20
from django.db import DEFAULT_DB_ALIAS
21
from django.utils.text import capfirst
22
from django.utils.encoding import force_text
23
24
from .constants import YEARS, MONTHS, DAYS, OBO_URL, TIME_UNITS
25
26
# Get an instance of a logger
27
logger = logging.getLogger(__name__)
28
29
30
def image_timedelta(t1, t2):
31
    """A function to deal with image time intervals. Returns a number and
32
    time unit"""
33
34
    if t1 is None or t2 is None:
35
        logger.warning("One date is NULL ({0}, {1}) ignoring".format(t2, t1))
36
        return None, YEARS
37
38
    if t2 > t1:
39
        logger.warning("t2>t1 ({0}, {1}) ignoring".format(t2, t1))
40
        return None, YEARS
41
42
    # check for meaningful intervald
43
    if t1.year == 1900 or t2.year == 1900:
44
        logger.warning("Ignoring one date ({0}, {1})".format(t2, t1))
45
        return None, YEARS
46
47
    rdelta = relativedelta(t1, t2)
48
49
    if rdelta.years != 0:
50
        return rdelta.years, YEARS
51
52
    elif rdelta.months != 0:
53
        return rdelta.months, MONTHS
54
55
    else:
56
        return rdelta.days, DAYS
57
58
59
PATTERN_INTERVAL = re.compile(r"([\d]+) ([\w]+s?)")
60
61
62
def parse_image_timedelta(interval):
63
    """A function to parse from a image_timdelta string"""
64
65
    match = re.search(PATTERN_INTERVAL, interval)
66
67
    # get parsed data
68
    value, units = match.groups()
69
70
    # time units are plural in database
71
    if units[-1] != 's':
72
        units += 's'
73
74
    # get time units from database
75
    units = TIME_UNITS.get_value_by_desc(units)
76
77
    return int(value), units
78
79
80
# https://stackoverflow.com/a/39533619/4385116
81
# inspired django.contrib.admin.utils.get_deleted_objects, this function
82
# tries to determine all related objects starting from a provied one
83
# HINT: similar function at https://gist.github.com/nealtodd/4594575
84
def get_deleted_objects(objs, db_alias=DEFAULT_DB_ALIAS):
85
    # NestedObjects is an imporovement of django.db.models.deletion.Collector
86
    collector = NestedObjects(using=db_alias)
87
    collector.collect(objs)
88
89
    def format_callback(obj):
90
        opts = obj._meta
91
        no_edit_link = '%s: %s' % (capfirst(opts.verbose_name),
92
                                   force_text(obj))
93
        return no_edit_link
94
95
    to_delete = collector.nested(format_callback)
96
    protected = [format_callback(obj) for obj in collector.protected]
97
    model_count = {
98
        model._meta.verbose_name_plural:
99
            len(objs) for model, objs in collector.model_objs.items()}
100
101
    return to_delete, model_count, protected
102
103
104
async def send_message_to_websocket(message, pk):
105
    """
106
    Function will create websocket object and send message to django-channels
107
    Args:
108
        message (dict): message to send to websocket
109
        pk (str): primary key of submission
110
    """
111
    # Need to have it here as in case with small test data message sent to
112
    # websocket will overcome response from server
113
    time.sleep(3)
114
    async with websockets.connect(
115
            'ws://asgi:8001/image/ws/submissions/{}/'.format(pk)) as websocket:
116
        await websocket.send(json.dumps(message))
117
118
119
def format_attribute(value, terms=None, library_uri=OBO_URL, units=None):
120
    """Format a generic attribute into biosample dictionary"""
121
122
    if value is None:
123
        return None
124
125
    # pay attention to datetime objects
126
    if isinstance(value, datetime.date):
127
        value = str(value)
128
129
    # HINT: need I deal with multiple values?
130
131
    result = {}
132
    result["value"] = value
133
134
    if terms:
135
        result["terms"] = [{
136
            "url": "/".join([
137
                library_uri,
138
                terms])
139
        }]
140
141
    if units:
142
        result["units"] = units
143
144
    # return a list of dictionaries
145
    return [result]
146
147
148
def get_admin_emails():
149
    """Return admin email from image.settings"""
150
151
    ADMINS = settings.ADMINS
152
153
    # return all admin mail addresses
154
    return [admin[1] for admin in ADMINS]
155
156
157
def uid2biosample(value):
158
    """Convert human-readable name to model field"""
159
    if value == 'Sample storage':
160
        return 'storage'
161
    elif value == 'Sample storage processing':
162
        return 'storage_processing'
163
    elif value == 'Sampling to preparation interval':
164
        return 'preparation_interval_units'
165
    return '_'.join(value.lower().split(" "))
166