common.helpers.image_timedelta()   C
last analyzed

Complexity

Conditions 10

Size

Total Lines 32
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 20
dl 0
loc 32
rs 5.9999
c 0
b 0
f 0
cc 10
nop 2

How to fix   Complexity   

Complexity

Complex classes like common.helpers.image_timedelta() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Wed Mar 27 12:50:16 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import re
10
import logging
11
import datetime
12
import websockets
13
import time
14
import json
15
16
from dateutil.relativedelta import relativedelta
17
18
from django.conf import settings
19
from django.contrib.admin.utils import NestedObjects
20
from django.core.mail import send_mass_mail
21
from django.db import DEFAULT_DB_ALIAS
22
from django.utils.text import capfirst
23
from django.utils.encoding import force_text
24
25
from .constants import YEARS, MONTHS, DAYS, OBO_URL, TIME_UNITS
26
27
# Get an instance of a logger
28
logger = logging.getLogger(__name__)
29
30
31
def image_timedelta(t1, t2):
    """Compute the interval between two dates as a ``(value, units)`` pair.

    The interval goes from ``t2`` up to ``t1`` and is expressed with its
    largest non-zero component: years first, then months, then days.

    Args:
        t1 (datetime.date): the most recent date
        t2 (datetime.date): the oldest date

    Returns:
        tuple: ``(number, time unit)``. The number is ``None`` (with
        ``YEARS`` as time unit) when the interval is ignored: one date is
        missing, ``t2`` comes after ``t1`` or one date falls in year 1900

    Raises:
        ValueError: if an argument is not exactly a ``datetime.date``
    """

    dates = (t1, t2)

    if any(item is None for item in dates):
        message = "One date is NULL ({0}, {1}) ignoring".format(t2, t1)
        logger.warning(message)
        return None, YEARS

    # exact type match: subclasses of datetime.date are refused too
    if any(type(item) is not datetime.date for item in dates):
        raise ValueError(
            "Expecting a datetime value (got '%s':%s and '%s':%s)" % (
                t1, type(t1), t2, type(t2)))

    if t2 > t1:
        message = "t2>t1 ({0}, {1}) ignoring".format(t2, t1)
        logger.warning(message)
        return None, YEARS

    # check for meaningful intervals: dates in year 1900 are ignored
    if 1900 in (t1.year, t2.year):
        message = "Ignoring one date ({0}, {1})".format(t2, t1)
        logger.warning(message)
        return None, YEARS

    delta = relativedelta(t1, t2)

    # report only the most significant non-zero component
    if delta.years != 0:
        return delta.years, YEARS

    if delta.months != 0:
        return delta.months, MONTHS

    return delta.days, DAYS
63
64
65
# digits, a single space, then a word (the time unit)
PATTERN_INTERVAL = re.compile(r"([\d]+) ([\w]+s?)")


def parse_image_timedelta(interval):
    """Parse a string written like an image_timedelta result.

    Args:
        interval: any object whose ``str()`` contains something like
            ``'1 year'`` or ``'10 months'``

    Returns:
        tuple: the value as an integer and the time unit, looked up in
        ``TIME_UNITS`` through its plural description

    Raises:
        ValueError: if no ``<number> <unit>`` pair can be found
    """

    # interval is always treated as a string
    found = PATTERN_INTERVAL.search(str(interval))

    if found is None:
        raise ValueError("Expecting a value like '1 year', '10 months', etc")

    amount, label = found.groups()

    # time units are plural in database
    if not label.endswith('s'):
        label += 's'

    # get time units from database
    units = TIME_UNITS.get_value_by_desc(label)

    return int(amount), units
