Completed
Pull Request — master (#44)
by Paolo
06:24
created

FetchStatusHelper.finalize()   A

Complexity

Conditions 4

Size

Total Lines 52
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 23
dl 0
loc 52
rs 9.328
c 0
b 0
f 0
cc 4
nop 1

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Tue Jul 16 11:25:03 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
import os
10
import json
11
12
from decouple import AutoConfig
13
from celery.utils.log import get_task_logger
14
15
import pyUSIrest.client
16
17
from django.conf import settings
18
from django.db.models import Count
19
from django.utils import timezone
20
21
from image.celery import app as celery_app, MyTask
22
from image_app.helpers import parse_image_alias, get_model_object
23
from image_app.models import Submission
24
from common.tasks import redis_lock
25
from common.constants import (
26
    ERROR, NEED_REVISION, SUBMITTED, COMPLETED)
27
from submissions.helpers import send_message
28
29
from ..helpers import get_manager_auth
30
from ..models import Submission as USISubmission
31
from .submission import TaskFailureMixin
32
33
# Module-level Celery task logger, shared by every class in this module
34
logger = get_task_logger(__name__)
35
36
# decouple config object: settings are searched in <BASE_DIR>/image
37
settings_dir = os.path.join(settings.BASE_DIR, 'image')
38
config = AutoConfig(search_path=settings_dir)
39
40
# Number of days a submission may keep the same status before
# FetchStatusHelper.submission_has_issues() marks it as ERROR
41
MAX_DAYS = 5
42
43
44
# HINT: how this class could be similar to SubmissionHelper?
45
class FetchStatusHelper:
    """Check one USI (BioSamples) submission and update the database.

    An instance pairs a ``biosample.models.Submission`` record with the
    pyUSIrest submission object having the same name, and exposes the
    methods needed to validate, finalize and complete that submission.
    """

    def __init__(self, usi_submission):
        """
        Load the USI submission tracked by a biosample.models.Submission

        Args:
            usi_submission (biosample.models.Submission): a biosample
                model Submission instance
        """

        # database objects
        self.usi_submission = usi_submission
        self.uid_submission = usi_submission.uid_submission

        # pyUSIrest objects
        # NOTE(review): presumably these calls contact the USI API, so
        # building a helper can raise network errors -- confirm
        self.auth = get_manager_auth()
        self.root = pyUSIrest.client.Root(self.auth)

        # the USI submission is looked up by the name stored in database
        self.submission_name = self.usi_submission.usi_submission_name
        self.submission = self.root.get_submission_by_name(
            submission_name=self.submission_name)

    def check_submission_status(self):
        """Check submission status, finalize submission, check errors etc

        The database record is reloaded first: if it is no longer
        ``SUBMITTED`` nothing is done. Otherwise the USI status drives the
        action: ``Completed`` calls :py:meth:`complete`, ``Draft`` checks
        validation and may call :py:meth:`finalize`, ``Submitted`` only
        logs the processing summary. Any other status is logged as a
        warning.
        """

        # reload submission status
        self.usi_submission.refresh_from_db()

        if self.usi_submission.status != SUBMITTED:
            # someone else has taken this task and done something. Ignore!
            logger.warning(
                "Ignoring submission %s current status is %s",
                self.usi_submission,
                self.usi_submission.get_status_display())
            return

        logger.debug("Checking status for '%s'", self.submission_name)

        usi_status = self.submission.status

        if usi_status == 'Completed':
            # fetch biosample ids with a proper function
            self.complete()

        elif usi_status in ('Draft', 'Submitted'):
            # both statuses are first checked for a long running task
            if self.submission_has_issues():
                # return to the caller. The submission has just been
                # marked with errors
                return

            if usi_status == 'Draft':
                self._check_validation()

            else:
                self._log_processing_status()

        else:
            # HINT: thrown an exception?
            logger.warning(
                "Unknown status '%s' for submission '%s'",
                usi_status,
                self.submission_name)

        logger.debug(
            "Checking status for '%s' completed", self.submission_name)

    def _check_validation(self):
        """Finalize a ``Draft`` submission if validation is completed"""

        status = self.submission.get_status()

        # validation statuses: 'Complete' has to be the only one reported
        if len(status) == 1 and 'Complete' in status:
            # check for errors and eventually finalize
            self.finalize()

        else:
            logger.warning(
                "Biosample validation is not completed yet (%s)", status)

    def _log_processing_status(self):
        """Log the processing summary of a ``Submitted`` submission"""

        logger.info(
            "Submission '%s' is '%s'. Waiting for biosample ids",
            self.submission_name,
            self.submission.status)

        # debug submission status
        document = self.submission.follow_url(
            "processingStatusSummary", self.auth)

        logger.debug(
            "Current status for submission '%s' is '%s'",
            self.submission_name, document.data)

    def submission_has_issues(self):
        """
        Check that biosample submission has no issues. For example, that
        it does not remain in the same status for a long time

        When more than ``MAX_DAYS`` days have passed since the database
        record was updated, the record is saved with ``ERROR`` status and
        an explanatory message.

        Returns:
            bool: True if an issue is detected
        """

        logger.debug(
            "Check if submission '%s' remained in the same status "
            "for a long time", self.submission_name)

        elapsed = timezone.now() - self.usi_submission.updated_at

        if elapsed.days <= MAX_DAYS:
            return False

        message = (
            "Biosample submission '%s' remained with the same status "
            "for more than %s days. Please report it to InjectTool "
            "team" % (self.submission_name, MAX_DAYS))

        self.usi_submission.status = ERROR
        self.usi_submission.message = message
        self.usi_submission.save()

        logger.error("Errors for submission: %s", self.submission_name)
        logger.error(message)

        return True

    def __sample_has_errors(self, sample, table, pk):
        """
        Helper method to mark an animal/sample with its own errors. Table
        should be Animal or Sample to update the appropriate object. Sample
        is a USI sample object

        Args:
            sample (pyUSIrest.client.sample): a USI sample object
            table (str): ``Animal`` or ``Sample``, mean the table where this
                object should be searched
            pk (int): table primary key

        Returns:
            dict: a single item mapping the string representation of the
            database object to its USI validation error messages
        """

        # get sample/animal object relying on table name and pk
        sample_obj = get_model_object(table, pk)

        sample_obj.name.status = NEED_REVISION
        sample_obj.name.save()

        # get a USI validation result
        validation_result = sample.get_validation_result()

        # TODO: should I store validation_result error in validation tables?
        error_messages = validation_result.errorMessages

        # return an error for each object
        return {str(sample_obj): error_messages}

    def _collect_sample_errors(self):
        """Mark every USI sample with errors and return their messages

        Returns:
            list: one dict for each sample with errors, as returned by
            the private sample error helper
        """

        messages = []

        for sample in self.submission.get_samples(has_errors=True):
            # derive pk and table from alias
            table, pk = parse_image_alias(sample.alias)

            # a sample without errors keeps its current status
            if not sample.has_errors():
                continue

            logger.warning("%s in table %s has errors!!!", sample, table)

            # mark this sample since has problems
            messages.append(self.__sample_has_errors(sample, table, pk))

        return messages

    def finalize(self):
        """Finalize a submission by closing document and send it to
        biosample

        If USI reports samples with errors, the involved database objects
        and the database submission are saved with ``NEED_REVISION``
        status (the submission message is the JSON dump of the errors) and
        nothing is sent. Otherwise the USI submission is finalized.
        """

        logger.debug("Finalizing submission '%s'", self.submission_name)

        # get errors for a submission
        errors = self.submission.has_errors()

        if True not in errors:
            # raising an exception while finalizing will result
            # in a failed task.
            # TODO: model and test exception in finalization
            self.submission.finalize()
            return

        # get samples with errors then update database
        messages = self._collect_sample_errors()

        logger.error("Errors for submission: '%s'", self.submission_name)
        logger.error("Fix them, then finalize")

        # Update status for biosample.models.Submission
        self.usi_submission.status = NEED_REVISION
        self.usi_submission.message = json.dumps(messages, indent=2)
        self.usi_submission.save()

    def complete(self):
        """Complete a submission and fetch name objects

        Every USI sample accession is stored as ``biosample_id`` in the
        related database object, then the database submission is saved as
        ``COMPLETED``. If any sample has no accession, nothing is written.
        """

        logger.debug("Completing submission '%s'", self.submission_name)

        samples = list(self.submission.get_samples())

        # check all accessions BEFORE writing anything, in order to really
        # return without doing anything when an accession is missing
        for sample in samples:
            if sample.accession is None:
                logger.error("No accession found for sample '%s'", sample)
                logger.error("Ignoring submission '%s'", self.submission)
                return

        for sample in samples:
            # derive pk and table from alias
            table, pk = parse_image_alias(sample.alias)

            # get sample/animal object relying on table name and pk
            sample_obj = get_model_object(table, pk)

            # update statuses
            sample_obj.name.status = COMPLETED
            sample_obj.name.biosample_id = sample.accession
            sample_obj.name.save()

        # update submission
        self.usi_submission.status = COMPLETED
        self.usi_submission.message = "Successful submission into biosample"
        self.usi_submission.save()

        logger.info(
            "Submission %s is now completed and recorded into UID",
            self.submission)
288
289
290
class FetchStatusTask(MyTask):
    """Fetch the USI status of every pending submission.

    The task body runs only while holding the ``lock_id`` redis lock, so
    that two fetch tasks do not work at the same time.
    """

    name = "Fetch USI status"
    description = """Fetch biosample using USI API"""
    lock_id = "FetchStatusTask"

    def run(self):
        """
        This function is called when delay is called. It will acquire a lock
        in redis, so those tasks are mutually exclusive

        Returns:
            str: success if everything is ok. Different messages if task is
            already running or exception is caught"""

        # debugging instance
        self.debug_task()

        # the lock is requested with blocking=False: when it is not
        # acquired this run is skipped and reported as already running
        with redis_lock(
                self.lock_id, blocking=False, expire=False) as acquired:
            if acquired:
                # do stuff and return something
                return self.fetch_status()

        message = "%s already running!" % (self.name)

        logger.warning(message)

        return message

    def fetch_status(self):
        """
        Fetch status from pending submissions. Called from
        :py:meth:`run`, handles exceptions from USI, select
        all :py:class:`Submission <image_app.models.Submission>` objects
        with :py:const:`SUBMITTED <common.constants.SUBMITTED>` status
        from :ref:`UID <The Unified Internal Database>` and call
        :py:meth:`fetch_queryset` with this data

        Returns:
            str: ``"success"`` when the check ends without exceptions
        """

        logger.info("fetch_status started")

        # search for submission with SUBMITTED status. Other submission are
        # not yet finalized. This function need to be called by exclusives
        # tasks
        qs = Submission.objects.filter(status=SUBMITTED)

        # exists() only tests for presence, no need to count every row
        if qs.exists():
            try:
                # fetch biosample status
                self.fetch_queryset(qs)

            # retry a task under errors
            # http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying
            # NOTE(review): this is the builtin ConnectionError; confirm
            # that the exception raised by the USI client derives from it
            except ConnectionError as exc:
                raise self.retry(exc=exc)

        else:
            logger.debug("No pending submission in UID database")

        # debug
        logger.info("fetch_status completed")

        return "success"

    # a function to retrieve biosample submission
    def fetch_queryset(self, queryset):
        """Fetch biosample against a queryset (a list of
        :py:const:`SUBMITTED <common.constants.SUBMITTED>`
        :py:class:`Submission <image_app.models.Submission>` objects). Iterate
        through submission to get USI info. Calls
        :py:class:`FetchStatusHelper` for each related USI submission, then
        starts a :py:class:`RetrievalCompleteTask` for each UID submission
        """

        logger.info("Searching for submissions into biosample")

        for uid_submission in queryset:
            usi_submissions = USISubmission.objects.filter(
                uid_submission=uid_submission,
                status=SUBMITTED)

            # HINT: fetch statuses using tasks?
            for usi_submission in usi_submissions:
                status_helper = FetchStatusHelper(usi_submission)
                status_helper.check_submission_status()

            # set the final status for a submission like SubmissionCompleteTask
            retrievalcomplete = RetrievalCompleteTask()

            # the submission is passed by id as a keyword argument
            res = retrievalcomplete.delay(uid_submission_id=uid_submission.id)

            logger.info(
                "Start RetrievalCompleteTask process for %s with task %s",
                uid_submission,
                res.task_id)

        logger.info("fetch_queryset completed")
389
390
391
class RetrievalCompleteTask(TaskFailureMixin, MyTask):
    """Update submission status after fetching status"""

    name = "Complete Retrieval Process"
    description = """Check submission status after retrieval nd update stuff"""

    def run(self, *args, **kwargs):
        """Fetch submission data and then update UID submission status

        The statuses of all the USI submissions related to the UID
        submission are counted, then the UID submission is updated with
        the first matching rule: any ``SUBMITTED`` leaves it untouched,
        any ``ERROR`` marks it as ``ERROR``, any ``NEED_REVISION`` marks
        it as ``NEED_REVISION`` (the owner gets an email in both cases),
        only ``COMPLETED`` marks it as ``COMPLETED``.

        Args:
            uid_submission_id (int): keyword argument, the primary key of
                the UID submission to update

        Returns:
            str: ``"success"``
        """

        # get UID submission
        uid_submission = Submission.objects.get(
            pk=kwargs['uid_submission_id'])

        # fetch data from database
        submission_qs = USISubmission.objects.filter(
            uid_submission=uid_submission)

        # count biosample submissions by status
        statuses = {
            row['status']: row['count']
            for row in submission_qs.values('status').annotate(
                count=Count('status'))
        }

        if SUBMITTED in statuses:
            # ignoring the other models. No errors thrown until there is
            # a SUBMITTED USISubmission
            logger.info("Submission %s not yet finished", uid_submission)

            return "success"

        # if there is ANY error in biosample.models.Submission for a
        # particular submission, I will mark it as ERROR
        if ERROR in statuses:
            # submission failed
            logger.info("Submission %s failed", uid_submission)

            self.__update_message(uid_submission, submission_qs, ERROR)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                ("Something goes wrong with biosample submission. Please "
                 "report this to InjectTool team\n\n"
                 "%s" % uid_submission.message),
            )

        # check if submission need revision
        elif NEED_REVISION in statuses:
            # submission failed
            logger.info("Submission %s failed", uid_submission)

            self.__update_message(uid_submission, submission_qs, NEED_REVISION)

            # send a mail to the user
            uid_submission.owner.email_user(
                "Error in biosample submission %s" % (
                    uid_submission.id),
                "Some items needs revision:\n\n" + uid_submission.message,
            )

        elif COMPLETED in statuses and len(statuses) == 1:
            # if all status are complete, the submission is completed
            logger.info(
                "Submission %s completed with success", uid_submission)

            self.__update_message(uid_submission, submission_qs, COMPLETED)

        # send async message
        send_message(uid_submission)

        return "success"

    def __update_message(self, uid_submission, submission_qs, status):
        """Read biosample.models.Submission message and set
        image_app.models.Submission message

        Args:
            uid_submission (image_app.models.Submission): the submission to
                update and save
            submission_qs (QuerySet): the related biosample submissions
            status: the status to set; only the messages of the biosample
                submissions with this status are collected
        """

        # get messages for submission
        messages = [
            submission.message
            for submission in submission_qs.filter(status=status)
        ]

        # drop duplicates keeping the first occurrence of each message: a
        # set would join the lines in an unpredictable order
        unique_messages = dict.fromkeys(messages)

        uid_submission.status = status
        uid_submission.message = "\n".join(unique_messages)
        uid_submission.save()
478
479
480
# explicitly register the class-based tasks on the celery application, see
481
# https://github.com/celery/celery/issues/3744#issuecomment-271366923
482
celery_app.tasks.register(FetchStatusTask)
483
celery_app.tasks.register(RetrievalCompleteTask)
484