EnsemblRest.call_api_func()   C
last analyzed

Complexity

Conditions 9

Size

Total Lines 84
Code Lines 47

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 47
dl 0
loc 84
rs 6.4012
c 0
b 0
f 0
cc 9
nop 4

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
import json
2
import logging
3
import re
4
import time
5
from typing import Any
6
7
import requests
8
from requests import Response
9
from requests.structures import CaseInsensitiveDict
10
11
# import ensemblrest modules
12
from .ensembl_config import (
13
    ensembl_api_table,
14
    ensembl_content_type,
15
    ensembl_default_url,
16
    ensembl_header,
17
    ensembl_http_status_codes,
18
    ensembl_known_errors,
19
    ensembl_user_agent,
20
)
21
from .exceptions import (
22
    EnsemblRestError,
23
    EnsemblRestRateLimitError,
24
    EnsemblRestServiceUnavailable,
25
)
26
27
# Logger instance
28
logger = logging.getLogger(__name__)
29
30
31
# FakeResponse object
32
class FakeResponse(object):
33
    def __init__(
34
        self,
35
        headers: CaseInsensitiveDict[str] | dict[str, Any],
36
        status_code: int,
37
        text: str,
38
    ):
39
        self.headers = headers
40
        self.status_code = status_code
41
        self.text: str = text
42
43
44
# EnsEMBL REST API object
45
class EnsemblRest(object):
46
    # class initialisation function
47
    def __init__(
48
        self, api_table: dict[str, Any] = ensembl_api_table, **kwargs: dict[str, Any]
49
    ) -> None:
50
        # read args variable into object as session_args
51
        self.session_args: dict[str, Any] = kwargs or {}
52
53
        # In order to rate limit the requests, like https://github.com/Ensembl/ensembl-rest/wiki/Example-Python-Client
54
        self.reqs_per_sec: int = 15
55
        self.req_count: int = 0
56
        self.last_req: float = 0
57
        self.wall_time: int = 1
58
59
        # get rate limit parameters, if provided
60
        self.rate_reset: int | None = None
61
        self.rate_limit: int | None = None
62
        self.rate_remaining: int | None = None
63
        self.rate_period: int | None = None
64
        self.retry_after: float | None = None
65
66
        # to record the last parameters used (in order to redo the query with an ensembl known error)
67
        self.last_url: str = ""
68
        self.last_headers: CaseInsensitiveDict[str] | dict[str, Any] = {}
69
        self.last_params: dict[str, Any] = {}
70
        self.last_data: dict[Any, Any] = {}
71
        self.last_method: str = ""
72
        self.last_attempt: int = 0
73
        self.last_response: Response | FakeResponse = Response()
74
75
        # the maximum number of attempts
76
        self.max_attempts: int = 5
77
78
        # setting a timeout
79
        self.timeout: int = 60
80
81
        # set default values if those values are not provided
82
        self.__set_default()
83
84
        # setup requests session
85
        self.session = requests.Session()
86
87
        # update headers
88
        self.__update_headers()
89
90
        # add class methods relying api_table
91
        self.__add_methods(api_table)
92
93
    def __set_default(self) -> None:
94
        """Set default values"""
95
96
        # initialise default values
97
        default_base_url = ensembl_default_url
98
        default_headers = ensembl_header
99
        default_content_type = ensembl_content_type
100
        default_proxies: dict[str, str] = {}
101
102
        if "base_url" not in self.session_args:
103
            self.session_args["base_url"] = default_base_url
104
105
        if "headers" not in self.session_args:
106
            self.session_args["headers"] = default_headers
107
108
        if "User-Agent" not in self.session_args["headers"]:
109
            self.session_args["headers"].update(default_headers)
110
111
        if "Content-Type" not in self.session_args["headers"]:
112
            self.session_args["headers"]["Content-Type"] = default_content_type
113
114
        if "proxies" not in self.session_args:
115
            self.session_args["proxies"] = default_proxies
116
117
    def __update_headers(self) -> None:
118
        """Update headers"""
119
120
        # update requests client with arguments
121
        client_args_copy = self.session_args.copy()
122
        for key, val in client_args_copy.items():
123
            if key in ("base_url", "proxies"):
124
                setattr(self.session, key, val)
125
                self.session_args.pop(key)
126
127
        # update headers as already exist within client
128
        self.session.headers.update(self.session_args.pop("headers"))
