Passed
Pull Request — master (#54)
by
unknown
05:51
created

build.rsudp.c_telegram   A

Complexity

Total Complexity 24

Size/Duplication

Total Lines 190
Duplicated Lines 0 %

Test Coverage

Coverage 65.35%

Importance

Changes 0
Metric Value
wmc 24
eloc 105
dl 0
loc 190
ccs 66
cts 101
cp 0.6535
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A Telegrammer.getq() 0 10 2
A Telegrammer.__init__() 0 28 2
A Telegrammer.run() 0 17 5
B Telegrammer._when_alarm() 0 39 5
B Telegrammer._when_img() 0 41 8
A Telegrammer.auth() 0 6 2
1 1
import os, sys
2 1
import time
3 1
import rsudp.raspberryshake as rs
4 1
from rsudp import printM, printW, printE, helpers
5 1
from rsudp.test import TEST
6
# Import asyncio to run the new telegram bot code.
7 1
import asyncio
8
9
10
11
12 1
import telegram as tg
13
14 1
class Telegrammer(rs.ConsumerThread):
15
	'''
16
	 .. versionadded:: 0.4.2
17
18
	.. |telegram| raw:: html
19
20
		<a href="https://t.me/" target="_blank">Telegram</a>
21
22
	.. |sasmex_use| raw:: html
23
24
		<a href="https://t.me/sasmex" target="_blank">Mexican Early Warning System (SASMEX)</a>
25
26
	|telegram| is a free messaging service which,
27
	among other things, is suited to quickly broadcasting automatic
28
	notifications via an API.
29
	It is used by the |sasmex_use| and PanamaIGC.
30
31
	:param str token: bot token from Telegram bot creation
32
	:param str chat_id: Telegram chat ID number that this module will post to
33
	:param bool send_images: whether or not to send images. if False, only alerts will be sent.
34
	:type extra_text: bool or str
35
	:param extra_text: Approximately 3900 additional characters to post as part of the Telegram message (Telegram message limits are 4096 characters). Longer messages will be truncated.
36
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
37
38
	'''
39 1
	def __init__(self, token, chat_id, testing=False,
40
				 q=False, send_images=False, extra_text=False,
41
				 sender='Telegram'):
42
		"""
43
		Initializing the Telegram message posting thread.
44
45
		"""
46 1
		super().__init__()
47 1
		self.queue = q
48 1
		self.sender = sender
49 1
		self.alive = True
50 1
		self.send_images = send_images
51 1
		self.token = token
52 1
		self.chat_id = chat_id
53 1
		self.testing = testing
54 1
		self.fmt = '%Y-%m-%d %H:%M:%S.%f'
55 1
		self.region = ' - region: %s' % rs.region.title() if rs.region else ''
56
57 1
		self.extra_text = helpers.resolve_extra_text(extra_text, max_len=4096, sender=self.sender)
58
59
#		instead of initializing it here, we init the bot for each call.
60
#		self.auth()
61
62 1
		self.livelink = u'live feed ➡️ https://stationview.raspberryshake.org/#?net=%s&sta=%s' % (rs.net, rs.stn)
63 1
		self.message0 = '(Raspberry Shake station %s.%s%s) Event detected at' % (rs.net, rs.stn, self.region)
64 1
		self.last_message = False
65
66 1
		printM('Starting.', self.sender)
67
68
69 1
	def auth(self):
70 1
		if not self.testing:
71
			self.telegram = tg.Bot(token=self.token)
72
		else:
73 1
			printW('The Telegram module will not post to Telegram in Testing mode.',
74
					self.sender, announce=False)
75
76
77 1
	def getq(self):
78 1
		d = self.queue.get()
79 1
		self.queue.task_done()
80
81 1
		if 'TERM' in str(d):
82 1
			self.alive = False
83 1
			printM('Exiting.', self.sender)
84 1
			sys.exit()
85
		else:
86 1
			return d
87
88
#	Change the def to async
89 1
	async def _when_alarm(self, d):
90
		'''
91
		Send a telegram in an alert scenario.
92
93
		:param bytes d: queue message
94
		'''
95 1
		event_time = helpers.fsec(helpers.get_msg_time(d))
96 1
		self.last_event_str = '%s' % (event_time.strftime(self.fmt)[:22])
97 1
		message = '%s %s UTC%s - %s' % (self.message0, self.last_event_str, self.extra_text, self.livelink)
98 1
		response = None
99 1
		try:
100 1
			printM('Sending alert...', sender=self.sender)
101 1
			self.auth() #Bot init
102 1
			printM('Telegram message: %s' % (message), sender=self.sender)
103 1
			if not self.testing:
104
#				As a result of the bot being async code, we need to await the run.				
105
				response = await self.telegram.sendMessage(chat_id=self.chat_id, text=message)
106
				
107
			else:
108 1
				TEST['c_telegram'][1] = True
109
110
		except Exception as e:
111
			printE('Could not send alert - %s' % (e))
112
			try:
113
				printE('Waiting 5 seconds and trying to send again...', sender=self.sender, spaces=True)
114
				time.sleep(5)
115
				self.auth()
116
				printM('Telegram message: %s' % (message), sender=self.sender)
117
				if not self.testing:
118
#					As a result of the bot being async code, we need to await the run.
119
					response = await self.telegram.sendMessage(chat_id=self.chat_id, text=message)
120
121
				else:
122
					# if you are here in testing mode, there is a problem
123
					TEST['c_telegram'][1] = False
124
			except Exception as e:
125
				printE('Could not send alert - %s' % (e))
126
				response = None
127 1
		self.last_message = message
128
129
#	Change the def to async
130 1
	async def _when_img(self, d):
131
		'''
132
		Send a telegram image in when you get an ``IMGPATH`` message.
133
134
		:param bytes d: queue message
135
		'''
136 1
		if self.send_images:
137 1
			imgpath = helpers.get_msg_path(d)
138 1
			response = None
139 1
			if os.path.exists(imgpath):
140 1
				with open(imgpath, 'rb') as image:
141 1
					try:
142 1
						self.auth() #Bot init
143 1
						if not self.testing:
144
							printM('Uploading image to Telegram %s' % (imgpath), self.sender)
145
#							As a result of the bot being async code, we need to await the run.
146
							response = await self.telegram.sendPhoto(chat_id=self.chat_id, photo=image)
147
							printM('Sent image', sender=self.sender)
148
						else:
149 1
							printM('Image ready to send - %s' % (imgpath), self.sender)
150 1
							TEST['c_telegramimg'][1] = True
151
					except Exception as e:
152
						printE('Could not send image - %s' % (e))
153
						try:
154
							if not self.testing:
155
								printM('Waiting 5 seconds and trying to send again...', sender=self.sender)
156
								time.sleep(5.1)
157
								self.auth()
158
								printM('Uploading image to Telegram (2nd try) %s' % (imgpath), self.sender)
159
#								As a result of the bot being async code, we need to await the run.								
160
								response = await self.telegram.sendPhoto(chat_id=self.chat_id, photo=image)
161
								
162
								printM('Sent image', sender=self.sender)
163
							else:
164
								# if you are here in testing mode, there is a problem
165
								TEST['c_telegramimg'][1] = False
166
						except Exception as e:
167
							printE('Could not send image - %s' % (e))
168
							response = None
169
			else:
170
				printM('Could not find image: %s' % (imgpath), sender=self.sender)
171
172
173 1
	def run(self):
174
		"""
175
		Reads data from the queue and sends a message if it sees an ALARM or IMGPATH message
176
		"""
177 1
		while True:
178 1
			d = self.getq()
179
180 1
			if 'ALARM' in str(d):
181
#				Async send the message via the bot to Telegram			
182 1
				asyncio.run(self._when_alarm(d))
183
184 1
			elif 'IMGPATH' in str(d):
185
#				Async send the image, sometimes it takes too long, so we need to create a potential grace of max 3 sec for the async loop to finish.
186 1
				try:
187 1
					asyncio.run(asyncio.wait_for(self._when_img(d), 3))
188
				except asyncio.TimeoutError as e: #if it completely times out, catch the error.
189
					printE('Asyncio timeout error - %s' % (e))
190