Passed
Push — master ( 227024...1d0698 )
by Oleksandr
02:44
created

on_state_change()   C

Complexity

Conditions 8

Size

Total Lines 57
Code Lines 45

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 45
dl 0
loc 57
rs 6.9333
c 0
b 0
f 0
cc 8
nop 4

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign that the commented part should be extracted into a new method, with the comment serving as a starting point for choosing a good name for that method.

Commonly applied refactorings include Extract Method, as described above.

1
import logging
2
import sys
3
from tabpy.tabpy_server.app.SettingsParameters import SettingsParameters
4
from tabpy.tabpy_server.common.messages\
5
    import (LoadObject, DeleteObjects, ListObjects, ObjectList)
6
from tabpy.tabpy_server.common.endpoint_file_mgr import cleanup_endpoint_files
7
from tabpy.tabpy_server.common.util import format_exception
8
from tabpy.tabpy_server.management.state\
9
    import TabPyState, get_query_object_path
10
from tabpy.tabpy_server.management import util
11
from time import sleep
12
from tornado import gen
13
14
15
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
16
17
18
def wait_for_endpoint_loaded(python_service, object_uri):
    '''
    Poll the service until the given object is no longer being loaded.

    A ListObjects request is sent through python_service.manage_request
    on every pass. The function returns when the object named object_uri
    reports a status other than 'LoadInProgress', or when the reply is
    not an ObjectList (which is logged as an error). Between passes it
    sleeps for 0.1 seconds; there is no timeout.
    '''
    logger.info('Waiting for object to be loaded...')
    while True:
        reply = python_service.manage_request(ListObjects())
        if not isinstance(reply, ObjectList):
            logger.error(
                f'Error loading endpoint {object_uri}: {reply}')
            return

        for name, details in reply.objects.items():
            if name != object_uri:
                continue
            if details['status'] != 'LoadInProgress':
                logger.info(f'Object load status: {details["status"]}')
                return

        # Still loading (or not listed yet): pause before asking again.
        sleep(0.1)
38
39
40
@gen.coroutine
def init_ps_server(settings, tabpy_state):
    '''
    Resolve the query object path of every endpoint in tabpy_state.

    The resolved path is not used; any exception raised while handling
    an endpoint is logged and the remaining endpoints are still visited.
    '''
    logger.info("Initializing TabPy Server...")
    endpoints = tabpy_state.get_endpoints()
    for name, info in endpoints.items():
        try:
            version = info['version']
            state_file_path = settings[SettingsParameters.StateFilePath]
            get_query_object_path(state_file_path, name, version)
        except Exception as err:
            logger.error(
                f'Exception encounted when downloading object: {name}'
                f', error: {err}')
54
55
56
@gen.coroutine
def init_model_evaluator(settings, tabpy_state, python_service):
    '''
    Send a LoadObject request for every endpoint found in tabpy_state.

    Alias endpoints are loaded from their 'target' entry; every other
    endpoint is loaded from its query object path. All requests are sent
    with the update flag set to False.
    '''
    logger.info("Initializing models...")

    endpoints = tabpy_state.get_endpoints()

    for name, info in endpoints.items():
        version = info['version']
        kind = info['type']
        path = get_query_object_path(
            settings[SettingsParameters.StateFilePath],
            name, version)

        logger.info(
            f'Load endpoint: {name}, '
            f'version: {version}, '
            f'type: {kind}')

        # Pick what to load from and which type to report to the service.
        if kind == 'alias':
            source, load_type = info['target'], 'alias'
        else:
            source, load_type = path, kind

        python_service.manage_request(
            LoadObject(name, source, version, False, load_type))