129
130
    def __add_methods(self, api_table: dict[str, Any]) -> None:
131
        """Add methods to class object"""
132
133
        # iterate over api_table keys and add key to class namespace
134
        for fun_name in api_table.keys():
135
            # setattr(self, key, self.register_api_func(key))
136
            # Not as a class attribute, but a class method
137
            self.__dict__[fun_name] = self.register_api_func(fun_name, api_table)
138
139
            # Set __doc__ for generic class method
140
            if "doc" in api_table[fun_name]:
141
                self.__dict__[fun_name].__doc__ = api_table[fun_name]["doc"]
142
143
            # add function name to the class methods
144
            self.__dict__[fun_name].__name__ = fun_name
145
146
    # dynamic api registration function
147
    def register_api_func(self, api_call: str, api_table: dict[str, Any]) -> Any:
148
        return lambda **kwargs: self.call_api_func(api_call, api_table, **kwargs)
149
150
    @staticmethod
151
    def __check_params(func: Any, kwargs: Any) -> list[Any]:
152
        """Check for mandatory parameters"""
153
154
        # Verify required variables and raise an Exception if needed
155
        mandatory_params = re.findall(r"\{\{(?P<m>[a-zA-Z1-9_]+)\}\}", func["url"])
156
157
        for param in mandatory_params:
158
            if param not in kwargs:
159
                logger.critical(
160
                    "'%s' param not specified. Mandatory params are %s"
161
                    % (param, mandatory_params)
162
                )
163
                raise Exception("mandatory param '%s' not specified" % param)
164
            else:
165
                logger.debug("Mandatory param %s found" % param)
166
167
        return mandatory_params
168
169
    # dynamic api call function
170
    def call_api_func(
171
        self, api_call: str, api_table: dict[str, Any], **kwargs: dict[str, Any]
172
    ) -> Any:
173
        # build url from api_table kwargs
174
        func = api_table[api_call]
175
176
        # check mandatory params
177
        mandatory_params = self.__check_params(func, kwargs)
178
179
        # resolving urls
180
        url = re.sub(
181
            r"\{\{(?P<m>[a-zA-Z1-9_]+)\}\}",
182
            lambda m: "%s" % kwargs.get(m.group(1)),
183
            self.session.base_url + func["url"],  # type: ignore[attr-defined]
184
        )
185
186
        # debug
187
        logger.debug("Resolved url: '%s'" % url)
188
189
        # Now I have to remove mandatory params from kwargs
190
        for param in mandatory_params:
191
            del kwargs[param]
192
193
        # Initialize with the ensembl default content type
194
        content_type: str | dict[str, Any] = ensembl_content_type
195
196
        # Override content type if it is defined by function
197
        if "content_type" in func:
198
            content_type = func["content_type"]
199
200
        # Ovveride content type if it is provied when calling function
201
        if "content_type" in kwargs:
202
            content_type = kwargs["content_type"]
203
            del kwargs["content_type"]
204
205
        # check the request type (GET or POST?)
206
        if func["method"] == "GET":
207
            logger.debug(
208
                "Submitting a GET request: url = '%s', headers = %s, params = %s"
209
                % (url, {"Content-Type": content_type}, kwargs)
210
            )
211
212
            # record this request
213
            self.last_url = url
214
            self.last_headers = {"Content-Type": content_type}
215
            self.last_params = kwargs
216
            self.last_data = {}
217
            self.last_method = "GET"
218
            self.last_attempt = 0
219
220
            resp = self.__get_response()
221
222
        elif func["method"] == "POST":
223
            # in a POST request, separate post parameters from other parameters
224
            data = {}
225
226
            # pass key=value in POST data from kwargs
227
            for key in func["post_parameters"]:
228
                if key in kwargs:
229
                    data[key] = kwargs[key]
230
                    del kwargs[key]
231
232
            logger.debug(
233
                "Submitting a POST request: url = '%s', headers = %s, params = %s, data = %s"
234
                % (url, {"Content-Type": content_type}, kwargs, data)
235
            )
236
237
            # record this request
238
            self.last_url = url
239
            self.last_headers = {"Content-Type": content_type}
240
            self.last_params = kwargs
241
            self.last_data = data
242
            self.last_method = "POST"
243
            self.last_attempt = 0
244
245
            resp = self.__get_response()
246
247
        else:
248
            raise NotImplementedError(
249
                "Method '%s' not yet implemented" % (func["method"])
250
            )
