build.rsudp.c_telegram.Telegrammer._when_img()   B
last analyzed

Complexity

Conditions 8

Size

Total Lines 37
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 25.0063

Importance

Changes 0
Metric Value
eloc 28
dl 0
loc 37
ccs 10
cts 28
cp 0.357
rs 7.3333
c 0
b 0
f 0
cc 8
nop 2
crap 25.0063
1 1
import os, sys
2 1
import time
3 1
import rsudp.raspberryshake as rs
4 1
from rsudp import printM, printW, printE, helpers
5 1
from rsudp.test import TEST
6 1
import telegram as tg
7
8 1
class Telegrammer(rs.ConsumerThread):
9
	'''
10
	 .. versionadded:: 0.4.2
11
12
	.. |telegram| raw:: html
13
14
		<a href="https://t.me/" target="_blank">Telegram</a>
15
16
	.. |sasmex_use| raw:: html
17
18
		<a href="https://t.me/sasmex" target="_blank">Mexican Early Warning System (SASMEX)</a>
19
20
	|telegram| is a free messaging service which,
21
	among other things, is suited to quickly broadcasting automatic
22
	notifications via an API.
23
	It is used by the |sasmex_use| and PanamaIGC.
24
25
	:param str token: bot token from Telegram bot creation
26
	:param str chat_id: Telegram chat ID number that this module will post to
27
	:param bool send_images: whether or not to send images. if False, only alerts will be sent.
28
	:type extra_text: bool or str
29
	:param extra_text: Approximately 3900 additional characters to post as part of the Telegram message (Telegram message limits are 4096 characters). Longer messages will be truncated.
30
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
31
32
	'''
33 1
	def __init__(self, token, chat_id, testing=False,
34
				 q=False, send_images=False, extra_text=False,
35
				 sender='Telegram'):
36
		"""
37
		Initializing the Telegram message posting thread.
38
39
		"""
40 1
		super().__init__()
41 1
		self.queue = q
42 1
		self.sender = sender
43 1
		self.alive = True
44 1
		self.send_images = send_images
45 1
		self.token = token
46 1
		self.chat_id = chat_id
47 1
		self.testing = testing
48 1
		self.fmt = '%Y-%m-%d %H:%M:%S.%f'
49 1
		self.region = ' - region: %s' % rs.region.title() if rs.region else ''
50
51 1
		self.extra_text = helpers.resolve_extra_text(extra_text, max_len=4096, sender=self.sender)
52
53 1
		self.auth()
54
55 1
		self.livelink = u'live feed ➡️ https://stationview.raspberryshake.org/#?net=%s&sta=%s' % (rs.net, rs.stn)
56 1
		self.message0 = '(Raspberry Shake station %s.%s%s) Event detected at' % (rs.net, rs.stn, self.region)
57 1
		self.last_message = False
58
59 1
		printM('Starting.', self.sender)
60
61
62 1
	def auth(self):
63 1
		if not self.testing:
64
			self.telegram = tg.Bot(token=self.token)
65
		else:
66 1
			printW('The Telegram module will not post to Telegram in Testing mode.',
67
					self.sender, announce=False)
68
69
70 1
	def getq(self):
71 1
		d = self.queue.get()
72 1
		self.queue.task_done()
73
74 1
		if 'TERM' in str(d):
75 1
			self.alive = False
76 1
			printM('Exiting.', self.sender)
77 1
			sys.exit()
78
		else:
79 1
			return d
80
81
82 1
	def _when_alarm(self, d):
83
		'''
84
		Send a telegram in an alert scenario.
85
86
		:param bytes d: queue message
87
		'''
88 1
		event_time = helpers.fsec(helpers.get_msg_time(d))
89 1
		self.last_event_str = '%s' % (event_time.strftime(self.fmt)[:22])
90 1
		message = '%s %s UTC%s - %s' % (self.message0, self.last_event_str, self.extra_text, self.livelink)
91 1
		response = None
92 1
		try:
93 1
			printM('Sending alert...', sender=self.sender)
94 1
			printM('Telegram message: %s' % (message), sender=self.sender)
95 1
			if not self.testing:
96
				response = self.telegram.sendMessage(chat_id=self.chat_id, text=message)
97
			else:
98 1
				TEST['c_telegram'][1] = True
99
100
		except Exception as e:
101
			printE('Could not send alert - %s' % (e))
102
			try:
103
				printE('Waiting 5 seconds and trying to send again...', sender=self.sender, spaces=True)
104
				time.sleep(5)
105
				self.auth()
106
				printM('Telegram message: %s' % (message), sender=self.sender)
107
				if not self.testing:
108
					response = self.telegram.sendMessage(chat_id=self.chat_id, text=message)
109
				else:
110
					# if you are here in testing mode, there is a problem
111
					TEST['c_telegram'][1] = False
112
			except Exception as e:
113
				printE('Could not send alert - %s' % (e))
114
				response = None
115 1
		self.last_message = message
116
117
118 1
	def _when_img(self, d):
119
		'''
120
		Send a telegram image in when you get an ``IMGPATH`` message.
121
122
		:param bytes d: queue message
123
		'''
124 1
		if self.send_images:
125 1
			imgpath = helpers.get_msg_path(d)
126 1
			response = None
127 1
			if os.path.exists(imgpath):
128 1
				with open(imgpath, 'rb') as image:
129 1
					try:
130 1
						if not self.testing:
131
							printM('Uploading image to Telegram %s' % (imgpath), self.sender)
132
							response = self.telegram.sendPhoto(chat_id=self.chat_id, photo=image)
133
							printM('Sent image', sender=self.sender)
134
						else:
135 1
							printM('Image ready to send - %s' % (imgpath), self.sender)
136 1
							TEST['c_telegramimg'][1] = True
137
					except Exception as e:
138
						printE('Could not send image - %s' % (e))
139
						try:
140
							if not self.testing:
141
								printM('Waiting 5 seconds and trying to send again...', sender=self.sender)
142
								time.sleep(5.1)
143
								self.auth()
144
								printM('Uploading image to Telegram (2nd try) %s' % (imgpath), self.sender)
145
								response = self.telegram.sendPhoto(chat_id=self.chat_id, photo=image)
146
								printM('Sent image', sender=self.sender)
147
							else:
148
								# if you are here in testing mode, there is a problem
149
								TEST['c_telegramimg'][1] = False
150
						except Exception as e:
151
							printE('Could not send image - %s' % (e))
152
							response = None
153
			else:
154
				printM('Could not find image: %s' % (imgpath), sender=self.sender)
155
156
157 1 View Code Duplication
	def run(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
158
		"""
159
		Reads data from the queue and sends a message if it sees an ALARM or IMGPATH message
160
		"""
161 1
		while True:
162 1
			d = self.getq()
163
164 1
			if 'ALARM' in str(d):
165 1
				self._when_alarm(d)
166
167 1
			elif 'IMGPATH' in str(d):
168
				self._when_img(d)
169