DownloadTask.__init__()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 15
rs 9.4285
1
import temba_client.v1
2
3
import rapidpropull.cache
4
5
__author__ = 'Tomasz J. Kotarba <[email protected]>'
6
__copyright__ = 'Copyright (c) 2016, Tomasz J. Kotarba. All rights reserved.'
7
__maintainer__ = 'Tomasz J. Kotarba'
8
__email__ = '[email protected]'
9
10
11
class DownloadTask(object):
    """
    Provides a mechanism for querying RapidPro servers and pulling data from
    them.  The downloaded data can be represented as RapidPro objects (see:
    rapidpro-python) or serialised to JSON.
    """

    def __init__(self, processed_arguments):
        """
        Create a download task for the specified ArgumentProcessor.

        The argument processor supplies the server address and API token used
        to build the client, the endpoint selector and the keyword arguments
        for the endpoint call, the selectors of the requested associations
        and, optionally, a cache URL.  When no cache URL is given (None) the
        task works without a cache.
        """
        self.client = temba_client.v1.TembaClient(
            processed_arguments.get_address(),
            processed_arguments.get_api_token())
        self.endpoint_selector = processed_arguments.get_endpoint_selector()
        self.endpoint_kwargs = processed_arguments.get_endpoint_kwargs()
        self.selectors_of_requested_associations = \
            processed_arguments.get_selectors_of_requested_associations()
        cache_url = processed_arguments.get_cache_url()
        if cache_url is None:
            self.cache = None
        else:
            self.cache = rapidpropull.cache.RapidProCache(cache_url)
        self._downloaded_data = None

    def download(self):
        """
        Execute the download task - i.e. download and store matching objects
        from RapidPro.

        Stores a list of objects or a dictionary of lists in case --with-flows
        or --with-contacts were used.

        Example for --with-flows --with-contacts:
            {'runs': [run1, run2, ...],
             'flows': [flow1, flow2, ...],
             'contacts': [contact1, contact2, ...]}

        Example for --with-contacts:
            {'runs': [run1, ...],
             'contacts': [contact1, ...]}

        Example for --with-flows:
            {'runs': [run1, ...],
             'flows': [flow1, ...]}
        """
        endpoint_data = self._get_endpoint()(**self.endpoint_kwargs)
        if self.cache:
            self.cache.substitute_cached_for_downloaded(endpoint_data)
        if not self.selectors_of_requested_associations:
            self._downloaded_data = endpoint_data
        else:
            self._download_associated_data(endpoint_data)
        if self.cache:
            # Everything obtained in this run is handed to the cache.
            self.cache.insert_objects(self._downloaded_data)

    def get_downloaded_objects(self):
        """
        If any objects have been downloaded, return a list or a dictionary
        containing all downloaded objects as instances of classes Contact, Flow
        or Run (see: rapidpro-python).  Otherwise, return None.
        """
        return self._downloaded_data

    def get_downloaded_json_structure(self):
        """
        Return a JSON structure (not a text string) with all downloaded objects
        serialised.  E.g. [{'uuid': 'object1'}, {'uuid': 'object2'}, ...].

        A stored list yields a list of serialised objects, a stored dictionary
        yields a dictionary mapping each key to a list of serialised objects.
        Return None if the stored data is neither a list nor a dictionary
        (e.g. nothing has been downloaded yet).
        """
        data = self._downloaded_data
        if isinstance(data, list):
            return [item.serialize() for item in data]
        if isinstance(data, dict):
            return {key: [obj.serialize() for obj in data[key]]
                    for key in data}
        return None

    def overwrite_downloaded_data(self, data):
        """Overwrite stored downloaded data with the value of argument data."""
        self._downloaded_data = data

    def _get_endpoint(self, endpoint_selector=None):
        """
        Return the client method serving the given endpoint selector.

        When endpoint_selector is None, the selector the task was created with
        is used.  Raise ValueError for an unknown selector.
        """
        if endpoint_selector is None:
            endpoint_selector = self.endpoint_selector
        # Attribute names are looked up lazily so that only the requested
        # client method is ever accessed.
        method_names = {
            '--flow-runs': 'get_runs',
            '--flows': 'get_flows',
            '--contacts': 'get_contacts',
        }
        try:
            method_name = method_names[endpoint_selector]
        except (KeyError, TypeError):
            # TypeError covers unhashable selectors, which are invalid too.
            raise ValueError('Invalid endpoint selector "{}"'.format(
                endpoint_selector))
        return getattr(self.client, method_name)

    def _download_associated_data(self, flowruns):
        """
        Store the given flow runs together with the objects they refer to.

        For every requested association selector (e.g. '--flows') collect the
        UUIDs referenced by the runs (attribute 'flow' for '--flows', 'contact'
        for '--contacts'), take whatever the cache can provide and download
        only the remaining objects.  The result is stored as a dictionary
        {'runs': flowruns, '<association>': [objects, ...], ...}.

        Raise ValueError if a requested association selector is not a valid
        endpoint selector.
        """
        all_data = {'runs': flowruns}
        for endpoint_selector in self.selectors_of_requested_associations:
            # Resolve the endpoint first so an invalid selector fails before
            # any cache or network work is done.
            endpoint = self._get_endpoint(endpoint_selector)
            container_attr = endpoint_selector.lstrip('-')
            uuid_attr = container_attr.rstrip('s')
            uuids = {getattr(run, uuid_attr) for run in flowruns}
            associated = []
            if self.cache:
                from_cache, uuids = self.cache.get_objects(
                    endpoint_selector, uuids)
                associated.extend(from_cache)
            # An empty UUID filter must not reach the server: there is nothing
            # to fetch, and an empty filter may be treated as "no filter".
            if uuids:
                associated.extend(endpoint(uuids=uuids))
            all_data[container_attr] = associated
        self._downloaded_data = all_data
122