251
252
        # call response and return content
253
        return self.parseResponse(resp, content_type)
254
255
    # A function to get reponse from ensembl REST api
256
    def __get_response(self) -> Response | FakeResponse:
257
        """Call session get and post method. Return response"""
258
259
        # updating last_req time
260
        self.last_req = time.time()
261
262
        # Increment the request counter to rate limit requests
263
        self.req_count += 1
264
265
        # Evaluating the numer of request in a second (according to EnsEMBL rest specification)
266
        if self.req_count >= self.reqs_per_sec:
267
            delta = time.time() - self.last_req
268
269
            # sleep upto wall_time
270
            if delta < self.wall_time:
271
                to_sleep = self.wall_time - delta
272
                logger.debug("waiting %s" % to_sleep)
273
                time.sleep(to_sleep)
274
275
            self.req_count = 0
276
277
        # my response
278
        resp: Response | FakeResponse = Response()
279
280
        # deal with exceptions
281
        try:
282
            # another request using the correct method
283
            if self.last_method == "GET":
284
                resp = self.session.get(
285
                    self.last_url,
286
                    headers=self.last_headers,
287
                    params=self.last_params,
288
                    timeout=self.timeout,
289
                )
290
            elif self.last_method == "POST":
291
                # post parameters are load as POST data, other parameters are url parameters as GET requests
292
                resp = self.session.post(
293
                    self.last_url,
294
                    headers=self.last_headers,
295
                    data=json.dumps(self.last_data),
296
                    params=self.last_params,
297
                    timeout=self.timeout,
298
                )
299
            # other methods are verifiedby others functions
300
301
        except requests.ConnectionError as e:
302
            raise EnsemblRestServiceUnavailable(e)
303
304
        except requests.Timeout as e:
305
            logger.error("%s request timeout: %s" % (self.last_method, e))
306
307
            # create a fake response in order to redo the query
308
            resp = FakeResponse(
309
                headers=self.last_response.headers,
310
                status_code=400,
311
                text=json.dumps(
312
                    {"message": repr(e), "error": "%s timeout" % ensembl_user_agent}
313
                ),
314
            )
315
316
        # return response
317
        return resp
318
319
    # A function to deal with a generic response
320
    def parseResponse(
321
        self,
322
        resp: Response | FakeResponse,
323
        content_type: str | dict[str, Any] = "application/json",
324
    ) -> Any:
325
        """Deal with a generic REST response"""
326
327
        logger.debug("Got %s" % resp.text)
328
329
        # Record response for debug intent
330
        self.last_response = resp
331
332
        # Initialize some values. Check if I'm rate limited
333
        (
334
            self.rate_reset,
335
            self.rate_limit,
336
            self.rate_remaining,
337
            self.retry_after,
338
            self.rate_period,
339
        ) = self.__get_rate_limit(resp.headers)
340
341
        # parse status code
342
        if self.__check_retry(resp):
343
            return self.__retry_request()
344
345
        # Handle content in different way relying on content-type
346
        if content_type == "application/json":
347
            content = json.loads(resp.text)
348
        else:
349
            # Default
350
            content = resp.text
351
352
        return content
353
354
    def __check_retry(self, resp: Response | FakeResponse) -> bool:
355
        """Parse status code and print warnings. Return True if a retry is needed"""
356
357
        # default status code
358
        message = ensembl_http_status_codes[resp.status_code][1]
359
360
        # parse status codes
361
        if resp.status_code > 304:
362
            ExceptionType = EnsemblRestError
363
364
            # Try to derive a more useful message than ensembl default message
365
            if resp.status_code == 400:
366
                json_message = json.loads(resp.text)
367
                if "error" in json_message:
368
                    message = json_message["error"]
369
370
                # TODO: deal with special cases errors
371
                if message in ensembl_known_errors:
372
                    # call a function that will re-execute the REST request and then call again parseResponse
373
                    # if everithing is ok, a processed content is returned
374
                    logger.warning("EnsEMBL REST Service returned: %s" % message)
375
376
                    # return true if retry needed
377
                    return True
378
            elif resp.status_code == 500:
379
                # Retrying when we get a 500 error.
380
                # Due to Ensembl's condition on randomly returning 500s on valid requests.
381
                return True
382
            elif resp.status_code == 429:
383
                ExceptionType = EnsemblRestRateLimitError