85
86
87
def _get_latest_service_state(settings,
                              tabpy_state,
                              new_ps_state,
                              python_service):
    '''
    Compare the endpoints of the new state with the ones currently loaded.

    Parameters
    ----------
    settings:
        Mapping indexed with SettingsParameters.StateFilePath to build the
        path of each new or updated endpoint.
    tabpy_state:
        Not used; kept so existing callers keep working.
    new_ps_state:
        Object whose get_endpoints() returns the wanted endpoints keyed by
        endpoint name.
    python_service:
        Object whose ps.query_objects maps the currently loaded endpoint
        names to their info.

    Returns
    --------
    (has_changes, changes):
        has_changes: True when at least one endpoint was added, updated
            or removed, False otherwise.
        changes: {'endpoints': diff}, where diff maps every changed
            endpoint name to (type, version, path). Removed endpoints are
            reported as (type, None, None).
    '''
    new_endpoints = new_ps_state.get_endpoints()
    current_endpoints = python_service.ps.query_objects
    diff = {}

    for endpoint_name, endpoint_info in new_endpoints.items():
        existing_endpoint = current_endpoints.get(endpoint_name)
        if existing_endpoint is not None and \
                endpoint_info['version'] == existing_endpoint['version']:
            # Same version is already loaded: nothing to report.
            continue

        # Either a new endpoint or new endpoint version
        path_to_new_version = get_query_object_path(
            settings[SettingsParameters.StateFilePath],
            endpoint_name, endpoint_info['version'])
        endpoint_type = endpoint_info.get('type', 'model')
        diff[endpoint_name] = (endpoint_type, endpoint_info['version'],
                               path_to_new_version)

    # add removed models too
    for endpoint_name, endpoint_info in current_endpoints.items():
        if endpoint_name not in new_endpoints:
            diff[endpoint_name] = (
                endpoint_info.get('type', 'model'), None, None)

    # Report "no changes" honestly so callers can take their shortcut.
    return (bool(diff), {'endpoints': diff})
131
132
133
def _remove_endpoint(settings, python_service, object_name, logger):
    '''
    Ask the service to delete one endpoint, then clean up its files.
    '''
    logger.info(f'Removing object: URI={object_name}')

    python_service.manage_request(DeleteObjects([object_name]))

    cleanup_endpoint_files(object_name,
                           settings[SettingsParameters.UploadDir],
                           logger=logger)


def _load_endpoint(settings, python_service, object_name, change,
                   endpoint_info, logger):
    '''
    Ask the service to load one new or updated endpoint and wait for it.

    change is the (type, version, path) tuple describing the endpoint;
    endpoint_info is its entry in the new state and is only read for the
    'target' of an alias.
    '''
    (object_type, object_version, object_path) = change

    # Any version above 1 is flagged to the service as an update.
    is_update = object_version > 1
    if object_type == 'alias':
        msg = LoadObject(object_name, endpoint_info['target'],
                         object_version, is_update, 'alias')
    else:
        msg = LoadObject(object_name, object_path, object_version,
                         is_update, object_type)

    python_service.manage_request(msg)
    # Blocks until the load is no longer reported as in progress
    # (or listing the objects fails).
    wait_for_endpoint_loaded(python_service, object_name)

    # cleanup old version of endpoint files, keeping the current and the
    # previous version
    if object_version > 2:
        cleanup_endpoint_files(
            object_name, settings[SettingsParameters.UploadDir],
            logger=logger,
            retain_versions=[object_version, object_version - 1])


@gen.coroutine
def on_state_change(settings, tabpy_state, python_service,
                    logger=logging.getLogger(__name__)):
    '''
    Reload the state file and apply its endpoint changes to the service.

    The state is read from settings[SettingsParameters.StateFilePath],
    compared with what python_service currently has loaded, and every
    changed endpoint is then either removed or (re)loaded.

    Any exception raised along the way is caught and logged as an error;
    nothing is re-raised to the caller.
    '''
    try:
        logger.log(logging.INFO, "Loading state from state file")
        config = util._get_state_from_file(
            settings[SettingsParameters.StateFilePath],
            logger=logger)
        new_ps_state = TabPyState(config=config, settings=settings)

        (has_changes, changes) = _get_latest_service_state(
            settings, tabpy_state, new_ps_state, python_service)
        if not has_changes:
            logger.info("Nothing changed, return.")
            return

        new_endpoints = new_ps_state.get_endpoints()
        for object_name, change in changes['endpoints'].items():
            (_, object_version, object_path) = change

            if not object_path and not object_version:  # removal
                _remove_endpoint(settings, python_service, object_name,
                                 logger)
            else:
                _load_endpoint(settings, python_service, object_name,
                               change, new_endpoints[object_name], logger)

    except Exception as e:
        err_msg = format_exception(e, 'on_state_change')
        logger.log(logging.ERROR,
                   f'Error submitting update model request: error={err_msg}')
190