Completed
Push — master ( 2c16e2...2c16e2 )
by Paolo
13s queued 11s
created

common.tests.test_tasks   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 44
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 4
eloc 26
dl 0
loc 44
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A ExclusiveTasktest.test_delay_blocking() 0 7 1
A TestClearSession.test_clearsession() 0 6 1
A ExclusiveTasktest.setUp() 0 4 1
A ExclusiveTasktest.test_delay() 0 4 1
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Mon Oct  7 12:37:21 2019
5
6
@author: Paolo Cozzi <[email protected]>
7
"""
8
9
from unittest.mock import patch
10
11
from django.test import TestCase
12
13
14
from ..tasks import clearsessions, ExclusiveTask
15
16
17
class TestClearSession(TestCase):
18
    @patch('django.core.management.call_command')
19
    def test_clearsession(self, my_patch):
20
        result = clearsessions.run()
21
22
        self.assertEqual(result, "Session cleaned with success")
23
        self.assertTrue(my_patch.called)
24
25
26
class ExclusiveTasktest(TestCase):
27
    def setUp(self):
28
        self.my_task = ExclusiveTask()
29
        self.my_task.name = "test"
30
        self.my_task.lock_id = "test"
31
32
    @patch('common.tasks.ExclusiveTask.apply_async', return_value=True)
33
    def test_delay(self, my_func):
34
        self.assertTrue(self.my_task.delay())
35
        self.assertTrue(my_func.called)
36
37
    @patch("redis.lock.Lock.acquire", return_value=False)
38
    @patch('common.tasks.ExclusiveTask.apply_async', return_value=True)
39
    def test_delay_blocking(self, my_func, my_lock):
40
        result = self.my_task.delay()
41
        self.assertEqual("test already running!", result)
42
        self.assertFalse(my_func.called)
43
        self.assertTrue(my_lock.called)
44