384
385
            raise ExceptionType(
386
                message,
387
                error_code=resp.status_code,
388
                rate_reset=self.rate_reset,
389
                rate_limit=self.rate_limit,
390
                rate_remaining=self.rate_remaining,
391
                retry_after=self.retry_after,
392
            )
393
394
        # return a flag if status is ok
395
        return False
396
397
    @staticmethod
398
    def __get_rate_limit(
399
        headers: CaseInsensitiveDict[str] | dict[str, Any],
400
    ) -> tuple[int | None, int | None, int | None, float | None, int | None]:
401
        """Read rate limited attributes"""
402
403
        # initialize some values
404
        retry_after = None
405
        rate_reset = None
406
        rate_limit = None
407
        rate_remaining = None
408
        rate_period = None
409
410
        # for semplicity
411
        keys = [key.lower() for key in headers.keys()]
412
413
        if "X-RateLimit-Reset".lower() in keys:
414
            rate_reset = int(headers["X-RateLimit-Reset"])
415
            logger.debug("X-RateLimit-Reset: %s" % rate_reset)
416
417
        if "X-RateLimit-Period".lower() in keys:
418
            rate_period = int(headers["X-RateLimit-Period"])
419
            logger.debug("X-RateLimit-Period: %s" % rate_period)
420
421
        if "X-RateLimit-Limit".lower() in keys:
422
            rate_limit = int(headers["X-RateLimit-Limit"])
423
            logger.debug("X-RateLimit-Limit: %s" % rate_limit)
424
425
        if "X-RateLimit-Remaining".lower() in keys:
426
            rate_remaining = int(headers["X-RateLimit-Remaining"])
427
            logger.debug("X-RateLimit-Remaining: %s" % rate_remaining)
428
429
        if "Retry-After".lower() in keys:
430
            retry_after = float(headers["Retry-After"])
431
            logger.debug("Retry-After: %s" % retry_after)
432
433
        return rate_reset, rate_limit, rate_remaining, retry_after, rate_period
434
435
    def __retry_request(self) -> Any:
436
        """Retry last request in case of failure"""
437
438
        # update last attempt
439
        self.last_attempt += 1
440
441
        # a max of three attempts
442
        if self.last_attempt > self.max_attempts:
443
            # default status code
444
            message = ensembl_http_status_codes[self.last_response.status_code][1]
445
446
            # parse error if possible
447
            try:
448
                json_message = json.loads(self.last_response.text)
449
                if "error" in json_message:
450
                    message = json_message["error"]
451
            except ValueError:
452
                # In this case we didn't even get a JSON back.
453
                message = "Server returned invalid JSON."
454
455
            raise EnsemblRestError(
456
                "Max number of retries attempts reached. Last message was: %s"
457
                % message,
458
                error_code=self.last_response.status_code,
459
                rate_reset=self.rate_reset,
460
                rate_limit=self.rate_limit,
461
                rate_remaining=self.rate_remaining,
462
                retry_after=self.retry_after,
463
            )
464
465
        # sleep a while. Increment on each attempt
466
        to_sleep = (self.wall_time + 1) * self.last_attempt
467
468
        logger.debug("Sleeping %s" % to_sleep)
469
        time.sleep(to_sleep)
470
471
        # another request using the correct method
472
        if self.last_method == "GET":
473
            # debug
474
            logger.debug(
475
                "Retring last GET request (%s/%s): url = '%s', headers = %s, params = %s"
476
                % (
477
                    self.last_attempt,
478
                    self.max_attempts,
479
                    self.last_url,
480
                    self.last_headers,
481
                    self.last_params,
482
                )
483
            )
484
485
            resp = self.__get_response()
486
487
        elif self.last_method == "POST":
488
            # debug
489
            logger.debug(
490
                "Retring last POST request (%s/%s): url = '%s', headers = %s, params = %s, data = %s"
491
                % (
492
                    self.last_attempt,
493
                    self.max_attempts,
494
                    self.last_url,
495
                    self.last_headers,
496
                    self.last_params,
497
                    self.last_data,
498
                )
499
            )
500
501
            resp = self.__get_response()
502
        else:
503
            raise NotImplementedError(
504
                "Method '%s' not yet implemented" % (self.last_method)
505
            )
506
507
        # call response and return content
508
        return self.parseResponse(resp, self.last_headers["Content-Type"])
509
510
    def get_user_agent(self) -> str:
511
        """Return the pyEnsemblRest user agent"""
512
        return ensembl_user_agent
513