Passed
Pull Request — devel (#72)
by Paolo
06:33
created

common.helpers.uid2biosample()   A

Complexity

Conditions 5

Size

Total Lines 11
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 11
rs 9.3333
c 0
b 0
f 0
cc 5
nop 1
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Wed Mar 27 12:50:16 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import re
10
import logging
11
import datetime
12
import websockets
13
import time
14
import json
15
16
from dateutil.relativedelta import relativedelta
17
18
from django.conf import settings
19
from django.contrib.admin.utils import NestedObjects
20
from django.core.mail import send_mass_mail
21
from django.db import DEFAULT_DB_ALIAS
22
from django.utils.text import capfirst
23
from django.utils.encoding import force_text
24
25
from .constants import YEARS, MONTHS, DAYS, OBO_URL, TIME_UNITS
26
27
# Get an instance of a logger
28
logger = logging.getLogger(__name__)
29
30
31
def image_timedelta(t1, t2):
32
    """A function to deal with image time intervals. Returns a number and
33
    time unit"""
34
35
    if t1 is None or t2 is None:
36
        logger.warning("One date is NULL ({0}, {1}) ignoring".format(t2, t1))
37
        return None, YEARS
38
39
    if type(t1) != datetime.date or type(t2) != datetime.date:
40
        raise ValueError("Expecting a datetime value")
41
42
    if t2 > t1:
43
        logger.warning("t2>t1 ({0}, {1}) ignoring".format(t2, t1))
44
        return None, YEARS
45
46
    # check for meaningful intervald
47
    if t1.year == 1900 or t2.year == 1900:
48
        logger.warning("Ignoring one date ({0}, {1})".format(t2, t1))
49
        return None, YEARS
50
51
    rdelta = relativedelta(t1, t2)
52
53
    if rdelta.years != 0:
54
        return rdelta.years, YEARS
55
56
    elif rdelta.months != 0:
57
        return rdelta.months, MONTHS
58
59
    else:
60
        return rdelta.days, DAYS
61
62
63
PATTERN_INTERVAL = re.compile(r"([\d]+) ([\w]+s?)")
64
65
66
def parse_image_timedelta(interval):
67
    """A function to parse from a image_timdelta string"""
68
69
    # cast interval as string
70
    interval = str(interval)
71
72
    match = re.search(PATTERN_INTERVAL, interval)
73
74
    # get parsed data
75
    try:
76
        value, units = match.groups()
77
78
    except AttributeError:
79
        raise ValueError("Expecting a value like '1 year', '10 months', etc")
80
81
    # time units are plural in database
82
    if units[-1] != 's':
83
        units += 's'
84
85
    # get time units from database
86
    units = TIME_UNITS.get_value_by_desc(units)
87
88
    return int(value), units
89
90
91
# https://stackoverflow.com/a/39533619/4385116
92
# inspired django.contrib.admin.utils.get_deleted_objects, this function
93
# tries to determine all related objects starting from a provied one
94
# HINT: similar function at https://gist.github.com/nealtodd/4594575
95
def get_deleted_objects(objs, db_alias=DEFAULT_DB_ALIAS):
96
    # NestedObjects is an imporovement of django.db.models.deletion.Collector
97
    collector = NestedObjects(using=db_alias)
98
    collector.collect(objs)
99
100
    def format_callback(obj):
101
        opts = obj._meta
102
        no_edit_link = '%s: %s' % (capfirst(opts.verbose_name),
103
                                   force_text(obj))
104
        return no_edit_link
105
106
    to_delete = collector.nested(format_callback)
107
    protected = [format_callback(obj) for obj in collector.protected]
108
    model_count = {
109
        model._meta.verbose_name_plural:
110
            len(objs) for model, objs in collector.model_objs.items()}
111
112
    return to_delete, model_count, protected
113
114
115
async def send_message_to_websocket(message, pk):
116
    """
117
    Function will create websocket object and send message to django-channels
118
119
    Args:
120
        message (dict): message to send to websocket
121
        pk (str): primary key of submission
122
    """
123
    # Need to have it here as in case with small test data message sent to
124
    # websocket will overcome response from server
125
    time.sleep(3)
126
    async with websockets.connect(
127
            'ws://asgi:8001/image/ws/submissions/{}/'.format(pk)) as websocket:
128
        await websocket.send(json.dumps(message))
129
130
131
def format_attribute(value, terms=None, library_uri=OBO_URL, units=None):
132
    """Format a generic attribute into biosample dictionary"""
133
134
    if value is None:
135
        return None
136
137
    # pay attention to datetime objects
138
    if isinstance(value, datetime.date):
139
        value = str(value)
140
141
    # HINT: need I deal with multiple values?
142
143
    result = {}
144
    result["value"] = value
145
146
    if terms:
147
        result["terms"] = [{
148
            "url": "/".join([
149
                library_uri,
150
                terms])
151
        }]
152
153
    if units:
154
        result["units"] = units
155
156
    # return a list of dictionaries
157
    return [result]
158
159
160
def get_admin_emails():
161
    """Return admin email from image.settings"""
162
163
    ADMINS = settings.ADMINS
164
165
    # return all admin mail addresses
166
    return [admin[1] for admin in ADMINS]
167
168
169
def send_mail_to_admins(
170
        email_subject, email_message,
171
        default_from=settings.DEFAULT_FROM_EMAIL,
172
        addresses=get_admin_emails()):
173
    """Send a mail to all admins
174
175
    Args:
176
        email_subject (str): the email subject
177
        email_message (str): the body of the email
178
        default_from (str): the from field of the email
179
        addresses (list): a list of email addresses
180
    """
181
182
    # submit mail to admins
183
    datatuple = (
184
        email_subject,
185
        email_message,
186
        default_from,
187
        addresses)
188
189
    send_mass_mail((datatuple, ))
190
191
192
def uid2biosample(value):
193
    """Convert human-readable name to model field"""
194
    if value == 'Sample storage':
195
        return 'storage'
196
    elif value == 'Sample storage processing':
197
        return 'storage_processing'
198
    elif value == 'Sampling to preparation interval':
199
        return 'preparation_interval_units'
200
    elif value == 'Specimen collection protocol':
201
        return 'protocol'
202
    return '_'.join(value.lower().split(" "))
203