Completed
Push — master ( 5644bc...b87258 )
by Paolo
17s queued 13s
created

common.helpers.format_attribute()   A

Complexity

Conditions 5

Size

Total Lines 27
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 27
rs 9.1832
c 0
b 0
f 0
cc 5
nop 4
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Wed Mar 27 12:50:16 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import logging
10
import datetime
11
import websockets
12
import time
13
import json
14
15
from dateutil.relativedelta import relativedelta
16
17
from django.conf import settings
18
from django.contrib.admin.utils import NestedObjects
19
from django.db import DEFAULT_DB_ALIAS
20
from django.utils.text import capfirst
21
from django.utils.encoding import force_text
22
23
from .constants import YEARS, MONTHS, DAYS, OBO_URL
24
25
# Get an instance of a logger
26
logger = logging.getLogger(__name__)
27
28
29
def image_timedelta(t1, t2):
    """Express the interval between two dates as a number and a time unit.

    The interval is computed as ``relativedelta(t1, t2)`` and reported with
    its largest non-zero component: years first, then months, otherwise
    days.

    Args:
        t1: the end date of the interval
        t2: the start date of the interval

    Returns:
        tuple: ``(number, unit)`` where unit is one of YEARS, MONTHS or DAYS.
        ``(None, YEARS)`` is returned (and a warning logged) when a date is
        missing, when t2 comes after t1, or when either year is 1900.
    """

    # nothing can be computed without both dates
    if t1 is None or t2 is None:
        logger.warning("One date is NULL ({0}, {1}) ignoring".format(t2, t1))
        return None, YEARS

    # the interval would be negative
    if t2 > t1:
        logger.warning("t2>t1 ({0}, {1}) ignoring".format(t2, t1))
        return None, YEARS

    # check for meaningful intervals: a year of 1900 is not considered valid
    if any(when.year == 1900 for when in (t1, t2)):
        logger.warning("Ignoring one date ({0}, {1})".format(t2, t1))
        return None, YEARS

    delta = relativedelta(t1, t2)

    # report the coarsest component which is not zero
    for amount, unit in ((delta.years, YEARS), (delta.months, MONTHS)):
        if amount != 0:
            return amount, unit

    return delta.days, DAYS
56
57
58
# https://stackoverflow.com/a/39533619/4385116
59
# inspired by django.contrib.admin.utils.get_deleted_objects, this function
60
# tries to determine all related objects starting from a provided one
61
# HINT: similar function at https://gist.github.com/nealtodd/4594575
62
def get_deleted_objects(objs, db_alias=DEFAULT_DB_ALIAS):
    """Collect the objects related to ``objs`` and describe them as text.

    Args:
        objs: the objects passed to the collector
        db_alias: database alias used by the collector

    Returns:
        tuple: ``(to_delete, model_count, protected)`` where ``to_delete`` is
        the nested structure produced by the collector, ``model_count`` maps
        each model plural verbose name to the number of collected objects
        and ``protected`` is a list describing the protected objects.
    """

    def describe(instance):
        # "<Verbose name>: <object as text>"
        label = capfirst(instance._meta.verbose_name)
        return '%s: %s' % (label, force_text(instance))

    # NestedObjects is an improvement of django.db.models.deletion.Collector
    collector = NestedObjects(using=db_alias)
    collector.collect(objs)

    to_delete = collector.nested(describe)
    protected = [describe(item) for item in collector.protected]

    # count collected instances for each model
    model_count = {}
    for model, instances in collector.model_objs.items():
        model_count[model._meta.verbose_name_plural] = len(instances)

    return to_delete, model_count, protected
80
81
82
async def send_message_to_websocket(message, pk):
    """
    Function will create websocket object and send message to django-channels

    The message is serialized as JSON and sent to the submission websocket
    endpoint after a short delay.

    Args:
        message (dict): message to send to websocket
        pk (str): primary key of submission
    """
    # local import: keeps this block self-contained
    import asyncio

    # Need to have the delay here: with small test data the message sent to
    # the websocket could otherwise arrive before the response from server.
    # asyncio.sleep is awaited (instead of time.sleep) so the event loop is
    # not blocked while waiting
    await asyncio.sleep(3)

    async with websockets.connect(
            'ws://asgi:8001/image/ws/submissions/{}/'.format(pk)) as websocket:
        await websocket.send(json.dumps(message))
95
96
97
def format_attribute(value, terms=None, library_uri=OBO_URL, units=None):
    """Format a generic attribute into biosample dictionary

    Args:
        value: the attribute value; ``datetime.date`` instances (datetimes
            included) are converted to strings
        terms: optional term identifier, joined to ``library_uri`` with "/"
        library_uri: base URI used to build the term url
        units: optional units for the value

    Returns:
        list: a one-element list holding a dictionary with the "value" key
        and, when provided, the "terms" and "units" keys. ``None`` is
        returned when value is None.
    """

    if value is None:
        return None

    # pay attention to datetime objects: they are rendered as text
    # HINT: need I deal with multiple values?
    attribute = {
        "value": str(value) if isinstance(value, datetime.date) else value
    }

    if terms:
        term_url = "/".join((library_uri, terms))
        attribute["terms"] = [{"url": term_url}]

    if units:
        attribute["units"] = units

    # return a list of dictionaries
    return [attribute]
124
125
126
def get_admin_emails():
    """Return admin email addresses from the django settings

    Returns:
        list: the second item of every entry found in ``settings.ADMINS``
    """

    addresses = []

    # collect all admin mail addresses
    for entry in settings.ADMINS:
        addresses.append(entry[1])

    return addresses
133