kuon.watcher.notifications.telegram   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 122
Duplicated Lines 0 %

Test Coverage

Coverage 21.43%

Importance

Changes 0
Metric Value
wmc 17
eloc 63
dl 0
loc 122
ccs 12
cts 56
cp 0.2143
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
A Telegram.__init__() 0 13 3
A Telegram.get_me() 0 9 1
A Telegram.get_last_chat_id_and_text() 0 14 2
A Telegram.get_updates() 0 26 5
B Telegram.send_message() 0 35 6
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3 1
from typing import Union
4
5 1
import requests
6
7 1
from kuon.api_response import APIResponse
8 1
from kuon.watcher.settings import Settings
9
10
11 1
class InvalidApiSettingsException(Exception):
12 1
    pass
13
14
15 1
class Telegram(object):
16
    """
17
    Class to notify the user via telegram bot.
18
    The telegram API documentation can be found here: https://core.telegram.org/bots/api
19
    """
20
21 1
    def __init__(self) -> None:
22
        """Initializing function"""
23
        if not Settings.Notification.Telegram.token:
24
            raise InvalidApiSettingsException('You need an API token to use the Telegram API')
25
26
        self._token = Settings.Notification.Telegram.token
27
        self.api_url = "https://api.telegram.org/bot{0:s}".format(self._token)
28
29
        bot_profile = self.get_me()
30
        if not bot_profile.ok:
31
            raise InvalidApiSettingsException('The provided API key is invalid')
32
33
        self._chat_id = Settings.Notification.Telegram.chat_id
34
35 1
    def get_me(self) -> APIResponse:
36
        """getMe implementation
37
        https://core.telegram.org/bots/api#getme
38
39
        :return:
40
        """
41
        api_url = "{0:s}/getMe".format(self.api_url)
42
        link = requests.get(api_url)
43
        return APIResponse(link.text)
44
45 1
    def get_updates(self, offset: int = None, limit: int = None, timeout: int = None,
46
                    allowed_updates: Union[list, tuple] = None) -> APIResponse:
47
        """getUpdates implementation
48
        https://core.telegram.org/bots/api#getupdates
49
50
        :type offset: int
51
        :type limit: int
52
        :type timeout: int
53
        :type allowed_updates: Union[list, tuple]
54
        :return:
55
        """
56
        api_url = "{0:s}/getUpdates".format(self.api_url)
57
58
        payload = {}
59
60
        if offset:
61
            payload['offset'] = offset
62
        if limit:
63
            payload['limit'] = limit
64
        if timeout:
65
            payload['timeout'] = timeout
66
        if allowed_updates:
67
            payload['allowed_updates'] = allowed_updates
68
69
        link = requests.get(api_url)
70
        return APIResponse(link.text)
71
72 1
    def send_message(self, text: str, chat_id: int, parse_mode: str = None, disable_web_page_preview: bool = None,
73
                     disable_notification: bool = False, reply_to_message_id: int = None,
74
                     reply_markup: Union[list, tuple] = None) -> APIResponse:
75
        """sendMessage implementation
76
        https://core.telegram.org/bots/api#sendmessage
77
78
        :type text: str
79
        :type chat_id: int
80
        :type parse_mode: str
81
        :type disable_web_page_preview: bool
82
        :type disable_notification: bool
83
        :type reply_to_message_id: int
84
        :type reply_markup: Union[list, tuple]
85
        :return:
86
        """
87
        api_url = "{0:s}/sendMessage".format(self.api_url)
88
89
        payload = {
90
            'text': text,
91
            'chat_id': chat_id
92
        }
93
94
        if parse_mode:
95
            payload['parse_mode'] = parse_mode
96
        if disable_web_page_preview:
97
            payload['disable_web_page_preview'] = disable_web_page_preview
98
        if disable_notification:
99
            payload['disable_notification'] = disable_notification
100
        if reply_to_message_id:
101
            payload['reply_to_message_id'] = reply_to_message_id
102
        if reply_markup:
103
            payload['reply_markup'] = reply_markup
104
105
        link = requests.get(api_url, params=payload)
106
        return APIResponse(link.text)
107
108 1
    def get_last_chat_id_and_text(self) -> [int, str]:
109
        """Retrieve the last message and chat id
110
111
        :return:
112
        """
113
        updates = self.get_updates()
114
        if not updates.result:
115
            # Bot didn't receive any new messages
116
            return None, ""
117
118
        last_update = len(updates.result) - 1
119
        text = updates.result[last_update].message.text
120
        chat_id = updates.result[last_update].message.chat.id
121
        return chat_id, text
122