91
92
93
# https://stackoverflow.com/a/39533619/4385116
94
# inspired by django.contrib.admin.utils.get_deleted_objects, this function
95
# tries to determine all related objects starting from a provided one
96
# HINT: similar function at https://gist.github.com/nealtodd/4594575
97
def get_deleted_objects(objs, db_alias=DEFAULT_DB_ALIAS):
    """Collect the objects related to ``objs`` through a NestedObjects
    collector.

    Args:
        objs: the objects to start collecting from
        db_alias (str): the database alias given to the collector

    Returns:
        tuple: ``(to_delete, model_count, protected)``: the nested labels
        built by the collector, a dictionary mapping each model plural
        verbose name to the number of collected objects, and the labels
        of the objects the collector reports as protected
    """

    def describe(obj):
        # label is "<Verbose name>: <object as text>", without edit links
        return '%s: %s' % (
            capfirst(obj._meta.verbose_name), force_text(obj))

    # NestedObjects is an improvement of django.db.models.deletion.Collector
    collector = NestedObjects(using=db_alias)
    collector.collect(objs)

    to_delete = collector.nested(describe)
    protected = [describe(item) for item in collector.protected]

    # count collected objects for each model
    model_count = {}

    for model, instances in collector.model_objs.items():
        model_count[model._meta.verbose_name_plural] = len(instances)

    return to_delete, model_count, protected
115
116
117
async def send_message_to_websocket(message, pk):
    """
    Function will create websocket object and send message to django-channels

    The message is serialized as JSON and sent after a 3 seconds wait.

    Args:
        message (dict): message to send to websocket
        pk (str): primary key of submission
    """
    # local import, so this block doesn't rely on a module level change
    import asyncio

    # Need to have it here as in case with small test data message sent to
    # websocket will overcome response from server. The wait is awaited
    # (instead of calling time.sleep) so the event loop is not blocked and
    # other tasks can run in the meantime
    await asyncio.sleep(3)

    async with websockets.connect(
            'ws://asgi:8001/ws/submissions/{}/'.format(pk)) as websocket:
        await websocket.send(json.dumps(message))
131
132
133
def format_attribute(value, terms=None, library_uri=OBO_URL, units=None):
    """Wrap a generic attribute into a biosample dictionary.

    Args:
        value: the attribute value. Dates are converted with ``str()``
        terms (str): optional term, appended to ``library_uri`` to build
            the term url
        library_uri (str): the base uri joined to ``terms`` with a ``/``
        units: optional units of the value

    Returns:
        list: a list with a single attribute dictionary, or ``None`` when
        ``value`` is ``None``
    """

    if value is None:
        return None

    # pay attention to datetime objects: they are provided as strings
    attribute = {
        "value": str(value) if isinstance(value, datetime.date) else value
    }

    # HINT: need I deal with multiple values?

    if terms:
        term_url = "/".join((library_uri, terms))
        attribute["terms"] = [{"url": term_url}]

    if units:
        attribute["units"] = units

    # return a list of dictionaries
    return [attribute]
160
161
162
def get_admin_emails():
    """Return the mail address of each entry in ``settings.ADMINS``"""

    # the mail address is the second item of every ADMINS entry
    return [entry[1] for entry in settings.ADMINS]
169
170
171
def send_mail_to_admins(
        email_subject, email_message,
        default_from=None,
        addresses=None):
    """Send a mail to all admins

    Args:
        email_subject (str): the email subject
        email_message (str): the body of the email
        default_from (str): the from field of the email. When omitted,
            ``settings.DEFAULT_FROM_EMAIL`` is read at call time
        addresses (list): a list of email addresses. When omitted, the
            admin addresses are collected at call time
    """

    # defaults are resolved here and not in the signature: there they were
    # evaluated only once at import time, so every call shared the same
    # list and later changes of settings were ignored
    if default_from is None:
        default_from = settings.DEFAULT_FROM_EMAIL

    if addresses is None:
        addresses = get_admin_emails()

    # submit mail to admins: one (subject, message, from, recipients) tuple
    datatuple = (
        email_subject,
        email_message,
        default_from,
        addresses)

    send_mass_mail((datatuple, ))
192
193
194
def uid2biosample(value):
    """Convert human-readable name to model field"""

    # names whose model field is not simply the name in snake case
    special_cases = (
        ('Sample storage', 'storage'),
        ('Sample storage processing', 'storage_processing'),
        ('Sampling to preparation interval', 'preparation_interval_units'),
        ('Specimen collection protocol', 'protocol'),
    )

    for label, field in special_cases:
        if value == label:
            return field

    # any other name: lower case, each space replaced by an underscore
    return value.lower().replace(" ", "_")
205