1
|
|
|
#!/usr/bin/python |
2
|
|
|
# -*- coding: utf-8 -*- |
3
|
|
|
# get_image func courtesy of Slko |
4
|
|
|
from wand.image import Image |
5
|
|
|
import requests |
6
|
|
|
import os.path |
7
|
|
|
import io |
8
|
|
|
|
9
|
|
|
|
10
|
|
|
def extract_url(entity, text):
    """Return the URL described by a message entity, or None.

    ``entity`` is read by key ("type", "url", "offset", "length").

    * "text_link" entities carry the URL themselves.
    * "url" entities point at a span of ``text``.
    * Any other entity type yields None.

    Telegram expresses entity offset/length in UTF-16 code units, not in
    Python code points, so the span is cut from the UTF-16 encoding of
    ``text``. For text without astral characters (emoji etc.) this is the
    same as a plain string slice.
    """
    kind = entity["type"]
    if kind == "text_link":
        return entity["url"]
    if kind == "url":
        if text is None:
            # Nothing to slice from (e.g. the message has no text body).
            return None
        start = entity["offset"]
        stop = start + entity["length"]
        # Two bytes per UTF-16 code unit in the little-endian encoding.
        encoded = text.encode("utf-16-le")
        return encoded[start * 2:stop * 2].decode("utf-16-le")
    return None
19
|
|
|
|
20
|
|
|
|
21
|
|
|
def is_image(path):
    """Return the file extension to use for ``path``, or False if it is None.

    The comparison is case-insensitive. When ``path`` ends with one of the
    known extensions, that extension (lower-case, with the leading dot) is
    returned. Any other non-None path falls back to ".mp4".

    NOTE(review): because of the ".mp4" fallback every non-None path is
    treated as an image by callers that test the result for truthiness.
    This mirrors the existing behaviour - confirm it is intended.
    """
    if path is None:
        return False
    known_extensions = (
        ".jpg", ".jpeg", ".png", ".gif", ".svg", ".tif", ".bmp", ".mp4",
    )
    lowered = path.casefold()  # fold once instead of once per candidate
    for extension in known_extensions:
        if lowered.endswith(extension):
            return extension
    # No recognised suffix: fall back to ".mp4".
    return ".mp4"
34
|
|
|
|
35
|
|
|
|
36
|
|
|
def get_image(bot, update, dl_path, filename):
    """Download the image a command refers to and return its extension.

    The file is written to ``os.path.join(dl_path, filename) + extension``.

    Sources are tried in this order:

    1. No reply: the largest photo attached to the command message itself.
    2. Reply with URL / text-link entities: the first URL accepted by
       ``is_image`` is fetched over HTTP.
    3. Reply with a document whose file name is accepted by ``is_image``.
    4. Reply with a sticker (saved as ".webp").
    5. Reply with a photo (largest size, saved as ".jpg").

    Returns the extension string (including the dot) on success, or False
    when no image could be found.
    """
    output = os.path.join(dl_path, filename)
    message = update.message
    reply = message.reply_to_message

    if reply is None:
        # An absent photo may be None or an empty list; either way there is
        # nothing to index, so report "no image" instead of raising.
        if not message.photo:
            return False
        extension = ".jpg"
        bot.getFile(message.photo[-1].file_id).download(output + extension)
        return extension

    # Entities; url, text_link
    if reply.entities:
        for entity in reply.entities:
            url = extract_url(entity, reply.text)
            extension = is_image(url)
            if not extension:
                continue
            # Use only the first image url. The timeout keeps a stalled
            # server from blocking the handler indefinitely.
            response = requests.get(url, timeout=60)
            with open(output + extension, "wb") as f:
                f.write(response.content)
            return extension

    # Document
    if reply.document is not None:
        extension = is_image(reply.document.file_name)
        if extension:
            bot.getFile(reply.document.file_id).download(output + extension)
            return extension

    # Sticker
    if reply.sticker is not None:
        extension = ".webp"
        bot.getFile(reply.sticker.file_id).download(output + extension)
        return extension

    # Photo in reply (truthiness check: an empty list means "no photo")
    if reply.photo:
        extension = ".jpg"
        bot.getFile(reply.photo[-1].file_id).download(output + extension)
        return extension

    return False
69
|
|
|
|
70
|
|
|
|
71
|
|
|
def send_image(update, filepath, name, extension):
    """Reply to the message with the file ``filepath + name + extension``.

    The reply method is chosen from ``extension``: JPEG files go out as
    photos, ".webp" as a sticker, and the remaining known formats as
    documents. Returns True once the reply has been sent, or None when the
    extension is not recognised (nothing is opened or sent in that case).
    """
    if extension.endswith((".jpg", ".jpeg")):
        reply_method = "reply_photo"
    elif extension.endswith((".png", ".svg", ".tif", ".bmp", ".gif", ".mp4")):
        reply_method = "reply_document"
    elif extension.endswith(".webp"):
        reply_method = "reply_sticker"
    else:
        return None

    with open(filepath + name + extension, "rb") as handle:
        getattr(update.message, reply_method)(handle)
        return True
89
|
|
|
|
90
|
|
|
|
91
|
|
|
def _blob_with_mimetype(data):
    """Return ``(bytes, mimetype)`` for raw image data, using Wand to sniff the type."""
    blob = bytes(data)
    # Close the Wand image as soon as the mimetype has been read.
    with Image(blob=blob) as img:
        mimetype = img.mimetype
    return blob, mimetype


def get_image_new(bot, update):
    """Fetch the image a command refers to, entirely in memory.

    Sources are tried in this order:

    1. No reply: the largest photo attached to the command message itself.
    2. Reply with URL / text-link entities: the first URL accepted by
       ``is_image`` is fetched over HTTP.
    3. Reply with a document whose file name is accepted by ``is_image``.
    4. Reply with a sticker.
    5. Reply with a photo (largest size).

    Returns a ``(data, mimetype)`` tuple where ``data`` is ``bytes`` and
    ``mimetype`` is what Wand reports for it, or False when no image
    could be found.
    """
    message = update.message
    reply = message.reply_to_message

    if reply is None:
        # An absent photo may be None or an empty list; nothing to index.
        if not message.photo:
            return False
        file_id = message.photo[-1].file_id
        return _blob_with_mimetype(bot.getFile(file_id).download_as_bytearray())

    # Entities; url, text_link
    if reply.entities:
        for entity in reply.entities:
            url = extract_url(entity, reply.text)
            if not is_image(url):
                continue
            # Use only the first image url. The content is returned like
            # every other branch instead of being written to disk.
            response = requests.get(url, timeout=60)
            return _blob_with_mimetype(response.content)

    # Document
    if reply.document is not None and is_image(reply.document.file_name):
        file_id = reply.document.file_id
        return _blob_with_mimetype(bot.getFile(file_id).download_as_bytearray())

    # Sticker
    if reply.sticker is not None:
        file_id = reply.sticker.file_id
        return _blob_with_mimetype(bot.getFile(file_id).download_as_bytearray())

    # Photo in reply (truthiness check: an empty list means "no photo")
    if reply.photo:
        file_id = reply.photo[-1].file_id
        return _blob_with_mimetype(bot.getFile(file_id).download_as_bytearray())

    return False
127
|
|
|
|
128
|
|
|
|
129
|
|
|
def send_image_new(update, file, mimetype):
    """Reply to the message with in-memory image data.

    The reply method is chosen by the prefix of ``mimetype``: JPEG goes out
    as a photo, WebP as a sticker, and the other known types as documents.
    Each reply uses a 60 second timeout. Returns True once the reply has
    been sent, or None when the mimetype is not recognised.
    """
    photo_mimetypes = ("image/jpeg",)
    # GIF is included so it is handled the same way as the ".gif" files
    # the extension-based helpers in this module accept.
    document_mimetypes = (
        "image/png", "image/gif", "image/svg", "image/tif", "image/bmp",
        "video/mp4",
    )
    # Accept both the legacy "x-" spelling and the registered type.
    sticker_mimetypes = ("image/x-webp", "image/webp")

    if mimetype.startswith(photo_mimetypes):
        update.message.reply_photo(file, timeout=60)
        return True
    if mimetype.startswith(document_mimetypes):
        update.message.reply_document(file, timeout=60)
        return True
    if mimetype.startswith(sticker_mimetypes):
        update.message.reply_sticker(file, timeout=60)
        return True
    return None
144
|
|
|
|
145
|
|
|
|
146
|
|
|
def get_param(update, defaultvalue, min_value, max_value):
    """Parse the integer parameter that follows the command word.

    The command string is taken from the message text when the message is
    a reply, otherwise from the caption, otherwise from the text. The
    token after the first space is parsed as an int.

    Returns:
        * the parsed value when it lies within [min_value, max_value];
        * ``defaultvalue`` when there is no command string or the
          parameter is not a number (no range check in these cases);
        * ``defaultvalue`` when no parameter was given and the default is
          itself within range;
        * None, after replying with an error message, when the value (or
          the default used for a missing parameter) is out of range.
    """
    message = update.message
    if message.reply_to_message is not None and message.text is not None:
        parts = message.text.split(" ", 1)
    elif message.caption is not None:
        # Also reached by a reply that carries a caption but no text.
        parts = message.caption.split(" ", 1)
    elif message.text is not None:
        parts = message.text.split(" ", 2)
    else:
        return defaultvalue

    if len(parts) == 1:
        parameter = defaultvalue
    else:
        try:
            parameter = int(parts[1])
        except ValueError:
            # Not a number: silently use the default rather than complain.
            return defaultvalue

    if parameter < min_value or parameter > max_value:
        errtext = "Baka, make it from {} to {}!".format(min_value, max_value)
        message.reply_text(errtext)
        return None
    return parameter
168
|
|
|
|
169
|
|
|
|
170
|
|
|
# custom filters for message handler |
171
|
|
|
# photo with caption |
172
|
|
|
def caption_filter(text):
    """Build a message predicate for photos whose caption starts with ``text``.

    The returned callable takes a message and returns True only when the
    message has a photo, has a non-empty caption, and that caption begins
    with ``text``; otherwise it returns False.
    """
    def matches(msg):
        if not msg.photo:
            return False
        if not msg.caption:
            return False
        return msg.caption.startswith(text)

    return matches
174
|
|
|
|
175
|
|
|
|
176
|
|
|
# text of choice |
177
|
|
|
def text_filter(text):
    """Build a message predicate that matches messages containing ``text``.

    The returned callable takes a message and returns True when
    ``msg.text`` contains ``text``. Messages without text (``msg.text`` is
    None, e.g. photos or stickers) do not match instead of raising.
    """
    return lambda msg: msg.text is not None and text in msg.text
179
|
|
|
|